Re: run spark job in yarn cluster mode as specified user

2018-01-22 Thread sd wang
Thanks!
I finally made this work. Besides the LinuxContainerExecutor parameter and
the cache directory permissions, the following parameter also needs to be
set to the specified user:
yarn.nodemanager.linux-container-executor.nonsecure-mode.local-user
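
For anyone hitting the same issue, a minimal sketch of the yarn-site settings
involved (the user name here is only an example taken from the error output
earlier in this thread, not a recommendation):

yarn.nodemanager.container-executor.class=org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor
yarn.nodemanager.linux-container-executor.nonsecure-mode.local-user=ses_test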

Thanks.

2018-01-22 22:44 GMT+08:00 Margusja :

> Hi
>
> org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor requires
> user in each node and right permissions set in necessary directories.
>
> Br
> Margus
>
>
> On 22 Jan 2018, at 13:41, sd wang  wrote:
>
> org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor
>
>
>


Spark SQL bucket pruning support

2018-01-22 Thread Joe Wang
Hi,

I'm wondering if the current version of Spark still supports bucket
pruning? I see the pull request that incorporated the change, but the
logic to actually skip reading buckets has since been removed as part of
other PRs, and the logic in the BucketedReadSuite to verify that pruned
buckets are empty is currently commented out.
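
For concreteness, a minimal sketch of the kind of bucketed read I have in
mind (the table and column names are made up for illustration):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("bucket-pruning-sketch").getOrCreate()
import spark.implicits._

// Stand-in data with a user_id column, written as a bucketed table so the
// bucketing spec is recorded in the catalog.
val df = spark.range(100).toDF("user_id")
df.write
  .bucketBy(8, "user_id")
  .sortBy("user_id")
  .saveAsTable("events_bucketed")

// With bucket pruning, an equality filter on the bucketing column would only
// need to scan the single bucket that can contain user_id = 42.
spark.table("events_bucketed").filter($"user_id" === 42).show()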

Thanks,
Joe


Re: Spark Tuning Tool

2018-01-22 Thread lucas.g...@gmail.com
I'd be very interested in anything I can send to my analysts to assist them
with their troubleshooting / optimization... Of course our engineers would
appreciate it as well.

However we'd be way more interested if it was OSS.

Thanks!

Gary Lucas

On 22 January 2018 at 21:16, Holden Karau  wrote:

> That's very interesting, and might also get some interest on the dev@
> list if it was open source.
>
> On Tue, Jan 23, 2018 at 4:02 PM, Roger Marin 
> wrote:
>
>> I'd be very interested.
>>
>> On 23 Jan. 2018 4:01 pm, "Rohit Karlupia"  wrote:
>>
>>> Hi,
>>>
>>> I have been working on making the performance tuning of Spark
>>> applications a bit easier.  We have just released the beta version of the
>>> tool on Qubole.
>>>
>>> https://www.qubole.com/blog/introducing-quboles-spark-tuning-tool/
>>>
>>> This is not OSS yet, but we would like to contribute it to OSS.  Fishing
>>> for some interest in the community to see if people find this work
>>> interesting and would like to try it out.
>>>
>>> thanks,
>>> Rohit Karlupia
>>>
>>>
>>>
>
>
> --
> Twitter: https://twitter.com/holdenkarau
>


Re: Spark Tuning Tool

2018-01-22 Thread Holden Karau
That's very interesting, and might also get some interest on the dev@ list
if it was open source.

On Tue, Jan 23, 2018 at 4:02 PM, Roger Marin  wrote:

> I'd be very interested.
>
> On 23 Jan. 2018 4:01 pm, "Rohit Karlupia"  wrote:
>
>> Hi,
>>
>> I have been working on making the performance tuning of Spark
>> applications a bit easier.  We have just released the beta version of the
>> tool on Qubole.
>>
>> https://www.qubole.com/blog/introducing-quboles-spark-tuning-tool/
>>
>> This is not OSS yet, but we would like to contribute it to OSS.  Fishing
>> for some interest in the community to see if people find this work
>> interesting and would like to try it out.
>>
>> thanks,
>> Rohit Karlupia
>>
>>
>>


-- 
Twitter: https://twitter.com/holdenkarau


Re: Spark Tuning Tool

2018-01-22 Thread Roger Marin
I'd be very interested.

On 23 Jan. 2018 4:01 pm, "Rohit Karlupia"  wrote:

> Hi,
>
> I have been working on making the performance tuning of Spark applications
> a bit easier.  We have just released the beta version of the tool on Qubole.
>
> https://www.qubole.com/blog/introducing-quboles-spark-tuning-tool/
>
> This is not OSS yet, but we would like to contribute it to OSS.  Fishing
> for some interest in the community to see if people find this work
> interesting and would like to try it out.
>
> thanks,
> Rohit Karlupia
>
>
>


Spark Tuning Tool

2018-01-22 Thread Rohit Karlupia
Hi,

I have been working on making the performance tuning of Spark applications
a bit easier.  We have just released the beta version of the tool on Qubole.

https://www.qubole.com/blog/introducing-quboles-spark-tuning-tool/

This is not OSS yet, but we would like to contribute it to OSS.  Fishing for
some interest in the community to see if people find this work interesting
and would like to try it out.

thanks,
Rohit Karlupia


Re: How to hold some data in memory while processing rows in a DataFrame?

2018-01-22 Thread vermanurag
Looking at the description of your problem, window functions may solve your
issue. They allow an operation over a window that can include records
before/after the current record.
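
A minimal sketch of the idea in Scala (the column names key_col,
ordering_col and value are placeholders; the PySpark API is analogous):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lag

val spark = SparkSession.builder().appName("window-fn-sketch").getOrCreate()
import spark.implicits._

// Stand-in data: one group with an ordering column and a value column.
val df = Seq(("k1", 1, 10.0), ("k1", 2, 20.0), ("k1", 3, 30.0))
  .toDF("key_col", "ordering_col", "value")

// Rows are processed per key in ordering_col order; lag makes the previous
// row's value available on the current row, which is the "hold data from one
// record and use it on a subsequent record" pattern.
val w = Window.partitionBy("key_col").orderBy("ordering_col")
df.withColumn("prev_value", lag("value", 1).over(w)).show()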




--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: How to hold some data in memory while processing rows in a DataFrame?

2018-01-22 Thread naresh Goud
If I understand your requirement correctly: use broadcast variables to
replicate across all nodes the small amount of data you want to reuse.
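
A rough sketch of that in Scala (the lookup map is illustrative; some_column
is the column name from the code in the question):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

val spark = SparkSession.builder().appName("broadcast-sketch").getOrCreate()
import spark.implicits._

// Small reference data that every executor should be able to read.
val lookup = Map("a" -> 1, "b" -> 2)
val lookupBc = spark.sparkContext.broadcast(lookup)

// The UDF reads the broadcast value, so the map is shipped once per executor
// instead of being serialized into every task's closure.
val resolve = udf((k: String) => lookupBc.value.getOrElse(k, -1))

val df = Seq("a", "b", "c").toDF("some_column")   // stand-in dataframe
df.withColumn("resolved", resolve($"some_column")).show()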



On Mon, Jan 22, 2018 at 9:24 PM David Rosenstrauch 
wrote:

> This seems like an easy thing to do, but I've been banging my head against
> the wall for hours trying to get it to work.
>
> I'm processing a spark dataframe (in python).  What I want to do is, as
> I'm processing it I want to hold some data from one record in some local
> variables in memory, and then use those values later while I'm processing a
> subsequent record.  But I can't see any way to do this.
>
> I tried using:
>
> dataframe.select(a_custom_udf_function('some_column'))
>
> ... and then reading/writing to local variables in the udf function, but I
> can't get this to work properly.
>
> My next guess would be to use dataframe.foreach(a_custom_function) and try
> to save data to local variables in there, but I have a suspicion that may
> not work either.
>
>
> What's the correct way to do something like this in Spark?  In Hadoop I
> would just go ahead and declare local variables, and read and write to them
> in my map function as I like.  (Although with the knowledge that a) the
> same map function would get repeatedly called for records with many
> different keys, and b) there would be many different instances of my code
> spread across many machines, and so each map function running on an
> instance would only see a subset of the records.)  But in Spark it seems to
> be extraordinarily difficult to create local variables that can be read
> from / written to across different records in the dataframe.
>
> Perhaps there's something obvious I'm missing here?  If so, any help would
> be greatly appreciated!
>
> Thanks,
>
> DR
>
>


How to hold some data in memory while processing rows in a DataFrame?

2018-01-22 Thread David Rosenstrauch
 This seems like an easy thing to do, but I've been banging my head against
the wall for hours trying to get it to work.

I'm processing a spark dataframe (in python).  What I want to do is, as I'm
processing it I want to hold some data from one record in some local
variables in memory, and then use those values later while I'm processing a
subsequent record.  But I can't see any way to do this.

I tried using:

dataframe.select(a_custom_udf_function('some_column'))

... and then reading/writing to local variables in the udf function, but I
can't get this to work properly.

My next guess would be to use dataframe.foreach(a_custom_function) and try
to save data to local variables in there, but I have a suspicion that may
not work either.


What's the correct way to do something like this in Spark?  In Hadoop I
would just go ahead and declare local variables, and read and write to them
in my map function as I like.  (Although with the knowledge that a) the
same map function would get repeatedly called for records with many
different keys, and b) there would be many different instances of my code
spread across many machines, and so each map function running on an
instance would only see a subset of the records.)  But in Spark it seems to
be extraordinarily difficult to create local variables that can be read
from / written to across different records in the dataframe.

Perhaps there's something obvious I'm missing here?  If so, any help would
be greatly appreciated!

Thanks,

DR


Re: [Spark structured streaming] Use of (flat)mapgroupswithstate takes long time

2018-01-22 Thread Tathagata Das
For computing mapGroupsWithState, can you check the following?
- How many tasks are being launched in the reduce stage (that is, the stage
after the shuffle, which is computing mapGroupsWithState)?
- How long is each task taking?
- How many cores does the cluster have?
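
One related thing to check (this is my assumption, and only relevant if the
task count above matches it): the post-shuffle stage defaults to
spark.sql.shuffle.partitions = 200 tasks, which is a lot of per-task
state-store overhead for 3-5 records per batch. A minimal sketch of lowering
it:

import org.apache.spark.sql.SparkSession

// Sketch: fewer shuffle partitions for a small-volume test stream; the value
// 8 is arbitrary and should be sized to the real workload.
val spark = SparkSession.builder()
  .appName("mapGroupsWithState-sketch")
  .config("spark.sql.shuffle.partitions", "8")
  .getOrCreate()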


On Thu, Jan 18, 2018 at 11:28 PM, chris-sw 
wrote:

> Hi,
>
> I recently did some experiments with stateful structured streaming by using
> flatmapgroupswithstate. The streaming application is quite simple: it
> receives data from Kafka, feeds it to the stateful operator
> (flatmapgroupswithstate) and sinks the output to the console.
> During a test with small datasets (3-5 records per batch) I experienced
> long batch runs.
>
> Taking a look at the log I see an explosion of tasks with these log
> entries:
> -
> 2018-01-18 13:33:46,668 [Executor task launch worker for task 287] INFO
> org.apache.spark.executor.Executor - Running task 85.0 in stage 3.0 (TID
> 287)
> 2018-01-18 13:33:46,672 [Executor task launch worker for task 287] INFO
> org.apache.spark.sql.execution.streaming.state.
> HDFSBackedStateStoreProvider
> - Retrieved version 1 of HDFSStateStoreProvider[id = (op=0, part=85), dir =
> /tmp/temporary-8b418cec-0378-4324-a916-6e3864500d56/state/0/85] for update
> 2018-01-18 13:33:46,672 [Executor task launch worker for task 287] INFO
> org.apache.spark.storage.ShuffleBlockFetcherIterator - Getting 0 non-empty
> blocks out of 1 blocks
> 2018-01-18 13:33:46,672 [Executor task launch worker for task 287] INFO
> org.apache.spark.storage.ShuffleBlockFetcherIterator - Started 0 remote
> fetches in 0 ms
> 2018-01-18 13:33:46,691 [Executor task launch worker for task 287] INFO
> org.apache.spark.sql.execution.streaming.state.
> HDFSBackedStateStoreProvider
> - Committed version 2 for
> HDFSStateStore[id=(op=0,part=85),dir=/tmp/temporary-
> 8b418cec-0378-4324-a916-6e3864500d56/state/0/85]
> to file
> /tmp/temporary-8b418cec-0378-4324-a916-6e3864500d56/state/0/85/2.delta
> 2018-01-18 13:33:46,691 [Executor task launch worker for task 287] INFO
> org.apache.spark.executor.Executor - Finished task 85.0 in stage 3.0 (TID
> 287). 2212 bytes result sent to driver
> -
>
> A batch run takes approx. 5 seconds, and it seems it spends most of that
> time on tasks like the above.
> I created several apps using the non-Spark SQL approach with mapWithState
> but never experienced these long batch runs.
>
> Does anyone have this experience as well, or can anyone help me find a
> solution for these long runs?
>
> Regards,
>
> Chris
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark vs Snowflake

2018-01-22 Thread Patrick McCarthy
Last I heard of them a year or two ago, they basically repackage AWS
services behind their own API/service layer for convenience. There's
probably a value-add if you're not familiar with optimizing AWS, but if you
already have that expertise, I don't expect they would add much extra
performance, if any.

On Mon, Jan 22, 2018 at 4:51 PM, Mich Talebzadeh 
wrote:

> Hi,
>
> Has anyone had experience of using Snowflake, which touts itself as a data
> warehouse built for the cloud? In reviews, one recommendation states
>
> "DEFINITELY AN ALTERNATIVE TO AMAZON RED SHIFT AND SPARK"
>
> Now I am not sure about the inner workings of this product, but I will be
> surprised if the product is actually faster than using Spark on HDFS.
>
> There is very little literature on this product except that it shouts "me
> too" among Amazon Redshift and Google BigQuery.
>
> Has anyone used this product?
>
> thanks
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn:
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> Disclaimer: Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>


Spark vs Snowflake

2018-01-22 Thread Mich Talebzadeh
Hi,

Has anyone had experience of using Snowflake, which touts itself as a data
warehouse built for the cloud? In reviews, one recommendation states

"DEFINITELY AN ALTERNATIVE TO AMAZON RED SHIFT AND SPARK"

Now I am not sure about the inner workings of this product, but I will be
surprised if the product is actually faster than using Spark on HDFS.

There is very little literature on this product except that it shouts "me
too" among Amazon Redshift and Google BigQuery.

Has anyone used this product?

thanks


Dr Mich Talebzadeh



LinkedIn:
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


Disclaimer: Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


Re: Has there been any explanation on the performance degradation between spark.ml and Mllib?

2018-01-22 Thread Weichen Xu
Hi Stephen,

I agree with what Nick said; the ML vs MLlib comparison test seems to be
flawed.

LR in Spark MLlib uses SGD: in each training iteration, SGD only samples a
small fraction of the data and computes the gradient on that sample, whereas
in each iteration L-BFGS needs to aggregate over the whole input dataset. So
each L-BFGS iteration takes longer if the dataset is large.

However, L-BFGS is a quasi-Newton method, so it converges faster (nearly
quadratically), while SGD only converges linearly, and we need to tune the
step size for SGD, otherwise we may get a very slow convergence speed.
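
For reference, the standard definitions behind those rate claims (added for
clarity; $x^\star$ denotes the optimum):

\| x_{k+1} - x^\star \| \le c \, \| x_k - x^\star \|, \quad 0 < c < 1  (linear convergence)
\| x_{k+1} - x^\star \| \le C \, \| x_k - x^\star \|^2                 (quadratic convergence)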

On Sun, Jan 21, 2018 at 11:31 PM, Nick Pentreath 
wrote:

> At least one of their comparisons is flawed.
>
> The Spark ML version of linear regression (*note* they use linear
> regression and not logistic regression, it is not clear why) uses L-BFGS as
> the solver, not SGD (as MLLIB uses). Hence it is typically going to be
> slower. However, it should in most cases converge to a better solution.
> MLLIB doesn't offer an L-BFGS version for linear regression, but it does
> for logistic regression.
>
> In my view a more sensible comparison would be between LogReg with L-BFGS
> between ML and MLLIB. These should be close to identical since now the
> MLLIB version actually wraps the ML version.
>
> They also don't show any results for algorithm performance (accuracy, AUC
> etc). The better comparison to make is the run-time to achieve the same AUC
> (for example). SGD may be fast, but it may result in a significantly poorer
> solution relative to say L-BFGS.
>
> Note that the "withSGD" algorithms are deprecated in MLLIB partly to move
> users to ML, but also partly because their performance in terms of accuracy
> is relatively poor and the amount of tuning required (e.g. learning rates)
> is high.
>
> They say:
>
> The time difference between Spark MLlib and Spark ML can be explained by
> internally transforming the dataset from DataFrame to RDD in order to use
> the same implementation of the algorithm present in MLlib.
>
> but this is not true for the LR example.
>
> For the feature selection example, it is probably mostly due to the
> conversion, but even then the difference seems larger than what I would
> expect. It would be worth investigating their implementation to see if
> there are other potential underlying causes.
>
>
> On Sun, 21 Jan 2018 at 23:49 Stephen Boesch  wrote:
>
>> While MLLib performed favorably vs Flink it *also *performed favorably
>> vs spark.ml ..  and by an *order of magnitude*.  The following is one of
>> the tables - it is for Logistic Regression.  At that time spark.ML did not
>> yet support SVM
>>
>> From: https://bdataanalytics.biomedcentral.com/articles/10.1186/
>> s41044-016-0020-2
>>
>>
>>
>> Table 3: LR learning time in seconds
>>
>>   Dataset        Spark MLlib   Spark ML   Flink
>>   ECBDL14-10               3         26     181
>>   ECBDL14-30               5         63     815
>>   ECBDL14-50               6        173    1314
>>   ECBDL14-75               8        260    1878
>>   ECBDL14-100             12        415    2566
>>
>> The DataFrame-based API (spark.ml) is even slower vs the RDD-based one
>> (mllib) than had been anticipated - yet the latter has been shut down for
>> several versions of Spark already.  What is the thought process behind that
>> decision: performance matters! Is there visibility into a meaningful
>> narrowing of that gap?
>>
>


Re: Spark querying C* in Scala

2018-01-22 Thread Sathish Kumaran Vairavelu
You have to register the Cassandra table in Spark as a DataFrame:


https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md
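
A minimal sketch of that in Scala, adapted to your question (the keyspace
name ks1 is an assumption, and the JSON schema is inferred by re-reading the
kafka column as a JSON dataset, which needs Spark 2.2+):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("cassandra-json-sketch")
  .config("spark.cassandra.connection.host", "127.0.0.1")
  .config("spark.cassandra.auth.username", "cassandra")
  .config("spark.cassandra.auth.password", "cassandra")
  .getOrCreate()
import spark.implicits._

// Register the Cassandra table as a DataFrame via the connector's data source.
val df = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "ks1", "table" -> "table1"))
  .load()

// Read only the kafka text column and let Spark infer the JSON schema from it.
val kafkaJson = spark.read.json(df.select("kafka").as[String])
kafkaJson.printSchema()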


Thanks

Sathish
On Mon, Jan 22, 2018 at 7:43 AM Conconscious  wrote:

> Hi list,
>
> I have a Cassandra table with two fields; id bigint, kafka text
>
> My goal is to read only the kafka field (that is a JSON) and infer the
> schema
>
> I have this skeleton code (not working):
>
> sc.stop
> import org.apache.spark._
> import com.datastax.spark._
> import org.apache.spark.sql.functions.get_json_object
>
> import org.apache.spark.sql.functions.to_json
> import org.apache.spark.sql.functions.from_json
> import org.apache.spark.sql.types._
>
> val conf = new SparkConf(true)
> .set("spark.cassandra.connection.host", "127.0.0.1")
> .set("spark.cassandra.auth.username", "cassandra")
> .set("spark.cassandra.auth.password", "cassandra")
> val sc = new SparkContext(conf)
>
> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
> val df = sqlContext.sql("SELECT kafka from table1")
> df.printSchema()
>
> I think I have at least two problems: the keyspace is missing, the table
> is not being recognized, and it is surely not going to infer the schema
> from the text field.
>
> I have a working solution for json files, but I can't "translate" this
> to Cassandra:
>
> import org.apache.spark.sql.SparkSession
> import spark.implicits._
> val spark = SparkSession.builder().appName("Spark SQL basic
> example").getOrCreate()
> val redf = spark.read.json("/usr/local/spark/examples/cqlsh_r.json")
> redf.printSchema
> redf.count
> redf.show
> redf.createOrReplaceTempView("clicks")
> val clicksDF = spark.sql("SELECT * FROM clicks")
> clicksDF.show()
>
> My Spark version is 2.2.1 and Cassandra version is 3.11.1
>
> Thanks in advance
>
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: [EXT] How do I extract a value in foreachRDD operation

2018-01-22 Thread Toy
Thanks Michael,

Can you give me an example? I'm new to Spark

On Mon, 22 Jan 2018 at 12:25 Michael Mansour 
wrote:

> Toy,
>
>
>
> I suggest you partition your data according to date, and use the
> forEachPartition function, using the partition as the bucket location.
>
> This would require you to define a custom hash partitioner function, but
> that is not too difficult.
>
>
>
> --
>
> Michael Mansour
>
> Data Scientist
>
> Symantec
>
> *From: *Toy 
> *Date: *Monday, January 22, 2018 at 8:19 AM
> *To: *"user@spark.apache.org" 
> *Subject: *[EXT] How do I extract a value in foreachRDD operation
>
>
>
> Hi,
>
>
>
> We have a spark application to parse log files and save to S3 in ORC
> format. However, during the foreachRDD operation we need to extract a date
> field to be able to determine the bucket location; we partition it by date.
> Currently, we just hardcode it to the current date, but we have a requirement
> to determine it for each record.
>
>
>
> Here's the current code.
>
>
>
> jsonRows.foreachRDD(r => {
>
>   val parsedFormat = new SimpleDateFormat("yyyy-MM-dd/")
>
>   val parsedDate = parsedFormat.format(new java.util.Date())
>
>   val OutputPath = destinationBucket + "/parsed_logs/orc/dt=" +
> parsedDate
>
>
>
>   val jsonDf = sqlSession.read.schema(Schema.schema).json(r)
>
>
>
>   val writer =
> jsonDf.write.mode("append").format("orc").option("compression", "zlib")
>
>
>
>   if (environment.equals("local")) {
>
> writer.save("/tmp/sparrow")
>
>   } else {
>
> writer.save(OutputPath)
>
>   }
>
> })
>
>
>
> The column in jsonRow that we want is `_ts`.
>
>
>
> Thanks.
>


Production Critical : Data loss in spark streaming

2018-01-22 Thread KhajaAsmath Mohammed
Hi,

I have been using Spark Streaming with Kafka. I have to restart the
application daily due to a KMS issue, and after the restart the offsets do
not match the point where I left off. I am creating the checkpoint directory
with:

val streamingContext = StreamingContext.getOrCreate(checkPointDir, () =>
createStreamingContext(checkPointDir, sparkSession, batchInt, kafkaParams,
topicsSet, config, sparkConfig))
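
For reference, a minimal sketch of the factory that getOrCreate expects,
reusing the names above (config and sparkConfig omitted); offsets are only
recovered if the whole DStream graph is built inside this function and the
context checkpoints to the same directory:

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

def createStreamingContext(
    checkPointDir: String,
    sparkSession: SparkSession,
    batchInt: Int,
    kafkaParams: Map[String, Object],
    topicsSet: Set[String]): StreamingContext = {
  val ssc = new StreamingContext(sparkSession.sparkContext, Seconds(batchInt))
  ssc.checkpoint(checkPointDir)   // offsets and metadata are recovered from here on restart

  val stream = KafkaUtils.createDirectStream[String, String](
    ssc,
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

  // ... build the rest of the processing graph on `stream` here, then return ssc ...
  ssc
}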

Batch 1: (screenshot of offsets not included)

Batch 2: After restart and completion of two batches. (screenshot of offsets
not included)
Thanks,
Asmath


Re: [EXT] How do I extract a value in foreachRDD operation

2018-01-22 Thread Michael Mansour
Toy,

I suggest you partition your data according to date, and use the
forEachPartition function, using the partition as the bucket location.
This would require you to define a custom hash partitioner function, but that
is not too difficult.

--
Michael Mansour
Data Scientist
Symantec
From: Toy 
Date: Monday, January 22, 2018 at 8:19 AM
To: "user@spark.apache.org" 
Subject: [EXT] How do I extract a value in foreachRDD operation

Hi,

We have a spark application to parse log files and save to S3 in ORC format. 
However, during the foreachRDD operation we need to extract a date field to be 
able to determine the bucket location; we partition it by date. Currently, we 
just hardcode it to the current date, but we have a requirement to determine it for 
each record.

Here's the current code.

jsonRows.foreachRDD(r => {
  val parsedFormat = new SimpleDateFormat("yyyy-MM-dd/")
  val parsedDate = parsedFormat.format(new java.util.Date())
  val OutputPath = destinationBucket + "/parsed_logs/orc/dt=" + parsedDate

  val jsonDf = sqlSession.read.schema(Schema.schema).json(r)

  val writer = 
jsonDf.write.mode("append").format("orc").option("compression", "zlib")

  if (environment.equals("local")) {
writer.save("/tmp/sparrow")
  } else {
writer.save(OutputPath)
  }
})

The column in jsonRow that we want is `_ts`.
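
As a concrete starting point, here is a sketch that reuses the names from the
code above but swaps in the DataFrame writer's partitionBy on a derived date
column instead of a custom partitioner (it assumes _ts can be cast to a
timestamp):

import org.apache.spark.sql.functions.{col, date_format}

jsonRows.foreachRDD { r =>
  val jsonDf = sqlSession.read.schema(Schema.schema).json(r)

  // Derive the partition value from each record's own _ts field rather than
  // from the current date.
  val withDt = jsonDf.withColumn(
    "dt", date_format(col("_ts").cast("timestamp"), "yyyy-MM-dd"))

  withDt.write
    .mode("append")
    .partitionBy("dt")   // one output directory per date, e.g. .../dt=2018-01-22
    .format("orc")
    .option("compression", "zlib")
    .save(destinationBucket + "/parsed_logs/orc")
}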

Thanks.


Re: spark 2.0 and spark 2.2

2018-01-22 Thread Xiao Li
Generally, the behavior changes in Spark SQL will be documented in
https://spark.apache.org/docs/latest/sql-programming-guide.html#migration-guide

In the ongoing Spark 2.3 release, all the changes in Spark
SQL/DataFrame/Dataset that cause behavior changes are documented in this
section.

Thanks,

Xiao

2018-01-22 7:07 GMT-08:00 Mihai Iacob :

> Does spark 2.2 have good backwards compatibility? Is there something that
> won't work that works in spark 2.0?
>
>
> Regards,
>
> *Mihai Iacob*
> DSX Local  - Security, IBM Analytics
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Re: external shuffle service in mesos

2018-01-22 Thread Susan X. Huynh
Hi Igor,

You made a good point about the tradeoffs. I think the main thing you would
get with Marathon is the accounting for resources (the memory and cpus
specified in the config file). That allows Mesos to manage the resources
properly. I don't think the other tools mentioned would reserve resources
from Mesos.

If you want more information about production ops for Mesos, you might want
to ask in the Mesos mailing list. Or, you can check out the
https://dcos.io/community/ project.

Susan

On Sat, Jan 20, 2018 at 11:59 PM, igor.berman  wrote:

> Hi Susan
>
> In general I can get what I need without Marathon, with configuring
> external-shuffle-service with puppet/ansible/chef + maybe some alerts for
> checks.
>
> I mean that in companies that don't have strong DevOps teams and want to
> install services as simply as possible, just by config, Marathon might be
> useful; however, if a company already has strong puppet/ansible/chef
> infra, the benefit of adding and managing Marathon (an additional
> component) is less clear.
>
> WDYT?
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
Susan X. Huynh
Software engineer, Data Agility
xhu...@mesosphere.com


How do I extract a value in foreachRDD operation

2018-01-22 Thread Toy
Hi,

We have a spark application to parse log files and save to S3 in ORC
format. However, during the foreachRDD operation we need to extract a date
field to be able to determine the bucket location; we partition it by date.
Currently, we just hardcode it to the current date, but we have a requirement
to determine it for each record.

Here's the current code.

jsonRows.foreachRDD(r => {
  val parsedFormat = new SimpleDateFormat("yyyy-MM-dd/")
  val parsedDate = parsedFormat.format(new java.util.Date())
  val OutputPath = destinationBucket + "/parsed_logs/orc/dt=" +
parsedDate

  val jsonDf = sqlSession.read.schema(Schema.schema).json(r)

  val writer =
jsonDf.write.mode("append").format("orc").option("compression", "zlib")

  if (environment.equals("local")) {
writer.save("/tmp/sparrow")
  } else {
writer.save(OutputPath)
  }
})

The column in jsonRow that we want is `_ts`.

Thanks.


Re: [Spark DataFrame]: Passing DataFrame to custom method results in NullPointerException

2018-01-22 Thread Matteo Cossu
Hello,
I did not understand very well your question.
However, I can tell you that if you do .collect() on a RDD you are
collecting all the data in the driver node. For this reason, you should use
it only when the RDD is very small.
Your function "validate_hostname" depends on a DataFrame. It's not possible
to refer a DataFrame from a worker node, that's why that operation doesn't
work. In the other case it works because the "map" is a function executed
in the driver, not an RDD's method.
In these cases you could use broadcast variables, but I have the intuition
that, in general, you are using the wrong approach to solve the problem.
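
Still, a rough sketch of the broadcast route in Scala, in case it helps (the
hostname column name and the case class field are assumptions, since the full
code isn't shown):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("broadcast-filter-sketch").getOrCreate()
import spark.implicits._

// data_frame and rdd stand for the DataFrame and RDD from the question;
// line.hostname is an assumed field of the case class.
// Collect the small DataFrame once on the driver and broadcast the result,
// so workers never reference the DataFrame itself.
val validHosts = data_frame.select("hostname").as[String].collect().toSet
val validHostsBc = spark.sparkContext.broadcast(validHosts)

// Inside the RDD operation, only the broadcast value is referenced.
val filtered = rdd.filter(line => validHostsBc.value.contains(line.hostname))
filtered.count()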

Best Regards,

Matteo Cossu


On 15 January 2018 at 12:56,  wrote:

> Hi,
>
>
>
> My Spark app is mapping lines from a text file to case classes stored
> within an RDD.
>
>
>
> When I run the following code on this rdd:
>
> .collect.map(line => if(validate_hostname(line, data_frame))
> line).foreach(println)
>
>
>
> It correctly calls the method validate_hostname by passing the case class
> and another data_frame defined within the main method. Unfortunately the
> above map only returns a TraversableLike collection so I can’t do
> transformations and joins on this data structure so I’m tried to apply a
> filter on the rdd with the following code:
>
> .filter(line => validate_hostname(line, data_frame)).count()
>
>
>
> Unfortunately the above method with filtering the rdd does not pass the
> data_frame so I get a NullPointerException though it correctly passes the
> case class which I print within the method.
>
>
>
> Where am I going wrong?
>
>
>
> When
>
>
>
> Regards,
>
> Abdul Haseeb Hussain
>


spark 2.0 and spark 2.2

2018-01-22 Thread Mihai Iacob
Does spark 2.2 have good backwards compatibility? Is there something that won't work that works in spark 2.0?
 
Regards, 

Mihai Iacob
DSX Local - Security, IBM Analytics


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: run spark job in yarn cluster mode as specified user

2018-01-22 Thread Margusja
Hi

org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor requires user 
in each node and right permissions set in necessary directories. 

Br
Margus


> On 22 Jan 2018, at 13:41, sd wang  wrote:
> 
> org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor



Spark querying C* in Scala

2018-01-22 Thread Conconscious
Hi list,

I have a Cassandra table with two fields; id bigint, kafka text

My goal is to read only the kafka field (that is a JSON) and infer the
schema

I have this skeleton code (not working):

sc.stop
import org.apache.spark._
import com.datastax.spark._
import org.apache.spark.sql.functions.get_json_object

import org.apache.spark.sql.functions.to_json
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

val conf = new SparkConf(true)
.set("spark.cassandra.connection.host", "127.0.0.1")
.set("spark.cassandra.auth.username", "cassandra")
.set("spark.cassandra.auth.password", "cassandra")
val sc = new SparkContext(conf)

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.sql("SELECT kafka from table1")
df.printSchema()

I think I have at least two problems: the keyspace is missing, the table is
not being recognized, and it is surely not going to infer the schema from
the text field.

I have a working solution for json files, but I can't "translate" this
to Cassandra:

import org.apache.spark.sql.SparkSession
import spark.implicits._
val spark = SparkSession.builder().appName("Spark SQL basic
example").getOrCreate()
val redf = spark.read.json("/usr/local/spark/examples/cqlsh_r.json")
redf.printSchema
redf.count
redf.show
redf.createOrReplaceTempView("clicks")
val clicksDF = spark.sql("SELECT * FROM clicks")
clicksDF.show()

My Spark version is 2.2.1 and Cassandra version is 3.11.1

Thanks in advance



-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: run spark job in yarn cluster mode as specified user

2018-01-22 Thread Jörn Franke
Configure Kerberos

> On 22. Jan 2018, at 08:28, sd wang  wrote:
> 
> Hi Advisers,
> When submit spark job in yarn cluster mode, the job will be executed by 
> "yarn" user. Any parameters can change the user? I tried setting 
> HADOOP_USER_NAME but it did not work. I'm using spark 2.2. 
> Thanks for any help!


Re: run spark job in yarn cluster mode as specified user

2018-01-22 Thread sd wang
Hi Margus,
Appreciate your help!
It seems this parameter is related to CGroups functionality.
I am using CDH without Kerberos. I set the parameter:
yarn.nodemanager.container-executor.class=org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor

Then I ran the spark job again and hit the problem below. Any points I missed?
Thanks again !
... ...
Diagnostics: Application application_1516614010938_0003 initialization
failed (exitCode=255) with output: main : command provided 0
main : run as user is nobody
main : requested yarn user is ses_test
Can't create directory
/data/yarn/nm/usercache/test_user/appcache/application_1516614010938_0003 -
Permission denied
Can't create directory
/data01/yarn/nm/usercache/test_user/appcache/application_1516614010938_0003
- Permission denied
Did not create any app directories
... ...



2018-01-22 15:36 GMT+08:00 Margusja :

> Hi
>
> One way to get it is use YARN configuration parameter - yarn.nodemanager.
> container-executor.class.
> By default it is org.apache.hadoop.yarn.server.nodemanager.
> DefaultContainerExecutor
>
> org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor - gives
> you user who executes script.
>
> Br
> Margus
>
>
>
> On 22 Jan 2018, at 09:28, sd wang  wrote:
>
> Hi Advisers,
> When submit spark job in yarn cluster mode, the job will be executed by
> "yarn" user. Any parameters can change the user? I tried
> setting HADOOP_USER_NAME but it did not work. I'm using spark 2.2.
> Thanks for any help!
>
>
>


Spark and CEP type examples

2018-01-22 Thread Esa Heikkinen
Hi

I am looking for simple examples of using CEP (Complex Event Processing) with 
Scala and Python. Does anyone know good ones?

I do not need the preprocessing (as in Kafka), but only the analysis phase of 
CEP inside Spark.

I am also interested to know about other possibilities for searching sequential 
event patterns in (time series) log data using Spark. These event patterns can 
be described by DAGs (directed acyclic graphs), so they are not a simple chain 
of events. And sometimes it can be better to search for events in backward or 
reverse order (not always in forward order as in the streaming case). But 
perhaps this is impossible with Spark?

Esa Heikkinen




Using window function works extremely slowly

2018-01-22 Thread Anton Puzanov
I am trying to use the Spark SQL built-in window function:
https://spark.apache.org/docs/2.0.2/api/java/org/apache/spark/sql/functions.html#window(org.apache.spark.sql.Column,%20java.lang.String)

I run it with a step of 1 second and a window of 3 minutes (a ratio of 180),
and it runs extremely slowly compared to other methods (join & filter, for
example).

Example code:
Dataset:
+-----+-------------------+
|data |timestamp          |
+-----+-------------------+
|data1|2017-12-28 11:23:10|
|data1|2017-12-28 11:23:11|
|data1|2017-12-28 11:23:19|
|data2|2017-12-28 11:23:13|
|data2|2017-12-28 11:23:14|
+-----+-------------------+
And a third column of features which doesn't show here.

Code:

private static String TIME_STEP_STRING = "1 seconds";
private static String TIME_WINDOW_STRING = "3 minutes";

Column slidingWindow = functions.window(data.col("timestamp"),
        TIME_WINDOW_STRING, TIME_STEP_STRING);
Dataset<Row> data2 = data.withColumn("slide", slidingWindow);
Dataset<Row> finalRes = data2.groupBy(slidingWindow, data2.col("data"))
        .agg(functions.collect_set("features").as("feature_set"))
        .cache();


Am I using it wrong? The situation is so bad that I get
java.lang.OutOfMemoryError: Java heap space.