Re: Spark Shell issue on HDInsight

2017-05-11 Thread ayan guha
Works for me too... you are a life-saver :)

But the question remains: should we report this to the Azure team, and if so, how?

On Fri, May 12, 2017 at 10:32 AM, Denny Lee  wrote:

> I was able to repro your issue when I had downloaded the jars via blob but
> when I downloaded them as raw, I was able to get everything up and
> running.  For example:
>
> wget https://github.com/Azure/azure-documentdb-spark/*blob*/master/releases/azure-documentdb-spark-0.0.3_2.0.2_2.11/azure-documentdb-1.10.0.jar
> wget https://github.com/Azure/azure-documentdb-spark/*blob*/master/releases/azure-documentdb-spark-0.0.3_2.0.2_2.11/azure-documentdb-spark-0.0.3-SNAPSHOT.jar
> spark-shell --master yarn --jars azure-documentdb-spark-0.0.3-SNAPSHOT.jar,azure-documentdb-1.10.0.jar
>
> resulted in the error:
> SPARK_MAJOR_VERSION is set to 2, using Spark2
> Setting default log level to "WARN".
> To adjust logging level use sc.setLogLevel(newLevel).
> [init] error: error while loading , Error accessing
> /home/sshuser/jars/test/azure-documentdb-spark-0.0.3-SNAPSHOT.jar
>
> Failed to initialize compiler: object java.lang.Object in compiler mirror
> not found.
> ** Note that as of 2.8 scala does not assume use of the java classpath.
> ** For the old behavior pass -usejavacp to scala, or if using a Settings
> ** object programmatically, settings.usejavacp.value = true.
>
> But when running:
> wget https://github.com/Azure/azure-documentdb-spark/raw/master/releases/azure-documentdb-spark-0.0.3_2.0.2_2.11/azure-documentdb-1.10.0.jar
> wget https://github.com/Azure/azure-documentdb-spark/raw/master/releases/azure-documentdb-spark-0.0.3_2.0.2_2.11/azure-documentdb-spark-0.0.3-SNAPSHOT.jar
> spark-shell --master yarn --jars azure-documentdb-spark-0.0.3-SNAPSHOT.jar,azure-documentdb-1.10.0.jar
>
> it was up and running:
> spark-shell --master yarn --jars azure-documentdb-spark-0.0.3-SNAPSHOT.jar,azure-documentdb-1.10.0.jar
> SPARK_MAJOR_VERSION is set to 2, using Spark2
> Setting default log level to "WARN".
> To adjust logging level use sc.setLogLevel(newLevel).
> 17/05/11 22:54:06 WARN SparkContext: Use an existing SparkContext, some
> configuration may not take effect.
> Spark context Web UI available at http://10.0.0.22:4040
> Spark context available as 'sc' (master = yarn, app id =
> application_1494248502247_0013).
> Spark session available as 'spark'.
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/  '_/
>    /___/ .__/\_,_/_/ /_/\_\   version 2.0.2.2.5.4.0-121
>       /_/
>
> Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_121)
> Type in expressions to have them evaluated.
> Type :help for more information.
>
> scala>
>
> HTH!
>
>
> On Wed, May 10, 2017 at 11:49 PM ayan guha  wrote:
>
>> Hi
>>
>> Thanks for the reply, but unfortunately it did not work. I am getting the same error.
>>
>> sshuser@ed0-svochd:~/azure-spark-docdb-test$ spark-shell --jars
>> azure-documentdb-spark-0.0.3-SNAPSHOT.jar,azure-documentdb-1.10.0.jar
>> SPARK_MAJOR_VERSION is set to 2, using Spark2
>> Setting default log level to "WARN".
>> To adjust logging level use sc.setLogLevel(newLevel).
>> [init] error: error while loading , Error accessing
>> /home/sshuser/azure-spark-docdb-test/azure-documentdb-
>> spark-0.0.3-SNAPSHOT.jar
>>
>> Failed to initialize compiler: object java.lang.Object in compiler mirror
>> not found.
>> ** Note that as of 2.8 scala does not assume use of the java classpath.
>> ** For the old behavior pass -usejavacp to scala, or if using a Settings
>> ** object programmatically, settings.usejavacp.value = true.
>>
>> Failed to initialize compiler: object java.lang.Object in compiler mirror
>> not found.
>> ** Note that as of 2.8 scala does not assume use of the java classpath.
>> ** For the old behavior pass -usejavacp to scala, or if using a Settings
>> ** object programmatically, settings.usejavacp.value = true.
>> Exception in thread "main" java.lang.NullPointerException
>> at scala.reflect.internal.SymbolTable.exitingPhase(
>> SymbolTable.scala:256)
>> at scala.tools.nsc.interpreter.IMain$Request.x$20$lzycompute(
>> IMain.scala:896)
>> at scala.tools.nsc.interpreter.IMain$Request.x$20(IMain.
>> scala:895)
>> at scala.tools.nsc.interpreter.IMain$Request.headerPreamble$
>> lzycompute(IMain.scala:895)
>> at scala.tools.nsc.interpreter.IMain$Request.headerPreamble(
>> IMain.scala:895)
>> at scala.tools.nsc.interpreter.IMain$Request$Wrapper.
>> preamble(IMain.scala:918)
>> at scala.tools.nsc.interpreter.IMain$CodeAssembler$$anonfun$
>> apply$23.apply(IMain.scala:1337)
>> at scala.tools.nsc.interpreter.IMain$CodeAssembler$$anonfun$
>> apply$23.apply(IMain.scala:1336)
>> at scala.tools.nsc.util.package$.stringFromWriter(package.
>> scala:64)
>> at scala.tools.nsc.interpreter.IMain$CodeAssembler$class.
>> apply(IMain.scala:1336)
>> at 

Best Practice for Enum in Spark SQL

2017-05-11 Thread Mike Wheeler
Hi Spark Users,

I want to store an enum type (such as vehicle type: Car, SUV, Wagon) in my
data. My storage format will be Parquet, and I need to access the data from
spark-shell, the Spark SQL CLI, and Hive. My questions:

1) Should I store my enum type as a String, or as a numeric encoding
(e.g. 1=Car, 2=SUV, 3=Wagon)?

2) If I choose String, is there any penalty in disk space or memory?
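For concreteness, a minimal sketch (with made-up data and paths) of the two representations; Parquet generally dictionary-encodes repeated string values, so the on-disk penalty of the string form tends to be small:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("enum-storage-sketch").getOrCreate()
import spark.implicits._

// Hypothetical vehicle data: the same "vehicle type" enum stored two ways.
val asString = Seq(("v1", "Car"), ("v2", "SUV"), ("v3", "Wagon"))
  .toDF("id", "vehicle_type")
val asCode = Seq(("v1", 1), ("v2", 2), ("v3", 3))
  .toDF("id", "vehicle_type_code")

// The numeric form additionally needs a lookup table (1=Car, 2=SUV, 3=Wagon)
// to be kept somewhere outside the data itself.
asString.write.mode("overwrite").parquet("/tmp/vehicles_as_string")
asCode.write.mode("overwrite").parquet("/tmp/vehicles_as_code")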

Thank you!

Mike


Re: Spark Shell issue on HDInsight

2017-05-11 Thread Denny Lee
I was able to repro your issue when I had downloaded the jars via blob but
when I downloaded them as raw, I was able to get everything up and
running.  For example:

wget https://github.com/Azure/azure-documentdb-spark/*blob*/master/releases/azure-documentdb-spark-0.0.3_2.0.2_2.11/azure-documentdb-1.10.0.jar
wget https://github.com/Azure/azure-documentdb-spark/*blob*/master/releases/azure-documentdb-spark-0.0.3_2.0.2_2.11/azure-documentdb-spark-0.0.3-SNAPSHOT.jar
spark-shell --master yarn --jars azure-documentdb-spark-0.0.3-SNAPSHOT.jar,azure-documentdb-1.10.0.jar

resulted in the error:
SPARK_MAJOR_VERSION is set to 2, using Spark2
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
[init] error: error while loading , Error accessing
/home/sshuser/jars/test/azure-documentdb-spark-0.0.3-SNAPSHOT.jar

Failed to initialize compiler: object java.lang.Object in compiler mirror
not found.
** Note that as of 2.8 scala does not assume use of the java classpath.
** For the old behavior pass -usejavacp to scala, or if using a Settings
** object programmatically, settings.usejavacp.value = true.

But when running:
wget https://github.com/Azure/azure-documentdb-spark/raw/master/releases/azure-documentdb-spark-0.0.3_2.0.2_2.11/azure-documentdb-1.10.0.jar
wget https://github.com/Azure/azure-documentdb-spark/raw/master/releases/azure-documentdb-spark-0.0.3_2.0.2_2.11/azure-documentdb-spark-0.0.3-SNAPSHOT.jar
spark-shell --master yarn --jars azure-documentdb-spark-0.0.3-SNAPSHOT.jar,azure-documentdb-1.10.0.jar

it was up and running:
spark-shell --master yarn --jars azure-documentdb-spark-0.0.3-SNAPSHOT.jar,azure-documentdb-1.10.0.jar
SPARK_MAJOR_VERSION is set to 2, using Spark2
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
17/05/11 22:54:06 WARN SparkContext: Use an existing SparkContext, some
configuration may not take effect.
Spark context Web UI available at http://10.0.0.22:4040
Spark context available as 'sc' (master = yarn, app id =
application_1494248502247_0013).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.0.2.2.5.4.0-121
      /_/

Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_121)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

HTH!


On Wed, May 10, 2017 at 11:49 PM ayan guha  wrote:

> Hi
>
> Thanks for the reply, but unfortunately it did not work. I am getting the same error.
>
> sshuser@ed0-svochd:~/azure-spark-docdb-test$ spark-shell --jars
> azure-documentdb-spark-0.0.3-SNAPSHOT.jar,azure-documentdb-1.10.0.jar
> SPARK_MAJOR_VERSION is set to 2, using Spark2
> Setting default log level to "WARN".
> To adjust logging level use sc.setLogLevel(newLevel).
> [init] error: error while loading , Error accessing
> /home/sshuser/azure-spark-docdb-test/azure-documentdb-spark-0.0.3-SNAPSHOT.jar
>
> Failed to initialize compiler: object java.lang.Object in compiler mirror
> not found.
> ** Note that as of 2.8 scala does not assume use of the java classpath.
> ** For the old behavior pass -usejavacp to scala, or if using a Settings
> ** object programmatically, settings.usejavacp.value = true.
>
> Failed to initialize compiler: object java.lang.Object in compiler mirror
> not found.
> ** Note that as of 2.8 scala does not assume use of the java classpath.
> ** For the old behavior pass -usejavacp to scala, or if using a Settings
> ** object programmatically, settings.usejavacp.value = true.
> Exception in thread "main" java.lang.NullPointerException
> at
> scala.reflect.internal.SymbolTable.exitingPhase(SymbolTable.scala:256)
> at
> scala.tools.nsc.interpreter.IMain$Request.x$20$lzycompute(IMain.scala:896)
> at scala.tools.nsc.interpreter.IMain$Request.x$20(IMain.scala:895)
> at
> scala.tools.nsc.interpreter.IMain$Request.headerPreamble$lzycompute(IMain.scala:895)
> at
> scala.tools.nsc.interpreter.IMain$Request.headerPreamble(IMain.scala:895)
> at
> scala.tools.nsc.interpreter.IMain$Request$Wrapper.preamble(IMain.scala:918)
> at
> scala.tools.nsc.interpreter.IMain$CodeAssembler$$anonfun$apply$23.apply(IMain.scala:1337)
> at
> scala.tools.nsc.interpreter.IMain$CodeAssembler$$anonfun$apply$23.apply(IMain.scala:1336)
> at scala.tools.nsc.util.package$.stringFromWriter(package.scala:64)
> at
> scala.tools.nsc.interpreter.IMain$CodeAssembler$class.apply(IMain.scala:1336)
> at
> scala.tools.nsc.interpreter.IMain$Request$Wrapper.apply(IMain.scala:908)
> at
> scala.tools.nsc.interpreter.IMain$Request.compile$lzycompute(IMain.scala:1002)
> at
> scala.tools.nsc.interpreter.IMain$Request.compile(IMain.scala:997)
> at scala.tools.nsc.interpreter.IMain.compile(IMain.scala:579)
> at 

Matrix multiplication and cluster / partition / blocks configuration

2017-05-11 Thread John Compitello
Hey all, 

I’ve found myself in a position where I need to do a relatively large matrix 
multiply (at least, compared to what I normally have to do). I’m looking to 
multiply a 100k by 500k dense matrix by its transpose to yield a 100k by 100k
matrix. I’m trying to do this on Google Cloud, so I don’t have any real limits 
on cluster size or memory. However, I have no idea where to begin as far as 
number of cores / number of partitions / how big to make the block size for 
best performance. Is there anywhere where Spark users collect optimal 
configurations for methods relative to data input size? Does anyone have any 
suggestions? I’ve tried throwing 900 cores at a 100k by 100k matrix multiply 
with 1000 by 1000 sized blocks, and that seemed to hang forever and eventually 
fail. 
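
For what it's worth, a minimal sketch of the A * A.transpose pattern with MLlib's BlockMatrix on a small made-up matrix; the 1024 x 1024 block size is only a starting point to tune, not a recommendation:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}

// Toy stand-in for the real data: 1,000 rows of 500 random doubles.
val rows = sc.parallelize(0L until 1000L).map { i =>
  IndexedRow(i, Vectors.dense(Array.fill(500)(scala.util.Random.nextDouble())))
}

// Block size is the main knob: it controls how large each shuffled block is.
val blockA = new IndexedRowMatrix(rows).toBlockMatrix(1024, 1024).cache()
val gram = blockA.multiply(blockA.transpose) // A * A^T, 1000 x 1000 in this toy case
gram.blocks.count()                          // force evaluation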

Thanks,

John



RE: Spark consumes more memory

2017-05-11 Thread Anantharaman, Srinatha (Contractor)
Rick,

Thank you for the input. The space issue is now resolved:
yarn.nodemanager.local.dirs and yarn.nodemanager.log.dirs were filling up.

For 5 GB of data, why should it take 10 minutes to load with 7-8 executors with 2
cores each? I also see the executors' memory ranging anywhere from 7 to 20 GB.
If 5 GB of data takes this many resources, what will happen when I load 50 GB of data?

I tried reducing the partitions to 64, but it still takes more than 10 minutes.

Is there any configuration that would help me improve the loading process and consume
less memory?

Regards,
~Sri

From: Rick Moritz [mailto:rah...@gmail.com]
Sent: Thursday, May 11, 2017 1:34 PM
To: Anantharaman, Srinatha (Contractor) ; 
user 
Subject: Re: Spark consumes more memory

I would try to track down the "no space left on device" - find out where that 
originates from, since you should be able to allocate 10 executors with 4 cores 
and 15GB RAM each quite easily. In that case, you may want to increase overhead, 
so yarn doesn't kill your executors.
Check that no local drives are filling up with temporary data, by running a 
watch df on all nodes.
Also check that no quotas are being enforced, and that your log partitions 
aren't overflowing.

Depending on your disk and network speed, as well as the time it takes yarn to 
allocate resources and spark to initialize the spark context, 10 minutes 
doesn't sound too bad. Also, I don't think 150 partitions is a helpful 
partition count if you have 7G RAM per executor and aren't doing any joining 
or other memory-intensive calculation. Try again with 64 partitions, and see if 
the reduced overhead helps.
Also, track which actions/tasks are running longer than expected in the Spark UI. 
That should help identify where your bottleneck is located.

On Thu, May 11, 2017 at 5:46 PM, Anantharaman, Srinatha (Contractor) 
> 
wrote:
Hi,

I am reading a Hive Orc table into memory, StorageLevel is set to 
(StorageLevel.MEMORY_AND_DISK_SER)
Total size of the Hive table is 5GB
Started the spark-shell as below

spark-shell --master yarn --deploy-mode client --num-executors 8 
--driver-memory 5G --executor-memory 7G --executor-cores 2 --conf 
spark.yarn.executor.memoryOverhead=512
I have 10 node cluster each with 35 GB memory and 4 cores running on HDP 2.5
SPARK_LOCAL_DIRS location has enough space

My concern is that the simple code below, which loads the data into memory, takes approx. 10-12 mins.
If I change the values for 
num-executors/driver-memory/executor-memory/executor-cores from those mentioned 
above, I get a "No space left on device" error.
While running, each node consumes a varying amount of memory, from 7 GB to 20 GB.

import org.apache.spark.storage.StorageLevel


val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
sqlContext.sql("SET hive.mapred.supports.subdirectories=true")
sqlContext.sql("SET mapreduce.input.fileinputformat.input.dir.recursive=true")
val tab1 =  sqlContext.sql("select * from 
xyz").repartition(150).persist(StorageLevel.MEMORY_AND_DISK_SER)
tab1.registerTempTable("AUDIT")
tab1.count()

Kindly advise how to improve the performance of loading the Hive table into Spark 
memory and avoid the space issue.

Regards,
~Sri



Re: Spark <--> S3 flakiness

2017-05-11 Thread Vadim Semenov
Use the official mailing list archive

http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/%3ccajyeq0gh1fbhbajb9gghognhqouogydba28lnn262hfzzgf...@mail.gmail.com%3e

On Thu, May 11, 2017 at 2:50 PM, lucas.g...@gmail.com 
wrote:

> Also, and this is unrelated to the actual question... Why don't these
> messages show up in the archive?
>
> http://apache-spark-user-list.1001560.n3.nabble.com/
>
> Ideally I'd want to post a link to our internal wiki for these questions,
> but can't find them in the archive.
>
> On 11 May 2017 at 07:16, lucas.g...@gmail.com 
> wrote:
>
>> Looks like this isn't viable in spark 2.0.0 (and greater I presume).  I'm
>> pretty sure I came across this blog and ignored it due to that.
>>
>> Any other thoughts?  The linked tickets in:
>> https://issues.apache.org/jira/browse/SPARK-10063
>> https://issues.apache.org/jira/browse/HADOOP-13786
>> https://issues.apache.org/jira/browse/HADOOP-9565 look relevant too.
>>
>> On 10 May 2017 at 22:24, Miguel Morales  wrote:
>>
>>> Try using the DirectParquetOutputCommitter:
>>> http://dev.sortable.com/spark-directparquetoutputcommitter/
>>>
>>> On Wed, May 10, 2017 at 10:07 PM, lucas.g...@gmail.com
>>>  wrote:
>>> > Hi users, we have a bunch of pyspark jobs that are using S3 for
>>> loading /
>>> > intermediate steps and final output of parquet files.
>>> >
>>> > We're running into the following issues on a semi regular basis:
>>> > * These are intermittent errors, IE we have about 300 jobs that run
>>> > nightly... And a fairly random but small-ish percentage of them fail
>>> with
>>> > the following classes of errors.
>>> >
>>> > S3 write errors
>>> >
>>> >> "ERROR Utils: Aborting task
>>> >> com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 404,
>>> AWS
>>> >> Service: Amazon S3, AWS Request ID: 2D3RP, AWS Error Code: null, AWS
>>> Error
>>> >> Message: Not Found, S3 Extended Request ID: BlaBlahEtc="
>>> >
>>> >
>>> >>
>>> >> "Py4JJavaError: An error occurred while calling o43.parquet.
>>> >> : com.amazonaws.services.s3.model.MultiObjectDeleteException: Status
>>> Code:
>>> >> 0, AWS Service: null, AWS Request ID: null, AWS Error Code: null, AWS
>>> Error
>>> >> Message: One or more objects could not be deleted, S3 Extended
>>> Request ID:
>>> >> null"
>>> >
>>> >
>>> >
>>> > S3 Read Errors:
>>> >
>>> >> [Stage 1:=>
>>>  (27 + 4)
>>> >> / 31]17/05/10 16:25:23 ERROR Executor: Exception in task 10.0 in
>>> stage 1.0
>>> >> (TID 11)
>>> >> java.net.SocketException: Connection reset
>>> >> at java.net.SocketInputStream.read(SocketInputStream.java:196)
>>> >> at java.net.SocketInputStream.read(SocketInputStream.java:122)
>>> >> at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
>>> >> at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
>>> >> at sun.security.ssl.InputRecord.read(InputRecord.java:509)
>>> >> at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
>>> >> at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.
>>> java:884)
>>> >> at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
>>> >> at
>>> >> org.apache.http.impl.io.AbstractSessionInputBuffer.read(Abst
>>> ractSessionInputBuffer.java:198)
>>> >> at
>>> >> org.apache.http.impl.io.ContentLengthInputStream.read(Conten
>>> tLengthInputStream.java:178)
>>> >> at
>>> >> org.apache.http.impl.io.ContentLengthInputStream.read(Conten
>>> tLengthInputStream.java:200)
>>> >> at
>>> >> org.apache.http.impl.io.ContentLengthInputStream.close(Conte
>>> ntLengthInputStream.java:103)
>>> >> at
>>> >> org.apache.http.conn.BasicManagedEntity.streamClosed(BasicMa
>>> nagedEntity.java:168)
>>> >> at
>>> >> org.apache.http.conn.EofSensorInputStream.checkClose(EofSens
>>> orInputStream.java:228)
>>> >> at
>>> >> org.apache.http.conn.EofSensorInputStream.close(EofSensorInp
>>> utStream.java:174)
>>> >> at java.io.FilterInputStream.close(FilterInputStream.java:181)
>>> >> at java.io.FilterInputStream.close(FilterInputStream.java:181)
>>> >> at java.io.FilterInputStream.close(FilterInputStream.java:181)
>>> >> at java.io.FilterInputStream.close(FilterInputStream.java:181)
>>> >> at com.amazonaws.services.s3.model.S3Object.close(S3Object.java:203)
>>> >> at org.apache.hadoop.fs.s3a.S3AInputStream.close(S3AInputStream
>>> .java:187)
>>> >
>>> >
>>> >
>>> > We have literally tons of logs we can add but it would make the email
>>> > unwieldy big.  If it would be helpful I'll drop them in a pastebin or
>>> > something.
>>> >
>>> > Our config is along the lines of:
>>> >
>>> > spark-2.1.0-bin-hadoop2.7
>>> > '--packages
>>> > com.amazonaws:aws-java-sdk:1.10.34,org.apache.hadoop:hadoop-aws:2.6.0
>>> > pyspark-shell'
>>> >
>>> > Given the stack overflow / googling I've been doing I know we're not
>>> the
>>> > only org with these issues but I haven't found a good set of solutions
>>> in
>>> 

Re: Spark <--> S3 flakiness

2017-05-11 Thread Miguel Morales
You might want to try using gzip as opposed to Parquet.  The only way I
ever reliably got Parquet to work on S3 is by using Alluxio as a
buffer, but it's a decent amount of work.

On Thu, May 11, 2017 at 11:50 AM, lucas.g...@gmail.com
 wrote:
> Also, and this is unrelated to the actual question... Why don't these
> messages show up in the archive?
>
> http://apache-spark-user-list.1001560.n3.nabble.com/
>
> Ideally I'd want to post a link to our internal wiki for these questions,
> but can't find them in the archive.
>
> On 11 May 2017 at 07:16, lucas.g...@gmail.com  wrote:
>>
>> Looks like this isn't viable in spark 2.0.0 (and greater I presume).  I'm
>> pretty sure I came across this blog and ignored it due to that.
>>
>> Any other thoughts?  The linked tickets in:
>> https://issues.apache.org/jira/browse/SPARK-10063
>> https://issues.apache.org/jira/browse/HADOOP-13786
>> https://issues.apache.org/jira/browse/HADOOP-9565 look relevant too.
>>
>> On 10 May 2017 at 22:24, Miguel Morales  wrote:
>>>
>>> Try using the DirectParquetOutputCommitter:
>>> http://dev.sortable.com/spark-directparquetoutputcommitter/
>>>
>>> On Wed, May 10, 2017 at 10:07 PM, lucas.g...@gmail.com
>>>  wrote:
>>> > Hi users, we have a bunch of pyspark jobs that are using S3 for loading
>>> > /
>>> > intermediate steps and final output of parquet files.
>>> >
>>> > We're running into the following issues on a semi regular basis:
>>> > * These are intermittent errors, IE we have about 300 jobs that run
>>> > nightly... And a fairly random but small-ish percentage of them fail
>>> > with
>>> > the following classes of errors.
>>> >
>>> > S3 write errors
>>> >
>>> >> "ERROR Utils: Aborting task
>>> >> com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 404,
>>> >> AWS
>>> >> Service: Amazon S3, AWS Request ID: 2D3RP, AWS Error Code: null, AWS
>>> >> Error
>>> >> Message: Not Found, S3 Extended Request ID: BlaBlahEtc="
>>> >
>>> >
>>> >>
>>> >> "Py4JJavaError: An error occurred while calling o43.parquet.
>>> >> : com.amazonaws.services.s3.model.MultiObjectDeleteException: Status
>>> >> Code:
>>> >> 0, AWS Service: null, AWS Request ID: null, AWS Error Code: null, AWS
>>> >> Error
>>> >> Message: One or more objects could not be deleted, S3 Extended Request
>>> >> ID:
>>> >> null"
>>> >
>>> >
>>> >
>>> > S3 Read Errors:
>>> >
>>> >> [Stage 1:=>   (27
>>> >> + 4)
>>> >> / 31]17/05/10 16:25:23 ERROR Executor: Exception in task 10.0 in stage
>>> >> 1.0
>>> >> (TID 11)
>>> >> java.net.SocketException: Connection reset
>>> >> at java.net.SocketInputStream.read(SocketInputStream.java:196)
>>> >> at java.net.SocketInputStream.read(SocketInputStream.java:122)
>>> >> at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
>>> >> at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
>>> >> at sun.security.ssl.InputRecord.read(InputRecord.java:509)
>>> >> at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
>>> >> at
>>> >> sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
>>> >> at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
>>> >> at
>>> >>
>>> >> org.apache.http.impl.io.AbstractSessionInputBuffer.read(AbstractSessionInputBuffer.java:198)
>>> >> at
>>> >>
>>> >> org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:178)
>>> >> at
>>> >>
>>> >> org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:200)
>>> >> at
>>> >>
>>> >> org.apache.http.impl.io.ContentLengthInputStream.close(ContentLengthInputStream.java:103)
>>> >> at
>>> >>
>>> >> org.apache.http.conn.BasicManagedEntity.streamClosed(BasicManagedEntity.java:168)
>>> >> at
>>> >>
>>> >> org.apache.http.conn.EofSensorInputStream.checkClose(EofSensorInputStream.java:228)
>>> >> at
>>> >>
>>> >> org.apache.http.conn.EofSensorInputStream.close(EofSensorInputStream.java:174)
>>> >> at java.io.FilterInputStream.close(FilterInputStream.java:181)
>>> >> at java.io.FilterInputStream.close(FilterInputStream.java:181)
>>> >> at java.io.FilterInputStream.close(FilterInputStream.java:181)
>>> >> at java.io.FilterInputStream.close(FilterInputStream.java:181)
>>> >> at com.amazonaws.services.s3.model.S3Object.close(S3Object.java:203)
>>> >> at
>>> >> org.apache.hadoop.fs.s3a.S3AInputStream.close(S3AInputStream.java:187)
>>> >
>>> >
>>> >
>>> > We have literally tons of logs we can add but it would make the email
>>> > unwieldy big.  If it would be helpful I'll drop them in a pastebin or
>>> > something.
>>> >
>>> > Our config is along the lines of:
>>> >
>>> > spark-2.1.0-bin-hadoop2.7
>>> > '--packages
>>> > com.amazonaws:aws-java-sdk:1.10.34,org.apache.hadoop:hadoop-aws:2.6.0
>>> > pyspark-shell'
>>> >
>>> > Given the stack overflow / googling I've been doing I know we're not
>>> > the
>>> > only org with these issues but I 

Re: Spark <--> S3 flakiness

2017-05-11 Thread lucas.g...@gmail.com
Also, and this is unrelated to the actual question... Why don't these
messages show up in the archive?

http://apache-spark-user-list.1001560.n3.nabble.com/

Ideally I'd want to post a link to our internal wiki for these questions,
but can't find them in the archive.

On 11 May 2017 at 07:16, lucas.g...@gmail.com  wrote:

> Looks like this isn't viable in spark 2.0.0 (and greater I presume).  I'm
> pretty sure I came across this blog and ignored it due to that.
>
> Any other thoughts?  The linked tickets in:
> https://issues.apache.org/jira/browse/SPARK-10063
> https://issues.apache.org/jira/browse/HADOOP-13786
> https://issues.apache.org/jira/browse/HADOOP-9565 look relevant too.
>
> On 10 May 2017 at 22:24, Miguel Morales  wrote:
>
>> Try using the DirectParquetOutputCommitter:
>> http://dev.sortable.com/spark-directparquetoutputcommitter/
>>
>> On Wed, May 10, 2017 at 10:07 PM, lucas.g...@gmail.com
>>  wrote:
>> > Hi users, we have a bunch of pyspark jobs that are using S3 for loading
>> /
>> > intermediate steps and final output of parquet files.
>> >
>> > We're running into the following issues on a semi regular basis:
>> > * These are intermittent errors, IE we have about 300 jobs that run
>> > nightly... And a fairly random but small-ish percentage of them fail
>> with
>> > the following classes of errors.
>> >
>> > S3 write errors
>> >
>> >> "ERROR Utils: Aborting task
>> >> com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 404,
>> AWS
>> >> Service: Amazon S3, AWS Request ID: 2D3RP, AWS Error Code: null, AWS
>> Error
>> >> Message: Not Found, S3 Extended Request ID: BlaBlahEtc="
>> >
>> >
>> >>
>> >> "Py4JJavaError: An error occurred while calling o43.parquet.
>> >> : com.amazonaws.services.s3.model.MultiObjectDeleteException: Status
>> Code:
>> >> 0, AWS Service: null, AWS Request ID: null, AWS Error Code: null, AWS
>> Error
>> >> Message: One or more objects could not be deleted, S3 Extended Request
>> ID:
>> >> null"
>> >
>> >
>> >
>> > S3 Read Errors:
>> >
>> >> [Stage 1:=>   (27
>> + 4)
>> >> / 31]17/05/10 16:25:23 ERROR Executor: Exception in task 10.0 in stage
>> 1.0
>> >> (TID 11)
>> >> java.net.SocketException: Connection reset
>> >> at java.net.SocketInputStream.read(SocketInputStream.java:196)
>> >> at java.net.SocketInputStream.read(SocketInputStream.java:122)
>> >> at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
>> >> at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
>> >> at sun.security.ssl.InputRecord.read(InputRecord.java:509)
>> >> at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
>> >> at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.
>> java:884)
>> >> at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
>> >> at
>> >> org.apache.http.impl.io.AbstractSessionInputBuffer.read(Abst
>> ractSessionInputBuffer.java:198)
>> >> at
>> >> org.apache.http.impl.io.ContentLengthInputStream.read(Conten
>> tLengthInputStream.java:178)
>> >> at
>> >> org.apache.http.impl.io.ContentLengthInputStream.read(Conten
>> tLengthInputStream.java:200)
>> >> at
>> >> org.apache.http.impl.io.ContentLengthInputStream.close(Conte
>> ntLengthInputStream.java:103)
>> >> at
>> >> org.apache.http.conn.BasicManagedEntity.streamClosed(BasicMa
>> nagedEntity.java:168)
>> >> at
>> >> org.apache.http.conn.EofSensorInputStream.checkClose(EofSens
>> orInputStream.java:228)
>> >> at
>> >> org.apache.http.conn.EofSensorInputStream.close(EofSensorInp
>> utStream.java:174)
>> >> at java.io.FilterInputStream.close(FilterInputStream.java:181)
>> >> at java.io.FilterInputStream.close(FilterInputStream.java:181)
>> >> at java.io.FilterInputStream.close(FilterInputStream.java:181)
>> >> at java.io.FilterInputStream.close(FilterInputStream.java:181)
>> >> at com.amazonaws.services.s3.model.S3Object.close(S3Object.java:203)
>> >> at org.apache.hadoop.fs.s3a.S3AInputStream.close(S3AInputStream
>> .java:187)
>> >
>> >
>> >
>> > We have literally tons of logs we can add but it would make the email
>> > unwieldy big.  If it would be helpful I'll drop them in a pastebin or
>> > something.
>> >
>> > Our config is along the lines of:
>> >
>> > spark-2.1.0-bin-hadoop2.7
>> > '--packages
>> > com.amazonaws:aws-java-sdk:1.10.34,org.apache.hadoop:hadoop-aws:2.6.0
>> > pyspark-shell'
>> >
>> > Given the stack overflow / googling I've been doing I know we're not the
>> > only org with these issues but I haven't found a good set of solutions
>> in
>> > those spaces yet.
>> >
>> > Thanks!
>> >
>> > Gary Lucas
>>
>
>


Re: Spark consumes more memory

2017-05-11 Thread Rick Moritz
I would try to track down the "no space left on device" - find out where
that originates from, since you should be able to allocate 10 executors
with 4 cores and 15GB RAM each quite easily. In that case, you may want to
increase overhead, so yarn doesn't kill your executors.
Check that no local drives are filling up with temporary data, by running
a watch df on all nodes.
Also check that no quotas are being enforced, and that your log partitions
aren't overflowing.

Depending on your disk and network speed, as well as the time it takes yarn
to allocate resources and spark to initialize the spark context, 10 minutes
doesn't sound too bad. Also, I don't think 150 partitions is a helpful
partition count if you have 7G RAM per executor and aren't doing any
joining or other memory-intensive calculation. Try again with 64
partitions, and see if the reduced overhead helps.
Also, track which actions/tasks are running longer than expected in the Spark UI.
That should help identify where your bottleneck is located.
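
For reference, a minimal sketch of the suggested changes, reusing the sqlContext and query from the original post below; the 1024 MB overhead is only an illustrative bump from the 512 MB used there:

// Launch with a larger YARN overhead than the original 512 MB, e.g.:
//   spark-shell --master yarn --deploy-mode client --num-executors 8 \
//     --driver-memory 5G --executor-memory 7G --executor-cores 2 \
//     --conf spark.yarn.executor.memoryOverhead=1024

import org.apache.spark.storage.StorageLevel

// Same query as in the original post, with the explicit repartition reduced
// from 150 to the suggested 64 partitions.
val tab1 = sqlContext.sql("select * from xyz")
  .repartition(64)
  .persist(StorageLevel.MEMORY_AND_DISK_SER)
tab1.registerTempTable("AUDIT")
tab1.count()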

On Thu, May 11, 2017 at 5:46 PM, Anantharaman, Srinatha (Contractor) <
srinatha_ananthara...@comcast.com> wrote:

> Hi,
>
>
>
> I am reading a Hive Orc table into memory, StorageLevel is set to
> (StorageLevel.MEMORY_AND_DISK_SER)
>
> Total size of the Hive table is 5GB
>
> Started the spark-shell as below
>
>
>
> spark-shell --master yarn --deploy-mode client --num-executors 8
> --driver-memory 5G --executor-memory 7G --executor-cores 2 --conf
> spark.yarn.executor.memoryOverhead=512
>
> I have 10 node cluster each with 35 GB memory and 4 cores running on HDP
> 2.5
>
> SPARK_LOCAL_DIRS location has enough space
>
>
>
> My concern is that the simple code below, which loads the data into memory,
> takes approx. 10-12 mins.
>
> If I change the values for
> num-executors/driver-memory/executor-memory/executor-cores
> from those mentioned above, I get a "No space left on device" error.
>
> While running, each node consumes a varying amount of memory, from 7 GB to 20 GB.
>
>
>
> import org.apache.spark.storage.StorageLevel
>
>
>
>
>
> val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
>
> sqlContext.sql("SET hive.mapred.supports.subdirectories=true")
>
> sqlContext.sql("SET mapreduce.input.fileinputformat.input.dir.
> recursive=true")
>
> val tab1 =  sqlContext.sql("select * from xyz").repartition(150).
> persist(StorageLevel.MEMORY_AND_DISK_SER)
>
> tab1.registerTempTable("AUDIT")
>
> tab1.count()
>
>
>
> Kindly advise how to improve the performance of loading the Hive table into
> Spark memory and avoid the space issue.
>
>
>
> Regards,
>
> ~Sri
>


Spark consumes more memory

2017-05-11 Thread Anantharaman, Srinatha (Contractor)
Hi,

I am reading a Hive Orc table into memory, StorageLevel is set to 
(StorageLevel.MEMORY_AND_DISK_SER)
Total size of the Hive table is 5GB
Started the spark-shell as below

spark-shell --master yarn --deploy-mode client --num-executors 8 
--driver-memory 5G --executor-memory 7G --executor-cores 2 --conf 
spark.yarn.executor.memoryOverhead=512
I have 10 node cluster each with 35 GB memory and 4 cores running on HDP 2.5
SPARK_LOCAL_DIRS location has enough space

My concern is that the simple code below, which loads the data into memory, takes approx. 10-12 mins.
If I change the values for 
num-executors/driver-memory/executor-memory/executor-cores from those mentioned 
above, I get a "No space left on device" error.
While running, each node consumes a varying amount of memory, from 7 GB to 20 GB.

import org.apache.spark.storage.StorageLevel


val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
sqlContext.sql("SET hive.mapred.supports.subdirectories=true")
sqlContext.sql("SET mapreduce.input.fileinputformat.input.dir.recursive=true")
val tab1 =  sqlContext.sql("select * from 
xyz").repartition(150).persist(StorageLevel.MEMORY_AND_DISK_SER)
tab1.registerTempTable("AUDIT")
tab1.count()

Kindly advise how to improve the performance of loading the Hive table into Spark 
memory and avoid the space issue.

Regards,
~Sri


BinaryClassificationMetrics only supports AreaUnderPR and AreaUnderROC?

2017-05-11 Thread Lan Jiang
I realized that in Spark ML, BinaryClassificationMetrics only supports
AreaUnderPR and AreaUnderROC. Why is that?

What if I need other metrics such as F-score or accuracy? I tried to use
MulticlassClassificationEvaluator to evaluate other metrics such as
accuracy for a binary classification problem, and it seems to work. But I am
not sure if there is any issue using MulticlassClassificationEvaluator for
a binary classification problem. According to the Spark ML documentation, "The
Evaluator can be a RegressionEvaluator for regression problems, *a
BinaryClassificationEvaluator for binary data, or a
MulticlassClassificationEvaluator for multiclass problems*. "

https://spark.apache.org/docs/2.1.0/ml-tuning.html

Can someone shed some light on the issue?
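
For illustration, a minimal self-contained sketch (with a tiny made-up predictions DataFrame) of treating the binary problem as a 2-class multiclass problem:

import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("binary-metrics-sketch").getOrCreate()
import spark.implicits._

// Tiny hypothetical predictions frame: true label vs. predicted label for a binary model.
val predictions = Seq(
  (1.0, 1.0), (0.0, 0.0), (1.0, 0.0), (0.0, 0.0), (1.0, 1.0)
).toDF("label", "prediction")

// Treat the two classes as a 2-class multiclass problem to get F1 / accuracy.
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")

val f1 = evaluator.setMetricName("f1").evaluate(predictions)
val accuracy = evaluator.setMetricName("accuracy").evaluate(predictions)
println(s"f1=$f1 accuracy=$accuracy")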

Lan


Re: Spark <--> S3 flakiness

2017-05-11 Thread lucas.g...@gmail.com
Looks like this isn't viable in spark 2.0.0 (and greater I presume).  I'm
pretty sure I came across this blog and ignored it due to that.

Any other thoughts?  The linked tickets in:
https://issues.apache.org/jira/browse/SPARK-10063
https://issues.apache.org/jira/browse/HADOOP-13786
https://issues.apache.org/jira/browse/HADOOP-9565 look relevant too.

On 10 May 2017 at 22:24, Miguel Morales  wrote:

> Try using the DirectParquetOutputCommitter:
> http://dev.sortable.com/spark-directparquetoutputcommitter/
>
> On Wed, May 10, 2017 at 10:07 PM, lucas.g...@gmail.com
>  wrote:
> > Hi users, we have a bunch of pyspark jobs that are using S3 for loading /
> > intermediate steps and final output of parquet files.
> >
> > We're running into the following issues on a semi regular basis:
> > * These are intermittent errors, IE we have about 300 jobs that run
> > nightly... And a fairly random but small-ish percentage of them fail with
> > the following classes of errors.
> >
> > S3 write errors
> >
> >> "ERROR Utils: Aborting task
> >> com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 404,
> AWS
> >> Service: Amazon S3, AWS Request ID: 2D3RP, AWS Error Code: null, AWS
> Error
> >> Message: Not Found, S3 Extended Request ID: BlaBlahEtc="
> >
> >
> >>
> >> "Py4JJavaError: An error occurred while calling o43.parquet.
> >> : com.amazonaws.services.s3.model.MultiObjectDeleteException: Status
> Code:
> >> 0, AWS Service: null, AWS Request ID: null, AWS Error Code: null, AWS
> Error
> >> Message: One or more objects could not be deleted, S3 Extended Request
> ID:
> >> null"
> >
> >
> >
> > S3 Read Errors:
> >
> >> [Stage 1:=>   (27
> + 4)
> >> / 31]17/05/10 16:25:23 ERROR Executor: Exception in task 10.0 in stage
> 1.0
> >> (TID 11)
> >> java.net.SocketException: Connection reset
> >> at java.net.SocketInputStream.read(SocketInputStream.java:196)
> >> at java.net.SocketInputStream.read(SocketInputStream.java:122)
> >> at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
> >> at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
> >> at sun.security.ssl.InputRecord.read(InputRecord.java:509)
> >> at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
> >> at sun.security.ssl.SSLSocketImpl.readDataRecord(
> SSLSocketImpl.java:884)
> >> at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
> >> at
> >> org.apache.http.impl.io.AbstractSessionInputBuffer.read(
> AbstractSessionInputBuffer.java:198)
> >> at
> >> org.apache.http.impl.io.ContentLengthInputStream.read(
> ContentLengthInputStream.java:178)
> >> at
> >> org.apache.http.impl.io.ContentLengthInputStream.read(
> ContentLengthInputStream.java:200)
> >> at
> >> org.apache.http.impl.io.ContentLengthInputStream.close(
> ContentLengthInputStream.java:103)
> >> at
> >> org.apache.http.conn.BasicManagedEntity.streamClosed(
> BasicManagedEntity.java:168)
> >> at
> >> org.apache.http.conn.EofSensorInputStream.checkClose(
> EofSensorInputStream.java:228)
> >> at
> >> org.apache.http.conn.EofSensorInputStream.close(
> EofSensorInputStream.java:174)
> >> at java.io.FilterInputStream.close(FilterInputStream.java:181)
> >> at java.io.FilterInputStream.close(FilterInputStream.java:181)
> >> at java.io.FilterInputStream.close(FilterInputStream.java:181)
> >> at java.io.FilterInputStream.close(FilterInputStream.java:181)
> >> at com.amazonaws.services.s3.model.S3Object.close(S3Object.java:203)
> >> at org.apache.hadoop.fs.s3a.S3AInputStream.close(
> S3AInputStream.java:187)
> >
> >
> >
> > We have literally tons of logs we can add but it would make the email
> > unwieldy big.  If it would be helpful I'll drop them in a pastebin or
> > something.
> >
> > Our config is along the lines of:
> >
> > spark-2.1.0-bin-hadoop2.7
> > '--packages
> > com.amazonaws:aws-java-sdk:1.10.34,org.apache.hadoop:hadoop-aws:2.6.0
> > pyspark-shell'
> >
> > Given the stack overflow / googling I've been doing I know we're not the
> > only org with these issues but I haven't found a good set of solutions in
> > those spaces yet.
> >
> > Thanks!
> >
> > Gary Lucas
>


Re: running spark program on intellij connecting to remote master for cluster

2017-05-11 Thread s t

Hello David,

Let me make it more clear;


  *   There is no Spark installed on the Windows laptop, just IntelliJ and
the related dependencies.
  *   SparkLauncher is a good starting point for submitting a job programmatically,
but I am not sure if my current problem is related to the job execution strategy.
  *   I am not even using spark-submit.
  *   I have one laptop running IntelliJ and the code below, plus one Ubuntu
master and two Ubuntu slaves on my network.

Here is the code:

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisUtils
import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}

/**
 * Created by serkan on 23.04.2017.
 */
object KinesisStreamCluster {

  val spark: SparkSession = SparkSession.builder()
    .config("spark.jars.packages",
      "org.apache.spark:spark-streaming-kinesis-asl_2.11:2.1.0")
    .master("spark://xxx.xxx.xxx.xxx:7077").getOrCreate()

  def main(args: Array[String]): Unit = {

    val ssc = new StreamingContext(spark.sparkContext, Seconds(20))

    val kinesisCheckpointInterval = Milliseconds(2)

    val kinesisStream = KinesisUtils.createStream(ssc, "scala",
      "stream_name_for_kinesis",
      "kinesis.eu-west-1.amazonaws.com",
      "eu-west-1",
      InitialPositionInStream.LATEST, kinesisCheckpointInterval,
      StorageLevel.MEMORY_AND_DISK_2)

    kinesisStream.print()

    ssc.start()
    ssc.awaitTermination()

  }

}


and I am getting this error while executing the code:



17/05/11 09:52:25 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 70,
10.240.65.189, executor 1): java.io.IOException:
java.lang.ClassNotFoundException:
org.apache.spark.streaming.kinesis.KinesisReceiver
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1276)
at
org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:258)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)



I have modified spark-defaults.conf on the master and slaves and inserted the
definition below, but nothing changed.


spark.jars.packages    org.apache.spark:spark-streaming-kinesis-asl_2.11:2.1.0



Nothing changed!

David Kaczynski > wrote (10 May 2017 15:29):

Do you have Spark installed locally on your laptop with IntelliJ?  Are you 
using the SparkLauncher class or your local spark-submit script?  A while back, 
I was trying to submit a spark job from my local workstation to a remote 
cluster using the SparkLauncher class, but I didn't actually have SPARK_HOME 
set or the spark-submit script on my local machine yet, so the submit was 
failing.  I think the error I was getting was that SPARK_HOME environment 
variable was not set, though.
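
A minimal SparkLauncher sketch along those lines (the jar path, main class, and master URL are placeholders, and it assumes a local Spark distribution / SPARK_HOME is present):

import org.apache.spark.launcher.SparkLauncher

object LaunchKinesisJob {
  def main(args: Array[String]): Unit = {
    // Placeholder jar path, class name and master URL; adjust for the real job.
    val handle = new SparkLauncher()
      .setAppResource("/path/to/kinesis-job-assembly.jar")
      .setMainClass("KinesisStreamCluster")
      .setMaster("spark://xxx.xxx.xxx.xxx:7077")
      .setConf("spark.jars.packages",
        "org.apache.spark:spark-streaming-kinesis-asl_2.11:2.1.0")
      .startApplication()

    // Crude wait loop: block until the application reaches a final state.
    while (!handle.getState.isFinal) Thread.sleep(1000)
  }
}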

On Wed, May 10, 2017 at 5:51 AM s t 
> wrote:
Hello,

I am trying to run Spark code from my laptop with IntelliJ. I have a cluster of 2
nodes and a master. When I start the program from IntelliJ, it fails with errors
about missing classes.

I am aware that some jars need to be distributed to the workers, but I do not know
if this is possible programmatically. spark-submit or a Jupyter notebook handles the
issue, but IntelliJ does not.

Can anyone give me some advice?




Re: Spark Shell issue on HDInsight

2017-05-11 Thread ayan guha
Hi

Thanks for the reply, but unfortunately it did not work. I am getting the same error.

sshuser@ed0-svochd:~/azure-spark-docdb-test$ spark-shell --jars
azure-documentdb-spark-0.0.3-SNAPSHOT.jar,azure-documentdb-1.10.0.jar
SPARK_MAJOR_VERSION is set to 2, using Spark2
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
[init] error: error while loading , Error accessing
/home/sshuser/azure-spark-docdb-test/azure-documentdb-spark-0.0.3-SNAPSHOT.jar

Failed to initialize compiler: object java.lang.Object in compiler mirror
not found.
** Note that as of 2.8 scala does not assume use of the java classpath.
** For the old behavior pass -usejavacp to scala, or if using a Settings
** object programmatically, settings.usejavacp.value = true.

Failed to initialize compiler: object java.lang.Object in compiler mirror
not found.
** Note that as of 2.8 scala does not assume use of the java classpath.
** For the old behavior pass -usejavacp to scala, or if using a Settings
** object programmatically, settings.usejavacp.value = true.
Exception in thread "main" java.lang.NullPointerException
at
scala.reflect.internal.SymbolTable.exitingPhase(SymbolTable.scala:256)
at
scala.tools.nsc.interpreter.IMain$Request.x$20$lzycompute(IMain.scala:896)
at scala.tools.nsc.interpreter.IMain$Request.x$20(IMain.scala:895)
at
scala.tools.nsc.interpreter.IMain$Request.headerPreamble$lzycompute(IMain.scala:895)
at
scala.tools.nsc.interpreter.IMain$Request.headerPreamble(IMain.scala:895)
at
scala.tools.nsc.interpreter.IMain$Request$Wrapper.preamble(IMain.scala:918)
at
scala.tools.nsc.interpreter.IMain$CodeAssembler$$anonfun$apply$23.apply(IMain.scala:1337)
at
scala.tools.nsc.interpreter.IMain$CodeAssembler$$anonfun$apply$23.apply(IMain.scala:1336)
at scala.tools.nsc.util.package$.stringFromWriter(package.scala:64)
at
scala.tools.nsc.interpreter.IMain$CodeAssembler$class.apply(IMain.scala:1336)
at
scala.tools.nsc.interpreter.IMain$Request$Wrapper.apply(IMain.scala:908)
at
scala.tools.nsc.interpreter.IMain$Request.compile$lzycompute(IMain.scala:1002)
at
scala.tools.nsc.interpreter.IMain$Request.compile(IMain.scala:997)
at scala.tools.nsc.interpreter.IMain.compile(IMain.scala:579)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:567)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:565)
at
scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:807)
at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:681)
at scala.tools.nsc.interpreter.ILoop.processLine(ILoop.scala:395)
at
org.apache.spark.repl.SparkILoop$$anonfun$initializeSpark$1.apply$mcV$sp(SparkILoop.scala:38)
at
org.apache.spark.repl.SparkILoop$$anonfun$initializeSpark$1.apply(SparkILoop.scala:37)
at
org.apache.spark.repl.SparkILoop$$anonfun$initializeSpark$1.apply(SparkILoop.scala:37)
at scala.tools.nsc.interpreter.IMain.beQuietDuring(IMain.scala:214)
at
org.apache.spark.repl.SparkILoop.initializeSpark(SparkILoop.scala:37)
at org.apache.spark.repl.SparkILoop.loadFiles(SparkILoop.scala:94)
at
scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:920)
at
scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
at
scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
at
scala.reflect.internal.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:97)
at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:909)
at org.apache.spark.repl.Main$.doMain(Main.scala:68)
at org.apache.spark.repl.Main$.main(Main.scala:51)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
at
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
at
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
sshuser@ed0-svochd:~/azure-spark-docdb-test$


On Mon, May 8, 2017 at 11:50 PM, Denny Lee  wrote:

> This appears to be an issue with the Spark to DocumentDB connector,
> specifically version 0.0.1. Could you run the 0.0.3 version of the jar and
> see if you're still getting the same error?  i.e.
>
> spark-shell --master yarn --jars azure-documentdb-spark-0.0.3-
> 

Reading Avro messages from Kafka using Structured Streaming in Spark 2.1

2017-05-11 Thread Revin Chalil
I am trying to convert Avro records with a field of type "bytes" to a JSON string
using Structured Streaming in Spark 2.1. Please see below.


object AvroConvert {

  case class KafkaMessage(
   payload: String
 )

  val schemaString ="""{
"type" : "record",
"name" : "HdfsEvent",
"namespace" : "com.abc.def.domain.hdfs",
"fields" : [ {
  "name" : "payload",
  "type" : {
"type" : "bytes",
"java-class" : "[B"
  }
} ]
  }"""
  val messageSchema = new Schema.Parser().parse(schemaString)
  val reader = new GenericDatumReader[GenericRecord](messageSchema)
  // Binary decoder
  val decoder = DecoderFactory.get()
  // Register implicit encoder for map operation
  implicit val encoder: Encoder[GenericRecord] = 
org.apache.spark.sql.Encoders.kryo[GenericRecord]

  def main(args: Array[String]) {

val KafkaBroker = "**.**.**.**:9092";
val InTopic = "avro";

// Get Spark session
val session = SparkSession
  .builder
  .master("local[*]")
  .appName("myapp")
  .getOrCreate()

// Load streaming data
import session.implicits._

val data = session
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", KafkaBroker)
  .option("subscribe", InTopic)
  .load()
  .select($"value".as[Array[Byte]])
  .map(d => {
val rec = reader.read(null, decoder.binaryDecoder(d, null))
val payload = rec.get("payload").asInstanceOf[Byte].toString
new KafkaMessage(payload)
  })

val query = data.writeStream
  .outputMode("Append")
  .format("console")
  .start()

query.awaitTermination()
  }
}


I am getting the below error.

org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -40

at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336)

at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)

at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)

at 
org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:363)

at 
org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:355)

at 
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:157)

at 
org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)

at 
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)

at 
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)

at 
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)

at 
com.expedia.ecpr.risk.data.spark.streaming.MyMain$$anonfun$1.apply(StreamingDecisionJson.scala:99)

at 
com.expedia.ecpr.risk.data.spark.streaming.MyMain$$anonfun$1.apply(StreamingDecisionJson.scala:98)

at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
 Source)

at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)

at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)

at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)

at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)

at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)

at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)

at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)

at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)

at org.apache.spark.scheduler.Task.run(Task.scala:99)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)

at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)


I read suggestions to use DataFileReader instead of binaryDecoder, as below, but
was not successful using this in Scala.


DatumReader<GenericRecord> datumReader = new SpecificDatumReader<GenericRecord>(schema);

DataFileStream<GenericRecord> dataFileReader = new DataFileStream<GenericRecord>(inputStream, datumReader);
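
In Scala, a rough sketch of the same approach; it assumes "bytes" is a single Kafka message value and that the producer wrote Avro container-file framing, which is what DataFileStream expects (plain binary-encoded records will not parse with it):

import java.io.ByteArrayInputStream
import org.apache.avro.file.DataFileStream
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

// Reads all records from one message, assuming the bytes carry an Avro
// container file (header + data blocks).
def readContainer(bytes: Array[Byte]): Seq[GenericRecord] = {
  val datumReader = new GenericDatumReader[GenericRecord]()
  val stream = new DataFileStream[GenericRecord](new ByteArrayInputStream(bytes), datumReader)
  try {
    val records = scala.collection.mutable.Buffer[GenericRecord]()
    while (stream.hasNext) records += stream.next()
    records.toSeq
  } finally {
    stream.close()
  }
}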


Once the bytes-typed "payload" is converted to JSON, I plan to write it back to
another Kafka topic.

Any help on this is much appreciated. Thank you!

Revin