[Spark Core] saveAsTextFile is unable to rename a directory using hadoop-azure NativeAzureFileSystem

2021-09-13 Thread Abhishek Jindal
Hello,

I am trying to use the Spark rdd.saveAsTextFile function, which calls
FileSystem.rename() under the hood. This errors out with
“com.microsoft.azure.storage.StorageException: One of the request inputs is
not valid” when using the hadoop-azure NativeAzureFileSystem. I have written
a small Scala test program that renames a directory in Azure Blob Storage
and replicates this issue. Here is my code -

import java.net.URI

import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

import scala.util.control.NonFatal

/**
  * A utility to test renaming a hadoop-azure path.
  */
object AzureRenameTester {

  def main(args: Array[String]): Unit = {

    if (args.isEmpty) {
      throw new IllegalArgumentException("The Azure Blob storage key must be provided!")
    }

    val key = args.head
    val hadoopConfig = new Configuration()
    hadoopConfig.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
    hadoopConfig.set("fs.wasbs.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
    hadoopConfig.set("fs.AbstractFileSystem.wasbs.Impl", "org.apache.hadoop.fs.azure.Wasbs")
    hadoopConfig.set("fs.azure.account.key..blob.core.windows.net", key)

    val input = new URI("wasbs://@.blob.core.windows.net/testing")
    val inputPath = new Path(input)
    val output = new URI("wasbs://@.blob.core.windows.net/testingRenamed")
    val outputPath = new Path(output)
    val hadoopFs = FileSystem.get(input, hadoopConfig)

    try {
      println(s"Renaming from $inputPath to $outputPath")
      hadoopFs.rename(inputPath, outputPath)
    } catch {
      case NonFatal(ex) =>
        println(s"${ExceptionUtils.getMessage(ex)}")
        println(s"${ExceptionUtils.getRootCause(ex)}")
        throw ex
    }
  }
}

This code leads to the following error -

[error] Exception in thread "main"
org.apache.hadoop.fs.azure.AzureException:
com.microsoft.azure.storage.StorageException: One of the request
inputs is not valid.
[error] at 
org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2849)
[error] at 
org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2721)
[error] at 
org.apache.hadoop.fs.azure.NativeAzureFileSystem$FolderRenamePending.execute(NativeAzureFileSystem.java:460)
[error] at 
org.apache.hadoop.fs.azure.NativeAzureFileSystem.rename(NativeAzureFileSystem.java:3277)
[error] at 
com.qf.util.hdfs.AzureRenameTester$.main(AzureRenameTester.scala:40)
[error] at 
com.qf.util.hdfs.AzureRenameTester.main(AzureRenameTester.scala)
[error] Caused by: com.microsoft.azure.storage.StorageException: One
of the request inputs is not valid.
[error] at 
com.microsoft.azure.storage.StorageException.translateException(StorageException.java:87)
[error] at 
com.microsoft.azure.storage.core.StorageRequest.materializeException(StorageRequest.java:315)
[error] at 
com.microsoft.azure.storage.core.ExecutionEngine.executeWithRetry(ExecutionEngine.java:185)
[error] at 
com.microsoft.azure.storage.blob.CloudBlob.startCopy(CloudBlob.java:735)
[error] at 
com.microsoft.azure.storage.blob.CloudBlob.startCopy(CloudBlob.java:691)
[error] at 
org.apache.hadoop.fs.azure.StorageInterfaceImpl$CloudBlobWrapperImpl.startCopyFromBlob(StorageInterfaceImpl.java:434)
[error] at 
org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2788)
[error] ... 5 more

I am currently using spark-core-3.1.1.jar with hadoop-azure-3.2.2.jar, but
the same issue also occurs with hadoop-azure-3.3.1.jar. Please advise how I
should solve this issue.

Thanks,
Abhishek


Spark saveAsTextFile Disk Recommendation

2021-03-21 Thread ranju goel
Hi Attila,



I will check why INVALID is getting appended to the mailing address.



What is your use case here?

The client driver application is not using collect; instead it internally
calls a Python script that reads the part-file records (comma-separated
strings) of each node separately and copies the records into a final CSV
file, merging all part-file data into a single CSV. This script runs on
every node, and the results are later combined into a single file.



*On the other hand is your data really just a collection of strings without
any repetitions*

[Ranju]:

Yes, it is a comma-separated string.

And I just checked the 2nd argument of saveAsTextFile; I believe reads and
writes on disk will be faster with compression. I will try this.



So I think there is no special requirement on the type of disk for executing
saveAsTextFile, as these are local I/O operations.



Regards

Ranju





Hi!

I would like to reflect only to the first part of your mail:

I have a large RDD dataset of around 60-70 GB which I cannot send to driver
using *collect* so first writing that to disk using  *saveAsTextFile* and
then this data gets saved in the form of multiple part files on each node
of the cluster and after that driver reads the data from that storage.


What is your use case here?

As you mention *collect()* I can assume you have to process the data
outside of Spark maybe with a 3rd party tool, isn't it?

If you have 60-70 GB of data and you write it to text file then read it
back within the same application then you still cannot call *collect()* on
it as it is still 60-70GB data, right?

On the other hand, is your data really just a collection of strings without
any repetitions? I ask this because of the file format you are using: text
file. Even for a text file you can at least pass a compression codec as the
2nd argument of *saveAsTextFile()*
<https://spark.apache.org/docs/3.1.1/api/scala/org/apache/spark/rdd/RDD.html#saveAsTextFile(path:String,codec:Class[_%3C:org.apache.hadoop.io.compress.CompressionCodec]):Unit>
(when you use this link you might need to scroll up a little bit; at least my
Chrome displays the *saveAsTextFile* method without the 2nd codec arg).
As I/O is slow, compressed data can be read back quicker, since there will
be less data on disk. Check the Snappy
<https://en.wikipedia.org/wiki/Snappy_(compression)> codec, for example.
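For reference, here is a minimal sketch of the same idea. The PySpark form of the call takes the codec class name as its 2nd argument (the output path here is hypothetical), and the stdlib portion below just illustrates why it helps: repetitive text compresses well, so less data reaches the disk.

```python
import gzip
import os
import tempfile

# With PySpark the codec is passed directly (hypothetical output path):
#   rdd.saveAsTextFile("hdfs:///out",
#                      compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")
# The stdlib sketch below shows the underlying trade-off.

lines = ["alpha,beta,gamma"] * 10_000
raw = ("\n".join(lines) + "\n").encode()

with tempfile.TemporaryDirectory() as d:
    plain = os.path.join(d, "part-00000")
    packed = os.path.join(d, "part-00000.gz")
    with open(plain, "wb") as f:
        f.write(raw)
    with gzip.open(packed, "wb") as f:
        f.write(raw)
    plain_size = os.path.getsize(plain)
    packed_size = os.path.getsize(packed)
    # Reading back is transparent, just as Spark decompresses on read.
    with gzip.open(packed, "rb") as f:
        restored = f.read()

print(packed_size < plain_size)  # the compressed part file is far smaller
```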

But if your data has structure and you plan to process it further within
Spark, then please consider something way better: a columnar storage format,
namely ORC or Parquet.

Best Regards,

Attila





*From:* Ranju Jain 
*Sent:* Sunday, March 21, 2021 8:10 AM
*To:* user@spark.apache.org
*Subject:* Spark saveAsTextFile Disk Recommendation



Hi All,



I have a large RDD dataset of around 60-70 GB which I cannot send to driver
using *collect* so first writing that to disk using  *saveAsTextFile* and
then this data gets saved in the form of multiple part files on each node
of the cluster and after that driver reads the data from that storage.



I have a question about *spark.local.dir*: it is the directory used as
scratch space, where Spark may need to write map output files and RDDs for
shuffle operations, etc.

And there it is strongly recommended to use a *local and fast disk* to avoid
failures or performance impact.



*Do we have any such recommendation for storing the multiple part files of a
large dataset [or big RDD] on a fast disk?*

This will help me configure the right type of disk for the resulting part
files.
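(For reference, spark.local.dir is usually pointed at one or more fast local disks in spark-defaults.conf; the mount points below are hypothetical:)

```properties
# spark-defaults.conf -- hypothetical local SSD scratch mounts
spark.local.dir  /mnt/ssd1/spark-scratch,/mnt/ssd2/spark-scratch
```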



Regards

Ranju


RE: Spark saveAsTextFile Disk Recommendation

2021-03-21 Thread Ranju Jain
Hi Attila,

What is your use case here?
The client driver application is not using collect; instead it internally calls a 
Python script that reads the part-file records (comma-separated strings) of each 
node separately and copies the records into a final CSV file, merging all 
part-file data into a single CSV. This script runs on every node, and the 
results are later combined into a single file.

On the other hand is your data really just a collection of strings without any 
repetitions
[Ranju]:
Yes, it is a comma-separated string.
And I just checked the 2nd argument of saveAsTextFile; I believe reads and 
writes on disk will be faster with compression. I will try this.

So I think there is no special requirement on the type of disk for executing 
saveAsTextFile, as these are local I/O operations.

Regards
Ranju


Hi!

I would like to reflect only to the first part of your mail:


I have a large RDD dataset of around 60-70 GB which I cannot send to driver 
using collect so first writing that to disk using  saveAsTextFile and then this 
data gets saved in the form of multiple part files on each node of the cluster 
and after that driver reads the data from that storage.

What is your use case here?

As you mention collect() I can assume you have to process the data outside of 
Spark maybe with a 3rd party tool, isn't it?

If you have 60-70 GB of data and you write it to text file then read it back 
within the same application then you still cannot call collect() on it as it is 
still 60-70GB data, right?

On the other hand, is your data really just a collection of strings without any 
repetitions? I ask this because of the file format you are using: text file. 
Even for a text file you can at least pass a compression codec as the 2nd 
argument of 
saveAsTextFile()<https://spark.apache.org/docs/3.1.1/api/scala/org/apache/spark/rdd/RDD.html#saveAsTextFile(path:String,codec:Class[_%3C:org.apache.hadoop.io.compress.CompressionCodec]):Unit>
 (when you use this link you might need to scroll up a little bit; at least my 
Chrome displays the saveAsTextFile method without the 2nd codec arg). As I/O is 
slow, compressed data can be read back quicker, since there will be less data 
on disk. Check the 
Snappy<https://en.wikipedia.org/wiki/Snappy_(compression)> codec, for example.

But if your data has structure and you plan to process it further within Spark, 
then please consider something way better: a columnar storage format, namely 
ORC or Parquet.

Best Regards,
Attila


From: Ranju Jain 
Sent: Sunday, March 21, 2021 8:10 AM
To: user@spark.apache.org
Subject: Spark saveAsTextFile Disk Recommendation

Hi All,

I have a large RDD dataset of around 60-70 GB which I cannot send to driver 
using collect so first writing that to disk using  saveAsTextFile and then this 
data gets saved in the form of multiple part files on each node of the cluster 
and after that driver reads the data from that storage.

I have a question about spark.local.dir: it is the directory used as scratch 
space, where Spark may need to write map output files and RDDs for shuffle 
operations, etc.
And there it is strongly recommended to use a local and fast disk to avoid 
failures or performance impact.

Do we have any such recommendation for storing the multiple part files of a 
large dataset [or big RDD] on a fast disk?
This will help me configure the right type of disk for the resulting part files.

Regards
Ranju


Re: Spark saveAsTextFile Disk Recommendation

2021-03-20 Thread Attila Zsolt Piros
Hi!

I would like to reflect only to the first part of your mail:

I have a large RDD dataset of around 60-70 GB which I cannot send to driver
> using *collect* so first writing that to disk using  *saveAsTextFile* and
> then this data gets saved in the form of multiple part files on each node
> of the cluster and after that driver reads the data from that storage.


What is your use case here?

As you mention *collect()* I can assume you have to process the data
outside of Spark maybe with a 3rd party tool, isn't it?

If you have 60-70 GB of data and you write it to text file then read it
back within the same application then you still cannot call *collect()* on
it as it is still 60-70GB data, right?

On the other hand, is your data really just a collection of strings without
any repetitions? I ask this because of the file format you are using: text
file. Even for a text file you can at least pass a compression codec as the
2nd argument of *saveAsTextFile()*
<https://spark.apache.org/docs/3.1.1/api/scala/org/apache/spark/rdd/RDD.html#saveAsTextFile(path:String,codec:Class[_%3C:org.apache.hadoop.io.compress.CompressionCodec]):Unit>
(when you use this link you might need to scroll up a little bit; at least my
Chrome displays the *saveAsTextFile* method without the 2nd codec arg).
As I/O is slow, compressed data can be read back quicker, since there will
be less data on disk. Check the Snappy
<https://en.wikipedia.org/wiki/Snappy_(compression)> codec, for example.

But if your data has structure and you plan to process it further within
Spark, then please consider something way better: a columnar storage format,
namely ORC or Parquet.

Best Regards,
Attila
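As a sketch of why a columnar format pays off: with Spark (hypothetical schema and path) the save would be roughly `rdd.toDF(...).write.parquet(path)`. The toy stdlib comparison below shows the layout difference a columnar store exploits, namely that one column can be read on its own.

```python
# With Spark (hypothetical schema and path) the advice above would be roughly:
#   df = rdd.toDF(["id", "payload"])
#   df.write.parquet("hdfs:///out_parquet")
# A columnar format stores each column contiguously, so a query touching
# only "id" never reads "payload", and homogeneous columns compress better.

rows = [(i, f"payload-{i}") for i in range(1000)]

# Row layout: fetching just the ids still walks every full record.
ids_from_rows = [r[0] for r in rows]

# Column layout: each column stands alone and can be read independently.
columns = {"id": [r[0] for r in rows], "payload": [r[1] for r in rows]}
ids_from_columns = columns["id"]

print(ids_from_rows == ids_from_columns)  # True: same data, different layout
```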


On Sun, Mar 21, 2021 at 3:40 AM Ranju Jain 
wrote:

> Hi All,
>
>
>
> I have a large RDD dataset of around 60-70 GB which I cannot send to
> driver using *collect* so first writing that to disk using
> *saveAsTextFile* and then this data gets saved in the form of multiple
> part files on each node of the cluster and after that driver reads the data
> from that storage.
>
>
>
> I have a question about *spark.local.dir*: it is the directory used as
> scratch space, where Spark may need to write map output files and RDDs for
> shuffle operations, etc.
>
> And there it is strongly recommended to use a *local and fast disk* to
> avoid failures or performance impact.
>
>
>
> *Do we have any such recommendation for storing the multiple part files of
> a large dataset [or big RDD] on a fast disk?*
>
> This will help me configure the right type of disk for the resulting part
> files.
>
>
>
> Regards
>
> Ranju
>


Spark saveAsTextFile Disk Recommendation

2021-03-20 Thread Ranju Jain
Hi All,

I have a large RDD dataset of around 60-70 GB which I cannot send to driver 
using collect so first writing that to disk using  saveAsTextFile and then this 
data gets saved in the form of multiple part files on each node of the cluster 
and after that driver reads the data from that storage.

I have a question about spark.local.dir: it is the directory used as scratch 
space, where Spark may need to write map output files and RDDs for shuffle 
operations, etc.
And there it is strongly recommended to use a local and fast disk to avoid 
failures or performance impact.

Do we have any such recommendation for storing the multiple part files of a 
large dataset [or big RDD] on a fast disk?
This will help me configure the right type of disk for the resulting part files.

Regards
Ranju


Re: How to improve performance of saveAsTextFile()

2017-03-11 Thread Yan Facai
How about increasing RDD's partitions / rebalancing data?

On Sat, Mar 11, 2017 at 2:33 PM, Parsian, Mahmoud 
wrote:

> How to improve performance of JavaRDD.saveAsTextFile(“hdfs://…“).
> This is taking over 30 minutes on a cluster of 10 nodes.
> Running Spark on YARN.
>
> JavaRDD has 120 million entries.
>
> Thank you,
> Best regards,
> Mahmoud
>
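Yan Facai's suggestion above, sketched: in Spark it would be roughly `rdd.repartition(n).saveAsTextFile(path)` (n is workload-dependent and the path hypothetical). Each partition becomes one part- file written by one task, so more partitions mean more parallel writers. A toy round-robin partitioner illustrates the balancing:

```python
# Hypothetical stand-in for Spark's repartitioning: round-robin records
# into n roughly equal partitions; each partition maps to one write task.

def partition(records, n):
    parts = [[] for _ in range(n)]
    for i, rec in enumerate(records):
        parts[i % n].append(rec)
    return parts

records = list(range(120))
parts = partition(records, 8)
sizes = [len(p) for p in parts]
print(sizes)  # eight balanced partitions of 15 records each
```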


How to improve performance of saveAsTextFile()

2017-03-10 Thread Parsian, Mahmoud
How to improve performance of JavaRDD.saveAsTextFile(“hdfs://…“).
This is taking over 30 minutes on a cluster of 10 nodes.
Running Spark on YARN.

JavaRDD has 120 million entries.

Thank you,
Best regards,
Mahmoud


Re: Writing/Saving RDD to HDFS using saveAsTextFile

2016-10-07 Thread Deepak Sharma
Hi Mahendra
Did you try mapping the X case class members further to a String object
and then saving the RDD[String]?

Thanks
Deepak
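Deepak's suggestion can be sketched like this (Python stand-ins for the Scala case class; the field name follows the thread, and the Spark call itself is shown only as a comment):

```python
# Hypothetical record type mirroring the case class X with a long `time`.
from dataclasses import dataclass

@dataclass
class X:
    time: int

def to_line(x: X) -> str:
    # Flatten the record into a plain string before saving.
    return str(x.time)

# With Spark this would then be: rdd.map(to_line).saveAsTextFile("hdfs:///out")
lines = [to_line(X(time=t)) for t in (1, 2, 3)]
print(lines)  # ['1', '2', '3']
```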

On Oct 7, 2016 23:04, "Mahendra Kutare"  wrote:

> Hi,
>
> I am facing issue with writing RDD[X]  to HDFS file path. X is a simple
> case class with variable time as primitive long.
>
> When I run the driver program with - master as
>
> spark://:7077
>
> I get this  -
>
> Caused by: java.io.EOFException
> at java.io.ObjectInputStream$BlockDataInputStream.
> readFully(ObjectInputStream.java:2744)
> at java.io.ObjectInputStream.readFully(ObjectInputStream.java:1032)
> at org.apache.hadoop.io.Text.readString(Text.java:473)
> at org.apache.hadoop.io.Text.readString(Text.java:464)
> at org.apache.hadoop.io.WritableUtils.readEnum(WritableUtils.java:415)
> at org.apache.hadoop.mapreduce.TaskID.readFields(TaskID.java:223)
> at org.apache.hadoop.mapreduce.TaskAttemptID.readFields(
> TaskAttemptID.java:139)
> at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:285)
> at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:77)
> at org.apache.spark.SerializableWritable$$anonfun$
> readObject$1.apply$mcV$sp(SerializableWritable.scala:45)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1205)
> at org.apache.spark.SerializableWritable.readObject(
> SerializableWritable.scala:41)
>
> Full stack trace - https://gist.github.com/imaxxs/
> 650efd1cb367783897e3d37c1103512b
>
> The same code works fine when I run the driver program in local mode with
> master as local[4]
>
> Can anyone please provide some pointers to debug this issue? I have tried
> many options - writing to a local file, writing to an HDFS file, and
> writing in different formats, etc.
>
> Thanks for the help in advance,
>
> Mahendra
> about.me/mahendrakutare
>
> 
> ~~
> Only those who will risk going too far can possibly find out how far one
> can go.
>


Writing/Saving RDD to HDFS using saveAsTextFile

2016-10-07 Thread Mahendra Kutare
Hi,

I am facing an issue writing RDD[X] to an HDFS file path. X is a simple
case class with a variable time of primitive type long.

When I run the driver program with - master as

spark://:7077

I get this  -

Caused by: java.io.EOFException
at
java.io.ObjectInputStream$BlockDataInputStream.readFully(ObjectInputStream.java:2744)
at java.io.ObjectInputStream.readFully(ObjectInputStream.java:1032)
at org.apache.hadoop.io.Text.readString(Text.java:473)
at org.apache.hadoop.io.Text.readString(Text.java:464)
at org.apache.hadoop.io.WritableUtils.readEnum(WritableUtils.java:415)
at org.apache.hadoop.mapreduce.TaskID.readFields(TaskID.java:223)
at
org.apache.hadoop.mapreduce.TaskAttemptID.readFields(TaskAttemptID.java:139)
at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:285)
at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:77)
at
org.apache.spark.SerializableWritable$$anonfun$readObject$1.apply$mcV$sp(SerializableWritable.scala:45)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1205)
at
org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:41)

Full stack trace -
https://gist.github.com/imaxxs/650efd1cb367783897e3d37c1103512b

The same code works fine when I run the driver program in local mode with
master as local[4]

Can anyone please provide some pointers to debug this issue? I have tried
many options - writing to a local file, writing to an HDFS file, and writing
in different formats, etc.

Thanks for the help in advance,

Mahendra
about.me/mahendrakutare

~~
Only those who will risk going too far can possibly find out how far one
can go.


Re: saveAsTextFile at treeEnsembleModels.scala:447, took 2.513396 s Killed

2016-07-28 Thread Ascot Moss
Hi,

Thanks for your reply.

Permissions (access) are not an issue in my case; this issue only happened
when the bigger input file was used to generate the model, i.e. with smaller
inputs everything worked well. It seems to me that ".save" cannot save a big
file.

Q1: Any idea of the size limit that ".save" can handle?
Q2: Any idea how to check the size of the model that will be saved via
".save"?

Regards



On Thu, Jul 28, 2016 at 4:19 PM, Spico Florin  wrote:

> Hi!
>   There are many reasons why your task may have failed. One could be that
> you don't have the proper permissions (access) to HDFS with your user.
> Please check your user's rights to write to HDFS. Please also have a look
> at:
>
> http://stackoverflow.com/questions/27427042/spark-unable-to-save-in-hadoop-permission-denied-for-user
> I hope it helps.
>  Florin
>
>
> On Thu, Jul 28, 2016 at 3:49 AM, Ascot Moss  wrote:
>
>>
>> Hi,
>>
>> Please help!
>>
>> When saving the model, I got following error and cannot save the model to
>> hdfs:
>>
>> (my source code, my spark is v1.6.2)
>> my_model.save(sc, "/my_model")
>>
>> -
>> 16/07/28 08:36:19 INFO TaskSchedulerImpl: Removed TaskSet 69.0, whose
>> tasks have all completed, from pool
>>
>> 16/07/28 08:36:19 INFO DAGScheduler: ResultStage 69 (saveAsTextFile at
>> treeEnsembleModels.scala:447) finished in 0.901 s
>>
>> 16/07/28 08:36:19 INFO DAGScheduler: Job 38 finished: saveAsTextFile at
>> treeEnsembleModels.scala:447, took 2.513396 s
>>
>> Killed
>> -
>>
>>
>> Q1: Is there any limitation on saveAsTextFile?
>> Q2: or where to find the error log file location?
>>
>> Regards
>>
>>
>>
>>
>>
>


saveAsTextFile at treeEnsembleModels.scala:447, took 2.513396 s Killed

2016-07-27 Thread Ascot Moss
Hi,

Please help!

When saving the model, I got following error and cannot save the model to
hdfs:

(my source code, my spark is v1.6.2)
my_model.save(sc, "/my_model")

-
16/07/28 08:36:19 INFO TaskSchedulerImpl: Removed TaskSet 69.0, whose tasks
have all completed, from pool

16/07/28 08:36:19 INFO DAGScheduler: ResultStage 69 (saveAsTextFile at
treeEnsembleModels.scala:447) finished in 0.901 s

16/07/28 08:36:19 INFO DAGScheduler: Job 38 finished: saveAsTextFile at
treeEnsembleModels.scala:447, took 2.513396 s

Killed
-


Q1: Is there any limitation on saveAsTextFile?
Q2: or where to find the error log file location?

Regards


Re: problem about RDD map and then saveAsTextFile

2016-05-27 Thread Christian Hellström
Internally, saveAsTextFile uses saveAsHadoopFile:
https://github.com/apache/spark/blob/d5911d1173fe0872f21cae6c47abf8ff479345a4/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
.

The final bit in the method first creates the output path and then saves
the data set. However, if there is an issue with the saveAsHadoopDataset
call, the path still remains. Technically, we could add an
exception-handling section that removes the path in case of problems. I
think that would be a nice way of making sure that we don’t litter the FS
with empty files and directories in case of exceptions.

So, to your question: the parameter to saveAsTextFile is a path (not a
file), and it must not already exist. Spark automatically names the files
part-N, with N the partition number. This follows immediately from the
partitioning scheme of the RDD itself.

The real issue, though, is that there is a problem with the calculation
itself. You might want to fix that first. Just post the relevant bits from
the log.
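Until such cleanup exists in Spark itself, a common workaround is to remove a leftover output path before retrying. A minimal local-filesystem sketch (on HDFS the analogous step would use the Hadoop FileSystem delete call before saveAsTextFile):

```python
import os
import shutil
import tempfile

def clear_output_path(path: str) -> None:
    """Delete the output directory a failed earlier attempt left behind."""
    if os.path.exists(path):
        shutil.rmtree(path)

base = tempfile.mkdtemp()
out = os.path.join(base, "result")
os.makedirs(out)                                  # simulate attempt 1's leftovers
open(os.path.join(out, "part-00000"), "w").close()

clear_output_path(out)                            # attempt 2 can now write cleanly
print(os.path.exists(out))  # False
shutil.rmtree(base)
```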
Hi all:
 I’ve tried to execute something as below:

 result.map(transform).saveAsTextFile(hdfsAddress)

 Result is an RDD calculated by an MLlib algorithm.


I submit this to YARN, and after two attempts the application failed.

But the exception in the log is very misleading. It said hdfsAddress already
exists.

Actually, the log of the first attempt showed that the exception came from
the calculation of result. Although the attempt failed, it still created the
file, and then attempt 2 began with the exception ‘file already exists’.


 Why was the file created even though the RDD calculation had already
failed? That’s not so good, I think.

problem about RDD map and then saveAsTextFile

2016-05-27 Thread Reminia Scarlet
Hi all:
 I’ve tried to execute something as below:

 result.map(transform).saveAsTextFile(hdfsAddress)

 Result is an RDD calculated by an MLlib algorithm.


I submit this to YARN, and after two attempts the application failed.

But the exception in the log is very misleading. It said hdfsAddress already
exists.

Actually, the log of the first attempt showed that the exception came from
the calculation of result. Although the attempt failed, it still created the
file, and then attempt 2 began with the exception ‘file already exists’.


 Why was the file created even though the RDD calculation had already
failed? That’s not so good, I think.


RE: saveAsTextFile is not writing to local fs

2016-02-01 Thread Mohammed Guller
If the data is not too big, one option is to call the collect method and then 
save the result to a local file using the standard Java/Scala APIs. However, 
keep in mind that this will transfer data from all the worker nodes to the 
driver program. It looks like that is what you want to do anyway, but you need 
to be aware of how big the data is and the related implications.

Mohammed
Author: Big Data Analytics with 
Spark<http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/1484209656/>
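A minimal sketch of that approach (a plain list stands in for the rdd.collect() result, and the path is hypothetical):

```python
import os
import tempfile

lines = ["a,1", "b,2", "c,3"]  # stands in for: lines = rdd.collect()

# Write on the driver with ordinary file APIs instead of saveAsTextFile.
path = os.path.join(tempfile.mkdtemp(), "result.csv")
with open(path, "w") as f:
    f.write("\n".join(lines) + "\n")

with open(path) as f:
    print(f.read().splitlines())  # ['a,1', 'b,2', 'c,3']
```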

From: Siva [mailto:sbhavan...@gmail.com]
Sent: Monday, February 1, 2016 6:00 PM
To: Mohammed Guller
Cc: spark users
Subject: Re: saveAsTextFile is not writing to local fs

Hi Mohammed,

Thanks for your response. The data is available on the worker nodes, but I was 
looking for a way to write directly to the local fs. It seems that is not an 
option.

Thanks,
Sivakumar Bhavanari.

On Mon, Feb 1, 2016 at 5:45 PM, Mohammed Guller 
mailto:moham...@glassbeam.com>> wrote:
You should not be saving an RDD to local FS if Spark is running on a real 
cluster. Essentially, each Spark worker will save the partitions that it 
processes locally.

Check the directories on the worker nodes and you should find pieces of your 
file on each node.
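What that looks like on disk can be sketched as follows; three temp directories stand in for three workers' local filesystems (the layout is hypothetical):

```python
import os
import tempfile

# Each simulated "worker" owns one partition and writes only its own part- file
# under the same relative output path, mirroring saveAsTextFile("file:///...").
partitions = {0: ["a"], 1: ["b"], 2: ["c"]}
workers = {i: tempfile.mkdtemp(prefix=f"worker{i}-") for i in partitions}

for i, records in partitions.items():
    out = os.path.join(workers[i], "out")        # same path on every node
    os.makedirs(out)
    with open(os.path.join(out, f"part-{i:05d}"), "w") as f:
        f.write("\n".join(records) + "\n")

# No single machine holds the whole dataset; each has only its own piece:
pieces = [sorted(os.listdir(os.path.join(d, "out"))) for d in workers.values()]
print(pieces)  # [['part-00000'], ['part-00001'], ['part-00002']]
```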

Mohammed
Author: Big Data Analytics with 
Spark<http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/1484209656/>

From: Siva [mailto:sbhavan...@gmail.com<mailto:sbhavan...@gmail.com>]
Sent: Friday, January 29, 2016 5:40 PM
To: Mohammed Guller
Cc: spark users
Subject: Re: saveAsTextFile is not writing to local fs

Hi Mohammed,

Thanks for your quick response. I'm submitting the Spark job to YARN in 
"yarn-client" mode on a 6-node cluster. I ran the job with DEBUG logging 
turned on. I see the exception below, but it occurred after the 
saveAsTextFile function finished.

16/01/29 20:26:57 DEBUG HttpParser:
java.net.SocketException: Socket closed
at java.net.SocketInputStream.read(SocketInputStream.java:190)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at 
org.spark-project.jetty.io.ByteArrayBuffer.readFrom(ByteArrayBuffer.java:391)
at 
org.spark-project.jetty.io.bio.StreamEndPoint.fill(StreamEndPoint.java:141)
at 
org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.fill(SocketConnector.java:227)
at org.spark-project.jetty.http.HttpParser.fill(HttpParser.java:1044)
at 
org.spark-project.jetty.http.HttpParser.parseNext(HttpParser.java:280)
at 
org.spark-project.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
at 
org.spark-project.jetty.server.BlockingHttpConnection.handle(BlockingHttpConnection.java:72)
at 
org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.run(SocketConnector.java:264)
at 
org.spark-project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
at 
org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
at java.lang.Thread.run(Thread.java:745)
16/01/29 20:26:57 DEBUG HttpParser:
java.net.SocketException: Socket closed
at java.net.SocketInputStream.read(SocketInputStream.java:190)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at 
org.spark-project.jetty.io.ByteArrayBuffer.readFrom(ByteArrayBuffer.java:391)
at 
org.spark-project.jetty.io.bio.StreamEndPoint.fill(StreamEndPoint.java:141)
at 
org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.fill(SocketConnector.java:227)
at org.spark-project.jetty.http.HttpParser.fill(HttpParser.java:1044)
at 
org.spark-project.jetty.http.HttpParser.parseNext(HttpParser.java:280)
at 
org.spark-project.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
at 
org.spark-project.jetty.server.BlockingHttpConnection.handle(BlockingHttpConnection.java:72)
at 
org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.run(SocketConnector.java:264)
at 
org.spark-project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
at 
org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
at java.lang.Thread.run(Thread.java:745)
16/01/29 20:26:57 DEBUG HttpParser: HttpParser{s=-14,l=0,c=-3}
org.spark-project.jetty.io.EofException

Do you think this is what is causing the issue?

Thanks,
Sivakumar Bhavanari.

On Fri, Jan 29, 2016 at 3:55 PM, Mohammed Guller 
mailto:moham...@glassbeam.com>> wrote:
Is it a multi-node cluster, or are you running Spark on a single machine?

You can change Spark’s logging level to INFO or DEBUG to see what is going on.

Mohammed
Author: Big Data Analytics with 
Spark<http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/1484209656/>

From: Siva [mailto:sbhavan...@gmail.com<mailto:sbhavan...@gmail.com>]
Sent: Friday, January 29, 2016 3:38 PM
To: spark users
Subject: saveAsTextFile is not w

Re: saveAsTextFile is not writing to local fs

2016-02-01 Thread Siva
Hi Mohammed,

Thanks for your response. The data is available on the worker nodes, but I was
looking for a way to write directly to the local fs. It seems that is not an
option.

Thanks,
Sivakumar Bhavanari.

On Mon, Feb 1, 2016 at 5:45 PM, Mohammed Guller 
wrote:

> You should not be saving an RDD to local FS if Spark is running on a real
> cluster. Essentially, each Spark worker will save the partitions that it
> processes locally.
>
>
>
> Check the directories on the worker nodes and you should find pieces of
> your file on each node.
>
>
>
> Mohammed
>
> Author: Big Data Analytics with Spark
> <http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/1484209656/>
>
>
>
> *From:* Siva [mailto:sbhavan...@gmail.com]
> *Sent:* Friday, January 29, 2016 5:40 PM
> *To:* Mohammed Guller
> *Cc:* spark users
> *Subject:* Re: saveAsTextFile is not writing to local fs
>
>
>
> Hi Mohammed,
>
>
>
> Thanks for your quick response. I'm submitting the Spark job to YARN in
> "yarn-client" mode on a 6-node cluster. I ran the job with DEBUG logging
> turned on. I see the exception below, but it occurred after the
> saveAsTextFile function finished.
>
>
>
> 16/01/29 20:26:57 DEBUG HttpParser:
>
> java.net.SocketException: Socket closed
>
> at java.net.SocketInputStream.read(SocketInputStream.java:190)
>
> at java.net.SocketInputStream.read(SocketInputStream.java:122)
>
> at
>
> 16/01/29 20:26:57 DEBUG HttpParser: HttpParser{s=-14,l=0,c=-3}
>
> org.spark-project.jetty.io.EofException
>
>
>
> Do you think this one is causing this?
>
>
> Thanks,
>
> Sivakumar Bhavanari.
>
>
>
> On Fri, Jan 29, 2016 at 3:55 PM, Mohammed Guller 
> wrote:
>
> Is it a multi-node cluster, or are you running Spark on a single machine?
>
>
>
> You can change Spark’s logging level to INFO or DEBUG to see what is going
> on.
>
>
>
> Mohammed
>
> Author: Big Data Analytics with Spark
> <http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/1484209656/>
>
>
>
> *From:* Siva [mailto:sbhavan...@gmail.com]
> *Sent:* Friday, January 29, 2016 3:38 PM
> *To:* spark users
> *Subject:* saveAsTextFile is not writing to local fs
>
>
>
> Hi Everyone,
>
>
>
> We are using Spark 1.4.1, and we have a requirement of writing data to the
> local fs instead of HDFS.
>
> When trying to save an RDD to the local fs with saveAsTextFile, it just
> writes a _SUCCESS file in the folder with no part- files, and there are no
> error or warning messages on the console.
>
>
>
> Is there anywhere I should look to fix this problem?
>
>
> Thanks,
>
> Sivakumar Bhavanari.
>
>
>


RE: saveAsTextFile is not writing to local fs

2016-02-01 Thread Mohammed Guller
You should not be saving an RDD to local FS if Spark is running on a real 
cluster. Essentially, each Spark worker will save the partitions that it 
processes locally.

Check the directories on the worker nodes and you should find pieces of your 
file on each node.

Mohammed
Author: Big Data Analytics with 
Spark<http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/1484209656/>

From: Siva [mailto:sbhavan...@gmail.com]
Sent: Friday, January 29, 2016 5:40 PM
To: Mohammed Guller
Cc: spark users
Subject: Re: saveAsTextFile is not writing to local fs

Hi Mohammed,

Thanks for your quick response. I'm submitting the Spark job to YARN in
"yarn-client" mode on a 6-node cluster. I ran the job with DEBUG mode turned
on. I see the exception below, but it occurred after the saveAsTextFile
function finished.

16/01/29 20:26:57 DEBUG HttpParser:
java.net.SocketException: Socket closed
at java.net.SocketInputStream.read(SocketInputStream.java:190)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at 
org.spark-project.jetty.io.ByteArrayBuffer.readFrom(ByteArrayBuffer.java:391)
at 
org.spark-project.jetty.io.bio.StreamEndPoint.fill(StreamEndPoint.java:141)
at 
org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.fill(SocketConnector.java:227)
at org.spark-project.jetty.http.HttpParser.fill(HttpParser.java:1044)
at 
org.spark-project.jetty.http.HttpParser.parseNext(HttpParser.java:280)
at 
org.spark-project.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
at 
org.spark-project.jetty.server.BlockingHttpConnection.handle(BlockingHttpConnection.java:72)
at 
org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.run(SocketConnector.java:264)
at 
org.spark-project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
at 
org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
at java.lang.Thread.run(Thread.java:745)
16/01/29 20:26:57 DEBUG HttpParser:
java.net.SocketException: Socket closed
at java.net.SocketInputStream.read(SocketInputStream.java:190)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at 
org.spark-project.jetty.io.ByteArrayBuffer.readFrom(ByteArrayBuffer.java:391)
at 
org.spark-project.jetty.io.bio.StreamEndPoint.fill(StreamEndPoint.java:141)
at 
org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.fill(SocketConnector.java:227)
at org.spark-project.jetty.http.HttpParser.fill(HttpParser.java:1044)
at 
org.spark-project.jetty.http.HttpParser.parseNext(HttpParser.java:280)
at 
org.spark-project.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
at 
org.spark-project.jetty.server.BlockingHttpConnection.handle(BlockingHttpConnection.java:72)
at 
org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.run(SocketConnector.java:264)
at 
org.spark-project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
at 
org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
at java.lang.Thread.run(Thread.java:745)
16/01/29 20:26:57 DEBUG HttpParser: HttpParser{s=-14,l=0,c=-3}
org.spark-project.jetty.io.EofException

Do you think this one is causing this?

Thanks,
Sivakumar Bhavanari.

On Fri, Jan 29, 2016 at 3:55 PM, Mohammed Guller 
mailto:moham...@glassbeam.com>> wrote:
Is it a multi-node cluster, or are you running Spark on a single machine?

You can change Spark’s logging level to INFO or DEBUG to see what is going on.

Mohammed
Author: Big Data Analytics with 
Spark<http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/1484209656/>

From: Siva [mailto:sbhavan...@gmail.com<mailto:sbhavan...@gmail.com>]
Sent: Friday, January 29, 2016 3:38 PM
To: spark users
Subject: saveAsTextFile is not writing to local fs

Hi Everyone,

We are using Spark 1.4.1, and we have a requirement of writing data to the
local fs instead of HDFS.

When trying to save an RDD to the local fs with saveAsTextFile, it just writes
a _SUCCESS file in the folder with no part- files, and there are no error or
warning messages on the console.

Is there anywhere I should look to fix this problem?

Thanks,
Sivakumar Bhavanari.



Re: saveAsTextFile is not writing to local fs

2016-01-29 Thread Siva
Hi Mohammed,

Thanks for your quick response. I'm submitting the Spark job to YARN in
"yarn-client" mode on a 6-node cluster. I ran the job with DEBUG mode turned
on. I see the exception below, but it occurred after the saveAsTextFile
function finished.

16/01/29 20:26:57 DEBUG HttpParser:
java.net.SocketException: Socket closed
at java.net.SocketInputStream.read(SocketInputStream.java:190)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at
org.spark-project.jetty.io.ByteArrayBuffer.readFrom(ByteArrayBuffer.java:391)
at
org.spark-project.jetty.io.bio.StreamEndPoint.fill(StreamEndPoint.java:141)
at
org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.fill(SocketConnector.java:227)
at
org.spark-project.jetty.http.HttpParser.fill(HttpParser.java:1044)
at
org.spark-project.jetty.http.HttpParser.parseNext(HttpParser.java:280)
at
org.spark-project.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
at
org.spark-project.jetty.server.BlockingHttpConnection.handle(BlockingHttpConnection.java:72)
at
org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.run(SocketConnector.java:264)
at
org.spark-project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
at
org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
at java.lang.Thread.run(Thread.java:745)
16/01/29 20:26:57 DEBUG HttpParser:
java.net.SocketException: Socket closed
at java.net.SocketInputStream.read(SocketInputStream.java:190)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at
org.spark-project.jetty.io.ByteArrayBuffer.readFrom(ByteArrayBuffer.java:391)
at
org.spark-project.jetty.io.bio.StreamEndPoint.fill(StreamEndPoint.java:141)
at
org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.fill(SocketConnector.java:227)
at
org.spark-project.jetty.http.HttpParser.fill(HttpParser.java:1044)
at
org.spark-project.jetty.http.HttpParser.parseNext(HttpParser.java:280)
at
org.spark-project.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
at
org.spark-project.jetty.server.BlockingHttpConnection.handle(BlockingHttpConnection.java:72)
at
org.spark-project.jetty.server.bio.SocketConnector$ConnectorEndPoint.run(SocketConnector.java:264)
at
org.spark-project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
at
org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
at java.lang.Thread.run(Thread.java:745)
16/01/29 20:26:57 DEBUG HttpParser: HttpParser{s=-14,l=0,c=-3}
org.spark-project.jetty.io.EofException

Do you think this one is causing this?

Thanks,
Sivakumar Bhavanari.

On Fri, Jan 29, 2016 at 3:55 PM, Mohammed Guller 
wrote:

> Is it a multi-node cluster, or are you running Spark on a single machine?
>
>
>
> You can change Spark’s logging level to INFO or DEBUG to see what is going
> on.
>
>
>
> Mohammed
>
> Author: Big Data Analytics with Spark
> <http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/1484209656/>
>
>
>
> *From:* Siva [mailto:sbhavan...@gmail.com]
> *Sent:* Friday, January 29, 2016 3:38 PM
> *To:* spark users
> *Subject:* saveAsTextFile is not writing to local fs
>
>
>
> Hi Everyone,
>
>
>
> We are using Spark 1.4.1, and we have a requirement of writing data to the
> local fs instead of HDFS.
>
> When trying to save an RDD to the local fs with saveAsTextFile, it just
> writes a _SUCCESS file in the folder with no part- files, and there are no
> error or warning messages on the console.
>
>
>
> Is there anywhere I should look to fix this problem?
>
>
> Thanks,
>
> Sivakumar Bhavanari.
>


RE: saveAsTextFile is not writing to local fs

2016-01-29 Thread Mohammed Guller
Is it a multi-node cluster, or are you running Spark on a single machine?

You can change Spark’s logging level to INFO or DEBUG to see what is going on.

Mohammed
Author: Big Data Analytics with 
Spark<http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/1484209656/>

From: Siva [mailto:sbhavan...@gmail.com]
Sent: Friday, January 29, 2016 3:38 PM
To: spark users
Subject: saveAsTextFile is not writing to local fs

Hi Everyone,

We are using Spark 1.4.1, and we have a requirement of writing data to the
local fs instead of HDFS.

When trying to save an RDD to the local fs with saveAsTextFile, it just writes
a _SUCCESS file in the folder with no part- files, and there are no error or
warning messages on the console.

Is there anywhere I should look to fix this problem?

Thanks,
Sivakumar Bhavanari.


saveAsTextFile is not writing to local fs

2016-01-29 Thread Siva
Hi Everyone,

We are using Spark 1.4.1, and we have a requirement of writing data to the
local fs instead of HDFS.

When trying to save an RDD to the local fs with saveAsTextFile, it just
writes a _SUCCESS file in the folder with no part- files, and there are no
error or warning messages on the console.

Is there anywhere I should look to fix this problem?

Thanks,
Sivakumar Bhavanari.


Re: coalesce(1).saveAsTextfile() takes forever?

2016-01-05 Thread Andy Davidson
Hi Unk1102

I also had trouble when I used coalesce(); repartition() worked much better.
Keep in mind that if you have a large number of partitions you are probably
going to have high communication costs.

Also, my code works a lot better on 1.6.0. DataFrame memory could not be
spilled in 1.5.2; in 1.6.0, unpersist() actually frees up memory.

Another strange thing I noticed in 1.5.1 was that I had thousands of
partitions, many of them empty. Having lots of empty partitions really
slowed things down.

Andy

From:  unk1102 
Date:  Tuesday, January 5, 2016 at 11:58 AM
To:  "user @spark" 
Subject:  coalesce(1).saveAsTextfile() takes forever?

> hi, I am trying to save many partitions of a DataFrame into one CSV file,
> and it takes forever for large data sets of around 5-6 GB.
>
> sourceFrame.coalesce(1).write().format("com.databricks.spark.csv").option("gzip").save("/path/hadoop")
>
> For small data the above code works well, but for large data it hangs
> forever and does not move on, because only one partition has to shuffle GBs
> of data. Please help me.
> 
> 
> 
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/coalesce-1-saveAsTextfile-
> takes-forever-tp25886.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 
> 




Re: coalesce(1).saveAsTextfile() takes forever?

2016-01-05 Thread Umesh Kacha
Hi, DataFrame's coalesce does not take the boolean shuffle option; that
overload exists only for RDDs, I believe.

sourceFrame.coalesce(1,true) // gives a compilation error



On Wed, Jan 6, 2016 at 1:38 AM, Alexander Pivovarov 
wrote:

> try coalesce(1, true).
>
> On Tue, Jan 5, 2016 at 11:58 AM, unk1102  wrote:
>
>> hi, I am trying to save many partitions of a DataFrame into one CSV file,
>> and it takes forever for large data sets of around 5-6 GB.
>>
>> sourceFrame.coalesce(1).write().format("com.databricks.spark.csv").option("gzip").save("/path/hadoop")
>>
>> For small data the above code works well, but for large data it hangs
>> forever and does not move on, because only one partition has to shuffle
>> GBs of data. Please help me.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/coalesce-1-saveAsTextfile-takes-forever-tp25886.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: coalesce(1).saveAsTextfile() takes forever?

2016-01-05 Thread Igor Berman
Another option would be to try
rdd.toLocalIterator()
not sure if it will help, though.

I had the same problem and ended up moving all the part files to local disk
(with the Hadoop FileSystem API) and then processing them locally.
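The merge step of this approach can be sketched without Spark: once the part files are on the local disk, merging them is a sort-and-concatenate, roughly what `hadoop fs -getmerge` does for plain-text output. A minimal plain-Scala sketch (the directory layout and names below are illustrative; fetching the parts from HDFS with the Hadoop FileSystem API is assumed to have happened already):

```scala
import java.nio.file.{Files, Path}
import scala.collection.JavaConverters._

// Concatenate Spark-style part files from an output directory into one
// local file, in part-number order (part-00000, part-00001, ...).
def mergeParts(outputDir: Path, mergedFile: Path): Unit = {
  val parts = Files.list(outputDir).iterator().asScala
    .filter(_.getFileName.toString.startsWith("part-"))
    .toSeq
    .sortBy(_.getFileName.toString)
  val out = Files.newBufferedWriter(mergedFile)
  try {
    for (part <- parts; line <- Files.readAllLines(part).asScala) {
      out.write(line)
      out.newLine()
    }
  } finally out.close()
}

// Demo on a fake saveAsTextFile output directory.
val outputDir = Files.createTempDirectory("spark-output")
Files.write(outputDir.resolve("part-00000"), java.util.Arrays.asList("a", "b"))
Files.write(outputDir.resolve("part-00001"), java.util.Arrays.asList("c", "d"))
Files.createFile(outputDir.resolve("_SUCCESS"))   // ignored by the merge
val merged = outputDir.resolve("merged.txt")
mergeParts(outputDir, merged)
val mergedLines = Files.readAllLines(merged).asScala.toList
```

Note this sketch assumes uncompressed text output; gzipped part files would need to be decompressed while reading.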


On 5 January 2016 at 22:08, Alexander Pivovarov 
wrote:

> try coalesce(1, true).
>
> On Tue, Jan 5, 2016 at 11:58 AM, unk1102  wrote:
>
>> hi, I am trying to save many partitions of a DataFrame into one CSV file,
>> and it takes forever for large data sets of around 5-6 GB.
>>
>> sourceFrame.coalesce(1).write().format("com.databricks.spark.csv").option("gzip").save("/path/hadoop")
>>
>> For small data the above code works well, but for large data it hangs
>> forever and does not move on, because only one partition has to shuffle
>> GBs of data. Please help me.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/coalesce-1-saveAsTextfile-takes-forever-tp25886.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: coalesce(1).saveAsTextfile() takes forever?

2016-01-05 Thread Alexander Pivovarov
try coalesce(1, true).

On Tue, Jan 5, 2016 at 11:58 AM, unk1102  wrote:

> hi, I am trying to save many partitions of a DataFrame into one CSV file,
> and it takes forever for large data sets of around 5-6 GB.
>
> sourceFrame.coalesce(1).write().format("com.databricks.spark.csv").option("gzip").save("/path/hadoop")
>
> For small data the above code works well, but for large data it hangs
> forever and does not move on, because only one partition has to shuffle GBs
> of data. Please help me.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/coalesce-1-saveAsTextfile-takes-forever-tp25886.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


coalesce(1).saveAsTextfile() takes forever?

2016-01-05 Thread unk1102
Hi, I am trying to save many partitions of a DataFrame into one CSV file,
and it takes forever for large data sets of around 5-6 GB.

sourceFrame.coalesce(1).write().format("com.databricks.spark.csv").option("gzip").save("/path/hadoop")

For small data the above code works well, but for large data it hangs
forever and does not move on, because only one partition has to shuffle GBs
of data. Please help me.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/coalesce-1-saveAsTextfile-takes-forever-tp25886.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Improve saveAsTextFile performance

2015-12-07 Thread Akhil Das
In the code, try using mapPartitions instead of map.
Can you look at the event timeline and see where it's taking time?

You can see it in the driver UI under the Stages tab.

Thanks
Best Regards
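The mapPartitions advice above pays off when each record's processing needs an expensive setup step (a parser, a DB connection): rdd.map can end up paying that cost per record, while rdd.mapPartitions pays it once per partition. A Spark-free sketch of the difference, with plain Scala collections standing in for partitions (all names are illustrative):

```scala
// Spark-free sketch: each inner Seq stands in for one RDD partition.
val partitions: Seq[Seq[Int]] = Seq(Seq(1, 2, 3), Seq(4, 5))

// Stand-in for an expensive per-use resource (a parser, a connection, ...).
var setupCount = 0
def expensiveSetup(): Int => Int = {
  setupCount += 1
  x => x * 2
}

// map-style: a naive rdd.map(x => expensiveSetup()(x)) pays the setup cost
// once per record.
val viaMap = partitions.flatMap(_.map(x => expensiveSetup()(x)))
val mapSetups = setupCount        // one setup per record

// mapPartitions-style: set up once, reuse for the whole partition.
setupCount = 0
val viaMapPartitions = partitions.flatMap { part =>
  val f = expensiveSetup()
  part.map(f)
}
val partitionSetups = setupCount  // one setup per partition
```

Both variants produce the same output; only the number of setups differs.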

On Sat, Dec 5, 2015 at 11:14 PM, Ram VISWANADHA <
ram.viswana...@dailymotion.com> wrote:

> I tried partitionBy with a HashPartitioner; still the same issue.
> groupBy Operation:
> https://gist.github.com/ramv-dailymotion/4e19b96b625c52d7ed3b#file-saveasparquet-java-L51
> Join Operation:
> https://gist.github.com/ramv-dailymotion/4e19b96b625c52d7ed3b#file-saveasparquet-java-L80
>
> Best Regards,
> Ram
> --
>
> Date: Saturday, December 5, 2015 at 7:18 AM
> To: Akhil Das 
>
> Cc: user 
> Subject: Re: Improve saveAsTextFile performance
>
> >If you are doing a join/groupBy kind of operations then you need to make
> sure the keys are evenly distributed throughout the partitions.
>
> Yes, I am doing join/groupBy operations. Can you point me to docs on how
> to do this?
>
> Spark 1.5.2
>
>
> First attempt (Aggregated Metrics by Executor):
>   Executor ID: 32
>   Address: rc-spark-poc-w-3.c.dailymotion-data.internal:51748
>   Task Time: 1.2 h
>   Total Tasks: 18 (failed: 0, succeeded: 18)
>   Shuffle Read Size / Records: 4.4 MB / 167812
>   Shuffle Write Size / Records: 51.5 GB / 128713
>   Shuffle Spill (Memory): 153.1 GB
>   Shuffle Spill (Disk): 51.1 GB
>
> Second attempt (Aggregated Metrics by Executor):
>   Executor ID: 5
>   Address: rc-spark-poc-w-1.c.dailymotion-data.internal:41061
>   Task Time: 47 min
>   Total Tasks: 8 (failed: 0, succeeded: 8)
>   Shuffle Read Size / Records: 3.9 MB / 95334
>
>
> Best Regards,
> Ram
>
> From: Akhil Das 
> Date: Saturday, December 5, 2015 at 1:32 AM
> To: Ram VISWANADHA 
> Cc: user 
> Subject: Re: Improve saveAsTextFile performance
>
> Which version of Spark are you using? Can you look at the event timeline
> and the DAG of the job and see where it's spending more time? .save simply
> triggers your entire pipeline. If you are doing a join/groupBy kind of
> operation, then you need to make sure the keys are evenly distributed
> throughout the partitions.
>
> Thanks
> Best Regards
>
> On Sat, Dec 5, 2015 at 8:24 AM, Ram VISWANADHA <
> ram.viswana...@dailymotion.com> wrote:
>
>> That didn’t work :(
>> Any help? I have documented some steps here:
>>
>> http://stackoverflow.com/questions/34048340/spark-saveastextfile-last-stage-almost-never-finishes
>>
>> Best Regards,
>> Ram
>>
>> From: Sahil Sareen 
>> Date: Wednesday, December 2, 2015 at 10:18 PM
>> To: Ram VISWANADHA 
>> Cc: Ted Yu , user 
>> Subject: Re: Improve saveAsTextFile performance
>>
>>
>> http://stackoverflow.com/questions/29213404/how-to-split-an-rdd-into-multiple-smaller-rdds-given-a-max-number-of-rows-per
>>
>
>


Re: Improve saveAsTextFile performance

2015-12-05 Thread Ram VISWANADHA
I tried partitionBy with a HashPartitioner; still the same issue.
groupBy Operation: 
https://gist.github.com/ramv-dailymotion/4e19b96b625c52d7ed3b#file-saveasparquet-java-L51
Join Operation: 
https://gist.github.com/ramv-dailymotion/4e19b96b625c52d7ed3b#file-saveasparquet-java-L80

Best Regards,
Ram
--
Date: Saturday, December 5, 2015 at 7:18 AM
To: Akhil Das mailto:ak...@sigmoidanalytics.com>>
Cc: user mailto:user@spark.apache.org>>
Subject: Re: Improve saveAsTextFile performance

>If you are doing a join/groupBy kind of operations then you need to make sure 
>the keys are evenly distributed throughout the partitions.

Yes, I am doing join/groupBy operations. Can you point me to docs on how to
do this?

Spark 1.5.2


First attempt (Aggregated Metrics by Executor):
  Executor ID: 32
  Address: rc-spark-poc-w-3.c.dailymotion-data.internal:51748
  Task Time: 1.2 h
  Total Tasks: 18 (failed: 0, succeeded: 18)
  Shuffle Read Size / Records: 4.4 MB / 167812
  Shuffle Write Size / Records: 51.5 GB / 128713
  Shuffle Spill (Memory): 153.1 GB
  Shuffle Spill (Disk): 51.1 GB

Second attempt (Aggregated Metrics by Executor):
  Executor ID: 5
  Address: rc-spark-poc-w-1.c.dailymotion-data.internal:41061
  Task Time: 47 min
  Total Tasks: 8 (failed: 0, succeeded: 8)
  Shuffle Read Size / Records: 3.9 MB / 95334


Best Regards,
Ram

From: Akhil Das mailto:ak...@sigmoidanalytics.com>>
Date: Saturday, December 5, 2015 at 1:32 AM
To: Ram VISWANADHA 
mailto:ram.viswana...@dailymotion.com>>
Cc: user mailto:user@spark.apache.org>>
Subject: Re: Improve saveAsTextFile performance

Which version of Spark are you using? Can you look at the event timeline and
the DAG of the job and see where it's spending more time? .save simply triggers
your entire pipeline. If you are doing a join/groupBy kind of operation, then
you need to make sure the keys are evenly distributed throughout the partitions.

Thanks
Best Regards

On Sat, Dec 5, 2015 at 8:24 AM, Ram VISWANADHA 
mailto:ram.viswana...@dailymotion.com>> wrote:
That didn’t work :(
Any help? I have documented some steps here:
http://stackoverflow.com/questions/34048340/spark-saveastextfile-last-stage-almost-never-finishes

Best Regards,
Ram

From: Sahil Sareen mailto:sareen...@gmail.com>>
Date: Wednesday, December 2, 2015 at 10:18 PM
To: Ram VISWANADHA 
mailto:ram.viswana...@dailymotion.com>>
Cc: Ted Yu mailto:yuzhih...@gmail.com>>, user 
mailto:user@spark.apache.org>>
Subject: Re: Improve saveAsTextFile performance

http://stackoverflow.com/questions/29213404/how-to-split-an-rdd-into-multiple-smaller-rdds-given-a-max-number-of-rows-per



Re: Improve saveAsTextFile performance

2015-12-05 Thread Ram VISWANADHA
>If you are doing a join/groupBy kind of operations then you need to make sure 
>the keys are evenly distributed throughout the partitions.

Yes, I am doing join/groupBy operations. Can you point me to docs on how to do
this?

Spark 1.5.2


First attempt (Aggregated Metrics by Executor):
  Executor ID: 32
  Address: rc-spark-poc-w-3.c.dailymotion-data.internal:51748
  Task Time: 1.2 h
  Total Tasks: 18 (failed: 0, succeeded: 18)
  Shuffle Read Size / Records: 4.4 MB / 167812
  Shuffle Write Size / Records: 51.5 GB / 128713
  Shuffle Spill (Memory): 153.1 GB
  Shuffle Spill (Disk): 51.1 GB

Second attempt (Aggregated Metrics by Executor):
  Executor ID: 5
  Address: rc-spark-poc-w-1.c.dailymotion-data.internal:41061
  Task Time: 47 min
  Total Tasks: 8 (failed: 0, succeeded: 8)
  Shuffle Read Size / Records: 3.9 MB / 95334


Best Regards,
Ram

From: Akhil Das mailto:ak...@sigmoidanalytics.com>>
Date: Saturday, December 5, 2015 at 1:32 AM
To: Ram VISWANADHA 
mailto:ram.viswana...@dailymotion.com>>
Cc: user mailto:user@spark.apache.org>>
Subject: Re: Improve saveAsTextFile performance

Which version of Spark are you using? Can you look at the event timeline and
the DAG of the job and see where it's spending more time? .save simply triggers
your entire pipeline. If you are doing a join/groupBy kind of operation, then
you need to make sure the keys are evenly distributed throughout the partitions.

Thanks
Best Regards

On Sat, Dec 5, 2015 at 8:24 AM, Ram VISWANADHA 
mailto:ram.viswana...@dailymotion.com>> wrote:
That didn’t work :(
Any help? I have documented some steps here:
http://stackoverflow.com/questions/34048340/spark-saveastextfile-last-stage-almost-never-finishes

Best Regards,
Ram

From: Sahil Sareen mailto:sareen...@gmail.com>>
Date: Wednesday, December 2, 2015 at 10:18 PM
To: Ram VISWANADHA 
mailto:ram.viswana...@dailymotion.com>>
Cc: Ted Yu mailto:yuzhih...@gmail.com>>, user 
mailto:user@spark.apache.org>>
Subject: Re: Improve saveAsTextFile performance

http://stackoverflow.com/questions/29213404/how-to-split-an-rdd-into-multiple-smaller-rdds-given-a-max-number-of-rows-per



Re: Improve saveAsTextFile performance

2015-12-05 Thread Akhil Das
Which version of Spark are you using? Can you look at the event timeline
and the DAG of the job and see where it's spending more time? .save simply
triggers your entire pipeline. If you are doing a join/groupBy kind of
operation, then you need to make sure the keys are evenly distributed
throughout the partitions.

Thanks
Best Regards
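The skew described above is easy to reproduce: with hash partitioning (Spark's default HashPartitioner places a key at key.hashCode modulo the partition count, made non-negative), every record carrying the same hot key lands in the same partition. A small Spark-free sketch:

```scala
// Mimics HashPartitioner's placement: nonNegativeMod(key.hashCode, n).
def partitionFor(key: Any, numPartitions: Int): Int = {
  val m = key.hashCode % numPartitions
  if (m < 0) m + numPartitions else m
}

val numPartitions = 4
// 90 of 100 records share the key "hot": all of them hash to one partition.
val keys = Seq.fill(90)("hot") ++ (1 to 10).map(i => s"k$i")
val partitionSizes: Map[Int, Int] =
  keys.groupBy(partitionFor(_, numPartitions)).map { case (p, ks) => p -> ks.size }
```

The usual remedies are salting the hot keys or keying on a better-distributed field before the shuffle.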

On Sat, Dec 5, 2015 at 8:24 AM, Ram VISWANADHA <
ram.viswana...@dailymotion.com> wrote:

> That didn’t work :(
> Any help? I have documented some steps here:
>
> http://stackoverflow.com/questions/34048340/spark-saveastextfile-last-stage-almost-never-finishes
>
> Best Regards,
> Ram
>
> From: Sahil Sareen 
> Date: Wednesday, December 2, 2015 at 10:18 PM
> To: Ram VISWANADHA 
> Cc: Ted Yu , user 
> Subject: Re: Improve saveAsTextFile performance
>
>
> http://stackoverflow.com/questions/29213404/how-to-split-an-rdd-into-multiple-smaller-rdds-given-a-max-number-of-rows-per
>


Re: Improve saveAsTextFile performance

2015-12-04 Thread Ram VISWANADHA
That didn’t work :(
Any help? I have documented some steps here:
http://stackoverflow.com/questions/34048340/spark-saveastextfile-last-stage-almost-never-finishes

Best Regards,
Ram

From: Sahil Sareen mailto:sareen...@gmail.com>>
Date: Wednesday, December 2, 2015 at 10:18 PM
To: Ram VISWANADHA 
mailto:ram.viswana...@dailymotion.com>>
Cc: Ted Yu mailto:yuzhih...@gmail.com>>, user 
mailto:user@spark.apache.org>>
Subject: Re: Improve saveAsTextFile performance

http://stackoverflow.com/questions/29213404/how-to-split-an-rdd-into-multiple-smaller-rdds-given-a-max-number-of-rows-per


Re: Improve saveAsTextFile performance

2015-12-02 Thread Sahil Sareen
PTAL:
http://stackoverflow.com/questions/29213404/how-to-split-an-rdd-into-multiple-smaller-rdds-given-a-max-number-of-rows-per

-Sahil

On Thu, Dec 3, 2015 at 9:18 AM, Ram VISWANADHA <
ram.viswana...@dailymotion.com> wrote:

> Yes. That did not help.
>
> Best Regards,
> Ram
> From: Ted Yu 
> Date: Wednesday, December 2, 2015 at 3:25 PM
> To: Ram VISWANADHA 
> Cc: user 
> Subject: Re: Improve saveAsTextFile performance
>
> Have you tried calling coalesce() before saveAsTextFile ?
>
> Cheers
>
> On Wed, Dec 2, 2015 at 3:15 PM, Ram VISWANADHA <
> ram.viswana...@dailymotion.com> wrote:
>
>> JavaRDD.saveAsTextFile is taking a long time to succeed. There are 10
>> tasks, the first 9 complete in a reasonable time but the last task is
>> taking a long time to complete. The last task contains the maximum number
>> of records like 90% of the total number of records.  Is there any way to
>> parallelize the execution by increasing the number of tasks or evenly
>> distributing the number of records to different tasks?
>>
>> Thanks in advance.
>>
>> Best Regards,
>> Ram
>>
>
>


Re: Improve saveAsTextFile performance

2015-12-02 Thread Ram VISWANADHA
Yes. That did not help.

Best Regards,
Ram
From: Ted Yu mailto:yuzhih...@gmail.com>>
Date: Wednesday, December 2, 2015 at 3:25 PM
To: Ram VISWANADHA 
mailto:ram.viswana...@dailymotion.com>>
Cc: user mailto:user@spark.apache.org>>
Subject: Re: Improve saveAsTextFile performance

Have you tried calling coalesce() before saveAsTextFile ?

Cheers

On Wed, Dec 2, 2015 at 3:15 PM, Ram VISWANADHA 
mailto:ram.viswana...@dailymotion.com>> wrote:
JavaRDD.saveAsTextFile is taking a long time to succeed. There are 10 tasks, 
the first 9 complete in a reasonable time but the last task is taking a long 
time to complete. The last task contains the maximum number of records like 90% 
of the total number of records.  Is there any way to parallelize the execution 
by increasing the number of tasks or evenly distributing the number of records 
to different tasks?

Thanks in advance.

Best Regards,
Ram



Re: Improve saveAsTextFile performance

2015-12-02 Thread Ted Yu
Have you tried calling coalesce() before saveAsTextFile ?

Cheers

On Wed, Dec 2, 2015 at 3:15 PM, Ram VISWANADHA <
ram.viswana...@dailymotion.com> wrote:

> JavaRDD.saveAsTextFile is taking a long time to succeed. There are 10
> tasks, the first 9 complete in a reasonable time but the last task is
> taking a long time to complete. The last task contains the maximum number
> of records like 90% of the total number of records.  Is there any way to
> parallelize the execution by increasing the number of tasks or evenly
> distributing the number of records to different tasks?
>
> Thanks in advance.
>
> Best Regards,
> Ram
>


Improve saveAsTextFile performance

2015-12-02 Thread Ram VISWANADHA
JavaRDD.saveAsTextFile is taking a long time to succeed. There are 10 tasks;
the first 9 complete in a reasonable time, but the last task is taking a long
time to complete. The last task contains the maximum number of records, around
90% of the total. Is there any way to parallelize the execution by increasing
the number of tasks or evenly distributing the records across tasks?

Thanks in advance.

Best Regards,
Ram
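One common remedy for the kind of skew asked about above, offered here as an aside since it is not discussed in the replies, is key salting: append a small random suffix to the hot key so its records spread over several sub-keys (and hence several tasks), then combine the salted partial results in a second pass. A minimal sketch with illustrative names:

```scala
import scala.util.Random

val saltBuckets = 8
val rnd = new Random(42) // fixed seed only to make the sketch deterministic

// "hotKey" -> "hotKey#0" .. "hotKey#7": its records can now hash to up to
// 8 different partitions instead of one.
def saltKey(key: String): String = s"$key#${rnd.nextInt(saltBuckets)}"

val records = Seq.fill(1000)("hotKey")
val saltedKeys = records.map(saltKey)
val distinctSalted = saltedKeys.distinct
```

After a salted aggregation, a second (cheap) aggregation over the unsalted key combines the partial results.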


Re: streaming: missing data. does saveAsTextFile() append or replace?

2015-11-09 Thread Andy Davidson
Thanks, Gerard.

I'll give that a try. It seems like this approach is going to create a very
large number of files. I guess I could write a cron job to concatenate the
files by hour or maybe by day. I imagine this is a common problem. Do you know
of something that does this already?

I am using the standalone cluster manager. I do not think it directly
supports cron job/table functionality. It should be easy to use the HDFS API
and Linux crontab, or maybe https://quartz-scheduler.org/

Kind regards

andy

From:  Gerard Maas 
Date:  Sunday, November 8, 2015 at 2:13 AM
To:  Andrew Davidson 
Cc:  "user @spark" 
Subject:  Re: streaming: missing data. does saveAsTextFile() append or
replace?

> Andy,
> 
> Using the rdd.saveAsTextFile(...)  will overwrite the data if your target is
> the same file.
> 
> If you want to save to HDFS, DStream offers dstream.saveAsTextFiles(prefix,
> suffix)  where a new file will be written at each streaming interval.
> Note that this will result in a saved file for each streaming interval. If you
> want to increase the file size (usually a good idea in HDFS), you can use a
> window function over the dstream and save the 'windowed'  dstream instead.
> 
> kind regards, Gerard.
> 
> On Sat, Nov 7, 2015 at 10:55 PM, Andy Davidson 
> wrote:
>> Hi
>> 
>> I just started a new spark streaming project. In this phase of the system all
>> we want to do is save the data we receive to HDFS. After running for a
>> couple of days, it looks like I am missing a lot of data. I wonder if
>> saveAsTextFile("hdfs:///rawSteamingData") is overwriting the data I captured
>> in previous windows? I noticed that after running for a couple of days my
>> HDFS file system has 25 files. The names are something like "part-6". I
>> used 'hadoop fs -dus' to check the total data captured. While the system was
>> running I would periodically call 'dus'; I was surprised that sometimes the
>> number of total bytes actually dropped.
>> 
>> 
>> Is there a better way to save write my data to disk?
>> 
>> Any suggestions would be appreciated
>> 
>> Andy
>> 
>> 
>>public static void main(String[] args) {
>> 
>>   SparkConf conf = new SparkConf().setAppName(appName);
>> 
>> JavaSparkContext jsc = new JavaSparkContext(conf);
>> 
>> JavaStreamingContext ssc = new JavaStreamingContext(jsc, new
>> Duration(5 * 1000));
>> 
>> 
>> 
>> [ deleted code … ]
>> 
>> 
>> 
>> data.foreachRDD(new Function<JavaRDD<String>, Void>(){
>> 
>> private static final long serialVersionUID =
>> -7957854392903581284L;
>> 
>> 
>> 
>> @Override
>> 
>> public Void call(JavaRDD<String> jsonStr) throws Exception {
>> 
>> jsonStr.saveAsTextFile("hdfs:///rawSteamingData"); //
>> /rawSteamingData is a directory
>> 
>> return null;
>> 
>> }   
>> 
>> });
>> 
>> 
>> 
>> ssc.checkpoint(checkPointUri);
>> 
>> 
>> 
>> ssc.start();
>> 
>> ssc.awaitTermination();
>> 
>> }
> 




Re: streaming: missing data. does saveAsTextFile() append or replace?

2015-11-08 Thread Gerard Maas
Andy,

Using rdd.saveAsTextFile(...) will overwrite the data if your target
is the same path.

If you want to save to HDFS, DStream offers dstream.saveAsTextFiles(prefix,
suffix), which writes a new directory of files at each streaming interval.
If you want to increase the file size (usually a good idea in HDFS), you can
apply a window function over the dstream and save the 'windowed' dstream
instead.
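[Editor's note: Gerard's suggestion can be sketched roughly as follows — a minimal, untested Scala sketch; the stream name `lines`, the output prefix, and the 60-second window are hypothetical choices, not from the thread.]

```scala
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.dstream.DStream

// `lines` is a hypothetical DStream[String] built elsewhere from the input.
def saveWindowed(lines: DStream[String]): Unit = {
  // Collapse twelve 5-second batches into one 60-second window, so each
  // save produces fewer, larger files (usually a good idea in HDFS).
  val windowed = lines.window(Seconds(60), Seconds(60))

  // saveAsTextFiles writes a *new* directory per interval, named
  // "<prefix>-<time-in-ms>[.<suffix>]", so earlier output is not overwritten.
  windowed.saveAsTextFiles("hdfs:///rawStreamingData/batch", "txt")
}
```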

kind regards, Gerard.

On Sat, Nov 7, 2015 at 10:55 PM, Andy Davidson <
a...@santacruzintegration.com> wrote:

> Hi
>
> I just started a new spark streaming project. In this phase of the system
> all we want to do is save the data we receive to hdfs. After running for
> a couple of days it looks like I am missing a lot of data. I wonder if
> saveAsTextFile("hdfs:///rawSteamingData"); is overwriting the data I
> capture in previous windows? I noticed that after running for a couple of
> days my hdfs file system has 25 files. The names are something like
> “part-6”. I used 'hadoop fs -dus' to check the total data captured. While
> the system was running I would periodically call 'dus'; I was surprised
> that sometimes the numbers of total bytes actually dropped.
>
>
> Is there a better way to save my data to disk?
>
> Any suggestions would be appreciated
>
> Andy
>
>
>public static void main(String[] args) {
>
>SparkConf conf = new SparkConf().setAppName(appName);
>
> JavaSparkContext jsc = new JavaSparkContext(conf);
>
> JavaStreamingContext ssc = new JavaStreamingContext(jsc, new
> Duration(5 * 1000));
>
>
> [ deleted code …]
>
>
> data.foreachRDD(new Function<JavaRDD<String>, Void>(){
>
> private static final long serialVersionUID =
> -7957854392903581284L;
>
>
> @Override
>
> public Void call(JavaRDD<String> jsonStr) throws Exception {
>
> jsonStr.saveAsTextFile("hdfs:///rawSteamingData"); //
> /rawSteamingData is a directory
>
> return null;
>
> }
>
> });
>
>
>
> ssc.checkpoint(checkPointUri);
>
>
>
> ssc.start();
>
> ssc.awaitTermination();
>
> }
>


streaming: missing data. does saveAsTextFile() append or replace?

2015-11-07 Thread Andy Davidson
Hi

I just started a new spark streaming project. In this phase of the system
all we want to do is save the data we receive to hdfs. After running for
a couple of days it looks like I am missing a lot of data. I wonder if
saveAsTextFile("hdfs:///rawSteamingData"); is overwriting the data I capture
in previous windows? I noticed that after running for a couple of days my
hdfs file system has 25 files. The names are something like "part-6". I
used 'hadoop fs -dus' to check the total data captured. While the system was
running I would periodically call 'dus'; I was surprised that sometimes the
numbers of total bytes actually dropped.


Is there a better way to save my data to disk?

Any suggestions would be appreciated

Andy


   public static void main(String[] args) {

  SparkConf conf = new SparkConf().setAppName(appName);

JavaSparkContext jsc = new JavaSparkContext(conf);

JavaStreamingContext ssc = new JavaStreamingContext(jsc, new
Duration(5 * 1000));



[ deleted code …]



data.foreachRDD(new Function<JavaRDD<String>, Void>(){

private static final long serialVersionUID =
-7957854392903581284L;



@Override

public Void call(JavaRDD<String> jsonStr) throws Exception {

jsonStr.saveAsTextFile("hdfs:///rawSteamingData"); //
/rawSteamingData is a directory

return null;

}  

});



ssc.checkpoint(checkPointUri);



ssc.start();

ssc.awaitTermination();

}




RE: error with saveAsTextFile in local directory

2015-11-03 Thread Jack Yang
Yes, mine is 1.4.0.

So is this problem related to the version?

I doubt that. Any comments please?

From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: Wednesday, 4 November 2015 11:52 AM
To: Jack Yang
Cc: user@spark.apache.org
Subject: Re: error with saveAsTextFile in local directory

Looks like you were running a 1.4.x or earlier release, because the allowLocal 
flag is deprecated as of Spark 1.5.0.

Cheers

On Tue, Nov 3, 2015 at 3:07 PM, Jack Yang <j...@uow.edu.au> wrote:
Hi all,

I am saving some Hive query results into a local directory:

val hdfsFilePath = "hdfs://master:ip/ tempFile ";
val localFilePath = 
"file:///home/hduser/tempFile";
hiveContext.sql(s"""my hql codes here""")
res.printSchema()  --working
res.show()   --working
res.map{ x => tranRow2Str(x) }.coalesce(1).saveAsTextFile(hdfsFilePath)  
--still working
res.map{ x => tranRow2Str(x) }.coalesce(1).saveAsTextFile(localFilePath)  
--wrong!

In the end, I get the correct results in hdfsFilePath, but nothing in 
localFilePath.
Btw, the localFilePath folder was created, but it contained only a _SUCCESS 
file, no part files.

See the trace (any thoughts?):

15/11/04 09:57:41 INFO scheduler.DAGScheduler: Got job 4 (saveAsTextFile at 
myApp.scala:112) with 1 output partitions (allowLocal=false)
// the 112 line is the place I am using saveAsTextFile function to save the 
results locally.

15/11/04 09:57:41 INFO scheduler.DAGScheduler: Final stage: ResultStage 
42(saveAsTextFile at MyApp.scala:112)
15/11/04 09:57:41 INFO scheduler.DAGScheduler: Parents of final stage: 
List(ShuffleMapStage 41)
15/11/04 09:57:41 INFO scheduler.DAGScheduler: Missing parents: List()
15/11/04 09:57:41 INFO scheduler.DAGScheduler: Submitting ResultStage 42 
(MapPartitionsRDD[106] at saveAsTextFile at MyApp.scala:112), which has no 
missing parents
15/11/04 09:57:41 INFO storage.MemoryStore: ensureFreeSpace(160632) called with 
curMem=3889533, maxMem=280248975
15/11/04 09:57:41 INFO storage.MemoryStore: Block broadcast_28 stored as values 
in memory (estimated size 156.9 KB, free 263.4 MB)
15/11/04 09:57:41 INFO storage.MemoryStore: ensureFreeSpace(56065) called with 
curMem=4050165, maxMem=280248975
15/11/04 09:57:41 INFO storage.MemoryStore: Block broadcast_28_piece0 stored as 
bytes in memory (estimated size 54.8 KB, free 263.4 MB)
15/11/04 09:57:41 INFO storage.BlockManagerInfo: Added broadcast_28_piece0 in 
memory on 192.168.70.135:32836 (size: 54.8 KB, free: 266.8 MB)
15/11/04 09:57:41 INFO spark.SparkContext: Created broadcast 28 from broadcast 
at DAGScheduler.scala:874
15/11/04 09:57:41 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from 
ResultStage 42 (MapPartitionsRDD[106] at saveAsTextFile at MyApp.scala:112)
15/11/04 09:57:41 INFO scheduler.TaskSchedulerImpl: Adding task set 42.0 with 1 
tasks
15/11/04 09:57:41 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 
42.0 (TID 2018, 192.168.70.129, PROCESS_LOCAL, 5097 bytes)
15/11/04 09:57:41 INFO storage.BlockManagerInfo: Added broadcast_28_piece0 in 
memory on 192.168.70.129:54062 (size: 54.8 KB, free: 1068.8 MB)
15/11/04 09:57:47 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 
42.0 (TID 2018) in 6362 ms on 192.168.70.129 (1/1)
15/11/04 09:57:47 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 42.0, whose 
tasks have all completed, from pool
15/11/04 09:57:47 INFO scheduler.DAGScheduler: ResultStage 42 (saveAsTextFile 
at MyApp.scala:112) finished in 6.360 s
15/11/04 09:57:47 INFO scheduler.DAGScheduler: Job 4 finished: saveAsTextFile 
at MyApp.scala:112, took 6.588821 s
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/metrics/json,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/api,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/static,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/executors/threadDump,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/executors/json,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/executors,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/environment/json,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/environment,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/sto

Re: error with saveAsTextFile in local directory

2015-11-03 Thread Ted Yu
Looks like you were running a 1.4.x or earlier release, because the allowLocal
flag is deprecated as of Spark 1.5.0.

Cheers

On Tue, Nov 3, 2015 at 3:07 PM, Jack Yang  wrote:

> Hi all,
>
>
>
> I am saving some Hive query results into a local directory:
>
>
>
> val hdfsFilePath = "hdfs://master:ip/ tempFile ";
>
> val localFilePath = "file:///home/hduser/tempFile";
>
> hiveContext.sql(s"""my hql codes here""")
>
> res.printSchema()  --working
>
> res.show()   --working
>
> res.map{ x => tranRow2Str(x) }.coalesce(1).saveAsTextFile(hdfsFilePath)
> --still working
>
> res.map{ x => tranRow2Str(x) }.coalesce(1).saveAsTextFile(localFilePath)
> --wrong!
>
>
>
> In the end, I get the correct results in hdfsFilePath, but nothing in
> localFilePath.
>
> Btw, the localFilePath folder was created, but it contained only a
> _SUCCESS file, no part files.
>
>
>
> See the trace (any thoughts?):
>
>
>
> 15/11/04 09:57:41 INFO scheduler.DAGScheduler: Got job 4 (saveAsTextFile
> at myApp.scala:*112*) with 1 output partitions (allowLocal=false)
>
> *// the 112 line is the place I am using saveAsTextFile function to save
> the results locally.*
>
>
>
> 15/11/04 09:57:41 INFO scheduler.DAGScheduler: Final stage: ResultStage
> 42(saveAsTextFile at MyApp.scala:112)
>
> 15/11/04 09:57:41 INFO scheduler.DAGScheduler: Parents of final stage:
> List(ShuffleMapStage 41)
>
> 15/11/04 09:57:41 INFO scheduler.DAGScheduler: Missing parents: List()
>
> 15/11/04 09:57:41 INFO scheduler.DAGScheduler: Submitting ResultStage 42
> (MapPartitionsRDD[106] at saveAsTextFile at MyApp.scala:112), which has no
> missing parents
>
> 15/11/04 09:57:41 INFO storage.MemoryStore: ensureFreeSpace(160632) called
> with curMem=3889533, maxMem=280248975
>
> 15/11/04 09:57:41 INFO storage.MemoryStore: Block broadcast_28 stored as
> values in memory (estimated size 156.9 KB, free 263.4 MB)
>
> 15/11/04 09:57:41 INFO storage.MemoryStore: ensureFreeSpace(56065) called
> with curMem=4050165, maxMem=280248975
>
> 15/11/04 09:57:41 INFO storage.MemoryStore: Block broadcast_28_piece0
> stored as bytes in memory (estimated size 54.8 KB, free 263.4 MB)
>
> 15/11/04 09:57:41 INFO storage.BlockManagerInfo: Added broadcast_28_piece0
> in memory on 192.168.70.135:32836 (size: 54.8 KB, free: 266.8 MB)
>
> 15/11/04 09:57:41 INFO spark.SparkContext: Created broadcast 28 from
> broadcast at DAGScheduler.scala:874
>
> 15/11/04 09:57:41 INFO scheduler.DAGScheduler: Submitting 1 missing tasks
> from ResultStage 42 (MapPartitionsRDD[106] at saveAsTextFile at
> MyApp.scala:112)
>
> 15/11/04 09:57:41 INFO scheduler.TaskSchedulerImpl: Adding task set 42.0
> with 1 tasks
>
> 15/11/04 09:57:41 INFO scheduler.TaskSetManager: Starting task 0.0 in
> stage 42.0 (TID 2018, 192.168.70.129, PROCESS_LOCAL, 5097 bytes)
>
> 15/11/04 09:57:41 INFO storage.BlockManagerInfo: Added broadcast_28_piece0
> in memory on 192.168.70.129:54062 (size: 54.8 KB, free: 1068.8 MB)
>
> 15/11/04 09:57:47 INFO scheduler.TaskSetManager: Finished task 0.0 in
> stage 42.0 (TID 2018) in 6362 ms on 192.168.70.129 (1/1)
>
> 15/11/04 09:57:47 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 42.0,
> whose tasks have all completed, from pool
>
> 15/11/04 09:57:47 INFO scheduler.DAGScheduler: ResultStage 42
> (saveAsTextFile at MyApp.scala:112) finished in 6.360 s
>
> 15/11/04 09:57:47 INFO scheduler.DAGScheduler: Job 4 finished:
> saveAsTextFile at MyApp.scala:112, took 6.588821 s
>
> 15/11/04 09:57:47 INFO handler.ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/metrics/json,null}
>
> 15/11/04 09:57:47 INFO handler.ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
>
> 15/11/04 09:57:47 INFO handler.ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/api,null}
>
> 15/11/04 09:57:47 INFO handler.ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/,null}
>
> 15/11/04 09:57:47 INFO handler.ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/static,null}
>
> 15/11/04 09:57:47 INFO handler.ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
>
> 15/11/04 09:57:47 INFO handler.ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/executors/threadDump,null}
>
> 15/11/04 09:57:47 INFO handler.ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/executors/json,null}
>
> 15/11/04 09:57:47 INFO handler.ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/executors,null}
>
> 15/11/04 09:57:47 INFO handler.ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/environment/json,null}
>

error with saveAsTextFile in local directory

2015-11-03 Thread Jack Yang
Hi all,

I am saving some Hive query results into a local directory:

val hdfsFilePath = "hdfs://master:ip/ tempFile ";
val localFilePath = "file:///home/hduser/tempFile";
hiveContext.sql(s"""my hql codes here""")
res.printSchema()  --working
res.show()   --working
res.map{ x => tranRow2Str(x) }.coalesce(1).saveAsTextFile(hdfsFilePath)  
--still working
res.map{ x => tranRow2Str(x) }.coalesce(1).saveAsTextFile(localFilePath)  
--wrong!

In the end, I get the correct results in hdfsFilePath, but nothing in 
localFilePath.
Btw, the localFilePath folder was created, but it contained only a _SUCCESS 
file, no part files.

See the trace (any thoughts?):

15/11/04 09:57:41 INFO scheduler.DAGScheduler: Got job 4 (saveAsTextFile at 
myApp.scala:112) with 1 output partitions (allowLocal=false)
// the 112 line is the place I am using saveAsTextFile function to save the 
results locally.

15/11/04 09:57:41 INFO scheduler.DAGScheduler: Final stage: ResultStage 
42(saveAsTextFile at MyApp.scala:112)
15/11/04 09:57:41 INFO scheduler.DAGScheduler: Parents of final stage: 
List(ShuffleMapStage 41)
15/11/04 09:57:41 INFO scheduler.DAGScheduler: Missing parents: List()
15/11/04 09:57:41 INFO scheduler.DAGScheduler: Submitting ResultStage 42 
(MapPartitionsRDD[106] at saveAsTextFile at MyApp.scala:112), which has no 
missing parents
15/11/04 09:57:41 INFO storage.MemoryStore: ensureFreeSpace(160632) called with 
curMem=3889533, maxMem=280248975
15/11/04 09:57:41 INFO storage.MemoryStore: Block broadcast_28 stored as values 
in memory (estimated size 156.9 KB, free 263.4 MB)
15/11/04 09:57:41 INFO storage.MemoryStore: ensureFreeSpace(56065) called with 
curMem=4050165, maxMem=280248975
15/11/04 09:57:41 INFO storage.MemoryStore: Block broadcast_28_piece0 stored as 
bytes in memory (estimated size 54.8 KB, free 263.4 MB)
15/11/04 09:57:41 INFO storage.BlockManagerInfo: Added broadcast_28_piece0 in 
memory on 192.168.70.135:32836 (size: 54.8 KB, free: 266.8 MB)
15/11/04 09:57:41 INFO spark.SparkContext: Created broadcast 28 from broadcast 
at DAGScheduler.scala:874
15/11/04 09:57:41 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from 
ResultStage 42 (MapPartitionsRDD[106] at saveAsTextFile at MyApp.scala:112)
15/11/04 09:57:41 INFO scheduler.TaskSchedulerImpl: Adding task set 42.0 with 1 
tasks
15/11/04 09:57:41 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 
42.0 (TID 2018, 192.168.70.129, PROCESS_LOCAL, 5097 bytes)
15/11/04 09:57:41 INFO storage.BlockManagerInfo: Added broadcast_28_piece0 in 
memory on 192.168.70.129:54062 (size: 54.8 KB, free: 1068.8 MB)
15/11/04 09:57:47 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 
42.0 (TID 2018) in 6362 ms on 192.168.70.129 (1/1)
15/11/04 09:57:47 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 42.0, whose 
tasks have all completed, from pool
15/11/04 09:57:47 INFO scheduler.DAGScheduler: ResultStage 42 (saveAsTextFile 
at MyApp.scala:112) finished in 6.360 s
15/11/04 09:57:47 INFO scheduler.DAGScheduler: Job 4 finished: saveAsTextFile 
at MyApp.scala:112, took 6.588821 s
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/metrics/json,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/api,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/static,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/executors/threadDump,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/executors/json,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/executors,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/environment/json,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/environment,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/storage/rdd/json,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/storage/rdd,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/storage/json,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/storage,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/stages/pool/json,null}
15/11/04 09:57:47 INFO handler.ContextHandler: stopped 
o.s.j.s.ServletContextHandler{/stages/pool,null}
15/11/04 09:57:47 INFO handler.ContextHandler: s

Re: saveAsTextFile creates an empty folder in HDFS

2015-10-03 Thread Ajay Chander
Hi Jacin,

If I were you, the first thing I would do is write a sample Java
application that writes data into hdfs and see if it works. Metadata is
being created in hdfs, which means communication with the namenode is
working, but perhaps not with the datanodes, since you don't see any data
inside the file. Why don't you look at the hdfs logs to see what's
happening when your application talks to the namenode? I suspect a
networking issue; also check that the datanodes are running fine.
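[Editor's note: the sanity check Ajay suggests might look roughly like this — an untested Scala sketch using the plain Hadoop client; the namenode address and path are placeholders, not from the thread.]

```scala
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Placeholder namenode URI and test path -- substitute your own cluster.
val uri = new URI("hdfs://namenode.example.com:8020/user/test/hello.txt")
val fs = FileSystem.get(uri, new Configuration())

// Write a small file through the plain HDFS client, bypassing Spark.
// If this hangs or leaves an empty file, the problem is HDFS/networking
// (e.g. unreachable datanodes), not Spark.
val out = fs.create(new Path(uri))
try out.write("hello hdfs\n".getBytes("UTF-8"))
finally out.close()
```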

Thank you,
Ajay

On Saturday, October 3, 2015, Jacinto Arias  wrote:

> Yes printing the result with collect or take is working,
>
> actually this is a minimal example, but also when working with real data
> the actions are performed, and the resulting RDDs can be printed out
> without problem. The data is there and the operations are correct, they
> just cannot be written to a file.
>
>
> On 03 Oct 2015, at 16:17, Ted Yu wrote:
>
> bq.  val dist = sc.parallelize(l)
>
> Following the above, can you call, e.g. count() on dist before saving ?
>
> Cheers
>
> On Fri, Oct 2, 2015 at 1:21 AM, jarias wrote:
>
>> Dear list,
>>
>> I'm experiencing a problem when trying to write any RDD to HDFS. I've
>> tried
>> with minimal examples, scala programs and pyspark programs both in local
>> and
>> cluster modes and as standalone applications or shells.
>>
>> My problem is that when invoking the write command, a task is executed but
>> it just creates an empty folder in the given HDFS path. I'm lost at this
>> point because there is no sign of error or warning in the spark logs.
>>
>> I'm running a seven node cluster managed by cdh5.7, spark 1.3. HDFS is
>> working properly when using the command tools or running MapReduce jobs.
>>
>>
>> Thank you for your time, I'm not sure if this is just a rookie mistake or
>> an
>> overall config problem.
>>
>> Just a working example:
>>
>> This sequence produces the following log and creates the empty folder
>> "test":
>>
>> scala> val l = Seq.fill(1)(nextInt)
>> scala> val dist = sc.parallelize(l)
>> scala> dist.saveAsTextFile("hdfs://node1.i3a.info/user/jarias/test/")
>>
>>
>> 15/10/02 10:19:22 INFO FileOutputCommitter: File Output Committer
>> Algorithm
>> version is 1
>> 15/10/02 10:19:22 INFO SparkContext: Starting job: saveAsTextFile at
>> :27
>> 15/10/02 10:19:22 INFO DAGScheduler: Got job 3 (saveAsTextFile at
>> :27) with 2 output partitions (allowLocal=false)
>> 15/10/02 10:19:22 INFO DAGScheduler: Final stage: Stage 3(saveAsTextFile
>> at
>> :27)
>> 15/10/02 10:19:22 INFO DAGScheduler: Parents of final stage: List()
>> 15/10/02 10:19:22 INFO DAGScheduler: Missing parents: List()
>> 15/10/02 10:19:22 INFO DAGScheduler: Submitting Stage 3
>> (MapPartitionsRDD[7]
>> at saveAsTextFile at :27), which has no missing parents
>> 15/10/02 10:19:22 INFO MemoryStore: ensureFreeSpace(137336) called with
>> curMem=184615, maxMem=278302556
>> 15/10/02 10:19:22 INFO MemoryStore: Block broadcast_3 stored as values in
>> memory (estimated size 134.1 KB, free 265.1 MB)
>> 15/10/02 10:19:22 INFO MemoryStore: ensureFreeSpace(47711) called with
>> curMem=321951, maxMem=278302556
>> 15/10/02 10:19:22 INFO MemoryStore: Block broadcast_3_piece0 stored as
>> bytes
>> in memory (estimated size 46.6 KB, free 265.1 MB)
>> 15/10/02 10:19:22 INFO BlockManagerInfo: Added broadcast_3_piece0 in
>> memory
>> on nodo1.i3a.info:36330 (size: 46.6 KB, free: 265.3 MB)
>> 15/10/02 10:19:22 INFO BlockManagerMaster: Updated info of block
>> broadcast_3_piece0
>> 15/10/02 10:19:22 INFO SparkContext: Created broadcast 3 from broadcast at
>> DAGScheduler.scala:839
>> 15/10/02 10:19:22 INFO DAGScheduler: Submitting 2 missing tasks from
>> Stage 3
>> (MapPartitionsRDD[7] at saveAsTextFile at :27)
>> 15/10/02 10:19:22 INFO YarnScheduler: Adding task set 3.0 with 2 tasks
>> 15/10/02 10:19:22 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID
>> 6, nodo2.i3a.info, PROCESS_LOCAL, 25975 bytes)
>> 15/10/02 10:19:22 INFO TaskSetManager: Starting task 1.0 in stage 3.0 (TID
>> 7, nodo3.i3a.info, PROCESS_LOCAL, 25963 bytes)
>> 15/10/02 10:19:22 INFO BlockManagerInfo: Added broadcast_3_piece0 in
>> memory
>> on nodo2.i3a.info:37759 (size: 46.6 KB, free: 530.2 MB)
>> 15/10/02 10:19:22 INFO BlockManagerInfo: Added broadcast_3_piece0 in
>> memory
>> on nodo3.i3a.info:54798 (size: 46.6 KB, free: 530.2 MB)
>> 15/10/02 10:19:22 INFO TaskSetManager: Finished tas

Re: saveAsTextFile creates an empty folder in HDFS

2015-10-03 Thread Jacinto Arias
Yes, printing the result with collect or take is working.

Actually this is a minimal example, but also when working with real data the 
actions are performed, and the resulting RDDs can be printed out without 
problem. The data is there and the operations are correct; they just cannot be 
written to a file.


> On 03 Oct 2015, at 16:17, Ted Yu <yuzhih...@gmail.com> wrote:
> 
> bq.  val dist = sc.parallelize(l)
> 
> Following the above, can you call, e.g. count() on dist before saving ?
> 
> Cheers
> 
> On Fri, Oct 2, 2015 at 1:21 AM, jarias <ja...@elrocin.es> wrote:
> Dear list,
> 
> I'm experiencing a problem when trying to write any RDD to HDFS. I've tried
> with minimal examples, scala programs and pyspark programs both in local and
> cluster modes and as standalone applications or shells.
> 
> My problem is that when invoking the write command, a task is executed but
> it just creates an empty folder in the given HDFS path. I'm lost at this
> point because there is no sign of error or warning in the spark logs.
> 
> I'm running a seven node cluster managed by cdh5.7, spark 1.3. HDFS is
> working properly when using the command tools or running MapReduce jobs.
> 
> 
> Thank you for your time, I'm not sure if this is just a rookie mistake or an
> overall config problem.
> 
> Just a working example:
> 
> This sequence produces the following log and creates the empty folder
> "test":
> 
> scala> val l = Seq.fill(1)(nextInt)
> scala> val dist = sc.parallelize(l)
> scala> dist.saveAsTextFile("hdfs://node1.i3a.info/user/jarias/test/ 
> <http://node1.i3a.info/user/jarias/test/>")
> 
> 
> 15/10/02 10:19:22 INFO FileOutputCommitter: File Output Committer Algorithm
> version is 1
> 15/10/02 10:19:22 INFO SparkContext: Starting job: saveAsTextFile at
> :27
> 15/10/02 10:19:22 INFO DAGScheduler: Got job 3 (saveAsTextFile at
> :27) with 2 output partitions (allowLocal=false)
> 15/10/02 10:19:22 INFO DAGScheduler: Final stage: Stage 3(saveAsTextFile at
> :27)
> 15/10/02 10:19:22 INFO DAGScheduler: Parents of final stage: List()
> 15/10/02 10:19:22 INFO DAGScheduler: Missing parents: List()
> 15/10/02 10:19:22 INFO DAGScheduler: Submitting Stage 3 (MapPartitionsRDD[7]
> at saveAsTextFile at :27), which has no missing parents
> 15/10/02 10:19:22 INFO MemoryStore: ensureFreeSpace(137336) called with
> curMem=184615, maxMem=278302556
> 15/10/02 10:19:22 INFO MemoryStore: Block broadcast_3 stored as values in
> memory (estimated size 134.1 KB, free 265.1 MB)
> 15/10/02 10:19:22 INFO MemoryStore: ensureFreeSpace(47711) called with
> curMem=321951, maxMem=278302556
> 15/10/02 10:19:22 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes
> in memory (estimated size 46.6 KB, free 265.1 MB)
> 15/10/02 10:19:22 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory
> on nodo1.i3a.info:36330 (size: 46.6 KB, free: 265.3 MB)
> 15/10/02 10:19:22 INFO BlockManagerMaster: Updated info of block
> broadcast_3_piece0
> 15/10/02 10:19:22 INFO SparkContext: Created broadcast 3 from broadcast at
> DAGScheduler.scala:839
> 15/10/02 10:19:22 INFO DAGScheduler: Submitting 2 missing tasks from Stage 3
> (MapPartitionsRDD[7] at saveAsTextFile at :27)
> 15/10/02 10:19:22 INFO YarnScheduler: Adding task set 3.0 with 2 tasks
> 15/10/02 10:19:22 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID
> 6, nodo2.i3a.info, PROCESS_LOCAL, 25975 bytes)
> 15/10/02 10:19:22 INFO TaskSetManager: Starting task 1.0 in stage 3.0 (TID
> 7, nodo3.i3a.info, PROCESS_LOCAL, 25963 bytes)
> 15/10/02 10:19:22 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory
> on nodo2.i3a.info:37759 (size: 46.6 KB, free: 530.2 MB)
> 15/10/02 10:19:22 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory
> on nodo3.i3a.info:54798 (size: 46.6 KB, free: 530.2 MB)
> 15/10/02 10:19:22 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID
> 6) in 312 ms on nodo2.i3a.info (1/2)
> 15/10/02 10:19:23 INFO TaskSetManager: Finished task 1.0 in stage 3.0 (TID
> 7) in 313 ms on nodo3.i3a.info (2/2)
> 15/10/02 10:19:23 INFO YarnScheduler: Removed TaskSet 3.0, whose tasks have
> all completed, from pool
> 15/10/02 10:19:23 INFO DAGScheduler: Stage 3 (saveAsTextFile at
> :27) finished in 0.334 s
> 15/10/02 10:19:23 INFO DAGScheduler: Job 3 finished: saveAsTextFile at
> :27, took 0.436388 s
> 
> 
> 
> 
> --
> View this message in context: 
> http://a

Re: saveAsTextFile creates an empty folder in HDFS

2015-10-03 Thread Ted Yu
bq.  val dist = sc.parallelize(l)

Following the above, can you call, e.g. count() on dist before saving ?
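[Editor's note: as a sketch, Ted's check amounts to forcing an action before the save — untested Scala, assuming a live SparkContext `sc`; the element count and output path are illustrative placeholders.]

```scala
import scala.util.Random.nextInt

val l = Seq.fill(1000)(nextInt)
val dist = sc.parallelize(l)

// Force evaluation first: if count() errors or returns 0, the problem is
// upstream of saveAsTextFile (the RDD itself or the executors).
println(s"count = ${dist.count()}")

dist.saveAsTextFile("hdfs://node1.i3a.info/user/jarias/test/")
```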

Cheers

On Fri, Oct 2, 2015 at 1:21 AM, jarias  wrote:

> Dear list,
>
> I'm experiencing a problem when trying to write any RDD to HDFS. I've
> tried
> with minimal examples, scala programs and pyspark programs both in local
> and
> cluster modes and as standalone applications or shells.
>
> My problem is that when invoking the write command, a task is executed but
> it just creates an empty folder in the given HDFS path. I'm lost at this
> point because there is no sign of error or warning in the spark logs.
>
> I'm running a seven node cluster managed by cdh5.7, spark 1.3. HDFS is
> working properly when using the command tools or running MapReduce jobs.
>
>
> Thank you for your time, I'm not sure if this is just a rookie mistake or
> an
> overall config problem.
>
> Just a working example:
>
> This sequence produces the following log and creates the empty folder
> "test":
>
> scala> val l = Seq.fill(1)(nextInt)
> scala> val dist = sc.parallelize(l)
> scala> dist.saveAsTextFile("hdfs://node1.i3a.info/user/jarias/test/")
>
>
> 15/10/02 10:19:22 INFO FileOutputCommitter: File Output Committer Algorithm
> version is 1
> 15/10/02 10:19:22 INFO SparkContext: Starting job: saveAsTextFile at
> :27
> 15/10/02 10:19:22 INFO DAGScheduler: Got job 3 (saveAsTextFile at
> :27) with 2 output partitions (allowLocal=false)
> 15/10/02 10:19:22 INFO DAGScheduler: Final stage: Stage 3(saveAsTextFile at
> :27)
> 15/10/02 10:19:22 INFO DAGScheduler: Parents of final stage: List()
> 15/10/02 10:19:22 INFO DAGScheduler: Missing parents: List()
> 15/10/02 10:19:22 INFO DAGScheduler: Submitting Stage 3
> (MapPartitionsRDD[7]
> at saveAsTextFile at :27), which has no missing parents
> 15/10/02 10:19:22 INFO MemoryStore: ensureFreeSpace(137336) called with
> curMem=184615, maxMem=278302556
> 15/10/02 10:19:22 INFO MemoryStore: Block broadcast_3 stored as values in
> memory (estimated size 134.1 KB, free 265.1 MB)
> 15/10/02 10:19:22 INFO MemoryStore: ensureFreeSpace(47711) called with
> curMem=321951, maxMem=278302556
> 15/10/02 10:19:22 INFO MemoryStore: Block broadcast_3_piece0 stored as
> bytes
> in memory (estimated size 46.6 KB, free 265.1 MB)
> 15/10/02 10:19:22 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory
> on nodo1.i3a.info:36330 (size: 46.6 KB, free: 265.3 MB)
> 15/10/02 10:19:22 INFO BlockManagerMaster: Updated info of block
> broadcast_3_piece0
> 15/10/02 10:19:22 INFO SparkContext: Created broadcast 3 from broadcast at
> DAGScheduler.scala:839
> 15/10/02 10:19:22 INFO DAGScheduler: Submitting 2 missing tasks from Stage
> 3
> (MapPartitionsRDD[7] at saveAsTextFile at :27)
> 15/10/02 10:19:22 INFO YarnScheduler: Adding task set 3.0 with 2 tasks
> 15/10/02 10:19:22 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID
> 6, nodo2.i3a.info, PROCESS_LOCAL, 25975 bytes)
> 15/10/02 10:19:22 INFO TaskSetManager: Starting task 1.0 in stage 3.0 (TID
> 7, nodo3.i3a.info, PROCESS_LOCAL, 25963 bytes)
> 15/10/02 10:19:22 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory
> on nodo2.i3a.info:37759 (size: 46.6 KB, free: 530.2 MB)
> 15/10/02 10:19:22 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory
> on nodo3.i3a.info:54798 (size: 46.6 KB, free: 530.2 MB)
> 15/10/02 10:19:22 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID
> 6) in 312 ms on nodo2.i3a.info (1/2)
> 15/10/02 10:19:23 INFO TaskSetManager: Finished task 1.0 in stage 3.0 (TID
> 7) in 313 ms on nodo3.i3a.info (2/2)
> 15/10/02 10:19:23 INFO YarnScheduler: Removed TaskSet 3.0, whose tasks have
> all completed, from pool
> 15/10/02 10:19:23 INFO DAGScheduler: Stage 3 (saveAsTextFile at
> :27) finished in 0.334 s
> 15/10/02 10:19:23 INFO DAGScheduler: Job 3 finished: saveAsTextFile at
> :27, took 0.436388 s
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/saveAsTextFile-creates-an-empty-folder-in-HDFS-tp24906.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


saveAsTextFile creates an empty folder in HDFS

2015-10-02 Thread jarias
Dear list,

I'm experiencing a problem when trying to write any RDD to HDFS. I've tried
with minimal examples, scala programs and pyspark programs both in local and
cluster modes and as standalone applications or shells.

My problem is that when invoking the write command, a task is executed but
it just creates an empty folder in the given HDFS path. I'm lost at this
point because there is no sign of error or warning in the spark logs.

I'm running a seven node cluster managed by cdh5.7, spark 1.3. HDFS is
working properly when using the command tools or running MapReduce jobs.


Thank you for your time, I'm not sure if this is just a rookie mistake or an
overall config problem.

Just a working example:

This sequence produces the following log and creates the empty folder
"test":

scala> import scala.util.Random.nextInt  // needed for nextInt below
scala> val l = Seq.fill(1)(nextInt)
scala> val dist = sc.parallelize(l)
scala> dist.saveAsTextFile("hdfs://node1.i3a.info/user/jarias/test/")


15/10/02 10:19:22 INFO FileOutputCommitter: File Output Committer Algorithm
version is 1
15/10/02 10:19:22 INFO SparkContext: Starting job: saveAsTextFile at
<console>:27
15/10/02 10:19:22 INFO DAGScheduler: Got job 3 (saveAsTextFile at
<console>:27) with 2 output partitions (allowLocal=false)
15/10/02 10:19:22 INFO DAGScheduler: Final stage: Stage 3(saveAsTextFile at
<console>:27)
15/10/02 10:19:22 INFO DAGScheduler: Parents of final stage: List()
15/10/02 10:19:22 INFO DAGScheduler: Missing parents: List()
15/10/02 10:19:22 INFO DAGScheduler: Submitting Stage 3 (MapPartitionsRDD[7]
at saveAsTextFile at <console>:27), which has no missing parents
15/10/02 10:19:22 INFO MemoryStore: ensureFreeSpace(137336) called with
curMem=184615, maxMem=278302556
15/10/02 10:19:22 INFO MemoryStore: Block broadcast_3 stored as values in
memory (estimated size 134.1 KB, free 265.1 MB)
15/10/02 10:19:22 INFO MemoryStore: ensureFreeSpace(47711) called with
curMem=321951, maxMem=278302556
15/10/02 10:19:22 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes
in memory (estimated size 46.6 KB, free 265.1 MB)
15/10/02 10:19:22 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory
on nodo1.i3a.info:36330 (size: 46.6 KB, free: 265.3 MB)
15/10/02 10:19:22 INFO BlockManagerMaster: Updated info of block
broadcast_3_piece0
15/10/02 10:19:22 INFO SparkContext: Created broadcast 3 from broadcast at
DAGScheduler.scala:839
15/10/02 10:19:22 INFO DAGScheduler: Submitting 2 missing tasks from Stage 3
(MapPartitionsRDD[7] at saveAsTextFile at <console>:27)
15/10/02 10:19:22 INFO YarnScheduler: Adding task set 3.0 with 2 tasks
15/10/02 10:19:22 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID
6, nodo2.i3a.info, PROCESS_LOCAL, 25975 bytes)
15/10/02 10:19:22 INFO TaskSetManager: Starting task 1.0 in stage 3.0 (TID
7, nodo3.i3a.info, PROCESS_LOCAL, 25963 bytes)
15/10/02 10:19:22 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory
on nodo2.i3a.info:37759 (size: 46.6 KB, free: 530.2 MB)
15/10/02 10:19:22 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory
on nodo3.i3a.info:54798 (size: 46.6 KB, free: 530.2 MB)
15/10/02 10:19:22 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID
6) in 312 ms on nodo2.i3a.info (1/2)
15/10/02 10:19:23 INFO TaskSetManager: Finished task 1.0 in stage 3.0 (TID
7) in 313 ms on nodo3.i3a.info (2/2)
15/10/02 10:19:23 INFO YarnScheduler: Removed TaskSet 3.0, whose tasks have
all completed, from pool 
15/10/02 10:19:23 INFO DAGScheduler: Stage 3 (saveAsTextFile at
<console>:27) finished in 0.334 s
15/10/02 10:19:23 INFO DAGScheduler: Job 3 finished: saveAsTextFile at
<console>:27, took 0.436388 s




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/saveAsTextFile-creates-an-empty-folder-in-HDFS-tp24906.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Exception during SaveAstextFile Stage

2015-09-24 Thread Chirag Dewan
Hi,

I have 2 stages in my job: map and saveAsTextFile. During the saveAsTextFile
stage I am getting an exception:

15/09/24 15:38:16 WARN AkkaUtils: Error sending message in 1 attempts
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at 
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at 
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187)
at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:398)

It might be too early to ask this since I haven't dug into why it is
happening at all; does anyone have any idea about this?

Thanks in advance,

Chirag


RE: spark 1.4.1 saveAsTextFile (and Parquet) is slow on emr-4.0.0

2015-09-03 Thread Ewan Leith
For those who have similar issues on EMR writing Parquet files, if you update 
mapred-site.xml with the following lines:

<property><name>mapred.output.direct.EmrFileSystem</name><value>true</value></property>
<property><name>mapred.output.direct.NativeS3FileSystem</name><value>true</value></property>
<property><name>parquet.enable.summary-metadata</name><value>false</value></property>
<property><name>spark.sql.parquet.output.committer.class</name><value>org.apache.spark.sql.parquet.DirectParquetOutputCommitter</value></property>
 

Then Parquet files are written directly to S3 without the use of temporary
files, and the summary-metadata files, which can cause a performance hit when
writing large Parquet datasets on S3, are disabled.

The easiest way to add them across the cluster is via the --configurations flag
on the "aws emr create-cluster" command
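
For reference, the classification used with that flag can also be written as a
JSON configuration file. This is a sketch assembled from the property values
quoted earlier in this thread; the file name is hypothetical:

```json
[
  {
    "Classification": "mapred-site",
    "Properties": {
      "mapred.output.direct.EmrFileSystem": "true",
      "mapred.output.direct.NativeS3FileSystem": "true",
      "parquet.enable.summary-metadata": "false",
      "spark.sql.parquet.output.committer.class": "org.apache.spark.sql.parquet.DirectParquetOutputCommitter"
    }
  }
]
```

It would be passed at cluster launch along the lines of
"aws emr create-cluster --configurations file://./emr-config.json ...".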

Thanks,
Ewan



Re: spark 1.4.1 saveAsTextFile is slow on emr-4.0.0

2015-09-02 Thread Alexander Pivovarov
Hi Neil

Yes! It helps! I do not see _temporary in the console output anymore;
saveAsTextFile is fast now.

2015-09-02 23:07:00,022 INFO  [task-result-getter-0]
scheduler.TaskSetManager (Logging.scala:logInfo(59)) - Finished task 18.0
in stage 0.0 (TID 18) in 4398 ms on ip-10-0-24-103.ec2.internal (1/24)

2015-09-02 23:07:01,887 INFO  [task-result-getter-2]
scheduler.TaskSetManager (Logging.scala:logInfo(59)) - Finished task 5.0 in
stage 0.0 (TID 5) in 6282 ms on ip-10-0-26-14.ec2.internal (24/24)

2015-09-02 23:07:01,888 INFO  [dag-scheduler-event-loop]
scheduler.DAGScheduler (Logging.scala:logInfo(59)) - ResultStage 0
(saveAsTextFile at <console>:22) finished in 6.319 s

2015-09-02 23:07:02,123 INFO  [main] s3n.Jets3tNativeFileSystemStore
(Jets3tNativeFileSystemStore.java:storeFile(141)) - s3.putObject foo-bar
tmp/test40_141_24_406/_SUCCESS 0


Thank you!

On Wed, Sep 2, 2015 at 12:54 AM, Neil Jonkers  wrote:

> Hi,
>
> Can you set the following parameters in your mapred-site.xml file please:
>
>
> <property><name>mapred.output.direct.EmrFileSystem</name><value>true</value></property>
>
> <property><name>mapred.output.direct.NativeS3FileSystem</name><value>true</value></property>
>
> You can also config this at cluster launch time with the following
> Classification via EMR console:
>
>
> classification=mapred-site,properties=[mapred.output.direct.EmrFileSystem=true,mapred.output.direct.NativeS3FileSystem=true]
>
>
> Thank you
>
> On Wed, Sep 2, 2015 at 6:02 AM, Alexander Pivovarov 
> wrote:
>
>> I checked previous emr config (emr-3.8)
>> mapred-site.xml has the following setting
>> 
>> <property><name>mapred.output.committer.class</name><value>org.apache.hadoop.mapred.DirectFileOutputCommitter</value></property>
>> 
>>
>>
>> On Tue, Sep 1, 2015 at 7:33 PM, Alexander Pivovarov > > wrote:
>>
>>> Should I use DirectOutputCommitter?
>>> spark.hadoop.mapred.output.committer.class
>>>  com.appsflyer.spark.DirectOutputCommitter
>>>
>>>
>>>
>>> On Tue, Sep 1, 2015 at 4:01 PM, Alexander Pivovarov <
>>> apivova...@gmail.com> wrote:
>>>
>>>> I run spark 1.4.1 in amazom aws emr 4.0.0
>>>>
>>>> For some reason spark saveAsTextFile is very slow on emr 4.0.0 in
>>>> comparison to emr 3.8  (was 5 sec, now 95 sec)
>>>>
>>>> Actually saveAsTextFile says that it's done in 4.356 sec but after that
>>>> I see lots of INFO messages with 404 error from com.amazonaws.latency
>>>> logger for next 90 sec
>>>>
>>>> spark> sc.parallelize(List.range(0, 160),160).map(x => x + "\t" +
>>>> "A"*100).saveAsTextFile("s3n://foo-bar/tmp/test40_20")
>>>>
>>>> 2015-09-01 21:16:17,637 INFO  [dag-scheduler-event-loop]
>>>> scheduler.DAGScheduler (Logging.scala:logInfo(59)) - ResultStage 5
>>>> (saveAsTextFile at <console>:22) finished in 4.356 s
>>>> 2015-09-01 21:16:17,637 INFO  [task-result-getter-2]
>>>> cluster.YarnScheduler (Logging.scala:logInfo(59)) - Removed TaskSet 5.0,
>>>> whose tasks have all completed, from pool
>>>> 2015-09-01 21:16:17,637 INFO  [main] scheduler.DAGScheduler
>>>> (Logging.scala:logInfo(59)) - Job 5 finished: saveAsTextFile at
>>>> <console>:22, took 4.547829 s
>>>> 2015-09-01 21:16:17,638 INFO  [main] s3n.S3NativeFileSystem
>>>> (S3NativeFileSystem.java:listStatus(896)) - listStatus
>>>> s3n://foo-bar/tmp/test40_20/_temporary/0 with recursive false
>>>> 2015-09-01 21:16:17,651 INFO  [main] amazonaws.latency
>>>> (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[404],
>>>> Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found
>>>> (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request
>>>> ID: 3B2F06FD11682D22), S3 Extended Request ID:
>>>> C8T3rXVSEIk3swlwkUWJJX3gWuQx3QKC3Yyfxuhs7y0HXn3sEI9+c1a0f7/QK8BZ],
>>>> ServiceName=[Amazon S3], AWSErrorCode=[404 Not Found],
>>>> AWSRequestID=[3B2F06FD11682D22], ServiceEndpoint=[
>>>> https://foo-bar.s3.amazonaws.com], Exception=1,
>>>> HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0,
>>>> HttpClientPoolAvailableCount=1, ClientExecuteTime=[11.923],
>>>> HttpRequestTime=[11.388], HttpClientReceiveResponseTime=[9.544],
>>>> RequestSigningTime=[0.274], HttpClientSendRequestTime=[0.129],
>>>> 2015-09-01 21:16:17,723 INFO  [main] amazonaws.latency
>>>> (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[200],
>>>> ServiceName=[Amazon S3], AWSRequestID=[E5D513E52B20FF17], ServiceEndpoint=[
>>>> https://foo-bar.s3.amazonaws.com], HttpClientP

Re: spark 1.4.1 saveAsTextFile is slow on emr-4.0.0

2015-09-02 Thread Neil Jonkers
Hi,

Can you set the following parameters in your mapred-site.xml file please:

<property><name>mapred.output.direct.EmrFileSystem</name><value>true</value></property>
<property><name>mapred.output.direct.NativeS3FileSystem</name><value>true</value></property>

You can also config this at cluster launch time with the following
Classification via EMR console:

classification=mapred-site,properties=[mapred.output.direct.EmrFileSystem=true,mapred.output.direct.NativeS3FileSystem=true]


Thank you

On Wed, Sep 2, 2015 at 6:02 AM, Alexander Pivovarov 
wrote:

> I checked previous emr config (emr-3.8)
> mapred-site.xml has the following setting
> 
> <property><name>mapred.output.committer.class</name><value>org.apache.hadoop.mapred.DirectFileOutputCommitter</value></property>
> 
>
>
> On Tue, Sep 1, 2015 at 7:33 PM, Alexander Pivovarov 
> wrote:
>
>> Should I use DirectOutputCommitter?
>> spark.hadoop.mapred.output.committer.class
>>  com.appsflyer.spark.DirectOutputCommitter
>>
>>
>>
>> On Tue, Sep 1, 2015 at 4:01 PM, Alexander Pivovarov > > wrote:
>>
>>> I run spark 1.4.1 in amazom aws emr 4.0.0
>>>
>>> For some reason spark saveAsTextFile is very slow on emr 4.0.0 in
>>> comparison to emr 3.8  (was 5 sec, now 95 sec)
>>>
>>> Actually saveAsTextFile says that it's done in 4.356 sec but after that
>>> I see lots of INFO messages with 404 error from com.amazonaws.latency
>>> logger for next 90 sec
>>>
>>> spark> sc.parallelize(List.range(0, 160),160).map(x => x + "\t" +
>>> "A"*100).saveAsTextFile("s3n://foo-bar/tmp/test40_20")
>>>
>>> 2015-09-01 21:16:17,637 INFO  [dag-scheduler-event-loop]
>>> scheduler.DAGScheduler (Logging.scala:logInfo(59)) - ResultStage 5
>>> (saveAsTextFile at <console>:22) finished in 4.356 s
>>> 2015-09-01 21:16:17,637 INFO  [task-result-getter-2]
>>> cluster.YarnScheduler (Logging.scala:logInfo(59)) - Removed TaskSet 5.0,
>>> whose tasks have all completed, from pool
>>> 2015-09-01 21:16:17,637 INFO  [main] scheduler.DAGScheduler
>>> (Logging.scala:logInfo(59)) - Job 5 finished: saveAsTextFile at
>>> <console>:22, took 4.547829 s
>>> 2015-09-01 21:16:17,638 INFO  [main] s3n.S3NativeFileSystem
>>> (S3NativeFileSystem.java:listStatus(896)) - listStatus
>>> s3n://foo-bar/tmp/test40_20/_temporary/0 with recursive false
>>> 2015-09-01 21:16:17,651 INFO  [main] amazonaws.latency
>>> (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[404],
>>> Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found
>>> (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request
>>> ID: 3B2F06FD11682D22), S3 Extended Request ID:
>>> C8T3rXVSEIk3swlwkUWJJX3gWuQx3QKC3Yyfxuhs7y0HXn3sEI9+c1a0f7/QK8BZ],
>>> ServiceName=[Amazon S3], AWSErrorCode=[404 Not Found],
>>> AWSRequestID=[3B2F06FD11682D22], ServiceEndpoint=[
>>> https://foo-bar.s3.amazonaws.com], Exception=1,
>>> HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0,
>>> HttpClientPoolAvailableCount=1, ClientExecuteTime=[11.923],
>>> HttpRequestTime=[11.388], HttpClientReceiveResponseTime=[9.544],
>>> RequestSigningTime=[0.274], HttpClientSendRequestTime=[0.129],
>>> 2015-09-01 21:16:17,723 INFO  [main] amazonaws.latency
>>> (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[200],
>>> ServiceName=[Amazon S3], AWSRequestID=[E5D513E52B20FF17], ServiceEndpoint=[
>>> https://foo-bar.s3.amazonaws.com], HttpClientPoolLeasedCount=0,
>>> RequestCount=1, HttpClientPoolPendingCount=0,
>>> HttpClientPoolAvailableCount=1, ClientExecuteTime=[71.927],
>>> HttpRequestTime=[53.517], HttpClientReceiveResponseTime=[51.81],
>>> RequestSigningTime=[0.209], ResponseProcessingTime=[17.97],
>>> HttpClientSendRequestTime=[0.089],
>>> 2015-09-01 21:16:17,756 INFO  [main] amazonaws.latency
>>> (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[404],
>>> Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found
>>> (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request
>>> ID: 62C6B413965447FD), S3 Extended Request ID:
>>> 4w5rKMWCt9EdeEKzKBXZgWpTcBZCfDikzuRrRrBxmtHYxkZyS4GxQVyADdLkgtZf],
>>> ServiceName=[Amazon S3], AWSErrorCode=[404 Not Found],
>>> AWSRequestID=[62C6B413965447FD], ServiceEndpoint=[
>>> https://foo-bar.s3.amazonaws.com], Exception=1,
>>> HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0,
>>> HttpClientPoolAvailableCount=1, ClientExecuteTime=[11.044],
>>> HttpRequestTime=[10.543], HttpClientReceiveResponseTime=[8.743],
>>> RequestSigningTime=[0.271], 

Re: spark 1.4.1 saveAsTextFile is slow on emr-4.0.0

2015-09-01 Thread Alexander Pivovarov
I checked previous emr config (emr-3.8)
mapred-site.xml has the following setting

<property><name>mapred.output.committer.class</name><value>org.apache.hadoop.mapred.DirectFileOutputCommitter</value></property>



On Tue, Sep 1, 2015 at 7:33 PM, Alexander Pivovarov 
wrote:

> Should I use DirectOutputCommitter?
> spark.hadoop.mapred.output.committer.class
>  com.appsflyer.spark.DirectOutputCommitter
>
>
>
> On Tue, Sep 1, 2015 at 4:01 PM, Alexander Pivovarov 
> wrote:
>
>> I run spark 1.4.1 in amazom aws emr 4.0.0
>>
>> For some reason spark saveAsTextFile is very slow on emr 4.0.0 in
>> comparison to emr 3.8  (was 5 sec, now 95 sec)
>>
>> Actually saveAsTextFile says that it's done in 4.356 sec but after that I
>> see lots of INFO messages with 404 error from com.amazonaws.latency logger
>> for next 90 sec
>>
>> spark> sc.parallelize(List.range(0, 160),160).map(x => x + "\t" +
>> "A"*100).saveAsTextFile("s3n://foo-bar/tmp/test40_20")
>>
>> 2015-09-01 21:16:17,637 INFO  [dag-scheduler-event-loop]
>> scheduler.DAGScheduler (Logging.scala:logInfo(59)) - ResultStage 5
>> (saveAsTextFile at <console>:22) finished in 4.356 s
>> 2015-09-01 21:16:17,637 INFO  [task-result-getter-2]
>> cluster.YarnScheduler (Logging.scala:logInfo(59)) - Removed TaskSet 5.0,
>> whose tasks have all completed, from pool
>> 2015-09-01 21:16:17,637 INFO  [main] scheduler.DAGScheduler
>> (Logging.scala:logInfo(59)) - Job 5 finished: saveAsTextFile at
>> <console>:22, took 4.547829 s
>> 2015-09-01 21:16:17,638 INFO  [main] s3n.S3NativeFileSystem
>> (S3NativeFileSystem.java:listStatus(896)) - listStatus
>> s3n://foo-bar/tmp/test40_20/_temporary/0 with recursive false
>> 2015-09-01 21:16:17,651 INFO  [main] amazonaws.latency
>> (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[404],
>> Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found
>> (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request
>> ID: 3B2F06FD11682D22), S3 Extended Request ID:
>> C8T3rXVSEIk3swlwkUWJJX3gWuQx3QKC3Yyfxuhs7y0HXn3sEI9+c1a0f7/QK8BZ],
>> ServiceName=[Amazon S3], AWSErrorCode=[404 Not Found],
>> AWSRequestID=[3B2F06FD11682D22], ServiceEndpoint=[
>> https://foo-bar.s3.amazonaws.com], Exception=1,
>> HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0,
>> HttpClientPoolAvailableCount=1, ClientExecuteTime=[11.923],
>> HttpRequestTime=[11.388], HttpClientReceiveResponseTime=[9.544],
>> RequestSigningTime=[0.274], HttpClientSendRequestTime=[0.129],
>> 2015-09-01 21:16:17,723 INFO  [main] amazonaws.latency
>> (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[200],
>> ServiceName=[Amazon S3], AWSRequestID=[E5D513E52B20FF17], ServiceEndpoint=[
>> https://foo-bar.s3.amazonaws.com], HttpClientPoolLeasedCount=0,
>> RequestCount=1, HttpClientPoolPendingCount=0,
>> HttpClientPoolAvailableCount=1, ClientExecuteTime=[71.927],
>> HttpRequestTime=[53.517], HttpClientReceiveResponseTime=[51.81],
>> RequestSigningTime=[0.209], ResponseProcessingTime=[17.97],
>> HttpClientSendRequestTime=[0.089],
>> 2015-09-01 21:16:17,756 INFO  [main] amazonaws.latency
>> (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[404],
>> Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found
>> (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request
>> ID: 62C6B413965447FD), S3 Extended Request ID:
>> 4w5rKMWCt9EdeEKzKBXZgWpTcBZCfDikzuRrRrBxmtHYxkZyS4GxQVyADdLkgtZf],
>> ServiceName=[Amazon S3], AWSErrorCode=[404 Not Found],
>> AWSRequestID=[62C6B413965447FD], ServiceEndpoint=[
>> https://foo-bar.s3.amazonaws.com], Exception=1,
>> HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0,
>> HttpClientPoolAvailableCount=1, ClientExecuteTime=[11.044],
>> HttpRequestTime=[10.543], HttpClientReceiveResponseTime=[8.743],
>> RequestSigningTime=[0.271], HttpClientSendRequestTime=[0.138],
>> 2015-09-01 21:16:17,774 INFO  [main] amazonaws.latency
>> (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[200],
>> ServiceName=[Amazon S3], AWSRequestID=[F62B991825042889], ServiceEndpoint=[
>> https://foo-bar.s3.amazonaws.com], HttpClientPoolLeasedCount=0,
>> RequestCount=1, HttpClientPoolPendingCount=0,
>> HttpClientPoolAvailableCount=1, ClientExecuteTime=[16.724],
>> HttpRequestTime=[16.292], HttpClientReceiveResponseTime=[14.728],
>> RequestSigningTime=[0.148], ResponseProcessingTime=[0.155],
>> HttpClientSendRequestTime=[0.068],
>> 2015-09-01 21:16:17,786 INFO  [main] amazonaws.latency
>> (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=

Re: spark 1.4.1 saveAsTextFile is slow on emr-4.0.0

2015-09-01 Thread Alexander Pivovarov
Should I use DirectOutputCommitter?
spark.hadoop.mapred.output.committer.class
 com.appsflyer.spark.DirectOutputCommitter
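
The committer being asked about is a Hadoop property passed through Spark
configuration. As a minimal sketch, assuming the com.appsflyer
DirectOutputCommitter class is actually on the application classpath, it
would go in spark-defaults.conf like this:

```properties
# spark-defaults.conf (sketch; assumes the DirectOutputCommitter jar is on the classpath)
spark.hadoop.mapred.output.committer.class   com.appsflyer.spark.DirectOutputCommitter
```

Equivalently it could be passed per job, e.g.
spark-submit --conf spark.hadoop.mapred.output.committer.class=com.appsflyer.spark.DirectOutputCommitter.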



On Tue, Sep 1, 2015 at 4:01 PM, Alexander Pivovarov 
wrote:

> I run spark 1.4.1 in amazom aws emr 4.0.0
>
> For some reason spark saveAsTextFile is very slow on emr 4.0.0 in
> comparison to emr 3.8  (was 5 sec, now 95 sec)
>
> Actually saveAsTextFile says that it's done in 4.356 sec but after that I
> see lots of INFO messages with 404 error from com.amazonaws.latency logger
> for next 90 sec
>
> spark> sc.parallelize(List.range(0, 160),160).map(x => x + "\t" +
> "A"*100).saveAsTextFile("s3n://foo-bar/tmp/test40_20")
>
> 2015-09-01 21:16:17,637 INFO  [dag-scheduler-event-loop]
> scheduler.DAGScheduler (Logging.scala:logInfo(59)) - ResultStage 5
> (saveAsTextFile at <console>:22) finished in 4.356 s
> 2015-09-01 21:16:17,637 INFO  [task-result-getter-2] cluster.YarnScheduler
> (Logging.scala:logInfo(59)) - Removed TaskSet 5.0, whose tasks have all
> completed, from pool
> 2015-09-01 21:16:17,637 INFO  [main] scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Job 5 finished: saveAsTextFile at
> <console>:22, took 4.547829 s
> 2015-09-01 21:16:17,638 INFO  [main] s3n.S3NativeFileSystem
> (S3NativeFileSystem.java:listStatus(896)) - listStatus
> s3n://foo-bar/tmp/test40_20/_temporary/0 with recursive false
> 2015-09-01 21:16:17,651 INFO  [main] amazonaws.latency
> (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[404],
> Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found
> (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request
> ID: 3B2F06FD11682D22), S3 Extended Request ID:
> C8T3rXVSEIk3swlwkUWJJX3gWuQx3QKC3Yyfxuhs7y0HXn3sEI9+c1a0f7/QK8BZ],
> ServiceName=[Amazon S3], AWSErrorCode=[404 Not Found],
> AWSRequestID=[3B2F06FD11682D22], ServiceEndpoint=[
> https://foo-bar.s3.amazonaws.com], Exception=1,
> HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0,
> HttpClientPoolAvailableCount=1, ClientExecuteTime=[11.923],
> HttpRequestTime=[11.388], HttpClientReceiveResponseTime=[9.544],
> RequestSigningTime=[0.274], HttpClientSendRequestTime=[0.129],
> 2015-09-01 21:16:17,723 INFO  [main] amazonaws.latency
> (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[200],
> ServiceName=[Amazon S3], AWSRequestID=[E5D513E52B20FF17], ServiceEndpoint=[
> https://foo-bar.s3.amazonaws.com], HttpClientPoolLeasedCount=0,
> RequestCount=1, HttpClientPoolPendingCount=0,
> HttpClientPoolAvailableCount=1, ClientExecuteTime=[71.927],
> HttpRequestTime=[53.517], HttpClientReceiveResponseTime=[51.81],
> RequestSigningTime=[0.209], ResponseProcessingTime=[17.97],
> HttpClientSendRequestTime=[0.089],
> 2015-09-01 21:16:17,756 INFO  [main] amazonaws.latency
> (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[404],
> Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found
> (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request
> ID: 62C6B413965447FD), S3 Extended Request ID:
> 4w5rKMWCt9EdeEKzKBXZgWpTcBZCfDikzuRrRrBxmtHYxkZyS4GxQVyADdLkgtZf],
> ServiceName=[Amazon S3], AWSErrorCode=[404 Not Found],
> AWSRequestID=[62C6B413965447FD], ServiceEndpoint=[
> https://foo-bar.s3.amazonaws.com], Exception=1,
> HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0,
> HttpClientPoolAvailableCount=1, ClientExecuteTime=[11.044],
> HttpRequestTime=[10.543], HttpClientReceiveResponseTime=[8.743],
> RequestSigningTime=[0.271], HttpClientSendRequestTime=[0.138],
> 2015-09-01 21:16:17,774 INFO  [main] amazonaws.latency
> (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[200],
> ServiceName=[Amazon S3], AWSRequestID=[F62B991825042889], ServiceEndpoint=[
> https://foo-bar.s3.amazonaws.com], HttpClientPoolLeasedCount=0,
> RequestCount=1, HttpClientPoolPendingCount=0,
> HttpClientPoolAvailableCount=1, ClientExecuteTime=[16.724],
> HttpRequestTime=[16.292], HttpClientReceiveResponseTime=[14.728],
> RequestSigningTime=[0.148], ResponseProcessingTime=[0.155],
> HttpClientSendRequestTime=[0.068],
> 2015-09-01 21:16:17,786 INFO  [main] amazonaws.latency
> (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[404],
> Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found
> (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request
> ID: 4846575A1C373BB9), S3 Extended Request ID:
> aw/MMKxKPmuDuxTj4GKyDbp8hgpQbTjipJBzdjdTgbwPgt5NsZS4z+tRf2bk3I2E],
> ServiceName=[Amazon S3], AWSErrorCode=[404 Not Found],
> AWSRequestID=[4846575A1C373BB9], ServiceEndpoint=[
> https://foo-bar.s3.amazonaws.com], Exception=1,
> HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0,
> HttpClientPoolAv

spark 1.4.1 saveAsTextFile is slow on emr-4.0.0

2015-09-01 Thread Alexander Pivovarov
I run Spark 1.4.1 on Amazon AWS EMR 4.0.0.

For some reason Spark's saveAsTextFile is very slow on EMR 4.0.0 in
comparison to EMR 3.8 (it was 5 sec, now it is 95 sec).

Actually, saveAsTextFile says that it's done in 4.356 sec, but after that I
see lots of INFO messages with 404 errors from the com.amazonaws.latency
logger for the next 90 sec.

spark> sc.parallelize(List.range(0, 160),160).map(x => x + "\t" +
"A"*100).saveAsTextFile("s3n://foo-bar/tmp/test40_20")

2015-09-01 21:16:17,637 INFO  [dag-scheduler-event-loop]
scheduler.DAGScheduler (Logging.scala:logInfo(59)) - ResultStage 5
(saveAsTextFile at <console>:22) finished in 4.356 s
2015-09-01 21:16:17,637 INFO  [task-result-getter-2] cluster.YarnScheduler
(Logging.scala:logInfo(59)) - Removed TaskSet 5.0, whose tasks have all
completed, from pool
2015-09-01 21:16:17,637 INFO  [main] scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Job 5 finished: saveAsTextFile at
<console>:22, took 4.547829 s
2015-09-01 21:16:17,638 INFO  [main] s3n.S3NativeFileSystem
(S3NativeFileSystem.java:listStatus(896)) - listStatus
s3n://foo-bar/tmp/test40_20/_temporary/0 with recursive false
2015-09-01 21:16:17,651 INFO  [main] amazonaws.latency
(AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[404],
Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found
(Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request
ID: 3B2F06FD11682D22), S3 Extended Request ID:
C8T3rXVSEIk3swlwkUWJJX3gWuQx3QKC3Yyfxuhs7y0HXn3sEI9+c1a0f7/QK8BZ],
ServiceName=[Amazon S3], AWSErrorCode=[404 Not Found],
AWSRequestID=[3B2F06FD11682D22], ServiceEndpoint=[
https://foo-bar.s3.amazonaws.com], Exception=1,
HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0,
HttpClientPoolAvailableCount=1, ClientExecuteTime=[11.923],
HttpRequestTime=[11.388], HttpClientReceiveResponseTime=[9.544],
RequestSigningTime=[0.274], HttpClientSendRequestTime=[0.129],
2015-09-01 21:16:17,723 INFO  [main] amazonaws.latency
(AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[200],
ServiceName=[Amazon S3], AWSRequestID=[E5D513E52B20FF17], ServiceEndpoint=[
https://foo-bar.s3.amazonaws.com], HttpClientPoolLeasedCount=0,
RequestCount=1, HttpClientPoolPendingCount=0,
HttpClientPoolAvailableCount=1, ClientExecuteTime=[71.927],
HttpRequestTime=[53.517], HttpClientReceiveResponseTime=[51.81],
RequestSigningTime=[0.209], ResponseProcessingTime=[17.97],
HttpClientSendRequestTime=[0.089],
2015-09-01 21:16:17,756 INFO  [main] amazonaws.latency
(AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[404],
Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found
(Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request
ID: 62C6B413965447FD), S3 Extended Request ID:
4w5rKMWCt9EdeEKzKBXZgWpTcBZCfDikzuRrRrBxmtHYxkZyS4GxQVyADdLkgtZf],
ServiceName=[Amazon S3], AWSErrorCode=[404 Not Found],
AWSRequestID=[62C6B413965447FD], ServiceEndpoint=[
https://foo-bar.s3.amazonaws.com], Exception=1,
HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0,
HttpClientPoolAvailableCount=1, ClientExecuteTime=[11.044],
HttpRequestTime=[10.543], HttpClientReceiveResponseTime=[8.743],
RequestSigningTime=[0.271], HttpClientSendRequestTime=[0.138],
2015-09-01 21:16:17,774 INFO  [main] amazonaws.latency
(AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[200],
ServiceName=[Amazon S3], AWSRequestID=[F62B991825042889], ServiceEndpoint=[
https://foo-bar.s3.amazonaws.com], HttpClientPoolLeasedCount=0,
RequestCount=1, HttpClientPoolPendingCount=0,
HttpClientPoolAvailableCount=1, ClientExecuteTime=[16.724],
HttpRequestTime=[16.292], HttpClientReceiveResponseTime=[14.728],
RequestSigningTime=[0.148], ResponseProcessingTime=[0.155],
HttpClientSendRequestTime=[0.068],
2015-09-01 21:16:17,786 INFO  [main] amazonaws.latency
(AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[404],
Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found
(Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request
ID: 4846575A1C373BB9), S3 Extended Request ID:
aw/MMKxKPmuDuxTj4GKyDbp8hgpQbTjipJBzdjdTgbwPgt5NsZS4z+tRf2bk3I2E],
ServiceName=[Amazon S3], AWSErrorCode=[404 Not Found],
AWSRequestID=[4846575A1C373BB9], ServiceEndpoint=[
https://foo-bar.s3.amazonaws.com], Exception=1,
HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0,
HttpClientPoolAvailableCount=1, ClientExecuteTime=[11.531],
HttpRequestTime=[11.134], HttpClientReceiveResponseTime=[9.434],
RequestSigningTime=[0.206], HttpClientSendRequestTime=[0.13],
2015-09-01 21:16:17,786 INFO  [main] s3n.S3NativeFileSystem
(S3NativeFileSystem.java:listStatus(896)) - listStatus
s3n://foo-bar/tmp/test40_20/_temporary/0/task_201509012116_0005_m_00
with recursive false
2015-09-01 21:16:17,798 INFO  [main] amazonaws.latency
(AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[404],
Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found
(Se

Re: EC2 cluster doesn't work saveAsTextFile

2015-08-10 Thread Dean Wampler
So, just before running the job, run the HDFS command at a shell prompt:
"hdfs dfs -ls hdfs://172.31.42.10:54310/./weblogReadResult".
Does it say the path doesn't exist?

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
<http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
Typesafe <http://typesafe.com>
@deanwampler <http://twitter.com/deanwampler>
http://polyglotprogramming.com

On Mon, Aug 10, 2015 at 7:58 AM, Yasemin Kaya  wrote:

> Thanks, Dean. I am giving a unique output path, and each time I also delete
> the directory before I run the job.
>
> 2015-08-10 15:30 GMT+03:00 Dean Wampler :
>
>> Following Hadoop conventions, Spark won't overwrite an existing
>> directory. You need to provide a unique output path every time you run the
>> program, or delete or rename the target directory before you run the job.
>>
>> dean
>>
>>
>> On Mon, Aug 10, 2015 at 7:08 AM, Yasemin Kaya  wrote:
>>
>>> Hi,
>>>
>>> I have an EC2 cluster and am using Spark 1.3, YARN, and HDFS. When I
>>> submit locally there is no problem, but when I run on the cluster,
>>> saveAsTextFile doesn't work: "*User class threw exception: Output directory
>>> hdfs://172.31.42.10:54310/./weblogReadResult
>>> <http://172.31.42.10:54310/./weblogReadResult> already exists*"
>>>
>>> Can anyone help me with this issue?
>>>
>>> Best,
>>> yasemin
>>>
>>>
>>>
>>> --
>>> hiç ender hiç
>>>
>>
>>
>
>
> --
> hiç ender hiç
>


Re: EC2 cluster doesn't work saveAsTextFile

2015-08-10 Thread Yasemin Kaya
Thanks, Dean. I am giving a unique output path, and each time I also delete
the directory before I run the job.

2015-08-10 15:30 GMT+03:00 Dean Wampler :

> Following Hadoop conventions, Spark won't overwrite an existing directory.
> You need to provide a unique output path every time you run the program, or
> delete or rename the target directory before you run the job.
>
> dean
>
>
> On Mon, Aug 10, 2015 at 7:08 AM, Yasemin Kaya  wrote:
>
>> Hi,
>>
>> I have an EC2 cluster and am using Spark 1.3, YARN, and HDFS. When I
>> submit locally there is no problem, but when I run on the cluster,
>> saveAsTextFile doesn't work: "*User class threw exception: Output directory
>> hdfs://172.31.42.10:54310/./weblogReadResult
>> <http://172.31.42.10:54310/./weblogReadResult> already exists*"
>>
>> Can anyone help me with this issue?
>>
>> Best,
>> yasemin
>>
>>
>>
>> --
>> hiç ender hiç
>>
>
>


-- 
hiç ender hiç


Re: EC2 cluster doesn't work saveAsTextFile

2015-08-10 Thread Dean Wampler
Following Hadoop conventions, Spark won't overwrite an existing directory.
You need to provide a unique output path every time you run the program, or
delete or rename the target directory before you run the job.
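Dean's "delete or rename the target directory" advice can be sketched with plain JDK file APIs for a local file:// output path. The paths and class name below are illustrative; against HDFS you would instead call Hadoop's FileSystem.delete(path, true) before saving.

```java
import java.io.IOException;
import java.nio.file.*;
import java.util.Comparator;
import java.util.stream.Stream;

public class CleanOutputDir {
    // Recursively delete the output directory if it already exists,
    // so the next save does not fail with "Output directory ... already exists".
    static void deleteIfExists(Path outputDir) throws IOException {
        if (Files.exists(outputDir)) {
            try (Stream<Path> walk = Files.walk(outputDir)) {
                walk.sorted(Comparator.reverseOrder())   // children before parents
                    .forEach(p -> p.toFile().delete());
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path out = Files.createTempDirectory("weblogReadResult");
        Files.write(out.resolve("part-00000"), "stale output\n".getBytes());
        deleteIfExists(out);                       // clear the previous run
        System.out.println(Files.exists(out));     // prints false
    }
}
```

For a job writing to hdfs://, the same check-then-delete step goes through org.apache.hadoop.fs.FileSystem rather than java.nio.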

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
<http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
Typesafe <http://typesafe.com>
@deanwampler <http://twitter.com/deanwampler>
http://polyglotprogramming.com

On Mon, Aug 10, 2015 at 7:08 AM, Yasemin Kaya  wrote:

> Hi,
>
> I have an EC2 cluster and am using Spark 1.3, YARN, and HDFS. When I
> submit locally there is no problem, but when I run on the cluster,
> saveAsTextFile doesn't work: "*User class threw exception: Output directory
> hdfs://172.31.42.10:54310/./weblogReadResult
> <http://172.31.42.10:54310/./weblogReadResult> already exists*"
>
> Can anyone help me with this issue?
>
> Best,
> yasemin
>
>
>
> --
> hiç ender hiç
>


EC2 cluster doesn't work saveAsTextFile

2015-08-10 Thread Yasemin Kaya
Hi,

I have an EC2 cluster and am using Spark 1.3, YARN, and HDFS. When I
submit locally there is no problem, but when I run on the cluster,
saveAsTextFile doesn't work: "*User class threw exception: Output directory
hdfs://172.31.42.10:54310/./weblogReadResult
<http://172.31.42.10:54310/./weblogReadResult> already exists*"

Can anyone help me with this issue?

Best,
yasemin



-- 
hiç ender hiç


Re: Combining Spark Files with saveAsTextFile

2015-08-06 Thread MEETHU MATHEW
Hi,

Try using coalesce(1) before calling saveAsTextFile().

Thanks & Regards,
Meethu M


On Wednesday, 5 August 2015 7:53 AM, Brandon White wrote:

What is the best way to make saveAsTextFile save as only a single file?

Re: Combining Spark Files with saveAsTextFile

2015-08-05 Thread Igor Berman
It seems that coalesce does work; see the following thread:
https://www.mail-archive.com/user%40spark.apache.org/msg00928.html

On 5 August 2015 at 09:47, Igor Berman  wrote:

> Using coalesce might be dangerous, since one worker process will need to
> handle the whole file, and if the file is huge you'll get an OOM. However,
> it depends on the implementation; I'm not sure how it will be done.
> Nevertheless, it's worth trying the coalesce method (please post your
> results).
>
> Another option would be to use FileUtil.copyMerge, which copies each
> partition one after another into the destination stream (file); so as soon
> as you've written your hdfs file with spark with multiple partitions in
> parallel (as usual), you can then make another step to merge it into any
> destination you want.
>
> On 5 August 2015 at 07:43, Mohammed Guller  wrote:
>
>> Just to further clarify, you can first call coalesce with argument 1 and
>> then call saveAsTextFile. For example,
>>
>>
>>
>> rdd.coalesce(1).saveAsTextFile(...)
>>
>>
>>
>>
>>
>>
>>
>> Mohammed
>>
>>
>>
>> *From:* Mohammed Guller
>> *Sent:* Tuesday, August 4, 2015 9:39 PM
>> *To:* 'Brandon White'; user
>> *Subject:* RE: Combining Spark Files with saveAsTextFile
>>
>>
>>
>> One option is to use the coalesce method in the RDD class.
>>
>>
>>
>> Mohammed
>>
>>
>>
>> *From:* Brandon White [mailto:bwwintheho...@gmail.com
>> ]
>> *Sent:* Tuesday, August 4, 2015 7:23 PM
>> *To:* user
>> *Subject:* Combining Spark Files with saveAsTextFile
>>
>>
>>
>> What is the best way to make saveAsTextFile save as only a single file?
>>
>
>


Re: Combining Spark Files with saveAsTextFile

2015-08-04 Thread Igor Berman
Using coalesce might be dangerous, since one worker process will need to
handle the whole file, and if the file is huge you'll get an OOM. However,
it depends on the implementation; I'm not sure how it will be done.
Nevertheless, it's worth trying the coalesce method (please post your
results).

Another option would be to use FileUtil.copyMerge, which copies each
partition one after another into the destination stream (file); so as soon
as you've written your hdfs file with spark with multiple partitions in
parallel (as usual), you can then make another step to merge it into any
destination you want.
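Igor's copyMerge suggestion boils down to streaming each part file, in name order, into one destination. A JDK-only sketch of that mechanic (class name and paths are illustrative; for real HDFS paths you would use org.apache.hadoop.fs.FileUtil.copyMerge or FileSystem streams instead):

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.*;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PartFileMerger {
    // Concatenate part-* files, in name order, into one destination file.
    static void merge(Path partsDir, Path dest) throws IOException {
        try (OutputStream out = Files.newOutputStream(dest);
             Stream<Path> files = Files.list(partsDir)) {
            List<Path> parts = files
                    .filter(p -> p.getFileName().toString().startsWith("part-"))
                    .sorted()                 // part-00000, part-00001, ...
                    .collect(Collectors.toList());
            for (Path p : parts) {
                Files.copy(p, out);           // append this partition's output
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("rddOutput");
        Files.write(dir.resolve("part-00001"), "second\n".getBytes());
        Files.write(dir.resolve("part-00000"), "first\n".getBytes());
        merge(dir, dir.resolve("merged.txt"));
        System.out.print(new String(Files.readAllBytes(dir.resolve("merged.txt"))));
    }
}
```

Note Igor's caveat still applies in spirit: merging serializes through one stream, but unlike coalesce(1) it happens after the parallel write, so no single executor has to hold the whole dataset.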

On 5 August 2015 at 07:43, Mohammed Guller  wrote:

> Just to further clarify, you can first call coalesce with argument 1 and
> then call saveAsTextFile. For example,
>
>
>
> rdd.coalesce(1).saveAsTextFile(...)
>
>
>
>
>
>
>
> Mohammed
>
>
>
> *From:* Mohammed Guller
> *Sent:* Tuesday, August 4, 2015 9:39 PM
> *To:* 'Brandon White'; user
> *Subject:* RE: Combining Spark Files with saveAsTextFile
>
>
>
> One option is to use the coalesce method in the RDD class.
>
>
>
> Mohammed
>
>
>
> *From:* Brandon White [mailto:bwwintheho...@gmail.com
> ]
> *Sent:* Tuesday, August 4, 2015 7:23 PM
> *To:* user
> *Subject:* Combining Spark Files with saveAsTextFile
>
>
>
> What is the best way to make saveAsTextFile save as only a single file?
>


RE: Combining Spark Files with saveAsTextFile

2015-08-04 Thread Mohammed Guller
Just to further clarify, you can first call coalesce with argument 1 and then 
call saveAsTextFile. For example,

rdd.coalesce(1).saveAsTextFile(...)



Mohammed

From: Mohammed Guller
Sent: Tuesday, August 4, 2015 9:39 PM
To: 'Brandon White'; user
Subject: RE: Combining Spark Files with saveAsTextFile

One option is to use the coalesce method in the RDD class.

Mohammed

From: Brandon White [mailto:bwwintheho...@gmail.com]
Sent: Tuesday, August 4, 2015 7:23 PM
To: user
Subject: Combining Spark Files with saveAsTextFile


What is the best way to make saveAsTextFile save as only a single file?


RE: Combining Spark Files with saveAsTextFile

2015-08-04 Thread Mohammed Guller
One option is to use the coalesce method in the RDD class.

Mohammed

From: Brandon White [mailto:bwwintheho...@gmail.com]
Sent: Tuesday, August 4, 2015 7:23 PM
To: user
Subject: Combining Spark Files with saveAsTextFile


What is the best way to make saveAsTextFile save as only a single file?


Combining Spark Files with saveAsTextFile

2015-08-04 Thread Brandon White
What is the best way to make saveAsTextFile save as only a single file?


spark cache issue while doing saveAsTextFile and saveAsParquetFile

2015-07-14 Thread mathewvinoj
Hi There,

I am using mapPartitions to do some processing and cache the result as
below.

I am storing the file in both formats (Parquet and text file), and
recomputation is happening both times. Even though I put the cache, it's
not working as expected.

Below is the code snippet. Any help is really appreciated.

 val record = sql(sqlString)
 val outputRecords = record.repartition(1).mapPartitions { rows =>
   val finalList1 = ListBuffer[Row]()
   while (rows.hasNext) {
     .
     .
     finalList1 += xyz
   }
   finalList1.iterator
 }.cache()

 val l = applySchema(outputRecords, schemaName).cache()
 l.saveAsTextFile(filename + ".txt")
 l.saveAsParquetFile(filename + ".parquet")

Expected result: when we do saveAsTextFile, the computation should happen
and the result should be cached, and the second time, when we do
saveAsParquetFile, it should get the result from the cache.
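The expected behavior described above is compute-once caching. A minimal JDK sketch of that contract follows — purely illustrative, and not an explanation of why Spark's cache() appears to recompute in this particular job:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class CachedResult {
    // Wrap a computation so it runs at most once; later calls reuse the value.
    static <T> Supplier<T> cache(Supplier<T> compute) {
        return new Supplier<T>() {
            private T value;
            private boolean done;
            public synchronized T get() {
                if (!done) { value = compute.get(); done = true; }
                return value;
            }
        };
    }

    public static void main(String[] args) {
        AtomicInteger runs = new AtomicInteger();
        Supplier<String> cached = cache(() -> {
            runs.incrementAndGet();          // stands in for the mapPartitions work
            return "partition result";
        });
        cached.get();                        // first save: computes
        cached.get();                        // second save: reuses the cached value
        System.out.println(runs.get());      // prints 1
    }
}
```

In Spark the analogue of get() is an action; the first action materializes the cached RDD, and only then can a second action reuse it.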

thanks





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-cache-issue-while-doing-saveAsTextFile-and-saveAsParquetFile-tp23845.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: RDD saveAsTextFile() to local disk

2015-07-08 Thread Vijay Pawnarkar
Thanks for the help.

Following are the folders I was trying to write to

saveAsTextFile("file:///home/someuser/test2/testupload/20150708/0/")

saveAsTextFile("file:///home/someuser/test2/testupload/20150708/1/")

saveAsTextFile("file:///home/someuser/test2/testupload/20150708/2/")

saveAsTextFile("file:///home/someuser/test2/testupload/20150708/3/")


The folder name "test2" was causing the issue; for whatever reason the API
does not recognize file:///home/someuser/test2 as a directory.

Once the folder name was changed, as in
file:///home/someuser/batch/testupload/20150708/0/, it has been working
well. I am able to reproduce the issue consistently with the folder name
"test2".
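The thread doesn't establish why "test2" misbehaved, but one way a path can fail to be recognized as a directory is when a regular file of that name already exists; on Hadoop's RawLocalFileSystem that surfaces as the ParentNotDirectoryException seen in the stack traces in this thread. A JDK-only sketch of that failure mode (the cause is a guess, and the paths are illustrative):

```java
import java.io.IOException;
import java.nio.file.*;

public class ParentNotDirectoryDemo {
    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempDirectory("demo");
        // Suppose something earlier left a plain FILE named "test2" ...
        Path parent = tmp.resolve("test2");
        Files.write(parent, "not a directory".getBytes());
        try {
            // ... then nothing can create output directories beneath it.
            Files.createDirectories(parent.resolve("testupload").resolve("20150708"));
            System.out.println("created");
        } catch (IOException e) {
            // Always taken here: the parent path exists but is not a directory.
            System.out.println("mkdirs failed: parent path is not a directory");
        }
    }
}
```

Checking `ls -l /home/someuser` for a stray file named test2 would confirm or rule this out.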







On Jul 8, 2015 8:31 PM, "canan chen"  wrote:

> It works for me by using the following code. Could you share your code ?
>
>
> val data = sc.parallelize(List(1, 2, 3))
> data.saveAsTextFile("file:Users/chen/Temp/c")
>
> On Thu, Jul 9, 2015 at 4:05 AM, spok20nn  wrote:
>
>> Getting an exception when writing an RDD to local disk using the following function:
>>
>>  saveAsTextFile("file:home/someuser/dir2/testupload/20150708/")
>>
>> The dir (/home/someuser/dir2/testupload/) was created before running the
>> job. The error message is misleading.
>>
>>
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
>> in
>> stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0
>> (TID 6, xxx.yyy.com): org.apache.hadoop.fs.ParentNotDirectoryException:
>> Parent path is not a directory: file:/home/someuser/dir2
>> at
>>
>> org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:418)
>> at
>>
>> org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:426)
>> at
>>
>> org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:426)
>> at
>>
>> org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:426)
>> at
>>
>> org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:426)
>> at
>>
>> org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:426)
>> at
>>
>> org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:588)
>> at
>>
>> org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:439)
>> at
>>
>> org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:426)
>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906)
>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:799)
>> at
>>
>> org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123)
>> at
>> org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90)
>> at
>>
>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1060)
>> at
>>
>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1051)
>> at
>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>> at org.apache.spark.scheduler.Task.run(Task.scala:56)
>> at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
>> at
>>
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at
>>
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>>
>>
>>
>>
>>
>


Re: RDD saveAsTextFile() to local disk

2015-07-08 Thread canan chen
It works for me by using the following code. Could you share your code ?


val data = sc.parallelize(List(1, 2, 3))
data.saveAsTextFile("file:Users/chen/Temp/c")

On Thu, Jul 9, 2015 at 4:05 AM, spok20nn  wrote:

> Getting an exception when writing an RDD to local disk using the following function:
>
>  saveAsTextFile("file:home/someuser/dir2/testupload/20150708/")
>
> The dir (/home/someuser/dir2/testupload/) was created before running the
> job. The error message is misleading.
>
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
> in
> stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0
> (TID 6, xxx.yyy.com): org.apache.hadoop.fs.ParentNotDirectoryException:
> Parent path is not a directory: file:/home/someuser/dir2
> at
> org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:418)
> at
> org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:426)
> at
> org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:426)
> at
> org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:426)
> at
> org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:426)
> at
> org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:426)
> at
> org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:588)
> at
> org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:439)
> at
> org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:426)
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906)
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:799)
> at
>
> org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123)
> at
> org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90)
> at
>
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1060)
> at
>
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1051)
> at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> at org.apache.spark.scheduler.Task.run(Task.scala:56)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
>
>
>
>
>


RDD saveAsTextFile() to local disk

2015-07-08 Thread spok20nn
Getting an exception when writing an RDD to local disk using the following function:

 saveAsTextFile("file:home/someuser/dir2/testupload/20150708/") 

The dir (/home/someuser/dir2/testupload/) was created before running the
job. The error message is misleading. 


org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in
stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0
(TID 6, xxx.yyy.com): org.apache.hadoop.fs.ParentNotDirectoryException:
Parent path is not a directory: file:/home/someuser/dir2
at
org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:418)
at
org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:426)
at
org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:426)
at
org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:426)
at
org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:426)
at
org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:426)
at
org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:588)
at
org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:439)
at
org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:426)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:799)
at
org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123)
at
org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1060)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1051)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/RDD-saveAsTextFile-to-local-disk-tp23725.html



RDD saveAsTextFile() to local disk

2015-07-08 Thread Vijay Pawnarkar
Getting an exception when writing an RDD to local disk using the following function:

 saveAsTextFile("file:home/someuser/dir2/testupload/20150708/")

The dir (/home/someuser/dir2/testupload/) was created before running the
job. The error message is misleading.
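A side observation on the path in the call above: "file:home/..." has no "//" after the scheme. Hadoop's Path class does its own parsing, so this may or may not be related to the failure, but java.net.URI shows how differently the two spellings parse:

```java
import java.net.URI;
import java.net.URISyntaxException;

public class FileUriDemo {
    public static void main(String[] args) throws URISyntaxException {
        // No "//" after the scheme: java.net.URI treats this as an opaque URI.
        URI opaque = new URI("file:home/someuser/dir2/testupload/20150708/");
        // Fully qualified form: empty authority and an absolute path.
        URI hierarchical = new URI("file:///home/someuser/dir2/testupload/20150708/");

        System.out.println(opaque.isOpaque());       // true
        System.out.println(opaque.getPath());        // null - no parseable path
        System.out.println(hierarchical.getPath());  // /home/someuser/dir2/testupload/20150708/
    }
}
```

Using the file:///... spelling removes one source of ambiguity when chasing path errors like this one.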


org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage
0.0 (TID 6, xxx.yyy.com): org.apache.hadoop.fs.ParentNotDirectoryException:
Parent path is not a directory: file:/home/someuser/dir2
at
org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:418)
at
org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:426)
at
org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:426)
at
org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:426)
at
org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:426)
at
org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:426)
at
org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:588)
at
org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:439)
at
org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:426)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:799)
at
org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123)
at
org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1060)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1051)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

-- 
-Vijay


Re: Spark dramatically slow when I add "saveAsTextFile"

2015-05-24 Thread Joe Wass
This may sound like an obvious question, but are you sure that the program
is doing any work when you don't have a saveAsTextFile? If there are
transformations but no actions to actually collect the data, there's no
need for Spark to execute the transformations.
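Joe's first point — transformations without an action cost nothing — is the same lazy-pipeline behavior that java.util.stream has, which makes for a quick local illustration (JDK-only; Spark's laziness additionally defers distributed work):

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazyPipelineDemo {
    public static void main(String[] args) {
        AtomicInteger mapCalls = new AtomicInteger();

        // "Transformations" only: building the pipeline does no work yet.
        Stream<Integer> mapped = Arrays.asList(1, 2, 3).stream()
                .map(x -> { mapCalls.incrementAndGet(); return x * 2; });
        System.out.println(mapCalls.get());  // prints 0

        // A terminal operation (the analogue of saveAsTextFile) forces it.
        mapped.collect(Collectors.toList());
        System.out.println(mapCalls.get());  // prints 3
    }
}
```

So a Spark job that only builds maps and a reduceByKey but never runs an action finishes "in seconds" because it has done essentially nothing; adding saveAsTextFile is what triggers the real computation.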

As to the question of "is this taking too long", I can't answer that. But
your code was HTML-escaped and therefore difficult to read; perhaps you
should post a link to a Gist.

Joe

On 24 May 2015 at 10:36, allanjie  wrote:

> *Problem Description*:
>
> The program runs in a stand-alone Spark cluster (1 master, 6 workers with
> 8 GB RAM and 2 cores each).
> Input: a 468 MB file with 133433 records stored in HDFS.
> Output: just a 2 MB file stored in HDFS.
> The program has two map operations and one reduceByKey operation.
> Finally I save the result to HDFS using "*saveAsTextFile*".
> *Problem*: if I don't add "saveAsTextFile", the program runs very fast (a few
> seconds); otherwise it is extremely slow, taking about 30 mins.
>
> *My program (is very Simple)*
> [...]

Spark dramatically slow when I add "saveAsTextFile"

2015-05-24 Thread allanjie
*Problem Description*:

The program runs in a stand-alone Spark cluster (1 master, 6 workers with
8 GB RAM and 2 cores each).
Input: a 468 MB file with 133433 records stored in HDFS.
Output: just a 2 MB file stored in HDFS.
The program has two map operations and one reduceByKey operation.
Finally I save the result to HDFS using "*saveAsTextFile*".
*Problem*: if I don't add "saveAsTextFile", the program runs very fast (a few
seconds); otherwise it is extremely slow, taking about 30 mins.

*My program (is very Simple)*
public static void main(String[] args) throws IOException{
    /**Parameter Setting***/
    String localPointPath = "/home/hduser/skyrock/skyrockImageFeatures.csv";
    String remoteFilePath =
"hdfs://HadoopV26Master:9000/user/skyrock/skyrockImageIndexedFeatures.csv";
    String outputPath = "hdfs://HadoopV26Master:9000/user/sparkoutput/";
    final int row = 133433;
    final int col = 458;
    final double dc = Double.valueOf(args[0]);

    SparkConf conf = new SparkConf()
            .setAppName("distance")
            .set("spark.executor.memory", "4g")
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .set("spark.eventLog.enabled", "true");
    JavaSparkContext sc = new JavaSparkContext(conf);

    JavaRDD<String> textFile = sc.textFile(remoteFilePath);

    //Broadcast variable, the dimension of this double array: 133433*458
    final Broadcast<double[][]> broadcastPoints =
sc.broadcast(createBroadcastPoints(localPointPath, row, col));
    /**
     * Compute the distance in terms of each point on each instance.
     * distance list: index = n(i-1) - i*(i-1)/2 + j-i-1
     */
    JavaPairRDD<Integer, Double> distance = textFile.flatMapToPair(
            new PairFlatMapFunction<String, Integer, Double>(){
        public Iterable<Tuple2<Integer, Double>> call(String v1) throws Exception {
            List<String> al = Arrays.asList(v1.split(","));
            double[] featureVals = new double[al.size()];
            for (int j = 0; j < al.size() - 1; j++)
                featureVals[j] = Double.valueOf(al.get(j + 1));
            int jIndex = Integer.valueOf(al.get(0));
            double[][] allPoints = broadcastPoints.value();
            double sum = 0;
            List<Tuple2<Integer, Double>> list = new ArrayList<Tuple2<Integer, Double>>();
            for (int i = 0; i < row; i++) {
                sum = 0;
                for (int j = 0; j < al.size() - 1; j++) {
                    sum += (allPoints[i][j] - featureVals[j]) * (allPoints[i][j] - featureVals[j]);
                }
                list.add(new Tuple2<Integer, Double>(jIndex, Math.sqrt(sum)));
            }
            return list;
        }
    });

    //Create zeroOne density
    JavaPairRDD<Integer, Integer> densityZeroOne = distance.mapValues(
            new Function<Double, Integer>(){
        public Integer call(Double v1) throws Exception {
            if (v1 < dc) return 1;
            else return 0;
        }
    });
    //Combine the density
    JavaPairRDD<Integer, Integer> counts = densityZeroOne.reduceByKey(
            new Function2<Integer, Integer, Integer>() {
        public Integer call(Integer v1, Integer v2) throws Exception {
            return v1 + v2;
        }
    });
    counts.saveAsTextFile(outputPath + args[1]);
    sc.stop();
}

*If I comment "saveAsTextFile", log will be:*
Picked up _JAVA_OPTIONS: -Xmx4g
15/05/24 15:21:30 INFO spark.SparkContext: Running Spark version 1.3.1
15/05/24 15:21:30 WARN util.NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
15/05/24 15:21:30 INFO spark.SecurityManager: Changing view acls to: hduser
15/05/24 15:21:30 INFO spark.SecurityManager: Changing modify acls to:
hduser
15/05/24 15:21:30 INFO spark.SecurityManager: SecurityManager:
authentication disabled; ui acls disabled; users with view permissions:
Set(hduser); users with modify permissions: Set(hduser)
15/05/24 15:21:31 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/05/24 15:21:31 INFO Remoting: Starting remoting
15/05/24 15:21:31 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sparkDriver@HadoopV26Master:57429]
15/05/24 15:21:31 INFO util.Utils: Successfully started service
'sparkDriver' on port 57429.
15/05/24 15:21:31 INFO spark.SparkEnv: Registering MapOutputTracker
15/05/24 15:21:31 INFO spark.SparkEnv: Registering BlockManagerMaster
15/05/24 15:21:31 INFO storage.DiskBlockManager: Created local directory at
/tmp/spark-6342bde9-feca-4651-8cca-a67541150420/blockmgr-e92d0ae0-ec95-44cb-986a-266a1899202b
15/05/24 15:21:31 INFO storage.MemoryStore: MemoryStore started with
capacity 1966.1 MB
15/05/24 15:21:31 INFO spark.HttpFileServer: HTTP File server directory is
/tmp/spark-fea59c9e-1264-45e9-ad31-484d7de83d0a/httpd-c6421767-ffaf-4417-905e-34b3d13a7bf4
15/05/24 15:21:31 INFO spark.HttpServer: Starting HTTP Server
15/05/24 15:21:31 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/05/24 15:21:31 INFO server.AbstractConne

Re: saveAsTextFile() part- files are missing

2015-05-21 Thread Tomasz Fruboes

Hi,

 it looks like you are writing to a local filesystem. Could you try writing
to a location visible by all nodes (master and workers), e.g. an NFS share?


 HTH,
  Tomasz

W dniu 21.05.2015 o 17:16, rroxanaioana pisze:

Hello!
I just started with Spark. I have an application which counts words in a
file (1 MB file).
The file is stored locally. I loaded the file using native code and then
created the RDD from it.

 JavaRDD<String> rddFromFile = context.parallelize(myFile, 2);
 JavaRDD<String> words = rddFromFile.flatMap(...);
 JavaPairRDD<String, Integer> pairs = words.mapToPair(...);
 JavaPairRDD<String, Integer> counter = pairs.reduceByKey(..);

 counter.saveAsTextFile("file:///root/output");
 context.close();

I have one master and 2 slaves. I run the program from the master node.
The output directory is created on the master node and on the 2 worker
nodes. On the master node I have only one file, _SUCCESS (empty), and on
the worker nodes I have a _temporary file. I printed the counter at the
console; the result seems OK.
What am I doing wrong?
Thank you!












saveAsTextFile() part- files are missing

2015-05-21 Thread rroxanaioana
Hello!
I just started with Spark. I have an application which counts words in a
file (1 MB file). 
The file is stored locally. I loaded the file using native code and then
created the RDD from it.
   
JavaRDD<String> rddFromFile = context.parallelize(myFile, 2);
JavaRDD<String> words = rddFromFile.flatMap(...);
JavaPairRDD<String, Integer> pairs = words.mapToPair(...);
JavaPairRDD<String, Integer> counter = pairs.reduceByKey(..);

counter.saveAsTextFile("file:///root/output");
context.close();

I have one master and 2 slaves. I run the program from the master node.
The output directory is created on the master node and on the 2 worker
nodes. On the master node I have only one file, _SUCCESS (empty), and on
the worker nodes I have a _temporary file. I printed the counter at the
console; the result seems OK.
What am I doing wrong?
Thank you!





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/saveAsTextFile-part-files-are-missing-tp22974.html



Re: FP Growth saveAsTextFile

2015-05-20 Thread Xiangrui Meng
+user

If this was in cluster mode, you should provide a path on a shared file
system, e.g., HDFS, instead of a local path. If this is in local mode, I'm
not sure what went wrong.

On Wed, May 20, 2015 at 2:09 PM, Eric Tanner 
wrote:

> Here is the stack trace. Thanks for looking at this.
>
> scala>
> model.freqItemsets.saveAsTextFile("c:///repository/trunk/Scala_210_wspace/fpGrowth/modelText1")
> 15/05/20 14:07:47 INFO SparkContext: Starting job: saveAsTextFile at
> <console>:33
> 15/05/20 14:07:47 INFO DAGScheduler: Got job 15 (saveAsTextFile at
> <console>:33) with 2 output partitions (allowLocal=false)
> 15/05/20 14:07:47 INFO DAGScheduler: Final stage: Stage 30 (saveAsTextFile
> at <console>:33)
> 15/05/20 14:07:47 INFO DAGScheduler: Parents of final stage: List(Stage 29)
> 15/05/20 14:07:47 INFO DAGScheduler: Missing parents: List()
> 15/05/20 14:07:47 INFO DAGScheduler: Submitting Stage 30
> (MapPartitionsRDD[21] at saveAsTextFile at <console>:33), which has no
> missing parents
> 15/05/20 14:07:47 INFO MemoryStore: ensureFreeSpace(131288) called with
> curMem=724428, maxMem=278302556
> 15/05/20 14:07:47 INFO MemoryStore: Block broadcast_18 stored as values in
> memory (estimated size 128.2 KB, free 264.6 MB)
> 15/05/20 14:07:47 INFO MemoryStore: ensureFreeSpace(78995) called with
> curMem=855716, maxMem=278302556
> 15/05/20 14:07:47 INFO MemoryStore: Block broadcast_18_piece0 stored as
> bytes in memory (estimated size 77.1 KB, free 264.5 MB)
> 15/05/20 14:07:47 INFO BlockManagerInfo: Added broadcast_18_piece0 in
> memory on localhost:52396 (size: 77.1 KB, free: 265.1 MB)
> 15/05/20 14:07:47 INFO BlockManagerMaster: Updated info of block
> broadcast_18_piece0
> 15/05/20 14:07:47 INFO SparkContext: Created broadcast 18 from broadcast
> at DAGScheduler.scala:839
> 15/05/20 14:07:47 INFO DAGScheduler: Submitting 2 missing tasks from Stage
> 30 (MapPartitionsRDD[21] at saveAsTextFile at <console>:33)
> 15/05/20 14:07:47 INFO TaskSchedulerImpl: Adding task set 30.0 with 2 tasks
> 15/05/20 14:07:47 INFO BlockManager: Removing broadcast 17
> 15/05/20 14:07:47 INFO TaskSetManager: Starting task 0.0 in stage 30.0
> (TID 33, localhost, PROCESS_LOCAL, 1056 bytes)
> 15/05/20 14:07:47 INFO BlockManager: Removing block broadcast_17_piece0
> 15/05/20 14:07:47 INFO MemoryStore: Block broadcast_17_piece0 of size 4737
> dropped from memory (free 277372582)
> 15/05/20 14:07:47 INFO TaskSetManager: Starting task 1.0 in stage 30.0
> (TID 34, localhost, PROCESS_LOCAL, 1056 bytes)
> 15/05/20 14:07:47 INFO BlockManagerInfo: Removed broadcast_17_piece0 on
> localhost:52396 in memory (size: 4.6 KB, free: 265.1 MB)
> 15/05/20 14:07:47 INFO Executor: Running task 1.0 in stage 30.0 (TID 34)
> 15/05/20 14:07:47 INFO Executor: Running task 0.0 in stage 30.0 (TID 33)
> 15/05/20 14:07:47 INFO BlockManagerMaster: Updated info of block
> broadcast_17_piece0
> 15/05/20 14:07:47 INFO BlockManager: Removing block broadcast_17
> 15/05/20 14:07:47 INFO MemoryStore: Block broadcast_17 of size 6696
> dropped from memory (free 277379278)
> 15/05/20 14:07:47 INFO ContextCleaner: Cleaned broadcast 17
> 15/05/20 14:07:47 INFO BlockManager: Removing broadcast 16
> 15/05/20 14:07:47 INFO BlockManager: Removing block broadcast_16_piece0
> 15/05/20 14:07:47 INFO MemoryStore: Block broadcast_16_piece0 of size 4737
> dropped from memory (free 277384015)
> 15/05/20 14:07:47 INFO BlockManagerInfo: Removed broadcast_16_piece0 on
> localhost:52396 in memory (size: 4.6 KB, free: 265.1 MB)
> 15/05/20 14:07:47 INFO BlockManagerMaster: Updated info of block
> broadcast_16_piece0
> 15/05/20 14:07:47 INFO BlockManager: Removing block broadcast_16
> 15/05/20 14:07:47 INFO MemoryStore: Block broadcast_16 of size 6696
> dropped from memory (free 277390711)
> 15/05/20 14:07:47 INFO ContextCleaner: Cleaned broadcast 16
> 15/05/20 14:07:47 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty
> blocks out of 2 blocks
> 15/05/20 14:07:47 INFO ShuffleBlockFetcherIterator: Started 0 remote
> fetches in 1 ms
> 15/05/20 14:07:47 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty
> blocks out of 2 blocks
> 15/05/20 14:07:47 INFO ShuffleBlockFetcherIterator: Started 0 remote
> fetches in 0 ms
> 15/05/20 14:07:47 ERROR Executor: Exception in task 1.0 in stage 30.0 (TID
> 34)
> java.lang.NullPointerException
> at java.lang.ProcessBuilder.start(ProcessBuilder.java:1010)
> at org.apache.hadoop.util.Shell.runCommand(Shell.java:482)
> at org.apache.hadoop.util.Shell.run(Shell.java:455)
> at
> org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
> at org.apache.hadoop.util.Shell.execCommand(Shell.java:808)
> at org.apache.hadoop.util.Shell.execCommand(Shell.java:791)
> at
> or
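A hedged note, since the thread never resolves it: the NullPointerException at ProcessBuilder.start inside org.apache.hadoop.util.Shell, together with the c:// output path, is the classic signature of Hadoop on Windows failing to locate winutils.exe. A commonly used workaround is to point hadoop.home.dir at a directory whose bin folder contains winutils.exe before the first filesystem call (the C:\hadoop path below is a placeholder, not from the thread):

```java
// Hedged workaround sketch: on Windows, Hadoop's Shell utility shells out to
// %HADOOP_HOME%\bin\winutils.exe; when it cannot find it, Shell.runCommand
// can fail with an NPE from ProcessBuilder.start. The path is hypothetical.
public class WinutilsHint {
    public static void main(String[] args) {
        System.setProperty("hadoop.home.dir", "C:\\hadoop"); // placeholder path
        System.out.println(System.getProperty("hadoop.home.dir"));
    }
}
```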

Re: FP Growth saveAsTextFile

2015-05-20 Thread Xiangrui Meng
Could you post the stack trace? If you are using Spark 1.3 or 1.4, it
would be easier to save freq itemsets as a Parquet file. -Xiangrui

On Wed, May 20, 2015 at 12:16 PM, Eric Tanner
 wrote:
> I am having trouble with saving an FP-Growth model as a text file.  I can
> print out the results, but when I try to save the model I get a
> NullPointerException.
>
> model.freqItemsets.saveAsTextFile("c://fpGrowth/model")
>
> Thanks,
>
> Eric

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



FP Growth saveAsTextFile

2015-05-20 Thread Eric Tanner
I am having trouble with saving an FP-Growth model as a text file.  I can
print out the results, but when I try to save the model I get a
NullPointerException.

model.freqItemsets.saveAsTextFile("c://fpGrowth/model")

Thanks,

Eric


Re: SaveAsTextFile brings down data nodes with IO Exceptions

2015-05-16 Thread Ilya Ganelin
All - this issue showed up when I was tearing down a Spark context and
creating a new one. Often, I was then unable to write to HDFS due to this
error. I subsequently switched to a different implementation where, instead
of tearing down and re-initializing the Spark context, I submit a
separate request to YARN.
On Fri, May 15, 2015 at 2:35 PM Puneet Kapoor 
wrote:

> I am seeing this on hadoop 2.4.0 version.
>
> Thanks for your suggestions, i will try those and let you know if they
> help !
>
> On Sat, May 16, 2015 at 1:57 AM, Steve Loughran 
> wrote:
>
>>  What version of Hadoop are you seeing this on?
>>
>>
>>  On 15 May 2015, at 20:03, Puneet Kapoor 
>> wrote:
>>
>>  Hey,
>>
>>  Did you find any solution for this issue, we are seeing similar logs in
>> our Data node logs. Appreciate any help.
>>
>>
>>
>>
>>
>>  2015-05-15 10:51:43,615 ERROR
>> org.apache.hadoop.hdfs.server.datanode.DataNode:
>> NttUpgradeDN1:50010:DataXceiver error processing WRITE_BLOCK operation
>>  src: /192.168.112.190:46253 dst: /192.168.151.104:50010
>> java.net.SocketTimeoutException: 6 millis timeout while waiting for
>> channel to be ready for read. ch :
>> java.nio.channels.SocketChannel[connected local=/192.168.151.104:50010
>> remote=/192.168.112.190:46253]
>> at
>> org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
>> at
>> org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
>> at
>> org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
>> at java.io.BufferedInputStream.fill(Unknown Source)
>> at java.io.BufferedInputStream.read1(Unknown Source)
>> at java.io.BufferedInputStream.read(Unknown Source)
>> at java.io.DataInputStream.read(Unknown Source)
>> at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:192)
>> at
>> org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:213)
>> at
>> org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:134)
>> at
>> org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:109)
>> at
>> org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receivePacket(BlockReceiver.java:446)
>> at
>> org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receiveBlock(BlockReceiver.java:702)
>> at
>> org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:742)
>> at
>> org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:124)
>> at
>> org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:71)
>> at
>> org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:232)
>> at java.lang.Thread.run(Unknown Source)
>>
>>
>>  That's being logged @ error level in DN. It doesn't mean the DN has
>> crashed, only that it timed out waiting for data: something has gone wrong
>> elsewhere.
>>
>>  https://issues.apache.org/jira/browse/HDFS-693
>>
>>
>> there are a couple of properties you can set to extend timeouts
>>
>> <property>
>>   <name>dfs.socket.timeout</name>
>>   <value>2</value>
>> </property>
>>
>> <property>
>>   <name>dfs.datanode.socket.write.timeout</name>
>>   <value>2</value>
>> </property>
>>
>> You can also increase the number of datanode transceiver threads to
>> handle data IO across the network
>>
>> <property>
>>   <name>dfs.datanode.max.xcievers</name>
>>   <value>4096</value>
>> </property>
>>
>> Yes, that property has that explicit spelling, it's easy to get wrong
>>
>>
>


Re: SaveAsTextFile brings down data nodes with IO Exceptions

2015-05-15 Thread Puneet Kapoor
I am seeing this on hadoop 2.4.0 version.

Thanks for your suggestions, I will try those and let you know if they help!

On Sat, May 16, 2015 at 1:57 AM, Steve Loughran 
wrote:

>  What version of Hadoop are you seeing this on?
>
>
>  On 15 May 2015, at 20:03, Puneet Kapoor 
> wrote:
>
>  Hey,
>
>  Did you find any solution for this issue, we are seeing similar logs in
> our Data node logs. Appreciate any help.
>
>
>
>
>
>  2015-05-15 10:51:43,615 ERROR
> org.apache.hadoop.hdfs.server.datanode.DataNode:
> NttUpgradeDN1:50010:DataXceiver error processing WRITE_BLOCK operation
>  src: /192.168.112.190:46253 dst: /192.168.151.104:50010
> java.net.SocketTimeoutException: 6 millis timeout while waiting for
> channel to be ready for read. ch :
> java.nio.channels.SocketChannel[connected local=/192.168.151.104:50010
> remote=/192.168.112.190:46253]
> at
> org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
> at
> org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
> at
> org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
> at java.io.BufferedInputStream.fill(Unknown Source)
> at java.io.BufferedInputStream.read1(Unknown Source)
> at java.io.BufferedInputStream.read(Unknown Source)
> at java.io.DataInputStream.read(Unknown Source)
> at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:192)
> at
> org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:213)
> at
> org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:134)
> at
> org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:109)
> at
> org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receivePacket(BlockReceiver.java:446)
> at
> org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receiveBlock(BlockReceiver.java:702)
> at
> org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:742)
> at
> org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:124)
> at
> org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:71)
> at
> org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:232)
> at java.lang.Thread.run(Unknown Source)
>
>
>  That's being logged @ error level in DN. It doesn't mean the DN has
> crashed, only that it timed out waiting for data: something has gone wrong
> elsewhere.
>
>  https://issues.apache.org/jira/browse/HDFS-693
>
>
> there are a couple of properties you can set to extend timeouts
>
> <property>
>   <name>dfs.socket.timeout</name>
>   <value>2</value>
> </property>
>
> <property>
>   <name>dfs.datanode.socket.write.timeout</name>
>   <value>2</value>
> </property>
>
> You can also increase the number of datanode transceiver threads to handle
> data IO across the network
>
> <property>
>   <name>dfs.datanode.max.xcievers</name>
>   <value>4096</value>
> </property>
>
> Yes, that property has that explicit spelling, it's easy to get wrong
>
>


Re: SaveAsTextFile brings down data nodes with IO Exceptions

2015-05-15 Thread Steve Loughran
What version of Hadoop are you seeing this on?


On 15 May 2015, at 20:03, Puneet Kapoor <puneet.cse.i...@gmail.com> wrote:

Hey,

Did you find any solution for this issue, we are seeing similar logs in our 
Data node logs. Appreciate any help.





2015-05-15 10:51:43,615 ERROR org.apache.hadoop.hdfs.server.datanode.DataNode: 
NttUpgradeDN1:50010:DataXceiver error processing WRITE_BLOCK operation  src: 
/192.168.112.190:46253 dst: 
/192.168.151.104:50010
java.net.SocketTimeoutException: 6 millis timeout while waiting for channel 
to be ready for read. ch : java.nio.channels.SocketChannel[connected 
local=/192.168.151.104:50010 
remote=/192.168.112.190:46253]
at 
org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
at 
org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
at 
org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
at java.io.BufferedInputStream.fill(Unknown Source)
at java.io.BufferedInputStream.read1(Unknown Source)
at java.io.BufferedInputStream.read(Unknown Source)
at java.io.DataInputStream.read(Unknown Source)
at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:192)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:213)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:134)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:109)
at 
org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receivePacket(BlockReceiver.java:446)
at 
org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receiveBlock(BlockReceiver.java:702)
at 
org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:742)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:124)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:71)
at 
org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:232)
at java.lang.Thread.run(Unknown Source)


That's being logged @ error level in DN. It doesn't mean the DN has crashed, 
only that it timed out waiting for data: something has gone wrong elsewhere.
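The exception itself is ordinary socket-timeout behavior rather than anything HDFS-specific; a minimal stand-alone Java sketch of the same mechanism (class name invented, 100 ms chosen only for the demo; the DN-side knob is dfs.socket.timeout):

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

// Illustration (not HDFS code): a blocking read with SO_TIMEOUT set throws
// SocketTimeoutException when the peer sends nothing in time, which is what a
// DataXceiver thread hits when the upstream writer stalls.
public class SocketTimeoutDemo {
    public static void main(String[] args) throws IOException {
        try (ServerSocket server = new ServerSocket(0);
             Socket client = new Socket("localhost", server.getLocalPort());
             Socket peer = server.accept()) {
            peer.setSoTimeout(100); // demo value, analogous to dfs.socket.timeout
            try {
                peer.getInputStream().read(); // the client never writes anything
                System.out.println("read returned");
            } catch (SocketTimeoutException e) {
                System.out.println("timeout waiting for channel to be ready for read");
            }
        }
    }
}
```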


https://issues.apache.org/jira/browse/HDFS-693


there's a couple of properties you can do to extend timeouts

  

dfs.socket.timeout

2






dfs.datanode.socket.write.timeout

2




You can also increase the number of data node tranceiver threads to handle data 
IO across the network



dfs.datanode.max.xcievers
4096


Yes, that property has that explicit spellinng, it's easy to get wrong



Re: SaveAsTextFile brings down data nodes with IO Exceptions

2015-05-15 Thread Puneet Kapoor
Hey,

Did you find any solution for this issue, we are seeing similar logs in our
Data node logs. Appreciate any help.


2015-05-15 10:51:43,615 ERROR
org.apache.hadoop.hdfs.server.datanode.DataNode:
NttUpgradeDN1:50010:DataXceiver error processing WRITE_BLOCK operation
 src: /192.168.112.190:46253 dst: /192.168.151.104:50010
java.net.SocketTimeoutException: 6 millis timeout while waiting for
channel to be ready for read. ch :
java.nio.channels.SocketChannel[connected local=/192.168.151.104:50010
remote=/192.168.112.190:46253]
at
org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
at
org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
at
org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
at java.io.BufferedInputStream.fill(Unknown Source)
at java.io.BufferedInputStream.read1(Unknown Source)
at java.io.BufferedInputStream.read(Unknown Source)
at java.io.DataInputStream.read(Unknown Source)
at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:192)
at
org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:213)
at
org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:134)
at
org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:109)
at
org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receivePacket(BlockReceiver.java:446)
at
org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receiveBlock(BlockReceiver.java:702)
at
org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:742)
at
org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:124)
at
org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:71)
at
org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:232)
at java.lang.Thread.run(Unknown Source)

Thanks
Puneet

On Wed, Dec 3, 2014 at 2:50 AM, Ganelin, Ilya 
wrote:

> Hi all, as the last stage of execution, I am writing out a dataset to disk. 
> Before I do this, I force the DAG to resolve so this is the only job left in 
> the pipeline. The dataset in question is not especially large (a few 
> gigabytes). During this step, however, HDFS will inevitably crash. I will lose 
> connection to data-nodes and get stuck in the loop of death – failure causes 
> job restart, eventually causing the overall job to fail. On the data node 
> logs I see the errors below. Does anyone have any ideas as to what is going 
> on here? Thanks!
>
>
> java.io.IOException: Premature EOF from inputStream
>   at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:194)
>   at 
> org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:213)
>   at 
> org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:134)
>   at 
> org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:109)
>   at 
> org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receivePacket(BlockReceiver.java:455)
>   at 
> org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receiveBlock(BlockReceiver.java:741)
>   at 
> org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:718)
>   at 
> org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:126)
>   at 
> org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:72)
>   at 
> org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:225)
>   at java.lang.Thread.run(Thread.java:745)
>
>
>
>
> innovationdatanode03.cof.ds.capitalone.com:1004:DataXceiver error processing 
> WRITE_BLOCK operation  src: /10.37.248.60:44676 dst: /10.37.248.59:1004
> java.net.SocketTimeoutException: 65000 millis timeout while waiting for 
> channel to be ready for read. ch : java.nio.channels.SocketChannel[connected 
> local=/10.37.248.59:43692 remote=/10.37.248.63:1004]
>   at 
> org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
>   at 
> org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
>   at 
> org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
>   at 
> org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:118)
>   at java.io.FilterInputStream.read(FilterInputStream.java:83)
>   at java.io.FilterInputStream.read(FilterInputStream.java:83)
>   at 
> org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2101)
>   at 
> org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:660)
>   at 
> org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:126)
>   at 
> org.apache.hadoop.hdfs.protocol.datatransfer.Re

Re: saveAsTextFile() to save output of Spark program to HDFS

2015-05-05 Thread Sudarshan Murty
Thanks much for your help.

Here's what was happening:
The HDP VM was running in VirtualBox, and the host was connected to the guest VM
in NAT mode. When I connected it in "Bridged Adapter" mode, it worked!
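The NAT symptom is consistent with the datanode advertising an address the host cannot reach. Besides bridged networking, a client-side HDFS setting sometimes used for exactly this VM situation (an illustrative fragment, not something tried in this thread) is:

```xml
<!-- hdfs-site.xml on the client: reach datanodes by hostname rather than
     the NAT-internal IP the namenode hands back -->
<property>
  <name>dfs.client.use.datanode.hostname</name>
  <value>true</value>
</property>
```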

On Tue, May 5, 2015 at 8:54 PM, ayan guha  wrote:

> Try to add one more data node or make minreplication to 0. Hdfs is trying
> to replicate at least one more copy and not able to find another DN to do
> that
> On 6 May 2015 09:37, "Sudarshan Murty"  wrote:
>
>> Another thing - could it be a permission problem ?
>> It creates all the directory structure (in red)/tmp/wordcount/
>> _temporary/0/_temporary/attempt_201505051439_0001_m_01_3/part-1
>> so I am guessing not.
>>
>> On Tue, May 5, 2015 at 7:27 PM, Sudarshan Murty 
>> wrote:
>>
>>> You are most probably right. I assumed others may have run into this.
>>> When I try to put the files in there, it creates a directory structure
>>> with the part-0 and part1 files but these files are of size 0 - no
>>> content. The client error and the server logs have  the error message shown
>>> - which seem to indicate that the system is aware that a datanode exists
>>> but is excluded from the operation. So, it looks like it is not partitioned
>>> and Ambari indicates that HDFS is in good health with one NN, one SN, one
>>> DN.
>>> I am unable to figure out what the issue is.
>>> thanks for your help.
>>>
>>> On Tue, May 5, 2015 at 6:39 PM, ayan guha  wrote:
>>>
>>>> What happens when you try to put files to your hdfs from local
>>>> filesystem? Looks like its a hdfs issue rather than spark thing.
>>>> On 6 May 2015 05:04, "Sudarshan"  wrote:
>>>>
>>>>> I have searched all replies to this question & not found an answer.
>>>>>
>>>>> I am running standalone Spark 1.3.1 and Hortonwork's HDP 2.2 VM, side by 
>>>>> side, on the same machine and trying to write output of wordcount program 
>>>>> into HDFS (works fine writing to a local file, /tmp/wordcount).
>>>>>
>>>>> Only line I added to the wordcount program is: (where 'counts' is the 
>>>>> JavaPairRDD)
>>>>> *counts.saveAsTextFile("hdfs://sandbox.hortonworks.com:8020/tmp/wordcount 
>>>>> <http://sandbox.hortonworks.com:8020/tmp/wordcount>");*
>>>>>
>>>>> When I check in HDFS at that location (/tmp) here's what I find.
>>>>> /tmp/wordcount/_temporary/0/_temporary/attempt_201505051439_0001_m_00_2/part-0
>>>>> and
>>>>> /tmp/wordcount/_temporary/0/_temporary/attempt_201505051439_0001_m_01_3/part-1
>>>>>
>>>>> and *both part-000[01] are 0 size files*.
>>>>>
>>>>> The wordcount client output error is:
>>>>> [Stage 1:>  (0 + 
>>>>> 2) / 2]15/05/05 14:40:45 WARN DFSClient: DataStreamer Exception
>>>>> org.apache.hadoop.ipc.RemoteException(java.io.IOException): File 
>>>>> /tmp/wordcount/_temporary/0/_temporary/attempt_201505051439_0001_m_01_3/part-1
>>>>>  *could only be replicated to 0 nodes instead of minReplication (=1).  
>>>>> There are 1 datanode(s) running and 1 node(s) are excluded in this 
>>>>> operation.*
>>>>>   at 
>>>>> org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1550)
>>>>>   at 
>>>>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3447)
>>>>>   at 
>>>>> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:642)
>>>>>
>>>>>
>>>>> I tried this with Spark 1.2.1 same error.
>>>>> I have plenty of space on the DFS.
>>>>> The Name Node, Sec Name Node & the one Data Node are all healthy.
>>>>>
>>>>> Any hint as to what may be the problem ?
>>>>> thanks in advance.
>>>>> Sudarshan
>>>>>
>>>>>
>>>>> --
>>>>> View this message in context: saveAsTextFile() to save output of
>>>>> Spark program to HDFS
>>>>> <http://apache-spark-user-list.1001560.n3.nabble.com/saveAsTextFile-to-save-output-of-Spark-program-to-HDFS-tp22774.html>
>>>>> Sent from the Apache Spark User List mailing list archive
>>>>> <http://apache-spark-user-list.1001560.n3.nabble.com/> at Nabble.com.
>>>>>
>>>>
>>>
>>


Re: saveAsTextFile() to save output of Spark program to HDFS

2015-05-05 Thread ayan guha
Try to add one more data node or set min replication to 0. HDFS is trying
to replicate at least one more copy and is not able to find another DN to do
that.
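For a single-datanode sandbox such as the HDP VM, the usual baseline is a replication factor of 1 (an illustrative hdfs-site.xml fragment, not quoted from the thread):

```xml
<property>
  <name>dfs.replication</name>
  <value>1</value>
</property>
```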
On 6 May 2015 09:37, "Sudarshan Murty"  wrote:

> Another thing - could it be a permission problem ?
> It creates all the directory structure (in red)/tmp/wordcount/
> _temporary/0/_temporary/attempt_201505051439_0001_m_01_3/part-1
> so I am guessing not.
>
> On Tue, May 5, 2015 at 7:27 PM, Sudarshan Murty  wrote:
>
>> You are most probably right. I assumed others may have run into this.
>> When I try to put the files in there, it creates a directory structure
>> with the part-0 and part1 files but these files are of size 0 - no
>> content. The client error and the server logs have  the error message shown
>> - which seem to indicate that the system is aware that a datanode exists
>> but is excluded from the operation. So, it looks like it is not partitioned
>> and Ambari indicates that HDFS is in good health with one NN, one SN, one
>> DN.
>> I am unable to figure out what the issue is.
>> thanks for your help.
>>
>> On Tue, May 5, 2015 at 6:39 PM, ayan guha  wrote:
>>
>>> What happens when you try to put files to your hdfs from local
>>> filesystem? Looks like its a hdfs issue rather than spark thing.
>>> On 6 May 2015 05:04, "Sudarshan"  wrote:
>>>
>>>> I have searched all replies to this question & not found an answer.
>>>>
>>>> I am running standalone Spark 1.3.1 and Hortonwork's HDP 2.2 VM, side by 
>>>> side, on the same machine and trying to write output of wordcount program 
>>>> into HDFS (works fine writing to a local file, /tmp/wordcount).
>>>>
>>>> Only line I added to the wordcount program is: (where 'counts' is the 
>>>> JavaPairRDD)
>>>> *counts.saveAsTextFile("hdfs://sandbox.hortonworks.com:8020/tmp/wordcount 
>>>> <http://sandbox.hortonworks.com:8020/tmp/wordcount>");*
>>>>
>>>> When I check in HDFS at that location (/tmp) here's what I find.
>>>> /tmp/wordcount/_temporary/0/_temporary/attempt_201505051439_0001_m_00_2/part-0
>>>> and
>>>> /tmp/wordcount/_temporary/0/_temporary/attempt_201505051439_0001_m_01_3/part-1
>>>>
>>>> and *both part-000[01] are 0 size files*.
>>>>
>>>> The wordcount client output error is:
>>>> [Stage 1:>  (0 + 
>>>> 2) / 2]15/05/05 14:40:45 WARN DFSClient: DataStreamer Exception
>>>> org.apache.hadoop.ipc.RemoteException(java.io.IOException): File 
>>>> /tmp/wordcount/_temporary/0/_temporary/attempt_201505051439_0001_m_01_3/part-1
>>>>  *could only be replicated to 0 nodes instead of minReplication (=1).  
>>>> There are 1 datanode(s) running and 1 node(s) are excluded in this 
>>>> operation.*
>>>>at 
>>>> org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1550)
>>>>at 
>>>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3447)
>>>>at 
>>>> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:642)
>>>>
>>>>
>>>> I tried this with Spark 1.2.1 same error.
>>>> I have plenty of space on the DFS.
>>>> The Name Node, Sec Name Node & the one Data Node are all healthy.
>>>>
>>>> Any hint as to what may be the problem ?
>>>> thanks in advance.
>>>> Sudarshan
>>>>
>>>>
>>>> --
>>>> View this message in context: saveAsTextFile() to save output of Spark
>>>> program to HDFS
>>>> <http://apache-spark-user-list.1001560.n3.nabble.com/saveAsTextFile-to-save-output-of-Spark-program-to-HDFS-tp22774.html>
>>>> Sent from the Apache Spark User List mailing list archive
>>>> <http://apache-spark-user-list.1001560.n3.nabble.com/> at Nabble.com.
>>>>
>>>
>>
>


Re: saveAsTextFile() to save output of Spark program to HDFS

2015-05-05 Thread Sudarshan Murty
Another thing - could it be a permission problem?
It creates the full directory structure /tmp/wordcount/
_temporary/0/_temporary/attempt_201505051439_0001_m_01_3/part-1
so I am guessing not.

On Tue, May 5, 2015 at 7:27 PM, Sudarshan Murty  wrote:

> You are most probably right. I assumed others may have run into this.
> When I try to put the files in there, it creates a directory structure
> with the part-0 and part1 files but these files are of size 0 - no
> content. The client error and the server logs have  the error message shown
> - which seem to indicate that the system is aware that a datanode exists
> but is excluded from the operation. So, it looks like it is not partitioned
> and Ambari indicates that HDFS is in good health with one NN, one SN, one
> DN.
> I am unable to figure out what the issue is.
> thanks for your help.
>
> On Tue, May 5, 2015 at 6:39 PM, ayan guha  wrote:
>
>> What happens when you try to put files to your hdfs from local
>> filesystem? Looks like its a hdfs issue rather than spark thing.
>> On 6 May 2015 05:04, "Sudarshan"  wrote:
>>
>>> I have searched all replies to this question & not found an answer.
>>>
>>> I am running standalone Spark 1.3.1 and Hortonwork's HDP 2.2 VM, side by 
>>> side, on the same machine and trying to write output of wordcount program 
>>> into HDFS (works fine writing to a local file, /tmp/wordcount).
>>>
>>> Only line I added to the wordcount program is: (where 'counts' is the 
>>> JavaPairRDD)
>>> *counts.saveAsTextFile("hdfs://sandbox.hortonworks.com:8020/tmp/wordcount 
>>> <http://sandbox.hortonworks.com:8020/tmp/wordcount>");*
>>>
>>> When I check in HDFS at that location (/tmp) here's what I find.
>>> /tmp/wordcount/_temporary/0/_temporary/attempt_201505051439_0001_m_00_2/part-0
>>> and
>>> /tmp/wordcount/_temporary/0/_temporary/attempt_201505051439_0001_m_01_3/part-1
>>>
>>> and *both part-000[01] are 0 size files*.
>>>
>>> The wordcount client output error is:
>>> [Stage 1:>  (0 + 2) 
>>> / 2]15/05/05 14:40:45 WARN DFSClient: DataStreamer Exception
>>> org.apache.hadoop.ipc.RemoteException(java.io.IOException): File 
>>> /tmp/wordcount/_temporary/0/_temporary/attempt_201505051439_0001_m_01_3/part-1
>>>  *could only be replicated to 0 nodes instead of minReplication (=1).  
>>> There are 1 datanode(s) running and 1 node(s) are excluded in this 
>>> operation.*
>>> at 
>>> org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1550)
>>> at 
>>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3447)
>>> at 
>>> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:642)
>>>
>>>
>>> I tried this with Spark 1.2.1 same error.
>>> I have plenty of space on the DFS.
>>> The Name Node, Sec Name Node & the one Data Node are all healthy.
>>>
>>> Any hint as to what may be the problem ?
>>> thanks in advance.
>>> Sudarshan
>>>
>>>
>>> --
>>> View this message in context: saveAsTextFile() to save output of Spark
>>> program to HDFS
>>> <http://apache-spark-user-list.1001560.n3.nabble.com/saveAsTextFile-to-save-output-of-Spark-program-to-HDFS-tp22774.html>
>>> Sent from the Apache Spark User List mailing list archive
>>> <http://apache-spark-user-list.1001560.n3.nabble.com/> at Nabble.com.
>>>
>>
>


Re: saveAsTextFile() to save output of Spark program to HDFS

2015-05-05 Thread Sudarshan Murty
You are most probably right. I assumed others may have run into this.
When I try to put the files in there, it creates a directory structure with
the part-0 and part1 files but these files are of size 0 - no
content. The client error and the server logs have  the error message shown
- which seem to indicate that the system is aware that a datanode exists
but is excluded from the operation. So, it looks like it is not partitioned
and Ambari indicates that HDFS is in good health with one NN, one SN, one
DN.
I am unable to figure out what the issue is.
thanks for your help.

On Tue, May 5, 2015 at 6:39 PM, ayan guha  wrote:

> What happens when you try to put files to your hdfs from local filesystem?
> Looks like its a hdfs issue rather than spark thing.
> On 6 May 2015 05:04, "Sudarshan"  wrote:
>
>> I have searched all replies to this question & not found an answer.
>>
>> I am running standalone Spark 1.3.1 and Hortonwork's HDP 2.2 VM, side by 
>> side, on the same machine and trying to write output of wordcount program 
>> into HDFS (works fine writing to a local file, /tmp/wordcount).
>>
>> Only line I added to the wordcount program is: (where 'counts' is the 
>> JavaPairRDD)
>> *counts.saveAsTextFile("hdfs://sandbox.hortonworks.com:8020/tmp/wordcount 
>> <http://sandbox.hortonworks.com:8020/tmp/wordcount>");*
>>
>> When I check in HDFS at that location (/tmp) here's what I find.
>> /tmp/wordcount/_temporary/0/_temporary/attempt_201505051439_0001_m_00_2/part-0
>> and
>> /tmp/wordcount/_temporary/0/_temporary/attempt_201505051439_0001_m_01_3/part-1
>>
>> and *both part-000[01] are 0 size files*.
>>
>> The wordcount client output error is:
>> [Stage 1:>  (0 + 2) 
>> / 2]15/05/05 14:40:45 WARN DFSClient: DataStreamer Exception
>> org.apache.hadoop.ipc.RemoteException(java.io.IOException): File 
>> /tmp/wordcount/_temporary/0/_temporary/attempt_201505051439_0001_m_01_3/part-1
>>  *could only be replicated to 0 nodes instead of minReplication (=1).  There 
>> are 1 datanode(s) running and 1 node(s) are excluded in this operation.*
>>  at 
>> org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1550)
>>  at 
>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3447)
>>  at 
>> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:642)
>>
>>
>> I tried this with Spark 1.2.1 same error.
>> I have plenty of space on the DFS.
>> The Name Node, Sec Name Node & the one Data Node are all healthy.
>>
>> Any hint as to what may be the problem ?
>> thanks in advance.
>> Sudarshan
>>
>>
>> --
>> View this message in context: saveAsTextFile() to save output of Spark
>> program to HDFS
>> <http://apache-spark-user-list.1001560.n3.nabble.com/saveAsTextFile-to-save-output-of-Spark-program-to-HDFS-tp22774.html>
>> Sent from the Apache Spark User List mailing list archive
>> <http://apache-spark-user-list.1001560.n3.nabble.com/> at Nabble.com.
>>
>


Re: saveAsTextFile() to save output of Spark program to HDFS

2015-05-05 Thread ayan guha
What happens when you try to put files into your HDFS from the local filesystem?
It looks like an HDFS issue rather than a Spark thing.
On 6 May 2015 05:04, "Sudarshan"  wrote:

>
> I have searched all replies to this question & not found an answer.
>
> I am running standalone Spark 1.3.1 and Hortonwork's HDP 2.2 VM, side by 
> side, on the same machine and trying to write output of wordcount program 
> into HDFS (works fine writing to a local file, /tmp/wordcount).
>
> Only line I added to the wordcount program is: (where 'counts' is the 
> JavaPairRDD)
> *counts.saveAsTextFile("hdfs://sandbox.hortonworks.com:8020/tmp/wordcount 
> <http://sandbox.hortonworks.com:8020/tmp/wordcount>");*
>
> When I check in HDFS at that location (/tmp) here's what I find.
> /tmp/wordcount/_temporary/0/_temporary/attempt_201505051439_0001_m_00_2/part-0
> and
> /tmp/wordcount/_temporary/0/_temporary/attempt_201505051439_0001_m_01_3/part-1
>
> and *both part-000[01] are 0 size files*.
>
> The wordcount client output error is:
> [Stage 1:>  (0 + 2) / 
> 2]15/05/05 14:40:45 WARN DFSClient: DataStreamer Exception
> org.apache.hadoop.ipc.RemoteException(java.io.IOException): File 
> /tmp/wordcount/_temporary/0/_temporary/attempt_201505051439_0001_m_01_3/part-1
>  *could only be replicated to 0 nodes instead of minReplication (=1).  There 
> are 1 datanode(s) running and 1 node(s) are excluded in this operation.*
>   at 
> org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1550)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3447)
>   at 
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:642)
>
>
> I tried this with Spark 1.2.1 same error.
> I have plenty of space on the DFS.
> The Name Node, Sec Name Node & the one Data Node are all healthy.
>
> Any hint as to what may be the problem ?
> thanks in advance.
> Sudarshan
>
>
> --
> View this message in context: saveAsTextFile() to save output of Spark
> program to HDFS
> <http://apache-spark-user-list.1001560.n3.nabble.com/saveAsTextFile-to-save-output-of-Spark-program-to-HDFS-tp22774.html>
> Sent from the Apache Spark User List mailing list archive
> <http://apache-spark-user-list.1001560.n3.nabble.com/> at Nabble.com.
>


saveAsTextFile() to save output of Spark program to HDFS

2015-05-05 Thread Sudarshan
I have searched all replies to this question & not found an answer.

I am running standalone Spark 1.3.1 and Hortonworks' HDP 2.2 VM, side by side,
on the same machine, and trying to write the output of the wordcount program
into HDFS (it works fine writing to a local file, /tmp/wordcount).

The only line I added to the wordcount program is (where 'counts' is the
JavaPairRDD):
*counts.saveAsTextFile("hdfs://sandbox.hortonworks.com:8020/tmp/wordcount");*

When I check in HDFS at that location (/tmp), here's what I find:
/tmp/wordcount/_temporary/0/_temporary/attempt_201505051439_0001_m_00_2/part-0
and
/tmp/wordcount/_temporary/0/_temporary/attempt_201505051439_0001_m_01_3/part-1
and *both part-000[01] are 0 size files*.

The wordcount client output error is:
[Stage 1:>  (0 + 2) / 2]15/05/05 14:40:45 WARN DFSClient: DataStreamer Exception
org.apache.hadoop.ipc.RemoteException(java.io.IOException): File
/tmp/wordcount/_temporary/0/_temporary/attempt_201505051439_0001_m_01_3/part-1
*could only be replicated to 0 nodes instead of minReplication (=1).  There
are 1 datanode(s) running and 1 node(s) are excluded in this operation.*
at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1550)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3447)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:642)

I tried this with Spark 1.2.1; same error.
I have plenty of space on the DFS.
The Name Node, Sec Name Node & the one Data Node are all healthy.

Any hint as to what may be the problem?
Thanks in advance.
Sudarshan




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/saveAsTextFile-to-save-output-of-Spark-program-to-HDFS-tp22774.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Efficient saveAsTextFile by key, directory for each key?

2015-04-22 Thread Arun Luthra
I ended up post-processing the result in hive with a dynamic partition
insert query to get a table partitioned by the key.

Looking further, it seems that 'dynamic partition' insert is in Spark SQL
and working well in Spark SQL versions > 1.2.0:
https://issues.apache.org/jira/browse/SPARK-3007

On Tue, Apr 21, 2015 at 5:45 PM, Arun Luthra  wrote:

> Is there an efficient way to save an RDD with saveAsTextFile in such a way
> that the data gets shuffled into separated directories according to a key?
> (My end goal is to wrap the result in a multi-partitioned Hive table)
>
> Suppose you have:
>
> case class MyData(val0: Int, val1: String, directory_name: String)
>
> and an RDD called myrdd with type RDD[MyData]. Suppose that you already
> have an array of the distinct directory_name's, called distinct_directories.
>
> A very inefficient way to do this is:
>
> distinct_directories.foreach(
>   dir_name => myrdd.filter( mydata => mydata.directory_name == dir_name )
> .map( mydata => Array(mydata.val0.toString, mydata.val1).mkString(","))
> .coalesce(5)
> .saveAsTextFile("base_dir_name/" + f"$dir_name")
> )
>
> I tried this solution, and it does not do the multiple myrdd.filter's in
> parallel.
>
> I'm guessing partitionBy might be in the efficient solution if an easy
> efficient solution exists...
>
> Thanks,
> Arun
>


Efficient saveAsTextFile by key, directory for each key?

2015-04-21 Thread Arun Luthra
Is there an efficient way to save an RDD with saveAsTextFile in such a way
that the data gets shuffled into separated directories according to a key?
(My end goal is to wrap the result in a multi-partitioned Hive table)

Suppose you have:

case class MyData(val0: Int, val1: String, directory_name: String)

and an RDD called myrdd with type RDD[MyData]. Suppose that you already
have an array of the distinct directory_name's, called distinct_directories.

A very inefficient way to do this is:

distinct_directories.foreach(
  dir_name => myrdd.filter( mydata => mydata.directory_name == dir_name )
.map( mydata => Array(mydata.val0.toString, mydata.val1).mkString(","))
.coalesce(5)
.saveAsTextFile("base_dir_name/" + f"$dir_name")
)

I tried this solution, and it does not do the multiple myrdd.filter's in
parallel.

I'm guessing partitionBy might be part of the efficient solution, if an easy
efficient solution exists...

Thanks,
Arun
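
For what it's worth, the single-pass behavior being asked for can be sketched in plain Python (no Spark here; `save_by_key` and the `part-00000` filename are illustrative, not any Spark API): bucket the rows by `directory_name` once, then write each bucket under its own directory, instead of re-filtering the whole dataset once per key. In Spark the analogous routes are a pair RDD keyed by `directory_name` with a custom output format, or the Spark SQL dynamic-partition insert mentioned in the reply above.

```python
import os
import tempfile
from collections import defaultdict

def save_by_key(records, base_dir):
    # One pass over the data: group rows by their directory_name field,
    # then write one file per key directory (names are illustrative).
    buckets = defaultdict(list)
    for val0, val1, directory_name in records:
        buckets[directory_name].append(f"{val0},{val1}")
    for dir_name, lines in buckets.items():
        out_dir = os.path.join(base_dir, dir_name)
        os.makedirs(out_dir, exist_ok=True)
        with open(os.path.join(out_dir, "part-00000"), "w") as f:
            f.write("\n".join(lines) + "\n")

base = tempfile.mkdtemp()
save_by_key([(1, "a", "dirA"), (2, "b", "dirB"), (3, "c", "dirA")], base)
```

Each distinct key ends up with its own directory after a single scan of the input, which is the shape a multi-partitioned Hive table expects.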


Re: Spark 1.3 saveAsTextFile with codec gives error - works with Spark 1.2

2015-04-17 Thread Akhil Das
Not sure if this will help, but try clearing your jar cache (for sbt
~/.ivy2 and for maven ~/.m2) directories.

Thanks
Best Regards

On Wed, Apr 15, 2015 at 9:33 PM, Manoj Samel 
wrote:

> Env - Spark 1.3, Hadoop 2.3, Kerberos
>
>  xx.saveAsTextFile(path, codec) gives following trace. Same works with
> Spark 1.2 in same environment
>
> val codec = classOf[]
>
> val a = sc.textFile("/some_hdfs_file")
>
> a.saveAsTextFile("/some_other_hdfs_file", codec) fails with following
> trace in Spark 1.3, works in Spark 1.2 in same env
>
> 15/04/14 18:06:15 INFO scheduler.TaskSetManager: Lost task 1.3 in stage
> 2.0 (TID 17) on executor XYZ: java.lang.SecurityException (JCE cannot
> authenticate the provider BC) [duplicate 7]
> 15/04/14 18:06:15 INFO cluster.YarnScheduler: Removed TaskSet 2.0, whose
> tasks have all completed, from pool
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
> in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage
> 2.0 (TID 16, nodeXYZ): java.lang.SecurityException: JCE cannot authenticate
> the provider BC
> at javax.crypto.Cipher.getInstance(Cipher.java:642)
> at javax.crypto.Cipher.getInstance(Cipher.java:580)
>  some codec calls 
> at
> org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:136)
> at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:91)
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1068)
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> at org.apache.spark.scheduler.Task.run(Task.scala:64)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:744)
> Caused by: java.util.jar.JarException:
> file:/abc/filecache/11/spark-assembly-1.3.0-hadoop2.3.0.jar has unsigned
> entries - org/apache/spark/SparkHadoopWriter$.class
> at javax.crypto.JarVerifier.verifySingleJar(JarVerifier.java:462)
> at javax.crypto.JarVerifier.verifyJars(JarVerifier.java:322)
> at javax.crypto.JarVerifier.verify(JarVerifier.java:250)
> at javax.crypto.JceSecurity.verifyProviderJar(JceSecurity.java:161)
> at javax.crypto.JceSecurity.getVerificationResult(JceSecurity.java:187)
> at javax.crypto.Cipher.getInstance(Cipher.java:638)
> ... 16 more
>
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org
> 
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
> at scala.Option.foreach(Option.scala:236)
> at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>


Re: saveAsTextFile

2015-04-16 Thread Vadim Bichutskiy
Copy should be doable, but I'm not sure how to specify a prefix for the
directory while keeping the filename (i.e. part-0) fixed in the copy command.
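
A plain-Python sketch of the "copy them later" route (`collect_batches` and the naming scheme are made up for illustration - this is filesystem logic, not a Spark or S3 API): copy each batch's part files into one common directory, using the batch directory's name as a filename prefix so the fixed part names from successive batches don't collide.

```python
import os
import shutil
import tempfile

def collect_batches(batch_dirs, common_dir):
    # Copy every part-* file from each per-batch output directory into a
    # single common directory, prefixing each filename with its batch
    # directory name so repeated "part-..." names stay unique.
    os.makedirs(common_dir, exist_ok=True)
    for batch_dir in batch_dirs:
        batch_name = os.path.basename(os.path.normpath(batch_dir))
        for fname in sorted(os.listdir(batch_dir)):
            if fname.startswith("part-"):
                shutil.copy(os.path.join(batch_dir, fname),
                            os.path.join(common_dir, f"{batch_name}-{fname}"))

# Simulate two micro-batch output directories, each with a part file
# and a _SUCCESS marker (the marker is deliberately not copied).
root = tempfile.mkdtemp()
for batch in ("batch-001", "batch-002"):
    os.makedirs(os.path.join(root, batch))
    open(os.path.join(root, batch, "part-00000"), "w").write(batch)
    open(os.path.join(root, batch, "_SUCCESS"), "w").close()
common = os.path.join(root, "common")
collect_batches([os.path.join(root, b) for b in ("batch-001", "batch-002")], common)
```

For S3 the same idea would apply with the object store's copy operation instead of `shutil.copy`.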



> On Apr 16, 2015, at 1:51 PM, Sean Owen  wrote:
> 
> Just copy the files? it shouldn't matter that much where they are as
> you can find them easily. Or consider somehow sending the batches of
> data straight into Redshift? no idea how that is done but I imagine
> it's doable.
> 
> On Thu, Apr 16, 2015 at 6:38 PM, Vadim Bichutskiy
>  wrote:
>> Thanks Sean. I want to load each batch into Redshift. What's the best/most 
>> efficient way to do that?
>> 
>> Vadim
>> 
>> 
>>> On Apr 16, 2015, at 1:35 PM, Sean Owen  wrote:
>>> 
>>> You can't, since that's how it's designed to work. Batches are saved
>>> in different "files", which are really directories containing
>>> partitions, as is common in Hadoop. You can move them later, or just
>>> read them where they are.
>>> 
>>> On Thu, Apr 16, 2015 at 6:32 PM, Vadim Bichutskiy
>>>  wrote:
>>>> I am using Spark Streaming where during each micro-batch I output data to 
>>>> S3
>>>> using
>>>> saveAsTextFile. Right now each batch of data is put into its own directory
>>>> containing
>>>> 2 objects, "_SUCCESS" and "part-0."
>>>> 
>>>> How do I output each batch into a common directory?
>>>> 
>>>> Thanks,
>>>> Vadim
>>>> ᐧ

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: saveAsTextFile

2015-04-16 Thread Evo Eftimov
Also, to juggle even further the multithreading model of both Spark and HDFS, you 
can publish the data from Spark first to a message broker, e.g. Kafka, from 
where a predetermined number (from 1 to infinity) of parallel consumers will 
retrieve it and store it in HDFS in one or more finely controlled files and 
directories.

 

From: Vadim Bichutskiy [mailto:vadim.bichuts...@gmail.com] 
Sent: Thursday, April 16, 2015 6:45 PM
To: Evo Eftimov
Cc: 
Subject: Re: saveAsTextFile

 

Thanks Evo for your detailed explanation.


On Apr 16, 2015, at 1:38 PM, Evo Eftimov  wrote:

The reason for this is as follows:

 

1.  You are saving data on HDFS

2.  HDFS as a cluster/server side Service has a Single Writer / Multiple 
Reader multithreading model 

3.  Hence each thread of execution in Spark has to write to a separate file 
in HDFS

4.  Moreover the RDDs are partitioned across cluster nodes and operated 
upon by multiple threads there and on top of that in Spark Streaming you have 
many micro-batch RDDs streaming in all the time as part of a DStream  

 

If you want fine / detailed management of the writing to HDFS you can implement 
your own HDFS adapter and invoke it in forEachRDD and foreach 

 

Regards

Evo Eftimov  

 

From: Vadim Bichutskiy [mailto:vadim.bichuts...@gmail.com] 
Sent: Thursday, April 16, 2015 6:33 PM
To: user@spark.apache.org
Subject: saveAsTextFile

 

I am using Spark Streaming where during each micro-batch I output data to S3 
using

saveAsTextFile. Right now each batch of data is put into its own directory 
containing

2 objects, "_SUCCESS" and "part-0."

 

How do I output each batch into a common directory?

 

Thanks,

Vadim

  



Re: saveAsTextFile

2015-04-16 Thread Sean Owen
Just copy the files? it shouldn't matter that much where they are as
you can find them easily. Or consider somehow sending the batches of
data straight into Redshift? no idea how that is done but I imagine
it's doable.

On Thu, Apr 16, 2015 at 6:38 PM, Vadim Bichutskiy
 wrote:
> Thanks Sean. I want to load each batch into Redshift. What's the best/most 
> efficient way to do that?
>
> Vadim
>
>
>> On Apr 16, 2015, at 1:35 PM, Sean Owen  wrote:
>>
>> You can't, since that's how it's designed to work. Batches are saved
>> in different "files", which are really directories containing
>> partitions, as is common in Hadoop. You can move them later, or just
>> read them where they are.
>>
>> On Thu, Apr 16, 2015 at 6:32 PM, Vadim Bichutskiy
>>  wrote:
>>> I am using Spark Streaming where during each micro-batch I output data to S3
>>> using
>>> saveAsTextFile. Right now each batch of data is put into its own directory
>>> containing
>>> 2 objects, "_SUCCESS" and "part-0."
>>>
>>> How do I output each batch into a common directory?
>>>
>>> Thanks,
>>> Vadim
>>> ᐧ

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: saveAsTextFile

2015-04-16 Thread Vadim Bichutskiy
Thanks Evo for your detailed explanation.

> On Apr 16, 2015, at 1:38 PM, Evo Eftimov  wrote:
> 
> The reason for this is as follows:
>  
> 1.   You are saving data on HDFS
> 2.   HDFS as a cluster/server side Service has a Single Writer / Multiple 
> Reader multithreading model
> 3.   Hence each thread of execution in Spark has to write to a separate 
> file in HDFS
> 4.   Moreover the RDDs are partitioned across cluster nodes and operated 
> upon by multiple threads there and on top of that in Spark Streaming you have 
> many micro-batch RDDs streaming in all the time as part of a DStream  
>  
> If you want fine / detailed management of the writing to HDFS you can 
> implement your own HDFS adapter and invoke it in forEachRDD and foreach
>  
> Regards
> Evo Eftimov  
>  
> From: Vadim Bichutskiy [mailto:vadim.bichuts...@gmail.com] 
> Sent: Thursday, April 16, 2015 6:33 PM
> To: user@spark.apache.org
> Subject: saveAsTextFile
>  
> I am using Spark Streaming where during each micro-batch I output data to S3 
> using
> saveAsTextFile. Right now each batch of data is put into its own directory 
> containing
> 2 objects, "_SUCCESS" and "part-0."
>  
> How do I output each batch into a common directory?
>  
> Thanks,
> Vadim
> ᐧ


RE: saveAsTextFile

2015-04-16 Thread Evo Eftimov
Basically you need to unbundle the elements of the RDD and then store them 
wherever you want - use foreachPartition and then foreach 
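
The shape of that per-partition work, sketched in plain Python (`load_partition`, `sink`, and `batch_size` are illustrative stand-ins - a real job would open a connection and issue the COPY/INSERT inside the function passed to `rdd.foreachPartition`): iterate the partition once, buffer rows, and flush them to the sink in batches.

```python
def load_partition(rows, sink, batch_size=2):
    # Buffer rows from one partition's iterator and flush them to the
    # sink in fixed-size batches; "sink" stands in for an external store
    # such as Redshift, and would normally be a DB connection.
    buf = []
    for row in rows:
        buf.append(row)
        if len(buf) >= batch_size:
            sink.append(list(buf))
            buf.clear()
    if buf:  # flush the trailing partial batch
        sink.append(list(buf))

sink = []
load_partition(iter(range(5)), sink)  # -> batches [0,1], [2,3], [4]
```

Batching this way keeps the number of round trips to the external store proportional to the batch count rather than the row count.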

-Original Message-
From: Vadim Bichutskiy [mailto:vadim.bichuts...@gmail.com] 
Sent: Thursday, April 16, 2015 6:39 PM
To: Sean Owen
Cc: user@spark.apache.org
Subject: Re: saveAsTextFile

Thanks Sean. I want to load each batch into Redshift. What's the best/most 
efficient way to do that?

Vadim


> On Apr 16, 2015, at 1:35 PM, Sean Owen  wrote:
> 
> You can't, since that's how it's designed to work. Batches are saved 
> in different "files", which are really directories containing 
> partitions, as is common in Hadoop. You can move them later, or just 
> read them where they are.
> 
> On Thu, Apr 16, 2015 at 6:32 PM, Vadim Bichutskiy 
>  wrote:
>> I am using Spark Streaming where during each micro-batch I output 
>> data to S3 using saveAsTextFile. Right now each batch of data is put 
>> into its own directory containing
>> 2 objects, "_SUCCESS" and "part-0."
>> 
>> How do I output each batch into a common directory?
>> 
>> Thanks,
>> Vadim
>> ᐧ

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: saveAsTextFile

2015-04-16 Thread Evo Eftimov
Nope Sir, it is possible - check my reply earlier 

-Original Message-
From: Sean Owen [mailto:so...@cloudera.com] 
Sent: Thursday, April 16, 2015 6:35 PM
To: Vadim Bichutskiy
Cc: user@spark.apache.org
Subject: Re: saveAsTextFile

You can't, since that's how it's designed to work. Batches are saved in 
different "files", which are really directories containing partitions, as is 
common in Hadoop. You can move them later, or just read them where they are.

On Thu, Apr 16, 2015 at 6:32 PM, Vadim Bichutskiy  
wrote:
> I am using Spark Streaming where during each micro-batch I output data 
> to S3 using saveAsTextFile. Right now each batch of data is put into 
> its own directory containing
> 2 objects, "_SUCCESS" and "part-0."
>
> How do I output each batch into a common directory?
>
> Thanks,
> Vadim
> ᐧ

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: saveAsTextFile

2015-04-16 Thread Evo Eftimov
The reason for this is as follows:

 

1.   You are saving data on HDFS

2.   HDFS as a cluster/server side Service has a Single Writer / Multiple 
Reader multithreading model 

3.   Hence each thread of execution in Spark has to write to a separate 
file in HDFS

4.   Moreover the RDDs are partitioned across cluster nodes and operated 
upon by multiple threads there and on top of that in Spark Streaming you have 
many micro-batch RDDs streaming in all the time as part of a DStream  

 

If you want fine / detailed management of the writing to HDFS you can implement 
your own HDFS adapter and invoke it in forEachRDD and foreach 

 

Regards

Evo Eftimov  

 

From: Vadim Bichutskiy [mailto:vadim.bichuts...@gmail.com] 
Sent: Thursday, April 16, 2015 6:33 PM
To: user@spark.apache.org
Subject: saveAsTextFile

 

I am using Spark Streaming where during each micro-batch I output data to S3 
using

saveAsTextFile. Right now each batch of data is put into its own directory 
containing

2 objects, "_SUCCESS" and "part-0."

 

How do I output each batch into a common directory?

 

Thanks,

Vadim

  



Re: saveAsTextFile

2015-04-16 Thread Vadim Bichutskiy
Thanks Sean. I want to load each batch into Redshift. What's the best/most 
efficient way to do that?

Vadim


> On Apr 16, 2015, at 1:35 PM, Sean Owen  wrote:
> 
> You can't, since that's how it's designed to work. Batches are saved
> in different "files", which are really directories containing
> partitions, as is common in Hadoop. You can move them later, or just
> read them where they are.
> 
> On Thu, Apr 16, 2015 at 6:32 PM, Vadim Bichutskiy
>  wrote:
>> I am using Spark Streaming where during each micro-batch I output data to S3
>> using
>> saveAsTextFile. Right now each batch of data is put into its own directory
>> containing
>> 2 objects, "_SUCCESS" and "part-0."
>> 
>> How do I output each batch into a common directory?
>> 
>> Thanks,
>> Vadim
>> ᐧ

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: saveAsTextFile

2015-04-16 Thread Sean Owen
You can't, since that's how it's designed to work. Batches are saved
in different "files", which are really directories containing
partitions, as is common in Hadoop. You can move them later, or just
read them where they are.

On Thu, Apr 16, 2015 at 6:32 PM, Vadim Bichutskiy
 wrote:
> I am using Spark Streaming where during each micro-batch I output data to S3
> using
> saveAsTextFile. Right now each batch of data is put into its own directory
> containing
> 2 objects, "_SUCCESS" and "part-0."
>
> How do I output each batch into a common directory?
>
> Thanks,
> Vadim
> ᐧ

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



saveAsTextFile

2015-04-16 Thread Vadim Bichutskiy
I am using Spark Streaming where during each micro-batch I output data to
S3 using
saveAsTextFile. Right now each batch of data is put into its own directory
containing
2 objects, "_SUCCESS" and "part-0."

How do I output each batch into a common directory?

Thanks,
Vadim


Spark 1.3 saveAsTextFile with codec gives error - works with Spark 1.2

2015-04-15 Thread Manoj Samel
Env - Spark 1.3, Hadoop 2.3, Kerberos

xx.saveAsTextFile(path, codec) gives the following trace. The same works with
Spark 1.2 in the same environment.

val codec = classOf[]

val a = sc.textFile("/some_hdfs_file")

a.saveAsTextFile("/some_other_hdfs_file", codec) fails with following trace
in Spark 1.3, works in Spark 1.2 in same env

15/04/14 18:06:15 INFO scheduler.TaskSetManager: Lost task 1.3 in stage 2.0
(TID 17) on executor XYZ: java.lang.SecurityException (JCE cannot
authenticate the provider BC) [duplicate 7]
15/04/14 18:06:15 INFO cluster.YarnScheduler: Removed TaskSet 2.0, whose
tasks have all completed, from pool
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage
2.0 (TID 16, nodeXYZ): java.lang.SecurityException: JCE cannot authenticate
the provider BC
at javax.crypto.Cipher.getInstance(Cipher.java:642)
at javax.crypto.Cipher.getInstance(Cipher.java:580)
 some codec calls 
at
org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:136)
at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:91)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1068)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Caused by: java.util.jar.JarException:
file:/abc/filecache/11/spark-assembly-1.3.0-hadoop2.3.0.jar has unsigned
entries - org/apache/spark/SparkHadoopWriter$.class
at javax.crypto.JarVerifier.verifySingleJar(JarVerifier.java:462)
at javax.crypto.JarVerifier.verifyJars(JarVerifier.java:322)
at javax.crypto.JarVerifier.verify(JarVerifier.java:250)
at javax.crypto.JceSecurity.verifyProviderJar(JceSecurity.java:161)
at javax.crypto.JceSecurity.getVerificationResult(JceSecurity.java:187)
at javax.crypto.Cipher.getInstance(Cipher.java:638)
... 16 more

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org

$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)


Re: Spark permission denied error when invoking saveAsTextFile

2015-04-01 Thread Kannan Rajah
Ignore the question. There was a Hadoop setting that needed to be set to
get it working.


--
Kannan

On Wed, Apr 1, 2015 at 1:37 PM, Kannan Rajah  wrote:

> Running a simple word count job in standalone mode as a non root user from
> spark-shell. The spark master, worker services are running as root user.
>
> The problem is the _temporary under /user/krajah/output2/_temporary/0 dir
> is being created with root permission even when running the job as non root
> user - krajah in this case. The higher level directories are getting
> created with right permission though. There was a similar question posted
> long time back, but there is no answer:
> http://mail-archives.apache.org/mod_mbox/mesos-user/201408.mbox/%3CCAAeYHL2M9J9xEotf_0zXmZXy2_x-oBHa=xxl2naft203o6u...@mail.gmail.com%3E
>
>
> *Wrong permission for child directory*
> drwxr-xr-x   - root   root0 2015-04-01 11:20
> /user/krajah/output2/_temporary/0/_temporary
>
>
> *Right permission for parent directories*
> hadoop fs -ls -R /user/krajah/my_output
> drwxr-xr-x   - krajah krajah  1 2015-04-01 11:46
> /user/krajah/my_output/_temporary
> drwxr-xr-x   - krajah krajah  3 2015-04-01 11:46
> /user/krajah/my_output/_temporary/0
>
> *Job and Stacktrace*
>
> scala> val file = sc.textFile("/user/krajah/junk.txt")
> scala> val counts = file.flatMap(line => line.split(" "))
> scala> .map(word => (word, 1))
> scala> .reduceByKey(_ + _)
>
> scala> counts.saveAsTextFile("/user/krajah/count2")
> java.io.IOException: Error: Permission denied
> at com.mapr.fs.MapRFileSystem.rename(MapRFileSystem.java:926)
> at
> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:345)
> at
> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:362)
> at
> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310)
> at
> org.apache.hadoop.mapred.FileOutputCommitter.commitJob(FileOutputCommitter.java:136)
> at
> org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:127)
> at
> org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1079)
> at
> org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:944)
> at
> org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:853)
> at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1199)
> at $iwC$$iwC$$iwC$$iwC.(:17)
> at $iwC$$iwC$$iwC.(:22)
> at $iwC$$iwC.(:24)
> at $iwC.(:26)
> at (:28)
> at .(:32)
> at .()
> at .(:7)
> at .()
> at $print()
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
>
> --
> Kannan
>


Spark permission denied error when invoking saveAsTextFile

2015-04-01 Thread Kannan Rajah
Running a simple word count job in standalone mode as a non-root user from
spark-shell. The Spark master and worker services are running as the root user.

The problem is that the _temporary under /user/krajah/output2/_temporary/0
is being created with root permission even when running the job as a non-root
user - krajah in this case. The higher-level directories are getting
created with the right permission, though. There was a similar question posted
a long time back, but there is no answer:
http://mail-archives.apache.org/mod_mbox/mesos-user/201408.mbox/%3CCAAeYHL2M9J9xEotf_0zXmZXy2_x-oBHa=xxl2naft203o6u...@mail.gmail.com%3E


*Wrong permission for child directory*
drwxr-xr-x   - root   root0 2015-04-01 11:20
/user/krajah/output2/_temporary/0/_temporary


*Right permission for parent directories*
hadoop fs -ls -R /user/krajah/my_output
drwxr-xr-x   - krajah krajah  1 2015-04-01 11:46
/user/krajah/my_output/_temporary
drwxr-xr-x   - krajah krajah  3 2015-04-01 11:46
/user/krajah/my_output/_temporary/0
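The ownership mismatch above can be spotted mechanically. Here is a small helper sketch (plain Python, not part of any Spark or Hadoop API; the field positions assume the usual `hadoop fs -ls -R` column layout of permissions, replication, owner, group, size, date, time, path) that flags entries whose owner differs from the submitting user:

```python
def foreign_owned_paths(listing: str, expected_owner: str) -> list[str]:
    """Return paths from `hadoop fs -ls -R` output whose owner differs
    from expected_owner. Assumes the standard 8-column listing layout:
    perms, replication, owner, group, size, date, time, path."""
    bad = []
    for line in listing.strip().splitlines():
        parts = line.split()
        # parts[2] is the owner column; parts[-1] is the path.
        if len(parts) >= 8 and parts[2] != expected_owner:
            bad.append(parts[-1])
    return bad


listing = """\
drwxr-xr-x   - krajah krajah 1 2015-04-01 11:46 /user/krajah/my_output/_temporary
drwxr-xr-x   - root   root   0 2015-04-01 11:20 /user/krajah/output2/_temporary/0/_temporary"""

print(foreign_owned_paths(listing, "krajah"))
# prints ['/user/krajah/output2/_temporary/0/_temporary']
```

Any path this prints is one the job committer will later try to rename as the submitting user, which is exactly where the rename fails.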

*Job and Stacktrace*

scala> val file = sc.textFile("/user/krajah/junk.txt")
scala> val counts = file.flatMap(line => line.split(" ")).
     |   map(word => (word, 1)).
     |   reduceByKey(_ + _)

scala> counts.saveAsTextFile("/user/krajah/count2")
java.io.IOException: Error: Permission denied
at com.mapr.fs.MapRFileSystem.rename(MapRFileSystem.java:926)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:345)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:362)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310)
at org.apache.hadoop.mapred.FileOutputCommitter.commitJob(FileOutputCommitter.java:136)
at org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:127)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1079)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:944)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:853)
at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1199)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:17)
at $iwC$$iwC$$iwC.<init>(<console>:22)
at $iwC$$iwC.<init>(<console>:24)
at $iwC.<init>(<console>:26)
at <init>(<console>:28)
at .<init>(<console>:32)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)


--
Kannan


Pyspark saveAsTextFile exceptions

2015-03-13 Thread Madabhattula Rajesh Kumar
Hi Team,

I'm getting the below exception when saving results into Hadoop.


*Code :*
rdd.saveAsTextFile("hdfs://localhost:9000/home/rajesh/data/result.rdd")

Could you please help me resolve this issue?

15/03/13 17:19:31 INFO spark.SparkContext: Starting job: saveAsTextFile at
NativeMethodAccessorImpl.java:-2
15/03/13 17:19:31 INFO scheduler.DAGScheduler: Got job 6 (saveAsTextFile at
NativeMethodAccessorImpl.java:-2) with 4 output partitions
(allowLocal=false)
15/03/13 17:19:31 INFO scheduler.DAGScheduler: Final stage: Stage
10(saveAsTextFile at NativeMethodAccessorImpl.java:-2)
15/03/13 17:19:31 INFO scheduler.DAGScheduler: Parents of final stage:
List()
15/03/13 17:19:31 INFO scheduler.DAGScheduler: Missing parents: List()
15/03/13 17:19:31 INFO scheduler.DAGScheduler: Submitting Stage 10
(MappedRDD[31] at saveAsTextFile at NativeMethodAccessorImpl.java:-2),
which has no missing parents
15/03/13 17:19:31 INFO storage.MemoryStore: ensureFreeSpace(98240) called
with curMem=203866, maxMem=280248975
15/03/13 17:19:31 INFO storage.MemoryStore: Block broadcast_9 stored as
values in memory (estimated size 95.9 KB, free 267.0 MB)
15/03/13 17:19:31 INFO storage.MemoryStore: ensureFreeSpace(59150) called
with curMem=302106, maxMem=280248975
15/03/13 17:19:31 INFO storage.MemoryStore: Block broadcast_9_piece0 stored
as bytes in memory (estimated size 57.8 KB, free 266.9 MB)
15/03/13 17:19:31 INFO storage.BlockManagerInfo: Added broadcast_9_piece0
in memory on localhost:57655 (size: 57.8 KB, free: 267.2 MB)
15/03/13 17:19:31 INFO storage.BlockManagerMaster: Updated info of block
broadcast_9_piece0
15/03/13 17:19:31 INFO spark.SparkContext: Created broadcast 9 from
broadcast at DAGScheduler.scala:838
15/03/13 17:19:31 INFO scheduler.DAGScheduler: Submitting 4 missing tasks
from Stage 10 (MappedRDD[31] at saveAsTextFile at
NativeMethodAccessorImpl.java:-2)
15/03/13 17:19:31 INFO scheduler.TaskSchedulerImpl: Adding task set 10.0
with 4 tasks
15/03/13 17:19:31 INFO scheduler.TaskSetManager: Starting task 0.0 in stage
10.0 (TID 8, localhost, PROCESS_LOCAL, 1375 bytes)
15/03/13 17:19:31 INFO executor.Executor: Running task 0.0 in stage 10.0
(TID 8)
15/03/13 17:19:31 INFO executor.Executor: Fetching
http://10.0.2.15:54815/files/sftordd_pickle with timestamp 1426247370763
15/03/13 17:19:31 INFO util.Utils: Fetching
http://10.0.2.15:54815/files/sftordd_pickle to
/tmp/fetchFileTemp7846328782039551224.tmp
15/03/13 17:19:31 INFO Configuration.deprecation: mapred.output.dir is
deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
15/03/13 17:19:31 INFO Configuration.deprecation: mapred.output.key.class
is deprecated. Instead, use mapreduce.job.output.key.class
15/03/13 17:19:31 INFO Configuration.deprecation: mapred.output.value.class
is deprecated. Instead, use mapreduce.job.output.value.class
15/03/13 17:19:31 INFO Configuration.deprecation: mapred.working.dir is
deprecated. Instead, use mapreduce.job.working.dir
terminate called after throwing an instance of 'std::invalid_argument'
  what():  stoi
15/03/13 17:19:31 ERROR python.PythonRDD: Python worker exited unexpectedly
(crashed)
org.apache.spark.api.python.PythonException: Traceback (most recent call
last):
  File "/home/rajesh/spark-1.2.0/python/pyspark/worker.py", line 90, in main
command = pickleSer._read_with_length(infile)
  File "/home/rajesh/spark-1.2.0/python/pyspark/serializers.py", line 145,
in _read_with_length
length = read_int(stream)
  File "/home/rajesh/spark-1.2.0/python/pyspark/serializers.py", line 511,
in read_int
raise EOFError
EOFError

at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:137)
at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Subprocess exited with status 134
at org.apache.spark.rdd.PipedRDD$$anon$1.hasNext(PipedRDD.scala:161)
at scala.collecti
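For context: "Subprocess exited with status 134" is 128 + 6, i.e. the Python worker was killed by SIGABRT, which lines up with the C++ `std::invalid_argument` thrown from `stoi` earlier in the log. The EOFError then comes from PySpark's length-prefixed framing between the JVM and the worker. A simplified sketch of that read path (illustrative only, not the actual pyspark source):

```python
import io
import struct


def read_int(stream) -> int:
    # Simplified version of the framing in pyspark/serializers.py:
    # every payload is preceded by a 4-byte big-endian length. If the
    # worker dies mid-stream, the read comes back empty and the
    # deserializer raises EOFError -- matching the traceback above.
    raw = stream.read(4)
    if not raw:
        raise EOFError
    return struct.unpack("!i", raw)[0]


# A healthy frame header deserializes normally...
print(read_int(io.BytesIO(struct.pack("!i", 42))))  # prints 42
# ...while an abruptly closed stream raises EOFError, so the EOFError
# here is a symptom of the worker crash, not the root cause.
```

In other words, fix whatever aborts the worker (the `stoi` failure) and the EOFError goes away with it.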

Re: saveAsTextFile extremely slow near finish

2015-03-11 Thread Imran Rashid
Is your data skewed? Could it be that there are a few keys with a huge
number of records? You might consider outputting
(recordA, count)
(recordB, count)

instead of

recordA
recordA
recordA
...


you could do this with:

input = sc.textFile
pairsCounts = input.map{x => (x, 1)}.reduceByKey{_ + _}
sorted = pairsCounts.sortByKey
sorted.saveAsTextFile
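The collapse suggested above can be sketched outside Spark as well; here is a minimal Python analogue of the map-then-reduceByKey step (illustrative names only, not Spark API):

```python
from collections import Counter


def collapse_skewed(records):
    # Analogue of map{x => (x, 1)}.reduceByKey{_ + _}: one
    # (record, count) pair per distinct value, so a hot key that
    # appears millions of times becomes a single output line
    # instead of millions of identical ones.
    return sorted(Counter(records).items())


print(collapse_skewed(["recordA", "recordA", "recordB", "recordA"]))
# prints [('recordA', 3), ('recordB', 1)]
```

Writing counts instead of raw repeats keeps the final partitions balanced, which is the point when a few skewed keys are what make the last tasks of saveAsTextFile drag on.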


On Mon, Mar 9, 2015 at 12:31 PM, mingweili0x  wrote:

> I'm basically running a sorting using spark. The spark program will read
> from
> HDFS, sort on composite keys, and then save the partitioned result back to
> HDFS.
> pseudo code is like this:
>
> input = sc.textFile
> pairs = input.mapToPair
> sorted = pairs.sortByKey
> values = sorted.values
> values.saveAsTextFile
>
>  Input size is ~160G, and I specified 1000 partitions in
> JavaSparkContext.textFile and JavaPairRDD.sortByKey. From the WebUI, the job
> is split into two stages: saveAsTextFile and mapToPair. MapToPair finished
> in 8 mins, while saveAsTextFile took ~15 mins to reach (2366/2373) progress
> and the last few tasks just took forever and never finished.
>
> Cluster setup:
> 8 nodes
> on each node: 15gb memory, 8 cores
>
> running parameters:
> --executor-memory 12G
> --conf "spark.cores.max=60"
>
> Thank you for any help.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/saveAsTextFile-extremely-slow-near-finish-tp21978.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>

