Structured Streaming - HDFS State Store Performance Issues

2020-01-14 Thread William Briggs
Hi all, I've got a problem that really has me stumped. I'm running a
Structured Streaming query that reads from Kafka, performs some
transformations and stateful aggregations (using flatMapGroupsWithState),
and outputs any updated aggregates to another Kafka topic.

I'm running this job using Spark 2.4.4 on Amazon EMR 5.28.1.
Semi-regularly, all the tasks except one will complete, and the one
remaining task will take 1-2 minutes to complete instead of 1-2 seconds.
I've checked the number of input records (and overall size) for that task,
and everything seems in-line with all the other tasks - there's no visible
skew.

The only thing I have to go on at the moment is that the thread dump on the
executor that is hung shows a 'state-store-maintenance-task' thread, which
is blocked on an "Executor task launch worker" thread - that second thread
shows as TIMED_WAITING, with the following locks:

Lock(java.util.concurrent.ThreadPoolExecutor$Worker@1569026152),
Monitor(org.apache.hadoop.hdfs.DFSOutputStream@321319171),
Monitor(org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream@235686316),
Monitor(org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider@1633346777)

And a stack of:

java.lang.Object.wait(Native Method)
org.apache.hadoop.hdfs.DataStreamer.waitForAckedSeqno(DataStreamer.java:877)
org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:736)
org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:846) => holding Monitor(org.apache.hadoop.hdfs.DFSOutputStream@321319171)
org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:805) => holding Monitor(org.apache.hadoop.hdfs.DFSOutputStream@321319171)
org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:74)
org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:108)
org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:145) => holding Monitor(org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream@235686316)
net.jpountz.lz4.LZ4BlockOutputStream.close(LZ4BlockOutputStream.java:193)
java.io.FilterOutputStream.close(FilterOutputStream.java:159)
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.finalizeDeltaFile(HDFSBackedStateStoreProvider.scala:417)
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$commitUpdates(HDFSBackedStateStoreProvider.scala:287) => holding Monitor(org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider@1633346777)
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(HDFSBackedStateStoreProvider.scala:132)
org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec$$anonfun$doExecute$1$$anonfun$apply$1.apply$mcV$sp(FlatMapGroupsWithStateExec.scala:135)

Based on this, I'm guessing that there's some kind of delay happening with
the HDFSStateStore, but my NameNode and DataNode metrics all look good (no
large GCs, plenty of free memory, network bandwidth isn't saturated, no
under-replicated blocks).
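For reference, these are the state-store settings that look relevant while
digging into this (defaults shown in the comments; I haven't confirmed that
tuning any of them fixes the stall, so treat them only as investigation
starting points):

```properties
# How often each executor's maintenance thread runs snapshot/cleanup work (default: 60s)
spark.sql.streaming.stateStore.maintenanceInterval=60s
# How many delta files accumulate before they are compacted into a snapshot (default: 10)
spark.sql.streaming.stateStore.minDeltasForSnapshot=10
# How many recent batches of state/metadata files are retained (default: 100)
spark.sql.streaming.minBatchesToRetain=100
```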

Has anyone run into a problem like this before? Any help would be greatly
appreciated!

Regards,
Will


Exactly-Once delivery with Structured Streaming and Kafka

2019-01-31 Thread William Briggs
I noticed that Spark 2.4.0 implemented support for reading only committed
messages in Kafka, and was excited. Are there currently any plans to update
the Kafka output sink to support exactly-once delivery?

Thanks,
Will


Change in configuration settings?

2018-06-08 Thread William Briggs
I recently upgraded a Structured Streaming application from Spark 2.2.1 ->
Spark 2.3.0. This application runs in yarn-cluster mode, and it made use of
the spark.yarn.{driver|executor}.memoryOverhead properties. I noticed the
job started crashing unexpectedly, and after doing a bunch of digging, it
seems that these properties were migrated to simply be
"spark.driver.memoryOverhead" and "spark.executor.memoryOverhead" - I see
that they existed in the 2.2.1 configuration documentation, but not the
2.3.0 docs.
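For reference, this is what I'm now setting (the 2.3.0 property names;
values are illustrative, in MiB unless a unit is given):

```properties
spark.driver.memoryOverhead=1024
spark.executor.memoryOverhead=2048
```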

However, I can't find anything in the release notes between versions that
references this change - should the old spark.yarn.* settings still work,
or were they completely removed in favor the new settings?

Regards,
Will


Structured Streaming + Kafka - Corrupted Checkpoint Offsets / Commits

2018-01-04 Thread William Briggs
I am running a Structured Streaming job (Spark 2.2.0) using EMR 5.9. The
job sources data from a Kafka topic, performs a variety of filters and
transformations, and sinks data back into a different Kafka topic.

Once per day, we stop the query in order to merge the namenode edit logs
with the fsimage, because Structured Streaming creates and destroys a
significant number of HDFS files, and EMR doesn't support a secondary or HA
namenode for fsimage compaction (AWS support directed us to do this, as
Namenode edit logs were filling the disk).

Occasionally, the Structured Streaming query will not restart because the
most recent file in the "commits" or "offsets" checkpoint subdirectory is
empty. This seems like an undesirable behavior, as it requires manual
intervention to remove the empty files in order to force the job to fall
back onto the last good values. Has anyone run into this behavior? The only
similar issue I can find is SPARK-21760, which appears to have no fix or
workaround.

Any assistance would be greatly appreciated!

Regards,
Will


Re: How to automatically relaunch a Driver program after crashes?

2015-08-19 Thread William Briggs
When submitting to YARN, you can specify two different operation modes for
the driver with the --master parameter: yarn-client or yarn-cluster. For
more information on submitting to YARN, see this page in the Spark docs:
http://spark.apache.org/docs/latest/running-on-yarn.html

yarn-cluster mode will run the driver inside of the Application Master,
which will be retried on failure. The number of retries is dependent on the
yarn.resourcemanager.am.max-attempts configuration setting for the YARN
ResourceManager.
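A minimal submission sketch (the class and jar names are placeholders):

```
spark-submit \
  --master yarn-cluster \
  --class com.example.MyStreamingApp \
  my-app.jar

# ResourceManager-side retry cap, configured in yarn-site.xml:
#   yarn.resourcemanager.am.max-attempts
```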

Regards,
Will

On Wed, Aug 19, 2015 at 2:55 AM, Spark Enthusiast sparkenthusi...@yahoo.in
wrote:

 Folks,

 As I see, the Driver program is a single point of failure. Now, I have
 seen ways as to how to make it recover from failures on a restart (using
 Checkpointing) but I have not seen anything as to how to restart it
 automatically if it crashes.

 Will running the Driver as a Hadoop Yarn Application do it? Can someone
 educate me as to how?



Re: Scala: How to match a java object????

2015-08-18 Thread William Briggs
Could you share your pattern matching expression that is failing?
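In the meantime, here is the usual shape of matching on a Java class by type
rather than by name - using the class name itself as a pattern produces
exactly that "is not a value" error, so this is my guess at what's going on:

```scala
// A type pattern binds the value if it is an instance of the given class,
// with no explicit cast needed:
def toDouble(x: Any): Option[Double] = x match {
  case b: java.math.BigDecimal => Some(b.doubleValue)
  case d: Double               => Some(d)
  case _                       => None
}
```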

On Tue, Aug 18, 2015, 3:38 PM  saif.a.ell...@wellsfargo.com wrote:

 Hi all,

 I am trying to run a spark job, in which I receive java.math.BigDecimal
 objects, instead of the scala equivalents, and I am trying to convert them
 into Doubles. If I try to match-case this object class, I get: "error:
 object java.math.BigDecimal is not a value"

 How could I get around matching java objects? I would like to avoid a
 multiple try-catch on ClassCastExceptions for all my checks.

 Thank you,
 Saif




Re: Does spark performance really scale out with multiple machines?

2015-06-15 Thread William Briggs
There are a lot of variables to consider. I'm not an expert on Spark, and
my ML knowledge is rudimentary at best, but here are some questions whose
answers might help us to help you:

   - What type of Spark cluster are you running (e.g., Stand-alone, Mesos,
   YARN)?
   - What does the HTTP UI tell you in terms of number of stages / tasks,
    number of executors, and task execution time / memory used / amount of data
   shuffled over the network?

As I said, I'm not all that familiar with the ML side of Spark, but in
general, if I were adding more resources, and not seeing an improvement,
here are a few things I would consider:

   1. Is your data set partitioned to allow the parallelism you are
   seeking? Spark's parallelism comes from processing RDD partitions in
   parallel, not processing individual RDD items in parallel; if you don't
   have enough partitions to take advantage of the extra hardware, you will
   see no benefit from adding capacity to your cluster.
   2. Do you have enough Spark executors to process your partitions in
    parallel? This depends on your configuration and on your cluster type
   (doubtful this is an issue here, since you are adding more executors and
   seeing very little benefit).
   3. Are your partitions small enough (and/or your executor memory
   configuration large enough) so that each partition fits into the memory of
   an executor? If not, you will be constantly spilling to disk, which will
   have a severe impact on performance.
   4. Are you shuffling over the network? If so, how frequently and how
   much? Are you using efficient serialization (e.g., Kryo) and registering
   your serialized classes in order to minimize shuffle overhead?
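To make points 1-4 concrete, here is a spark-defaults.conf-style sketch of
the settings involved (all values are illustrative only and must be tuned
per cluster; com.example.MyRecord is a placeholder class name):

```properties
# Default partition count for shuffles on raw RDDs (point 1)
spark.default.parallelism=200
# Per-executor heap; partitions must fit in (a fraction of) this (point 3)
spark.executor.memory=4g
# Kryo serialization to shrink shuffle traffic (point 4)
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.kryo.classesToRegister=com.example.MyRecord
```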

There are plenty more variables, and some very good performance tuning
documentation https://spark.apache.org/docs/latest/tuning.html is
available. Without any more information to go on, my best guess would be
that you hit your maximum level of parallelism with the addition of the
second node (and even that was not fully utilized), and thus you see no
difference when adding a third node.

Regards,
Will


On Mon, Jun 15, 2015 at 1:29 PM, Wang, Ningjun (LNG-NPV) 
ningjun.w...@lexisnexis.com wrote:

  I try to measure how Spark standalone cluster performance scales out with
 multiple machines. I did a test of training the SVM model, which is heavy in
 memory computation. I measured the run time for a Spark standalone cluster of
 1 – 3 nodes; the results are as follows:



 1 node: 35 minutes

 2 nodes: 30.1 minutes

 3 nodes: 30.8 minutes



 So the speed does not seem to increase much with more machines. I know
 there is overhead for coordinating tasks among different machines. It seems
 to me the overhead is over 30% of the total run time.



 Is this typical? Does anybody see significant performance increase with
 more machines? Is there anything I can tune my spark cluster to make it
 scale out with more machines?



 Thanks

 Ningjun





Re: Does spark performance really scale out with multiple machines?

2015-06-15 Thread William Briggs
I just wanted to clarify - when I said you hit your maximum level of
parallelism, I meant that the default number of partitions might not be
large enough to take advantage of more hardware, not that there was no way
to increase your parallelism - the documentation I linked gives a few
suggestions on how to increase the number of partitions.

-Will

On Mon, Jun 15, 2015 at 5:00 PM, William Briggs wrbri...@gmail.com wrote:

 There are a lot of variables to consider. I'm not an expert on Spark, and
 my ML knowledge is rudimentary at best, but here are some questions whose
 answers might help us to help you:

- What type of Spark cluster are you running (e.g., Stand-alone,
Mesos, YARN)?
- What does the HTTP UI tell you in terms of number of stages / tasks,
number of executors, and task execution time / memory used / amount of data
shuffled over the network?

 As I said, I'm not all that familiar with the ML side of Spark, but in
 general, if I were adding more resources, and not seeing an improvement,
 here are a few things I would consider:

1. Is your data set partitioned to allow the parallelism you are
seeking? Spark's parallelism comes from processing RDD partitions in
parallel, not processing individual RDD items in parallel; if you don't
have enough partitions to take advantage of the extra hardware, you will
see no benefit from adding capacity to your cluster.
2. Do you have enough Spark executors to process your partitions in
parallel? This depends on your configuration and on your cluster type
(doubtful this is an issue here, since you are adding more executors and
seeing very little benefit).
3. Are your partitions small enough (and/or your executor memory
configuration large enough) so that each partition fits into the memory of
an executor? If not, you will be constantly spilling to disk, which will
have a severe impact on performance.
4. Are you shuffling over the network? If so, how frequently and how
much? Are you using efficient serialization (e.g., Kryo) and registering
your serialized classes in order to minimize shuffle overhead?

 There are plenty more variables, and some very good performance tuning
 documentation https://spark.apache.org/docs/latest/tuning.html is
 available. Without any more information to go on, my best guess would be
 that you hit your maximum level of parallelism with the addition of the
 second node (and even that was not fully utilized), and thus you see no
 difference when adding a third node.

 Regards,
 Will


 On Mon, Jun 15, 2015 at 1:29 PM, Wang, Ningjun (LNG-NPV) 
 ningjun.w...@lexisnexis.com wrote:

  I try to measure how Spark standalone cluster performance scales out
 with multiple machines. I did a test of training the SVM model, which is
 heavy in memory computation. I measured the run time for a Spark standalone
 cluster of 1 – 3 nodes; the results are as follows:



 1 node: 35 minutes

 2 nodes: 30.1 minutes

 3 nodes: 30.8 minutes



 So the speed does not seem to increase much with more machines. I know
 there is overhead for coordinating tasks among different machines. It seems
 to me the overhead is over 30% of the total run time.



 Is this typical? Does anybody see significant performance increase with
 more machines? Is there anything I can tune my spark cluster to make it
 scale out with more machines?



 Thanks

 Ningjun







Re: Can a Spark App run with spark-submit write pdf files to HDFS

2015-06-09 Thread William Briggs
I don't know anything about your use case, so take this with a grain of
salt, but typically if you are operating at a scale that benefits from
Spark, then you likely will not want to write your output records as
individual files into HDFS. Spark has built-in support for the Hadoop
SequenceFile container format, which is a more scalable way to handle
writing out your results; you could write your Spark RDD transformations in
such a way that your final RDD is a PairRDD with a unique key (possibly
what would normally have been the standalone file name) and the value (in
this case, likely the byte array of the PDF you generated).

It looks like PDFBox's PDDocument class allows you to save the document
to an OutputStream
https://pdfbox.apache.org/docs/1.8.9/javadocs/org/apache/pdfbox/pdmodel/PDDocument.html#save(java.io.OutputStream),
so you could probably get away with saving to a ByteArrayOutputStream, and
snagging the bytes that comprise the final document. You can see more about
how to write SequenceFiles from Spark here
https://spark.apache.org/docs/latest/programming-guide.html#actions.
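To make that PairRDD shape concrete, here's a small sketch; renderPdf is a
hypothetical stand-in for rendering with PDDocument.save(OutputStream), and
only the (unique name, document bytes) pairing is the point:

```scala
import java.io.ByteArrayOutputStream

// Stand-in for rendering a PDF into memory, the way PDDocument.save(OutputStream) would
def renderPdf(content: String): Array[Byte] = {
  val out = new ByteArrayOutputStream()
  out.write(content.getBytes("UTF-8"))
  out.toByteArray
}

// (unique name, document bytes) pairs - the shape a SequenceFile sink wants
val pairs = Seq("invoice-001", "invoice-002").map(name => (name, renderPdf(s"PDF for $name")))
// With Spark, roughly: sc.parallelize(pairs).saveAsSequenceFile("hdfs:///out/pdfs")
```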

As an aside, one hint that I have found helpful since I starting working
with Spark is that if your transformation requires classes that are
expensive to instantiate, you may want to look into mapPartitions, which
allows you to do the setup once per partition instead of once per record. I
haven't used PDFBox, but it wouldn't surprise me to learn that there's some
non-negligible overhead involved.
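As an untested sketch of that idea in plain Scala (the Spark version would
use rdd.mapPartitions; ExpensiveRenderer is hypothetical):

```scala
// Pay the setup cost once per partition instead of once per record.
class ExpensiveRenderer { def render(s: String): String = s.toUpperCase }

var setupCount = 0 // just to demonstrate how often setup runs

// Local model of mapPartitions: apply f to each partition's iterator
def mapPartitionsLike[A, B](partitions: Seq[Seq[A]])(f: Iterator[A] => Iterator[B]): Seq[Seq[B]] =
  partitions.map(p => f(p.iterator).toList)

val result = mapPartitionsLike(Seq(Seq("a", "b"), Seq("c"))) { records =>
  setupCount += 1                      // runs once per partition
  val renderer = new ExpensiveRenderer // expensive init amortized over the partition
  records.map(renderer.render)
}
```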

Hope that helps,
Will

On Tue, Jun 9, 2015 at 5:57 PM, Richard Catlin richard.m.cat...@gmail.com
wrote:

 I would like to write pdf files using pdfbox to HDFS from my Spark
 application.  Can this be done?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Can-a-Spark-App-run-with-spark-submit-write-pdf-files-to-HDFS-tp23233.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: SparkContext Threading

2015-06-06 Thread William Briggs
Hi Lee, I'm stuck with only mobile devices for correspondence right now, so
I can't get to shell to play with this issue - this is all supposition; I
think that the lambdas are closing over the context because it's a
constructor parameter to your Runnable class, which is why inlining the
lambdas into your main method doesn't show this issue.
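Here's a tiny plain-JVM illustration of that capture behavior (no Spark
involved; Context stands in for SparkContext, and I'm only guessing this
mirrors your class layout):

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import scala.util.Try

class Context // deliberately not Serializable, like SparkContext
class Job(ctx: Context) extends Serializable {
  // References ctx, so serializing this function drags the Context along
  val usesContext: Int => Int = x => { require(ctx != null); x + 1 }
  // Captures nothing, so it serializes cleanly
  val standalone: Int => Int = _ + 1
}

// Attempt plain Java serialization, as Spark does when shipping closures
def canSerialize(obj: AnyRef): Boolean =
  Try(new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)).isSuccess

val job = new Job(new Context)
```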

On Sat, Jun 6, 2015, 10:55 AM Lee McFadden splee...@gmail.com wrote:

 Hi Will,

 That doesn't seem to be the case and was part of the source of my
 confusion. The code currently in the run method of the runnable works
 perfectly fine with the lambda expressions when it is invoked from the main
 method. They also work when they are invoked from within a separate method
 on the Transforms object.

 It was only when putting that same code into another thread that the
 serialization exception occurred.

 Examples throughout the spark docs also use lambda expressions a lot -
 surely those examples also would not work if this is always an issue with
 lambdas?

 On Sat, Jun 6, 2015, 12:21 AM Will Briggs wrbri...@gmail.com wrote:

 Hi Lee, it's actually not related to threading at all - you would still
 have the same problem even if you were using a single thread. See this
 section (
 https://spark.apache.org/docs/latest/programming-guide.html#passing-functions-to-spark)
 of the Spark docs.


 On June 5, 2015, at 5:12 PM, Lee McFadden splee...@gmail.com wrote:


 On Fri, Jun 5, 2015 at 2:05 PM Will Briggs wrbri...@gmail.com wrote:

 Your lambda expressions on the RDDs in the SecondRollup class are
 closing around the context, and Spark has special logic to ensure that all
 variables in a closure used on an RDD are Serializable - I hate linking to
 Quora, but there's a good explanation here:
 http://www.quora.com/What-does-Closure-cleaner-func-mean-in-Spark


 Ah, I see!  So if I broke out the lambda expressions into a method on an
 object it would prevent this issue.  Essentially, don't use lambda
 expressions when using threads.

 Thanks again, I appreciate the help.




Re: Deduping events using Spark

2015-06-04 Thread William Briggs
Hi Lee,

You should be able to create a PairRDD using the Nonce as the key, and the
AnalyticsEvent as the value. I'm very new to Spark, but here is some
uncompilable pseudo code that may or may not help:

events.map(event => (event.getNonce, event)).reduceByKey((a, b) =>
a).map(_._2)

The above code is more Scala-like, since that's the syntax with which I
have more familiarity - it looks like the Spark Java 8 API is similar, but
you won't get implicit conversion to a PairRDD when you use a 2-Tuple as
the mapped value. Instead, you will need to use the mapToPair function -
there's a good example in the Spark Programming Guide under Working With
Key-Value Pairs
https://spark.apache.org/docs/latest/programming-guide.html#working-with-key-value-pairs
.
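A plain-Scala model of that pipeline (no Spark required), just to show the
semantics of keeping one event per nonce; AnalyticsEvent here is a stand-in
for the real Avro class:

```scala
case class AnalyticsEvent(nonce: String, payload: String)

val events = Seq(
  AnalyticsEvent("n1", "first"),
  AnalyticsEvent("n1", "duplicate"),
  AnalyticsEvent("n2", "other")
)

// Map to (nonce, event), keep one value per key, then drop the key -
// the same shape as map / reduceByKey / map(_._2) on an RDD
val deduped = events
  .map(e => (e.nonce, e))
  .groupBy(_._1)
  .map { case (_, pairs) => pairs.map(_._2).reduce((a, b) => a) }
  .toSeq
```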

Hope this helps!

Regards,
Will

On Thu, Jun 4, 2015 at 1:10 PM, lbierman leebier...@gmail.com wrote:

 I'm still a bit new to Spark and am struggling to figure out the best way
 to dedupe my events.

 I load my Avro files from HDFS and then I want to dedupe events that have
 the same nonce.

 For example my code so far:

  JavaRDD<AnalyticsEvent> events = ((JavaRDD<AvroKey<AnalyticsEvent>>)
 context.newAPIHadoopRDD(
 context.hadoopConfiguration(),
 AvroKeyInputFormat.class,
 AvroKey.class,
 NullWritable.class
 ).keys())
 .map(event -> AnalyticsEvent.newBuilder(event.datum()).build())
 .filter(key -> { return
 Optional.ofNullable(key.getStepEventKey()).isPresent(); })

 Now I want to get back an RDD of AnalyticsEvents that are unique. So I
 basically want to do:
 if AnalyticsEvent.getNonce() == AnalyticsEvent2.getNonce() only return 1 of
 them.

 I'm not sure how to do this? If I do reduceByKey it reduces by
 AnalyticsEvent not by the values inside?

 Any guidance would be much appreciated how I can walk this list of events
 and only return a filtered version of unique nonces.






 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Deduping-events-using-Spark-tp23153.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Make HTTP requests from within Spark

2015-06-03 Thread William Briggs
Hi Kaspar,

This is definitely doable, but in my opinion, it's important to remember
that, at its core, Spark is based around a functional programming paradigm
- you're taking input sets of data and, by applying various
transformations, you end up with a dataset that represents your answer.
Without knowing more about your use case, and keeping in mind that I'm very
new to Spark, here are a few things I would want to think about if I were
writing this as a non-Streaming Spark application:

   1. What is your starting dataset? Do you have an initial set of
   parameters or a data source that is used to define each of the millions of
   requests? If so, then that should comprise your first RDD and you can
   perform subsequent transformations to prepare your HTTP requests (e.g.,
   start with the information that drives the generation of the requests, and
   use map/flatMap to create an RDD that has the full list of requests you
   want to run).
   2. Are the HTTP requests read-only, and/or idempotent (are you only
   looking up data, or are you performing requests that cause some sort of
   side effect)? Spark operations against RDDs work by defining a lineage
   graph, and transformations will be re-run if a partition in the lineage
   needs to be recalculated for any reason. If your HTTP requests are causing
   side-effects that should not be repeated, then Spark may not be the best
   fit for that portion of the job, and you might want to use something else,
   pipe the results into HDFS, and then analyze those using Spark.
   3. If your web service requests are lookups or are idempotent, then
   we're on the right track. Keep in mind that your web service probably will
   not scale as well as the Spark job - a naive first-pass implementation
   could easily overwhelm many services, particularly if/when partitions need
   to be recalculated. There are a few mechanisms you can use to mitigate this
   - one is to use mapPartitions rather than map when transforming the set of
   requests to the set of results, initialize an HTTP connection for each
   partition, and transform the data that defines the request into your
   desired dataset by invoking the web service. Using mapPartitions allows you
   to limit the number of concurrent HTTP connections to one per partition
   (although this may be too slow if your service is slow... there is
   obviously a bit of analysis, testing and profiling that would need to be
   done on the entire job). Another consideration would be to look at
   persisting or caching the intermediate results after you've successfully
   retrieved your results from the service, to reduce the likelihood of
   hitting the web service more than necessary.
   4. Just realized you might be looking for help invoking an HTTP service
   programmatically from Scala / Spark - if so, you might want to look at the
   spray-client http://spray.io/documentation/1.2.3/spray-client/ library.
   5. With millions of web service requests, it's highly likely some will
   fail, for a variety of reasons. Look into using Scala's Try
   http://www.scala-lang.org/api/2.11.5/index.html#scala.util.Try or
   Either
   http://www.scala-lang.org/api/2.11.5/index.html#scala.util.Either monads
   to encode success / failure, and treat failed requests as first-class
   citizens in your RDD of results (by retrying them, filtering them, logging
   them, etc., based on your specific needs and use case). Make sure you are
   setting reasonable timeouts on your service calls to prevent the Spark job
   from getting stuck if the service turns into a black hole.
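A small Try-based sketch of point 5 (fetch is a stand-in for a real HTTP
call, and the URLs are made up):

```scala
import scala.util.{Failure, Success, Try}

// Stand-in for an HTTP call that can fail
def fetch(url: String): String =
  if (url.contains("flaky")) throw new RuntimeException("timeout") else s"200 OK for $url"

val urls = Seq("http://svc/a", "http://svc/flaky", "http://svc/b")

// Encode success/failure as values so failed requests stay first-class in the results
val results: Seq[(String, Try[String])] = urls.map(u => (u, Try(fetch(u))))

val succeeded = results.collect { case (u, Success(body)) => (u, body) }
val toRetry   = results.collect { case (u, Failure(_))    => u }
```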

As I said above, I'm pretty new to Spark, so others may have some better
advice, or even tell you to ignore mine completely (no hard feelings, I
promise - this is all very new to me).

Good luck!

Regards,
Will

On Wed, Jun 3, 2015 at 3:49 AM, kasparfischer kaspar.fisc...@dreizak.com
wrote:

 Hi everybody,

 I'm new to Spark, apologies if my question is very basic.

 I have a need to send millions of requests to a web service and analyse and
 store the responses in an RDD. I can easily express the analysing part using
 Spark's filter/map/etc. primitives but I don't know how to make the
 requests. Is that something I can do from within Spark? Or Spark Streaming?
 Or does it conflict with the way Spark works?

 I've found a similar question but am not sure whether the answer applies
 here:



 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-Spark-Streaming-from-an-HTTP-api-tp12330.html

 Any clarifications or pointers would be super helpful!

 Thanks,
 Kaspar



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Make-HTTP-requests-from-within-Spark-tp23129.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: