Re: Spark based Data Warehouse

2017-11-17 Thread lucas.g...@gmail.com
We are using Spark on Kubernetes on AWS (it's a long story), but it does
work.  It's still on the raw side, but we've been pretty successful.

We configured our cluster primarily with Kube-AWS and auto scaling groups.
There are gotchas there, but so far we've been quite successful.

Gary Lucas

On 17 November 2017 at 22:20, ashish rawat  wrote:

> Thanks, everyone, for your suggestions. Do any of you handle automatic
> scale-up and scale-down of your underlying Spark clusters on AWS?
>
> On Nov 14, 2017 10:46 AM, "lucas.g...@gmail.com" 
> wrote:
>
> Hi Ashish, bear in mind that EMR has some additional tooling available
> that smooths out some S3 problems that you may (almost certainly will)
> encounter.
>
> We are using Spark with S3, not on EMR, and have encountered issues with
> file consistency. You can deal with it, but be aware that it's additional
> technical debt you'll need to own.  We didn't want to own an HDFS cluster,
> so we consider it worthwhile.
>
> Here are some additional resources; the video is Steve Loughran talking
> about S3.
> https://medium.com/@subhojit20_27731/apache-spark-and-amazon-s3-gotchas-and-best-practices-a767242f3d98
> https://www.youtube.com/watch?v=ND4L_zSDqF0
>
> For the record we use S3 heavily but tend to drop our processed data into
> databases so they can be more easily consumed by visualization tools.
>
> Good luck!
>
> Gary Lucas
>
> On 13 November 2017 at 20:04, Affan Syed  wrote:
>
>> Another option that we are trying internally is to use Mesos for
>> isolating different jobs or groups. Within a single group, using Livy to
>> create different Spark contexts also works.
>>
>> - Affan
>>
>> On Tue, Nov 14, 2017 at 8:43 AM, ashish rawat 
>> wrote:
>>
>>> Thanks Sky Yin. This really helps.
>>>
>>> On Nov 14, 2017 12:11 AM, "Sky Yin"  wrote:
>>>
>>> We are running Spark on AWS EMR as a data warehouse. All data is in S3
>>> and metadata is in the Hive metastore.
>>>
>>> We have internal tools to create Jupyter notebooks on the dev cluster. I
>>> guess you could use Zeppelin instead, or Livy?
>>>
>>> We run Genie as a job server for the prod cluster, so users have to
>>> submit their queries through Genie. For better resource utilization, we
>>> rely on YARN dynamic allocation to balance the load of multiple
>>> jobs/queries in Spark.
>>>
>>> Hope this helps.
>>>
>>> On Sat, Nov 11, 2017 at 11:21 PM ashish rawat 
>>> wrote:
>>>
 Hello Everyone,

 I was trying to understand if anyone here has tried a data warehouse
 solution using S3 and Spark SQL. Out of the multiple possible options
 (Redshift, Presto, Hive, etc.), we were planning to go with Spark SQL for
 our aggregation and processing requirements.

 If anyone has tried it out, I would like to understand the following:

    1. Are Spark SQL and UDFs able to handle all the workloads?
    2. What user interface did you provide for data scientists, data
    engineers and analysts?
    3. What are the challenges in running concurrent queries, by many
    users, over Spark SQL? Considering that Spark still does not provide
    spill to disk, are there frequent query failures in many scenarios
    when executing concurrent queries?
    4. Are there any open source implementations which provide something
    similar?


 Regards,
 Ashish

>>>
>>>
>>
>
>


Re: Spark based Data Warehouse

2017-11-17 Thread ashish rawat
Thanks, everyone, for your suggestions. Do any of you handle automatic
scale-up and scale-down of your underlying Spark clusters on AWS?

On Nov 14, 2017 10:46 AM, "lucas.g...@gmail.com" 
wrote:

Hi Ashish, bear in mind that EMR has some additional tooling available that
smooths out some S3 problems that you may (almost certainly will)
encounter.

We are using Spark with S3, not on EMR, and have encountered issues with
file consistency. You can deal with it, but be aware that it's additional
technical debt you'll need to own.  We didn't want to own an HDFS cluster,
so we consider it worthwhile.

Here are some additional resources; the video is Steve Loughran talking
about S3.
https://medium.com/@subhojit20_27731/apache-spark-and-amazon-s3-gotchas-and-best-practices-a767242f3d98
https://www.youtube.com/watch?v=ND4L_zSDqF0

For the record we use S3 heavily but tend to drop our processed data into
databases so they can be more easily consumed by visualization tools.

Good luck!

Gary Lucas

On 13 November 2017 at 20:04, Affan Syed  wrote:

> Another option that we are trying internally is to use Mesos for
> isolating different jobs or groups. Within a single group, using Livy to
> create different Spark contexts also works.
>
> - Affan
>
> On Tue, Nov 14, 2017 at 8:43 AM, ashish rawat  wrote:
>
>> Thanks Sky Yin. This really helps.
>>
>> On Nov 14, 2017 12:11 AM, "Sky Yin"  wrote:
>>
>> We are running Spark on AWS EMR as a data warehouse. All data is in S3
>> and metadata is in the Hive metastore.
>>
>> We have internal tools to create Jupyter notebooks on the dev cluster. I
>> guess you could use Zeppelin instead, or Livy?
>>
>> We run Genie as a job server for the prod cluster, so users have to
>> submit their queries through Genie. For better resource utilization, we
>> rely on YARN dynamic allocation to balance the load of multiple
>> jobs/queries in Spark.
>>
>> Hope this helps.
>>
>> On Sat, Nov 11, 2017 at 11:21 PM ashish rawat 
>> wrote:
>>
>>> Hello Everyone,
>>>
>>> I was trying to understand if anyone here has tried a data warehouse
>>> solution using S3 and Spark SQL. Out of the multiple possible options
>>> (Redshift, Presto, Hive, etc.), we were planning to go with Spark SQL for
>>> our aggregation and processing requirements.
>>>
>>> If anyone has tried it out, I would like to understand the following:
>>>
>>>    1. Are Spark SQL and UDFs able to handle all the workloads?
>>>    2. What user interface did you provide for data scientists, data
>>>    engineers and analysts?
>>>    3. What are the challenges in running concurrent queries, by many
>>>    users, over Spark SQL? Considering that Spark still does not provide
>>>    spill to disk, are there frequent query failures in many scenarios
>>>    when executing concurrent queries?
>>>    4. Are there any open source implementations which provide something
>>>    similar?
>>>
>>>
>>> Regards,
>>> Ashish
>>>
>>
>>
>


SpecificColumnarIterator has grown past JVM limit of 0xFFFF

2017-11-17 Thread Md. Rezaul Karim
Dear All,

I was training a RandomForest model on an input dataset with 20,000 columns
and 12,000 rows, but when I start the training it throws this exception:

Constant pool for class
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator
has grown past JVM limit of 0xFFFF

I understand that the current implementation cannot handle so many columns.
However, I was still wondering if there's any workaround to handle a
dataset like this?
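For what it's worth, two directions that are commonly suggested for
codegen/constant-pool errors on very wide inputs are (a) collapsing the
individual feature columns into a single vector column as early as possible,
and (b) turning off whole-stage code generation for the job. The sketch below
is only illustrative, not a confirmed fix for this particular error; it
assumes an existing SparkSession `spark` and a DataFrame `df` with numeric
feature columns plus a label column named "label".

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.classification.RandomForestClassificationModel;
import org.apache.spark.ml.classification.RandomForestClassifier;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// (b) Disable whole-stage codegen for this job (sometimes enough on its own).
spark.conf().set("spark.sql.codegen.wholeStage", "false");

// (a) Assemble the wide set of numeric columns into one vector column.
List<String> cols = new ArrayList<>(Arrays.asList(df.columns()));
cols.remove("label");                        // assumed label column name
Dataset<Row> assembled = new VectorAssembler()
    .setInputCols(cols.toArray(new String[0]))
    .setOutputCol("features")
    .transform(df)
    .select("label", "features");            // keep only what training needs

RandomForestClassificationModel model = new RandomForestClassifier()
    .setLabelCol("label")
    .setFeaturesCol("features")
    .fit(assembled);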





Kind regards,
_

*Md. Rezaul Karim*, BSc, MSc

Research Scientist, Fraunhofer FIT, Germany
PhD Researcher, Information Systems, RWTH Aachen University, Germany
*Email:* rezaul.ka...@fit.fraunhofer.de
*Phone*: +49 241 80 21527

*Web:* http://www.reza-analytics.eu/index.html



History server and non-HDFS filesystems

2017-11-17 Thread Paul Mackles
Hi - I had originally posted this as a bug (SPARK-22528) but given my
uncertainty, it was suggested that I send it to the mailing list instead...

We are using Azure Data Lake (ADL) to store our event logs. This worked
fine in 2.1.x, but in 2.2.0 the underlying files are no longer visible to
the history server - even though we are using the same service principal
that was used to write the logs. I tracked it down to this call in
"FSHistoryProvider" (which was added for v2.2.0):


SparkHadoopUtil.checkAccessPermission()


From what I can tell, it is preemptively checking the permissions on the
files and skipping the ones it thinks are not readable. The problem is that
it's using a check that appears to be specific to HDFS, so even though the
files are definitely readable, it skips over them. Also, "FSHistoryProvider"
is the only place this code is used.
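For illustration only, here is a conceptual sketch (not the actual Spark
source) of a POSIX-style owner/group/other read check against the Hadoop
FileStatus API. With a store that reports synthetic permission bits, a check
like this can reject files the principal can actually read:

import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;

public final class PosixStyleReadCheck {
  // Returns true only if the permission bits reported by the FileSystem
  // grant READ to the current user; a store that fabricates these bits can
  // fail the check even though the caller can actually read the file.
  static boolean readableByCurrentUser(FileStatus status) throws IOException {
    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
    FsPermission perm = status.getPermission();
    if (status.getOwner().equals(ugi.getShortUserName())) {
      return perm.getUserAction().implies(FsAction.READ);
    }
    if (Arrays.asList(ugi.getGroupNames()).contains(status.getGroup())) {
      return perm.getGroupAction().implies(FsAction.READ);
    }
    return perm.getOtherAction().implies(FsAction.READ);
  }
}

Note that the user and group in a check like this come from
UserGroupInformation, i.e. from outside the FileSystem instance, which is
the same concern raised further below.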

I was able to work around it by either:

* setting the permissions for the files on ADL to world-readable

* or setting HADOOP_PROXY to the objectId of the Azure service principal
which owns the files

Neither of these workarounds is acceptable for our environment. That said,
I am not sure how this should be addressed:

* Is this an issue with the Azure Hadoop integration not complying with the
Hadoop FileSystem interface/contract in some way?

* Is this an issue with "checkAccessPermission()" not really accounting for
all of the possible FileSystem implementations?

My gut tells me it's the latter, because
SparkHadoopUtil.checkAccessPermission()
gets its "currentUser" info from outside of the FileSystem class, and it
doesn't make sense to me that an instance of FileSystem would affect a
global context, since there could be many FileSystem instances in a given
app.

That said, I know ADL is not heavily used at this time, so I wonder if
anyone is seeing this with S3 as well? Maybe not, since S3 permissions are
always reported as world-readable (I think), which causes
checkAccessPermission() to succeed.

Any thoughts or feedback appreciated.

-- 
Thanks,
Paul


Spark Streaming in Wait mode

2017-11-17 Thread KhajaAsmath Mohammed
Hi,

I am running a Spark Streaming job and it is not picking up the next
batches, but the job still shows as running on YARN.

Is this expected behavior if there is no data, or is it waiting for data to
pick up?

I am almost 4 hours behind on batches (30-minute interval).


[image: Inline image 1]

[image: Inline image 2]

hadoop.security.authentication=kerberos
spark.executor.memory=12g
spark.driver.am.memory=8G
spark.yarn.am.memoryOverhead=8g
spark.scheduler.mode=FAIR
spark.shuffle.compress=true
spark.shuffle.spill.compress=true
spark.broadcast.compress=true
spark.io.compression.codec=snappy
spark.dynamicAllocation.enabled=false
spark.streaming.dynamicAllocation.enabled=true
## HIVE JDBC ##
java.security.krb5.conf=krb5.conf
javax.security.auth.useSubjectCredsOnly=true
hive.jdbc.url=jdbc:Xa;principal=hive/_h...@ad.navistar.com;ssl=true
hive.jdbc.driver=org.apache.hive.jdbc.HiveDriver
keytab.file=va_dflt.keytab
spark.sql.parquet.binaryAsString=true
spark.sql.parquet.mergeSchema=true
spark.sql.parquet.compression.codec=snappy
spark.rdd.compress=true
spark.io.compression.codec=snappy
spark.sql.tungsten.enabled=false
spark.sql.codegen=false
spark.sql.unsafe.enabled=false
index=15
includeIndex=true
BatchInterval=1800
CheckPointDir=hdfs://prodnameservice1/user/yyy1k78/KafkaCheckPoint
KafkaBrokerList=XXX
KafkaTopics=occlocation
###33
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.locality.wait=10
spark.task.maxFailures=8
spark.ui.killEnabled=false
spark.logConf=true
# SPARK STREAMING CONFIGURATION
spark.streaming.blockInterval=200
spark.streaming.receiver.writeAheadLog.enable=true
spark.streaming.backpressure.enabled=true
#spark.streaming.backpressure.pid.minRate=10
#spark.streaming.receiver.maxRate=100
#spark.streaming.kafka.maxRatePerPartition==100
#spark.streaming.backpressure.initialRate=30
spark.yarn.maxAppAttempts=8
spark.yarn.am.attemptFailuresValidityInterval=1h
spark.yarn.executor.failuresValidityInterval=1h

Any suggestions on why the batches are not running? Is this expected
behavior?

Thanks,
Asmath


Union of streaming dataframes

2017-11-17 Thread Lalwani, Jayesh
Is a union of two Structured Streaming DataFrames from different sources
supported in 2.2?

We have done a union of two streaming DataFrames that come from the same
source. I wanted to know if multiple source streams are supported, or going
to be supported, in the future.
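For concreteness, such a union is mechanically just Dataset.union on two
streaming frames with matching schemas. The minimal sketch below uses the
built-in rate source as a stand-in for two real sources (class name and
option values are only illustrative, and this does not by itself answer the
support question above):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

public class UnionOfStreamsSketch {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder()
        .appName("UnionOfStreamsSketch").master("local[*]").getOrCreate();

    // Two independent streaming sources; both rate streams share the
    // (timestamp, value) schema, which is what union requires.
    Dataset<Row> left = spark.readStream()
        .format("rate").option("rowsPerSecond", "5").load();
    Dataset<Row> right = spark.readStream()
        .format("rate").option("rowsPerSecond", "1").load();

    Dataset<Row> both = left.union(right);

    StreamingQuery query = both.writeStream()
        .format("console").outputMode("append").start();
    query.awaitTermination();
  }
}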




Re: Struct Type

2017-11-17 Thread Jacek Laskowski
Hi,

Use explode function, filter operator and collect_list function.

Or "heavier" flatMap.

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

On Fri, Nov 17, 2017 at 6:06 PM, KhajaAsmath Mohammed <
mdkhajaasm...@gmail.com> wrote:

> Hi,
>
> I have the following schema in a dataframe, and I want to extract the key
> that matches MaxSpeed from the array, along with its corresponding value.
>
> |-- tags: array (nullable = true)
>  ||-- element: struct (containsNull = true)
>  |||-- key: string (nullable = true)
>  |||-- value: string (nullable = true)
>
> is there any way to achieve it in dataframe?
>
> Thanks,
> Asmath
>


Re: Spark Streaming fails with unable to get records after polling for 512 ms

2017-11-17 Thread Cody Koeninger
I don't see anything obvious; you'd need to do more troubleshooting.

You could also try creating a single RDD for a known range of offsets:

http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#creating-an-rdd
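A minimal sketch of that batch read over a known offset range (the broker
address, topic, partition and offsets are placeholders, and `jsc` is assumed
to be an existing JavaSparkContext):

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;

// Reading a fixed offset range as a plain RDD takes streaming out of the
// picture and confirms whether the records themselves are reachable.
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "broker1:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "known-range-debug");

OffsetRange[] offsetRanges = {
    OffsetRange.create("mytopic", 0, 0, 100)  // topic, partition, from, until
};

JavaRDD<ConsumerRecord<String, String>> rdd = KafkaUtils.createRDD(
    jsc, kafkaParams, offsetRanges, LocationStrategies.PreferConsistent());
System.out.println("count = " + rdd.count());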

On Wed, Nov 15, 2017 at 9:33 PM, jagadish kagitala  wrote:
> Hi Cody,
>
> It worked after moving the parameter to SparkConf; I don't see that error
> anymore. But now I'm seeing the count for each RDD return 0, even though
> there are records in the topic I'm reading.
>
> Do you see anything wrong with how I'm creating the direct stream?
>
> Thanks
> Jagadish
>
> On Wed, Nov 15, 2017 at 11:23 AM, Cody Koeninger  wrote:
>>
>> spark.streaming.kafka.consumer.poll.ms is a Spark configuration, not
>> a Kafka parameter.
>>
>> see http://spark.apache.org/docs/latest/configuration.html
>>
>> On Tue, Nov 14, 2017 at 8:56 PM, jkagitala  wrote:
>> > Hi,
>> >
>> > I'm trying to add Spark Streaming on top of our Kafka topic, but I keep
>> > getting this error:
>> > java.lang.AssertionError: assertion failed: Failed to get record after
>> > polling for 512 ms.
>> >
>> > I tried setting different params like max.poll.interval.ms and
>> > spark.streaming.kafka.consumer.poll.ms to 1ms in kafkaParams, but I
>> > still get "failed to get records after 512 ms". I'm not sure why even
>> > adding the above params doesn't change the polling time.
>> >
>> > Without Spark Streaming I'm able to fetch the records; only with the
>> > Spark Streaming add-on do I get this error.
>> >
>> > Any help is greatly appreciated. Below is the code I'm using.
>> >
>> > SparkConf sparkConf = new
>> > SparkConf().setAppName("JavaFlingerSparkApplication").setMaster("local[*]");
>> > JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
>> > Durations.seconds(10));
>> >
>> > Map<String, Object> kafkaParams = new HashMap<>();
>> > kafkaParams.put("bootstrap.servers", hosts);
>> > kafkaParams.put("group.id", groupid);
>> > kafkaParams.put("auto.commit.enable", false);
>> > kafkaParams.put("key.deserializer", StringDeserializer.class);
>> > kafkaParams.put("value.deserializer", BytesDeserializer.class);
>> > kafkaParams.put("auto.offset.reset", "earliest");
>> > //kafkaParams.put("max.poll.interval.ms", 12000);
>> > //kafkaParams.put("spark.streaming.kafka.consumer.poll.ms", 12000);
>> > //kafkaParams.put("request.timeout.ms", 12000);
>> >
>> > // Direct stream over the Kafka 0.10 integration.
>> > JavaInputDStream<ConsumerRecord<String, Bytes>> messages =
>> >   KafkaUtils.createDirectStream(ssc,
>> > LocationStrategies.PreferConsistent(),
>> > ConsumerStrategies.<String, Bytes>Subscribe(topics, kafkaParams));
>> > messages.foreachRDD(rdd -> {
>> > // Collect to the driver just to count the records in this batch.
>> > List<ConsumerRecord<String, Bytes>> input = rdd.collect();
>> > System.out.println("count is " + input.size());
>> > });
>> > ssc.start();
>> > ssc.awaitTermination();
>> >
>> > Thanks
>> > Jagadish
>> >
>> >
>> >
>> > --
>> > Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>> >
>> > -
>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Struct Type

2017-11-17 Thread KhajaAsmath Mohammed
Hi,

I have the following schema in a dataframe, and I want to extract the key
that matches MaxSpeed from the array, along with its corresponding value.

|-- tags: array (nullable = true)
 ||-- element: struct (containsNull = true)
 |||-- key: string (nullable = true)
 |||-- value: string (nullable = true)

is there any way to achieve it in dataframe?

Thanks,
Asmath


Re: Multiple transformations without recalculating or caching

2017-11-17 Thread Fernando Pereira
Note that I have 1+ TB. If I didn't mind things being slow, I wouldn't be
using Spark.

On 17 November 2017 at 11:06, Sebastian Piu  wrote:

> If you don't want to recalculate, you need to hold the results somewhere.
> If you need to save it anyway, why don't you do that and then read it back
> to get your stats?
>
> On Fri, 17 Nov 2017, 10:03 Fernando Pereira,  wrote:
>
>> Dear Spark users
>>
>> Is it possible to take the output of a transformation (RDD/Dataframe) and
>> feed it to two independent transformations without recalculating the first
>> transformation and without caching the whole dataset?
>>
>> Consider the case of a very large dataset (1+ TB) which has undergone
>> several transformations, and now we want to save it but also calculate
>> some statistics per group.
>> So the ideal processing scheme would be: for each partition, do task A,
>> then do task B.
>>
>> I don't see a way of instructing Spark to proceed that way without
>> caching to disk, which seems unnecessarily heavy. And if we don't cache,
>> Spark recalculates every partition all the way from the beginning. In
>> either case, huge file reads happen.
>>
>> Any ideas on how to avoid it?
>>
>> Thanks
>>
>> Fernando
>>
>


Re: Multiple transformations without recalculating or caching

2017-11-17 Thread Sebastian Piu
If you don't want to recalculate, you need to hold the results somewhere.
If you need to save it anyway, why don't you do that and then read it back
to get your stats?
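A minimal sketch of that save-then-reread pattern (the path, format and
column names are only placeholders, and `df` / `spark` are assumed to
already exist):

import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Write the transformed dataset once; this is the only full computation.
df.write().parquet("/data/output/transformed");

// Read it back and compute the per-group statistics from the saved copy,
// so the original lineage is never re-executed.
Dataset<Row> saved = spark.read().parquet("/data/output/transformed");
Dataset<Row> stats = saved.groupBy("group")
    .agg(count(lit(1)).alias("rows"), avg("value").alias("avg_value"));
stats.show();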

On Fri, 17 Nov 2017, 10:03 Fernando Pereira,  wrote:

> Dear Spark users
>
> Is it possible to take the output of a transformation (RDD/Dataframe) and
> feed it to two independent transformations without recalculating the first
> transformation and without caching the whole dataset?
>
> Consider the case of a very large dataset (1+ TB) which has undergone
> several transformations, and now we want to save it but also calculate
> some statistics per group.
> So the ideal processing scheme would be: for each partition, do task A,
> then do task B.
>
> I don't see a way of instructing Spark to proceed that way without
> caching to disk, which seems unnecessarily heavy. And if we don't cache,
> Spark recalculates every partition all the way from the beginning. In
> either case, huge file reads happen.
>
> Any ideas on how to avoid it?
>
> Thanks
>
> Fernando
>


Multiple transformations without recalculating or caching

2017-11-17 Thread Fernando Pereira
Dear Spark users

Is it possible to take the output of a transformation (RDD/Dataframe) and
feed it to two independent transformations without recalculating the first
transformation and without caching the whole dataset?

Consider the case of a very large dataset (1+ TB) which has undergone
several transformations, and now we want to save it but also calculate some
statistics per group.
So the ideal processing scheme would be: for each partition, do task A,
then do task B.

I don't see a way of instructing Spark to proceed that way without caching
to disk, which seems unnecessarily heavy. And if we don't cache, Spark
recalculates every partition all the way from the beginning. In either
case, huge file reads happen.

Any ideas on how to avoid it?

Thanks
Fernando