RE: How does predicate push down really help?

2016-11-16 Thread Mendelson, Assaf
In the first example, you define the table to be the users table from some SQL
server, and then you perform a filter.
Without predicate pushdown (or any optimization), Spark basically understands
this as:
“grab the data from the source described” (which in this case means pulling all
of the table from the external SQL server into Spark memory), and then
“do the operations I asked for” (in this case, the filtering).
What predicate pushdown means in this case is that, since Spark knows the
external SQL server can understand and benefit from the filter, it sends the
filter as part of the query, so by the time the data arrives in Spark it is
already filtered.
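
As a sketch (the JDBC URL, table name and credentials here are only
placeholders), you can see this by reading the table through the JDBC source
and looking at the plan:

    val users = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://dbhost:3306/mydb")
      .option("dbtable", "users")
      .option("user", "...")
      .option("password", "...")
      .load()

    // With pushdown, the filter below becomes part of the SELECT that is sent
    // to the database (a WHERE clause on age), so only the matching rows ever
    // reach Spark.
    users.filter("age > 30").explain(true)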

In the second example we have two tables A and B. What you ask in the command 
is:
“Read A”
“Read B”
“Perform the join” (which is a heavy operation)
“Perform the filtering on the result”

What predicate pushdown would do instead is translate it to:
“Read A”
“Perform filtering on A”
“Read B”
“Perform filtering on B”
“perform the join on the filtered A and B”
Now the join is performed on smaller data (after the filtering) and therefore
takes less time. The heuristic is that in most cases the time saved on the join
is much greater than any extra time spent on the filter itself.
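
As a rough sketch (the table names A and B and the key column k are only
illustrative):

    val a = spark.table("A")
    val b = spark.table("B")
    a.join(b, "k").filter("k > 100").explain(true)
    // In the optimized plan the predicate on k appears below the join, i.e. it
    // is applied before the join rather than after it, so the join runs on the
    // already-filtered inputs.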

BTW, you can see the difference between the original plan and the optimized
plan by calling explain(true) on the DataFrame. This shows you what was parsed,
how it was analyzed and optimized, and what was physically run.

Assaf.

From: kant kodali [mailto:kanth...@gmail.com]
Sent: Thursday, November 17, 2016 9:50 AM
To: Mendelson, Assaf
Cc: user @spark
Subject: Re: How does predicate push down really help?

Hi Assaf,

I am still trying to understand the merits of predicate push down from the 
examples you pointed out.

Example 1: Say we don't have a predicate push down feature. Why does Spark need
to pull all the rows and filter them in memory? Why not simply issue a select
statement with a "where" clause to do the filtering via JDBC or something?

Example 2: Same argument as Example 1, except that when we don't have a
predicate push down feature we could simply do it using JOIN and WHERE
operators in the SQL statement, right?

I feel like I am missing something to understand the merits of predicate push 
down.

Thanks,
kant




On Wed, Nov 16, 2016 at 11:34 PM, Mendelson, Assaf 
> wrote:
Actually, both of them translate to the same plan.
When you do sql(“some code”) or filter, it doesn’t actually run the query.
Instead it is translated to a plan (the parsed plan) which transforms everything
into standard Spark expressions. Then Spark analyzes it to fill in the blanks
(what the users table is, for example) and attempts to optimize it. Predicate
pushdown happens in the optimization portion.
For example, let’s say that users is actually backed by a table in MySQL.
Without predicate pushdown Spark would first pull the entire users table from
MySQL and only then do the filtering. Predicate pushdown means the filtering is
done as part of the original SQL query.

Another (probably better) example would be something like having two tables A
and B which are joined by some common key, and then a filter is applied on the
key. Moving the filter to before the join would probably make everything faster,
as a filter is a faster operation than a join.

Assaf.

From: kant kodali [mailto:kanth...@gmail.com]
Sent: Thursday, November 17, 2016 8:03 AM
To: user @spark
Subject: How does predicate push down really help?

How does predicate push down really help in the following cases?

val df1 = spark.sql("select * from users where age > 30")

 vs

val df1 = spark.sql("select * from users")
df1.filter("age > 30")





Re: How does predicate push down really help?

2016-11-16 Thread Ashic Mahtab
Consider a data source that has data in 500MB files and doesn't support
predicate push down. Spark will have to load all the data into memory before it
can apply filtering, select "columns", etc. Each 500MB file will at some point
have to be loaded entirely into memory. Now consider a data source that does
support predicate push down, like MySQL. Spark will only need to retrieve the
rows and columns it needs, as the db provides an interface for it to do so. If
the underlying data source supports predicate push down, and the corresponding
connector supports it, then filtering, projection, etc. are pushed down to the
storage level. If not, the full dataset needs to be loaded into memory, and
filtering, projection, etc. happen there.
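
A quick, hedged way to check what a particular source/connector pair actually
pushes down is to look at the physical plan (the path and column names below
are made up):

    val df = spark.read.parquet("/data/events")
    df.filter("age > 30").select("name").explain(true)
    // For a pushdown-capable source such as Parquet, the scan node lists the
    // pushed filters and the pruned read schema; for a plain text source the
    // full rows are read first and the filter/projection appear as separate
    // steps above the scan.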




On Thu, Nov 17, 2016 at 7:50 AM +, "kant kodali" 
> wrote:

Hi Assaf,

I am still trying to understand the merits of predicate push down from the 
examples you pointed out.

Example 1: Say we don't have a predicate push down feature. Why does Spark need
to pull all the rows and filter them in memory? Why not simply issue a select
statement with a "where" clause to do the filtering via JDBC or something?

Example 2: Same argument as Example 1, except that when we don't have a
predicate push down feature we could simply do it using JOIN and WHERE
operators in the SQL statement, right?

I feel like I am missing something to understand the merits of predicate push 
down.

Thanks,
kant




On Wed, Nov 16, 2016 at 11:34 PM, Mendelson, Assaf 
> wrote:
Actually, both of them translate to the same plan.
When you do sql("some code") or filter, it doesn't actually run the query.
Instead it is translated to a plan (the parsed plan) which transforms everything
into standard Spark expressions. Then Spark analyzes it to fill in the blanks
(what the users table is, for example) and attempts to optimize it. Predicate
pushdown happens in the optimization portion.
For example, let's say that users is actually backed by a table in MySQL.
Without predicate pushdown Spark would first pull the entire users table from
MySQL and only then do the filtering. Predicate pushdown means the filtering is
done as part of the original SQL query.

Another (probably better) example would be something like having two tables A
and B which are joined by some common key, and then a filter is applied on the
key. Moving the filter to before the join would probably make everything faster,
as a filter is a faster operation than a join.

Assaf.

From: kant kodali [mailto:kanth...@gmail.com]
Sent: Thursday, November 17, 2016 8:03 AM
To: user @spark
Subject: How does predicate push down really help?

How does predicate push down really help in the following cases?

val df1 = spark.sql("select * from users where age > 30")

 vs

val df1 = spark.sql("select * from users")
df1.filter("age > 30")





Re: How does predicate push down really help?

2016-11-16 Thread kant kodali
Hi Assaf,

I am still trying to understand the merits of predicate push down from the
examples you pointed out.

Example 1: Say we don't have a predicate push down feature. Why does Spark
need to pull all the rows and filter them in memory? Why not simply issue a
select statement with a "where" clause to do the filtering via JDBC or
something?

Example 2: Same argument as Example 1, except that when we don't have a
predicate push down feature we could simply do it using JOIN and WHERE
operators in the SQL statement, right?

I feel like I am missing something to understand the merits of predicate
push down.

Thanks,
kant




On Wed, Nov 16, 2016 at 11:34 PM, Mendelson, Assaf 
wrote:

> Actually, both of them translate to the same plan.
>
> When you do sql(“some code”) or filter, it doesn’t actually run the query.
> Instead it is translated to a plan (the parsed plan) which transforms
> everything into standard Spark expressions. Then Spark analyzes it to fill in
> the blanks (what the users table is, for example) and attempts to optimize it.
> Predicate pushdown happens in the optimization portion.
>
> For example, let’s say that users is actually backed by a table in MySQL.
>
> Without predicate pushdown Spark would first pull the entire users table
> from MySQL and only then do the filtering. Predicate pushdown means
> the filtering is done as part of the original SQL query.
>
>
>
> Another (probably better) example would be something like having two tables
> A and B which are joined by some common key, and then a filter is applied on
> the key. Moving the filter to before the join would probably make
> everything faster, as a filter is a faster operation than a join.
>
>
>
> Assaf.
>
>
>
> *From:* kant kodali [mailto:kanth...@gmail.com]
> *Sent:* Thursday, November 17, 2016 8:03 AM
> *To:* user @spark
> *Subject:* How does predicate push down really help?
>
>
>
> How does predicate push down really help in the following cases?
>
>
>
> val df1 = spark.sql("select * from users where age > 30")
>
>
>
>  vs
>
>
>
> val df1 = spark.sql("select * from users")
>
> df1.filter("age > 30")
>
>
>
>
>


RE: Nested UDFs

2016-11-16 Thread Mendelson, Assaf
regexp_replace is supposed to receive a column, so you don't need to write a UDF
for it.
Instead try:
test_data.select(regexp_replace(test_data.name, 'a', 'X')).show()

You would only need a UDF if you wanted to do something to the string value of a
single row that the built-in column functions don't cover (e.g. return data +
"bla"). The error you see most likely comes from the fact that the UDF body runs
on the executors, where the Python regexp_replace helper has no active
SparkContext/JVM to call into.

Assaf.

From: Perttu Ranta-aho [mailto:ranta...@iki.fi]
Sent: Thursday, November 17, 2016 9:15 AM
To: user@spark.apache.org
Subject: Nested UDFs

Hi,

Shouldn't this work?

from pyspark.sql.functions import regexp_replace, udf

def my_f(data):
return regexp_replace(data, 'a', 'X')
my_udf = udf(my_f)

test_data = sqlContext.createDataFrame([('a',), ('b',), ('c',)], ('name',))
test_data.select(my_udf(test_data.name)).show()

But instead of 'a' being replaced with 'X' I get exception:
  File 
".../spark-2.0.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/functions.py",
 line 1471, in regexp_replace
jc = sc._jvm.functions.regexp_replace(_to_java_column(str), pattern, 
replacement)
AttributeError: 'NoneType' object has no attribute '_jvm'

???

-Perttu



RE: How does predicate push down really help?

2016-11-16 Thread Mendelson, Assaf
Actually, both of them translate to the same plan.
When you do sql(“some code”) or filter, it doesn’t actually run the query.
Instead it is translated to a plan (the parsed plan) which transforms everything
into standard Spark expressions. Then Spark analyzes it to fill in the blanks
(what the users table is, for example) and attempts to optimize it. Predicate
pushdown happens in the optimization portion.
For example, let’s say that users is actually backed by a table in MySQL.
Without predicate pushdown Spark would first pull the entire users table from
MySQL and only then do the filtering. Predicate pushdown means the filtering is
done as part of the original SQL query.

Another (probably better) example would be something like having two tables A
and B which are joined by some common key, and then a filter is applied on the
key. Moving the filter to before the join would probably make everything faster,
as a filter is a faster operation than a join.

Assaf.

From: kant kodali [mailto:kanth...@gmail.com]
Sent: Thursday, November 17, 2016 8:03 AM
To: user @spark
Subject: How does predicate push down really help?

How does predicate push down really help in the following cases?

val df1 = spark.sql("select * from users where age > 30")

 vs

val df1 = spark.sql("select * from users")
df1.filter("age > 30")




Nested UDFs

2016-11-16 Thread Perttu Ranta-aho
Hi,

Shouldn't this work?

from pyspark.sql.functions import regexp_replace, udf

def my_f(data):
return regexp_replace(data, 'a', 'X')
my_udf = udf(my_f)

test_data = sqlContext.createDataFrame([('a',), ('b',), ('c',)], ('name',))
test_data.select(my_udf(test_data.name)).show()

But instead of 'a' being replaced with 'X' I get exception:
  File
".../spark-2.0.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/functions.py",
line 1471, in regexp_replace
jc = sc._jvm.functions.regexp_replace(_to_java_column(str), pattern,
replacement)
AttributeError: 'NoneType' object has no attribute '_jvm'

???

-Perttu


How does predicate push down really help?

2016-11-16 Thread kant kodali
How does predicate push down really help in the following cases?

val df1 = spark.sql("select * from users where age > 30")

 vs

val df1 = spark.sql("select * from users")
df1.filter("age > 30")


Re: Kafka segmentation

2016-11-16 Thread bo yang
I do not remember what exact configuration I was using. That link has some
good information! Thanks Cody!


On Wed, Nov 16, 2016 at 5:32 PM, Cody Koeninger  wrote:

> Yeah, if you're reporting issues, please be clear as to whether
> backpressure is enabled, and whether maxRatePerPartition is set.
>
> I expect that there is something wrong with backpressure, see e.g.
> https://issues.apache.org/jira/browse/SPARK-18371
>
> On Wed, Nov 16, 2016 at 5:05 PM, bo yang  wrote:
> > I hit similar issue with Spark Streaming. The batch size seemed a little
> > random. Sometime it was large with many Kafka messages inside same batch,
> > sometimes it was very small with just a few messages. Is it possible that
> > was caused by the backpressure implementation in Spark Streaming?
> >
> > On Wed, Nov 16, 2016 at 4:22 PM, Cody Koeninger 
> wrote:
> >>
> >> Moved to user list.
> >>
> >> I'm not really clear on what you're trying to accomplish (why put the
> >> csv file through Kafka instead of reading it directly with spark?)
> >>
> >> auto.offset.reset=largest just means that when starting the job
> >> without any defined offsets, it will start at the highest (most
> >> recent) available offsets.  That's probably not what you want if
> >> you've already loaded csv lines into kafka.
> >>
> >> On Wed, Nov 16, 2016 at 2:45 PM, Hoang Bao Thien  >
> >> wrote:
> >> > Hi all,
> >> >
> >> > I would like to ask a question related to the size of Kafka stream. I
> >> > want
> >> > to put data (e.g., file *.csv) to Kafka then use Spark streaming to
> get
> >> > the
> >> > output from Kafka and then save to Hive by using SparkSQL. The file
> csv
> >> > is
> >> > about 100MB with ~250K messages/rows (Each row has about 10 fields of
> >> > integer). I see that Spark Streaming first received two
> >> > partitions/batches,
> >> > the first is of 60K messages and the second is of 50K msgs. But from
> the
> >> > third batch, Spark just received 200 messages for each batch (or
> >> > partition).
> >> > I think that this problem is coming from Kafka or some configuration
> in
> >> > Spark. I already tried to configure with the setting
> >> > "auto.offset.reset=largest", but every batch only gets 200 messages.
> >> >
> >> > Could you please tell me how to fix this problem?
> >> > Thank you so much.
> >> >
> >> > Best regards,
> >> > Alex
> >> >
> >>
> >> -
> >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >>
> >
>


Re: Very long pause/hang at end of execution

2016-11-16 Thread Michael Johnson
On Wed, Nov 16, 2016 at 10:44 AM Aniket Bhatnagar  
wrote:
Thanks for sharing the thread dump. I had a look at them and couldn't find 
anything unusual. Is there anything in the logs (driver + executor) that 
suggests what's going on? Also, what does the spark job do and what is the 
version of spark and hadoop you are using?

I haven't seen anything in the logs; when I observed it happening before, in 
local mode, the last output before the hang would be a log statement from my 
code (that is, I had a log4j logger and was calling info() on that logger). 
That was also the last line of my main() function. Then, I saw no more output, 
neither from the driver nor the executors. I have seen the pause be as short as 
a few minutes, or approaching an hour. As far as I can tell, when it continues, 
the log statements look more or less normal.
Locally, I'm using Spark 2.0.1 built for Hadoop 2.7, but without installing 
Hadoop. Remotely, I'm running on Google Cloud Dataproc, which also uses Spark 
2.0.1, along with Hadoop 2.7.3. I've had it happen both locally and remotely.
The job loads data from a text file (using SparkContext.textFile()), and then 
splits each line and converts it into an array of integers. From there, I do 
some sketching (the data encodes either a tree, a graph, or text, and I create 
a fixed-length sketch that probabilistically produces similar results for 
similar nodes in the tree/graph). I then do some lightweight clustering on the 
sketches, and save the cluster assignments to a text file.
For what it's worth, when I look at the GC stats from the UI, they seem a bit 
high (they can be as high as 1 minute GC for a 15 minute run). However, those 
stats do not change during the pause period.
On Wed, Nov 16, 2016 at 2:48 AM Aniket Bhatnagar  
wrote:
Also, how are you launching the application? Through spark submit or creating 
spark content in your app? 


I'm calling spark-submit, and then within my app I call 
SparkContext.getOrCreate() to get a context. I then call sc.textFile() to load 
my data into an RDD, and then perform various actions on that. I tried adding a 
call to sc.stop() at the very end, after seeing some discussion that that might 
be necessary, but it didn't seem to make a difference.
The strange thing is that this behavior comes and goes. I tried opening the UI, 
as Pietro suggested, but that didn't seem to trigger it for me; I haven't 
figured out what, if anything, will make it happen every time.
On Wednesday, November 16, 2016 4:41 AM, Pietro Pugni  
wrote:

I have the same issue with Spark 2.0.1, Java 1.8.x and pyspark. I also use
SparkSQL and JDBC. My application runs locally. It happens only if I connect to
the UI during Spark execution, and it happens even if I close the browser before
the execution ends. I observed this behaviour both on macOS Sierra and Red Hat 6.7.

That is interesting that you are seeing this too. I can't get it to happen by 
using the UI...but I also am having difficulty making it happen at all right 
now. (Only trying locally at the moment.)
   

Re: How do I convert json_encoded_blob_column into a data frame? (This may be a feature request)

2016-11-16 Thread Nathan Lande
If you are dealing with a bunch of different schemas in one field, figuring
out a strategy to deal with that will depend on your data and does not
really have anything to do with Spark, since mapping your JSON payloads to
tractable data structures will depend on business logic.

The strategy of pulling the blob out into its own RDD and feeding it into the
JSON loader should work for any data source once you have your data
strategy figured out.
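
For illustration, a minimal Scala sketch of that pattern (df is the original
DataFrame, and the column name json_blob and the join key id are made up; this
assumes Spark 2.0 APIs):

    import org.apache.spark.sql.functions.col

    // pull the JSON-encoded blob column out as an RDD of strings
    val jsonStrings = df.select(col("json_blob").cast("string")).rdd.map(_.getString(0))

    // let Spark infer a schema from the JSON payloads
    val parsed = spark.read.json(jsonStrings)

    // join the structured result back onto the original DataFrame on a shared key
    val result = df.join(parsed, Seq("id"), "left_outer")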

On Wed, Nov 16, 2016 at 4:39 PM, kant kodali  wrote:

> 1. I have a Cassandra Table where one of the columns is blob. And this
> blob contains a JSON encoded String however not all the blob's across the
> Cassandra table for that column are same (some blobs have difference json's
> than others) so In that case what is the best way to approach it? Do we
> need to put /group all the JSON Blobs that have same structure (same keys)
> into each individual data frame? For example, say if I have 5 json blobs
> that have same structure and another 3 JSON blobs that belongs to some
> other structure In this case do I need to create two data frames? (Attached
> is a screen shot of 2 rows of how my json looks like)
> 2. In my case, toJSON on RDD doesn't seem to help a lot. Attached a screen
> shot. Looks like I got the same data frame as my original one.
>
> Thanks much for these examples.
>
>
>
> On Wed, Nov 16, 2016 at 2:54 PM, Nathan Lande 
> wrote:
>
>> I'm looking forward to 2.1 but, in the meantime, you can pull out the
>> specific column into an RDD of JSON objects, pass this RDD into the
>> read.json() and then join the results back onto your initial DF.
>>
>> Here is an example of what we do to unpack headers from Avro log data:
>>
>> def jsonLoad(path):
>> #
>> #load in the df
>> raw = (sqlContext.read
>> .format('com.databricks.spark.avro')
>> .load(path)
>> )
>> #
>> #define json blob, add primary key elements (hi and lo)
>> #
>> JSONBlob = concat(
>> lit('{'),
>> concat(lit('"lo":'), col('header.eventId.lo').cast('string'),
>> lit(',')),
>> concat(lit('"hi":'), col('header.eventId.hi').cast('string'),
>> lit(',')),
>> concat(lit('"response":'), decode('requestResponse.response',
>> 'UTF-8')),
>> lit('}')
>> )
>> #
>> #extract the JSON blob as a string
>> rawJSONString = raw.select(JSONBlob).rdd.map(lambda x: str(x[0]))
>> #
>> #transform the JSON string into a DF struct object
>> structuredJSON = sqlContext.read.json(rawJSONString)
>> #
>> #join the structured JSON back onto the initial DF using the hi and
>> lo join keys
>> final = (raw.join(structuredJSON,
>> ((raw['header.eventId.lo'] == structuredJSON['lo']) &
>> (raw['header.eventId.hi'] == structuredJSON['hi'])),
>> 'left_outer')
>> .drop('hi')
>> .drop('lo')
>> )
>> #
>> #win
>> return final
>>
>> On Wed, Nov 16, 2016 at 10:50 AM, Michael Armbrust <
>> mich...@databricks.com> wrote:
>>
>>> On Wed, Nov 16, 2016 at 2:49 AM, Hyukjin Kwon 
>>> wrote:
>>>
 Maybe it sounds like you are looking for from_json/to_json functions
 after en/decoding properly.

>>>
>>> Which are new built-in functions that will be released with Spark 2.1.
>>>
>>
>>
>


Re: Best practice for preprocessing feature with DataFrame

2016-11-16 Thread Divya Gehlot
Hi,

You can use the Column functions provided by Spark API

https://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/sql/functions.html
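
For example, a rough, untested sketch in Scala for the mappings in your sample
(this assumes gender is numeric and age is a string like "90s"):

    import org.apache.spark.sql.functions.{col, regexp_extract, when}

    val result = input.select(
      // "90s" -> 90, "80s" -> 80; empty strings become null after the cast
      regexp_extract(col("age"), "(\\d+)", 1).cast("int").alias("age"),
      when(col("gender") === 1, "male")
        .when(col("gender") === 2, "female")
        .alias("gender"),
      col("city_id")
    )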

Hope this helps .

Thanks,
Divya


On 17 November 2016 at 12:08, 颜发才(Yan Facai)  wrote:

> Hi,
> I have a sample, like:
> +---+------+--------------------+
> |age|gender|             city_id|
> +---+------+--------------------+
> |   |     1|1042015:city_2044...|
> |90s|     2|1042015:city_2035...|
> |80s|     2|1042015:city_2061...|
> +---+------+--------------------+
>
> and expectation is:
> "age":  90s -> 90, 80s -> 80
> "gender": 1 -> "male", 2 -> "female"
>
> I have two solutions:
> 1. Handle each column separately,  and then join all by index.
> val age = input.select("age").map(...)
> val gender = input.select("gender").map(...)
> val result = ...
>
> 2. Write a udf for each column, and then use them together:
>  val result = input.select(ageUDF($"age"), genderUDF($"gender"))
>
> However, both are awkward,
>
> Does anyone have a better work flow?
> Write some custom Transforms and use pipeline?
>
> Thanks.
>
>
>
>


Best practice for preprocessing feature with DataFrame

2016-11-16 Thread Yan Facai
Hi,
I have a sample, like:
+---+------+--------------------+
|age|gender|             city_id|
+---+------+--------------------+
|   |     1|1042015:city_2044...|
|90s|     2|1042015:city_2035...|
|80s|     2|1042015:city_2061...|
+---+------+--------------------+

and expectation is:
"age":  90s -> 90, 80s -> 80
"gender": 1 -> "male", 2 -> "female"

I have two solutions:
1. Handle each column separately,  and then join all by index.
val age = input.select("age").map(...)
val gender = input.select("gender").map(...)
val result = ...

2. Write a udf for each column, and then use them together:
 val result = input.select(ageUDF($"age"), genderUDF($"gender"))

However, both are awkward,

Does anyone have a better work flow?
Write some custom Transforms and use pipeline?

Thanks.


Configure spark.kryoserializer.buffer.max at runtime does not take effect

2016-11-16 Thread bluishpenguin
Hi all, 
I would like to configure the following setting during runtime as below:

spark = (SparkSession
.builder
.appName("ElasticSearchIndex")
.config("spark.kryoserializer.buffer.max", "1g")
.getOrCreate())

But I still hit this error:
Caused by: org.apache.spark.SparkException: Kryo serialization failed:
Buffer overflow. Available: 0, required: 1614707. To avoid this, increase
spark.kryoserializer.buffer.max value.

It works when I configure it along with the spark-submit command as below:
spark-submit pyspark-shell-main --name "PySparkShell" --conf
spark.kryoserializer.buffer.max=1g

Any idea what I have done wrong?

Thank you.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Configure-spark-kryoserializer-buffer-max-at-runtime-does-not-take-effect-tp28094.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Need guidelines in Spark Streaming and Kafka integration

2016-11-16 Thread Karim, Md. Rezaul
Hi Tariq and Jon,

First of all, thanks for the quick response. I really appreciate it.

Well, I would like to start from the very beginning of using Kafka with
Spark. For example, in the Spark distribution I found an example using
Kafka with Spark Streaming that demonstrates a direct Kafka word count.
In that example, I found the main class
*JavaDirectKafkaWordCount.java* (under the
spark-2.0.0-bin-hadoop2.7\examples\src\main\java\org\apache\spark\examples\streaming
directory), which contains a code segment as follows:


---*-
String brokers = args[0];
String topics = args[1];

// Create context with a 2 seconds batch interval
SparkConf sparkConf = new
SparkConf().setAppName("JavaDirectKafkaWordCount").setMaster("local[*]");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
Durations.seconds(20));

Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", brokers);
---*-

In this code block, the confusing part is setting the values of the two
command-line arguments (i.e., *brokers* and *topics*). I tried to set them as
follows:

String brokers = "localhost:8890,localhost:8892";
String topics = " topic1,topic2";

However, I know this is not the right way to do it, but there has to be a
correct way of setting the values of the brokers and topics.

Now, the thing is that I need help with how to set/configure these two
parameters so that I can run this hello-world-like example successfully.
Any kind of help would be highly appreciated.




Regards,
_
*Md. Rezaul Karim* BSc, MSc
PhD Researcher, INSIGHT Centre for Data Analytics
National University of Ireland, Galway
IDA Business Park, Dangan, Galway, Ireland
Web: http://www.reza-analytics.eu/index.html


On 17 November 2016 at 03:08, Jon Gregg  wrote:

> Since you're completely new to Kafka, I would start with the Kafka docs (
> https://kafka.apache.org/documentation).  You should be able to get
> through the Getting Started part easily and there are some examples for
> setting up a basic Kafka server.
>
> You don't need Kafka to start working with Spark Streaming (there are
> examples online to pull directly from Twitter, for example).  But at a high
> level if you're sending data from one server to another, it can be
> beneficial to send the messages to a distributed queue first for durable
> storage (so data doesn't get lost in transmission) and other benefits.
>
> On Wed, Nov 16, 2016 at 2:12 PM, Mohammad Tariq 
> wrote:
>
>> Hi Karim,
>>
>> Are you looking for something specific? Some information about your
>> usecase would be really  helpful in order to answer your question.
>>
>>
>> On Wednesday, November 16, 2016, Karim, Md. Rezaul <
>> rezaul.ka...@insight-centre.org> wrote:
>>
>>> Hi All,
>>>
>>> I am completely new with Kafka. I was wondering if somebody could
>>> provide me some guidelines on how to develop real-time streaming
>>> applications using Spark Streaming API with Kafka.
>>>
>>> I am aware the Spark Streaming  and Kafka integration [1]. However, a
>>> real life example should be better to start?
>>>
>>>
>>>
>>> 1. http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
>>>
>>>
>>>
>>>
>>>
>>> Regards,
>>> _
>>> *Md. Rezaul Karim* BSc, MSc
>>> PhD Researcher, INSIGHT Centre for Data Analytics
>>> National University of Ireland, Galway
>>> IDA Business Park, Dangan, Galway, Ireland
>>> Web: http://www.reza-analytics.eu/index.html
>>> 
>>>
>>
>>
>> --
>>
>>
>>
>> Tariq, Mohammad
>> about.me/mti
>> 
>>
>>
>>
>


Re: Need guidelines in Spark Streaming and Kafka integration

2016-11-16 Thread Jon Gregg
Since you're completely new to Kafka, I would start with the Kafka docs (
https://kafka.apache.org/documentation).  You should be able to get through
the Getting Started part easily and there are some examples for setting up
a basic Kafka server.

You don't need Kafka to start working with Spark Streaming (there are
examples online to pull directly from Twitter, for example).  But at a high
level if you're sending data from one server to another, it can be
beneficial to send the messages to a distributed queue first for durable
storage (so data doesn't get lost in transmission) and other benefits.

On Wed, Nov 16, 2016 at 2:12 PM, Mohammad Tariq  wrote:

> Hi Karim,
>
> Are you looking for something specific? Some information about your
> usecase would be really  helpful in order to answer your question.
>
>
> On Wednesday, November 16, 2016, Karim, Md. Rezaul <
> rezaul.ka...@insight-centre.org> wrote:
>
>> Hi All,
>>
>> I am completely new with Kafka. I was wondering if somebody could provide
>> me some guidelines on how to develop real-time streaming applications using
>> Spark Streaming API with Kafka.
>>
>> I am aware the Spark Streaming  and Kafka integration [1]. However, a
>> real life example should be better to start?
>>
>>
>>
>> 1. http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
>>
>>
>>
>>
>>
>> Regards,
>> _
>> *Md. Rezaul Karim* BSc, MSc
>> PhD Researcher, INSIGHT Centre for Data Analytics
>> National University of Ireland, Galway
>> IDA Business Park, Dangan, Galway, Ireland
>> Web: http://www.reza-analytics.eu/index.html
>> 
>>
>
>
> --
>
>
>
> Tariq, Mohammad
> about.me/mti
> 
>
>
>


Re: Spark R guidelines for non-spark functions and coxph (Cox Regression for Time-Dependent Covariates)

2016-11-16 Thread Yanbo Liang
Hi Pietro,

Actually, we have implemented the counterpart of R's survreg() in Spark: the
accelerated failure time (AFT) model. You can refer to AFTSurvivalRegression
if you use Scala/Java/Python; for SparkR users, you can try spark.survreg().
The algorithm is completely distributed and returns the same solution as
native R survreg().
Since the Cox regression model is not easy to train in a distributed fashion,
we chose to implement survreg() rather than coxph() for Spark. However, the
coefficients of the AFT survival regression model are related to those of the
Cox regression model, so you can check whether spark.survreg() satisfies your
requirements.
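
For a quick look, a minimal Scala sketch along the lines of the Spark ML
documentation example (here label is the survival time, censor is 1.0 for an
observed event and 0.0 for a censored one, and the numbers are just toy data):

    import org.apache.spark.ml.linalg.Vectors
    import org.apache.spark.ml.regression.AFTSurvivalRegression

    val training = spark.createDataFrame(Seq(
      (1.218, 1.0, Vectors.dense(1.560, -0.605)),
      (2.949, 0.0, Vectors.dense(0.346, 2.158)),
      (3.627, 0.0, Vectors.dense(1.380, 0.231)),
      (0.273, 1.0, Vectors.dense(0.520, 1.151)),
      (4.199, 0.0, Vectors.dense(0.795, -0.226))
    )).toDF("label", "censor", "features")

    val model = new AFTSurvivalRegression().fit(training)
    println(s"Coefficients: ${model.coefficients}, intercept: ${model.intercept}, scale: ${model.scale}")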
BTW, I'm the author of Spark AFTSurvivalRegression. Any more questions,
please feel free to let me know.

http://spark.apache.org/docs/latest/ml-classification-regression.html#survival-regression
http://spark.apache.org/docs/latest/api/R/index.html

Thanks
Yanbo

On Tue, Nov 15, 2016 at 9:46 AM, Shivaram Venkataraman <
shiva...@eecs.berkeley.edu> wrote:

> I think the answer to this depends on what granularity you want to run
> the algorithm on. If it's on the entire Spark DataFrame and you expect
> the data frame to be very large, then it isn't easy to use the
> existing R function. However, if you want to run the algorithm on
> smaller subsets of the data, you can look at the support for UDFs we
> have in SparkR at
> http://spark.apache.org/docs/latest/sparkr.html#applying-
> user-defined-function
>
> Thanks
> Shivaram
>
> On Tue, Nov 15, 2016 at 3:56 AM, pietrop  wrote:
> > Hi all,
> > I'm writing here after some intensive usage on pyspark and SparkSQL.
> > I would like to use a well known function in the R world: coxph() from
> the
> > survival package.
> > From what I understood, I can't parallelize a function like coxph()
> because
> > it isn't provided with the SparkR package. In other words, I should
> > implement a SparkR compatible algorithm instead of using coxph().
> > I have no chance to make coxph() parallelizable, right?
> > More generally, I think this is true for any non-spark function which
> only
> > accept data.frame format as the data input.
> >
> > Do you plan to implement the coxph() counterpart in Spark? The most
> useful
> > version of this model is the Cox Regression Model for Time-Dependent
> > Covariates, which is missing from ANY ML framework as far as I know.
> >
> > Thank you
> >  Pietro
> >
> >
> >
> > --
> > View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Spark-R-guidelines-for-non-spark-
> functions-and-coxph-Cox-Regression-for-Time-Dependent-
> Covariates-tp28077.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Kafka segmentation

2016-11-16 Thread Cody Koeninger
Yeah, if you're reporting issues, please be clear as to whether
backpressure is enabled, and whether maxRatePerPartition is set.

I expect that there is something wrong with backpressure, see e.g.
https://issues.apache.org/jira/browse/SPARK-18371
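
For reference, a sketch of the two settings in question (the values are only
illustrative):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.streaming.backpressure.enabled", "true")        // let Spark adapt the ingestion rate
      .set("spark.streaming.kafka.maxRatePerPartition", "1000")   // hard cap, in messages per second per partition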

On Wed, Nov 16, 2016 at 5:05 PM, bo yang  wrote:
> I hit similar issue with Spark Streaming. The batch size seemed a little
> random. Sometime it was large with many Kafka messages inside same batch,
> sometimes it was very small with just a few messages. Is it possible that
> was caused by the backpressure implementation in Spark Streaming?
>
> On Wed, Nov 16, 2016 at 4:22 PM, Cody Koeninger  wrote:
>>
>> Moved to user list.
>>
>> I'm not really clear on what you're trying to accomplish (why put the
>> csv file through Kafka instead of reading it directly with spark?)
>>
>> auto.offset.reset=largest just means that when starting the job
>> without any defined offsets, it will start at the highest (most
>> recent) available offsets.  That's probably not what you want if
>> you've already loaded csv lines into kafka.
>>
>> On Wed, Nov 16, 2016 at 2:45 PM, Hoang Bao Thien 
>> wrote:
>> > Hi all,
>> >
>> > I would like to ask a question related to the size of Kafka stream. I
>> > want
>> > to put data (e.g., file *.csv) to Kafka then use Spark streaming to get
>> > the
>> > output from Kafka and then save to Hive by using SparkSQL. The file csv
>> > is
>> > about 100MB with ~250K messages/rows (Each row has about 10 fields of
>> > integer). I see that Spark Streaming first received two
>> > partitions/batches,
>> > the first is of 60K messages and the second is of 50K msgs. But from the
>> > third batch, Spark just received 200 messages for each batch (or
>> > partition).
>> > I think that this problem is coming from Kafka or some configuration in
>> > Spark. I already tried to configure with the setting
>> > "auto.offset.reset=largest", but every batch only gets 200 messages.
>> >
>> > Could you please tell me how to fix this problem?
>> > Thank you so much.
>> >
>> > Best regards,
>> > Alex
>> >
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Kafka segmentation

2016-11-16 Thread bo yang
I hit similar issue with Spark Streaming. The batch size seemed a little
random. Sometime it was large with many Kafka messages inside same batch,
sometimes it was very small with just a few messages. Is it possible that
was caused by the backpressure implementation in Spark Streaming?

On Wed, Nov 16, 2016 at 4:22 PM, Cody Koeninger  wrote:

> Moved to user list.
>
> I'm not really clear on what you're trying to accomplish (why put the
> csv file through Kafka instead of reading it directly with spark?)
>
> auto.offset.reset=largest just means that when starting the job
> without any defined offsets, it will start at the highest (most
> recent) available offsets.  That's probably not what you want if
> you've already loaded csv lines into kafka.
>
> On Wed, Nov 16, 2016 at 2:45 PM, Hoang Bao Thien 
> wrote:
> > Hi all,
> >
> > I would like to ask a question related to the size of Kafka stream. I
> want
> > to put data (e.g., file *.csv) to Kafka then use Spark streaming to get
> the
> > output from Kafka and then save to Hive by using SparkSQL. The file csv
> is
> > about 100MB with ~250K messages/rows (Each row has about 10 fields of
> > integer). I see that Spark Streaming first received two
> partitions/batches,
> > the first is of 60K messages and the second is of 50K msgs. But from the
> > third batch, Spark just received 200 messages for each batch (or
> partition).
> > I think that this problem is coming from Kafka or some configuration in
> > Spark. I already tried to configure with the setting
> > "auto.offset.reset=largest", but every batch only gets 200 messages.
> >
> > Could you please tell me how to fix this problem?
> > Thank you so much.
> >
> > Best regards,
> > Alex
> >
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Kafka segmentation

2016-11-16 Thread Cody Koeninger
Moved to user list.

I'm not really clear on what you're trying to accomplish (why put the
csv file through Kafka instead of reading it directly with spark?)

auto.offset.reset=largest just means that when starting the job
without any defined offsets, it will start at the highest (most
recent) available offsets.  That's probably not what you want if
you've already loaded csv lines into kafka.
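
As a sketch, with the 0.8 direct stream this is just a consumer config entry in
the kafkaParams map (the broker address is illustrative):

    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "localhost:9092",
      "auto.offset.reset" -> "smallest"  // start from the earliest available offsets instead of the latest
    )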

On Wed, Nov 16, 2016 at 2:45 PM, Hoang Bao Thien  wrote:
> Hi all,
>
> I would like to ask a question related to the size of Kafka stream. I want
> to put data (e.g., file *.csv) to Kafka then use Spark streaming to get the
> output from Kafka and then save to Hive by using SparkSQL. The file csv is
> about 100MB with ~250K messages/rows (Each row has about 10 fields of
> integer). I see that Spark Streaming first received two partitions/batches,
> the first is of 60K messages and the second is of 50K msgs. But from the
> third batch, Spark just received 200 messages for each batch (or partition).
> I think that this problem is coming from Kafka or some configuration in
> Spark. I already tried to configure with the setting
> "auto.offset.reset=largest", but every batch only gets 200 messages.
>
> Could you please tell me how to fix this problem?
> Thank you so much.
>
> Best regards,
> Alex
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark 2.0.2 with Kafka source, Error please help!

2016-11-16 Thread shyla deshpande
Ryan,

I just wanted to provide more info. Here is my .proto file which is the
basis for generating the Person class. Thanks.

option java_package = "com.example.protos";
enum Gender {
MALE = 1;
FEMALE = 2;
}
message Address {
optional string street = 1;
optional string city = 2;
}
message Person {
optional string name = 1;
optional int32 age = 2;
optional Gender gender = 3;
repeated string tags = 4;
repeated Address addresses = 5;
}


On Wed, Nov 16, 2016 at 3:04 PM, shyla deshpande 
wrote:

> *Thanks for the response. Following is the Person class..*
>
> // Generated by the Scala Plugin for the Protocol Buffer Compiler.
> // Do not edit!
> //
> // Protofile syntax: PROTO2
>
> package com.example.protos.demo
>
>
>
> @SerialVersionUID(0L)
> final case class Person(
> name: scala.Option[String] = None,
> age: scala.Option[Int] = None,
> gender: scala.Option[com.example.protos.demo.Gender] = None,
> tags: scala.collection.Seq[String] = Nil,
> addresses: scala.collection.Seq[com.example.protos.demo.Address] = Nil
> ) extends com.trueaccord.scalapb.GeneratedMessage with 
> com.trueaccord.scalapb.Message[Person] with 
> com.trueaccord.lenses.Updatable[Person] {
> @transient
> private[this] var __serializedSizeCachedValue: Int = 0
> private[this] def __computeSerializedValue(): Int = {
>   var __size = 0
>   if (name.isDefined) { __size += 
> com.google.protobuf.CodedOutputStream.computeStringSize(1, name.get) }
>   if (age.isDefined) { __size += 
> com.google.protobuf.CodedOutputStream.computeInt32Size(2, age.get) }
>   if (gender.isDefined) { __size += 
> com.google.protobuf.CodedOutputStream.computeEnumSize(3, gender.get.value) }
>   tags.foreach(tags => __size += 
> com.google.protobuf.CodedOutputStream.computeStringSize(4, tags))
>   addresses.foreach(addresses => __size += 1 + 
> com.google.protobuf.CodedOutputStream.computeUInt32SizeNoTag(addresses.serializedSize)
>  + addresses.serializedSize)
>   __size
> }
> final override def serializedSize: Int = {
>   var read = __serializedSizeCachedValue
>   if (read == 0) {
> read = __computeSerializedValue()
> __serializedSizeCachedValue = read
>   }
>   read
> }
> def writeTo(`_output__`: com.google.protobuf.CodedOutputStream): Unit = {
>   name.foreach { __v =>
> _output__.writeString(1, __v)
>   };
>   age.foreach { __v =>
> _output__.writeInt32(2, __v)
>   };
>   gender.foreach { __v =>
> _output__.writeEnum(3, __v.value)
>   };
>   tags.foreach { __v =>
> _output__.writeString(4, __v)
>   };
>   addresses.foreach { __v =>
> _output__.writeTag(5, 2)
> _output__.writeUInt32NoTag(__v.serializedSize)
> __v.writeTo(_output__)
>   };
> }
> def mergeFrom(`_input__`: com.google.protobuf.CodedInputStream): 
> com.example.protos.demo.Person = {
>   var __name = this.name
>   var __age = this.age
>   var __gender = this.gender
>   val __tags = (scala.collection.immutable.Vector.newBuilder[String] ++= 
> this.tags)
>   val __addresses = 
> (scala.collection.immutable.Vector.newBuilder[com.example.protos.demo.Address]
>  ++= this.addresses)
>   var _done__ = false
>   while (!_done__) {
> val _tag__ = _input__.readTag()
> _tag__ match {
>   case 0 => _done__ = true
>   case 10 =>
> __name = Some(_input__.readString())
>   case 16 =>
> __age = Some(_input__.readInt32())
>   case 24 =>
> __gender = 
> Some(com.example.protos.demo.Gender.fromValue(_input__.readEnum()))
>   case 34 =>
> __tags += _input__.readString()
>   case 42 =>
> __addresses += 
> com.trueaccord.scalapb.LiteParser.readMessage(_input__, 
> com.example.protos.demo.Address.defaultInstance)
>   case tag => _input__.skipField(tag)
> }
>   }
>   com.example.protos.demo.Person(
>   name = __name,
>   age = __age,
>   gender = __gender,
>   tags = __tags.result(),
>   addresses = __addresses.result()
>   )
> }
> def getName: String = name.getOrElse("")
> def clearName: Person = copy(name = None)
> def withName(__v: String): Person = copy(name = Some(__v))
> def getAge: Int = age.getOrElse(0)
> def clearAge: Person = copy(age = None)
> def withAge(__v: Int): Person = copy(age = Some(__v))
> def getGender: com.example.protos.demo.Gender = 
> gender.getOrElse(com.example.protos.demo.Gender.MALE)
> def clearGender: Person = copy(gender = None)
> def withGender(__v: com.example.protos.demo.Gender): Person = copy(gender 
> = Some(__v))
> def clearTags = copy(tags = scala.collection.Seq.empty)
> def addTags(__vs: String*): Person = addAllTags(__vs)
> def addAllTags(__vs: 

Re: Spark 2.0.2 with Kafka source, Error please help!

2016-11-16 Thread shyla deshpande
*Thanks for the response. Following is the Person class..*

// Generated by the Scala Plugin for the Protocol Buffer Compiler.
// Do not edit!
//
// Protofile syntax: PROTO2

package com.example.protos.demo



@SerialVersionUID(0L)
final case class Person(
name: scala.Option[String] = None,
age: scala.Option[Int] = None,
gender: scala.Option[com.example.protos.demo.Gender] = None,
tags: scala.collection.Seq[String] = Nil,
addresses: scala.collection.Seq[com.example.protos.demo.Address] = Nil
) extends com.trueaccord.scalapb.GeneratedMessage with
com.trueaccord.scalapb.Message[Person] with
com.trueaccord.lenses.Updatable[Person] {
@transient
private[this] var __serializedSizeCachedValue: Int = 0
private[this] def __computeSerializedValue(): Int = {
  var __size = 0
  if (name.isDefined) { __size +=
com.google.protobuf.CodedOutputStream.computeStringSize(1, name.get) }
  if (age.isDefined) { __size +=
com.google.protobuf.CodedOutputStream.computeInt32Size(2, age.get) }
  if (gender.isDefined) { __size +=
com.google.protobuf.CodedOutputStream.computeEnumSize(3,
gender.get.value) }
  tags.foreach(tags => __size +=
com.google.protobuf.CodedOutputStream.computeStringSize(4, tags))
  addresses.foreach(addresses => __size += 1 +
com.google.protobuf.CodedOutputStream.computeUInt32SizeNoTag(addresses.serializedSize)
+ addresses.serializedSize)
  __size
}
final override def serializedSize: Int = {
  var read = __serializedSizeCachedValue
  if (read == 0) {
read = __computeSerializedValue()
__serializedSizeCachedValue = read
  }
  read
}
def writeTo(`_output__`: com.google.protobuf.CodedOutputStream): Unit = {
  name.foreach { __v =>
_output__.writeString(1, __v)
  };
  age.foreach { __v =>
_output__.writeInt32(2, __v)
  };
  gender.foreach { __v =>
_output__.writeEnum(3, __v.value)
  };
  tags.foreach { __v =>
_output__.writeString(4, __v)
  };
  addresses.foreach { __v =>
_output__.writeTag(5, 2)
_output__.writeUInt32NoTag(__v.serializedSize)
__v.writeTo(_output__)
  };
}
def mergeFrom(`_input__`: com.google.protobuf.CodedInputStream):
com.example.protos.demo.Person = {
  var __name = this.name
  var __age = this.age
  var __gender = this.gender
  val __tags =
(scala.collection.immutable.Vector.newBuilder[String] ++= this.tags)
  val __addresses =
(scala.collection.immutable.Vector.newBuilder[com.example.protos.demo.Address]
++= this.addresses)
  var _done__ = false
  while (!_done__) {
val _tag__ = _input__.readTag()
_tag__ match {
  case 0 => _done__ = true
  case 10 =>
__name = Some(_input__.readString())
  case 16 =>
__age = Some(_input__.readInt32())
  case 24 =>
__gender =
Some(com.example.protos.demo.Gender.fromValue(_input__.readEnum()))
  case 34 =>
__tags += _input__.readString()
  case 42 =>
__addresses +=
com.trueaccord.scalapb.LiteParser.readMessage(_input__,
com.example.protos.demo.Address.defaultInstance)
  case tag => _input__.skipField(tag)
}
  }
  com.example.protos.demo.Person(
  name = __name,
  age = __age,
  gender = __gender,
  tags = __tags.result(),
  addresses = __addresses.result()
  )
}
def getName: String = name.getOrElse("")
def clearName: Person = copy(name = None)
def withName(__v: String): Person = copy(name = Some(__v))
def getAge: Int = age.getOrElse(0)
def clearAge: Person = copy(age = None)
def withAge(__v: Int): Person = copy(age = Some(__v))
def getGender: com.example.protos.demo.Gender =
gender.getOrElse(com.example.protos.demo.Gender.MALE)
def clearGender: Person = copy(gender = None)
def withGender(__v: com.example.protos.demo.Gender): Person =
copy(gender = Some(__v))
def clearTags = copy(tags = scala.collection.Seq.empty)
def addTags(__vs: String*): Person = addAllTags(__vs)
def addAllTags(__vs: TraversableOnce[String]): Person = copy(tags
= tags ++ __vs)
def withTags(__v: scala.collection.Seq[String]): Person = copy(tags = __v)
def clearAddresses = copy(addresses = scala.collection.Seq.empty)
def addAddresses(__vs: com.example.protos.demo.Address*): Person =
addAllAddresses(__vs)
def addAllAddresses(__vs:
TraversableOnce[com.example.protos.demo.Address]): Person =
copy(addresses = addresses ++ __vs)
def withAddresses(__v:
scala.collection.Seq[com.example.protos.demo.Address]): Person =
copy(addresses = __v)
def getField(__field:
com.google.protobuf.Descriptors.FieldDescriptor): scala.Any = {
  __field.getNumber match {
case 1 => name.getOrElse(null)
case 2 => age.getOrElse(null)
case 3 => gender.map(_.valueDescriptor).getOrElse(null)

SparkILoop doesn't run

2016-11-16 Thread Mohit Jaggi
I am trying to use SparkILoop to write some tests (shown below), but the test
hangs with the following stack trace. Any idea what is going on?


import org.apache.log4j.{Level, LogManager}
import org.apache.spark.repl.SparkILoop
import org.scalatest.{BeforeAndAfterAll, FunSuite}

class SparkReplSpec extends FunSuite with BeforeAndAfterAll {

  override def beforeAll(): Unit = {
  }

  override def afterAll(): Unit = {
  }

  test("yay!") {
val rootLogger = LogManager.getRootLogger
val logLevel = rootLogger.getLevel
rootLogger.setLevel(Level.ERROR)

val output = SparkILoop.run(
  """
|println("hello")
  """.stripMargin)

println(s" $output ")

  }
}

/Library/Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/bin/java 
-Dspark.master=local[*] -Didea.launcher.port=7532 
"-Didea.launcher.bin.path=/Applications/IntelliJ IDEA CE.app/Contents/bin" 
-Dfile.encoding=UTF-8 -classpath "/Users/mohit/Library/Application 

Any with S3 experience with Spark? Having ListBucket issues

2016-11-16 Thread Edden Burrow
Anyone dealing with a lot of files with Spark? We're trying s3a with 2.0.1
because we're seeing intermittent errors in S3 where jobs fail and
saveAsTextFile fails. We're using pyspark.

Is there any issue with working in a S3 folder that has too many files?
How about having versioning enabled? Are these things going to be a problem?

We're pre-building the S3 file list, storing it in a file, and passing
that to textFile as a long comma-separated list of files, so we are not
listing files ourselves.

But we get errors with saveAsTextFile, related to ListBucket, even though
we're not using the wildcard '*'.

org.apache.hadoop.fs.s3.S3Exception: org.jets3t.service.S3ServiceException:
Failed to parse XML document with handler class
org.jets3t.service.impl.rest.XmlResponsesSaxParser$ListBucketHandler


Running spark 2.0.1 with the s3a protocol.

thanks


Re: Spark 2.0.2 with Kafka source, Error please help!

2016-11-16 Thread Shixiong(Ryan) Zhu
Could you provide the Person class?

On Wed, Nov 16, 2016 at 1:19 PM, shyla deshpande 
wrote:

> I am using 2.11.8. Thanks
>
> On Wed, Nov 16, 2016 at 1:15 PM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> Which Scala version are you using? Is it Scala 2.10? Scala 2.10 has some
>> known race conditions in reflection and the Scala community doesn't have
>> plan to fix it (http://docs.scala-lang.org/ov
>> erviews/reflection/thread-safety.html) AFAIK, the only way to fix it is
>> upgrading to Scala 2.11.
>>
>> On Wed, Nov 16, 2016 at 11:16 AM, shyla deshpande <
>> deshpandesh...@gmail.com> wrote:
>>
>>> I am using protobuf to encode. This may not be related to the new
>>> release issue
>>>
>>> Exception in thread "main" scala.ScalaReflectionException:  is
>>> not a term
>>> at scala.reflect.api.Symbols$SymbolApi$class.asTerm(Symbols.scala:199)
>>> at scala.reflect.internal.Symbols$SymbolContextApiImpl.asTerm(S
>>> ymbols.scala:84)
>>> at org.apache.spark.sql.catalyst.ScalaReflection$class.construc
>>> tParams(ScalaReflection.scala:811)
>>> at org.apache.spark.sql.catalyst.ScalaReflection$.constructPara
>>> ms(ScalaReflection.scala:39)
>>> at org.apache.spark.sql.catalyst.ScalaReflection$class.getConst
>>> ructorParameters(ScalaReflection.scala:800)
>>> at org.apache.spark.sql.catalyst.ScalaReflection$.getConstructo
>>> rParameters(ScalaReflection.scala:39)
>>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$sp
>>> ark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflect
>>> ion.scala:582)
>>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$sp
>>> ark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflect
>>> ion.scala:460)
>>> at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.app
>>> ly(ScalaReflection.scala:592)
>>> at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.app
>>> ly(ScalaReflection.scala:583)
>>> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(Tr
>>> aversableLike.scala:252)
>>> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(Tr
>>> aversableLike.scala:252)
>>> at scala.collection.immutable.List.foreach(List.scala:381)
>>> at scala.collection.TraversableLike$class.flatMap(TraversableLi
>>> ke.scala:252)
>>> at scala.collection.immutable.List.flatMap(List.scala:344)
>>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$sp
>>> ark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflect
>>> ion.scala:583)
>>> at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor
>>> (ScalaReflection.scala:425)
>>> at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.ap
>>> ply(ExpressionEncoder.scala:61)
>>> at org.apache.spark.sql.Encoders$.product(Encoders.scala:274)
>>> at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImpli
>>> cits.scala:47)
>>> at PersonConsumer$.main(PersonConsumer.scala:33)
>>> at PersonConsumer.main(PersonConsumer.scala)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce
>>> ssorImpl.java:62)
>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe
>>> thodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
>>>
>>> The following is my code ...
>>>
>>> object PersonConsumer {
>>>   import org.apache.spark.rdd.RDD
>>>   import com.trueaccord.scalapb.spark._
>>>   import org.apache.spark.sql.{SQLContext, SparkSession}
>>>   import com.example.protos.demo._
>>>
>>>   def main(args : Array[String]) {
>>>
>>> def parseLine(s: String): Person =
>>>   Person.parseFrom(
>>> org.apache.commons.codec.binary.Base64.decodeBase64(s))
>>>
>>> val spark = SparkSession.builder.
>>>   master("local")
>>>   .appName("spark session example")
>>>   .getOrCreate()
>>>
>>> import spark.implicits._
>>>
>>> val ds1 = 
>>> spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe","person").load()
>>>
>>> val ds2 = ds1.selectExpr("CAST(value AS STRING)").as[String]
>>>
>>> val ds3 = ds2.map(str => 
>>> parseLine(str)).createOrReplaceTempView("persons")
>>>
>>> val ds4 = spark.sqlContext.sql("select name from persons")
>>>
>>> val query = ds4.writeStream
>>>   .outputMode("append")
>>>   .format("console")
>>>   .start()
>>> query.awaitTermination()
>>>   }
>>> }
>>>
>>>
>>
>


Re: Spark 2.0.2 with Kafka source, Error please help!

2016-11-16 Thread shyla deshpande
I am using 2.11.8. Thanks

On Wed, Nov 16, 2016 at 1:15 PM, Shixiong(Ryan) Zhu  wrote:

> Which Scala version are you using? Is it Scala 2.10? Scala 2.10 has some
> known race conditions in reflection and the Scala community doesn't have
> plan to fix it (http://docs.scala-lang.org/overviews/reflection/thread-
> safety.html) AFAIK, the only way to fix it is upgrading to Scala 2.11.
>
> On Wed, Nov 16, 2016 at 11:16 AM, shyla deshpande <
> deshpandesh...@gmail.com> wrote:
>
>> I am using protobuf to encode. This may not be related to the new release
>> issue
>>
>> Exception in thread "main" scala.ScalaReflectionException:  is not
>> a term
>> at scala.reflect.api.Symbols$SymbolApi$class.asTerm(Symbols.scala:199)
>> at scala.reflect.internal.Symbols$SymbolContextApiImpl.asTerm(
>> Symbols.scala:84)
>> at org.apache.spark.sql.catalyst.ScalaReflection$class.construc
>> tParams(ScalaReflection.scala:811)
>> at org.apache.spark.sql.catalyst.ScalaReflection$.constructPara
>> ms(ScalaReflection.scala:39)
>> at org.apache.spark.sql.catalyst.ScalaReflection$class.getConst
>> ructorParameters(ScalaReflection.scala:800)
>> at org.apache.spark.sql.catalyst.ScalaReflection$.getConstructo
>> rParameters(ScalaReflection.scala:39)
>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$sp
>> ark$sql$catalyst$ScalaReflection$$serializerFor(
>> ScalaReflection.scala:582)
>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$sp
>> ark$sql$catalyst$ScalaReflection$$serializerFor(
>> ScalaReflection.scala:460)
>> at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.app
>> ly(ScalaReflection.scala:592)
>> at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.app
>> ly(ScalaReflection.scala:583)
>> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(Tr
>> aversableLike.scala:252)
>> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(Tr
>> aversableLike.scala:252)
>> at scala.collection.immutable.List.foreach(List.scala:381)
>> at scala.collection.TraversableLike$class.flatMap(TraversableLi
>> ke.scala:252)
>> at scala.collection.immutable.List.flatMap(List.scala:344)
>> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$sp
>> ark$sql$catalyst$ScalaReflection$$serializerFor(
>> ScalaReflection.scala:583)
>> at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor
>> (ScalaReflection.scala:425)
>> at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.ap
>> ply(ExpressionEncoder.scala:61)
>> at org.apache.spark.sql.Encoders$.product(Encoders.scala:274)
>> at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImpli
>> cits.scala:47)
>> at PersonConsumer$.main(PersonConsumer.scala:33)
>> at PersonConsumer.main(PersonConsumer.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce
>> ssorImpl.java:62)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe
>> thodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
>>
>> The following is my code ...
>>
>> object PersonConsumer {
>>   import org.apache.spark.rdd.RDD
>>   import com.trueaccord.scalapb.spark._
>>   import org.apache.spark.sql.{SQLContext, SparkSession}
>>   import com.example.protos.demo._
>>
>>   def main(args : Array[String]) {
>>
>> def parseLine(s: String): Person =
>>   Person.parseFrom(
>> org.apache.commons.codec.binary.Base64.decodeBase64(s))
>>
>> val spark = SparkSession.builder.
>>   master("local")
>>   .appName("spark session example")
>>   .getOrCreate()
>>
>> import spark.implicits._
>>
>> val ds1 = 
>> spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe","person").load()
>>
>> val ds2 = ds1.selectExpr("CAST(value AS STRING)").as[String]
>>
>> val ds3 = ds2.map(str => 
>> parseLine(str)).createOrReplaceTempView("persons")
>>
>> val ds4 = spark.sqlContext.sql("select name from persons")
>>
>> val query = ds4.writeStream
>>   .outputMode("append")
>>   .format("console")
>>   .start()
>> query.awaitTermination()
>>   }
>> }
>>
>>
>


Re: Spark 2.0.2 with Kafka source, Error please help!

2016-11-16 Thread Shixiong(Ryan) Zhu
Which Scala version are you using? Is it Scala 2.10? Scala 2.10 has some
known race conditions in reflection and the Scala community doesn't have a
plan to fix them (
http://docs.scala-lang.org/overviews/reflection/thread-safety.html). AFAIK,
the only way to fix this is upgrading to Scala 2.11.

On Wed, Nov 16, 2016 at 11:16 AM, shyla deshpande 
wrote:

> I am using protobuf to encode. This may not be related to the new release
> issue
>
> Exception in thread "main" scala.ScalaReflectionException: <none> is not
> a term
> at scala.reflect.api.Symbols$SymbolApi$class.asTerm(Symbols.scala:199)
> at scala.reflect.internal.Symbols$SymbolContextApiImpl.
> asTerm(Symbols.scala:84)
> at org.apache.spark.sql.catalyst.ScalaReflection$class.constructParams(
> ScalaReflection.scala:811)
> at org.apache.spark.sql.catalyst.ScalaReflection$.constructParams(
> ScalaReflection.scala:39)
> at org.apache.spark.sql.catalyst.ScalaReflection$class.
> getConstructorParameters(ScalaReflection.scala:800)
> at org.apache.spark.sql.catalyst.ScalaReflection$.
> getConstructorParameters(ScalaReflection.scala:39)
> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$
> spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.
> scala:582)
> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$
> spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.
> scala:460)
> at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.
> apply(ScalaReflection.scala:592)
> at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.
> apply(ScalaReflection.scala:583)
> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(
> TraversableLike.scala:252)
> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(
> TraversableLike.scala:252)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at scala.collection.TraversableLike$class.flatMap(
> TraversableLike.scala:252)
> at scala.collection.immutable.List.flatMap(List.scala:344)
> at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$
> spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.
> scala:583)
> at org.apache.spark.sql.catalyst.ScalaReflection$.
> serializerFor(ScalaReflection.scala:425)
> at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.
> apply(ExpressionEncoder.scala:61)
> at org.apache.spark.sql.Encoders$.product(Encoders.scala:274)
> at org.apache.spark.sql.SQLImplicits.newProductEncoder(
> SQLImplicits.scala:47)
> at PersonConsumer$.main(PersonConsumer.scala:33)
> at PersonConsumer.main(PersonConsumer.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
>
> The following is my code ...
>
> object PersonConsumer {
>   import org.apache.spark.rdd.RDD
>   import com.trueaccord.scalapb.spark._
>   import org.apache.spark.sql.{SQLContext, SparkSession}
>   import com.example.protos.demo._
>
>   def main(args : Array[String]) {
>
> def parseLine(s: String): Person =
>   Person.parseFrom(
> org.apache.commons.codec.binary.Base64.decodeBase64(s))
>
> val spark = SparkSession.builder.
>   master("local")
>   .appName("spark session example")
>   .getOrCreate()
>
> import spark.implicits._
>
> val ds1 = 
> spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe","person").load()
>
> val ds2 = ds1.selectExpr("CAST(value AS STRING)").as[String]
>
> val ds3 = ds2.map(str => 
> parseLine(str)).createOrReplaceTempView("persons")
>
> val ds4 = spark.sqlContext.sql("select name from persons")
>
> val query = ds4.writeStream
>   .outputMode("append")
>   .format("console")
>   .start()
> query.awaitTermination()
>   }
> }
>
>


How to propagate R_LIBS to sparkr executors

2016-11-16 Thread Rodrick Brown
I’m having an issue with an R module not getting picked up on the slave nodes in
Mesos. I have the environment variable R_LIBS set, but for some reason it is only
set in the driver context and not on the executors. Is there a way to pass
environment values down to the executor level in SparkR?

I’m using Mesos 1.0.1 and Spark 2.0.1 

Thanks.  
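One configuration route that may be worth a try (a sketch, not verified on Mesos): Spark forwards
spark.executorEnv.* settings to the executor processes, so R_LIBS could be set there instead of
only in the driver's environment, e.g. in spark-defaults.conf or as a --conf flag:

# spark-defaults.conf (or --conf on the sparkR / spark-submit command line)
spark.executorEnv.R_LIBS    /opt/R/libs    # placeholder: path to the R library directory on the slaves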


-- 
 
Rodrick Brown / Site Reliability Engineer 
+1 917 445 6839 / rodr...@orchardplatform.com 

Orchard Platform 
101 5th Avenue, 4th Floor, New York, NY 10003 
http://www.orchardplatform.com 
Orchard Blog  | Marketplace Lending 
Meetup 



RE: submitting a spark job using yarn-client and getting NoClassDefFoundError: org/apache/spark/Logging

2016-11-16 Thread David Robison
I’ve gotten a little further along. It now submits the job via Yarn, but the
jobs exit immediately with the following error:

Exception in thread "main" java.lang.NoClassDefFoundError: 
org/apache/spark/Logging
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at 
java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at 
org.apache.spark.deploy.yarn.ExecutorLauncher$.main(ApplicationMaster.scala:646)
at 
org.apache.spark.deploy.yarn.ExecutorLauncher.main(ApplicationMaster.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

I’ve checked, and the class does live in the Spark assembly. Any thoughts as
to what might be wrong?


Best Regards,

David R Robison
Senior Systems Engineer

From: David Robison [mailto:david.robi...@psgglobal.net]
Sent: Wednesday, November 16, 2016 9:04 AM
To: Rohit Verma 
Cc: user@spark.apache.org
Subject: RE: Problem submitting a spark job using yarn-client as master



Unfortunately, it doesn’t get that far in my code where I have a SparkContext 
from which to set the Hadoop config parameters. Here is my Java code:

SparkConf sparkConf = new SparkConf()
   .setJars(new String[] { 
"file:///opt/wildfly/mapreduce/mysparkjob-5.0.0.jar", })
   .setSparkHome("/usr/hdp/" + getHdpVersion() + "/spark")
   .set("fs.defaultFS", config.get("fs.defaultFS"))
   ;
sparkContext = new JavaSparkContext("yarn-client", "SumFramesPerTimeUnit", 
sparkConf);

The job dies in the constructor of the JavaSparkContext. I have a logging call 
right after creating the SparkContext and it is never executed.
Any idea what I’m doing wrong? David

Best Regards,

David R Robison
Senior Systems Engineer

From: Rohit Verma [mailto:rohit.ve...@rokittech.com]
Sent: Tuesday, November 15, 2016 9:27 PM
To: David Robison 
>
Cc: user@spark.apache.org
Subject: Re: Problem submitting a spark job using yarn-client as master

you can set hdfs as defaults,

sparksession.sparkContext().hadoopConfiguration().set("fs.defaultFS", 
“hdfs://master_node:8020”);

Regards
Rohit

On Nov 16, 2016, at 3:15 AM, David Robison 
> wrote:

I am trying to submit a spark job through the yarn-client master setting. The 
job gets created and submitted to the clients but immediately errors out. Here 
is the relevant portion of the log:

15:39:37,385 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Requesting a new application from cluster with 1 NodeManagers
15:39:37,397 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Verifying our application has not requested more than the maximum memory 
capability of the cluster (4608 MB per container)
15:39:37,398 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) Will 
allocate AM container, with 896 MB memory including 384 MB overhead
15:39:37,399 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Setting up container launch context for our AM
15:39:37,403 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Setting up the launch environment for our AM container
15:39:37,427 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Preparing resources for our AM container
15:39:37,845 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Source and destination file systems are the same. Not copying 
file:/opt/wildfly/modules/org/apache/hadoop/client/main/spark-yarn_2.10-1.6.2.jar
15:39:38,050 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Source and destination file systems are the same. Not copying 

Spark 2.0.2 with Kafka source, Error please help!

2016-11-16 Thread shyla deshpande
I am using protobuf to encode. This may not be related to the new release
issue

Exception in thread "main" scala.ScalaReflectionException: <none> is not a
term
at scala.reflect.api.Symbols$SymbolApi$class.asTerm(Symbols.scala:199)
at
scala.reflect.internal.Symbols$SymbolContextApiImpl.asTerm(Symbols.scala:84)
at
org.apache.spark.sql.catalyst.ScalaReflection$class.constructParams(ScalaReflection.scala:811)
at
org.apache.spark.sql.catalyst.ScalaReflection$.constructParams(ScalaReflection.scala:39)
at
org.apache.spark.sql.catalyst.ScalaReflection$class.getConstructorParameters(ScalaReflection.scala:800)
at
org.apache.spark.sql.catalyst.ScalaReflection$.getConstructorParameters(ScalaReflection.scala:39)
at
org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:582)
at
org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:460)
at
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:592)
at
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:583)
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:252)
at scala.collection.immutable.List.flatMap(List.scala:344)
at
org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:583)
at
org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:425)
at
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:61)
at org.apache.spark.sql.Encoders$.product(Encoders.scala:274)
at
org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:47)
at PersonConsumer$.main(PersonConsumer.scala:33)
at PersonConsumer.main(PersonConsumer.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)

The following is my code ...

object PersonConsumer {
  import org.apache.spark.rdd.RDD
  import com.trueaccord.scalapb.spark._
  import org.apache.spark.sql.{SQLContext, SparkSession}
  import com.example.protos.demo._

  def main(args : Array[String]) {

def parseLine(s: String): Person =
  Person.parseFrom(
org.apache.commons.codec.binary.Base64.decodeBase64(s))

val spark = SparkSession.builder.
  master("local")
  .appName("spark session example")
  .getOrCreate()

import spark.implicits._

val ds1 = 
spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe","person").load()

val ds2 = ds1.selectExpr("CAST(value AS STRING)").as[String]

val ds3 = ds2.map(str => parseLine(str)).createOrReplaceTempView("persons")

val ds4 = spark.sqlContext.sql("select name from persons")

val query = ds4.writeStream
  .outputMode("append")
  .format("console")
  .start()
query.awaitTermination()
  }
}


Re: Need guidelines in Spark Streaming and Kafka integration

2016-11-16 Thread Mohammad Tariq
Hi Karim,

Are you looking for something specific? Some information about your use case
would be really helpful in order to answer your question.

On Wednesday, November 16, 2016, Karim, Md. Rezaul <
rezaul.ka...@insight-centre.org> wrote:

> Hi All,
>
> I am completely new with Kafka. I was wondering if somebody could provide
> me some guidelines on how to develop real-time streaming applications using
> Spark Streaming API with Kafka.
>
> I am aware the Spark Streaming  and Kafka integration [1]. However, a real
> life example should be better to start?
>
>
>
> 1. http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
>
>
>
>
>
> Regards,
> _
> *Md. Rezaul Karim* BSc, MSc
> PhD Researcher, INSIGHT Centre for Data Analytics
> National University of Ireland, Galway
> IDA Business Park, Dangan, Galway, Ireland
> Web: http://www.reza-analytics.eu/index.html
> 
>


-- 



Tariq, Mohammad
about.me/mti



[SQL/Catalyst] Janino Generated Code Debugging

2016-11-16 Thread Aleksander Eskilson
Hi there,

I have some jobs generating Java code (via Janino) that I would like to
inspect more directly during runtime. The Janino page seems to indicate an
environmental variable can be set to support debugging the generated code,
allowing one to step into it directly and inspect variables and set
breakpoints. I'm using Intellij and setting both

-Dorg.codehaus.janino.source_debugging.enable=true
-Dorg.codehaus.janino.source_debugging.dir=/Users/username/path/to/project/src

but when I begin debug, I can't seem to view the generated code, even if I
set a breakpoint to the location that calls it and attempt to step into the
code, or reference a line of the stacktrace that should take me into the
code. Any idea how to properly set Janino to debug the Catalyst-generated
code more directly?
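
A possibly useful fallback while sorting out the Janino properties (a sketch, assuming a Spark 2.x
shell where the sql execution debug helpers are in scope): dump the generated Java source for a
query so it can at least be read next to the stack trace, even without stepping through it.

import org.apache.spark.sql.execution.debug._              // brings debugCodegen() into scope

val df = spark.range(10).selectExpr("id * 2 AS doubled")   // any query of interest
df.debugCodegen()                                          // prints the whole-stage generated code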

Best,
Alek


Re: How do I convert json_encoded_blob_column into a data frame? (This may be a feature request)

2016-11-16 Thread Michael Armbrust
On Wed, Nov 16, 2016 at 2:49 AM, Hyukjin Kwon  wrote:

> Maybe it sounds like you are looking for from_json/to_json functions after
> en/decoding properly.
>

Which are new built-in functions that will be released with Spark 2.1.
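
For reference, a rough sketch of what that could look like once 2.1 is available (the schema and
the assumption that the blob decodes to plain UTF-8 JSON text are placeholders for the real data):

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

// Hypothetical schema of the JSON stored in the blob column
val schema = new StructType()
  .add("id", StringType)
  .add("amount", DoubleType)

val df = spark.sql("SELECT json_encoded_blob_column FROM table_name")
val parsed = df
  .withColumn("json", col("json_encoded_blob_column").cast("string"))  // assumes the bytes are UTF-8 JSON
  .withColumn("data", from_json(col("json"), schema))
  .select("data.*")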


RE: Spark UI shows Jobs are processing, but the files are already written to S3

2016-11-16 Thread Shreya Agarwal
I think that is a bug. I have seen that a lot especially with long running jobs 
where Spark skips a lot of stages because it has pre-computed results. And some 
of these are never marked as completed, even though in reality they are. I 
figured this out because I was using the interactive shell (spark-shell) and 
the shell came up to a prompt indicating the job had finished even though there 
were a lot of Active jobs and tasks according to the UI. And my output is 
correct.

Is there a JIRA item tracking this?

From: Kuchekar [mailto:kuchekar.nil...@gmail.com]
Sent: Wednesday, November 16, 2016 10:00 AM
To: spark users 
Subject: Spark UI shows Jobs are processing, but the files are already written 
to S3

Hi,

 I am running a spark job which saves the computed data (massive data) to
S3. On the Spark UI I see that some jobs are active, but there is no activity in the
logs. Also, on S3 all the data has been written (verified for each bucket --> it has
a _SUCCESS file).

Am I missing something?

Thanks.
Kuchekar, Nilesh


Need guidelines in Spark Streaming and Kafka integration

2016-11-16 Thread Karim, Md. Rezaul
Hi All,

I am completely new to Kafka. I was wondering if somebody could provide
me some guidelines on how to develop real-time streaming applications using
the Spark Streaming API with Kafka.

I am aware of the Spark Streaming and Kafka integration guide [1]. However, a
real-life example would be a better way to start.



1. http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
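
For a concrete starting point, here is a minimal sketch against the 0-10 integration guide above
(broker address, topic name and group id are placeholders, and checkpointing/error handling are
left out):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object KafkaWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",            // placeholder broker
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "example-group",                      // placeholder group id
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean))

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Seq("test-topic"), kafkaParams))

    // Word counts per 5-second batch, printed to the console
    stream.map(_.value)
      .flatMap(_.split("\\s+"))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}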





Regards,
_
*Md. Rezaul Karim* BSc, MSc
PhD Researcher, INSIGHT Centre for Data Analytics
National University of Ireland, Galway
IDA Business Park, Dangan, Galway, Ireland
Web: http://www.reza-analytics.eu/index.html



Spark UI shows Jobs are processing, but the files are already written to S3

2016-11-16 Thread Kuchekar
Hi,

 I am running a spark job which saves the computed data (massive data)
to S3. On the Spark UI I see that some jobs are active, but there is no activity in
the logs. Also, on S3 all the data has been written (verified for each bucket -->
it has a _SUCCESS file).

Am I missing something?

Thanks.
Kuchekar, Nilesh


RE: CSV to parquet preserving partitioning

2016-11-16 Thread neil90
All you need to do is load all the files into one dataframe at once. Then
save the dataframe using partitionBy -

df.write.format("parquet").partitionBy("directoryCol").save("hdfs://path")

Then if you look at the new folder it should look like how you want it, i.e.:
hdfs://path/dir=dir1/part-r-xxx.gz.parquet 
hdfs://path/dir=dir2/part-r-yyy.gz.parquet 
hdfs://path/dir=dir3/part-r-zzz.gz.parquet 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/CSV-to-parquet-preserving-partitioning-tp28078p28087.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark-xml - OutOfMemoryError: Requested array size exceeds VM limit

2016-11-16 Thread Hyukjin Kwon
It seems a bit weird. Could we open an issue and talk in the repository
link I sent?

Let me try to reproduce your case with your data if possible.

On 17 Nov 2016 2:26 a.m., "Arun Patel"  wrote:

> I tried below options.
>
> 1) Increase executor memory.  Increased up to maximum possibility 14GB.
> Same error.
> 2) Tried new version - spark-xml_2.10:0.4.1.  Same error.
> 3) Tried with low level rowTags.  It worked for lower level rowTag and
> returned 16000 rows.
>
> Are there any workarounds for this issue?  I tried playing with 
> spark.memory.fraction
> and spark.memory.storageFraction.  But, it did not help.  Appreciate your
> help on this!!!
>
>
>
> On Tue, Nov 15, 2016 at 8:44 PM, Arun Patel 
> wrote:
>
>> Thanks for the quick response.
>>
>> Its a single XML file and I am using a top level rowTag.  So, it creates
>> only one row in a Dataframe with 5 columns. One of these columns will
>> contain most of the data as StructType.  Is there a limitation to store
>> data in a cell of a Dataframe?
>>
>> I will check with new version and try to use different rowTags and
>> increase executor-memory tomorrow. I will open a new issue as well.
>>
>>
>>
>> On Tue, Nov 15, 2016 at 7:52 PM, Hyukjin Kwon 
>> wrote:
>>
>>> Hi Arun,
>>>
>>>
>>> I have few questions.
>>>
>>> Does your XML file have a few huge documents? In the case of a row
>>> having a huge size (like 500MB), it would consume a lot of memory
>>>
>>> because at least it should hold a row to iterate, if I remember
>>> correctly. I remember this happened to me before while processing a huge
>>> record for test purposes.
>>>
>>>
>>> How about trying to increase --executor-memory?
>>>
>>>
>>> Also, you could try to select only a few fields to prune the data with the
>>> latest version, just to be doubly sure, if you don't mind.
>>>
>>>
>>> Lastly, do you mind if I ask to open an issue in
>>> https://github.com/databricks/spark-xml/issues if you still face this
>>> problem?
>>>
>>> I will try to take a look at my best.
>>>
>>>
>>> Thank you.
>>>
>>>
>>> 2016-11-16 9:12 GMT+09:00 Arun Patel :
>>>
 I am trying to read an XML file which is 1GB in size.  I am getting an
 error 'java.lang.OutOfMemoryError: Requested array size exceeds VM
 limit' after reading 7 partitions in local mode.  In Yarn mode, it
 throws 'java.lang.OutOfMemoryError: Java heap space' error after
 reading 3 partitions.

 Any suggestion?

 PySpark Shell Command:pyspark --master local[4] --driver-memory 3G
 --jars / tmp/spark-xml_2.10-0.3.3.jar



 Dataframe Creation Command:   df = sqlContext.read.format('com.da
 tabricks.spark.xml').options(rowTag='GGL').load('GGL_1.2G.xml')



 16/11/15 18:27:04 INFO TaskSetManager: Finished task 1.0 in stage 0.0
 (TID 1) in 25978 ms on localhost (1/10)

 16/11/15 18:27:04 INFO NewHadoopRDD: Input split:
 hdfs://singlenodevm:8020/user/arunp/GGL_1.2G.xml:268435456+134217728

 16/11/15 18:27:55 INFO Executor: Finished task 2.0 in stage 0.0 (TID
 2). 2309 bytes result sent to driver

 16/11/15 18:27:55 INFO TaskSetManager: Starting task 3.0 in stage 0.0
 (TID 3, localhost, partition 3,ANY, 2266 bytes)

 16/11/15 18:27:55 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)

 16/11/15 18:27:55 INFO TaskSetManager: Finished task 2.0 in stage 0.0
 (TID 2) in 51001 ms on localhost (2/10)

 16/11/15 18:27:55 INFO NewHadoopRDD: Input split:
 hdfs://singlenodevm:8020/user/arunp/GGL_1.2G.xml:402653184+134217728

 16/11/15 18:28:19 INFO Executor: Finished task 3.0 in stage 0.0 (TID
 3). 2309 bytes result sent to driver

 16/11/15 18:28:19 INFO TaskSetManager: Starting task 4.0 in stage 0.0
 (TID 4, localhost, partition 4,ANY, 2266 bytes)

 16/11/15 18:28:19 INFO Executor: Running task 4.0 in stage 0.0 (TID 4)

 16/11/15 18:28:19 INFO TaskSetManager: Finished task 3.0 in stage 0.0
 (TID 3) in 24336 ms on localhost (3/10)

 16/11/15 18:28:19 INFO NewHadoopRDD: Input split:
 hdfs://singlenodevm:8020/user/arunp/GGL_1.2G.xml:536870912+134217728

 16/11/15 18:28:40 INFO Executor: Finished task 4.0 in stage 0.0 (TID
 4). 2309 bytes result sent to driver

 16/11/15 18:28:40 INFO TaskSetManager: Starting task 5.0 in stage 0.0
 (TID 5, localhost, partition 5,ANY, 2266 bytes)

 16/11/15 18:28:40 INFO Executor: Running task 5.0 in stage 0.0 (TID 5)

 16/11/15 18:28:40 INFO TaskSetManager: Finished task 4.0 in stage 0.0
 (TID 4) in 20895 ms on localhost (4/10)

 16/11/15 18:28:40 INFO NewHadoopRDD: Input split:
 hdfs://singlenodevm:8020/user/arunp/GGL_1.2G.xml:671088640+134217728

 16/11/15 18:29:01 INFO Executor: Finished task 5.0 in stage 0.0 (TID
 5). 2309 bytes result sent to driver

 16/11/15 

Re: use case reading files split per id

2016-11-16 Thread ruben
Yes that binary files function looks interesting, thanks for the tip.

Some followup questions:

- I always wonder when people talk about 'small' files and 'large'
files. Is there any rule of thumb for when these terms apply? Are small files
those which can fit completely in memory on a node, while large files cannot?

- If it works similarly to wholeTextFiles it will give me tuples like this:
(/base/id1/file1, contentA)
(/base/id1/file2, contentB)
...
(/base/id2/file1, contentC)
(/base/id2/file2, contentD)
...

since I want to end up with tuples like:
(id1, parsedContentA ++ parsedContentB ++ ...)
(id2, parsedContentC ++ parsedContentD ++ ...)

Would reduceByKey be the best function to accomplish this?
Will using DataFrames give me any benefits here?
This will end up with some shuffling of the parsedContents, which are
List[(Timestamp, RecordData)], right? But I guess this is not really
something which can be avoided.
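
A rough sketch of the reduceByKey route with wholeTextFiles (parse is a hypothetical stand-in for
the real per-file parser returning List[(Timestamp, RecordData)]; binaryFiles would look the same,
with a PortableDataStream argument instead of a String):

val perId = sc.wholeTextFiles("/base/*/*")
  .map { case (path, content) =>
    val id = path.stripSuffix("/").split("/").takeRight(2).head   // ".../id1/file1" -> "id1"
    (id, parse(content))                                          // hypothetical parser
  }
  .reduceByKey(_ ++ _)   // concatenates the per-file lists, shuffling the parsed content by id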



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/use-case-reading-files-split-per-id-tp28044p28086.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark ML DataFrame API - need cosine similarity, how to convert to RDD Vectors?

2016-11-16 Thread Russell Jurney
Asher, can you cast like that? Does that casting work? That is my
confusion: I don't know what a DataFrame Vector turns into in terms of an
RDD type.

I'll try this, thanks.
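
For what it's worth, a Scala sketch of the same conversion on Spark 2.x, where the DataFrame column
holds an org.apache.spark.ml.linalg vector ("features" is a placeholder column name; on 1.6 the
column is already the mllib type, so the getAs cast alone is enough):

import org.apache.spark.ml.linalg.{Vector => MLVector}
import org.apache.spark.mllib.linalg.{Vector => MLlibVector, Vectors => MLlibVectors}
import org.apache.spark.rdd.RDD

// DataFrame column of ml vectors -> RDD of mllib vectors usable by the RDD-based APIs
val vectors: RDD[MLlibVector] = df.select("features").rdd.map { row =>
  MLlibVectors.fromML(row.getAs[MLVector]("features"))
}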

On Tue, Nov 15, 2016 at 11:25 AM, Asher Krim  wrote:

> What language are you using? For Java, you might convert the dataframe to
> an rdd using something like this:
>
> df
> .toJavaRDD()
> .map(row -> (SparseVector)row.getAs(row.fieldIndex("columnName")));
>
> On Tue, Nov 15, 2016 at 1:06 PM, Russell Jurney 
> wrote:
>
>> I have two dataframes with common feature vectors and I need to get the
>> cosine similarity of one against the other. It looks like this is possible
>> in the RDD based API, mllib, but not in ml.
>>
>> So, how do I convert my sparse dataframe vectors into something spark
>> mllib can use? I've searched, but haven't found anything.
>>
>> Thanks!
>> --
>> Russell Jurney twitter.com/rjurney russell.jur...@gmail.com relato.io
>>
>
>
>
> --
> Asher Krim
> Senior Software Engineer
>



-- 
Russell Jurney twitter.com/rjurney russell.jur...@gmail.com relato.io


Re: Spark-xml - OutOfMemoryError: Requested array size exceeds VM limit

2016-11-16 Thread Arun Patel
I tried the options below.

1) Increased executor memory, up to the maximum possible, 14GB.
Same error.
2) Tried the new version, spark-xml_2.10:0.4.1.  Same error.
3) Tried with lower-level rowTags.  It worked for a lower-level rowTag and
returned 16000 rows.

Are there any workarounds for this issue?  I tried playing with
spark.memory.fraction
and spark.memory.storageFraction, but it did not help.  Appreciate your
help on this!



On Tue, Nov 15, 2016 at 8:44 PM, Arun Patel  wrote:

> Thanks for the quick response.
>
> Its a single XML file and I am using a top level rowTag.  So, it creates
> only one row in a Dataframe with 5 columns. One of these columns will
> contain most of the data as StructType.  Is there a limitation to store
> data in a cell of a Dataframe?
>
> I will check with new version and try to use different rowTags and
> increase executor-memory tomorrow. I will open a new issue as well.
>
>
>
> On Tue, Nov 15, 2016 at 7:52 PM, Hyukjin Kwon  wrote:
>
>> Hi Arun,
>>
>>
>> I have few questions.
>>
>> Does your XML file have a few huge documents? In the case of a row
>> having a huge size (like 500MB), it would consume a lot of memory
>>
>> because at least it should hold a row to iterate, if I remember correctly.
>> I remember this happened to me before while processing a huge record for
>> test purposes.
>>
>>
>> How about trying to increase --executor-memory?
>>
>>
>> Also, you could try to select only a few fields to prune the data with the
>> latest version, just to be doubly sure, if you don't mind.
>>
>>
>> Lastly, do you mind if I ask to open an issue in
>> https://github.com/databricks/spark-xml/issues if you still face this
>> problem?
>>
>> I will try to take a look at my best.
>>
>>
>> Thank you.
>>
>>
>> 2016-11-16 9:12 GMT+09:00 Arun Patel :
>>
>>> I am trying to read an XML file which is 1GB in size.  I am getting an
>>> error 'java.lang.OutOfMemoryError: Requested array size exceeds VM
>>> limit' after reading 7 partitions in local mode.  In Yarn mode, it
>>> throws 'java.lang.OutOfMemoryError: Java heap space' error after
>>> reading 3 partitions.
>>>
>>> Any suggestion?
>>>
>>> PySpark Shell Command:pyspark --master local[4] --driver-memory 3G
>>> --jars / tmp/spark-xml_2.10-0.3.3.jar
>>>
>>>
>>>
>>> Dataframe Creation Command:   df = sqlContext.read.format('com.da
>>> tabricks.spark.xml').options(rowTag='GGL').load('GGL_1.2G.xml')
>>>
>>>
>>>
>>> 16/11/15 18:27:04 INFO TaskSetManager: Finished task 1.0 in stage 0.0
>>> (TID 1) in 25978 ms on localhost (1/10)
>>>
>>> 16/11/15 18:27:04 INFO NewHadoopRDD: Input split:
>>> hdfs://singlenodevm:8020/user/arunp/GGL_1.2G.xml:268435456+134217728
>>>
>>> 16/11/15 18:27:55 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2).
>>> 2309 bytes result sent to driver
>>>
>>> 16/11/15 18:27:55 INFO TaskSetManager: Starting task 3.0 in stage 0.0
>>> (TID 3, localhost, partition 3,ANY, 2266 bytes)
>>>
>>> 16/11/15 18:27:55 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)
>>>
>>> 16/11/15 18:27:55 INFO TaskSetManager: Finished task 2.0 in stage 0.0
>>> (TID 2) in 51001 ms on localhost (2/10)
>>>
>>> 16/11/15 18:27:55 INFO NewHadoopRDD: Input split:
>>> hdfs://singlenodevm:8020/user/arunp/GGL_1.2G.xml:402653184+134217728
>>>
>>> 16/11/15 18:28:19 INFO Executor: Finished task 3.0 in stage 0.0 (TID 3).
>>> 2309 bytes result sent to driver
>>>
>>> 16/11/15 18:28:19 INFO TaskSetManager: Starting task 4.0 in stage 0.0
>>> (TID 4, localhost, partition 4,ANY, 2266 bytes)
>>>
>>> 16/11/15 18:28:19 INFO Executor: Running task 4.0 in stage 0.0 (TID 4)
>>>
>>> 16/11/15 18:28:19 INFO TaskSetManager: Finished task 3.0 in stage 0.0
>>> (TID 3) in 24336 ms on localhost (3/10)
>>>
>>> 16/11/15 18:28:19 INFO NewHadoopRDD: Input split:
>>> hdfs://singlenodevm:8020/user/arunp/GGL_1.2G.xml:536870912+134217728
>>>
>>> 16/11/15 18:28:40 INFO Executor: Finished task 4.0 in stage 0.0 (TID 4).
>>> 2309 bytes result sent to driver
>>>
>>> 16/11/15 18:28:40 INFO TaskSetManager: Starting task 5.0 in stage 0.0
>>> (TID 5, localhost, partition 5,ANY, 2266 bytes)
>>>
>>> 16/11/15 18:28:40 INFO Executor: Running task 5.0 in stage 0.0 (TID 5)
>>>
>>> 16/11/15 18:28:40 INFO TaskSetManager: Finished task 4.0 in stage 0.0
>>> (TID 4) in 20895 ms on localhost (4/10)
>>>
>>> 16/11/15 18:28:40 INFO NewHadoopRDD: Input split:
>>> hdfs://singlenodevm:8020/user/arunp/GGL_1.2G.xml:671088640+134217728
>>>
>>> 16/11/15 18:29:01 INFO Executor: Finished task 5.0 in stage 0.0 (TID 5).
>>> 2309 bytes result sent to driver
>>>
>>> 16/11/15 18:29:01 INFO TaskSetManager: Starting task 6.0 in stage 0.0
>>> (TID 6, localhost, partition 6,ANY, 2266 bytes)
>>>
>>> 16/11/15 18:29:01 INFO Executor: Running task 6.0 in stage 0.0 (TID 6)
>>>
>>> 16/11/15 18:29:01 INFO TaskSetManager: Finished task 5.0 in stage 0.0
>>> (TID 5) in 20793 ms on localhost (5/10)
>>>
>>> 16/11/15 18:29:01 INFO NewHadoopRDD: Input split:
>>> 

RE: CSV to parquet preserving partitioning

2016-11-16 Thread benoitdr
Yes, by parsing the file content, it's possible to recover which directory
they belong in.

From: neil90 [via Apache Spark User List] 
[mailto:ml-node+s1001560n28083...@n3.nabble.com]
Sent: mercredi 16 novembre 2016 17:41
To: Drooghaag, Benoit (Nokia - BE) 
Subject: Re: CSV to parquet preserving partitioning

Is there anything in the files to let you know which directory they should be 
in?





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/CSV-to-parquet-preserving-partitioning-tp28078p28084.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: CSV to parquet preserving partitioning

2016-11-16 Thread neil90
Is there anything in the files to let you know which directory they should be
in?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/CSV-to-parquet-preserving-partitioning-tp28078p28083.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



RE: AVRO File size when caching in-memory

2016-11-16 Thread Shreya Agarwal
Ah, yes. Nested schemas should be avoided if you want the best memory usage.
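
A sketch of what that flattening could look like before caching, using a handful of fields from the
schema quoted below (the flat column names are arbitrary):

import org.apache.spark.sql.functions.col

val flat = df.select(
  col("sentAt"), col("sharing"), col("receivedAt"), col("ip"),
  col("story.lang").as("story_lang"),
  col("story.myapp.id").as("myapp_id"),
  col("story.loc.country").as("loc_country"),
  col("story.loc.lat").as("loc_lat"),
  col("story.loc.long").as("loc_long"))

flat.cache().count()   // materialize, then compare the size on the Storage tab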

Sent from my Windows 10 phone

From: Prithish
Sent: Wednesday, November 16, 2016 12:48 AM
To: Takeshi Yamamuro
Cc: Shreya Agarwal; 
user@spark.apache.org
Subject: Re: AVRO File size when caching in-memory

It's something like the schema shown below (with several additional 
levels/sublevels)

root
 |-- sentAt: long (nullable = true)
 |-- sharing: string (nullable = true)
 |-- receivedAt: long (nullable = true)
 |-- ip: string (nullable = true)
 |-- story: struct (nullable = true)
 ||-- super: string (nullable = true)
 ||-- lang: string (nullable = true)
 ||-- setting: string (nullable = true)
 ||-- myapp: struct (nullable = true)
 |||-- id: string (nullable = true)
 |||-- ver: string (nullable = true)
 |||-- build: string (nullable = true)
 ||-- comp: struct (nullable = true)
 |||-- notes: string (nullable = true)
 |||-- source: string (nullable = true)
 |||-- name: string (nullable = true)
 |||-- content: string (nullable = true)
 |||-- sub: string (nullable = true)
 ||-- loc: struct (nullable = true)
 |||-- city: string (nullable = true)
 |||-- country: string (nullable = true)
 |||-- lat: double (nullable = true)
 |||-- long: double (nullable = true)

On Wed, Nov 16, 2016 at 2:08 PM, Takeshi Yamamuro 
> wrote:
Hi,

What's the schema interpreted by spark?
A compression logic of the spark caching depends on column types.

// maropu


On Wed, Nov 16, 2016 at 5:26 PM, Prithish 
> wrote:
Thanks for your response.

I did some more tests and I am seeing that when I have a flatter structure for 
my AVRO, the cache memory use is close to the CSV. But, when I use few levels 
of nesting, the cache memory usage blows up. This is really critical for 
planning the cluster we will be using. To avoid using a larger cluster, looks 
like, we will have to consider keeping the structure flat as much as possible.

On Wed, Nov 16, 2016 at 1:18 PM, Shreya Agarwal 
> wrote:
(Adding user@spark back to the discussion)

Well, the CSV vs AVRO might be simpler to explain. CSV has a lot of scope for 
compression. On the other hand avro and parquet are already compressed and just 
store extra schema info, afaik. Avro and parquet are both going to make your 
data smaller, parquet through compressed columnar storage, and avro through its 
binary data format.

Next, talking about the 62kb becoming 1224kb. I actually do not see such a 
massive blow up. The avro you shared is 28kb on my system and becomes 53.7kb 
when cached in memory deserialized and 52.9kb when cached In memory serialized. 
Exact same numbers with parquet as well. This is expected behavior, if I am not 
wrong.

In fact, now that I think about it, even larger blow ups might be valid, since 
your data must have been deserialized from the compressed avro format, making 
it bigger. The order of magnitude of difference in size would depend on the 
type of data you have and how well it was compressable.

The purpose of these formats is to store data to persistent storage in a way 
that's faster to read from, not to reduce cache-memory usage.

Maybe others here have more info to share.

Regards,
Shreya

Sent from my Windows 10 phone

From: Prithish
Sent: Tuesday, November 15, 2016 11:04 PM
To: Shreya Agarwal
Subject: Re: AVRO File size when caching in-memory

I did another test and noting my observations here. These were done with the 
same data in avro and csv formats.

In AVRO, the file size on disk was 62kb and after caching, the in-memory size 
is 1224kb
In CSV, the file size on disk was 690kb and after caching, the in-memory size 
is 290kb

I'm guessing that the spark caching is not able to compress when the source is 
avro. Not sure if this is just my immature conclusion. Waiting to hear your 
observation.

On Wed, Nov 16, 2016 at 12:14 PM, Prithish 
> wrote:
Thanks for your response.

I have attached the code (that I ran using the Spark-shell) as well as a sample 
avro file. After you run this code, the data is cached in memory and you can go 
to the "storage" tab on the Spark-ui (localhost:4040) and see the size it uses. 
In this example the size is small, but in my actual scenario, the source file 
size is 30GB and the in-memory size comes to around 800GB. I am trying to 
understand if this is expected when using avro or not.

On Wed, Nov 16, 2016 at 10:37 AM, Shreya Agarwal 
> wrote:
I haven't used Avro ever. But if you can send over a quick sample 

HttpFileServer behavior in 1.6.3

2016-11-16 Thread Kai Wang
Hi

I am running Spark 1.6.3 along with Spark Jobserver. I notice some
interesting behaviors of HttpFileServer.

When I destroy a SparkContext, HttpFileServer doesn't release the
port. If I don't specify spark.fileserver.port, the next HttpFileServer
binds to a new random port (as expected). However if I do want
HttpFileServer to use a well known port for firewall purpose, the next
HttpFileServer will try to bind to the port, it will fail then try the
port+1 until either it can find an open port or max retries is exceeded.

I feel HttpFileServer should be shutdown when SparkContext is destroyed. Is
this a bug in Spark or SJS?


Re: Log-loss for multiclass classification

2016-11-16 Thread janardhan shetty
I am sure some work might be in the pipeline as it is a standard evaluation
criterion. Any thoughts or links?
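
In the meantime, a sketch of computing it by hand from a fitted model's output (assumes a
hypothetical predictions DataFrame, e.g. model.transform(test), with the usual "label" and
"probability" columns; this is not an existing evaluator metric):

import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.functions.col

val eps = 1e-15
val logLoss = predictions
  .select(col("label"), col("probability"))
  .rdd
  .map { row =>
    val label = row.getDouble(0).toInt
    val p = row.getAs[Vector](1)(label)   // probability assigned to the true class
    -math.log(math.max(p, eps))           // clip to avoid log(0)
  }
  .mean()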

On Nov 15, 2016 11:15 AM, "janardhan shetty"  wrote:

> Hi,
>
> Best practice for multi class classification technique is to evaluate the
> model by *log-loss*.
> Is there any jira or work going on to implement the same in
>
> *MulticlassClassificationEvaluator*
>
> Currently it supports following :
> (supports "f1" (default), "weightedPrecision", "weightedRecall",
> "accuracy")
>


RE: Problem submitting a spark job using yarn-client as master

2016-11-16 Thread David Robison
Unfortunately, it doesn’t get that far in my code where I have a SparkContext 
from which to set the Hadoop config parameters. Here is my Java code:

SparkConf sparkConf = new SparkConf()
   .setJars(new String[] { 
"file:///opt/wildfly/mapreduce/mysparkjob-5.0.0.jar", })
   .setSparkHome("/usr/hdp/" + getHdpVersion() + "/spark")
   .set("fs.defaultFS", config.get("fs.defaultFS"))
   ;
sparkContext = new JavaSparkContext("yarn-client", "SumFramesPerTimeUnit", 
sparkConf);

The job dies in the constructor of the JavaSparkContext. I have a logging call 
right after creating the SparkContext and it is never executed.
Any idea what I’m doing wrong? David

Best Regards,

David R Robison
Senior Systems Engineer

From: Rohit Verma [mailto:rohit.ve...@rokittech.com]
Sent: Tuesday, November 15, 2016 9:27 PM
To: David Robison 
Cc: user@spark.apache.org
Subject: Re: Problem submitting a spark job using yarn-client as master

you can set hdfs as defaults,

sparksession.sparkContext().hadoopConfiguration().set("fs.defaultFS", 
“hdfs://master_node:8020”);

Regards
Rohit

On Nov 16, 2016, at 3:15 AM, David Robison 
> wrote:

I am trying to submit a spark job through the yarn-client master setting. The 
job gets created and submitted to the clients but immediately errors out. Here 
is the relevant portion of the log:

15:39:37,385 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Requesting a new application from cluster with 1 NodeManagers
15:39:37,397 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Verifying our application has not requested more than the maximum memory 
capability of the cluster (4608 MB per container)
15:39:37,398 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) Will 
allocate AM container, with 896 MB memory including 384 MB overhead
15:39:37,399 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Setting up container launch context for our AM
15:39:37,403 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Setting up the launch environment for our AM container
15:39:37,427 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Preparing resources for our AM container
15:39:37,845 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Source and destination file systems are the same. Not copying 
file:/opt/wildfly/modules/org/apache/hadoop/client/main/spark-yarn_2.10-1.6.2.jar
15:39:38,050 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Source and destination file systems are the same. Not copying 
file:/tmp/spark-fa954c4a-a6cd-4675-8610-67ce858b4842/__spark_conf__1435451360463636119.zip
15:39:38,102 INFO  [org.apache.spark.SecurityManager] (default task-1) Changing 
view acls to: wildfly,hdfs
15:39:38,105 INFO  [org.apache.spark.SecurityManager] (default task-1) Changing 
modify acls to: wildfly,hdfs
15:39:38,105 INFO  [org.apache.spark.SecurityManager] (default task-1) 
SecurityManager: authentication disabled; ui acls disabled; users with view 
permissions: Set(wildfly, hdfs); users with modify permissions: Set(wildfly, 
hdfs)
15:39:38,138 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Submitting application 5 to ResourceManager
15:39:38,256 INFO  [org.apache.hadoop.yarn.client.api.impl.YarnClientImpl] 
(default task-1) Submitted application application_1479240217825_0005
15:39:39,269 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Application report for application_1479240217825_0005 (state: ACCEPTED)
15:39:39,279 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1)
 client token: N/A
diagnostics: N/A
ApplicationMaster host: N/A
ApplicationMaster RPC port: -1
queue: default
start time: 1479242378159
final status: UNDEFINED
tracking URL: 
http://vb1.localdomain:8088/proxy/application_1479240217825_0005/
user: hdfs
15:39:40,285 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Application report for application_1479240217825_0005 (state: ACCEPTED)
15:39:41,290 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Application report for application_1479240217825_0005 (state: ACCEPTED)
15:39:42,295 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Application report for application_1479240217825_0005 (state: FAILED)
15:39:42,295 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1)
 client token: N/A
diagnostics: Application application_1479240217825_0005 
failed 2 times due to AM Container for appattempt_1479240217825_0005_02 
exited with  exitCode: -1000
For more detailed output, check application tracking 

RE: CSV to parquet preserving partitioning

2016-11-16 Thread Drooghaag, Benoit (Nokia - BE)
Good point, thanks !

That does the job as long as the datasets corresponding to each input
directory contain a single partition.

The question now is how to achieve this without shuffling the data.
I’m using the databricks csv reader on spark 1.6 and I don’t think there is a
way to control the partitioning.
As far as I can see, it creates one partition per csv file, so the data from one input
directory can be scattered across the nodes ...
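
A sketch of one way to combine both ideas on Spark 1.6 (paths, directory names and csv options are
placeholders). Since each directory is coalesced to a single partition before the union, the
partitioned write should end up with one task, and hence one output file, per directory, at the
cost of reading each directory in a single task:

import org.apache.spark.sql.functions.lit

val dirs = Seq("dir1", "dir2", "dir3")            // assumption: input directory names known up front

val perDir = dirs.map { d =>
  sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true")                     // adjust to the real csv layout
    .load(s"/path/$d/*.csv")
    .withColumn("dir", lit(d))                    // carry the partition value as a column
    .coalesce(1)                                  // narrow, no shuffle: one partition per directory
}

val all = perDir.reduce(_ unionAll _)             // DataFrame.unionAll on Spark 1.6

all.write
  .format("parquet")
  .partitionBy("dir")
  .save("hdfs://path")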

From: Daniel Siegmann [mailto:dsiegm...@securityscorecard.io]
Sent: mardi 15 novembre 2016 18:57
To: Drooghaag, Benoit (Nokia - BE) 
Cc: user 
Subject: Re: CSV to parquet preserving partitioning

Did you try unioning the datasets for each CSV into a single dataset? You may 
need to put the directory name into a column so you can partition by it.
On Tue, Nov 15, 2016 at 8:44 AM, benoitdr 
> wrote:
Hello,

I'm trying to convert a bunch of csv files to parquet, with the interesting
case that the input csv files are already "partitioned" by directory.
All the input files have the same set of columns.
The input files structure looks like :

/path/dir1/file1.csv
/path/dir1/file2.csv
/path/dir2/file3.csv
/path/dir3/file4.csv
/path/dir3/file5.csv
/path/dir3/file6.csv

I'd like to read those files and write their data to a parquet table in
hdfs, preserving the partitioning (partitioned by input directory), and such
that there is a single output file per partition.
The output files structure should look like :

hdfs://path/dir=dir1/part-r-xxx.gz.parquet
hdfs://path/dir=dir2/part-r-yyy.gz.parquet
hdfs://path/dir=dir3/part-r-zzz.gz.parquet


The best solution I have found so far is to loop among the input
directories, loading the csv files in a dataframe and to write the dataframe
in the target partition.
But this is not efficient since I want a single output file per partition: the
writing to hdfs is a single task that blocks the loop.
I wonder how to achieve this with a maximum of parallelism (and without
shuffling the data in the cluster).

Thanks !



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/CSV-to-parquet-preserving-partitioning-tp28078.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: 
user-unsubscr...@spark.apache.org



problem deploying spark-jobserver on CentOS

2016-11-16 Thread Reza zade
Hi

I'm going to deploy jobserver on my CentOS (spark is installed with cdh5.7).

I'm using oracle jdk1.8, sbt-0.13.13, spark-1.6.0 and jobserver-0.6.2.

When I run the sbt command (after running sbt publish-local) I encounter the
message below:

[cloudera@quickstart spark-jobserver]$ sbt
[info] Loading project definition from /home/cloudera/spark-
jobserver/project
Missing bintray credentials /home/cloudera/.bintray/.credentials. Some
bintray features depend on this.
fatal: Not a git repository (or any of the parent directories): .git
fatal: Not a git repository (or any of the parent directories): .git
fatal: Not a git repository (or any of the parent directories): .git
fatal: Not a git repository (or any of the parent directories): .git
Missing bintray credentials /home/cloudera/.bintray/.credentials. Some
bintray features depend on this.
Missing bintray credentials /home/cloudera/.bintray/.credentials. Some
bintray features depend on this.
Missing bintray credentials /home/cloudera/.bintray/.credentials. Some
bintray features depend on this.
[info] Set current project to root (in build file:/home/cloudera/spark-
jobserver/)
> re-start
[info] scalastyle using config /home/cloudera/spark-
jobserver/scalastyle-config.xml
[info] Processed 35 file(s)
[info] Found 0 errors
[info] Found 0 warnings
[info] Found 0 infos
[info] Finished in 7 ms
[success] created output: /home/cloudera/spark-jobserver/job-server/target
[info] scalastyle using config /home/cloudera/spark-
jobserver/scalastyle-config.xml
[info] Processed 5 file(s)
[info] Found 0 errors
[info] Found 0 warnings
[info] Found 0 infos
[info] Finished in 1 ms
[success] created output: /home/cloudera/spark-jobserver/job-server-api/
target
[info] scalastyle using config /home/cloudera/spark-
jobserver/scalastyle-config.xml
[info] Processed 11 file(s)
[info] Found 0 errors
[info] Found 0 warnings
[info] Found 0 infos
[info] Finished in 1 ms
[success] created output: /home/cloudera/spark-jobserver/akka-app/target
[info] scalastyle using config /home/cloudera/spark-
jobserver/scalastyle-config.xml
[info] Processed 5 file(s)
[info] Found 0 errors
[info] Found 0 warnings
[info] Found 0 infos
[info] Finished in 1 ms
[success] created output: /home/cloudera/spark-jobserver/job-server-tests/
target
[info] scalastyle using config /home/cloudera/spark-
jobserver/scalastyle-config.xml
[info] Processed 0 file(s)
[info] Found 0 errors
[info] Found 0 warnings
[info] Found 0 infos
[info] Finished in 0 ms
[success] created output: /home/cloudera/spark-jobserver/target
[info] scalastyle using config /home/cloudera/spark-
jobserver/scalastyle-config.xml
[info] Processed 13 file(s)
[info] Found 0 errors
[info] Found 0 warnings
[info] Found 0 infos
[info] Finished in 1 ms
[success] created output: /home/cloudera/spark-jobserver/job-server-extras/
target
[warn] Credentials file /home/cloudera/.bintray/.credentials does not exist
[warn] Credentials file /home/cloudera/.bintray/.credentials does not exist
[warn] Credentials file /home/cloudera/.bintray/.credentials does not exist
[warn] Credentials file /home/cloudera/.bintray/.credentials does not exist
[warn] Credentials file /home/cloudera/.bintray/.credentials does not exist
[warn] Credentials file /home/cloudera/.bintray/.credentials does not exist
[warn] Credentials file /home/cloudera/.bintray/.credentials does not exist
[warn] Credentials file /home/cloudera/.bintray/.credentials does not exist
[warn] Credentials file /home/cloudera/.bintray/.credentials does not exist
[info] Updating {file:/home/cloudera/spark-jobserver/}job-server-api...
[info] Updating {file:/home/cloudera/spark-jobserver/}job-server-api...
[info] Updating {file:/home/cloudera/spark-jobserver/}job-server-api...
[info] Updating {file:/home/cloudera/spark-jobserver/}akka-app...
[info] Updating {file:/home/cloudera/spark-jobserver/}akka-app...
[info] Resolving org.fusesource.jansi#jansi;1.4 ...
[info] Done updating.
[info] Resolving org.fusesource.jansi#jansi;1.4 ...
[info] Done updating.
[info] Resolving org.fusesource.jansi#jansi;1.4 ...
[info] Done updating.
[info] Resolving org.fusesource.jansi#jansi;1.4 ...
[info] Done updating.
[info] Resolving org.fusesource.jansi#jansi;1.4 ...
[info] Done updating.
[warn] Credentials file /home/cloudera/.bintray/.credentials does not exist
[warn] Credentials file /home/cloudera/.bintray/.credentials does not exist
[warn] Credentials file /home/cloudera/.bintray/.credentials does not exist
[info] Updating {file:/home/cloudera/spark-jobserver/}job-server-tests...
[info] Updating {file:/home/cloudera/spark-jobserver/}job-server...
[info] Updating {file:/home/cloudera/spark-jobserver/}job-server...
[info] Resolving org.scala-lang#scala-library;2.10.6 ...
[info] Updating {file:/home/cloudera/spark-jobserver/}job-server...
[info] Resolving org.uncommons.maths#uncommons-maths;1.2.2a ...
[warn] Credentials file /home/cloudera/.bintray/.credentials does not exist
[warn] Credentials file 

Re: Writing parquet table using spark

2016-11-16 Thread Dirceu Semighini Filho
Hello,
Have you configured this property?
spark.sql.parquet.compression.codec
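
For example (a sketch; pick the codec that matches what Impala writes, typically snappy, to make
the comparison fair):

hiveContext.setConf("spark.sql.parquet.compression.codec", "snappy")   // or "gzip", "uncompressed"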



2016-11-16 6:40 GMT-02:00 Vaibhav Sinha :

> Hi,
> I am using hiveContext.sql() method to select data from source table and
> insert into parquet tables.
> The query executed from spark takes about 3x more disk space to write
> the same number of rows compared to when fired from impala.
> Just wondering if this is normal behaviour and if there's a way to
> control this.
>
> Best
> Vaibhav.
>
>
> --
> Sent from my iPhone.
>


Map and MapParitions with partition-local variable

2016-11-16 Thread Zsolt Tóth
Hi,

I need to run a map() and a mapPartitions() on my input DF. As a
side-effect of the map(), a partition-local variable should be updated,
that is used in the mapPartitions() afterwards.
I can't use Broadcast variable, because it's shared between partitions on
the same executor.

Where can I define this variable?
I could run a single mapPartitions() that defines the variable, iterates
over the input (just as the map() would do), collect the result into an
ArrayList, and then use the list's iterator (and the updated
partition-local variable) as the input of the transformation that the
original mapPartitions() did.

It feels however, that this is not as optimal as running
map()+mapPartitions() because I need to store the ArrayList (which is
basically the whole data in the partition) in memory.

Thanks,
Zsolt


Re: How do I convert json_encoded_blob_column into a data frame? (This may be a feature request)

2016-11-16 Thread Hyukjin Kwon
Maybe it sounds like you are looking for from_json/to_json functions after
en/decoding properly.

On 16 Nov 2016 6:45 p.m., "kant kodali"  wrote:

>
>
> https://spark.apache.org/docs/2.0.2/sql-programming-guide.
> html#json-datasets
>
> "Spark SQL can automatically infer the schema of a JSON dataset and load
> it as a DataFrame. This conversion can be done using
> SQLContext.read.json() on either an RDD of String, or a JSON file."
>
> val df = spark.sql("SELECT json_encoded_blob_column from table_name"); //
> A cassandra query (cassandra stores blobs in hexadecimal )   
> json_encoded_blob_column
> is encoded in hexadecimal. It will be great to have these blobs interpreted
> and be loaded as a data frame but for now is there anyway to load or parse
> json_encoded_blob_column into a data frame?
>
>
>
>
>
>


Re: Very long pause/hang at end of execution

2016-11-16 Thread Aniket Bhatnagar
Also, how are you launching the application? Through spark submit or
creating spark content in your app?

Thanks,
Aniket

On Wed, Nov 16, 2016 at 10:44 AM Aniket Bhatnagar <
aniket.bhatna...@gmail.com> wrote:

> Thanks for sharing the thread dump. I had a look at them and couldn't find
> anything unusual. Is there anything in the logs (driver + executor) that
> suggests what's going on? Also, what does the spark job do and what is the
> version of spark and hadoop you are using?
>
> Thanks,
> Aniket
>
>
> On Wed, Nov 16, 2016 at 2:07 AM Michael Johnson 
> wrote:
>
> The extremely long hang/pause has started happening again. I've been
> running on a small remote cluster, so I used the UI to grab thread dumps
> rather than doing it from the command line. There seems to be one executor
> still alive, along with the driver; I grabbed 4 thread dumps from each, a
> couple of seconds apart. I'd greatly appreciate any help tracking down
> what's going on! (I've attached them, but I can paste them somewhere if
> that's more convenient.)
>
> Thanks,
> Michael
>
>
>
>
> On Sunday, November 6, 2016 10:49 PM, Michael Johnson
>  wrote:
>
>
> Hm. Something must have changed, as it was happening quite consistently
> and now I can't get it to reproduce. Thank you for the offer, and if it
> happens again I will try grabbing thread dumps and I will see if I can
> figure out what is going on.
>
>
> On Sunday, November 6, 2016 10:02 AM, Aniket Bhatnagar <
> aniket.bhatna...@gmail.com> wrote:
>
>
> I doubt it's GC as you mentioned that the pause is several minutes. Since
> it's reproducible in local mode, can you run the spark application locally
> and once your job is complete (and application appears paused), can you
> take 5 thread dumps (using jstack or jcmd on the local spark JVM process)
> with 1 second delay between each dump and attach them? I can take a look.
>
> Thanks,
> Aniket
>
> On Sun, Nov 6, 2016 at 2:21 PM Michael Johnson 
> wrote:
>
> Thanks; I tried looking at the thread dumps for the driver and the one
> executor that had that option in the UI, but I'm afraid I don't know how to
> interpret what I saw...  I don't think it could be my code directly, since
> at this point my code has all completed? Could GC be taking that long?
>
> (I could also try grabbing the thread dumps and pasting them here, if that
> would help?)
>
> On Sunday, November 6, 2016 8:36 AM, Aniket Bhatnagar <
> aniket.bhatna...@gmail.com> wrote:
>
>
> In order to know what's going on, you can study the thread dumps either
> from spark UI or from any other thread dump analysis tool.
>
> Thanks,
> Aniket
>
> On Sun, Nov 6, 2016 at 1:31 PM Michael Johnson
>  wrote:
>
> I'm doing some processing and then clustering of a small dataset (~150
> MB). Everything seems to work fine, until the end; the last few lines of my
> program are log statements, but after printing those, nothing seems to
> happen for a long time...many minutes; I'm not usually patient enough to
> let it go, but I think one time when I did just wait, it took over an hour
> (and did eventually exit on its own). Any ideas on what's happening, or how
> to troubleshoot?
>
> (This happens both when running locally, using the localhost mode, as well
> as on a small cluster with four 4-processor nodes each with 15GB of RAM; in
> both cases the executors have 2GB+ of RAM, and none of the inputs/outputs
> on any of the stages is more than 75 MB...)
>
> Thanks,
> Michael
>
>
>
>
>
>
>
>


Re: Very long pause/hang at end of execution

2016-11-16 Thread Aniket Bhatnagar
Thanks for sharing the thread dump. I had a look at them and couldn't find
anything unusual. Is there anything in the logs (driver + executor) that
suggests what's going on? Also, what does the spark job do and what is the
version of spark and hadoop you are using?

Thanks,
Aniket

On Wed, Nov 16, 2016 at 2:07 AM Michael Johnson 
wrote:

> The extremely long hang/pause has started happening again. I've been
> running on a small remote cluster, so I used the UI to grab thread dumps
> rather than doing it from the command line. There seems to be one executor
> still alive, along with the driver; I grabbed 4 thread dumps from each, a
> couple of seconds apart. I'd greatly appreciate any help tracking down
> what's going on! (I've attached them, but I can paste them somewhere if
> that's more convenient.)
>
> Thanks,
> Michael
>
>
>
>
> On Sunday, November 6, 2016 10:49 PM, Michael Johnson
>  wrote:
>
>
> Hm. Something must have changed, as it was happening quite consistently
> and now I can't get it to reproduce. Thank you for the offer, and if it
> happens again I will try grabbing thread dumps and I will see if I can
> figure out what is going on.
>
>
> On Sunday, November 6, 2016 10:02 AM, Aniket Bhatnagar <
> aniket.bhatna...@gmail.com> wrote:
>
>
> I doubt it's GC as you mentioned that the pause is several minutes. Since
> it's reproducible in local mode, can you run the spark application locally
> and once your job is complete (and application appears paused), can you
> take 5 thread dumps (using jstack or jcmd on the local spark JVM process)
> with 1 second delay between each dump and attach them? I can take a look.
>
> Thanks,
> Aniket
>
> On Sun, Nov 6, 2016 at 2:21 PM Michael Johnson 
> wrote:
>
> Thanks; I tried looking at the thread dumps for the driver and the one
> executor that had that option in the UI, but I'm afraid I don't know how to
> interpret what I saw...  I don't think it could be my code directly, since
> at this point my code has all completed? Could GC be taking that long?
>
> (I could also try grabbing the thread dumps and pasting them here, if that
> would help?)
>
> On Sunday, November 6, 2016 8:36 AM, Aniket Bhatnagar <
> aniket.bhatna...@gmail.com> wrote:
>
>
> In order to know what's going on, you can study the thread dumps either
> from spark UI or from any other thread dump analysis tool.
>
> Thanks,
> Aniket
>
> On Sun, Nov 6, 2016 at 1:31 PM Michael Johnson
>  wrote:
>
> I'm doing some processing and then clustering of a small dataset (~150
> MB). Everything seems to work fine, until the end; the last few lines of my
> program are log statements, but after printing those, nothing seems to
> happen for a long time...many minutes; I'm not usually patient enough to
> let it go, but I think one time when I did just wait, it took over an hour
> (and did eventually exit on its own). Any ideas on what's happening, or how
> to troubleshoot?
>
> (This happens both when running locally, using the localhost mode, as well
> as on a small cluster with four 4-processor nodes each with 15GB of RAM; in
> both cases the executors have 2GB+ of RAM, and none of the inputs/outputs
> on any of the stages is more than 75 MB...)
>
> Thanks,
> Michael
>
>
>
>
>
>
>
>


How do I convert json_encoded_blob_column into a data frame? (This may be a feature request)

2016-11-16 Thread kant kodali
https://spark.apache.org/docs/2.0.2/sql-programming-guide.html#json-datasets

"Spark SQL can automatically infer the schema of a JSON dataset and load it
as a DataFrame. This conversion can be done using SQLContext.read.json() on
either an RDD of String, or a JSON file."

val df = spark.sql("SELECT json_encoded_blob_column from table_name"); // A
Cassandra query (Cassandra stores blobs in hexadecimal).
json_encoded_blob_column is encoded in hexadecimal. It would be great to
have these blobs interpreted and loaded as a data frame, but for now is
there any way to load or parse json_encoded_blob_column into a data frame?


Re: Very long pause/hang at end of execution

2016-11-16 Thread Pietro Pugni
I have the same issue with Spark 2.0.1, Java 1.8.x and pyspark. I also use
SparkSQL and JDBC. My application runs locally. It happens only if I
connect to the UI during Spark execution, even if I close the browser
before the execution ends. I observed this behaviour on both macOS Sierra
and Red Hat 6.7.

On 16 Nov 2016 3:09 AM, "Michael Johnson" 
wrote:

> The extremely long hang/pause has started happening again. I've been
> running on a small remote cluster, so I used the UI to grab thread dumps
> rather than doing it from the command line. There seems to be one executor
> still alive, along with the driver; I grabbed 4 thread dumps from each, a
> couple of seconds apart. I'd greatly appreciate any help tracking down
> what's going on! (I've attached them, but I can paste them somewhere if
> that's more convenient.)
>
> Thanks,
> Michael
>
>
>
>
> On Sunday, November 6, 2016 10:49 PM, Michael Johnson <
> mjjohnson@yahoo.com.INVALID> wrote:
>
>
> Hm. Something must have changed, as it was happening quite consistently
> and now I can't get it to reproduce. Thank you for the offer, and if it
> happens again I will try grabbing thread dumps and I will see if I can
> figure out what is going on.
>
>
> On Sunday, November 6, 2016 10:02 AM, Aniket Bhatnagar <
> aniket.bhatna...@gmail.com> wrote:
>
>
> I doubt it's GC as you mentioned that the pause is several minutes. Since
> it's reproducible in local mode, can you run the spark application locally
> and once your job is complete (and application appears paused), can you
> take 5 thread dumps (using jstack or jcmd on the local spark JVM process)
> with 1 second delay between each dump and attach them? I can take a look.
>
> Thanks,
> Aniket
>
> On Sun, Nov 6, 2016 at 2:21 PM Michael Johnson 
> wrote:
>
> Thanks; I tried looking at the thread dumps for the driver and the one
> executor that had that option in the UI, but I'm afraid I don't know how to
> interpret what I saw...  I don't think it could be my code directly, since
> at this point my code has all completed? Could GC be taking that long?
>
> (I could also try grabbing the thread dumps and pasting them here, if that
> would help?)
>
> On Sunday, November 6, 2016 8:36 AM, Aniket Bhatnagar <
> aniket.bhatna...@gmail.com> wrote:
>
>
> In order to know what's going on, you can study the thread dumps either
> from spark UI or from any other thread dump analysis tool.
>
> Thanks,
> Aniket
>
> On Sun, Nov 6, 2016 at 1:31 PM Michael Johnson 
> 
> wrote:
>
> I'm doing some processing and then clustering of a small dataset (~150
> MB). Everything seems to work fine, until the end; the last few lines of my
> program are log statements, but after printing those, nothing seems to
> happen for a long time...many minutes; I'm not usually patient enough to
> let it go, but I think one time when I did just wait, it took over an hour
> (and did eventually exit on its own). Any ideas on what's happening, or how
> to troubleshoot?
>
> (This happens both when running locally, using the localhost mode, as well
> as on a small cluster with four 4-processor nodes each with 15GB of RAM; in
> both cases the executors have 2GB+ of RAM, and none of the inputs/outputs
> on any of the stages is more than 75 MB...)
>
> Thanks,
> Michael
>
>
>
>
>
>
>
>
>
>


Re: AVRO File size when caching in-memory

2016-11-16 Thread Prithish
It's something like the schema shown below (with several additional
levels/sublevels)

root
 |-- sentAt: long (nullable = true)
 |-- sharing: string (nullable = true)
 |-- receivedAt: long (nullable = true)
 |-- ip: string (nullable = true)
 |-- story: struct (nullable = true)
 |    |-- super: string (nullable = true)
 |    |-- lang: string (nullable = true)
 |    |-- setting: string (nullable = true)
 |    |-- myapp: struct (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- ver: string (nullable = true)
 |    |    |-- build: string (nullable = true)
 |    |-- comp: struct (nullable = true)
 |    |    |-- notes: string (nullable = true)
 |    |    |-- source: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- content: string (nullable = true)
 |    |    |-- sub: string (nullable = true)
 |    |-- loc: struct (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- country: string (nullable = true)
 |    |    |-- lat: double (nullable = true)
 |    |    |-- long: double (nullable = true)

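A minimal sketch of the flattening workaround discussed further down the
thread: nested fields are pulled out into top-level columns before caching,
so the columnar cache compresses each primitive column separately. The Avro
path is a placeholder and only a subset of the fields above is selected:

import org.apache.spark.sql.functions.col

val df = spark.read.format("com.databricks.spark.avro")
  .load("/path/to/sample.avro")  // placeholder path

val flat = df.select(
  col("sentAt"),
  col("ip"),
  col("story.lang").as("story_lang"),
  col("story.myapp.id").as("myapp_id"),
  col("story.loc.city").as("loc_city"),
  col("story.loc.country").as("loc_country"))

flat.cache()
flat.count()  // materialize, then compare sizes in the Spark UI Storage tab
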
On Wed, Nov 16, 2016 at 2:08 PM, Takeshi Yamamuro 
wrote:

> Hi,
>
> What's the schema interpreted by Spark?
> The compression logic of Spark's caching depends on the column types.
>
> // maropu
>
>
> On Wed, Nov 16, 2016 at 5:26 PM, Prithish  wrote:
>
>> Thanks for your response.
>>
>> I did some more tests and I am seeing that when I have a flatter
>> structure for my AVRO, the cache memory use is close to the CSV. But, when
>> I use few levels of nesting, the cache memory usage blows up. This is
>> really critical for planning the cluster we will be using. To avoid using a
>> larger cluster, looks like, we will have to consider keeping the structure
>> flat as much as possible.
>>
>> On Wed, Nov 16, 2016 at 1:18 PM, Shreya Agarwal 
>> wrote:
>>
>>> (Adding user@spark back to the discussion)
>>>
>>>
>>>
>>> Well, the CSV vs AVRO might be simpler to explain. CSV has a lot of
>>> scope for compression. On the other hand avro and parquet are already
>>> compressed and just store extra schema info, afaik. Avro and parquet are
>>> both going to make your data smaller, parquet through compressed columnar
>>> storage, and avro through its binary data format.
>>>
>>>
>>>
>>> Next, talking about the 62kb becoming 1224kb. I actually do not see such
>>> a massive blow up. The avro you shared is 28kb on my system and becomes
>>> 53.7kb when cached in memory deserialized and 52.9kb when cached in memory
>>> serialized. Exact same numbers with parquet as well. This is expected
>>> behavior, if I am not wrong.
>>>
>>>
>>>
>>> In fact, now that I think about it, even larger blow ups might be valid,
>>> since your data must have been deserialized from the compressed avro
>>> format, making it bigger. The order of magnitude of difference in size
>>> would depend on the type of data you have and how well it was compressible.
>>>
>>>
>>>
>>> The purpose of these formats is to store data to persistent storage in a
>>> way that's faster to read from, not to reduce cache-memory usage.
>>>
>>>
>>>
>>> Maybe others here have more info to share.
>>>
>>>
>>>
>>> Regards,
>>>
>>> Shreya
>>>
>>>
>>>
>>> Sent from my Windows 10 phone
>>>
>>>
>>>
>>> *From: *Prithish 
>>> *Sent: *Tuesday, November 15, 2016 11:04 PM
>>> *To: *Shreya Agarwal 
>>> *Subject: *Re: AVRO File size when caching in-memory
>>>
>>>
>>> I did another test and noting my observations here. These were done with
>>> the same data in avro and csv formats.
>>>
>>> In AVRO, the file size on disk was 62kb and after caching, the in-memory
>>> size is 1224kb
>>> In CSV, the file size on disk was 690kb and after caching, the in-memory
>>> size is 290kb
>>>
>>> I'm guessing that the spark caching is not able to compress when the
>>> source is avro. Not sure if this is just my immature conclusion. Waiting to
>>> hear your observation.
>>>
>>> On Wed, Nov 16, 2016 at 12:14 PM, Prithish  wrote:
>>>
 Thanks for your response.

 I have attached the code (that I ran using the Spark-shell) as well as
 a sample avro file. After you run this code, the data is cached in memory
 and you can go to the "storage" tab on the Spark-ui (localhost:4040) and
 see the size it uses. In this example the size is small, but in my actual
 scenario, the source file size is 30GB and the in-memory size comes to
 around 800GB. I am trying to understand if this is expected when using avro
 or not.

 On Wed, Nov 16, 2016 at 10:37 AM, Shreya Agarwal <
 shrey...@microsoft.com> wrote:

> I haven’t used Avro ever. But if you can send over a quick sample
> code, I can run and see if I repro it and maybe debug.
>
>
>
> *From:* Prithish [mailto:prith...@gmail.com]
> *Sent:* Tuesday, November 15, 2016 8:44 PM
> *To:* Jörn Franke 

Writing parquet table using spark

2016-11-16 Thread Vaibhav Sinha
Hi,
I am using the hiveContext.sql() method to select data from a source table
and insert into parquet tables.
The query executed from Spark takes about 3x more disk space to write the
same number of rows compared to when it is fired from Impala.
Just wondering if this is normal behaviour and if there's a way to
control this.

Best
Vaibhav.


-- 
Sent from my iPhone.
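
One thing worth checking for the question above (an assumption, not a
confirmed cause): the Parquet compression codec each engine defaults to. A
minimal sketch of making the codec explicit on the Spark side, with
placeholder table names:

hiveContext.setConf("spark.sql.parquet.compression.codec", "snappy")
hiveContext.sql(
  "INSERT INTO TABLE target_parquet_table SELECT * FROM source_table")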


Re: AVRO File size when caching in-memory

2016-11-16 Thread Takeshi Yamamuro
Hi,

What's the schema interpreted by Spark?
The compression logic of Spark's caching depends on the column types.

// maropu
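
A minimal sketch of inspecting the schema Spark interprets and the resulting
in-memory size (the Avro path and view name are placeholders; assumes the
databricks spark-avro package mentioned later in the thread is on the
classpath):

val df = spark.read.format("com.databricks.spark.avro")
  .load("/path/to/sample.avro")
df.printSchema()  // the column types shown here drive the cache compression

df.createOrReplaceTempView("events")
spark.catalog.cacheTable("events")
spark.table("events").count()  // force materialization; the Storage tab of
                               // the Spark UI (localhost:4040) then shows the in-memory size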


On Wed, Nov 16, 2016 at 5:26 PM, Prithish  wrote:

> Thanks for your response.
>
> I did some more tests and I am seeing that when I have a flatter structure
> for my AVRO, the cache memory use is close to the CSV. But, when I use few
> levels of nesting, the cache memory usage blows up. This is really critical
> for planning the cluster we will be using. To avoid using a larger cluster,
> looks like, we will have to consider keeping the structure flat as much as
> possible.
>
> On Wed, Nov 16, 2016 at 1:18 PM, Shreya Agarwal 
> wrote:
>
>> (Adding user@spark back to the discussion)
>>
>>
>>
>> Well, the CSV vs AVRO might be simpler to explain. CSV has a lot of scope
>> for compression. On the other hand avro and parquet are already compressed
>> and just store extra schema info, afaik. Avro and parquet are both going to
>> make your data smaller, parquet through compressed columnar storage, and
>> avro through its binary data format.
>>
>>
>>
>> Next, talking about the 62kb becoming 1224kb. I actually do not see such
>> a massive blow up. The avro you shared is 28kb on my system and becomes
>> 53.7kb when cached in memory deserialized and 52.9kb when cached in memory
>> serialized. Exact same numbers with parquet as well. This is expected
>> behavior, if I am not wrong.
>>
>>
>>
>> In fact, now that I think about it, even larger blow ups might be valid,
>> since your data must have been deserialized from the compressed avro
>> format, making it bigger. The order of magnitude of difference in size
>> would depend on the type of data you have and how well it was compressible.
>>
>>
>>
>> The purpose of these formats is to store data to persistent storage in a
>> way that's faster to read from, not to reduce cache-memory usage.
>>
>>
>>
>> Maybe others here have more info to share.
>>
>>
>>
>> Regards,
>>
>> Shreya
>>
>>
>>
>> Sent from my Windows 10 phone
>>
>>
>>
>> *From: *Prithish 
>> *Sent: *Tuesday, November 15, 2016 11:04 PM
>> *To: *Shreya Agarwal 
>> *Subject: *Re: AVRO File size when caching in-memory
>>
>>
>> I did another test and noting my observations here. These were done with
>> the same data in avro and csv formats.
>>
>> In AVRO, the file size on disk was 62kb and after caching, the in-memory
>> size is 1224kb
>> In CSV, the file size on disk was 690kb and after caching, the in-memory
>> size is 290kb
>>
>> I'm guessing that the spark caching is not able to compress when the
>> source is avro. Not sure if this is just my immature conclusion. Waiting to
>> hear your observation.
>>
>> On Wed, Nov 16, 2016 at 12:14 PM, Prithish  wrote:
>>
>>> Thanks for your response.
>>>
>>> I have attached the code (that I ran using the Spark-shell) as well as a
>>> sample avro file. After you run this code, the data is cached in memory and
>>> you can go to the "storage" tab on the Spark-ui (localhost:4040) and see
>>> the size it uses. In this example the size is small, but in my actual
>>> scenario, the source file size is 30GB and the in-memory size comes to
>>> around 800GB. I am trying to understand if this is expected when using avro
>>> or not.
>>>
>>> On Wed, Nov 16, 2016 at 10:37 AM, Shreya Agarwal  wrote:
>>>
 I haven’t used Avro ever. But if you can send over a quick sample code,
 I can run and see if I repro it and maybe debug.



 *From:* Prithish [mailto:prith...@gmail.com]
 *Sent:* Tuesday, November 15, 2016 8:44 PM
 *To:* Jörn Franke 
 *Cc:* User 
 *Subject:* Re: AVRO File size when caching in-memory



 Anyone?



 On Tue, Nov 15, 2016 at 10:45 AM, Prithish  wrote:

 I am using 2.0.1 and databricks avro library 3.0.1. I am running this
 on the latest AWS EMR release.



 On Mon, Nov 14, 2016 at 3:06 PM, Jörn Franke 
 wrote:

 spark version? Are you using tungsten?


 > On 14 Nov 2016, at 10:05, Prithish  wrote:
 >
 > Can someone please explain why this happens?
 >
 > When I read a 600kb AVRO file and cache this in memory (using
 cacheTable), it shows up as 11mb (storage tab in Spark UI). I have tried
 this with different file sizes, and the size in-memory is always
 proportionate. I thought Spark compresses when using cacheTable.





>>>
>>>
>>
>


-- 
---
Takeshi Yamamuro


Re: AVRO File size when caching in-memory

2016-11-16 Thread Prithish
Thanks for your response.

I did some more tests and I am seeing that when I have a flatter structure
for my AVRO, the cache memory use is close to the CSV. But, when I use few
levels of nesting, the cache memory usage blows up. This is really critical
for planning the cluster we will be using. To avoid using a larger cluster,
looks like, we will have to consider keeping the structure flat as much as
possible.

On Wed, Nov 16, 2016 at 1:18 PM, Shreya Agarwal 
wrote:

> (Adding user@spark back to the discussion)
>
>
>
> Well, the CSV vs AVRO might be simpler to explain. CSV has a lot of scope
> for compression. On the other hand avro and parquet are already compressed
> and just store extra schema info, afaik. Avro and parquet are both going to
> make your data smaller, parquet through compressed columnar storage, and
> avro through its binary data format.
>
>
>
> Next, talking about the 62kb becoming 1224kb. I actually do not see such a
> massive blow up. The avro you shared is 28kb on my system and becomes
> 53.7kb when cached in memory deserialized and 52.9kb when cached in memory
> serialized. Exact same numbers with parquet as well. This is expected
> behavior, if I am not wrong.
>
>
>
> In fact, now that I think about it, even larger blow ups might be valid,
> since your data must have been deserialized from the compressed avro
> format, making it bigger. The order of magnitude of difference in size
> would depend on the type of data you have and how well it was compressible.
>
>
>
> The purpose of these formats is to store data to persistent storage in a
> way that's faster to read from, not to reduce cache-memory usage.
>
>
>
> Maybe others here have more info to share.
>
>
>
> Regards,
>
> Shreya
>
>
>
> Sent from my Windows 10 phone
>
>
>
> *From: *Prithish 
> *Sent: *Tuesday, November 15, 2016 11:04 PM
> *To: *Shreya Agarwal 
> *Subject: *Re: AVRO File size when caching in-memory
>
>
> I did another test and noting my observations here. These were done with
> the same data in avro and csv formats.
>
> In AVRO, the file size on disk was 62kb and after caching, the in-memory
> size is 1224kb
> In CSV, the file size on disk was 690kb and after caching, the in-memory
> size is 290kb
>
> I'm guessing that the spark caching is not able to compress when the
> source is avro. Not sure if this is just my immature conclusion. Waiting to
> hear your observation.
>
> On Wed, Nov 16, 2016 at 12:14 PM, Prithish  wrote:
>
>> Thanks for your response.
>>
>> I have attached the code (that I ran using the Spark-shell) as well as a
>> sample avro file. After you run this code, the data is cached in memory and
>> you can go to the "storage" tab on the Spark-ui (localhost:4040) and see
>> the size it uses. In this example the size is small, but in my actual
>> scenario, the source file size is 30GB and the in-memory size comes to
>> around 800GB. I am trying to understand if this is expected when using avro
>> or not.
>>
>> On Wed, Nov 16, 2016 at 10:37 AM, Shreya Agarwal 
>> wrote:
>>
>>> I haven’t used Avro ever. But if you can send over a quick sample code,
>>> I can run and see if I repro it and maybe debug.
>>>
>>>
>>>
>>> *From:* Prithish [mailto:prith...@gmail.com]
>>> *Sent:* Tuesday, November 15, 2016 8:44 PM
>>> *To:* Jörn Franke 
>>> *Cc:* User 
>>> *Subject:* Re: AVRO File size when caching in-memory
>>>
>>>
>>>
>>> Anyone?
>>>
>>>
>>>
>>> On Tue, Nov 15, 2016 at 10:45 AM, Prithish  wrote:
>>>
>>> I am using 2.0.1 and databricks avro library 3.0.1. I am running this on
>>> the latest AWS EMR release.
>>>
>>>
>>>
>>> On Mon, Nov 14, 2016 at 3:06 PM, Jörn Franke 
>>> wrote:
>>>
>>> spark version? Are you using tungsten?
>>>
>>>
>>> > On 14 Nov 2016, at 10:05, Prithish  wrote:
>>> >
>>> > Can someone please explain why this happens?
>>> >
>>> > When I read a 600kb AVRO file and cache this in memory (using
>>> cacheTable), it shows up as 11mb (storage tab in Spark UI). I have tried
>>> this with different file sizes, and the size in-memory is always
>>> proportionate. I thought Spark compresses when using cacheTable.
>>>
>>>
>>>
>>>
>>>
>>
>>
>


Re: what is the optimized way to combine multiple dataframes into one dataframe ?

2016-11-16 Thread Deepak Sharma
Can you try caching the individual dataframes and then unioning them? It may
save you time.

Thanks
Deepak
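
A minimal sketch of that suggestion (df1..df4 are the dataframes from the
quoted question below and are assumed to share the client_id, product_id,
interest schema):

val dfs = Seq(df1, df2, df3, df4).map(_.cache())
val combined = dfs.reduce(_ union _)
combined.count()  // the first action materializes the caches; later reuse of
                  // df1..df4 then avoids recomputing them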

On Wed, Nov 16, 2016 at 12:35 PM, Devi P.V  wrote:

> Hi all,
>
> I have 4 data frames with three columns,
>
> client_id,product_id,interest
>
> I want to combine these 4 dataframes into one dataframe. I used union like
> the following:
>
> df1.union(df2).union(df3).union(df4)
>
> But it is time consuming for big data. What is the optimized way of doing
> this using Spark 2.0 & Scala?
>
>
> Thanks
>



-- 
Thanks
Deepak
www.bigdatabig.com
www.keosha.net