Re: is it ok to have multiple sparksession's in one spark structured streaming app?

2017-09-08 Thread Paul
You would set the Kafka topic as your data source and write a custom output to 
Cassandra; everything would be, or could be, contained within your stream.
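
A minimal sketch of what Paul describes (Scala, Structured Streaming): one SparkSession reads from Kafka and a ForeachWriter pushes rows to Cassandra. The broker, topic, checkpoint path and the Cassandra write itself are illustrative assumptions, not a tested recipe.

import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}

val spark = SparkSession.builder().appName("kafka-to-cassandra").getOrCreate()

// Source: Kafka topic (broker/topic names are placeholders)
val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "events")
  .load()
  .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")

// Sink: custom output to Cassandra via ForeachWriter
val query = events.writeStream
  .foreach(new ForeachWriter[Row] {
    def open(partitionId: Long, version: Long): Boolean = {
      // open a Cassandra session/connection here (hypothetical client)
      true
    }
    def process(row: Row): Unit = {
      // e.g. execute an INSERT into your keyspace.table using row.getString(0), row.getString(1)
    }
    def close(errorOrNull: Throwable): Unit = {
      // close the Cassandra session here
    }
  })
  .option("checkpointLocation", "/tmp/checkpoints/kafka-to-cassandra")
  .start()

query.awaitTermination()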

-Paul

Sent from my iPhone

> On Sep 8, 2017, at 2:52 PM, kant kodali  wrote:
> 
> How can I use one SparkSession to talk to both Kafka and Cassandra let's say?
> 
> 
>> On Fri, Sep 8, 2017 at 3:46 AM, Arkadiusz Bicz  
>> wrote:
>> You don't need multiple Spark sessions to have more than one stream working, 
>> but from a maintenance and reliability perspective it is not a good idea. 
>> 
>>> On Thu, Sep 7, 2017 at 2:40 AM, kant kodali  wrote:
>>> Hi All,
>>> 
>>> I am wondering if it is ok to have multiple sparksession's in one spark 
>>> structured streaming app? Basically, I want to create 1) Spark session for 
>>> reading from Kafka and 2) Another Spark session for storing the mutations 
>>> of a dataframe/dataset to a persistent table as I get the mutations from 
>>> #1? 
>>> 
>>> Finally, is this a common practice?
>>> 
>>> Thanks,
>>> kant
>> 
> 


Re: Multiple Kafka topics processing in Spark 2.2

2017-09-08 Thread Dan Dong
Hi, Alonso.
  Thanks! I've read about this but did not quite understand it. Picking out
the topic name of a Kafka message seems a simple task, but the example code
looks complicated, with redundant info. Why do we need offsetRanges here,
and do we have an easier way to achieve this?

Cheers,
Dan
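
For what it's worth, a sketch of an arguably easier route (Scala; this assumes the spark-streaming-kafka-0-10 integration rather than the 0-8 StringDecoder API quoted below, plus the same ssc and topic set as in the original snippet): with the 0-10 direct stream each ConsumerRecord already carries its topic, so no offsetRanges bookkeeping is needed just to label messages.

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// Placeholder broker/group values
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "topic-name-demo")
val topicsSet = Set("topicA", "topicB")

val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](topicsSet, kafkaParams))

// Each record knows its own topic
stream.map(record => (record.topic, record.value)).print()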


2017-09-06 21:17 GMT+08:00 Alonso Isidoro Roman :

> Hi, reading the official doc, I think you can do it this way:
>
> import org.apache.spark.streaming.kafka._
>
>    val directKafkaStream = KafkaUtils.createDirectStream[String, String,
>      StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
>
>
>  // Hold a reference to the current offset ranges, so it can be used 
> downstream
>  var offsetRanges = Array.empty[OffsetRange]
>
>  directKafkaStream.transform { rdd =>
>offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>rdd
>  }.map {
>...
>  }.foreachRDD { rdd =>
>for (o <- offsetRanges) {
>      println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
>}
>
>  }
>
>
> 2017-09-06 14:38 GMT+02:00 Dan Dong :
>
>> Hi, All,
>>   I have one issue here about how to process multiple Kafka topics in a
>> Spark 2.* program. My question is: How to get the topic name from a message
>> received from Kafka? E.g:
>>
>> ..
>> val messages = KafkaUtils.createDirectStream[String, String,
>> StringDecoder, StringDecoder](
>>   ssc, kafkaParams, topicsSet)
>>
>> // Get the lines, split them into words, count the words and print
>> val lines = messages.map(_._2)
>> val words = lines.flatMap(_.split(" "))
>> val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
>> wordCounts.print()
>> ..
>>
>> Kafka sends the messages to multiple topics through the console producer, for
>> example. But when Spark receives a message, how will it know which topic
>> this piece of message is coming from? Thanks a lot for any help!
>>
>> Cheers,
>> Dan
>>
>
>
>
> --
> Alonso Isidoro Roman
> about.me/alonso.isidoro.roman
>


Re: [Spark Streaming] Streaming Dynamic Allocation is broken (at least on YARN)

2017-09-08 Thread Karthik Palaniappan
For posterity, I found the root cause and filed a JIRA: 
https://issues.apache.org/jira/browse/SPARK-21960. I plan to open a pull 
request with the minor fix.

From: Karthik Palaniappan
Sent: Friday, September 1, 2017 9:49 AM
To: Akhil Das
Cc: user@spark.apache.org; t...@databricks.com
Subject: Re: [Spark Streaming] Streaming Dynamic Allocation is broken (at least 
on YARN)

Any ideas @Tathagata? I'd be happy to contribute a patch if you can point me in 
the right direction.

From: Karthik Palaniappan 
Sent: Friday, August 25, 2017 9:15 AM
To: Akhil Das
Cc: user@spark.apache.org; t...@databricks.com
Subject: RE: [Spark Streaming] Streaming Dynamic Allocation is broken (at least 
on YARN)

You have to set spark.executor.instances=0 in a streaming application with 
dynamic allocation: 
https://github.com/tdas/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala#L207.
 I originally had it set to a positive value, and explicitly set it to 0 after 
hitting that error.

Setting executor cores > 1 seems like reasonable advice in general, but that 
shouldn’t be my issue here, right?

From: Akhil Das
Sent: Thursday, August 24, 2017 2:34 AM
To: Karthik Palaniappan
Cc: user@spark.apache.org; 
t...@databricks.com
Subject: Re: [Spark Streaming] Streaming Dynamic Allocation is broken (at least 
on YARN)

Have you tried setting spark.executor.instances to a positive non-zero value? 
Also, since it's a streaming application, set executor cores > 1.

On Wed, Aug 23, 2017 at 3:38 AM, Karthik Palaniappan wrote:

I ran the HdfsWordCount example using this command:

spark-submit run-example \
  --conf spark.streaming.dynamicAllocation.enabled=true \
  --conf spark.executor.instances=0 \
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.master=yarn \
  --conf spark.submit.deployMode=client \
  org.apache.spark.examples.streaming.HdfsWordCount /foo

I tried it on both Spark 2.1.1 (through HDP 2.6) and Spark 2.2.0 (through 
Google Dataproc 1.2), and I get the same message repeatedly that Spark cannot 
allocate any executors.

17/08/22 19:34:57 INFO org.spark_project.jetty.util.log: Logging initialized 
@1694ms
17/08/22 19:34:57 INFO org.spark_project.jetty.server.Server: 
jetty-9.3.z-SNAPSHOT
17/08/22 19:34:57 INFO org.spark_project.jetty.server.Server: Started @1756ms
17/08/22 19:34:57 INFO org.spark_project.jetty.server.AbstractConnector: 
Started 
ServerConnector@578782d6{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
17/08/22 19:34:58 INFO 
com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase: GHFS version: 
1.6.1-hadoop2
17/08/22 19:34:58 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to 
ResourceManager at hadoop-m/10.240.1.92:8032
17/08/22 19:35:00 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl: 
Submitted application application_1503036971561_0022
17/08/22 19:35:04 WARN org.apache.spark.streaming.StreamingContext: Dynamic 
Allocation is enabled for this application. Enabling Dynamic allocation for 
Spark Streaming applications can cause data loss if Write Ahead Log is not 
enabled for non-replayable sources like Flume. See the programming guide for 
details on how to enable the Write Ahead Log.
17/08/22 19:35:21 WARN org.apache.spark.scheduler.cluster.YarnScheduler: 
Initial job has not accepted any resources; check your cluster UI to ensure 
that workers are registered and have sufficient resources
17/08/22 19:35:36 WARN org.apache.spark.scheduler.cluster.YarnScheduler: 
Initial job has not accepted any resources; check your cluster UI to ensure 
that workers are registered and have sufficient resources
17/08/22 19:35:51 WARN org.apache.spark.scheduler.cluster.YarnScheduler: 
Initial job has not accepted any resources; check your cluster UI to ensure 
that workers are registered and have sufficient resources

I confirmed that the YARN cluster has enough memory for dozens of executors, 
and verified that the application allocates executors when using Core's 
spark.dynamicAllocation.enabled=true, and leaving 
spark.streaming.dynamicAllocation.enabled=false.

Is streaming dynamic allocation actually supported? Sean Owen suggested it 
might have been experimental: https://issues.apache.org/jira/browse/SPARK-21792.



--
Cheers!




Multiple vcores per container when running Spark applications in Yarn cluster mode

2017-09-08 Thread Xiaoye Sun
Hi,

I am using Spark 1.6.1 and Yarn 2.7.4.
I want to submit a Spark application to a Yarn cluster. However, I found
that the number of vcores assigned to a container/executor is always 1,
even if I set spark.executor.cores=2. I also found the number of tasks an
executor runs concurrently is 2. So, it seems that Spark knows that an
executor/container has two CPU cores but the request is not correctly sent
to Yarn resource scheduler. I am using
the 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler
on Yarn.

I am wondering whether it is possible to assign multiple vcores to a container
when a Spark job is submitted to a Yarn cluster in yarn-cluster mode.
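
For reference, a sketch of the submission side of the setup described above (Scala, Spark 1.6-style; names and values are illustrative). One note to verify on your cluster rather than take as a confirmed fix: with the CapacityScheduler's default DefaultResourceCalculator, YARN accounts only for memory and reports 1 vcore per container regardless of the request; the commonly cited remedy is switching yarn.scheduler.capacity.resource-calculator to org.apache.hadoop.yarn.util.resource.DominantResourceCalculator in capacity-scheduler.xml.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("vcore-test")                 // illustrative app name
  .set("spark.executor.cores", "2")         // Spark-side request: 2 cores per executor
  .set("spark.executor.memory", "4g")
val sc = new SparkContext(conf)
// Tasks per executor follow spark.executor.cores, as observed in the question;
// what YARN reports per container depends on the resource calculator in use.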

Thanks!
Best,
Xiaoye


Re: CSV write to S3 failing silently with partial completion

2017-09-08 Thread Steve Loughran

On 7 Sep 2017, at 18:36, Mcclintic, Abbi wrote:

Thanks all – couple notes below.

Generally all our partitions are of equal size (ie on a normal day in this 
particular case I see 10 equally sized partitions of 2.8 GB). We see the 
problem with repartitioning and without – in this example we are repartitioning 
to 10 but we also see the problem without any repartitioning when the default 
partition count is 200. We know that data loss is occurring because we have a 
final quality gate that counts the number of rows and halts the process if we 
see too large of a drop.

We have one use case where the data needs to be read on a local machine after 
processing and one use case of copying to redshift. Regarding the redshift 
copy, it gets a bit complicated owing to VPC and encryption requirements so we 
haven’t looked into using the JDBC driver yet.

My understanding was that Amazon EMR does not support s3a, but it may be worth looking into.

1. No, it doesn't
2. You can't currently use s3a as a direct destination of work due to s3 not 
being consistent, not without a consistency layer on top (S3Guard, etc)

We may also try a combination of writing to HDFS combined with s3distcp.


+1
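
A minimal sketch of the HDFS-then-copy approach mentioned above (Scala; the DataFrame `df`, paths and partition count are illustrative). The idea is to let the job commit its output against HDFS, then ship the finished files to S3 as a separate step.

// Write the final CSV output to HDFS first
df.repartition(10)
  .write
  .option("header", "true")
  .mode("overwrite")
  .csv("hdfs:///tmp/daily_export")

// Then copy the completed output to S3 outside Spark, e.g. with EMR's s3-dist-cp
// (or plain distcp):
//   s3-dist-cp --src hdfs:///tmp/daily_export --dest s3://my-bucket/daily_export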




Re: is it ok to have multiple sparksession's in one spark structured streaming app?

2017-09-08 Thread kant kodali
How can I use one SparkSession to talk to both Kafka and Cassandra let's
say?


On Fri, Sep 8, 2017 at 3:46 AM, Arkadiusz Bicz 
wrote:

> You don't need multiple Spark sessions to have more than one stream
> working, but from a maintenance and reliability perspective it is not a good
> idea.
>
> On Thu, Sep 7, 2017 at 2:40 AM, kant kodali  wrote:
>
>> Hi All,
>>
>> I am wondering if it is ok to have multiple sparksession's in one spark
>> structured streaming app? Basically, I want to create 1) Spark session for
>> reading from Kafka and 2) Another Spark session for storing the mutations
>> of a dataframe/dataset to a persistent table as I get the mutations from
>> #1?
>>
>> Finally, is this a common practice?
>>
>> Thanks,
>> kant
>>
>
>


SPARK CSV ISSUE

2017-09-08 Thread Gourav Sengupta
Hi,

According to this thread, https://issues.apache.org/jira/browse/SPARK-11374,
SPARK will not resolve the issue of the skip-header option when the table
is defined in HIVE.

But I am unable to see a SPARK SQL option for setting up an external
partitioned table.

Does that mean that if I have to create an external partitioned table I
must use HIVE, and that when I use HIVE, SPARK does not allow me to ignore the
headers?
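
A possible workaround sketch (Scala wrapping Spark SQL; the table name, columns, path and partition layout are made-up assumptions, and it assumes an existing SparkSession `spark`). As far as I can tell, the csv data source honours the header option, unlike Hive's skip.header.line.count, and Spark 2.1+ allows partitioned data source tables pointing at an external location:

spark.sql("""
  CREATE TABLE IF NOT EXISTS events (id STRING, amount DOUBLE, dt STRING)
  USING csv
  OPTIONS (path 's3://my-bucket/events', header 'true')
  PARTITIONED BY (dt)
""")

// Pick up partition directories that already exist under that path
spark.sql("MSCK REPAIR TABLE events")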


Regards,
Gourav Sengupta


Re: [Spark Core] excessive read/load times on parquet files in 2.2 vs 2.0

2017-09-08 Thread Matthew Anthony
The code is as simple as calling `data = spark.read.parquet(address)`. I can't
give you the actual address I'm reading from for security reasons. Is there
something else I can provide? We're using standard EMR images with Hive and
Spark installed.



On 9/8/17 11:00 AM, Neil Jonkers wrote:

Can you provide a code sample please?

On Fri, Sep 8, 2017 at 5:44 PM, Matthew Anthony wrote:


Hi all -


since upgrading to 2.2.0, we've noticed a significant increase in
read.parquet(...) ops. The parquet files are being read from S3.
Upon entry at the interactive terminal (pyspark in this case), the
terminal will sit "idle" for several minutes (as many as 10)
before returning:


"17/09/08 15:34:37 WARN SharedInMemoryCache: Evicting cached table
partition metadata from memory due to size constraints
(spark.sql.hive.filesourcePartitionFileCacheSize = 20
bytes). This may impact query planning performance."


In the spark UI, there are no jobs being run during this idle
period. Subsequently, a short 1-task job lasting approximately 10
seconds runs, and then another idle time of roughly 2-3 minutes
follows thereafter before returning to the terminal/CLI.


Can someone explain what is happening here in the background? Is
there a misconfiguration we should be looking for? We are using
Hive metastore on the EMR cluster.


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org







Re: [Spark Core] excessive read/load times on parquet files in 2.2 vs 2.0

2017-09-08 Thread Neil Jonkers
Can you provide a code sample please?

On Fri, Sep 8, 2017 at 5:44 PM, Matthew Anthony  wrote:

> Hi all -
>
>
> since upgrading to 2.2.0, we've noticed a significant increase in
> read.parquet(...) ops. The parquet files are being read from S3. Upon entry
> at the interactive terminal (pyspark in this case), the terminal will sit
> "idle" for several minutes (as many as 10) before returning:
>
>
> "17/09/08 15:34:37 WARN SharedInMemoryCache: Evicting cached table
> partition metadata from memory due to size constraints
> (spark.sql.hive.filesourcePartitionFileCacheSize = 20 bytes).
> This may impact query planning performance."
>
>
> In the spark UI, there are no jobs being run during this idle period.
> Subsequently, a short 1-task job lasting approximately 10 seconds runs, and
> then another idle time of roughly 2-3 minutes follows thereafter before
> returning to the terminal/CLI.
>
>
> Can someone explain what is happening here in the background? Is there a
> misconfiguration we should be looking for? We are using Hive metastore on
> the EMR cluster.
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


[Spark Core] excessive read/load times on parquet files in 2.2 vs 2.0

2017-09-08 Thread Matthew Anthony

Hi all -


since upgrading to 2.2.0, we've noticed a significant increase in 
read.parquet(...) ops. The parquet files are being read from S3. Upon 
entry at the interactive terminal (pyspark in this case), the terminal 
will sit "idle" for several minutes (as many as 10) before returning:



"17/09/08 15:34:37 WARN SharedInMemoryCache: Evicting cached table 
partition metadata from memory due to size constraints 
(spark.sql.hive.filesourcePartitionFileCacheSize = 20 bytes). 
This may impact query planning performance."



In the spark UI, there are no jobs being run during this idle period. 
Subsequently, a short 1-task job lasting approximately 10 seconds runs, 
and then another idle time of roughly 2-3 minutes follows thereafter 
before returning to the terminal/CLI.



Can someone explain what is happening here in the background? Is there a 
misconfiguration we should be looking for? We are using Hive metastore 
on the EMR cluster.
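
In case it helps narrow things down, a rough sketch (Scala; the value is illustrative) of raising the partition-metadata cache that the warning refers to. The default is on the order of 250 MB, and once entries are evicted Spark has to re-read that file metadata, which can slow planning on large partitioned tables:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("parquet-read")
  .enableHiveSupport()
  // value in bytes; 2 GB here purely as an example
  .config("spark.sql.hive.filesourcePartitionFileCacheSize", 2L * 1024 * 1024 * 1024)
  .getOrCreate()

val data = spark.read.parquet("s3://my-bucket/path/to/table")  // placeholder address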



-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



CVE-2017-12612 Unsafe deserialization in Apache Spark launcher API

2017-09-08 Thread Sean Owen
Severity: Medium

Vendor: The Apache Software Foundation

Versions Affected:
Versions of Apache Spark from 1.6.0 until 2.1.1

Description:
In Apache Spark 1.6.0 until 2.1.1, the launcher API performs unsafe
deserialization of data received by  its socket. This makes applications
launched programmatically using the launcher API potentially
vulnerable to arbitrary code execution by an attacker with access to any
user
account on the local machine. It does not affect apps run by spark-submit or
spark-shell. The attacker would be able to execute code as the user that ran
the Spark application. Users are encouraged to update to version 2.2.0 or
later.

Mitigation:
Update to Apache Spark 2.2.0 or later.

Credit:
Aditya Sharad, Semmle


Re: Chaining Spark Streaming Jobs

2017-09-08 Thread Sunita Arvind
Thanks for your response, Praneeth. We did consider Kafka; however, cost was
the only holdback factor, as we might need a larger cluster, and the existing
cluster is on premise while my app is in the cloud, so the same cluster cannot
be used. But I agree it does sound like a good alternative.

Regards
Sunita

On Thu, Sep 7, 2017 at 11:24 PM Praneeth Gayam 
wrote:

> With file stream you will have to deal with the following
>
>1. The file(s) must not be changed once created. So if the files are
>    being continuously appended, the new data will not be read.
>
>2. The files must be created in the dataDirectory by atomically
>*moving* or *renaming* them into the data directory.
>
> Since the latency requirements for the second job in the chain is only a
> few mins, you may have to end up creating a new file every few mins
>
> You may want to consider Kafka as your intermediary store for building a
> chain/DAG of streaming jobs
>
> On Fri, Sep 8, 2017 at 9:45 AM, Sunita Arvind 
> wrote:
>
>> Thanks for your response Michael
>> Will try it out.
>>
>> Regards
>> Sunita
>>
>> On Wed, Aug 23, 2017 at 2:30 PM Michael Armbrust 
>> wrote:
>>
>>> If you use structured streaming and the file sink, you can have a
>>> subsequent stream read using the file source.  This will maintain exactly
>>> once processing even if there are hiccups or failures.
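
A minimal sketch of the pattern Michael describes (Scala; paths, schema and topic are illustrative, not the poster's actual pipeline). Job 1 lands raw records with the file sink, and job 2, typically a separate application, reads the same directory with the file source:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("chained-streams").getOrCreate()

// Job 1: raw ingest from Kafka into a parquet file sink
val raw = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")    // placeholder
  .option("subscribe", "raw-logs")                      // placeholder
  .load()
  .selectExpr("value AS payload", "timestamp")

raw.writeStream
  .format("parquet")
  .option("path", "s3://bucket/raw")                    // placeholder
  .option("checkpointLocation", "s3://bucket/chk/raw")  // placeholder
  .start()

// Job 2: a second stream reads the raw directory with the file source
val rawSchema = new StructType()
  .add("payload", BinaryType)
  .add("timestamp", TimestampType)

val rawStream = spark.readStream.schema(rawSchema).parquet("s3://bucket/raw")
// ...enrichment transforms here, then another writeStream to the enriched location...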
>>>
>>> On Mon, Aug 21, 2017 at 2:02 PM, Sunita Arvind 
>>> wrote:
>>>
 Hello Spark Experts,

 I have a design question w.r.t Spark Streaming. I have a streaming job
 that consumes protocol buffer encoded real time logs from a Kafka cluster
 on premise. My spark application runs on EMR (aws) and persists data onto
 s3. Before I persist, I need to strip header and convert protobuffer to
 parquet (I use sparksql-scalapb to convert from Protobuff to
 Spark.sql.Row). I need to persist Raw logs as is. I can continue the
 enrichment on the same dataframe after persisting the raw data, however, in
 order to modularize I am planning to have a separate job which picks up the
 raw data and performs enrichment on it. Also,  I am trying to avoid all in
 1 job as the enrichments could get project specific while raw data
 persistence stays customer/project agnostic.The enriched data is allowed to
 have some latency (few minutes)

 My challenge is, after persisting the raw data, how do I chain the next
 streaming job. The only way I can think of is -  job 1 (raw data)
 partitions on current date (MMDD) and within current date, the job 2
 (enrichment job) filters for records within 60s of current time and
 performs enrichment on it in 60s batches.
 Is this a good option? It seems to be error prone. When either of the
 jobs get delayed due to bursts or any error/exception this could lead to
 huge data losses and non-deterministic behavior . What are other
 alternatives to this?

 Appreciate any guidance in this regard.

 regards
 Sunita Koppar

>>>
>>>
>


Re: is it ok to have multiple sparksession's in one spark structured streaming app?

2017-09-08 Thread Arkadiusz Bicz
You don't need multiple Spark sessions to have more than one stream
working, but from a maintenance and reliability perspective it is not a good
idea.

On Thu, Sep 7, 2017 at 2:40 AM, kant kodali  wrote:

> Hi All,
>
> I am wondering if it is ok to have multiple sparksession's in one spark
> structured streaming app? Basically, I want to create 1) Spark session for
> reading from Kafka and 2) Another Spark session for storing the mutations
> of a dataframe/dataset to a persistent table as I get the mutations from
> #1?
>
> Finally, is this a common practice?
>
> Thanks,
> kant
>


Re: graphframe out of memory

2017-09-08 Thread Imran Rajjad
No, I did not. I thought Spark would take care of that itself, since I have
put in the arguments.
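
One way to check whether the setting actually took effect (a Scala sketch; nothing here is specific to GraphFrames). When Spark runs embedded with a local master, the driver is the application's own JVM, so setting spark.driver.memory in SparkConf after that JVM has started does not, as far as I know, grow its heap; the heap has to be given to the launching JVM itself via -Xmx:

object HeapCheck {
  def main(args: Array[String]): Unit = {
    // Reflects the -Xmx of the JVM that launched the app, not spark.driver.memory
    val maxHeapGb = Runtime.getRuntime.maxMemory / (1024.0 * 1024 * 1024)
    println(f"Driver JVM max heap: $maxHeapGb%.1f GB")
  }
}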

On Thu, Sep 7, 2017 at 9:26 PM, Lukas Bradley 
wrote:

> Did you also increase the size of the heap of the Java app that is
> starting Spark?
>
> https://alvinalexander.com/blog/post/java/java-xmx-xms-
> memory-heap-size-control
>
> On Thu, Sep 7, 2017 at 12:16 PM, Imran Rajjad  wrote:
>
>> I am getting Out of Memory error while running connectedComponents job on
>> graph with around 12000 vertices and 134600 edges.
>> I am running spark in embedded mode in a standalone Java application and
>> have tried to increase the memory but it seems that its not taking any
>> effect
>>
>> sparkConf = new SparkConf().setAppName("SOME APP
>> NAME").setMaster("local[10]")
>> .set("spark.executor.memory","5g")
>> .set("spark.driver.memory","8g")
>> .set("spark.driver.maxResultSize","1g")
>> .set("spark.sql.warehouse.dir", "file:///d:/spark/tmp")
>> .set("hadoop.home.dir", "file:///D:/spark-2.1.0-bin-hadoop2.7/bin");
>>
>>   spark = SparkSession.builder().config(sparkConf).getOrCreate();
>>   spark.sparkContext().setLogLevel("ERROR");
>>   spark.sparkContext().setCheckpointDir("D:/spark/tmp");
>>
>> the stack trace
>> java.lang.OutOfMemoryError: Java heap space
>>  at java.util.Arrays.copyOf(Arrays.java:3332)
>>  at java.lang.AbstractStringBuilder.ensureCapacityInternal(Abstr
>> actStringBuilder.java:124)
>>  at java.lang.AbstractStringBuilder.append(AbstractStringBuilder
>> .java:448)
>>  at java.lang.StringBuilder.append(StringBuilder.java:136)
>>  at scala.StringContext.standardInterpolator(StringContext.scala:126)
>>  at scala.StringContext.s(StringContext.scala:95)
>>  at org.apache.spark.sql.execution.QueryExecution.toString(
>> QueryExecution.scala:230)
>>  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutio
>> nId(SQLExecution.scala:54)
>>  at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2788)
>>  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$e
>> xecute$1(Dataset.scala:2385)
>>  at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$D
>> ataset$$collect$1.apply(Dataset.scala:2390)
>>  at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$D
>> ataset$$collect$1.apply(Dataset.scala:2390)
>>  at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2801)
>>  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$c
>> ollect(Dataset.scala:2390)
>>  at org.apache.spark.sql.Dataset.collect(Dataset.scala:2366)
>>  at org.graphframes.lib.ConnectedComponents$.skewedJoin(Connecte
>> dComponents.scala:239)
>>  at org.graphframes.lib.ConnectedComponents$.org$graphframes$
>> lib$ConnectedComponents$$run(ConnectedComponents.scala:308)
>>  at org.graphframes.lib.ConnectedComponents.run(ConnectedCompone
>> nts.scala:139)
>>
>> GraphFrame version is 0.5.0 and Spark version is 2.1.1
>>
>> regards,
>> Imran
>>
>> --
>> I.R
>>
>
>


-- 
I.R


Wish you give our product a wonderful name

2017-09-08 Thread Jone Zhang
We have built an ML platform based on open-source frameworks like
Hadoop, Spark, and TensorFlow. Now we need to give our product a wonderful
name, and we are eager for everyone's advice.

Any answers will be greatly appreciated.
Thanks.


[no subject]

2017-09-08 Thread PICARD Damien
Hi !

I'm facing a Classloader problem using Spark 1.5.1

I use javax.validation and hibernate validation annotations on some of my beans:

  @NotBlank
  @Valid
  private String attribute1 ;

  @Valid
  private String attribute2 ;

When Spark tries to unmarshall these beans (after a remote RDD), I get the 
ClassNotFoundException :
17/09/07 09:19:25 INFO storage.BlockManager: Found block rdd_8_1 remotely
17/09/07 09:19:25 ERROR executor.Executor: Exception in task 3.0 in stage 2.0 
(TID 6)
java.lang.ClassNotFoundException: org.hibernate.validator.constraints.NotBlank
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at 
java.io.ObjectInputStream.resolveProxyClass(ObjectInputStream.java:700)
at java.io.ObjectInputStream.readProxyDesc(ObjectInputStream.java:1566)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)
   ...

Indeed, it means that the annotation class is not found, because it is not in
the classpath. Why? I don't know, because I build an uber JAR that contains this
class. I suppose that at the time the job tries to unmarshall the RDD, the uber
jar is not loaded.

So, I try to add the hibernate JAR to the class loader manually, using this 
spark-submit command :

spark-submit --queue default \
--class com.my.Launcher \
--deploy-mode cluster \
--master yarn-cluster \
--driver-java-options "-Dfile.encoding=UTF-8" \
--jars /home/user/hibernate-validator-5.2.2.Final.jar \
--driver-class-path /home/user/hibernate-validator-5.2.2.Final.jar \
--conf 
"spark.executor.extraClassPath=/home/user/hibernate-validator-5.2.2.Final.jar" \
/home/user/uberjar-job.jar

Without effect. So, is there a way to add this class to the classloader?
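
One small check that may help tell a packaging problem from a classpath-propagation problem (a Scala sketch; it assumes an existing SparkContext `sc` and that the job can be modified to run this on both the driver and the executors):

val className = "org.hibernate.validator.constraints.NotBlank"
val check = (where: String) => {
  try { Class.forName(className); s"$where: visible" }
  catch { case _: ClassNotFoundException => s"$where: missing" }
}

println(check("driver"))
sc.parallelize(1 to 4)
  .map(_ => check("executor"))
  .distinct()
  .collect()
  .foreach(println)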

Thank you in advance.

Damien

This message and any attachments (the "message") are confidential,
intended solely for the addresses, and may contain legally privileged
information. Any unauthorized use or dissemination is prohibited.
E-mails are susceptible to alteration. Neither SOCIETE GENERALE nor any
of its subsidiaries or affiliates shall be liable for the message
if altered, changed or falsified.

=


Part-time job

2017-09-08 Thread Uğur Sopaoğlu
Hi  all,

I have been working with Spark for about 8 months, but I have not fully
learned it through self-study alone. So I want to take a part-time job on a
project. I believe that it will both contribute to my own development and
benefit others. I *do not have any salary* expectation.

Can you help me?

Thanks

-- 
Uğur Sopaoğlu


Re: Spark Dataframe returning null columns when schema is specified

2017-09-08 Thread Praneeth Gayam
What is the desired behaviour when a field is null for only a few records?
You cannot avoid nulls in that case.
But if all rows are guaranteed to be uniform (either all-null or
all-non-null), you can *take* the first row of the DF and *drop* the
columns with null fields.
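
A rough sketch of that idea (Scala; it assumes a DataFrame `df` whose columns are uniformly null or non-null, as discussed above):

import org.apache.spark.sql.functions.col

val first = df.head()
// Keep only the columns that are populated in the first row
val nonNullCols = df.columns.zipWithIndex.collect {
  case (name, i) if !first.isNullAt(i) => name
}
val trimmed = df.select(nonNullCols.map(col): _*)
trimmed.show()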

On Fri, Sep 8, 2017 at 12:14 AM, ravi6c2  wrote:

> Hi All, I have this problem wherein the Spark Dataframe has null columns
> for the attributes that are not present in the JSON. A clear explanation is
> provided below:
>
> *Use case:* Convert the JSON object into dataframe for further usage.
>
> *Case - 1:* Without specifying the schema for JSON:
>
> records.foreachRDD(new VoidFunction2<JavaRDD<String>, Time>() {
>     private static final long serialVersionUID = 1L;
>     @Override
>     public void call(JavaRDD<String> rdd, Time time) throws Exception {
>         if (rdd.count() > 0) {
>             JavaRDD<String> filteredRDD = rdd.filter(x -> x.length() > 0);
>             sqlContext = SQLContextSingleton.getInstance(filteredRDD.context());
>             DataFrame df = sqlContext.read().json(filteredRDD);
>             df.show();
>         }
>     }
> });
>
> In the above code sample, filteredRDD is Strings as JSON Objects.
>
> *Sample JSON Record: *
> {"request_id":"f791e831f71e4918b2fcaebfdf6fe2c2","org_id":"y08e7p9g","
> queue_id":1234,"disposition":"O","created":"2017-06-02
> 23:49:10.410","assigned":"2017-06-02
> 23:49:10.410","final_review_status":"cancel","datetime":"2017-06-02
> 23:49:10.410"}
>
> *Dataframe Output:* (screenshot: Screenshot_at_Sep_07_11-36-27.png)
>
> *Case - 2:* With specifying the schema for JSON:
>
> records.foreachRDD(new VoidFunction2<JavaRDD<String>, Time>() {
>     private static final long serialVersionUID = 1L;
>     @Override
>     public void call(JavaRDD<String> rdd, Time time) throws Exception {
>         if (rdd.count() > 0) {
>             JavaRDD<String> filteredRDD = rdd.filter(x -> x.length() > 0);
>             sqlContext = SQLContextSingleton.getInstance(filteredRDD.context());
>             DataFrame df = sqlContext.read().schema(SchemaBuilder.buildSchema()).json(filteredRDD);
>             df.show();
>         }
>     }
> });
>
> In the above code sample, filteredRDD is Strings as JSON Objects.
>
> *Schema Definition:*
> public static StructType buildSchema() {
> StructType schema = new StructType(
> new StructField[] {
> DataTypes.createStructField("request_id",
> DataTypes.StringType, false),
>
> DataTypes.createStructField("org_id", DataTypes.StringType, false),
>
> DataTypes.createStructField("queue_id", DataTypes.IntegerType, true),
>
> DataTypes.createStructField("owner", DataTypes.StringType, true),
>
> DataTypes.createStructField("disposition", DataTypes.StringType,
> true),
>
> DataTypes.createStructField("created", DataTypes.TimestampType, true),
>
> DataTypes.createStructField("created_user", DataTypes.StringType,
> true),
>
> DataTypes.createStructField("assigned", DataTypes.TimestampType,
> true),
>
> DataTypes.createStructField("assigned_user", DataTypes.StringType,
> true),
>
> DataTypes.createStructField("notes", DataTypes.StringType, true),
>
> DataTypes.createStructField("final_review_status",
> DataTypes.StringType, true),
>
> DataTypes.createStructField("event_tag", DataTypes.StringType, true),
>
> DataTypes.createStructField("additional_data", DataTypes.StringType,
> true),
>
> DataTypes.createStructField("datetime", DataTypes.TimestampType,
> true),
>
> DataTypes.createStructField("dc", DataTypes.StringType, true),
>
> DataTypes.createStructField("case_id", DataTypes.StringType, true),
>
> DataTypes.createStructField("case_status", DataTypes.StringType, true)
> });
> return (schema);
> }
>
> *Sample JSON Record: *
> {"request_id":"f791e831f71e4918b2fcaebfdf6fe2c2","org_id":"y08e7p9g","
> queue_id":1234,"disposition":"O","created":"2017-06-02
> 23:49:10.410","assigned":"2017-06-02
> 23:49:10.410","final_review_status":"cancel","datetime":"2017-06-02
> 23:49:10.410"}
>
> *Dataframe Output:* (screenshot attachment)
>
> If you see, in the above case, when the schema is defined I am getting the
> columns that are not specified in the JSON as null. Any workaround for
> getting the result as expected in the first image (without nulls) while
> using a schema? I needed this to perform updates into

Re: Chaining Spark Streaming Jobs

2017-09-08 Thread Praneeth Gayam
With file stream you will have to deal with the following

   1. The file(s) must not be changed once created. So if the files are
   being continuously appended, the new data will not be read.

   2. The files must be created in the dataDirectory by atomically *moving*
or *renaming* them into the data directory.

Since the latency requirements for the second job in the chain is only a
few mins, you may have to end up creating a new file every few mins

You may want to consider Kafka as your intermediary store for building a
chain/DAG of streaming jobs

On Fri, Sep 8, 2017 at 9:45 AM, Sunita Arvind  wrote:

> Thanks for your response Michael
> Will try it out.
>
> Regards
> Sunita
>
> On Wed, Aug 23, 2017 at 2:30 PM Michael Armbrust 
> wrote:
>
>> If you use structured streaming and the file sink, you can have a
>> subsequent stream read using the file source.  This will maintain exactly
>> once processing even if there are hiccups or failures.
>>
>> On Mon, Aug 21, 2017 at 2:02 PM, Sunita Arvind 
>> wrote:
>>
>>> Hello Spark Experts,
>>>
>>> I have a design question w.r.t Spark Streaming. I have a streaming job
>>> that consumes protocol buffer encoded real time logs from a Kafka cluster
>>> on premise. My spark application runs on EMR (aws) and persists data onto
>>> s3. Before I persist, I need to strip header and convert protobuffer to
>>> parquet (I use sparksql-scalapb to convert from Protobuff to
>>> Spark.sql.Row). I need to persist Raw logs as is. I can continue the
>>> enrichment on the same dataframe after persisting the raw data, however, in
>>> order to modularize I am planning to have a separate job which picks up the
>>> raw data and performs enrichment on it. Also,  I am trying to avoid all in
>>> 1 job as the enrichments could get project specific while raw data
>>> persistence stays customer/project agnostic.The enriched data is allowed to
>>> have some latency (few minutes)
>>>
>>> My challenge is, after persisting the raw data, how do I chain the next
>>> streaming job. The only way I can think of is -  job 1 (raw data)
>>> partitions on current date (MMDD) and within current date, the job 2
>>> (enrichment job) filters for records within 60s of current time and
>>> performs enrichment on it in 60s batches.
>>> Is this a good option? It seems to be error prone. When either of the
>>> jobs get delayed due to bursts or any error/exception this could lead to
>>> huge data losses and non-deterministic behavior . What are other
>>> alternatives to this?
>>>
>>> Appreciate any guidance in this regard.
>>>
>>> regards
>>> Sunita Koppar
>>>
>>
>>