Re: one hot encode a column of vector

2017-04-24 Thread Yan Facai
How about using CountVectorizer?
http://spark.apache.org/docs/latest/ml-features.html#countvectorizer
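For illustration, a minimal PySpark sketch of that idea applied to an array-of-strings
column (the dataframe and column names here are hypothetical, and the binary toggle,
where available, turns counts into 0/1 indicators):

from pyspark.sql import SparkSession
from pyspark.ml.feature import CountVectorizer

spark = SparkSession.builder.getOrCreate()

# Hypothetical dataframe with an array-of-strings column such as ['TG', 'CA'].
df = spark.createDataFrame([(["TG", "CA"],), (["CA"],)], ["codes"])

# CountVectorizer builds a vocabulary over the arrays and emits a sparse count
# vector; with binary=True the non-zero counts become 0/1 indicators, which is
# effectively a multi-hot encoding of the array column.
cv = CountVectorizer(inputCol="codes", outputCol="codes_vec", binary=True)
cv.fit(df).transform(df).show(truncate=False)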





On Tue, Apr 25, 2017 at 9:31 AM, Zeming Yu  wrote:

> How do I one-hot encode a column of arrays? e.g. ['TG', 'CA']
>
>
> FYI, here's my code for one-hot encoding normal categorical columns. How do I
> make it work for a column of arrays?
>
>
> from pyspark.ml import Pipeline
> from pyspark.ml.feature import StringIndexer
>
> indexers = [StringIndexer(inputCol=column,
> outputCol=column+"_index").fit(flight3) for column in list(set(['ColA',
> 'ColB', 'ColC']))]
>
> pipeline = Pipeline(stages=indexers)
> flight4 = pipeline.fit(flight3).transform(flight3)
>
>
>
>


Re: how to add new column using regular expression within pyspark dataframe

2017-04-24 Thread Yan Facai
Don't use a UDF, as `minute` and `unix_timestamp` are built-in functions of
spark.sql.functions.


scala> df.withColumn("minute", minute(unix_timestamp($"str",
"HH'h'mm'm'").cast("timestamp"))).show





On Tue, Apr 25, 2017 at 7:55 AM, Zeming Yu  wrote:

> I tried this, but it doesn't seem to work. Do you know how to fix it?
>
> def getMinutes(aString):
> return minute(unix_timestamp(aString, "HH'h'mm'm'").cast("timestamp"))
>
> udfGetMinutes = udf(getMinutes, IntegerType())
>
> flight2 = (flight2.withColumn('stop_duration1',
> udfGetMinutes(flight2.stop_duration1))
>   )
>
>
>
> On Sat, Apr 22, 2017 at 8:51 PM, 颜发才(Yan Facai) 
> wrote:
>
>> Hi, Zeming.
>>
>> I prefer to convert String to DateTime, like this:
>>
>> scala> val df = Seq("15h10m", "17h0m", "21h25m").toDF("str")
>>
>> scala> val ts = unix_timestamp($"str", "HH'h'mm'm'").cast("timestamp")
>>
>> scala> df.withColumn("minute", minute(ts)).show
>> +------+------+
>> |   str|minute|
>> +------+------+
>> |15h10m|    10|
>> | 17h0m|     0|
>> |21h25m|    25|
>> +------+------+
>>
>>
>> By the way, check Date-time function section of API:
>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$
>>
>>
>>
>>
>> On Sat, Apr 22, 2017 at 6:27 PM, Zeming Yu  wrote:
>>
>>> Thanks a lot!
>>>
>>> Just another question, how can I extract the minutes as a number?
>>>
>>> I can use:
>>> .withColumn('duration_m', split(flight.duration, 'h').getItem(1))
>>>
>>> to get strings like '10m'
>>>
>>> but how do I drop the character "m" at the end? I can use substr(), but
>>> what's the function to get the length of the string so that I can do
>>> something like substr(1, len(...)-1)?
>>>
>>> On Thu, Apr 20, 2017 at 11:36 PM, Pushkar.Gujar wrote:
>>>
 Can be as  simple as -

 from pyspark.sql.functions import split

 flight.withColumn('hour',split(flight.duration,'h').getItem(0))


 Thank you,
 *Pushkar Gujar*


 On Thu, Apr 20, 2017 at 4:35 AM, Zeming Yu  wrote:

> Any examples?
>
> On 20 Apr. 2017 3:44 pm, "颜发才(Yan Facai)"  wrote:
>
>> How about using `withColumn` and UDF?
>>
>> example:
>> + https://gist.github.com/zoltanctoth/2deccd69e3d1cde1dd78
>> 
>> + https://ragrawal.wordpress.com/2015/10/02/spark-custom-udf-example/
>>
>>
>>
>> On Mon, Apr 17, 2017 at 8:25 PM, Zeming Yu 
>> wrote:
>>
>>> I've got a dataframe with a column looking like this:
>>>
>>> display(flight.select("duration").show())
>>>
>>> +--------+
>>> |duration|
>>> +--------+
>>> |  15h10m|
>>> |   17h0m|
>>> |  21h25m|
>>> |  14h30m|
>>> |  24h50m|
>>> |  26h10m|
>>> |  14h30m|
>>> |   23h5m|
>>> |  21h30m|
>>> |  11h50m|
>>> |  16h10m|
>>> |  15h15m|
>>> |  21h25m|
>>> |  14h25m|
>>> |  14h40m|
>>> |   16h0m|
>>> |  24h20m|
>>> |  14h30m|
>>> |  14h25m|
>>> |  14h30m|
>>> +--------+
>>> only showing top 20 rows
>>>
>>>
>>>
>>> I need to extract the hour as a number and store it as an additional
>>> column within the same dataframe. What's the best way to do that?
>>>
>>>
>>> I tried the following, but it failed:
>>>
>>> import re
>>> def getHours(x):
>>>   return re.match('([0-9]+(?=h))', x)
>>> temp = flight.select("duration").rdd.map(lambda
>>> x:getHours(x[0])).toDF()
>>> temp.select("duration").show()
>>>
>>>
>>> error message:
>>>
>>>
>>> ---------------------------------------------------------------------------
>>> Py4JJavaError                             Traceback (most recent call last)
>>>  in ()
>>>       2 def getHours(x):
>>>       3     return re.match('([0-9]+(?=h))', x)
>>> ----> 4 temp = flight.select("duration").rdd.map(lambda x:getHours(x[0])).toDF()
>>>       5 temp.select("duration").show()
>>>
>>> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\session.py in toDF(self, schema, sampleRatio)
>>>      55         [Row(name=u'Alice', age=1)]
>>>      56         """
>>> ---> 57         return sparkSession.createDataFrame(self, schema, sampleRatio)
>>>      58
>>>      59     RDD.toDF = toDF
>>>
>>> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\session.py in createDataFrame(self, data, schema, samplingRatio, verifySchema)
>>>     518
>>>     519         if isinstance(data, RDD):
>>> --> 520             rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
>>>     521         else:
>>>     522             rdd, schema = self._createFromLocal(map(prepare, data), schema)
>>> 

Re: What is correct behavior for spark.task.maxFailures?

2017-04-24 Thread Chawla,Sumit
Thanks a lot @ Dongjin, @Ryan

I am using Spark 1.6.  I agree with your assessment, Ryan.  Further
investigation suggested that our cluster was probably at 100% capacity at
that point in time.  Though tasks were failing on that slave, it was still
accepting tasks, and the task retries were exhausted much faster than the
other slaves could free up to accept those tasks.

Regards
Sumit Chawla


On Mon, Apr 24, 2017 at 9:48 AM, Ryan Blue  wrote:

> Looking at the code a bit more, it appears that blacklisting is disabled
> by default. To enable it, set spark.blacklist.enabled=true.
>
> The updates in 2.1.0 appear to provide much more fine-grained settings for
> this, like the number of tasks that can fail before an executor is
> blacklisted for a stage. In that version, you probably want to set
> spark.blacklist.task.maxTaskAttemptsPerExecutor. See the settings docs
>  and search for
> “blacklist” to see all the options.
>
> rb
> ​
>
> On Mon, Apr 24, 2017 at 9:41 AM, Ryan Blue  wrote:
>
>> Chawla,
>>
>> We hit this issue, too. I worked around it by setting
>> spark.scheduler.executorTaskBlacklistTime=5000. The problem for us was
>> that the scheduler was using locality to select the executor, even though
>> it had already failed there. The executor task blacklist time controls how
>> long the scheduler will avoid using an executor for a failed task, which
>> will cause it to avoid rescheduling on the executor. The default was 0, so
>> the executor was put back into consideration immediately.
>>
>> In 2.1.0 that setting has changed to spark.blacklist.timeout. I’m not
>> sure if that does exactly the same thing. The default for that setting is
>> 1h instead of 0. It’s better to have a non-zero default to avoid what
>> you’re seeing.
>>
>> rb
>> ​
>>
>> On Fri, Apr 21, 2017 at 1:32 PM, Chawla,Sumit 
>> wrote:
>>
>>> I am seeing a strange issue. I had a bad behaving slave that failed the
>>> entire job.  I have set spark.task.maxFailures to 8 for my job.  Seems
>>> like all task retries happen on the same slave in case of failure.  My
>>> expectation was that task will be retried on different slave in case of
>>> failure, and chance of all 8 retries to happen on same slave is very less.
>>>
>>>
>>> Regards
>>> Sumit Chawla
>>>
>>>
>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>


Re: pyspark vector

2017-04-24 Thread Peyman Mohajerian
setVocabSize
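To expand on that (a sketch based on the example from the linked docs): the leading 3
in (3,[0,1,2],[1.0,1.0,1.0]) is the length of the sparse vector, i.e. the vocabulary
size, followed by the indices of the non-zero entries and their values. setVocabSize
caps that size:

from pyspark.sql import SparkSession
from pyspark.ml.feature import CountVectorizer

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([
    (0, ["a", "b", "c"]),
    (1, ["a", "b", "b", "c", "a"]),
], ["id", "texts"])

# Cap the vocabulary at 3 terms; each output vector then has length 3, which is
# the leading number in the (size, indices, values) sparse representation.
cv = CountVectorizer(inputCol="texts", outputCol="vector").setVocabSize(3)
cv.fit(df).transform(df).show(truncate=False)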


On Mon, Apr 24, 2017 at 5:36 PM, Zeming Yu  wrote:

> Hi all,
>
> Beginner question:
>
> what does the 3 mean in the (3,[0,1,2],[1.0,1.0,1.0])?
>
> https://spark.apache.org/docs/2.1.0/ml-features.html
>
>  id | texts                          | vector
> ----|--------------------------------|---------------------------
>  0  | Array("a", "b", "c")           | (3,[0,1,2],[1.0,1.0,1.0])
>  1  | Array("a", "b", "b", "c", "a") | (3,[0,1,2],[2.0,2.0,1.0])
>
>


Re: udf that handles null values

2017-04-24 Thread Zeming Yu
Thank you both!

Here's the code that's working now. It's a bit hard to read due to so many
functions. Any idea how I can improve the readability?

from pyspark.sql.functions import trim, when, from_unixtime, unix_timestamp, minute, hour

duration_test = flight2.select("stop_duration1")
duration_test.show()


duration_test.withColumn('duration_h',
    when(duration_test.stop_duration1.isNull(), 999)
    .otherwise(hour(unix_timestamp(duration_test.stop_duration1, "HH'h'mm'm'").cast("timestamp")))
).show(20)


+--------------+
|stop_duration1|
+--------------+
|         0h50m|
|         3h15m|
|         8h35m|
|         1h30m|
|        12h15m|
|        11h50m|
|          2h5m|
|        10h25m|
|         8h20m|
|          null|
|         2h50m|
|         2h30m|
|         7h45m|
|         1h10m|
|         2h15m|
|          2h0m|
|        10h25m|
|         1h40m|
|         1h55m|
|         1h40m|
+--------------+
only showing top 20 rows

+--------------+----------+
|stop_duration1|duration_h|
+--------------+----------+
|         0h50m|         0|
|         3h15m|         3|
|         8h35m|         8|
|         1h30m|         1|
|        12h15m|        12|
|        11h50m|        11|
|          2h5m|         2|
|        10h25m|        10|
|         8h20m|         8|
|          null|       999|
|         2h50m|         2|
|         2h30m|         2|
|         7h45m|         7|
|         1h10m|         1|
|         2h15m|         2|
|          2h0m|         2|
|        10h25m|        10|
|         1h40m|         1|
|         1h55m|         1|
|         1h40m|         1|
+--------------+----------+
only showing top 20 rows
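One way to make it easier to read (a sketch only, reusing the duration_test dataframe
from the snippet above) is to name the intermediate expressions instead of nesting
everything into one call:

from pyspark.sql.functions import when, unix_timestamp, hour

# Name the pieces, then compose them; the logic is unchanged.
parsed_ts = unix_timestamp(duration_test.stop_duration1, "HH'h'mm'm'").cast("timestamp")
duration_h = when(duration_test.stop_duration1.isNull(), 999).otherwise(hour(parsed_ts))

duration_test.withColumn("duration_h", duration_h).show(20)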





On Tue, Apr 25, 2017 at 11:29 AM, Pushkar.Gujar 
wrote:

> Someone had similar issue today at stackoverflow.
>
> http://stackoverflow.com/questions/43595201/python-how-
> to-convert-pyspark-column-to-date-type-if-there-are-null-
> values/43595728#43595728
>
>
> Thank you,
> *Pushkar Gujar*
>
>
> On Mon, Apr 24, 2017 at 8:22 PM, Zeming Yu  wrote:
>
>> hi all,
>>
>> I tried to write a UDF that handles null values:
>>
>> def getMinutes(hString, minString):
>> if (hString != None) & (minString != None): return int(hString) * 60
>> + int(minString[:-1])
>> else: return None
>>
>> flight2 = (flight2.withColumn("duration_minutes",
>> udfGetMinutes("duration_h", "duration_m")))
>>
>>
>> but I got this error:
>>
>>   File "", line 6, in getMinutes
>> TypeError: int() argument must be a string, a bytes-like object or a number, 
>> not 'NoneType'
>>
>>
>> Does anyone know how to do this?
>>
>>
>> Thanks,
>>
>> Zeming
>>
>>
>


one hot encode a column of vector

2017-04-24 Thread Zeming Yu
How do I one-hot encode a column of arrays? e.g. ['TG', 'CA']


FYI, here's my code for one-hot encoding normal categorical columns.
How do I make it work for a column of arrays?


from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

indexers = [StringIndexer(inputCol=column,
outputCol=column+"_index").fit(flight3) for column in list(set(['ColA',
'ColB', 'ColC']))]

pipeline = Pipeline(stages=indexers)
flight4 = pipeline.fit(flight3).transform(flight3)


Re: udf that handles null values

2017-04-24 Thread Pushkar.Gujar
Someone had similar issue today at stackoverflow.

http://stackoverflow.com/questions/43595201/python-how-to-convert-pyspark-column-to-date-type-if-there-are-null-values/43595728#43595728



Thank you,
*Pushkar Gujar*


On Mon, Apr 24, 2017 at 8:22 PM, Zeming Yu  wrote:

> hi all,
>
> I tried to write a UDF that handles null values:
>
> def getMinutes(hString, minString):
> if (hString != None) & (minString != None): return int(hString) * 60 +
> int(minString[:-1])
> else: return None
>
> flight2 = (flight2.withColumn("duration_minutes",
> udfGetMinutes("duration_h", "duration_m")))
>
>
> but I got this error:
>
>   File "", line 6, in getMinutes
> TypeError: int() argument must be a string, a bytes-like object or a number, 
> not 'NoneType'
>
>
> Does anyone know how to do this?
>
>
> Thanks,
>
> Zeming
>
>


Re: udf that handles null values

2017-04-24 Thread cy h
Quoting Python's Coding Style Guidelines - PEP-008 

https://www.python.org/dev/peps/pep-0008/#programming-recommendations



Comparisons to singletons like None should always be done with is or is not,
never the equality operators.
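Applied to the UDF from this thread, a minimal sketch following that recommendation
could look like this (column names as used earlier in the thread; the surrounding
dataframe is assumed):

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

def get_minutes(h_string, min_string):
    # Use `is not None` for the null checks and plain `and` to combine them.
    if h_string is not None and min_string is not None:
        return int(h_string) * 60 + int(min_string[:-1])
    return None

udf_get_minutes = udf(get_minutes, IntegerType())
# flight2 = flight2.withColumn("duration_minutes",
#                              udf_get_minutes("duration_h", "duration_m"))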



Cinyoung

On Apr 25, 2017, at 9:22 AM, Zeming Yu wrote:

> hi all,
> 
> I tried to write a UDF that handles null values:
> 
> def getMinutes(hString, minString):
> if (hString != None) & (minString != None): return int(hString) * 60 + 
> int(minString[:-1])
> else: return None
> 
> flight2 = (flight2.withColumn("duration_minutes", udfGetMinutes("duration_h", 
> "duration_m")))
> 
> 
> but I got this error: 
>   File "", line 6, in getMinutes
> TypeError: int() argument must be a string, a bytes-like object or a number, 
> not 'NoneType'
> 
> Does anyone know how to do this?
> 
> Thanks,
> Zeming


pyspark vector

2017-04-24 Thread Zeming Yu
Hi all,

Beginner question:

what does the 3 mean in the (3,[0,1,2],[1.0,1.0,1.0])?

https://spark.apache.org/docs/2.1.0/ml-features.html

 id | texts                          | vector
----|--------------------------------|---------------------------
 0  | Array("a", "b", "c")           | (3,[0,1,2],[1.0,1.0,1.0])
 1  | Array("a", "b", "b", "c", "a") | (3,[0,1,2],[2.0,2.0,1.0])


udf that handles null values

2017-04-24 Thread Zeming Yu
hi all,

I tried to write a UDF that handles null values:

def getMinutes(hString, minString):
if (hString != None) & (minString != None): return int(hString) * 60 +
int(minString[:-1])
else: return None

flight2 = (flight2.withColumn("duration_minutes",
udfGetMinutes("duration_h", "duration_m")))


but I got this error:

  File "", line 6, in getMinutes
TypeError: int() argument must be a string, a bytes-like object or a
number, not 'NoneType'


Does anyone know how to do this?


Thanks,

Zeming


Spark-SQL Query Optimization: overlapping ranges

2017-04-24 Thread Lavelle, Shawn
Hello Spark Users!

   Does the Spark Optimization engine reduce overlapping column ranges?  If so, 
should it push this down to a Data Source?

  Example,
This:  Select * from table where col between 3 and 7 OR col between 5 and 9
Reduces to:  Select * from table where col between 3 and 9
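One way to see what the optimizer actually does with such predicates (a sketch; the
table here is a stand-in built with spark.range) is to compare the optimized plan
against the original SQL:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Stand-in table with a single numeric column named "col".
spark.range(0, 20).withColumnRenamed("id", "col").createOrReplaceTempView("t")

# explain(True) prints the analyzed and optimized logical plans, which shows
# whether Catalyst rewrites the overlapping BETWEEN predicates before any
# filter is pushed down to the data source.
spark.sql(
    "SELECT * FROM t WHERE col BETWEEN 3 AND 7 OR col BETWEEN 5 AND 9"
).explain(True)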


  Thanks for your insight!

~ Shawn M Lavelle




Shawn Lavelle
Software Development

4101 Arrowhead Drive
Medina, Minnesota 55340-9457
Phone: 763 551 0559
Fax: 763 551 0750
Email: shawn.lave...@osii.com
Website: www.osii.com



Re: How to convert Dstream of JsonObject to Dataframe in spark 2.1.0?

2017-04-24 Thread kant kodali
Thanks sam!


On Mon, Apr 24, 2017 at 1:50 AM, Sam Elamin  wrote:

> you have 2 options
> 1) Clean -> Write your own parser to go through each property and create a
> dataset
> 2) Hacky but simple -> Convert to json string then read in using
> spark.read.json(jsonString)
>
> Please bear in mind the second option is expensive which is why it is hacky
>
> I wrote my own parser here
> 
> which you can use to convert JsonObjects to StructType schemas
>
> Regards
> Sam
>
>
> On Sun, Apr 23, 2017 at 7:50 PM, kant kodali  wrote:
>
>> Hi All,
>>
>> How to convert Dstream of JsonObject to Dataframe in spark 2.1.0? That
>> JsonObject is from Gson Library.
>>
>> Thanks!
>>
>
>


Re: Arraylist is empty after JavaRDD.foreach

2017-04-24 Thread Michael Armbrust
Foreach runs on the executors and so is not able to modify an array list
that is only present on the driver.  You should just call collectAsList on
the DataFrame.
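The same pitfall exists in PySpark; purely as an illustration (a sketch, reusing the
file path from the question below), the driver-side alternative looks like:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.read.json("file:///home/dev/data-json/emp.json")

# foreach() runs inside executor processes, so appending to a driver-side list
# there has no visible effect on the driver. Bring the rows back explicitly.
json_data = [row.asDict() for row in df.collect()]
print("List size:", len(json_data))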

On Mon, Apr 24, 2017 at 10:36 AM, Devender Yadav <
devender.ya...@impetus.co.in> wrote:

> Hi All,
>
>
> I am using Spark 1.6.2 and Java 7.
>
>
> *Sample json* (total 100 records):
>
> {"name":"dev","salary":1,"occupation":"engg","address":"noida"}
>
> {"name":"karthik","salary":2,"occupation":"engg","address":"noida"}
>
> *Useful code:*
>
>final List> jsonData = new ArrayList<>();
>
>DataFrame df =  
> sqlContext.read().json("file:///home/dev/data-json/emp.json");
>JavaRDD rdd = df.repartition(1).toJSON().toJavaRDD();
>
>rdd.foreach(new VoidFunction() {
>@Override
>public void call(String line)  {
>try {
>jsonData.add (new ObjectMapper().readValue(line, Map.class));
>System.out.println(Thread.currentThread().getName());
>System.out.println("List size: "+jsonData.size());
>} catch (IOException e) {
>e.printStackTrace();
>}
>}
>});
>
>System.out.println(Thread.currentThread().getName());
>System.out.println("List size: "+jsonData.size());
>
> jsonData List is empty in the end.
>
>
> Output:
>
> Executor task launch worker-1
> List size: 1
> Executor task launch worker-1
> List size: 2
> Executor task launch worker-1
> List size: 3
> ...
> Executor task launch worker-1
> List size: 100
>
> main
> List size: 0
>
>
>
> Regards,
> Devender
>


Re: Spark declines mesos offers

2017-04-24 Thread Michael Gummelt
Have you run with debug logging?  There are some hints in the debug logs:
https://github.com/apache/spark/blob/branch-2.1/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala#L316

On Mon, Apr 24, 2017 at 4:53 AM, Pavel Plotnikov <
pavel.plotni...@team.wrike.com> wrote:

> Hi, everyone! I run Spark 2.1.0 jobs on top of a Mesos cluster in
> coarse-grained mode with dynamic resource allocation. And sometimes the Spark
> Mesos scheduler declines Mesos offers despite the fact that not all
> available resources are used (I have fewer workers than the possible
> maximum), the maximum threshold in the Spark configuration is not
> reached, and the queue has a lot of pending tasks.
>
> Maybe I have a wrong Spark or Mesos configuration? Does anyone have the
> same problem?
>



-- 
Michael Gummelt
Software Engineer
Mesosphere


Re: Arraylist is empty after JavaRDD.foreach

2017-04-24 Thread Devender Yadav
Hi Franke,


I want to convert DataFrame to JSON String.


Regards,
Devender

From: Jörn Franke 
Sent: Monday, April 24, 2017 11:15:08 PM
To: Devender Yadav
Cc: user@spark.apache.org
Subject: Re: Arraylist is empty after JavaRDD.foreach

I am not sure what you are trying to achieve here. You should never use the ArrayList
the way you use it here, as a global variable (an anti-pattern). Why don't you use
the count function of the dataframe?

On 24. Apr 2017, at 19:36, Devender Yadav 
> wrote:


Hi All,


I am using Spark 1.6.2 and Java 7.


Sample json (total 100 records):

{"name":"dev","salary":1,"occupation":"engg","address":"noida"}

{"name":"karthik","salary":2,"occupation":"engg","address":"noida"}

Useful code:

   final List> jsonData = new ArrayList<>();

   DataFrame df =  
sqlContext.read().json("file:///home/dev/data-json/emp.json");
   JavaRDD rdd = df.repartition(1).toJSON().toJavaRDD();

   rdd.foreach(new VoidFunction() {
   @Override
   public void call(String line)  {
   try {
   jsonData.add (new ObjectMapper().readValue(line, Map.class));
   System.out.println(Thread.currentThread().getName());
   System.out.println("List size: "+jsonData.size());
   } catch (IOException e) {
   e.printStackTrace();
   }
   }
   });

   System.out.println(Thread.currentThread().getName());
   System.out.println("List size: "+jsonData.size());

jsonData List is empty in the end.


Output:

Executor task launch worker-1
List size: 1
Executor task launch worker-1
List size: 2
Executor task launch worker-1
List size: 3
.
.
.
Executor task launch worker-1
List size: 100

main
List size: 0



Regards,
Devender










Re: Arraylist is empty after JavaRDD.foreach

2017-04-24 Thread Jörn Franke
I am not sure what you are trying to achieve here. You should never use the ArrayList
the way you use it here, as a global variable (an anti-pattern). Why don't you use
the count function of the dataframe?

> On 24. Apr 2017, at 19:36, Devender Yadav  
> wrote:
> 
> Hi All,
> 
> 
> I am using Spark 1.6.2 and Java 7.
> 
> Sample json (total 100 records):
> 
> {"name":"dev","salary":1,"occupation":"engg","address":"noida"}
> 
> {"name":"karthik","salary":2,"occupation":"engg","address":"noida"}
> 
> Useful code:
> 
>final List> jsonData = new ArrayList<>();
> 
>DataFrame df =  
> sqlContext.read().json("file:///home/dev/data-json/emp.json");
>JavaRDD rdd = df.repartition(1).toJSON().toJavaRDD(); 
> 
>rdd.foreach(new VoidFunction() {
>@Override
>public void call(String line)  {
>try {
>jsonData.add (new ObjectMapper().readValue(line, Map.class));
>System.out.println(Thread.currentThread().getName());
>System.out.println("List size: "+jsonData.size());
>} catch (IOException e) {
>e.printStackTrace();
>}
>}
>});
> 
>System.out.println(Thread.currentThread().getName());
>System.out.println("List size: "+jsonData.size());
> jsonData List is empty in the end. 
> 
> Output:
> 
> Executor task launch worker-1
> List size: 1
> Executor task launch worker-1
> List size: 2
> Executor task launch worker-1
> List size: 3
> .
> .
> .
> Executor task launch worker-1
> List size: 100
> 
> main
> List size: 0
> 
> 
> Regards,
> Devender
> 
> 
> 
> 
> 
> 
> 


How to convert DataFrame to JSON String in Java 7

2017-04-24 Thread Devender Yadav
Hi All,



How can I convert a DataFrame to a JSON string in Java 7? I am using Spark 1.6.3.


I don't want to print it on the console. I need to return the JSON string to
another method.


Thanks for your attention!



Regards,
Devender










Re: community feedback on RedShift with Spark

2017-04-24 Thread Aakash Basu
Hey afshin,

Your approach 1 is far faster than the latter.

It speeds things up even more if you know how to properly use the distkey and
sortkey on the tables being loaded.

Thanks,
Aakash.
https://www.linkedin.com/in/aakash-basu-5278b363


On 24-Apr-2017 10:37 PM, "Afshin, Bardia" 
wrote:

I wanted to reach out to the community to get an understanding of everyone's
experience with maximizing performance, i.e. decreasing load time, when
loading multiple large datasets into Redshift.



Two approaches:

1.   Spark writes files to S3, then Redshift COPY from the S3 bucket.

2.   Spark writes results directly to Redshift via JDBC.



JDBC is known for poor performance, and Redshift (without any provided
examples) claims you can speed up loading from S3 buckets via different
queues set up in your Redshift Workload Management.



What's the community's experience with designing processes where large
datasets need to be pushed into Redshift in the minimal time needed to load
the data?



Arraylist is empty after JavaRDD.foreach

2017-04-24 Thread Devender Yadav
Hi All,


I am using Spark 1.6.2 and Java 7.


Sample json (total 100 records):

{"name":"dev","salary":1,"occupation":"engg","address":"noida"}

{"name":"karthik","salary":2,"occupation":"engg","address":"noida"}

Useful code:

   final List> jsonData = new ArrayList<>();

   DataFrame df =  
sqlContext.read().json("file:///home/dev/data-json/emp.json");
   JavaRDD rdd = df.repartition(1).toJSON().toJavaRDD();

   rdd.foreach(new VoidFunction() {
   @Override
   public void call(String line)  {
   try {
   jsonData.add (new ObjectMapper().readValue(line, Map.class));
   System.out.println(Thread.currentThread().getName());
   System.out.println("List size: "+jsonData.size());
   } catch (IOException e) {
   e.printStackTrace();
   }
   }
   });

   System.out.println(Thread.currentThread().getName());
   System.out.println("List size: "+jsonData.size());

jsonData List is empty in the end.


Output:

Executor task launch worker-1
List size: 1
Executor task launch worker-1
List size: 2
Executor task launch worker-1
List size: 3
.
.
.
Executor task launch worker-1
List size: 100

main
List size: 0



Regards,
Devender










Re: community feedback on RedShift with Spark

2017-04-24 Thread Matt Deaver
Redshift COPY is immensely faster than trying to do insert statements. I
did some rough testing of inserting data using INSERT and COPY and COPY is
vastly superior to the point that if speed is at all an issue to your
process you shouldn't even consider using INSERT.
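For what it's worth, a rough sketch of approach 1 (the bucket, table, IAM role and
connection details are placeholders; the COPY is issued here through the psycopg2
client, so double-check the exact COPY options against the Redshift docs):

import psycopg2
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("s3a://my-bucket/input/")  # placeholder source data

# 1) Stage the result set in S3 in a Redshift-friendly format.
df.write.mode("overwrite").csv("s3a://my-bucket/staging/result/", compression="gzip")

# 2) One COPY statement lets Redshift load all staged files in parallel.
conn = psycopg2.connect(host="my-cluster", port=5439, dbname="mydb",
                        user="me", password="secret")
with conn, conn.cursor() as cur:
    cur.execute("""
        COPY my_table
        FROM 's3://my-bucket/staging/result/'
        IAM_ROLE 'arn:aws:iam::123456789012:role/redshift-copy'
        CSV GZIP
    """)
conn.close()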

On Mon, Apr 24, 2017 at 11:07 AM, Afshin, Bardia <
bardia.afs...@capitalone.com> wrote:

> I wanted to reach out to the community to get an understanding of everyone's
> experience with maximizing performance, i.e. decreasing load time, when
> loading multiple large datasets into Redshift.
>
>
>
> Two approaches:
>
> 1.   Spark writes files to S3, then Redshift COPY from the S3 bucket.
>
> 2.   Spark writes results directly to Redshift via JDBC.
>
>
>
> JDBC is known for poor performance, and Redshift (without any provided
> examples) claims you can speed up loading from S3 buckets via different
> queues set up in your Redshift Workload Management.
>
>
>
> What's the community's experience with designing processes where large
> datasets need to be pushed into Redshift in the minimal time needed to load
> the data?
>
>



-- 
Regards,

Matt
Data Engineer
https://www.linkedin.com/in/mdeaver
http://mattdeav.pythonanywhere.com/


community feedback on RedShift with Spark

2017-04-24 Thread Afshin, Bardia
I wanted to reach out to the community to get an understanding of everyone's
experience with maximizing performance, i.e. decreasing load time, when loading
multiple large datasets into Redshift.

Two approaches:

1.   Spark writes files to S3, then Redshift COPY from the S3 bucket.

2.   Spark writes results directly to Redshift via JDBC.

JDBC is known for poor performance, and Redshift (without any provided
examples) claims you can speed up loading from S3 buckets via different queues
set up in your Redshift Workload Management.

What's the community's experience with designing processes where large datasets
need to be pushed into Redshift in the minimal time needed to load the data?




removing columns from file

2017-04-24 Thread Afshin, Bardia
Hi there,

I have a process that downloads thousands of files from an S3 bucket, removes a
set of columns from each, and uploads them back to S3.

S3 is currently not the bottleneck; having a single-master-node Spark instance
is the bottleneck. One approach is to distribute the files across multiple Spark
workers, which would make it faster.

Question:

1.   Is there a way to utilize Spark's master/worker nodes to distribute this
downloading and processing of files – so it can, say, process 10 files at a time?

2.   Is there a way to scale out the workers that download and process files
with Spark, even on a single master node? (A sketch follows below.)

Thanks,
Bardia




Re: What is correct behavior for spark.task.maxFailures?

2017-04-24 Thread Ryan Blue
Looking at the code a bit more, it appears that blacklisting is disabled by
default. To enable it, set spark.blacklist.enabled=true.

The updates in 2.1.0 appear to provide much more fine-grained settings for
this, like the number of tasks that can fail before an executor is
blacklisted for a stage. In that version, you probably want to set
spark.blacklist.task.maxTaskAttemptsPerExecutor. See the settings docs
 and search for
“blacklist” to see all the options.

rb
​
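For reference, a minimal sketch of wiring those 2.1.0 settings into a job (the
values are illustrative only, not recommendations):

from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = (SparkConf()
        .set("spark.blacklist.enabled", "true")                       # off by default
        .set("spark.blacklist.task.maxTaskAttemptsPerExecutor", "1")  # illustrative value
        .set("spark.blacklist.timeout", "1h"))                        # illustrative value

spark = SparkSession.builder.config(conf=conf).getOrCreate()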

On Mon, Apr 24, 2017 at 9:41 AM, Ryan Blue  wrote:

> Chawla,
>
> We hit this issue, too. I worked around it by setting spark.scheduler.
> executorTaskBlacklistTime=5000. The problem for us was that the scheduler
> was using locality to select the executor, even though it had already
> failed there. The executor task blacklist time controls how long the
> scheduler will avoid using an executor for a failed task, which will cause
> it to avoid rescheduling on the executor. The default was 0, so the
> executor was put back into consideration immediately.
>
> In 2.1.0 that setting has changed to spark.blacklist.timeout. I’m not
> sure if that does exactly the same thing. The default for that setting is
> 1h instead of 0. It’s better to have a non-zero default to avoid what
> you’re seeing.
>
> rb
> ​
>
> On Fri, Apr 21, 2017 at 1:32 PM, Chawla,Sumit 
> wrote:
>
>> I am seeing a strange issue. I had a bad behaving slave that failed the
>> entire job.  I have set spark.task.maxFailures to 8 for my job.  Seems
>> like all task retries happen on the same slave in case of failure.  My
>> expectation was that task will be retried on different slave in case of
>> failure, and chance of all 8 retries to happen on same slave is very less.
>>
>>
>> Regards
>> Sumit Chawla
>>
>>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>



-- 
Ryan Blue
Software Engineer
Netflix


Re: What is correct behavior for spark.task.maxFailures?

2017-04-24 Thread Ryan Blue
Chawla,

We hit this issue, too. I worked around it by setting
spark.scheduler.executorTaskBlacklistTime=5000. The problem for us was that
the scheduler was using locality to select the executor, even though it had
already failed there. The executor task blacklist time controls how long
the scheduler will avoid using an executor for a failed task, which will
cause it to avoid rescheduling on the executor. The default was 0, so the
executor was put back into consideration immediately.

In 2.1.0 that setting has changed to spark.blacklist.timeout. I’m not sure
if that does exactly the same thing. The default for that setting is 1h
instead of 0. It’s better to have a non-zero default to avoid what you’re
seeing.

rb
​

On Fri, Apr 21, 2017 at 1:32 PM, Chawla,Sumit 
wrote:

> I am seeing a strange issue. I had a bad behaving slave that failed the
> entire job.  I have set spark.task.maxFailures to 8 for my job.  Seems
> like all task retries happen on the same slave in case of failure.  My
> expectation was that task will be retried on different slave in case of
> failure, and chance of all 8 retries to happen on same slave is very less.
>
>
> Regards
> Sumit Chawla
>
>


-- 
Ryan Blue
Software Engineer
Netflix


how to create List in pyspark

2017-04-24 Thread Selvam Raman
documentDF = spark.createDataFrame([

("Hi I heard about Spark".split(" "), ),

("I wish Java could use case classes".split(" "), ),

("Logistic regression models are neat".split(" "), )

], ["text"])


How can I achieve the same df while I am reading from a source?

doc = spark.read.text("/Users/rs/Desktop/nohup.out")

How can I create an array-type "sentences" column from
doc (a dataframe)?


The below one creates more than one column.

rdd.map(lambda rdd: rdd[0]).map(lambda row:row.split(" "))
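One way that seems to fit (a sketch, not a definitive answer) is to stay in the
DataFrame API and use split() to build the array column directly:

from pyspark.sql import SparkSession
from pyspark.sql.functions import split

spark = SparkSession.builder.getOrCreate()
doc = spark.read.text("/Users/rs/Desktop/nohup.out")  # path from the question above

# spark.read.text yields one string column named "value"; split() turns each
# line into an array<string>, matching the shape of documentDF's "text" column.
sentences = doc.select(split(doc["value"], " ").alias("text"))
sentences.printSchema()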

-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Re: How to maintain order of key-value in DataFrame same as JSON?

2017-04-24 Thread Afshin, Bardia
Is there an API available to do this via SparkSession?

Sent from my iPhone

On Apr 24, 2017, at 6:20 AM, Devender Yadav 
> wrote:


Thanks Hemanth for a quick reply.


From: Hemanth Gudela 
>
Sent: Monday, April 24, 2017 6:37:48 PM
To: Devender Yadav; user@spark.apache.org
Subject: Re: How to maintain order of key-value in DataFrame same as JSON?

Hi,

One option, if you can use it, is to force the df to use the schema order you
prefer, like this.

DataFrame df = 
sqlContext.read().json(jsonPath).select("name","salary","occupation","address")

-Hemanth

From: Devender Yadav 
>
Date: Monday, 24 April 2017 at 15.45
To: "user@spark.apache.org" 
>
Subject: How to maintain order of key-value in DataFrame same as JSON?


{"name": "dev","salary": 100,"occupation": "engg","address": "noida"}

{"name": "karthik","salary": 200,"occupation": "engg","address": "blore"}










Re: What is correct behavior for spark.task.maxFailures?

2017-04-24 Thread Dongjin Lee
Sumit,

I think the post below is describing the very case of you.

https://blog.cloudera.com/blog/2017/04/blacklisting-in-apache-spark/

Regards,
Dongjin

--
Dongjin Lee

Software developer in Line+.
So interested in massive-scale machine learning.

facebook: http://www.facebook.com/dongjin.lee.kr
linkedin: http://kr.linkedin.com/in/dongjinleekr
github: http://github.com/dongjinleekr
twitter: http://www.twitter.com/dongjinleekr

On 22 Apr 2017, 5:32 AM +0900, Chawla,Sumit , wrote:
> I am seeing a strange issue. I had a bad behaving slave that failed the 
> entire job. I have set spark.task.maxFailures to 8 for my job. Seems like all 
> task retries happen on the same slave in case of failure. My expectation was 
> that task will be retried on different slave in case of failure, and chance 
> of all 8 retries to happen on same slave is very less.
>
>
> Regards
> Sumit Chawla
>


Re: Spark SQL - Global Temporary View is not behaving as expected

2017-04-24 Thread vincent gromakowski
Look at Spark jobserver's NamedRDDs, which are supposed to be thread-safe...

2017-04-24 16:01 GMT+02:00 Hemanth Gudela :

> Hello Gene,
>
>
>
> Thanks, but Alluxio did not solve my Spark Streaming use case because my
> source parquet files in Alluxio in-memory are not "appended" but are
> periodically being "overwritten" due to the nature of the business need.
>
> Spark jobs fail when trying to read parquet files at the same time as
> another job is writing parquet files in Alluxio.
>
>
>
> Could you suggest a way to synchronize parquet reads and writes in Alluxio
> in-memory, i.e. when one Spark job is writing a dataframe as a parquet file
> in Alluxio in-memory, the other Spark jobs trying to read must wait until
> the write is finished.
>
>
>
> Thanks,
>
> Hemanth
>
>
>
> *From: *Gene Pang 
> *Date: *Monday, 24 April 2017 at 16.41
> *To: *vincent gromakowski 
> *Cc: *Hemanth Gudela , "user@spark.apache.org"
> , Felix Cheung 
>
> *Subject: *Re: Spark SQL - Global Temporary View is not behaving as
> expected
>
>
>
> As Vincent mentioned, Alluxio helps with sharing data across different
> Spark contexts. This blog post about Spark dataframes and Alluxio
> discusses that use case
> .
>
>
>
> Thanks,
>
> Gene
>
>
>
> On Sat, Apr 22, 2017 at 2:14 AM, vincent gromakowski <
> vincent.gromakow...@gmail.com> wrote:
>
> Look at alluxio for sharing across drivers or spark jobserver
>
>
>
> On 22 Apr 2017 at 10:24 AM, "Hemanth Gudela" wrote:
>
> Thanks for your reply.
>
>
>
> Creating a table is an option, but such approach slows down reads & writes
> for a real-time analytics streaming use case that I’m currently working on.
>
> If at all global temporary view could have been accessible across
> sessions/spark contexts, that would have simplified my usecase a lot.
>
>
>
> But yeah, thanks for explaining the behavior of global temporary view, now
> it's clear ☺
>
>
>
> -Hemanth
>
>
>
> *From: *Felix Cheung 
> *Date: *Saturday, 22 April 2017 at 11.05
> *To: *Hemanth Gudela , "user@spark.apache.org"
> 
> *Subject: *Re: Spark SQL - Global Temporary View is not behaving as
> expected
>
>
>
> Cross-session in this context means multiple Spark sessions from the same
> Spark context. Since you are running two shells, you have different
> Spark contexts.
>
>
>
> Do you have to use a temp view? Could you create a table?
>
>
>
> _
> From: Hemanth Gudela 
> Sent: Saturday, April 22, 2017 12:57 AM
> Subject: Spark SQL - Global Temporary View is not behaving as expected
> To: 
>
>
> Hi,
>
>
>
> According to documentation
> ,
> global temporary views are cross-session accessible.
>
>
>
> But when I try to query a global temporary view from another spark shell
> like this:
>
> *Instance 1 of spark-shell*
>
> --
>
> scala> spark.sql("select 1 as col1").createGlobalTempView("gView1")
>
>
>
> *Instance 2 of spark-shell *(while Instance 1 of spark-shell is still
> alive)
>
> -
>
> scala> spark.sql("select * from global_temp.gView1").show()
>
> org.apache.spark.sql.AnalysisException: Table or view not found:
> `global_temp`.`gView1`
>
> 'Project [*]
>
> +- 'UnresolvedRelation `global_temp`.`gView1`
>
>
>
> I am expecting that global temporary view created in shell 1 should be
> accessible in shell 2, but it isn’t!
>
> Please correct me if I am missing something here.
>
>
>
> Thanks (in advance),
>
> Hemanth
>
>
>
>
>


Re: Spark SQL - Global Temporary View is not behaving as expected

2017-04-24 Thread Hemanth Gudela
Hello Gene,

Thanks, but Alluxio did not solve my Spark Streaming use case because my source
parquet files in Alluxio in-memory are not "appended" but are periodically
being "overwritten" due to the nature of the business need.
Spark jobs fail when trying to read parquet files at the same time as another
job is writing parquet files in Alluxio.

Could you suggest a way to synchronize parquet reads and writes in Alluxio
in-memory, i.e. when one Spark job is writing a dataframe as a parquet file in
Alluxio in-memory, the other Spark jobs trying to read must wait until the
write is finished.

Thanks,
Hemanth

From: Gene Pang 
Date: Monday, 24 April 2017 at 16.41
To: vincent gromakowski 
Cc: Hemanth Gudela , "user@spark.apache.org" 
, Felix Cheung 
Subject: Re: Spark SQL - Global Temporary View is not behaving as expected

As Vincent mentioned, Alluxio helps with sharing data across different Spark 
contexts. This blog post about Spark dataframes and Alluxio discusses that use 
case.

Thanks,
Gene

On Sat, Apr 22, 2017 at 2:14 AM, vincent gromakowski 
> wrote:
Look at alluxio for sharing across drivers or spark jobserver

On 22 Apr 2017 at 10:24 AM, "Hemanth Gudela" wrote:
Thanks for your reply.

Creating a table is an option, but such approach slows down reads & writes for 
a real-time analytics streaming use case that I’m currently working on.
If at all global temporary view could have been accessible across 
sessions/spark contexts, that would have simplified my usecase a lot.

But yeah, thanks for explaining the behavior of global temporary view, now it’s 
clear ☺

-Hemanth

From: Felix Cheung >
Date: Saturday, 22 April 2017 at 11.05
To: Hemanth Gudela 
>, 
"user@spark.apache.org" 
>
Subject: Re: Spark SQL - Global Temporary View is not behaving as expected

Cross-session in this context means multiple Spark sessions from the same Spark
context. Since you are running two shells, you have different Spark contexts.

Do you have to use a temp view? Could you create a table?

_
From: Hemanth Gudela 
>
Sent: Saturday, April 22, 2017 12:57 AM
Subject: Spark SQL - Global Temporary View is not behaving as expected
To: >


Hi,

According to 
documentation,
 global temporary views are cross-session accessible.

But when I try to query a global temporary view from another spark shell like 
this-->
Instance 1 of spark-shell
--
scala> spark.sql("select 1 as col1").createGlobalTempView("gView1")

Instance 2 of spark-shell (while Instance 1 of spark-shell is still alive)
-
scala> spark.sql("select * from global_temp.gView1").show()
org.apache.spark.sql.AnalysisException: Table or view not found: 
`global_temp`.`gView1`
'Project [*]
+- 'UnresolvedRelation `global_temp`.`gView1`

I am expecting that global temporary view created in shell 1 should be 
accessible in shell 2, but it isn’t!
Please correct me if I am missing something here.

Thanks (in advance),
Hemanth




Re: Spark SQL - Global Temporary View is not behaving as expected

2017-04-24 Thread Gene Pang
As Vincent mentioned, Alluxio helps with sharing data across different
Spark contexts. This blog post about Spark dataframes and Alluxio discusses
that use case
.

Thanks,
Gene

On Sat, Apr 22, 2017 at 2:14 AM, vincent gromakowski <
vincent.gromakow...@gmail.com> wrote:

> Look at alluxio for sharing across drivers or spark jobserver
>
> On 22 Apr 2017 at 10:24 AM, "Hemanth Gudela" wrote:
>
>> Thanks for your reply.
>>
>>
>>
>> Creating a table is an option, but such approach slows down reads &
>> writes for a real-time analytics streaming use case that I’m currently
>> working on.
>>
>> If at all global temporary view could have been accessible across
>> sessions/spark contexts, that would have simplified my usecase a lot.
>>
>>
>>
>> But yeah, thanks for explaining the behavior of global temporary view,
>> now it's clear ☺
>>
>>
>>
>> -Hemanth
>>
>>
>>
>> *From: *Felix Cheung 
>> *Date: *Saturday, 22 April 2017 at 11.05
>> *To: *Hemanth Gudela , "user@spark.apache.org"
>> 
>> *Subject: *Re: Spark SQL - Global Temporary View is not behaving as
>> expected
>>
>>
>>
>> Cross-session in this context means multiple Spark sessions from the same
>> Spark context. Since you are running two shells, you have different
>> Spark contexts.
>>
>>
>>
>> Do you have to use a temp view? Could you create a table?
>>
>>
>>
>> _
>> From: Hemanth Gudela 
>> Sent: Saturday, April 22, 2017 12:57 AM
>> Subject: Spark SQL - Global Temporary View is not behaving as expected
>> To: 
>>
>>
>>
>> Hi,
>>
>>
>>
>> According to documentation
>> ,
>> global temporary views are cross-session accessible.
>>
>>
>>
>> But when I try to query a global temporary view from another spark shell
>> like this:
>>
>> *Instance 1 of spark-shell*
>>
>> --
>>
>> scala> spark.sql("select 1 as col1").createGlobalTempView("gView1")
>>
>>
>>
>> *Instance 2 of spark-shell *(while Instance 1 of spark-shell is still
>> alive)
>>
>> -
>>
>> scala> spark.sql("select * from global_temp.gView1").show()
>>
>> org.apache.spark.sql.AnalysisException: Table or view not found:
>> `global_temp`.`gView1`
>>
>> 'Project [*]
>>
>> +- 'UnresolvedRelation `global_temp`.`gView1`
>>
>>
>>
>> I am expecting that global temporary view created in shell 1 should be
>> accessible in shell 2, but it isn’t!
>>
>> Please correct me if I am missing something here.
>>
>>
>>
>> Thanks (in advance),
>>
>> Hemanth
>>
>>
>>
>


Re: How to maintain order of key-value in DataFrame same as JSON?

2017-04-24 Thread Devender Yadav
Thanks Hemanth for a quick reply.


From: Hemanth Gudela 
Sent: Monday, April 24, 2017 6:37:48 PM
To: Devender Yadav; user@spark.apache.org
Subject: Re: How to maintain order of key-value in DataFrame same as JSON?

Hi,

One option, if you can use it, is to force the df to use the schema order you
prefer, like this.

DataFrame df = 
sqlContext.read().json(jsonPath).select("name","salary","occupation","address")

-Hemanth

From: Devender Yadav 
Date: Monday, 24 April 2017 at 15.45
To: "user@spark.apache.org" 
Subject: How to maintain order of key-value in DataFrame same as JSON?


{"name": "dev","salary": 100,"occupation": "engg","address": "noida"}

{"name": "karthik","salary": 200,"occupation": "engg","address": "blore"}










Re: How to maintain order of key-value in DataFrame same as JSON?

2017-04-24 Thread Hemanth Gudela
Hi,

One option, if you can use it, is to force the df to use the schema order you
prefer, like this.

DataFrame df = 
sqlContext.read().json(jsonPath).select("name","salary","occupation","address")

-Hemanth

From: Devender Yadav 
Date: Monday, 24 April 2017 at 15.45
To: "user@spark.apache.org" 
Subject: How to maintain order of key-value in DataFrame same as JSON?


{"name": "dev","salary": 100,"occupation": "engg","address": "noida"}

{"name": "karthik","salary": 200,"occupation": "engg","address": "blore"}


How to maintain order of key-value in DataFrame same as JSON?

2017-04-24 Thread Devender Yadav
Hi All,


Sample JSON data:

{"name": "dev","salary": 100,"occupation": "engg","address": "noida"}

{"name": "karthik","salary": 200,"occupation": "engg","address": "blore"}

Spark Java code:

DataFrame df = sqlContext.read().json(jsonPath);
df.printSchema();
df.show(false);


Output:

root
 |-- address: string (nullable = true)
 |-- name: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- salary: long (nullable = true)


+-------+-------+----------+------+
|address|name   |occupation|salary|
+-------+-------+----------+------+
|noida  |dev    |engg      |1     |
|blore  |karthik|engg      |2     |
+-------+-------+----------+------+


Columns are arranged in the alphabetical order.


Is there any way to maintain natural order?



Regards,
Devender










Spark registered view in "Future" - View changes updated in "Future" are lost in main thread

2017-04-24 Thread Hemanth Gudela
Hi,

I'm trying to write a background thread using "Future" which would periodically
re-register a view with the latest data from an underlying database table.
However, the data changes made in the "Future" thread are lost in the main thread.

In the below code,

1.   In the beginning, the registered view "myView" has only 1 row (1, 'a'),
which is shown as the first output

+---+-----+
|id |value|
+---+-----+
|1  |a    |
+---+-----+

2.   After a minute, a background "Future" thread inserts a new row (2, 'b')
in the database, and then re-registers "myView" with the latest updates from
the underlying table.

a.   The second output clearly shows that “myView” in the “Future” has 2 
rows
  +---+-----+
  |id |value|
  +---+-----+
  |1  |a    |
  |2  |b    |
  +---+-----+



3.   After 2 minutes, when I query "myView" in the main thread, it doesn't show
the newly added row (2, 'b') even though "myView" has picked up the changes in
the "Future" thread. As you can observe, the third output shows only one row
(1, 'a') again!

+---+-----+
|id |value|
+---+-----+
|1  |a    |
+---+-----+

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

val url = "jdbc:mysql://localhost:3306/myDb?sessionVariables=sql_mode='ANSI'" 
//mysql jdbc url

//set database properties
val dbProperties = new java.util.Properties
dbProperties.setProperty("user","myUser")
dbProperties.setProperty("password","myPass")

//read a database table, register a temp view and cache it
spark.read.jdbc(url,"testTable",dbProperties).createOrReplaceTempView("myView") 
 //register a temp view named "myView"
spark.sql("cache table myView")
spark.sql("select * from myView").show //in the beginning, myView has just 1 
record
/*
+---+-----+
|id |value|
+---+-----+
|1  |a    |
+---+-----+
*/

Future { //in a background thread, insert a new row in database table, 
re-register temp view and refresh the cache
  Thread.sleep(1000*60) //(not necessary but) wait for a minute

  spark.sql("select 2 as id, 'b' as 
value").write.mode("append").jdbc(url,"myView",dbProperties)
  
spark.read.jdbc(url,"testTable",dbProperties).createOrReplaceTempView("myView") 
//re-register "myView"
  spark.sql("cache table myView")//refresh cache of myView again
  spark.sql("select * from myView").show  //myView now has 2 records
  /*
  +---+-----+
  |id |value|
  +---+-----+
  |1  |a    |
  |2  |b    |
  +---+-----+
  */
}
Thread.sleep(1000*60*2) //wait for 2 minutes
spark.sql("select * from myView").show //Why is myView having only 1 record!?!
/*
+---+-----+
|id |value|
+---+-----+
|1  |a    |
+---+-----+
*/

I had assumed that a temp view registered in the "Future" thread is thread-local,
but that doesn't always seem to be the case.
When the data source is a database table, the data changes made to the view
registered in the "Future" thread are lost in the main thread. However, when the
data source is parquet, the changes made to the registered view persist even in
the main thread.

Could you please shed some light on what's happening with the registered view
when the data source is a database, and why the behavior is different when the
data source is parquet?

Thank you (in advance ☺)
Hemanth





Spark declines mesos offers

2017-04-24 Thread Pavel Plotnikov
Hi, everyone! I run Spark 2.1.0 jobs on top of a Mesos cluster in
coarse-grained mode with dynamic resource allocation. Sometimes the Spark
Mesos scheduler declines Mesos offers despite the fact that not all
available resources are used (I have fewer workers than the possible
maximum), the maximum threshold in the Spark configuration is not
reached, and the queue has a lot of pending tasks.

Maybe I have a wrong Spark or Mesos configuration? Does anyone have the same
problem?


Re: Questions related to writing data to S3

2017-04-24 Thread Steve Loughran

On 23 Apr 2017, at 19:49, Richard Hanson 
> wrote:


I have a streaming job which writes data to S3. I know there are saveAs 
functions helping write data to S3. But it bundles all elements then writes out 
to S3.

use Hadoop 2.8.x binaries and the fast output stream; this will stream up data 
in blocks of 5+MB (configurable), so eliminating/reducing the upload delay in
the close(), and saving on disk space.

however, your new object isn't going to be visible until that close() call, and 
with the FS being eventually consistent, the list operation often visibly lags 
the actual object creation (or deletions, for that matter)



So my first question - Is there any way to let saveAs functions write data 
in batch or single elements instead of whole bundle?

Right now I use S3 TransferManager to upload files in batch. The code looks 
like below (sorry I don't have code at hand)

...

val manager = // initialize TransferManager...

stream.foreachRDD { rdd =>

  val elements = rdd.collect

  manager.upload...(elemnts)

}

...


I suppose there would be a problem here because the TransferManager instance is at 
the driver program (the job is working now, perhaps because I run Spark as a 
single process). And checking on the internet, it seems it is recommended to 
use foreachPartition instead, and to avoid functions that cause actions 
such as rdd.collect. So another question: what is the best practice for 
this scenario (batch-uploading transformed data to external storage such as 
S3)? And what functions would cause an 'action' to be triggered (like data being 
sent back to the driver program)?


once you've moved to the Hadoop 2.8 s3a client, you can just use save(path) on 
the dataframe to have it all done. S3A also manages sharing the transfer 
manager across all the workers in a process... it's trickier than you think, as
you want to share the available upload bandwidth while giving some B/W to all 
threads generating output...more than one thread pool is used to handle this 
(see HADOOP-13286 for an example).
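A hedged sketch of turning that on from the Spark side (the property names follow the
Hadoop 2.8 s3a documentation; verify them against the exact Hadoop build you deploy):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         # Incremental ("fast") upload options for the Hadoop 2.8 s3a client --
         # check the option names against the Hadoop release you actually bundle.
         .config("spark.hadoop.fs.s3a.fast.upload", "true")
         .config("spark.hadoop.fs.s3a.fast.upload.buffer", "disk")
         .getOrCreate())

# With the binaries in place, a plain save/parquet/csv call streams blocks up
# as they are produced instead of uploading one big file in close().
# df.write.parquet("s3a://my-bucket/output/")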

getting those Hadoop 2.8.x binaries in is a bit tricky, because of transitive 
classpath pain; the SPARK-7481 patch shows how I do it


Re: Spark MLlib - java.lang.OutOfMemoryError: Java heap space

2017-04-24 Thread Selvam Raman
This is where the job goes out of memory:

17/04/24 10:09:22 INFO TaskSetManager: Finished task 122.0 in stage 1.0
(TID 356) in 4260 ms on ip-...-45.dev (124/234)
17/04/24 10:09:26 INFO BlockManagerInfo: Removed taskresult_361 on
ip-10...-185.dev:36974 in memory (size: 5.2 MB, free: 8.5 GB)
17/04/24 10:09:26 INFO BlockManagerInfo: Removed taskresult_362 on
ip-...-45.dev:40963 in memory (size: 5.2 MB, free: 8.9 GB)
17/04/24 10:09:26 INFO TaskSetManager: Finished task 125.0 in stage 1.0
(TID 359) in 4383 ms on ip-...-45.dev (125/234)
#
# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="kill -9 %p"
#   Executing /bin/sh -c "kill -9 15090"...
Killed

Node-45.dev shows 8.9GB free while it throws out of memory. Can anyone
please help me understand the issue?

On Mon, Apr 24, 2017 at 11:22 AM, Selvam Raman  wrote:

> Hi,
>
> I have 1 master and 4 slave node. Input data size is 14GB.
> Slave Node config : 32GB Ram,16 core
>
>
> I am trying to train a word embedding model using Spark. It is going out of
> memory. To train on 14GB of data, how much memory do I require?
>
>
> I have given 20gb per executor but below shows it is using 11.8GB out of
> 20 GB.
> BlockManagerInfo: Added broadcast_1_piece0 in memory on ip-.-.-.dev:35035
> (size: 4.6 KB, free: 11.8 GB)
>
>
> This is the code
> if __name__ == "__main__":
> sc = SparkContext(appName="Word2VecExample")  # SparkContext
>
> # $example on$
> inp = sc.textFile("s3://word2vec/data/word2vec_word_data.txt/").map(lambda
> row: row.split(" "))
>
> word2vec = Word2Vec()
> model = word2vec.fit(inp)
>
> model.save(sc, "s3://pysparkml/word2vecresult2/")
> sc.stop()
>
>
> Spark-submit Command:
> spark-submit --master yarn --conf 
> 'spark.executor.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError
> -XX:HeapDumpPath=/mnt/tmp -XX:+UseG1GC -XX:+UseG1GC -XX:+PrintFlagsFinal
> -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails
> -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy
> -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark' --num-executors 4
> --executor-cores 2 --executor-memory 20g Word2VecExample.py
>
>
> --
> Selvam Raman
> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>



-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Re: splitting a huge file

2017-04-24 Thread Steve Loughran

> On 21 Apr 2017, at 19:36, Paul Tremblay  wrote:
> 
> We are tasked with loading a big file (possibly 2TB) into a data warehouse. 
> In order to do this efficiently, we need to split the file into smaller files.
> 
> I don't believe there is a way to do this with Spark, because in order for 
> Spark to distribute the file to the worker nodes, it first has to be split 
> up, right? 

if it is in HDFS, it's already been broken up by block size and scattered 
around the filesystem, so probably split up by 128/256MB blocks, 3x replicated 
each, offering lots of places for local data.

If it's in another FS, different strategies may apply, including no locality.

> 
> We ended up using a single machine with a single thread to do the splitting. 
> I just want to make sure I am not missing something obvious.
> 

you don't explicitly need to split up the file if you can run different workers 
against different parts of the same file; that just means you need a way to 
calculate those parts.

This is what org.apache.hadoop.mapreduce.InputFormat.getSplits() does: you will 
need to define an input format for your data source and provide the split 
calculation.
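
For the common case where the file is plain text (or another splittable built-in
format), a rough sketch of letting Spark do the splitting for you (paths and
partition count are made up):

// Read one big splittable file and write it back out as N smaller part files;
// the input format computes the splits, no manual pre-splitting needed.
spark.read
  .text("hdfs:///staging/huge_input.txt")
  .repartition(200)          // number of output files
  .write
  .text("hdfs:///staging/huge_input_split/")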

> Thanks!
> 
> -- 
> Paul Henry Tremblay
> Attunix





Spark MLlib - java.lang.OutOfMemoryError: Java heap space

2017-04-24 Thread Selvam Raman
Hi,

I have 1 master and 4 slave node. Input data size is 14GB.
Slave Node config : 32GB Ram,16 core


I am trying to train a word embedding model using Spark. It is going out of
memory. To train on 14GB of data, how much memory do I require?


I have given 20gb per executor but below shows it is using 11.8GB out of 20
GB.
BlockManagerInfo: Added broadcast_1_piece0 in memory on ip-.-.-.dev:35035
(size: 4.6 KB, free: 11.8 GB)


This is the code
if __name__ == "__main__":
sc = SparkContext(appName="Word2VecExample")  # SparkContext

# $example on$
inp =
sc.textFile("s3://word2vec/data/word2vec_word_data.txt/").map(lambda row:
row.split(" "))

word2vec = Word2Vec()
model = word2vec.fit(inp)

model.save(sc, "s3://pysparkml/word2vecresult2/")
sc.stop()


Spark-submit Command:
spark-submit --master yarn --conf
'spark.executor.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/mnt/tmp -XX:+UseG1GC -XX:+UseG1GC -XX:+PrintFlagsFinal
-XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails
-XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy
-XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark' --num-executors 4
--executor-cores 2 --executor-memory 20g Word2VecExample.py


-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Re: Cannot convert from JavaRDD to Dataframe

2017-04-24 Thread Radhwane Chebaane
Hi,

The ArrayType column maps to a Scala Array, which corresponds in Java to a
Java array, so you must use a String[]. However, since RowFactory.create
expects an array of Objects as the column contents, it should be:

   public Row call(String line){
  return RowFactory.create(new String[][]{line.split(" ")});
   }
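
For comparison, a rough Scala sketch of the same conversion (same file name as
in your snippet):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Each line becomes a Row with a single array<string> column named "text".
val rows = spark.sparkContext.textFile("input.txt", 10)
  .map(line => Row(line.split(" ").toSeq))
val schema = StructType(Seq(StructField("text", ArrayType(StringType, true), nullable = false)))
val df = spark.createDataFrame(rows, schema)
df.show(3)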

More details in this Stackoverflow question.
Hope this works for you,

Cheers

2017-04-23 18:13 GMT+02:00 Chen, Mingrui :

> Hello everyone!
>
>
> I am a new Spark learner and trying to do a task seems very simple. I want
> to read a text file, save the content to JavaRDD and convert it to
> Dataframe, so I can use it for Word2Vec Model in the future. The code looks
> pretty simple but I cannot make it work:
>
>
> SparkSession spark = SparkSession.builder().appName("Word2Vec").getOrCreate();
> JavaRDD<String> lines = spark.sparkContext().textFile("input.txt", 10).toJavaRDD();
> JavaRDD<Row> rows = lines.map(new Function<String, Row>(){
> public Row call(String line){
> return RowFactory.create(Arrays.asList(line.split(" ")));
> }
> });
> StructType schema = new StructType(new StructField[] {
> new StructField("text", new ArrayType(DataTypes.StringType, true), false, Metadata.empty())
> });
> Dataset<Row> input = spark.createDataFrame(rows, schema);
> input.show(3);
>
> It throws an exception at input.show(3):
>
>
> Caused by: java.lang.ClassCastException: cannot assign instance of
> scala.collection.immutable.List$SerializationProxy to field
> org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type
> scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
>
> Seems it has problem converting the JavaRDD to Dataframe. However I
> cannot figure out what mistake I make here and the exception message is
> hard to understand. Anyone can help? Thanks!
>
>


-- 

Radhwane Chebaane
Distributed systems engineer, Mindlytix

Mail: radhw...@mindlytix.com  
Mobile: +33 695 588 906

Skype: rad.cheb  
LinkedIn 



Re: Off heap memory settings and Tungsten

2017-04-24 Thread Saisai Shao
AFAIK, the off-heap memory settings are not enabled automatically; two
configurations control the Tungsten off-heap memory usage:

1. spark.memory.offHeap.enabled
2. spark.memory.offHeap.size
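
For example, something like this (the size value here is just an example):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.memory.offHeap.enabled", "true")
  .config("spark.memory.offHeap.size", "2g")   // must be > 0 when off-heap is enabled
  .getOrCreate()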



On Sat, Apr 22, 2017 at 7:44 PM, geoHeil  wrote:

> Hi,
> I wonder when to enable spark's off heap settings. Shouldn't tungsten
> enable
> these automatically in 2.1?
> http://stackoverflow.com/questions/43330902/spark-off-
> heap-memory-config-and-tungsten
>
> Regards,
> Georg
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Off-heap-memory-settings-and-Tungsten-tp28621.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


Re: How to convert Dstream of JsonObject to Dataframe in spark 2.1.0?

2017-04-24 Thread Sam Elamin
you have 2 options:
1) Clean -> write your own parser to go through each property and create a
Dataset
2) Hacky but simple -> convert to a JSON string then read it in using
spark.read.json(jsonString) - see the sketch below

Please bear in mind the second option is expensive, which is why it is hacky.
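
Roughly like this (untested sketch, assuming the stream carries Gson JsonObjects):

import com.google.gson.JsonObject
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream

// Option 2: serialise each element back to JSON text and let Spark re-parse it.
def toDataFrames(stream: DStream[JsonObject], spark: SparkSession): Unit = {
  stream.foreachRDD { rdd =>
    if (!rdd.isEmpty()) {
      // JsonObject.toString() yields the element's JSON text.
      val df = spark.read.json(rdd.map(_.toString))
      df.show()
    }
  }
}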

I wrote my own parser here, which you can use to convert JsonObjects to
StructType schemas.

Regards
Sam


On Sun, Apr 23, 2017 at 7:50 PM, kant kodali  wrote:

> Hi All,
>
> How to convert Dstream of JsonObject to Dataframe in spark 2.1.0? That
> JsonObject is from Gson Library.
>
> Thanks!
>


How to convert Dstream of JsonObject to Dataframe in spark 2.1.0?

2017-04-24 Thread kant kodali
Hi All,

How to convert Dstream of JsonObject to Dataframe in spark 2.1.0? That
JsonObject is from Gson Library.

Thanks!


accessing type signature

2017-04-24 Thread Bulldog20630405
When running Spark from spark-shell, each time a variable is defined the shell
prints out the type signature of that variable along with the toString of the
instance.

How can I programmatically generate the same signature without using the shell
(for debugging purposes), from a Spark script or class?

example code run in spark shell (see bold output below)



code:


val data = Array("one", "two", "three", "two", "three", "three")
val dataRdd = sc.parallelize(data)
val dataTupleRdd =  dataRdd.map(word => (word, 1))
val countsRdd = dataTupleRdd.reduceByKey(_ + _)
countsRdd.foreach(println)




code run in spark shell (see bold output below: I want to generate that
from the API)




Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java
1.8.0_45)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val data = Array("one", "two", "three", "two", "three", "three")
*data: Array[String]* = Array(one, two, three, two, three, three)

scala> val dataRdd = sc.parallelize(data)
*dataRdd: org.apache.spark.rdd.RDD[String] *= ParallelCollectionRDD[0] at
parallelize at <console>:26

scala> val dataTupleRdd =  dataRdd.map(word => (word, 1))
*dataTupleRdd: org.apache.spark.rdd.RDD[(String, Int)] *=
MapPartitionsRDD[1] at map at <console>:28

scala> val countsRdd = dataTupleRdd.reduceByKey(_ + _)
*countsRdd: org.apache.spark.rdd.RDD[(String, Int)]* = ShuffledRDD[2] at
reduceByKey at <console>:30

scala> countsRdd.foreach(println)
(two,2)
(one,1)
(three,3)
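
Ideally I'd like something I can call from a script to get that same output,
along the lines of this sketch using scala reflection (not sure whether this
matches what the shell does internally):

import scala.reflect.runtime.universe._

// Print a name, the inferred type and the value's toString, similar to what
// the shell shows after each definition.
def describe[T: TypeTag](name: String, value: T): Unit =
  println(s"$name: ${typeOf[T]} = $value")

describe("dataRdd", dataRdd)
// prints something like: dataRdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:26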


Authorizations in thriftserver

2017-04-24 Thread vincent gromakowski
Hi,
Can someone confirm that authorization isn't implemented in the Spark
thriftserver for SQL standard based Hive authorization?
https://cwiki.apache.org/confluence/display/Hive/SQL+Standard+Based+Hive+Authorization
If confirmed, are there any plans to implement it?
Thanks


Re: Spark Testing Library Discussion

2017-04-24 Thread Holden Karau
The (tentative) link for those interested is
https://hangouts.google.com/hangouts/_/oyjvcnffejcjhi6qazf3lysypue .

On Mon, Apr 24, 2017 at 12:02 AM, Holden Karau  wrote:

> So 14 people have said they are available on Tuesday the 25th at 1PM
> pacific so we will do this meeting then ( https://doodle.com/poll/
> 69y6yab4pyf7u8bn ).
>
> Since hangouts tends to work ok on the Linux distro I'm running my default
> is to host this as a "hangouts-on-air" unless there are alternative ideas.
>
> I'll record the hangout and if it isn't terrible I'll post it for those
> who weren't able to make it (and for next time I'll include more European
> friendly time options - Doodle wouldn't let me update it once posted).
>
> On Fri, Apr 14, 2017 at 11:17 AM, Holden Karau 
> wrote:
>
>> Hi Spark Users (+ Some Spark Testing Devs on BCC),
>>
>> Awhile back on one of the many threads about testing in Spark there was
>> some interest in having a chat about the state of Spark testing and what
>> people want/need.
>>
>> So if you are interested in joining an online (with maybe an IRL
>> component if enough people are SF based) chat about Spark testing please
>> fill out this doodle - https://doodle.com/poll/69y6yab4pyf7u8bn
>>
>> I think reasonable topics of discussion could be:
>>
>> 1) What is the state of the different Spark testing libraries in the
>> different core (Scala, Python, R, Java) and extended languages (C#,
>> Javascript, etc.)?
>> 2) How do we make these more easily discovered by users?
>> 3) What are people looking for in their testing libraries that we are
>> missing? (can be functionality, documentation, etc.)
>> 4) Are there any examples of well tested open source Spark projects and
>> where are they?
>>
>> If you have other topics that's awesome.
>>
>> To clarify, this is about libraries and best practices for people testing
>> their Spark applications, and less about testing Spark's internals
>> (although as illustrated by some of the libraries there is some strong
>> overlap in what is required to make that work).
>>
>> Cheers,
>>
>> Holden :)
>>
>> --
>> Cell : 425-233-8271
>> Twitter: https://twitter.com/holdenkarau
>>
>
>
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
>



-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau


Re: Spark Testing Library Discussion

2017-04-24 Thread Holden Karau
So 14 people have said they are available on Tuesday the 25th at 1PM
pacific so we will do this meeting then (
https://doodle.com/poll/69y6yab4pyf7u8bn ).

Since hangouts tends to work ok on the Linux distro I'm running my default
is to host this as a "hangouts-on-air" unless there are alternative ideas.

I'll record the hangout and if it isn't terrible I'll post it for those who
weren't able to make it (and for next time I'll include more European
friendly time options - Doodle wouldn't let me update it once posted).

On Fri, Apr 14, 2017 at 11:17 AM, Holden Karau  wrote:

> Hi Spark Users (+ Some Spark Testing Devs on BCC),
>
> Awhile back on one of the many threads about testing in Spark there was
> some interest in having a chat about the state of Spark testing and what
> people want/need.
>
> So if you are interested in joining an online (with maybe an IRL component
> if enough people are SF based) chat about Spark testing please fill out
> this doodle - https://doodle.com/poll/69y6yab4pyf7u8bn
>
> I think reasonable topics of discussion could be:
>
> 1) What is the state of the different Spark testing libraries in the
> different core (Scala, Python, R, Java) and extended languages (C#,
> Javascript, etc.)?
> 2) How do we make these more easily discovered by users?
> 3) What are people looking for in their testing libraries that we are
> missing? (can be functionality, documentation, etc.)
> 4) Are there any examples of well tested open source Spark projects and
> where are they?
>
> If you have other topics that's awesome.
>
> To clarify, this is about libraries and best practices for people testing
> their Spark applications, and less about testing Spark's internals
> (although as illustrated by some of the libraries there is some strong
> overlap in what is required to make that work).
>
> Cheers,
>
> Holden :)
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
>



-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau