http://www.meetup.com/Cincinnati-Apache-Spark-Meetup/
Thanks.
Darin.
I've run into a situation where it would appear that foreachPartition is only
running on one of my executors.
I have a small cluster (2 executors with 8 cores each).
When I run a job with a small file (with 16 partitions) I can see that the 16
partitions are initialized but they all appear to b
st the job dies.
Darin.
From: Jacek Laskowski
To: Darin McBeath
Cc: user
Sent: Friday, March 11, 2016 1:24 PM
Subject: Re: spark 1.6 foreachPartition only appears to be running on one
executor
Hi,
How do you check which executor is used? Can you include a
fo("SimpleStorageServiceInit call arg1: "+ arg1);
log.info("SimpleStorageServiceInit call arg2:"+ arg2);
log.info("SimpleStorageServiceInit call arg3: "+ arg3);
SimpleStorageService.init(this.arg1, this.arg2, this.arg3);
}
}
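For what it's worth, a minimal Scala sketch of one way to see where partitions actually land (the stand-in RDD, host lookup, and counting below are illustrative only, not part of the job above): log the executor host name and partition id from inside foreachPartition; the output shows up in each executor's stdout in the web UI.
import java.net.InetAddress
import org.apache.spark.TaskContext

val rdd = sc.parallelize(1 to 160, 16)                     // stand-in for the real 16-partition RDD
rdd.foreachPartition { iter =>
  val host = InetAddress.getLocalHost.getHostName          // executor host running this task
  val part = TaskContext.get.partitionId()                 // which partition this task owns
  println(s"partition $part on $host with ${iter.size} records")
}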
From: Jacek L
I'm not sure whether this is a misunderstanding on my part of how repartition
should work or if this is a bug. Thanks Jacek for starting to dig into this.
Darin.
- Original Message -----
From: Darin McBeath
To: Jacek Laskowski
Cc: user
Sent: Friday, March 11, 2016 1:57 PM
Subject: Re: spark 1.6 foreachPartition only appears to be running o
I started playing round with Datasets on Spark 2.0 this morning and I'm
surprised by the significant performance difference I'm seeing between an RDD
and a Dataset for a very basic example.
I've defined a simple case class called AnnotationText that has a handful of
fields.
I create a Datase
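For context, a rough Scala sketch of the kind of comparison being described (the field names, filter value, and parquet path are invented here; spark is the Spark 2.0 session available in the shell):
case class AnnotationText(docId: String, annotType: String, startOffset: Long, endOffset: Long, text: String)

import spark.implicits._
val ds  = spark.read.parquet("s3://some-bucket/annotations").as[AnnotationText]   // Dataset[AnnotationText]
val rdd = ds.rdd                                                                   // the same data as an RDD

// The same predicate applied both ways, for a side-by-side timing.
val dsCount  = ds.filter(_.annotType == "person").count()
val rddCount = rdd.filter(_.annotType == "person").count()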
I tried using Spark 1.6 in a stand-alone cluster this morning.
I submitted 2 jobs (and they both executed fine). In fact, they are the exact
same jobs with just some different parameters.
I was able to view the application history for the first job.
However, when I tried to view the second job
: Don Drake
To: Darin McBeath
Cc: User
Sent: Wednesday, January 13, 2016 10:10 AM
Subject: Re: Spark 1.6 and Application History not working correctly
I noticed a similar problem going from 1.5.x to 1.6.0 on YARN.
I resolved it by setting the following command-line param
I'm looking for some suggestions based on other's experiences.
I currently have a job that I need to run periodically where I need to read on
the order of 1+ million files from an S3 bucket. It is not the entire bucket
(nor does it match a pattern). Instead, I have a list of random keys that a
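One possibility (a sketch only, with invented bucket/key names and an assumed AWS SDK 1.11+ on the classpath): since the keys are already known, skip listing the bucket entirely, distribute the key list, and fetch each object with an S3 client created once per partition.
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import scala.io.Source

val bucket = "my-bucket"                                        // hypothetical bucket
val keys   = sc.textFile("s3://my-bucket/key-list.txt", 64)     // one key per line, 64 partitions

val docs = keys.mapPartitions { iter =>
  val s3 = AmazonS3ClientBuilder.defaultClient()                // one client per partition, not per key
  iter.map { key =>
    val obj  = s3.getObject(bucket, key)
    val body = Source.fromInputStream(obj.getObjectContent, "UTF-8").mkString
    obj.close()
    (key, body)
  }
}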
The problem isn't really with DTD validation (by default validation is
disabled). The underlying problem is that the DTD can't be found (which is
indicated in your stack trace below). The underlying parser will try and
retrieve the DTD (regardless of validation) because things such as entit
e more information on
the various features.
Darin.
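To illustrate the general idea with plain JAXP (just a sketch; spark-xml-utils itself wraps Saxon and exposes its own feature settings for this): install an EntityResolver that hands the parser an empty stream, so it never has to go off and fetch a DTD it cannot find.
import java.io.StringReader
import javax.xml.parsers.DocumentBuilderFactory
import org.xml.sax.{EntityResolver, InputSource}

val builder = DocumentBuilderFactory.newInstance().newDocumentBuilder()
builder.setEntityResolver(new EntityResolver {
  // Short-circuit every external DTD/entity lookup with an empty document.
  override def resolveEntity(publicId: String, systemId: String): InputSource =
    new InputSource(new StringReader(""))
})
val doc = builder.parse(new InputSource(new StringReader(
  "<!DOCTYPE doc SYSTEM \"missing.dtd\"><doc>hello</doc>")))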
____
From: Darin McBeath
To: "user@spark.apache.org"
Sent: Tuesday, December 1, 2015 11:51 AM
Subject: Re: Turning off DTD Validation using XML Utils package - Spark
The problem isn't really with DTD validat
Another option might be to leverage spark-xml-utils
(https://github.com/dmcbeath/spark-xml-utils)
This is a collection of xml utilities that I've recently revamped that make it
relatively easy to use xpath, xslt, or xquery within the context of a Spark
application (or at least I think so). My
Another option would be to look at spark-xml-utils. We use this extensively in
the manipulation of our XML content.
https://github.com/elsevierlabs-os/spark-xml-utils
There are quite a few examples. Depending on your preference (and what you
want to do), you could use xpath, xquery, or xslt
that it
returns a string. So, you have to be a little creative when returning multiple
values (such as delimiting the values with a special character and then
splitting on this delimiter).
Darin.
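A rough sketch of the delimiter trick (the XPathProcessor method names below are assumptions from memory and should be checked against the spark-xml-utils README; the '|' delimiter and the XPath expression are arbitrary examples):
import com.elsevier.spark_xml_utils.xpath.XPathProcessor

val xmlPairs = sc.parallelize(Seq(
  ("k1", "<doc><title>Spark</title><year>2016</year></doc>")))

val titleAndYear = xmlPairs.mapPartitions { iter =>
  // Return both values in one string, joined with '|', then split them back apart.
  val proc = XPathProcessor.getInstance("concat(/doc/title,'|',/doc/year)")
  iter.map { case (key, xml) =>
    val parts = proc.evaluate(xml).split("\\|")
    (key, parts(0), parts(1))
  }
}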
From: Diwakar Dhanuskodi
To: Darin McBeath ; Hyukjin Kwon ;
Jörn Franke
Cc: Felix
I've been trying to understand the performance of Datasets (and filters) in
Spark 2.0.
I have a Dataset which I've read from a parquet file and cached into memory
(deser). This is spread across 8 partitions and consumes a total of 826MB of
memory on my cluster. I verified that the dataset wa
How do you find the partitioner for a Dataset?
I have a Dataset (om) which I created and repartitioned using one of the fields
(docId). Reading the documentation, I would assume the om Dataset should be
hash partitioned. But, how can I verify this?
When I do om.rdd.partitioner I get
Option[
I have a Dataset (om) which I created and repartitioned (and cached) using
one of the fields (docId). Reading the Spark documentation, I would assume
the om Dataset should be hash partitioned. But, how can I verify this?
When I do om.rdd.partitioner I get
Option[org.apache.spark.Partitioner] =
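For what it's worth, a short sketch of what can be checked (the parquet path is a placeholder standing in for the real Dataset): repartitioning by a column happens in the SQL layer, so the hash partitioning shows up in the physical plan rather than as an RDD Partitioner, and om.rdd.partitioner will normally be None even though the data really is hash-distributed.
import org.apache.spark.sql.functions.col

val om  = spark.read.parquet("s3://some-bucket/om")   // stand-in for the Dataset in question
val om2 = om.repartition(col("docId")).cache()

println(om2.rdd.partitioner)   // typically None: the RDD view of a Dataset carries no Partitioner
om2.explain()                  // the plan shows Exchange hashpartitioning(docId, <shuffle partitions>)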
I'm not familiar with EDI, but perhaps one option might be spark-xml-utils
(https://github.com/elsevierlabs-os/spark-xml-utils). You could transform the
XML to the XML format required by the xml-to-json function and then return the
json. Spark-xml-utils wraps the open source Saxon project an
I have the following code in a Spark Job.
// Get the baseline input file(s)
JavaPairRDD<Text, Text> hsfBaselinePairRDDReadable = sc.hadoopFile(baselineInputBucketFile,
SequenceFileInputFormat.class, Text.class, Text.class);
JavaPairRDD hsfBaselinePairRDD = hsfBaselinePairRDDReadable.mapToPair(new
ConvertFr
I have the following code in a Spark Job.
// Get the baseline input file(s)
JavaPairRDD<Text, Text> hsfBaselinePairRDDReadable =
sc.hadoopFile(baselineInputBucketFile, SequenceFileInputFormat.class,
Text.class, Text.class);
JavaPairRDD hsfBaselinePairRDD =
hsfBaselinePairRDDReadable.mapToPair(new ConvertFr
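For reference, a Scala sketch of the same read (the job above is Java; the bucket path is a placeholder): pull the Text/Text sequence file in and immediately copy the Writables out to plain Strings, since Hadoop reuses the same Writable instances between records.
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.SequenceFileInputFormat

val baseline = sc.hadoopFile(
    "s3://some-bucket/baseline",
    classOf[SequenceFileInputFormat[Text, Text]],
    classOf[Text],
    classOf[Text])
  .map { case (k, v) => (k.toString, v.toString) }   // toString copies out of the reused Writables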
I've tried various ideas, but I'm really just shooting in the dark.
I have an 8 node cluster of r3.8xlarge machines. The RDD (with 1024 partitions)
I'm trying to save off to S3 is approximately 1TB in size (with the partitions
pretty evenly distributed in size).
I just tried a test to dial back
ased
from 4 to 16
.set("spark.task.maxFailures","64") // Didn't really matter as I had no
failures in this run
.set("spark.storage.blockManagerSlaveTimeoutMs","30");
From: Sven Krasser
To: Darin McBeath
Cc: User
Sent:
In the following code, I read in a large sequence file from S3 (1TB) spread
across 1024 partitions. When I look at the job/stage summary, I see about
400GB of shuffle writes which seems to make sense as I'm doing a hash partition
on this file.
// Get the baseline input file
JavaPairRDD hsfBase
would really want to do in the first place.
Thanks again for your insights.
Darin.
From: Imran Rashid
To: Darin McBeath
Cc: User
Sent: Tuesday, February 17, 2015 3:29 PM
Subject: Re: MapValues and Shuffle Reads
Hi Darin,
When you say you "see 400GB
In an 'early release' of the Learning Spark book, there is the following
reference:
In Scala and Java, you can determine how an RDD is partitioned using its
partitioner property (or partitioner() method in Java)
However, I don't see the mentioned 'partitioner()' method in Spark 1.2 or a way
of
Thanks Imran. That's exactly what I needed to know.
Darin.
From: Imran Rashid
To: Darin McBeath
Cc: User
Sent: Tuesday, February 17, 2015 8:35 PM
Subject: Re: How do you get the partitioner for an RDD in Java?
a JavaRDD is just a wrapper aro
Consider the following left outer join
potentialDailyModificationsRDD =
reducedDailyPairRDD.leftOuterJoin(baselinePairRDD).partitionBy(new
HashPartitioner(1024)).persist(StorageLevel.MEMORY_AND_DISK_SER());
Below are the record counts for the RDDs involved
Number of records for reducedDailyPai
Aaron. Thanks for the class. Since I'm currently writing Java based Spark
applications, I tried converting your class to Java (it seemed pretty
straightforward).
I set up the use of the class as follows:
SparkConf conf = new SparkConf()
.set("spark.hadoop.mapred.output.committer.class",
"com
I'll try it and post a response.
- Original Message -
From: Mingyu Kim
To: Darin McBeath ; Aaron Davidson
Cc: "user@spark.apache.org"
Sent: Monday, February 23, 2015 3:06 PM
Subject: Re: Which OutputCommitter to use for S3?
Cool, we will start from there. Thanks Aaron
Just to close the loop in case anyone runs into the same problem I had.
By setting --hadoop-major-version=2 when using the ec2 scripts, everything
worked fine.
Darin.
- Original Message -
From: Darin McBeath
To: Mingyu Kim ; Aaron Davidson
Cc: "user@spark.apache.org"
Se
I'm using Spark 1.2, stand-alone cluster on ec2 I have a cluster of 8
r3.8xlarge machines but limit the job to only 128 cores. I have also tried
other things such as setting 4 workers per r3.8xlarge and 67gb each but this
made no difference.
The job frequently fails at the end in this step (sa
I have a fairly large Spark job where I'm essentially creating quite a few
RDDs, doing several types of joins with these RDDs, and resulting in a final RDD
which I write back to S3.
Along the way, I would like to capture record counts for some of these RDDs. My
initial approach was to use the count a
Thanks for your quick reply. Yes, that would be fine. I would rather wait/use
the optimal approach as opposed to hacking some one-off solution.
Darin.
From: Kostas Sakellis
To: Darin McBeath
Cc: User
Sent: Friday, February 27, 2015 12:19 PM
Subject: Re
I've downloaded spark 1.2.0 to my laptop. In the lib directory, it includes
spark-assembly-1.2.0-hadoop2.4.0.jar
When I spin up a cluster using the ec2 scripts with 1.2.0 (and set
--hadoop-major-version=2) I notice that in the lib directory for the
master/slaves the assembly is for hadoop2.0.
I am using repartitionAndSortWithinPartitions to partition my content and then
sort within each partition. I've also created a custom partitioner that I use
with repartitionAndSortWithinPartitions. I created a custom partitioner as my
key consists of something like 'groupid|timestamp' and I only
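A minimal sketch of that kind of partitioner (names invented; keys assumed to look like 'groupid|timestamp'): only the groupid part decides the partition, so all records for a group land together and repartitionAndSortWithinPartitions can then order them by the full key.
import org.apache.spark.Partitioner

class GroupIdPartitioner(override val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = {
    val groupId = key.toString.split('|')(0)          // ignore the timestamp part
    val mod = groupId.hashCode % numPartitions
    if (mod < 0) mod + numPartitions else mod         // keep the result non-negative
  }
}

// usage: pairs.repartitionAndSortWithinPartitions(new GroupIdPartitioner(1024))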
I decided to play around with DataFrames this morning but I'm running into
quite a few issues. I'm assuming that I must be doing something wrong so would
appreciate some advice.
First, I create my Data Frame.
import sqlContext.implicits._
case class Entity(InternalId: Long, EntityId: Long, Ent
I have a cluster with 3 nodes (each with 8 cores) using Spark 1.0.1.
I have an RDD which I've repartitioned so it has 100 partitions (hoping
to increase the parallelism).
When I do a transformation (such as filter) on this RDD, I can't seem to get
more than 24 tasks (my total number of cores a
sed on what the documentation states). What
would I want that value to be based on my configuration below? Or, would I
leave that alone?
From: Daniel Siegmann
To: user@spark.apache.org; Darin McBeath
Sent: Wednesday, July 30, 2014 5:58 PM
Subject: Re: Numbe
00 --spot-price=.08 -z us-east-1e --worker-instances=2
my-cluster
From: Daniel Siegmann
To: Darin McBeath
Cc: Daniel Siegmann ; "user@spark.apache.org"
Sent: Thursday, July 31, 2014 10:04 AM
Subject: Re: Number of partitions and Number of concu
e AWS cluster
(m3.2xlarge) the above will execute in around 10 minutes. I currently use
spot instances (.08/hr each) so this is very economical.
More complex XPath expressions could be used.
Assume a sample record structure of the following
[sample XML record; markup not preserved here — element values included Darin, McBeath, 32, tennis, golf, programming, and 800]
I started up a cluster on EC2 (using the provided scripts) and specified a
different instance type for the master and the worker nodes. The cluster
started fine, but when I looked at the cluster (via port 8080), it showed that
the amount of memory available to the worker nodes did not match
I've seen a couple of issues posted about this, but I never saw a resolution.
When I'm using Spark 1.0.2 (and the spark-submit script to submit my jobs) and
AWS SDK 1.8.7, I get the stack trace below. However, if I drop back to AWS SDK
1.3.26 (or anything from the AWS SDK 1.4.* family) then eve
Can't seem to figure this out. I've tried several different approaches without
success. For example, I've tried setting spark.executor.extraJavaOptions in the
spark-default.conf (prior to starting the spark-shell) but this seems to have
no effect.
Outside of spark-shell (within a java applicat
For weeks, I've been using the following trick to successfully disable log4j in
the spark-shell when running a cluster on ec2 that was started by the Spark
provided ec2 scripts.
cp ./conf/log4j.properties.template ./conf/log4j.properties
I then change log4j.rootCategory=INFO to log4j.rootCateg
I have a PairRDD of type which I persist to S3 (using the
following code).
JavaPairRDD<Text, Text> aRDDWritable = aRDD.mapToPair(new ConvertToWritableTypes());
aRDDWritable.saveAsHadoopFile(outputFile, Text.class, Text.class, SequenceFileOutputFormat.class);
class ConvertToWritableTypes implements PairFunct
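The same save expressed as a Scala sketch (the job above is Java; the data and output path here are placeholders): map the String pairs onto Text Writables and write a sequence file.
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.SequenceFileOutputFormat

val aRDD = sc.parallelize(Seq(("id1", "<doc>one</doc>"), ("id2", "<doc>two</doc>")))
val aRDDWritable = aRDD.map { case (k, v) => (new Text(k), new Text(v)) }
aRDDWritable.saveAsHadoopFile(
  "s3://some-bucket/output",
  classOf[Text],
  classOf[Text],
  classOf[SequenceFileOutputFormat[Text, Text]])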
I have some code that I only need to be executed once per executor in my spark
application. My current approach is to do something like the following:
scala> xmlKeyPair.foreachPartition(i => XPathProcessor.init("ats",
"Namespaces/NamespaceContext"))
So, if I understand correctly, the XPathProces
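Worth noting: foreachPartition runs the function once per partition, not once per executor. If the init really must happen only once per executor JVM, one common pattern (a sketch, reusing the init call from the snippet above) is to hide it behind a lazy value in a singleton object, so the first task on each executor pays the cost and later tasks find it already done.
object ProcessorHolder {
  lazy val ready: Boolean = {
    XPathProcessor.init("ats", "Namespaces/NamespaceContext")
    true
  }
}

xmlKeyPair.foreachPartition { iter =>
  ProcessorHolder.ready        // triggers the one-time init on this executor if it hasn't run yet
  // ... per-record work that relies on the initialized processor would go here ...
}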
I developed the spark-xml-utils library because we have a large amount of XML
in big datasets and I felt this data could be better served by providing some
helpful xml utilities. This includes the ability to filter documents based on
an xpath/xquery expression, return specific nodes for an xpath
I have a SchemaRDD with 100 records in 1 partition. We'll call this baseline.
I have a SchemaRDD with 11 records in 1 partition. We'll call this daily.
After a fairly basic join of these two tables
JavaSchemaRDD results = sqlContext.sql("SELECT id, action, daily.epoch,
daily.version FROM baselin
ok. after reading some documentation, it would appear the issue is the default
number of partitions for a join (200).
After doing something like the following, I was able to change the value.
From: Darin McBeath
To: User
Sent: Wednesday, October 29, 2014 1:55 PM
Subject: Spark SQL
Sorry, hit the send key a bit too early.
Anyway, this is the code I set.
sqlContext.sql("set spark.sql.shuffle.partitions=10");
From: Darin McBeath
To: Darin McBeath ; User
Sent: Wednesday, October 29, 2014 2:47 PM
Subject: Re: Spark SQL and confused about number of partit
Let me know if you are interested in participating in a meet up in Cincinnati,
OH to discuss Apache Spark.
We currently have 4-5 different companies expressing interest but would like a
few more.
Darin.
I have the following code where I'm using RDD 'union' and 'subtractByKey' to
create a new baseline RDD. All of my RDDs are a key pair with the 'key' a
String and the 'value' a String (xml document).
// Merge the daily deletes/updates/adds
Assume the following where both updatePairRDD and deletePairRDD are both
HashPartitioned. Before the union, each one of these has 512 partitions. The
new created updateDeletePairRDD has 1024 partitions. Is this the
general/expected behavior for a union (the number of partitions to double)?
J
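In general a plain union just concatenates the partitions of its inputs, so 512 + 512 = 1024 is the expected count unless Spark detects that both sides share the exact same partitioner (in which case it can keep the original count). If the merged RDD needs to be back at a known partitioning, an explicit partitionBy after the union does it; a small sketch with placeholder data:
import org.apache.spark.HashPartitioner

val updatePairRDD = sc.parallelize(Seq(("k1", "v1"))).partitionBy(new HashPartitioner(512))
val deletePairRDD = sc.parallelize(Seq(("k2", "v2"))).partitionBy(new HashPartitioner(512))

val updateDeletePairRDD = updatePairRDD.union(deletePairRDD)        // partition count may double here
val merged = updateDeletePairRDD.partitionBy(new HashPartitioner(512))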
I'm using spark 1.1 and the provided ec2 scripts to start my cluster
(r3.8xlarge machines). From the spark-shell, I can verify that the environment
variables are set
scala> System.getenv("SPARK_LOCAL_DIRS")
res0: String = /mnt/spark,/mnt2/spark
However, when I look on the workers, the directories
For one of my Spark jobs, my workers/executors are dying and leaving the
cluster.
On the master, I see something like the following in the log file. I'm
surprised to see the '60' seconds in the master log below because I explicitly
set it to '600' (or so I thought) in my spark job (see below).
Take a look at the O'Reilly Learning Spark (Early Release) book. I've found
this very useful.
Darin.
From: Saurabh Agrawal
To: "user@spark.apache.org"
Sent: Thursday, November 20, 2014 9:04 AM
Subject: Please help me get started on Apache Spark
Friends, I
am pretty new to