Re: Stream is corrupted in ShuffleBlockFetcherIterator

2019-08-27 Thread Mikhail Pryakhin
Thank you all for your help.
The issue was caused by a few failed disks in the cluster. Right after they
had been replaced, everything worked well. Looking forward to moving to
Spark 3.0, which is able to handle corrupted shuffle blocks.

Cheers, Mike Pryakhin.


On Wed, 28 Aug 2019 at 03:44, Darshan Pandya 
wrote:

> You can also try setting "spark.io.compression.codec" to "snappy" to use a
> different compression codec.
>
> On Fri, Aug 16, 2019 at 10:14 AM Vadim Semenov 
> wrote:
>
>> This is what you're looking for:
>>
>> Handle large corrupt shuffle blocks
>> https://issues.apache.org/jira/browse/SPARK-26089
>>
>> So until 3.0, the only workaround I can think of is to reduce the size /
>> split your job into many smaller ones.
>>
>> On Thu, Aug 15, 2019 at 4:47 PM Mikhail Pryakhin 
>> wrote:
>>
>>> Hello, Spark community!
>>>
>>> I've been struggling with a job that constantly fails due to an inability
>>> to uncompress some previously compressed blocks while shuffling data.
>>> I use Spark 2.2.0 with all the configuration settings left at their
>>> defaults (no specific compression codec is specified). I've ascertained
>>> that LZ4CompressionCodec is used as the default codec. The job fails as
>>> soon as the limit of attempts is exceeded, with the following message:
>>>
>>> Caused by: java.io.IOException: Stream is corrupted
>>> at
>>> org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:211)
>>> at
>>> org.apache.spark.io.LZ4BlockInputStream.read(LZ4BlockInputStream.java:125)
>>> at
>>> org.apache.spark.io.LZ4BlockInputStream.read(LZ4BlockInputStream.java:137)
>>> at
>>> org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:340)
>>> at
>>> org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
>>> at
>>> org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
>>> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
>>> at org.apache.spark.util.Utils$.copyStream(Utils.scala:348)
>>> at
>>> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:395)
>>> ... 28 more
>>> Caused by: net.jpountz.lz4.LZ4Exception: Error decoding offset 14649 of
>>> input buffer
>>>
>>>
>>> Actually, I've stumbled upon a bug [1] that is not fixed yet. Any clue on
>>> how to work around this issue? I've tried the Snappy codec, but it fails
>>> likewise with a slightly different message:
>>>
>>> org.apache.spark.shuffle.FetchFailedException: failed to uncompress the
>>> chunk: FAILED_TO_UNCOMPRESS(5)
>>> at
>>> org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:442)
>>> at
>>> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:403)
>>> at
>>> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:59)
>>> at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
>>> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>>> at
>>> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>> at
>>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
>>> Source)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>>> Source)
>>> at
>>> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>>> at
>>> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
>>> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
>>> at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:211)
>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
>>> Source)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>>> Source)
>>> at
>>> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>>> at
>>> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>>> at
>>> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
>>> at
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
>>> at
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:108)
>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>>> at
>>> 

Re: web access to sparkUI on docker or k8s pods

2019-08-27 Thread Yaniv Harpaz
thank you, I will check it out


Yaniv Harpaz
[ yaniv.harpaz at gmail.com ]


On Wed, Aug 28, 2019 at 7:14 AM Rao, Abhishek (Nokia - IN/Bangalore) <
abhishek@nokia.com> wrote:

> Hi,
>
>
>
> We have seen this issue when we tried to bring up the UI on a custom ingress
> path (the default ingress path “/” works). Do you also have a similar
> configuration?
>
> We tried setting spark.ui.proxyBase and spark.ui.reverseProxy, but it did not
> help.
>
>
>
> As a workaround, we’re using the ingress port (the port on the edge node) for
> now. There is also the option of using a NodePort, which works as well.
>
>
>
> Thanks and Regards,
>
> Abhishek
>
>
>
> *From:* Yaniv Harpaz 
> *Sent:* Tuesday, August 27, 2019 7:34 PM
> *To:* user@spark.apache.org
> *Subject:* web access to sparkUI on docker or k8s pods
>
>
>
> hello guys,
>
> when I launch driver pods or even when I use docker run with the spark
> image,
>
> the spark master UI (8080) works great,
>
> but the sparkUI (4040) is loading w/o the CSS
>
>
>
> when I dig a bit deeper I see
>
> "Refused to apply style from '' because its MIME type ('text/html')
> is not supported stylesheet MIME type, and strict MIME checking is enabled."
>
>
>
> what am I missing here?
>
> Yaniv
>
>
> Yaniv Harpaz
> [ yaniv.harpaz at gmail.com ]
>


Are groupBy and partition similar in this scenario? Do I still need to do partitioning here to save into Cassandra?

2019-08-27 Thread Shyam P
Hi,
Are groupBy and partition similar in this scenario?
I know they are not similar and are meant for different purposes, but I am
confused here.
Do I still need to do partitioning here to save into Cassandra?

Below is my scenario.

I am using spark-sql 2.4.1 and spark-cassandra-connector_2.11-2.4.1 with Java 8,
and Apache Cassandra 3.0.

My spark-submit / Spark cluster environment is configured as below to load *2
billion records*.

--executor-cores 3
--executor-memory 9g
--num-executors 5
--driver-cores 2
--driver-memory 4g

I am loading data into Cassandra tables using a Spark DataFrame. After reading
into a Spark Dataset, I am grouping by certain columns as below.

Dataset<Row> dataDf = // read data from source

Dataset<Row> groupedDf = dataDf.groupBy("id", "type", "value", "load_date",
    "fiscal_year", "fiscal_quarter", "create_user_txt", "create_date");

groupedDf.write().format("org.apache.spark.sql.cassandra")
    .option("table", "product")
    .option("keyspace", "dataload")
    .mode(SaveMode.Append)
    .save();

Cassandra table(
PRIMARY KEY (( id, type, value, item_code ), load_date)
) WITH CLUSTERING ORDER BY ( load_date DESC )

Basically I am grouping by the "id", "type", "value", "load_date" columns. Since
the other columns ("fiscal_year", "fiscal_quarter", "create_user_txt",
"create_date") also need to be available for storing into the Cassandra table, I
have to include them in the groupBy clause as well.

1) Frankly speaking, I don't know how to get those columns into the resultant
dataframe (i.e. groupedDf) after the groupBy so that they can be stored. Any
advice on how to tackle this, please?
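
For example, would something like the following rough sketch be the right
direction? (Here I group only by the key columns and carry the remaining
columns through with first(); that aggregate choice is just a guess on my
side and I have not verified it.)

import static org.apache.spark.sql.functions.first;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Group only by the key columns and carry the remaining columns through
// as aggregates, so they are still present for the Cassandra write.
Dataset<Row> groupedDf = dataDf
        .groupBy("id", "type", "value", "load_date")
        .agg(first("fiscal_year").as("fiscal_year"),
             first("fiscal_quarter").as("fiscal_quarter"),
             first("create_user_txt").as("create_user_txt"),
             first("create_date").as("create_date"));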

2) With the above process/steps, my Spark loading job is pretty slow due to a
lot of shuffling, i.e. the read-shuffle and write-shuffle phases.

What should I do here to improve the speed ?

While reading from the source (into dataDf), do I need to do anything here to
improve performance?

Are groupBy and partition similar? Do I still need to do any
partitioning? If so, what is the best way/approach given the above
Cassandra table?

Please advise me.

thanks,
Shyam


https://stackoverflow.com/questions/57684972/is-groupby-and-partition-are-similar-how-to-improve-performance-my-spark-job-h


RE: web access to sparkUI on docker or k8s pods

2019-08-27 Thread Rao, Abhishek (Nokia - IN/Bangalore)
Hi,

We have seen this issue when we tried to bring up the UI on a custom ingress path
(the default ingress path “/” works). Do you also have a similar configuration?
We tried setting spark.ui.proxyBase and spark.ui.reverseProxy, but it did not help.

As a workaround, we’re using the ingress port (the port on the edge node) for now.
There is also the option of using a NodePort, which works as well.

Thanks and Regards,
Abhishek

From: Yaniv Harpaz 
Sent: Tuesday, August 27, 2019 7:34 PM
To: user@spark.apache.org
Subject: web access to sparkUI on docker or k8s pods

hello guys,
when I launch driver pods or even when I use docker run with the spark image,
the spark master UI (8080) works great,
but the sparkUI (4040) is loading w/o the CSS

when I dig a bit deeper I see
"Refused to apply style from '' because its MIME type ('text/html') is not 
supported stylesheet MIME type, and strict MIME checking is enabled."

what am I missing here?
Yaniv

Yaniv Harpaz
[ yaniv.harpaz at gmail.com ]


question about pyarrow.Table to pyspark.DataFrame conversion

2019-08-27 Thread Artem Kozhevnikov
I wonder if there's a recommended method to convert an in-memory
pyarrow.Table (or pyarrow.RecordBatch) to a pyspark DataFrame without going
through pandas?
My motivation is converting nested data (like List[int]) that has an
efficient representation in pyarrow but not in pandas (I don't want to go
through a Python list of int ...).

Thanks in advance !
Artem


Re: Stream is corrupted in ShuffleBlockFetcherIterator

2019-08-27 Thread Darshan Pandya
You can also try setting "spark.io.compression.codec" to "snappy" to use a
different compression codec.
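
For example, a rough sketch of how that could be set when building the session
(the app name is just a placeholder; the same setting can also be passed with
--conf on spark-submit):

import org.apache.spark.sql.SparkSession;

// Use Snappy instead of the default lz4 for shuffle/spill compression.
SparkSession spark = SparkSession.builder()
        .appName("shuffle-codec-test")                      // placeholder app name
        .config("spark.io.compression.codec", "snappy")
        .getOrCreate();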

On Fri, Aug 16, 2019 at 10:14 AM Vadim Semenov 
wrote:

> This is what you're looking for:
>
> Handle large corrupt shuffle blocks
> https://issues.apache.org/jira/browse/SPARK-26089
>
> So until 3.0, the only workaround I can think of is to reduce the size /
> split your job into many smaller ones.
>
> On Thu, Aug 15, 2019 at 4:47 PM Mikhail Pryakhin 
> wrote:
>
>> Hello, Spark community!
>>
>> I've been struggling with a job that constantly fails due to an inability
>> to uncompress some previously compressed blocks while shuffling data.
>> I use Spark 2.2.0 with all the configuration settings left at their defaults
>> (no specific compression codec is specified). I've ascertained that
>> LZ4CompressionCodec is used as the default codec. The job fails as soon as
>> the limit of attempts is exceeded, with the following message:
>>
>> Caused by: java.io.IOException: Stream is corrupted
>> at
>> org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:211)
>> at
>> org.apache.spark.io.LZ4BlockInputStream.read(LZ4BlockInputStream.java:125)
>> at
>> org.apache.spark.io.LZ4BlockInputStream.read(LZ4BlockInputStream.java:137)
>> at
>> org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:340)
>> at
>> org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
>> at
>> org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
>> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
>> at org.apache.spark.util.Utils$.copyStream(Utils.scala:348)
>> at
>> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:395)
>> ... 28 more
>> Caused by: net.jpountz.lz4.LZ4Exception: Error decoding offset 14649 of
>> input buffer
>>
>>
>> Actually, I've stumbled upon a bug [1] that is not fixed yet. Any clue on
>> how to work around this issue? I've tried the Snappy codec, but it fails
>> likewise with a slightly different message:
>>
>> org.apache.spark.shuffle.FetchFailedException: failed to uncompress the
>> chunk: FAILED_TO_UNCOMPRESS(5)
>> at
>> org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:442)
>> at
>> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:403)
>> at
>> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:59)
>> at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
>> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>> at
>> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>> at
>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>> at
>> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
>> Source)
>> at
>> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>> Source)
>> at
>> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>> at
>> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
>> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
>> at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:211)
>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>> at
>> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
>> Source)
>> at
>> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>> Source)
>> at
>> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>> at
>> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>> at
>> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>> at org.apache.spark.scheduler.Task.run(Task.scala:108)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.io.IOException: failed to uncompress the chunk:
>> FAILED_TO_UNCOMPRESS(5)
>> at
>> org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:361)
>> at 

Re: Structured Streaming Dataframe Size

2019-08-27 Thread Tathagata Das
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#basic-concepts

*Note that Structured Streaming does not materialize the entire table*. It
> reads the latest available data from the streaming data source, processes
> it incrementally to update the result, and then discards the source data.
> It only keeps around the minimal intermediate *state* data as required to
> update the result (e.g. intermediate counts in the earlier example).
>
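
For illustration, a rough Java sketch of such an incremental aggregation (the
source and column names here are made up): only the running count per
(window, word) is kept as state between micro-batches, and the watermark lets
state for old windows be dropped.

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.window;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder()
        .appName("state-demo")
        .master("local[2]")   // just for a local test
        .getOrCreate();

// A toy streaming source: the built-in "rate" source emits (timestamp, value) rows.
Dataset<Row> events = spark.readStream()
        .format("rate")
        .load()
        .withColumnRenamed("timestamp", "eventTime")
        .withColumn("word", col("value").cast("string"));

// Windowed count: Spark keeps only the running count per (window, word) as
// state between micro-batches; the input rows themselves are discarded.
Dataset<Row> counts = events
        .withWatermark("eventTime", "10 minutes")
        .groupBy(window(col("eventTime"), "5 minutes"), col("word"))
        .count();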


On Tue, Aug 27, 2019 at 1:21 PM Nick Dawes  wrote:

> I have a quick newbie question.
>
> Spark Structured Streaming creates an unbounded dataframe that keeps
> appending rows to it.
>
> So what's the max size of data it can hold? What if the size becomes
> bigger than the JVM? Will it spill to disk? I'm using S3 as storage. So
> will it write temp data on S3 or on local file system of the cluster?
>
> Nick
>


Structured Streaming Dataframe Size

2019-08-27 Thread Nick Dawes
I have a quick newbie question.

Spark Structured Streaming creates an unbounded dataframe that keeps
appending rows to it.

So what's the max size of data it can hold? What if the size becomes bigger
than the JVM? Will it spill to disk? I'm using S3 as storage. So will it
write temp data on S3 or on local file system of the cluster?

Nick


Driver - Stops Scheduling Streaming Jobs

2019-08-27 Thread Bryan Jeffrey
Hello!

We're running Spark 2.3.0 on Scala 2.11.  We have a number of Spark
Streaming jobs that are using MapWithState.  We've observed that these jobs
will complete some set of stages, and then not schedule the next set of
stages.  It looks like the DAG Scheduler correctly identifies required
stages:

19/08/27 15:29:48 INFO YarnClusterScheduler: Removed TaskSet 79.0, whose
tasks have all completed, from pool
19/08/27 15:29:48 INFO DAGScheduler: ShuffleMapStage 79 (map at
SomeCode.scala:121) finished in 142.985 s
19/08/27 15:29:48 INFO DAGScheduler: looking for newly runnable stages
19/08/27 15:29:48 INFO DAGScheduler: running: Set()
19/08/27 15:29:48 INFO DAGScheduler: waiting: Set(ShuffleMapStage 81,
ResultStage 82, ResultStage 83, ShuffleMapStage 54, ResultStage 61,
ResultStage 55, ShuffleMapStage 48, ShuffleMapStage 84, ResultStage 49,
ShuffleMapStage 85, ShuffleMapStage 56, ResultStage 86,
ShuffleMapStage 57, ResultStage 58, ResultStage 80)
19/08/27 15:29:48 INFO DAGScheduler: failed: Set()

However, we see no stages that begin execution.  This happens semi-rarely
(every couple of days), which makes repro difficult.  I checked known bugs
fixed in 2.3.x and did not see anything pop out.  Has anyone else seen this
behavior? Any thoughts on debugging?

Regards,

Bryan Jeffrey


Blue-Green Deployment of Structured Streaming

2019-08-27 Thread Cressy, Taylor
Hi all,

We are attempting to come up with a blue-green deployment strategy for our
structured streaming job to minimize downtime. The general flow would be:


  1.  Job A is currently streaming
  2.  Job B comes up and starts loading Job A state without starting its query.
  3.  Job B completes the loading of Job A's state, then informs Job A to 
gracefully shut down.
  4.  Job A acknowledges its own graceful shutdown.
  5.  Job B becomes the active query

Has anyone had any success in doing something similar? Every attempt we have 
made has ended up in some form of race condition against the state files.
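
For concreteness, a rough Java sketch of one way the shutdown signal in steps
3-4 could be wired up; the marker-file mechanism, the path, and the polling
interval are placeholders, and this alone does not address the state-file race:

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

import org.apache.spark.sql.streaming.StreamingQuery;

class BlueGreenSignal {
    // Job A side: keep running until an external "stop" marker appears, then
    // stop the query so the standby job (Job B) can become the active query.
    static void runUntilSignalled(StreamingQuery query) throws Exception {
        Path stopMarker = Paths.get("/shared/deploy/stop-job-a");  // placeholder path
        while (query.isActive()) {
            if (Files.exists(stopMarker)) {
                query.stop();                    // ask Job A's query to stop
            } else {
                query.awaitTermination(10_000);  // wait up to 10s, then re-check the marker
            }
        }
    }
}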

Regards,
Taylor Cressy


web access to sparkUI on docker or k8s pods

2019-08-27 Thread Yaniv Harpaz
hello guys,
when I launch driver pods or even when I use docker run with the spark
image,
the spark master UI (8080) works great,
but the sparkUI (4040) is loading w/o the CSS

when I dig a bit deeper I see
"Refused to apply style from '' because its MIME type ('text/html') is
not supported stylesheet MIME type, and strict MIME checking is enabled."

what am I missing here?
Yaniv

Yaniv Harpaz
[ yaniv.harpaz at gmail.com ]


Spark on k8s: Mount config map in executor

2019-08-27 Thread Steven Stetzler
Hello everyone,

I am wondering if there is a way to mount a Kubernetes ConfigMap into a
directory in a Spark executor on Kubernetes. Poking around the docs, the
only volume-mounting options I can find are for a PVC, a directory on the
host machine, and an empty volume. I am trying to pass in configuration
files that alter the startup of the container for a specialized Spark
executor image, and a ConfigMap seems to be the natural Kubernetes solution
for storing and accessing these files in the cluster. However, I have no way
for the Spark executor to access them.

I appreciate any help or insight the userbase can offer for this issue.

Thanks,
Steven Stetzler