Re: How to start spark streaming application with recent past timestamp for replay of old batches?

2016-02-21 Thread Akhil Das
On Mon, Feb 22, 2016 at 12:18 PM, ashokkumar rajendran <
ashokkumar.rajend...@gmail.com> wrote:

> Hi Folks,
>
>
>
> I am exploring spark for streaming from two sources (a) Kinesis and (b)
> HDFS for some of our use-cases. Since we maintain state gathered over last
> x hours in spark streaming, we would like to replay the data from last x
> hours as batches during deployment. I have gone through the Spark APIs but
> could not find anything that initiates with older timestamp. Appreciate
> your input on the same.
>
> 1.  Restarting with check-pointing runs the batches faster for missed
> timestamp period, but when we upgrade with new code, the same checkpoint
> directory cannot be reused.
>
=> It is true that you won't be able to reuse the checkpoint when you
upgrade your code, but production code is not upgraded every now and
then. You can basically create a configuration file in which you keep
most of those settings (like the streaming duration, parameters etc.)
instead of hard-coding them, so that changing them does not break the
checkpoint.
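
A minimal sketch of that idea (the file name, key names and checkpoint path
are placeholders, not from the thread): read the tunables from a properties
file at startup so that changing them does not require touching the code.

import java.io.FileInputStream
import java.util.Properties

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ConfigDrivenStream {
  def main(args: Array[String]): Unit = {
    // e.g. a file containing: batch.seconds=2
    val props = new Properties()
    props.load(new FileInputStream("streaming-app.properties"))
    val batchSeconds = props.getProperty("batch.seconds", "2").toLong

    val conf = new SparkConf().setAppName("ConfigDrivenStream")
    val ssc = StreamingContext.getOrCreate("/tmp/checkpoint", () => {
      val ctx = new StreamingContext(conf, Seconds(batchSeconds))
      ctx.checkpoint("/tmp/checkpoint")
      // build the DStream graph here
      ctx
    })
    ssc.start()
    ssc.awaitTermination()
  }
}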


> 2.  For the case with kinesis as source, we can change the last
> checked sequence number in DynamoDB to get the data from last x hours, but
> this will be one large bunch of data for first restarted batch. So the data
> is not processed as natural multiple batches inside spark.
>
=> The Kinesis API has a way to limit the data rate; you might want to
look into that and implement a custom receiver for your use case.

http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html
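
For reference, a minimal sketch (not a complete receiver; client construction
and shard-iterator handling are assumed) of how the GetRecords limit could be
applied from Scala with the AWS SDK. In a real application this would live
inside a custom Spark Streaming Receiver's receiving loop.

import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.model.GetRecordsRequest

// Fetch at most `maxPerCall` records for the given shard iterator and
// return the iterator to use on the next call.
def readCapped(client: AmazonKinesisClient, shardIterator: String, maxPerCall: Int): String = {
  val request = new GetRecordsRequest()
    .withShardIterator(shardIterator)
    .withLimit(maxPerCall)   // caps how much data a single call pulls back
  val result = client.getRecords(request)
  // inside a custom Receiver you would call store(...) on result.getRecords here
  result.getNextShardIterator
}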

> 3.  For the source from HDFS, I could not find any alternative to
> start the streaming from old timestamp data, unless I manually (or with
> script) rename the old files after starting the stream. (This workaround
> leads to other complications too further).
>
=> What you can do is: once the files are processed, move them to a
different directory, and when you restart the stream for whatever reason,
make it pick up all the files instead of only the latest ones (by passing
the *newFilesOnly* boolean parameter).
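
A minimal sketch of that (the directory path is a placeholder), using
fileStream with newFilesOnly = false so that files already present in the
directory are also picked up on restart:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(new SparkConf().setAppName("ReplayFiles"), Seconds(2))

val lines = ssc
  .fileStream[LongWritable, Text, TextInputFormat](
    "hdfs:///data/incoming",
    (path: Path) => true,     // accept every file in the directory
    newFilesOnly = false)     // do not restrict to files newer than the restart time
  .map(_._2.toString)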


> 4.  May I know how is the zero data loss is achieved while having
> hdfs as source? i.e. if the driver fails while processing a micro batch,
> what happens when the application is restarted? Is the same micro-batch
> reprocessed?
>
=> Yes. If the application is restarted, then the micro-batch will be
reprocessed.


>
>
> Regards
>
> Ashok
>


[Example] : read custom schema from file

2016-02-21 Thread Divya Gehlot
Hi,
Can anybody help me by providing an example of how to read the schema of a
data set from a file?



Thanks,
Divya


Re: [Please Help] Log redirection on EMR

2016-02-21 Thread Sabarish Sasidharan
Your logs are getting archived in your logs bucket in S3.

http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/emr-plan-debugging.html

Regards
Sab

On Mon, Feb 22, 2016 at 12:14 PM, HARSH TAKKAR 
wrote:

> Hi
>
> I am using an EMR cluster for running my Spark jobs, but after the job
> finishes, the logs disappear.
>
> I have added a log4j.properties in my jar, but all the logs are still
> redirected to the EMR resource manager, which vanishes after the job
> completes. Is there a way I could redirect the logs to a location in the
> file system? I am working on price points and it is very critical for me
> to maintain logs.
>
> Just to add, I get the following error when my application starts.
>
> java.io.FileNotFoundException: /etc/spark/conf/log4j.properties (No such file 
> or directory)
>   at java.io.FileInputStream.open(Native Method)
>   at java.io.FileInputStream.(FileInputStream.java:146)
>   at java.io.FileInputStream.(FileInputStream.java:101)
>   at 
> sun.net.www.protocol.file.FileURLConnection.connect(FileURLConnection.java:90)
>   at 
> sun.net.www.protocol.file.FileURLConnection.getInputStream(FileURLConnection.java:188)
>   at 
> org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:557)
>   at 
> org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:526)
>   at org.apache.log4j.LogManager.(LogManager.java:127)
>   at org.apache.spark.Logging$class.initializeLogging(Logging.scala:122)
>   at 
> org.apache.spark.Logging$class.initializeIfNecessary(Logging.scala:107)
>   at org.apache.spark.Logging$class.log(Logging.scala:51)
>   at 
> org.apache.spark.deploy.yarn.ApplicationMaster$.log(ApplicationMaster.scala:607)
>   at 
> org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:621)
>   at 
> org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
>
>
>


-- 

Architect - Big Data
Ph: +91 99805 99458

Manthan Systems | *Company of the year - Analytics (2014 Frost and Sullivan
India ICT)*
+++


How to start spark streaming application with recent past timestamp for replay of old batches?

2016-02-21 Thread ashokkumar rajendran
Hi Folks,



I am exploring spark for streaming from two sources (a) Kinesis and (b)
HDFS for some of our use-cases. Since we maintain state gathered over last
x hours in spark streaming, we would like to replay the data from last x
hours as batches during deployment. I have gone through the Spark APIs but
could not find anything that starts a stream from an older timestamp. I would
appreciate your input on the same.

1.  Restarting with check-pointing runs the batches faster for missed
timestamp period, but when we upgrade with new code, the same checkpoint
directory cannot be reused.

2.  For the case with Kinesis as the source, we can change the last checked
sequence number in DynamoDB to get the data from the last x hours, but this
will be one large bunch of data for the first restarted batch. So the data is
not processed as multiple natural batches inside Spark.

3.  For the HDFS source, I could not find any alternative to start the
streaming from old timestamped data, unless I manually (or with a script)
rename the old files after starting the stream. (This workaround leads to
further complications too.)

4.  May I know how zero data loss is achieved while having HDFS as the
source? i.e. if the driver fails while processing a micro-batch, what
happens when the application is restarted? Is the same micro-batch
reprocessed?



Regards

Ashok


[Please Help] Log redirection on EMR

2016-02-21 Thread HARSH TAKKAR
Hi

I am using an EMR cluster for running my Spark jobs, but after the job
finishes, the logs disappear.

I have added a log4j.properties in my jar, but all the logs are still
redirected to the EMR resource manager, which vanishes after the job
completes. Is there a way I could redirect the logs to a location in the
file system? I am working on price points and it is very critical for me to
maintain logs.

Just to add, I get the following error when my application starts.

java.io.FileNotFoundException: /etc/spark/conf/log4j.properties (No
such file or directory)
at java.io.FileInputStream.open(Native Method)
at java.io.FileInputStream.(FileInputStream.java:146)
at java.io.FileInputStream.(FileInputStream.java:101)
at 
sun.net.www.protocol.file.FileURLConnection.connect(FileURLConnection.java:90)
at 
sun.net.www.protocol.file.FileURLConnection.getInputStream(FileURLConnection.java:188)
at 
org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:557)
at 
org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:526)
at org.apache.log4j.LogManager.(LogManager.java:127)
at org.apache.spark.Logging$class.initializeLogging(Logging.scala:122)
at 
org.apache.spark.Logging$class.initializeIfNecessary(Logging.scala:107)
at org.apache.spark.Logging$class.log(Logging.scala:51)
at 
org.apache.spark.deploy.yarn.ApplicationMaster$.log(ApplicationMaster.scala:607)
at 
org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:621)
at 
org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)


Re: Accessing Web UI

2016-02-21 Thread Vasanth Bhat
Thanks Gourav, Eduardo

I tried http://localhost:8080 and http://OAhtvJ5MCA:8080/. In both cases
Firefox just hangs.

Also, I tried with the lynx text-based browser. I get the message "HTTP
request sent; waiting for response." and it hangs as well.

Is there a way to enable debug logs in the Spark master service, to
understand what's going wrong?


Thanks
Vasanth


On Fri, Feb 19, 2016 at 5:46 PM, Gourav Sengupta 
wrote:

> can you please try localhost:8080?
>
> Regards,
> Gourav Sengupta
>
> On Fri, Feb 19, 2016 at 11:18 AM, vasbhat  wrote:
>
>> Hi,
>>
>>I have installed the spark1.6 and  trying to start the master
>> (start-master.sh) and access the webUI.
>>
>> I get the following logs on running the start-master.sh
>>
>> Spark Command: /usr/jdk/instances/jdk1.8.0/jre/bin/java -cp
>>
>> /usr/local/spark-1.6.0-bin-hadoop2.6/conf/:/usr/local/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/usr/local/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/usr/local/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/usr/local/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar
>> -Xms4g -Xmx4g org.apache.spark.deploy.master.Master --ip OAhtvJ5MCA --port
>> 7077 --webui-port 8080
>> 
>> Using Spark's default log4j profile:
>> org/apache/spark/log4j-defaults.properties
>> 16/02/19 03:07:30 INFO Master: Registered signal handlers for [TERM, HUP,
>> INT]
>> 16/02/19 03:07:30 WARN NativeCodeLoader: Unable to load native-hadoop
>> library for your platform... using builtin-java classes where applicable
>> 16/02/19 03:07:31 INFO SecurityManager: Changing view acls to: sluser
>> 16/02/19 03:07:31 INFO SecurityManager: Changing modify acls to: sluser
>> 16/02/19 03:07:31 INFO SecurityManager: SecurityManager: authentication
>> disabled; ui acls disabled; users with view permissions: Set(sluser);
>> users
>> with modify permissions: Set(sluser)
>> 16/02/19 03:07:32 INFO Utils: Successfully started service 'sparkMaster'
>> on
>> port 7077.
>> 16/02/19 03:07:32 INFO Master: Starting Spark master at
>> spark://OAhtvJ5MCA:7077
>> 16/02/19 03:07:32 INFO Master: Running Spark version 1.6.0
>> 16/02/19 03:07:32 WARN AbstractConnector: insufficient threads configured
>> for SelectChannelConnector@0.0.0.0:8080
>> 16/02/19 03:07:32 INFO Utils: Successfully started service 'MasterUI' on
>> port 8080.
>> 16/02/19 03:07:32 INFO MasterWebUI: Started MasterWebUI at
>> http://127.0.0.1:8080
>> 16/02/19 03:07:32 WARN AbstractConnector: insufficient threads configured
>> for SelectChannelConnector@OAhtvJ5MCA:6066
>> 16/02/19 03:07:32 INFO Utils: Successfully started service on port 6066.
>> 16/02/19 03:07:32 INFO StandaloneRestServer: Started REST server for
>> submitting applications on port 6066
>> 16/02/19 03:07:33 INFO Master: I have been elected leader! New state:
>> ALIVE
>>
>> --
>> Through netstat I can see that port 8080 is listening.
>> Now when I start Firefox and access http://127.0.0.1:8080, Firefox just
>> hangs with the message
>>
>> Waiting for "127.0.0.1"   and does not connect to the UI.
>>
>> How do I enable debug for the Spark master daemon, to understand what's
>> happening?
>>
>> Thanks
>> Vasanth
>>
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Re-Accessing-Web-UI-tp23029p26276.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Error :Type mismatch error when passing hdfs file path to spark-csv load method

2016-02-21 Thread Jonathan Kelly
On the line preceding the one that the compiler is complaining about (which
doesn't actually have a problem in itself), you declare df as
"df"+fileName, making it a string. Then you try to assign a DataFrame to
df, but it's already a string. I don't quite understand your intent with
that previous line, but I'm guessing you didn't mean to assign a string to
df.
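
For example, a minimal sketch of what was probably intended (reusing the
names from the quoted code below): collect each loaded DataFrame into a map
keyed by file name rather than reassigning a String variable.

import scala.collection.mutable
import org.apache.spark.sql.DataFrame

val dfByFile = mutable.Map[String, DataFrame]()

hdfsConn.listStatus(new org.apache.hadoop.fs.Path("/TestDivya/Spark/ParentDir/")).foreach { fileStatus =>
  val filePathName = fileStatus.getPath().toString()
  val fileName     = fileStatus.getPath().getName().toLowerCase()
  dfByFile(fileName) = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(filePathName)
}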

~ Jonathan
On Sun, Feb 21, 2016 at 8:45 PM Divya Gehlot 
wrote:

> Hi,
> I am trying to dynamically create Dataframe by reading subdirectories
> under parent directory
>
> My code looks like
>
>> import org.apache.spark._
>> import org.apache.spark.sql._
>> val hadoopConf = new org.apache.hadoop.conf.Configuration()
>> val hdfsConn = org.apache.hadoop.fs.FileSystem.get(new
>> java.net.URI("hdfs://xxx.xx.xx.xxx:8020"), hadoopConf)
>> hdfsConn.listStatus(new
>> org.apache.hadoop.fs.Path("/TestDivya/Spark/ParentDir/")).foreach{
>> fileStatus =>
>>val filePathName = fileStatus.getPath().toString()
>>val fileName = fileStatus.getPath().getName().toLowerCase()
>>var df =  "df"+fileName
>>df =
>> sqlContext.read.format("com.databricks.spark.csv").option("header",
>> "true").option("inferSchema", "true").load(filePathName)
>> }
>
>
> getting below error
>
>> :35: error: type mismatch;
>>  found   : org.apache.spark.sql.DataFrame
>>  required: String
>>  df =
>> sqlContext.read.format("com.databricks.spark.csv").option("header",
>> "true").option("inferSchema", "true").load(filePathName)
>
>
> Am I missing something ?
>
> Would really appreciate the help .
>
>
> Thanks,
> Divya
>
>


Re: Spark SQL is not returning records for hive bucketed tables on HDP

2016-02-21 Thread @Sanjiv Singh
Compaction should have been triggered automatically, as the following
properties are already set in *hive-site.xml*, and the *NO_AUTO_COMPACTION*
property has not been set for these tables.

<property>
  <name>hive.compactor.initiator.on</name>
  <value>true</value>
</property>

<property>
  <name>hive.compactor.worker.threads</name>
  <value>1</value>
</property>

The documentation is upsetting sometimes.




Regards
Sanjiv Singh
Mob :  +091 9990-447-339

On Mon, Feb 22, 2016 at 9:49 AM, Varadharajan Mukundan  wrote:

> Yes, I was burned by this issue a couple of weeks back. This also means
> that after every insert job, a compaction should be run to access the new
> rows from Spark. It is sad that this issue is not documented / mentioned
> anywhere.
>
> On Mon, Feb 22, 2016 at 9:27 AM, @Sanjiv Singh 
> wrote:
>
>> Hi Varadharajan,
>>
>> Thanks for your response.
>>
>> Yes, it is a transactional table; see the *show create table* output below.
>>
>> The table hardly has 3 records, and after triggering a minor compaction on
>> the table, it started showing results in Spark SQL.
>>
>>
>> > *ALTER TABLE hivespark COMPACT 'major';*
>>
>>
>> > *show create table hivespark;*
>>
>>   CREATE TABLE `hivespark`(
>>
>> `id` int,
>>
>> `name` string)
>>
>>   CLUSTERED BY (
>>
>> id)
>>
>>   INTO 32 BUCKETS
>>
>>   ROW FORMAT SERDE
>>
>> 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
>>
>>   STORED AS INPUTFORMAT
>>
>> 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
>>
>>   OUTPUTFORMAT
>>
>> 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
>>
>>   LOCATION
>>
>> 'hdfs://myhost:8020/apps/hive/warehouse/mydb.db/hivespark'
>>   TBLPROPERTIES (
>>
>> 'COLUMN_STATS_ACCURATE'='true',
>>
>> 'last_modified_by'='root',
>>
>> 'last_modified_time'='1455859079',
>>
>> 'numFiles'='37',
>>
>> 'numRows'='3',
>>
>> 'rawDataSize'='0',
>>
>> 'totalSize'='11383',
>>
>> 'transactional'='true',
>>
>> 'transient_lastDdlTime'='1455864121') ;
>>
>>
>> Regards
>> Sanjiv Singh
>> Mob :  +091 9990-447-339
>>
>> On Mon, Feb 22, 2016 at 9:01 AM, Varadharajan Mukundan <
>> srinath...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Is the transaction attribute set on your table? I observed that the Hive
>>> transactional storage structure does not work with Spark yet. You can
>>> confirm this by looking at the transactional attribute in the output of
>>> "desc extended " in the Hive console.
>>>
>>> If you need to access a transactional table, consider doing a major
>>> compaction and then try accessing the table again.
>>>
>>> On Mon, Feb 22, 2016 at 8:57 AM, @Sanjiv Singh 
>>> wrote:
>>>
 Hi,


 I have observed that Spark SQL is not returning records for hive
 bucketed ORC tables on HDP.



 On spark SQL , I am able to list all tables , but queries on hive
 bucketed tables are not returning records.

 I have also tried the same for non-bucketed hive tables. it is working
 fine.



 Same is working on plain Apache setup.

 Let me know if needs other details.

 Regards
 Sanjiv Singh
 Mob :  +091 9990-447-339

>>>
>>>
>>>
>>> --
>>> Thanks,
>>> M. Varadharajan
>>>
>>> 
>>>
>>> "Experience is what you get when you didn't get what you wanted"
>>>-By Prof. Randy Pausch in "The Last Lecture"
>>>
>>> My Journal :- http://varadharajan.in
>>>
>>
>>
>
>
> --
> Thanks,
> M. Varadharajan
>
> 
>
> "Experience is what you get when you didn't get what you wanted"
>-By Prof. Randy Pausch in "The Last Lecture"
>
> My Journal :- http://varadharajan.in
>


Re: Specify number of executors in standalone cluster mode

2016-02-21 Thread Hemant Bhanawat
The maximum number of cores per executor can be controlled using
spark.executor.cores, and the maximum number of executors on a single worker
can be controlled by the environment variable SPARK_WORKER_INSTANCES.

However, to ensure that all available cores are used, you will have to take
care of how the stream is partitioned. Copy-pasting the relevant help text
from Spark:



*The number of tasks per receiver per batch will be approximately (batch
interval / block interval). For example, block interval of 200 ms will
create 10 tasks per 2 second batches. If the number of tasks is too low
(that is, less than the number of cores per machine), then it will be
inefficient as all available cores will not be used to process the data. To
increase the number of tasks for a given batch interval, reduce the block
interval. However, the recommended minimum value of block interval is about
50 ms, below which the task launching overheads may be a problem. An
alternative to receiving data with multiple input streams / receivers is to
explicitly repartition the input data stream (using
inputStream.repartition()). This distributes the
received batches of data across the specified number of machines in the
cluster before further processing.*
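
As one illustration of the above (standalone mode, values chosen for the
6-worker / 2-cores-per-machine question below; spark.cores.max is an extra
assumption on my part, not something mentioned in this thread):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("TwoCoresPerWorker")
  .set("spark.executor.cores", "2")   // at most 2 cores per executor
  .set("spark.cores.max", "12")       // 6 machines x 2 cores in total
val ssc = new StreamingContext(conf, Seconds(2))

// Repartition the received stream so that all 12 cores get tasks,
// as the quoted help text recommends, e.g.:
// val repartitioned = inputStream.repartition(12)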

Hemant Bhanawat 
www.snappydata.io

On Sun, Feb 21, 2016 at 11:01 PM, Saiph Kappa  wrote:

> Hi,
>
> I'm running a Spark streaming application on a Spark cluster that spans
> 6 machines/workers. I'm using Spark standalone cluster mode. Each machine
> has 8 cores. Is there any way to specify that I want to run my application
> on all 6 machines and just use 2 cores on each machine?
>
> Thanks
>


Error :Type mismatch error when passing hdfs file path to spark-csv load method

2016-02-21 Thread Divya Gehlot
Hi,
I am trying to dynamically create DataFrames by reading subdirectories under
a parent directory.

My code looks like

> import org.apache.spark._
> import org.apache.spark.sql._
> val hadoopConf = new org.apache.hadoop.conf.Configuration()
> val hdfsConn = org.apache.hadoop.fs.FileSystem.get(new
> java.net.URI("hdfs://xxx.xx.xx.xxx:8020"), hadoopConf)
> hdfsConn.listStatus(new
> org.apache.hadoop.fs.Path("/TestDivya/Spark/ParentDir/")).foreach{
> fileStatus =>
>val filePathName = fileStatus.getPath().toString()
>val fileName = fileStatus.getPath().getName().toLowerCase()
>var df =  "df"+fileName
>df =
> sqlContext.read.format("com.databricks.spark.csv").option("header",
> "true").option("inferSchema", "true").load(filePathName)
> }


I am getting the error below:

> :35: error: type mismatch;
>  found   : org.apache.spark.sql.DataFrame
>  required: String
>  df =
> sqlContext.read.format("com.databricks.spark.csv").option("header",
> "true").option("inferSchema", "true").load(filePathName)


Am I missing something?

I would really appreciate the help.


Thanks,
Divya


Re: Spark SQL is not returning records for hive bucketed tables on HDP

2016-02-21 Thread Varadharajan Mukundan
Yes, I was burned by this issue a couple of weeks back. This also means
that after every insert job, a compaction should be run to access the new
rows from Spark. It is sad that this issue is not documented / mentioned
anywhere.

On Mon, Feb 22, 2016 at 9:27 AM, @Sanjiv Singh 
wrote:

> Hi Varadharajan,
>
> Thanks for your response.
>
> Yes, it is a transactional table; see the *show create table* output below.
>
> The table hardly has 3 records, and after triggering a minor compaction on
> the table, it started showing results in Spark SQL.
>
>
> > *ALTER TABLE hivespark COMPACT 'major';*
>
>
> > *show create table hivespark;*
>
>   CREATE TABLE `hivespark`(
>
> `id` int,
>
> `name` string)
>
>   CLUSTERED BY (
>
> id)
>
>   INTO 32 BUCKETS
>
>   ROW FORMAT SERDE
>
> 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
>
>   STORED AS INPUTFORMAT
>
> 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
>
>   OUTPUTFORMAT
>
> 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
>
>   LOCATION
>
> 'hdfs://myhost:8020/apps/hive/warehouse/mydb.db/hivespark'
>   TBLPROPERTIES (
>
> 'COLUMN_STATS_ACCURATE'='true',
>
> 'last_modified_by'='root',
>
> 'last_modified_time'='1455859079',
>
> 'numFiles'='37',
>
> 'numRows'='3',
>
> 'rawDataSize'='0',
>
> 'totalSize'='11383',
>
> 'transactional'='true',
>
> 'transient_lastDdlTime'='1455864121') ;
>
>
> Regards
> Sanjiv Singh
> Mob :  +091 9990-447-339
>
> On Mon, Feb 22, 2016 at 9:01 AM, Varadharajan Mukundan <
> srinath...@gmail.com> wrote:
>
>> Hi,
>>
>> Is the transaction attribute set on your table? I observed that the Hive
>> transactional storage structure does not work with Spark yet. You can
>> confirm this by looking at the transactional attribute in the output of
>> "desc extended " in the Hive console.
>>
>> If you need to access a transactional table, consider doing a major
>> compaction and then try accessing the table again.
>>
>> On Mon, Feb 22, 2016 at 8:57 AM, @Sanjiv Singh 
>> wrote:
>>
>>> Hi,
>>>
>>>
>>> I have observed that Spark SQL is not returning records for hive
>>> bucketed ORC tables on HDP.
>>>
>>>
>>>
>>> On spark SQL , I am able to list all tables , but queries on hive
>>> bucketed tables are not returning records.
>>>
>>> I have also tried the same for non-bucketed hive tables. it is working
>>> fine.
>>>
>>>
>>>
>>> Same is working on plain Apache setup.
>>>
>>> Let me know if needs other details.
>>>
>>> Regards
>>> Sanjiv Singh
>>> Mob :  +091 9990-447-339
>>>
>>
>>
>>
>> --
>> Thanks,
>> M. Varadharajan
>>
>> 
>>
>> "Experience is what you get when you didn't get what you wanted"
>>-By Prof. Randy Pausch in "The Last Lecture"
>>
>> My Journal :- http://varadharajan.in
>>
>
>


-- 
Thanks,
M. Varadharajan



"Experience is what you get when you didn't get what you wanted"
   -By Prof. Randy Pausch in "The Last Lecture"

My Journal :- http://varadharajan.in


Re: Spark SQL is not returning records for hive bucketed tables on HDP

2016-02-21 Thread @Sanjiv Singh
Hi Varadharajan,

Thanks for your response.

Yes, it is a transactional table; see the *show create table* output below.

The table hardly has 3 records, and after triggering a minor compaction on
the table, it started showing results in Spark SQL.


> *ALTER TABLE hivespark COMPACT 'major';*


> *show create table hivespark;*

  CREATE TABLE `hivespark`(

`id` int,

`name` string)

  CLUSTERED BY (

id)

  INTO 32 BUCKETS

  ROW FORMAT SERDE

'org.apache.hadoop.hive.ql.io.orc.OrcSerde'

  STORED AS INPUTFORMAT

'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'

  OUTPUTFORMAT

'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'

  LOCATION

'hdfs://myhost:8020/apps/hive/warehouse/mydb.db/hivespark'
  TBLPROPERTIES (

'COLUMN_STATS_ACCURATE'='true',

'last_modified_by'='root',

'last_modified_time'='1455859079',

'numFiles'='37',

'numRows'='3',

'rawDataSize'='0',

'totalSize'='11383',

'transactional'='true',

'transient_lastDdlTime'='1455864121') ;


Regards
Sanjiv Singh
Mob :  +091 9990-447-339

On Mon, Feb 22, 2016 at 9:01 AM, Varadharajan Mukundan  wrote:

> Hi,
>
> Is the transaction attribute set on your table? I observed that the Hive
> transactional storage structure does not work with Spark yet. You can
> confirm this by looking at the transactional attribute in the output of
> "desc extended " in the Hive console.
>
> If you need to access a transactional table, consider doing a major
> compaction and then try accessing the table again.
>
> On Mon, Feb 22, 2016 at 8:57 AM, @Sanjiv Singh 
> wrote:
>
>> Hi,
>>
>>
>> I have observed that Spark SQL is not returning records for hive bucketed
>> ORC tables on HDP.
>>
>>
>>
>> On spark SQL , I am able to list all tables , but queries on hive
>> bucketed tables are not returning records.
>>
>> I have also tried the same for non-bucketed hive tables. it is working
>> fine.
>>
>>
>>
>> Same is working on plain Apache setup.
>>
>> Let me know if needs other details.
>>
>> Regards
>> Sanjiv Singh
>> Mob :  +091 9990-447-339
>>
>
>
>
> --
> Thanks,
> M. Varadharajan
>
> 
>
> "Experience is what you get when you didn't get what you wanted"
>-By Prof. Randy Pausch in "The Last Lecture"
>
> My Journal :- http://varadharajan.in
>


Re: Spark SQL is not returning records for hive bucketed tables on HDP

2016-02-21 Thread Varadharajan Mukundan
Hi,

Is the transaction attribute set on your table? I observed that the Hive
transactional storage structure does not work with Spark yet. You can confirm
this by looking at the transactional attribute in the output of "desc
extended " in the Hive console.

If you need to access a transactional table, consider doing a major
compaction and then try accessing the table again.
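
A minimal sketch of re-reading such a table from Spark once a major
compaction has been run from the Hive console (ALTER TABLE hivespark COMPACT
'major', as shown elsewhere in this thread); the database and table names are
taken from the thread, and a HiveContext on Spark 1.5/1.6 is assumed:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val sc = new SparkContext(new SparkConf().setAppName("ReadBucketedHiveTable"))
val hiveContext = new HiveContext(sc)

hiveContext.sql("use mydb")
val df = hiveContext.table("hivespark")   // the bucketed, transactional ORC table
df.show()
hiveContext.sql("SELECT COUNT(*) FROM hivespark").show()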

On Mon, Feb 22, 2016 at 8:57 AM, @Sanjiv Singh 
wrote:

> Hi,
>
>
> I have observed that Spark SQL is not returning records for hive bucketed
> ORC tables on HDP.
>
>
>
> On spark SQL , I am able to list all tables , but queries on hive bucketed
> tables are not returning records.
>
> I have also tried the same for non-bucketed hive tables. it is working
> fine.
>
>
>
> Same is working on plain Apache setup.
>
> Let me know if needs other details.
>
> Regards
> Sanjiv Singh
> Mob :  +091 9990-447-339
>



-- 
Thanks,
M. Varadharajan



"Experience is what you get when you didn't get what you wanted"
   -By Prof. Randy Pausch in "The Last Lecture"

My Journal :- http://varadharajan.in


Spark SQL is not returning records for hive bucketed tables on HDP

2016-02-21 Thread @Sanjiv Singh
Hi,


I have observed that Spark SQL is not returning records for hive bucketed
ORC tables on HDP.



In Spark SQL, I am able to list all tables, but queries on Hive bucketed
tables are not returning records.

I have also tried the same for non-bucketed Hive tables; it works fine.

The same works on a plain Apache setup.

Let me know if you need any other details.

Regards
Sanjiv Singh
Mob :  +091 9990-447-339


RE: Submitting Jobs Programmatically

2016-02-21 Thread Patrick Mi
Hi there,

I had a similar problem in Java with the standalone cluster on Linux, but got
it working by passing the following option:

-Dspark.jars=file:/path/to/sparkapp.jar

sparkapp.jar contains the launcher application.

Hope that helps.

Regards,

Patrick



-Original Message-
From: Arko Provo Mukherjee [mailto:arkoprovomukher...@gmail.com]
Sent: Saturday, 20 February 2016 4:27 p.m.
To: Ted Yu
Cc: Holden Karau; user
Subject: Re: Submitting Jobs Programmatically

Hello,

Thanks much. I could start the service.

When I run my program, the launcher is not able to find the app class:

java.lang.ClassNotFoundException: SparkSubmitter
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
Spark job complete. Exit code:101
at java.lang.Class.forName(Class.java:274)
at org.apache.spark.util.Utils$.classForName(Utils.scala:173)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:639)
at
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

My launch code is as follows:
val spark = new SparkLauncher()
.setSparkHome("C:\\spark-1.5.1-bin-hadoop2.6")

.setAppResource("C:\\SparkService\\Scala\\RequestSubmitter\\target\\scala-2.10\\spark-submitter_2.10-0.0.1.jar")
.setMainClass("SparkSubmitter")
.addAppArgs(inputQuery)
.setMaster("spark://157.54.189.70:7077")
.launch()
spark.waitFor()

I added the spark-submitter_2.10-0.0.1.jar to the classpath as well,
but that didn't help.

Thanks & regards
Arko

On Fri, Feb 19, 2016 at 6:49 PM, Ted Yu  wrote:
> Cycling old bits:
>
> http://search-hadoop.com/m/q3RTtHrxMj2abwOk2
>
> On Fri, Feb 19, 2016 at 6:40 PM, Arko Provo Mukherjee
>  wrote:
>>
>> Hi,
>>
>> Thanks for your response. Is there a similar link for Windows? I am
>> not sure the .sh scripts would run on windows.
>>
>> By default the start-all.sh doesn't work and I don't see anything in
>> localhost:8080.
>>
>> I will do some more investigation and come back.
>>
>> Thanks again for all your help!
>>
>> Thanks & regards
>> Arko
>>
>>
>> On Fri, Feb 19, 2016 at 6:35 PM, Ted Yu  wrote:
>> > Please see https://spark.apache.org/docs/latest/spark-standalone.html
>> >
>> > On Fri, Feb 19, 2016 at 6:27 PM, Arko Provo Mukherjee
>> >  wrote:
>> >>
>> >> Hi,
>> >>
>> >> Thanks for your response, that really helped.
>> >>
>> >> However, I don't believe the job is being submitted. When I run spark
>> >> from the shell, I don't need to start it up explicitly. Do I need to
>> >> start up Spark on my machine before running this program?
>> >>
>> >> I see the following in the SPARK_HOME\bin directory:
>> >> Name
>> >> 
>> >> beeline.cmd
>> >> load-spark-env.cmd
>> >> pyspark.cmd
>> >> pyspark2.cmd
>> >> run-example.cmd
>> >> run-example2.cmd
>> >> spark-class.cmd
>> >> spark-class2.cmd
>> >> spark-shell.cmd
>> >> spark-shell2.cmd
>> >> spark-submit.cmd
>> >> spark-submit2.cmd
>> >> sparkR.cmd
>> >> sparkR2.cmd
>> >>
>> >> Do I need to run anyone of them before submitting the job via the
>> >> program?
>> >>
>> >> Thanks & regards
>> >> Arko
>> >>
>> >> On Fri, Feb 19, 2016 at 6:01 PM, Holden Karau 
>> >> wrote:
>> >> > How are you trying to launch your application? Do you have the Spark
>> >> > jars on
>> >> > your class path?
>> >> >
>> >> >
>> >> > On Friday, February 19, 2016, Arko Provo Mukherjee
>> >> >  wrote:
>> >> >>
>> >> >> Hello,
>> >> >>
>> >> >> I am trying to submit a spark job via a program.
>> >> >>
>> >> >> When I run it, I receive the following error:
>> >> >> Exception in thread "Thread-1" java.lang.NoClassDefFoundError:
>> >> >> org/apache/spark/launcher/SparkLauncher
>> >> >> at Spark.SparkConnector.run(MySpark.scala:33)
>> >> >> at java.lang.Thread.run(Thread.java:745)
>> >> >> Caused by: java.lang.ClassNotFoundException:
>> >> >> org.apache.spark.launcher.SparkLauncher
>> >> >> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>> >> >> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>> >> >> at java.security.AccessController.doPrivileged(Native
>> >> >> Method)
>> >> >> at
>> >> >> 

Re:RE: how to set database in DataFrame.saveAsTable?

2016-02-21 Thread Mich Talebzadeh
 

Well my version of Spark is 1.5.2 

On 21/02/2016 23:54, Jacky Wang wrote: 

> df.write.saveAsTable("db_name.tbl_name") // works, spark-shell, latest spark 
> version 1.6.0 
> df.write.saveAsTable("db_name.tbl_name") // NOT work, spark-shell, old spark 
> version 1.4 
> 
> --
> 
> Jacky Wang 
> 
> At 2016-02-21 17:35:53, "Mich Talebzadeh"  wrote:
> 
> I looked at the docs on this. It is not clear what goes on behind the
> scenes. There is very little documentation on it.
> 
> First in Hive a database has to exist before it can be used so sql("use 
> mytable") will not create a database for you. 
> 
> Also you cannot call your table mytable in database mytable! 
> 
> Remember in Hive the hierarchy is the database followed by tables. 
> 
> If you want to create a Hive database (I would not for every table) for this 
> purpose you can do 
> 
> scala> sql("create database if not exists mytable_db") 
> 
> res10: org.apache.spark.sql.DataFrame = [result: string] 
> 
> scala> sql("use mytable_db") 
> 
> res12: org.apache.spark.sql.DataFrame = [result: string] 
> 
> This puts you in the context of mytable_db database 
> 
> If you do 
> 
> hdfs dfs -ls /user/hive/warehouse 
> 
> You will see a directory called mytable_db.db is created 
> 
> /user/hive/warehouse/mytable.db 
> 
> Then you can create a table in Hive in mytable_db if you wish. The way I do 
> it personally is to register your DF as a temporary table and do 
> insert/select into Hive table 
> 
> scala> sql("use mytable_db") 
> 
> res21: org.apache.spark.sql.DataFrame = [result: string] 
> 
> scala> """ 
> 
> | CREATE TABLE mytable ( 
> 
> | INVOICENUMBER INT 
> 
> | ,PAYMENTDATE timestamp 
> 
> | ,NET DECIMAL(20,2) 
> 
> | ,VAT DECIMAL(20,2) 
> 
> | ,TOTAL DECIMAL(20,2) 
> 
> | ) 
> 
> | COMMENT 'a test table' 
> 
> | STORED AS ORC 
> 
> | TBLPROPERTIES ( "orc.compress"="ZLIB" ) 
> 
> | """ 
> 
> res22: String = 
> 
> " 
> 
> CREATE TABLE mytable ( 
> 
> INVOICENUMBER INT 
> 
> ,PAYMENTDATE timestamp 
> 
> ,NET DECIMAL(20,2) 
> 
> ,VAT DECIMAL(20,2) 
> 
> ,TOTAL DECIMAL(20,2) 
> 
> ) 
> 
> COMMENT 'a test table' 
> 
> STORED AS ORC 
> 
> TBLPROPERTIES ( "orc.compress"="ZLIB" ) 
> 
> " 
> 
> scala> sql(sqltext) 
> 
> res6: org.apache.spark.sql.DataFrame = [result: string] 
> 
> My DF is called "a" below so I register it as a temp table called tmp 
> 
> a.toDF.registerTempTable("tmp") 
> 
> Then just insert/select into Hive table mytable from tmp 
> 
> scala> sqltext = "INSERT INTO mytable SELECT * FROM tmp" 
> 
> sqltext: String = INSERT INTO mytable SELECT * FROM tmp 
> 
> scala> sql(sqltext) 
> 
> res10: org.apache.spark.sql.DataFrame = [] 
> 
> scala> sql("select count(1) from mytable").show 
> 
> +---+ 
> 
> |_c0| 
> 
> +---+ 
> 
> | 65| 
> 
> HTH 
> 
> Dr Mich Talebzadeh 
> 
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
> http://talebzadehmich.wordpress.com
> 
> NOTE: The information in this email is proprietary and confidential. This 
> message is for the designated recipient only, if you are not the intended 
> recipient, you should destroy it immediately. Any information in this message 
> shall not be understood as given or endorsed by Peridale Technology Ltd, its 
> subsidiaries or their employees, unless expressly so stated. It is the 
> responsibility of the recipient to ensure that this email is virus free, 
> therefore neither Peridale Technology Ltd, its subsidiaries nor their 
> employees accept any responsibility. 
> 
> FROM: Glen [mailto:cng...@gmail.com] 
> SENT: 21 February 2016 03:26
> TO: gen tang 
> CC: user@spark.apache.org
> SUBJECT: Re: how to set database in DataFrame.saveAsTable? 
> 
> Any example code? 
> 
> In pyspark: 
> 
> sqlContex.sql("use mytable") 
> 
> my_df.saveAsTable("tmp_spark_debug", mode="overwrite") 
> 
> 1. The code above seems not register the table in hive. I have to create 
> table from hdfs in hive, it reports some format error: rcformat and parquet. 
> 
> 2. Rerun the saveAsTable using mode="overwrite" in saveAsTable, it reports 
> the table already exists. 
> 
> 3. Sometimes it creates a directory in hive/warehouse/tmp_spark_debug, not in 
> hive/warehouse/mytable/tmp_spark_debug. 
> 
> My goal is simple: 
> 
> df.saveAsTable('blablabla') // create a hive table in some database, then it 
> can be visited by hive. 
> 
> I tried lots of times; it seems there are lots of bugs in pyspark. Or is my
> method wrong?
> 
> 2016-02-21 10:04 GMT+08:00 gen tang : 
> 
> Hi, 
> 
> You can use 
> 
> sqlContext.sql("use ") 
> 
> before use dataframe.saveAsTable 
> 
> Hope it could be helpful 
> 
> Cheers 
> 
> Gen 
> 
> On Sun, Feb 21, 2016 at 9:55 AM, Glen  wrote: 
> 
> For dataframe in spark, so the table can be visited by hive.
> 
> -- 
> 
> Jacky Wang

-- 

Jacky Wang 

-- 

Dr Mich Talebzadeh

LinkedIn

Re: Stream group by

2016-02-21 Thread Vinti Maheshwari
>
> Yeah, I tried with reduceByKey and was able to do it. Thanks Ayan and Jatin.
> For reference, the final solution:
>
> def main(args: Array[String]): Unit = {
> val conf = new SparkConf().setAppName("HBaseStream")
> val sc = new SparkContext(conf)
> // create a StreamingContext, the main entry point for all streaming functionality
> val ssc = new StreamingContext(sc, Seconds(2))
> val inputStream = ssc.socketTextStream("hostname", )
> val parsedDstream = inputStream
>   .map(line => {
> val splitLines = line.split(",")
> (splitLines(1), splitLines.slice(2, 
> splitLines.length).map(_.trim.toInt))
>   })
>   .reduceByKey((first, second) => {
> val listOfArrays = ArrayBuffer(first, second)
> listOfArrays.toList.transpose.map(_.sum).toArray
>   })
>   .foreachRDD(rdd => rdd.foreach(Blaher.blah))
>
> }
>
>
> Regards,
> Vinti
>
> On Sun, Feb 21, 2016 at 2:22 PM, ayan guha  wrote:
>
>> I believe the best way would be to use reduceByKey operation.
>>
>> On Mon, Feb 22, 2016 at 5:10 AM, Jatin Kumar <
>> jku...@rocketfuelinc.com.invalid> wrote:
>>
>>> You will need to do a collect and update a global map if you want to.
>>>
>>> myDStream.map(record => (record._2, (record._3, record_4, record._5))
>>>  .groupByKey((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 +
>>> r2._3))
>>>  .foreachRDD(rdd => {
>>>rdd.collect().foreach((fileName, valueTuple) => <update global map here>)
>>>  })
>>>
>>> --
>>> Thanks
>>> Jatin Kumar | Rocket Scientist
>>> +91-7696741743 m
>>>
>>> On Sun, Feb 21, 2016 at 11:30 PM, Vinti Maheshwari >> > wrote:
>>>
 Nevermind, seems like an executor level mutable map is not recommended
 as stated in
 http://blog.guillaume-pitel.fr/2015/06/spark-trick-shot-node-centered-aggregator/

 On Sun, Feb 21, 2016 at 9:54 AM, Vinti Maheshwari  wrote:

> Thanks for your reply Jatin. I changed my parsing logic to what you
> suggested:
>
> def parseCoverageLine(str: String) = {
>   val arr = str.split(",")
>   ...
>   ...
>   (arr(0), arr(1) :: count.toList)  // (test, [file, 1, 1, 2])
> }
>
> Then in the grouping, can i use a global hash map per executor /
> partition to aggregate the results?
>
> val globalMap:[String: List[Int]] = Map()
> val coverageDStream = inputStream.map(parseCoverageLine)
> coverageDStream.reduceByKey((fileAndCountsA, fileAndCountsB) => {
> // if exists in global map, append result else add new key
>
> // globalMap
> // { "file1": [1+1+5, 1+2+5, 1+3+5], "file2": [ 2, 2, 2, 2 ] }
> })
>
> Thanks,
> Vinti
>
> On Sun, Feb 21, 2016 at 9:31 AM, Jatin Kumar  > wrote:
>
>> Hello Vinti,
>>
>> One way to get this done is you split your input line into key and
>> value tuple and then you can simply use groupByKey and handle the values
>> the way you want. For example:
>>
>> Assuming you have already split the values into a 5 tuple:
>> myDStream.map(record => (record._2, (record._3, record_4, record._5))
>>  .groupByKey((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3
>> + r2._3))
>>
>> I hope that helps.
>>
>> --
>> Thanks
>> Jatin Kumar | Rocket Scientist
>> +91-7696741743 m
>>
>> On Sun, Feb 21, 2016 at 10:35 PM, Vinti Maheshwari <
>> vinti.u...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I have input lines like below
>>>
>>> *Input*
>>> t1, file1, 1, 1, 1
>>> t1, file1, 1, 2, 3
>>> t1, file2, 2, 2, 2, 2
>>> t2, file1, 5, 5, 5
>>> t2, file2, 1, 1, 2, 2
>>>
>>> and i want to achieve the output like below rows which is a vertical
>>> addition of the corresponding numbers.
>>>
>>> *Output*
>>> “file1” : [ 1+1+5, 1+2+5, 1+3+5 ]
>>> “file2” : [ 2+1, 2+1, 2+2, 2+2 ]
>>>
>>> I am in a spark streaming context and i am having a hard time trying
>>> to figure out the way to group by file name.
>>>
>>> It seems like i will need to use something like below, i am not sure
>>> how to get to the correct syntax. Any inputs will be helpful.
>>>
>>> myDStream.foreachRDD(rdd => rdd.groupBy())
>>>
>>> I know how to do the vertical sum of array of given numbers, but i
>>> am not sure how to feed that function to the group by.
>>>
>>>   def compute_counters(counts : ArrayBuffer[List[Int]]) = {
>>>   counts.toList.transpose.map(_.sum)
>>>   }
>>>
>>> ~Thanks,
>>> Vinti
>>>
>>
>>
>

>>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>


Re: Stream group by

2016-02-21 Thread Vinti Maheshwari
Yeah, I tried with reduceByKey and was able to do it. Thanks Ayan and Jatin.
For reference, the final solution:

def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("HBaseStream")
val sc = new SparkContext(conf)
// create a StreamingContext, the main entry point for all streaming functionality
val ssc = new StreamingContext(sc, Seconds(2))
val inputStream = ssc.socketTextStream("hostname", )
val parsedDstream = inputStream
  .map(line => {
val splitLines = line.split(",")
(splitLines(1), splitLines.slice(2,
splitLines.length).map(_.trim.toInt))
  })
  .reduceByKey((first, second) => {
val listOfArrays = ArrayBuffer(first, second)
listOfArrays.toList.transpose.map(_.sum).toArray
  })
  .foreachRDD(rdd => rdd.foreach(Blaher.blah))

}


Regards,
Vinti

On Sun, Feb 21, 2016 at 2:22 PM, ayan guha  wrote:

> I believe the best way would be to use reduceByKey operation.
>
> On Mon, Feb 22, 2016 at 5:10 AM, Jatin Kumar <
> jku...@rocketfuelinc.com.invalid> wrote:
>
>> You will need to do a collect and update a global map if you want to.
>>
>> myDStream.map(record => (record._2, (record._3, record_4, record._5))
>>  .groupByKey((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 +
>> r2._3))
>>  .foreachRDD(rdd => {
>>rdd.collect().foreach((fileName, valueTuple) => <update global map here>)
>>  })
>>
>> --
>> Thanks
>> Jatin Kumar | Rocket Scientist
>> +91-7696741743 m
>>
>> On Sun, Feb 21, 2016 at 11:30 PM, Vinti Maheshwari 
>> wrote:
>>
>>> Nevermind, seems like an executor level mutable map is not recommended
>>> as stated in
>>> http://blog.guillaume-pitel.fr/2015/06/spark-trick-shot-node-centered-aggregator/
>>>
>>> On Sun, Feb 21, 2016 at 9:54 AM, Vinti Maheshwari 
>>> wrote:
>>>
 Thanks for your reply Jatin. I changed my parsing logic to what you
 suggested:

 def parseCoverageLine(str: String) = {
   val arr = str.split(",")
   ...
   ...
   (arr(0), arr(1) :: count.toList)  // (test, [file, 1, 1, 2])
 }

 Then in the grouping, can i use a global hash map per executor /
 partition to aggregate the results?

 val globalMap:[String: List[Int]] = Map()
 val coverageDStream = inputStream.map(parseCoverageLine)
 coverageDStream.reduceByKey((fileAndCountsA, fileAndCountsB) => {
 // if exists in global map, append result else add new key

 // globalMap
 // { "file1": [1+1+5, 1+2+5, 1+3+5], "file2": [ 2, 2, 2, 2 ] }
 })

 Thanks,
 Vinti

 On Sun, Feb 21, 2016 at 9:31 AM, Jatin Kumar 
 wrote:

> Hello Vinti,
>
> One way to get this done is you split your input line into key and
> value tuple and then you can simply use groupByKey and handle the values
> the way you want. For example:
>
> Assuming you have already split the values into a 5 tuple:
> myDStream.map(record => (record._2, (record._3, record_4, record._5))
>  .groupByKey((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3
> + r2._3))
>
> I hope that helps.
>
> --
> Thanks
> Jatin Kumar | Rocket Scientist
> +91-7696741743 m
>
> On Sun, Feb 21, 2016 at 10:35 PM, Vinti Maheshwari <
> vinti.u...@gmail.com> wrote:
>
>> Hello,
>>
>> I have input lines like below
>>
>> *Input*
>> t1, file1, 1, 1, 1
>> t1, file1, 1, 2, 3
>> t1, file2, 2, 2, 2, 2
>> t2, file1, 5, 5, 5
>> t2, file2, 1, 1, 2, 2
>>
>> and i want to achieve the output like below rows which is a vertical
>> addition of the corresponding numbers.
>>
>> *Output*
>> “file1” : [ 1+1+5, 1+2+5, 1+3+5 ]
>> “file2” : [ 2+1, 2+1, 2+2, 2+2 ]
>>
>> I am in a spark streaming context and i am having a hard time trying
>> to figure out the way to group by file name.
>>
>> It seems like i will need to use something like below, i am not sure
>> how to get to the correct syntax. Any inputs will be helpful.
>>
>> myDStream.foreachRDD(rdd => rdd.groupBy())
>>
>> I know how to do the vertical sum of array of given numbers, but i am
>> not sure how to feed that function to the group by.
>>
>>   def compute_counters(counts : ArrayBuffer[List[Int]]) = {
>>   counts.toList.transpose.map(_.sum)
>>   }
>>
>> ~Thanks,
>> Vinti
>>
>
>

>>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>


Re:RE: how to set database in DataFrame.saveAsTable?

2016-02-21 Thread Jacky Wang
df.write.saveAsTable("db_name.tbl_name")  // works: spark-shell, latest spark version 1.6.0
df.write.saveAsTable("db_name.tbl_name")  // does NOT work: spark-shell, old spark version 1.4
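
A minimal sketch contrasting the two behaviours noted above; db_name and
tbl_name are placeholders, `df` is an existing DataFrame, and `sqlContext` is
assumed to be a HiveContext:

// Spark 1.6.0+: the database can be qualified directly in the table name.
df.write.saveAsTable("db_name.tbl_name")

// Older releases (e.g. 1.4): switch the current database first,
// then save with an unqualified table name.
sqlContext.sql("use db_name")
df.write.saveAsTable("tbl_name")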






--

Jacky Wang

At 2016-02-21 17:35:53, "Mich Talebzadeh"  wrote:


I looked at the docs on this. It is not clear what goes on behind the scenes.
There is very little documentation on it.

 

First in Hive a database has to exist before it can be used so sql(“use 
mytable”) will not create a database for you.

 

Also you cannot call your table mytable in database mytable!

 

Remember in Hive the hierarchy is the database followed by tables.

 

If you want to create a Hive database (I would not for every table) for this 
purpose you can do

 

scala> sql("create database if not exists mytable_db")

res10: org.apache.spark.sql.DataFrame = [result: string]

 

scala> sql("use mytable_db")

res12: org.apache.spark.sql.DataFrame = [result: string]

 

This puts you in the context of mytable_db database

 

If you do

 

hdfs dfs -ls /user/hive/warehouse

 

You will see a directory called mytable_db.db is created

 

/user/hive/warehouse/mytable.db

 

Then you can create a table in Hive in mytable_db if you wish.  The way I do it 
personally is to register your DF as a temporary table and do insert/select 
into Hive table

 

scala> sql("use mytable_db")

res21: org.apache.spark.sql.DataFrame = [result: string]

 

scala> """

 | CREATE TABLE mytable (

 | INVOICENUMBER  INT

 | ,PAYMENTDATEtimestamp

 | ,NETDECIMAL(20,2)

 | ,VATDECIMAL(20,2)

 | ,TOTAL  DECIMAL(20,2)

 | )

 |  COMMENT 'a test table'

 | STORED AS ORC

 | TBLPROPERTIES ( "orc.compress"="ZLIB" )

 | """

res22: String =

"

CREATE TABLE mytable (

INVOICENUMBER  INT

,PAYMENTDATEtimestamp

,NETDECIMAL(20,2)

,VATDECIMAL(20,2)

,TOTAL  DECIMAL(20,2)

)

COMMENT 'a test table'

STORED AS ORC

TBLPROPERTIES ( "orc.compress"="ZLIB" )

"

scala> sql(sqltext)

res6: org.apache.spark.sql.DataFrame = [result: string]

 

 

My DF is called “a” below so I register it as a temp table called tmp

 

a.toDF.registerTempTable("tmp")

 

Then just insert/select into Hive table mytable from tmp

 

scala> sqltext = "INSERT INTO mytable SELECT * FROM tmp"

sqltext: String = INSERT INTO mytable SELECT * FROM tmp

 

scala> sql(sqltext)

res10: org.apache.spark.sql.DataFrame = []

 

scala> sql("select count(1) from mytable").show

+---+

|_c0|

+---+

| 65|

HTH

 

Dr Mich Talebzadeh

 

LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

http://talebzadehmich.wordpress.com

 

NOTE: The information in this email is proprietary and confidential. This 
message is for the designated recipient only, if you are not the intended 
recipient, you should destroy it immediately. Any information in this message 
shall not be understood as given or endorsed by Peridale Technology Ltd, its 
subsidiaries or their employees, unless expressly so stated. It is the 
responsibility of the recipient to ensure that this email is virus free, 
therefore neither Peridale Technology Ltd, its subsidiaries nor their employees 
accept any responsibility.

 

 

From: Glen [mailto:cng...@gmail.com]
Sent: 21 February 2016 03:26
To: gen tang 
Cc:user@spark.apache.org
Subject: Re: how to set database in DataFrame.saveAsTable?

 

Any example code?

 

In pyspark:

sqlContex.sql("use mytable")

my_df.saveAsTable("tmp_spark_debug", mode="overwrite")

 

1. The code above seems not register the table in hive. I have to create table 
from hdfs in hive, it reports some format error: rcformat and parquet.

2. Rerun the saveAsTable using  mode="overwrite" in saveAsTable, it reports the 
table already exists.

3. Sometimes it creates a directory in  hive/warehouse/tmp_spark_debug, not in 
hive/warehouse/mytable/tmp_spark_debug.

 

 

My goal is simple:

df.saveAsTable('blablabla')  // create a hive table in some database, then it 
can be visited by hive.

 

I tried lots of times; it seems there are lots of bugs in pyspark. Or is my
method wrong?

 

2016-02-21 10:04 GMT+08:00 gen tang :

Hi,

 

You can use 

sqlContext.sql("use ")

before use dataframe.saveAsTable

 

Hope it could be helpful

 

Cheers

Gen

 

 

On Sun, Feb 21, 2016 at 9:55 AM, Glen  wrote:

For dataframe in spark, so the table can be visited by hive.


 

--

Jacky Wang

 





 

--

Jacky Wang

Re: Stream group by

2016-02-21 Thread ayan guha
I believe the best way would be to use reduceByKey operation.

On Mon, Feb 22, 2016 at 5:10 AM, Jatin Kumar <
jku...@rocketfuelinc.com.invalid> wrote:

> You will need to do a collect and update a global map if you want to.
>
> myDStream.map(record => (record._2, (record._3, record_4, record._5))
>  .groupByKey((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 +
> r2._3))
>  .foreachRDD(rdd => {
>rdd.collect().foreach((fileName, valueTuple) => <update global map here>)
>  })
>
> --
> Thanks
> Jatin Kumar | Rocket Scientist
> +91-7696741743 m
>
> On Sun, Feb 21, 2016 at 11:30 PM, Vinti Maheshwari 
> wrote:
>
>> Nevermind, seems like an executor level mutable map is not recommended as
>> stated in
>> http://blog.guillaume-pitel.fr/2015/06/spark-trick-shot-node-centered-aggregator/
>>
>> On Sun, Feb 21, 2016 at 9:54 AM, Vinti Maheshwari 
>> wrote:
>>
>>> Thanks for your reply Jatin. I changed my parsing logic to what you
>>> suggested:
>>>
>>> def parseCoverageLine(str: String) = {
>>>   val arr = str.split(",")
>>>   ...
>>>   ...
>>>   (arr(0), arr(1) :: count.toList)  // (test, [file, 1, 1, 2])
>>> }
>>>
>>> Then in the grouping, can i use a global hash map per executor /
>>> partition to aggregate the results?
>>>
>>> val globalMap:[String: List[Int]] = Map()
>>> val coverageDStream = inputStream.map(parseCoverageLine)
>>> coverageDStream.reduceByKey((fileAndCountsA, fileAndCountsB) => {
>>> // if exists in global map, append result else add new key
>>>
>>> // globalMap
>>> // { "file1": [1+1+5, 1+2+5, 1+3+5], "file2": [ 2, 2, 2, 2 ] }
>>> })
>>>
>>> Thanks,
>>> Vinti
>>>
>>> On Sun, Feb 21, 2016 at 9:31 AM, Jatin Kumar 
>>> wrote:
>>>
 Hello Vinti,

 One way to get this done is you split your input line into key and
 value tuple and then you can simply use groupByKey and handle the values
 the way you want. For example:

 Assuming you have already split the values into a 5 tuple:
 myDStream.map(record => (record._2, (record._3, record_4, record._5))
  .groupByKey((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 +
 r2._3))

 I hope that helps.

 --
 Thanks
 Jatin Kumar | Rocket Scientist
 +91-7696741743 m

 On Sun, Feb 21, 2016 at 10:35 PM, Vinti Maheshwari <
 vinti.u...@gmail.com> wrote:

> Hello,
>
> I have input lines like below
>
> *Input*
> t1, file1, 1, 1, 1
> t1, file1, 1, 2, 3
> t1, file2, 2, 2, 2, 2
> t2, file1, 5, 5, 5
> t2, file2, 1, 1, 2, 2
>
> and i want to achieve the output like below rows which is a vertical
> addition of the corresponding numbers.
>
> *Output*
> “file1” : [ 1+1+5, 1+2+5, 1+3+5 ]
> “file2” : [ 2+1, 2+1, 2+2, 2+2 ]
>
> I am in a spark streaming context and i am having a hard time trying
> to figure out the way to group by file name.
>
> It seems like i will need to use something like below, i am not sure
> how to get to the correct syntax. Any inputs will be helpful.
>
> myDStream.foreachRDD(rdd => rdd.groupBy())
>
> I know how to do the vertical sum of array of given numbers, but i am
> not sure how to feed that function to the group by.
>
>   def compute_counters(counts : ArrayBuffer[List[Int]]) = {
>   counts.toList.transpose.map(_.sum)
>   }
>
> ~Thanks,
> Vinti
>


>>>
>>
>


-- 
Best Regards,
Ayan Guha


Re: Spark Job Hanging on Join

2016-02-21 Thread Gourav Sengupta
Sorry,

please include the following questions in the list above:

the SPARK version?
whether you are using RDD or DataFrames?
is the code run locally or in SPARK Cluster mode or in AWS EMR?


Regards,
Gourav Sengupta

On Sun, Feb 21, 2016 at 7:37 PM, Gourav Sengupta 
wrote:

> Hi Tamara,
>
> few basic questions first.
>
> How many executors are you using?
> Is the data getting all cached into the same executor?
> How many partitions do you have of the data?
> How many fields are you trying to use in the join?
>
> If you need any help in finding answer to these questions please let me
> know. From what I reckon joins like yours should not take more than a few
> milliseconds.
>
>
> Regards,
> Gourav Sengupta
>
> On Fri, Feb 19, 2016 at 5:31 PM, Tamara Mendt  wrote:
>
>> Hi all,
>>
>> I am running a Spark job that gets stuck attempting to join two
>> dataframes. The dataframes are not very large, one is about 2 M rows, and
>> the other a couple of thousand rows and the resulting joined dataframe
>> should be about the same size as the smaller dataframe. I have tried
>> triggering execution of the join using the 'first' operator, which as far
>> as I understand would not require processing the entire resulting dataframe
>> (maybe I am mistaken though). The Spark UI is not telling me anything, just
>> showing the task to be stuck.
>>
>> When I run the exact same job on a slightly smaller dataset it works
>> without hanging.
>>
>> I have used the same environment to run joins on much larger dataframes,
>> so I am confused as to why in this particular case my Spark job is just
>> hanging. I have also tried running the same join operation using pyspark on
>> two 2 Million row dataframes (exactly like the one I am trying to join in
>> the job that gets stuck) and it runs successfully.
>>
>> I have tried caching the joined dataframe to see how much memory it is
>> requiring but the job gets stuck on this action too. I have also tried
>> using persist to memory and disk on the join, and the job seems to be stuck
>> all the same.
>>
>> Any help as to where to look for the source of the problem would be much
>> appreciated.
>>
>> Cheers,
>>
>> Tamara
>>
>>
>


Re: Spark Job Hanging on Join

2016-02-21 Thread Gourav Sengupta
Hi Tamara,

few basic questions first.

How many executors are you using?
Is the data getting all cached into the same executor?
How many partitions do you have of the data?
How many fields are you trying to use in the join?

If you need any help in finding answers to these questions, please let me
know. From what I reckon, joins like yours should not take more than a few
milliseconds.


Regards,
Gourav Sengupta

On Fri, Feb 19, 2016 at 5:31 PM, Tamara Mendt  wrote:

> Hi all,
>
> I am running a Spark job that gets stuck attempting to join two
> dataframes. The dataframes are not very large, one is about 2 M rows, and
> the other a couple of thousand rows and the resulting joined dataframe
> should be about the same size as the smaller dataframe. I have tried
> triggering execution of the join using the 'first' operator, which as far
> as I understand would not require processing the entire resulting dataframe
> (maybe I am mistaken though). The Spark UI is not telling me anything, just
> showing the task to be stuck.
>
> When I run the exact same job on a slightly smaller dataset it works
> without hanging.
>
> I have used the same environment to run joins on much larger dataframes,
> so I am confused as to why in this particular case my Spark job is just
> hanging. I have also tried running the same join operation using pyspark on
> two 2 Million row dataframes (exactly like the one I am trying to join in
> the job that gets stuck) and it runs successfully.
>
> I have tried caching the joined dataframe to see how much memory it is
> requiring but the job gets stuck on this action too. I have also tried
> using persist to memory and disk on the join, and the job seems to be stuck
> all the same.
>
> Any help as to where to look for the source of the problem would be much
> appreciated.
>
> Cheers,
>
> Tamara
>
>


Re: Evaluating spark streaming use case

2016-02-21 Thread Ted Yu
w.r.t. the new mapWithState(), there have been some bug fixes since the
release of 1.6.0
e.g.

SPARK-13121 java mapWithState mishandles scala Option

Looks like 1.6.1 RC should come out next week.

FYI
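
For reference, a minimal sketch (Spark 1.6, illustrative types and names) of
the mapWithState API mentioned above, assuming `pairStream` is a
DStream[(String, Int)] of (key, count) pairs:

import org.apache.spark.streaming.{State, StateSpec}

val spec = StateSpec.function(
  (key: String, value: Option[Int], state: State[Int]) => {
    val newSum = value.getOrElse(0) + state.getOption.getOrElse(0)
    state.update(newSum)   // keep the running total for this key
    (key, newSum)          // emitted downstream
  })

val runningCounts = pairStream.mapWithState(spec)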

On Sun, Feb 21, 2016 at 10:47 AM, Chris Fregly  wrote:

> good catch on the cleaner.ttl
>
> @jatin-  when you say "memory-persisted RDD", what do you mean exactly?
>  and how much data are you talking about?  remember that spark can evict
> these memory-persisted RDDs at any time.  they can be recovered from Kafka,
> but this is not a good situation to be in.
>
> also, is this spark 1.6 with the new mapState() or the old
> updateStateByKey()?  you definitely want the newer 1.6 mapState().
>
> and is there any other way to store and aggregate this data outside of
> spark?  I get a bit nervous when I see people treat spark/streaming like an
> in-memory database.
>
> perhaps redis or another type of in-memory store is more appropriate.  or
> just write to long-term storage using parquet.
>
> if this is a lot of data, you may want to use approximate probabilistic
> data structures like CountMin Sketch or HyperLogLog.  here's some relevant
> links with more info - including how to use these with redis:
>
>
> https://www.slideshare.net/cfregly/advanced-apache-spark-meetup-approximations-and-probabilistic-data-structures-jan-28-2016-galvanize
>
>
> https://github.com/fluxcapacitor/pipeline/tree/master/myapps/streaming/src/main/scala/com/advancedspark/streaming/rating/approx
>
> you can then setup a cron (or airflow) spark job to do the compute and
> aggregate against either redis or long-term storage.
>
> this reference pipeline contains the latest airflow workflow scheduler:
> https://github.com/fluxcapacitor/pipeline/wiki
>
> my advice with spark streaming is to get the data out of spark streaming
> as quickly as possible - and into a more durable format more suitable for
> aggregation and compute.
>
> this greatly simplifies your operational concerns, in my
> opinion.
>
> good question.  very common use case.
>
> On Feb 21, 2016, at 12:22 PM, Ted Yu  wrote:
>
> w.r.t. cleaner TTL, please see:
>
> [SPARK-7689] Remove TTL-based metadata cleaning in Spark 2.0
>
> FYI
>
> On Sun, Feb 21, 2016 at 4:16 AM, Gerard Maas 
> wrote:
>
>>
>> It sounds like another  window operation on top of the 30-min window will
>> achieve the  desired objective.
>> Just keep in mind that you'll need to set the clean TTL (
>> spark.cleaner.ttl) to a long enough value and you will require enough
>> resources (mem & disk) to keep the required data.
>>
>> -kr, Gerard.
>>
>> On Sun, Feb 21, 2016 at 12:54 PM, Jatin Kumar <
>> jku...@rocketfuelinc.com.invalid> wrote:
>>
>>> Hello Spark users,
>>>
>>> I have to aggregate messages from kafka and at some fixed interval (say
>>> every half hour) update a memory persisted RDD and run some computation.
>>> This computation uses last one day data. Steps are:
>>>
>>> - Read from realtime Kafka topic X in spark streaming batches of 5
>>> seconds
>>> - Filter the above DStream messages and keep some of them
>>> - Create windows of 30 minutes on above DStream and aggregate by Key
>>> - Merge this 30 minute RDD with a memory persisted RDD say combinedRdd
>>> - Maintain last N such RDDs in a deque persisting them on disk. While
>>> adding new RDD, subtract oldest RDD from the combinedRdd.
>>> - Final step consider last N such windows (of 30 minutes each) and do
>>> final aggregation
>>>
>>> Does the above way of using spark streaming looks reasonable? Is there a
>>> better way of doing the above?
>>>
>>> --
>>> Thanks
>>> Jatin
>>>
>>>
>>
>


Re: Evaluating spark streaming use case

2016-02-21 Thread Chris Fregly
good catch on the cleaner.ttl

@jatin-  when you say "memory-persisted RDD", what do you mean exactly?  and 
how much data are you talking about?  remember that spark can evict these 
memory-persisted RDDs at any time.  they can be recovered from Kafka, but this 
is not a good situation to be in.

also, is this spark 1.6 with the new mapWithState() or the old updateStateByKey()?
you definitely want the newer 1.6 mapWithState().

and is there any other way to store and aggregate this data outside of spark?  
I get a bit nervous when I see people treat spark/streaming like an in-memory 
database.

perhaps redis or another type of in-memory store is more appropriate.  or just 
write to long-term storage using parquet.

if this is a lot of data, you may want to use approximate probabilistic data 
structures like CountMin Sketch or HyperLogLog.  here's some relevant links 
with more info - including how to use these with redis: 

https://www.slideshare.net/cfregly/advanced-apache-spark-meetup-approximations-and-probabilistic-data-structures-jan-28-2016-galvanize

https://github.com/fluxcapacitor/pipeline/tree/master/myapps/streaming/src/main/scala/com/advancedspark/streaming/rating/approx

you can then setup a cron (or airflow) spark job to do the compute and 
aggregate against either redis or long-term storage.

this reference pipeline contains the latest airflow workflow scheduler:  
https://github.com/fluxcapacitor/pipeline/wiki

my advice with spark streaming is to get the data out of spark streaming as 
quickly as possible - and into a more durable format more suitable for 
aggregation and compute.  

this greatly simplifies your operational concerns, in my
opinion.

good question.  very common use case.
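A minimal sketch of that last point -- landing each micro-batch in durable
storage from foreachRDD -- assuming aggregatedStream is a DStream[(String, Long)]
and the HDFS output path is a placeholder:

import org.apache.spark.sql.SQLContext

aggregatedStream.foreachRDD { (rdd, batchTime) =>
  if (!rdd.isEmpty()) {
    val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
    import sqlContext.implicits._
    rdd.toDF("key", "count")
       .write
       .mode("append")
       .parquet(s"hdfs:///aggregates/batch=${batchTime.milliseconds}")
  }
}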

> On Feb 21, 2016, at 12:22 PM, Ted Yu  wrote:
> 
> w.r.t. cleaner TTL, please see:
> 
> [SPARK-7689] Remove TTL-based metadata cleaning in Spark 2.0
> 
> FYI
> 
>> On Sun, Feb 21, 2016 at 4:16 AM, Gerard Maas  wrote:
>> 
>> It sounds like another  window operation on top of the 30-min window will 
>> achieve the  desired objective. 
>> Just keep in mind that you'll need to set the clean TTL (spark.cleaner.ttl) 
>> to a long enough value and you will require enough resources (mem & disk) to 
>> keep the required data.  
>> 
>> -kr, Gerard.
>> 
>>> On Sun, Feb 21, 2016 at 12:54 PM, Jatin Kumar 
>>>  wrote:
>>> Hello Spark users,
>>> 
>>> I have to aggregate messages from kafka and at some fixed interval (say 
>>> every half hour) update a memory persisted RDD and run some computation. 
>>> This computation uses last one day data. Steps are:
>>> 
>>> - Read from realtime Kafka topic X in spark streaming batches of 5 seconds
>>> - Filter the above DStream messages and keep some of them
>>> - Create windows of 30 minutes on above DStream and aggregate by Key
>>> - Merge this 30 minute RDD with a memory persisted RDD say combinedRdd
>>> - Maintain last N such RDDs in a deque persisting them on disk. While 
>>> adding new RDD, subtract oldest RDD from the combinedRdd.
>>> - Final step consider last N such windows (of 30 minutes each) and do final 
>>> aggregation
>>> 
>>> Does the above way of using spark streaming looks reasonable? Is there a 
>>> better way of doing the above?
>>> 
>>> --
>>> Thanks
>>> Jatin
> 


Re: RDD[org.apache.spark.sql.Row] filter ERROR

2016-02-21 Thread Tenghuan He
Hi Ted,

Thanks a lot for your reply.

I tried your code in spark-shell on my laptop and it works well.
But when I tried it on another computer with Spark installed, I got an error:
scala> val df0 = Seq(("a", "b", "c", 3), ("c", "b", "a", 3)).toDF("A", "B",
"C", "num")
:11: error: value toDF is not a member of Seq[(String, String,
String, Int)]
   val df0 = Seq(("a", "b", "c", 3), ("c", "b", "a", 3)).toDF("A", "B",
"C", "num")
 ^
do you know why?

And when I tried the same code using the IntelliJ IDEA scala console [IntelliJ
IDEA 14.1.5 with the scala plugin], I got the same error: *value toDF is not a
member of Seq[(String, String, String, Int)]*
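For reference (this is an assumption about the cause, not something confirmed in
the thread): toDF on a Seq or an RDD of tuples comes from the SQLContext
implicits, which recent spark-shell versions import automatically but which a
compiled project or the IDEA console has to import explicitly:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._   // brings toDF into scope

val df0 = Seq(("a", "b", "c", 3), ("c", "b", "a", 3)).toDF("A", "B", "C", "num")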

And here are the details of my code:

package com.blop

import org.apache.spark.sql.{DataFrame, SQLContext}

object DFUpdater {

  def update(sqlContext: SQLContext, df0: DataFrame): DataFrame = {
    val deletedIds = List("1", "2", "3")
    val schema = df0.schema
    val rdd0 = df0.rdd
    val rdd1 = rdd0.filter(r => !deletedIds.contains(r(0)))
    sqlContext.createDataFrame(rdd1, schema)
  }
}

The above code was compiled into a jar, and when I call

Updater.update(sc, someDF) I get the ClassNotFoundException.

It looks like the same issue that causes the *value toDF is not a member of ...*
error above also leads to the long ClassNotFoundException stack trace.

I'll see what's wrong with my Spark configuration; if I find something
I will let you know :)


Thanks a lot!


On Sun, Feb 21, 2016 at 10:53 PM, Ted Yu  wrote:

> I tried the following in spark-shell:
>
> scala> val df0 = Seq(("a", "b", "c", 3), ("c", "b", "a", 3)).toDF("A",
> "B", "C", "num")
> df0: org.apache.spark.sql.DataFrame = [A: string, B: string ... 2 more
> fields]
>
> scala> val idList = List("1", "2", "3")
> idList: List[String] = List(1, 2, 3)
>
> scala> val rdd0 = df0.rdd
> rdd0: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] =
> MapPartitionsRDD[2] at rdd at :24
>
> scala> val schema = df0.schema
> schema: org.apache.spark.sql.types.StructType =
> StructType(StructField(A,StringType,true), StructField(B,StringType,true),
> StructField(C,StringType,true), StructField(num,IntegerType,false))
>
> scala> val rdd1 = rdd0.filter(r => !idList.contains(r(3)))
> rdd1: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] =
> MapPartitionsRDD[3] at filter at :28
>
> scala> val df1 = sqlContext.createDataFrame(rdd1, schema)
> df1: org.apache.spark.sql.DataFrame = [A: string, B: string ... 2 more
> fields]
>
> scala> df1.count()
> res0: Long = 2
>
> Can you disclose more of your code where the ClassNotFoundException can
> be reproduced ?
>
> On Sun, Feb 21, 2016 at 6:42 AM, Tenghuan He  wrote:
>
>> Hi, everyone.
>>
>> I have a spark program, where df0 is a DataFrame
>>
>> val idList = List("1", "2", "3", ...)
>> val rdd0 = df0.rdd
>> val schema = df0.schema
>> val rdd1 = rdd0.filter(r => !idList.contains(r(0)))
>> val df1 = sc.createDataFrame(rdd1, schema)
>>
>> When I run df1.count(), I got the following Exception at the end.
>>
>>
>> I found this 
>> http://apache-spark-user-list.1001560.n3.nabble.com/about-rdd-filter-td4657.html,
>>  but it doesn't work for me.
>>
>> If you enclose a field member of an object into a closure, not only this 
>> field but also the whole outer object is enclosed into the closure. If the 
>> outer object is not serializable, then RDD DAG serialization would fail. You 
>> can simply reference the field member with a separate variable to workaround 
>> this:
>>
>>
>> Can anyone tell me why? Thanks in advance :)
>>
>>
>> Thanks & Best regards
>>
>> Tenghuan He
>>
>>
>> [Stage 11:>(0 +
>> 7) / 19]org.apache.spark.SparkException: Job aborted due to stage failure:
>> Task 12 in stage 11.0 failed 4 times, most recent failure: Lost task 12.3
>> in stage 11.0 (TID 99, 127.0.0.1): java.lang.ClassNotFoundException:
>> com.blop.Updater$$anonfun$11
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> at java.lang.Class.forName0(Native Method)
>> at java.lang.Class.forName(Class.java:348)
>> at
>> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
>> at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
>> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>> at
>> 

Re: Stream group by

2016-02-21 Thread Jatin Kumar
You will need to do a collect and update a global map if you want to.

myDStream.map(record => (record._2, (record._3, record._4, record._5)))
  .groupByKey()
  .mapValues(_.reduce((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 + r2._3)))
  .foreachRDD { rdd =>
    rdd.collect().foreach { case (fileName, valueTuple) =>
      // update the global (driver-side) map here
    }
  }

--
Thanks
Jatin Kumar | Rocket Scientist
+91-7696741743 m

On Sun, Feb 21, 2016 at 11:30 PM, Vinti Maheshwari 
wrote:

> Nevermind, seems like an executor level mutable map is not recommended as
> stated in
> http://blog.guillaume-pitel.fr/2015/06/spark-trick-shot-node-centered-aggregator/
>
> On Sun, Feb 21, 2016 at 9:54 AM, Vinti Maheshwari 
> wrote:
>
>> Thanks for your reply Jatin. I changed my parsing logic to what you
>> suggested:
>>
>> def parseCoverageLine(str: String) = {
>>   val arr = str.split(",")
>>   ...
>>   ...
>>   (arr(0), arr(1) :: count.toList)  // (test, [file, 1, 1, 2])
>> }
>>
>> Then in the grouping, can i use a global hash map per executor /
>> partition to aggregate the results?
>>
>> val globalMap:[String: List[Int]] = Map()
>> val coverageDStream = inputStream.map(parseCoverageLine)
>> coverageDStream.reduceByKey((fileAndCountsA, fileAndCountsB) => {
>> // if exists in global map, append result else add new key
>>
>> // globalMap
>> // { "file1": [1+1+5, 1+2+5, 1+3+5], "file2": [ 2, 2, 2, 2 ] }
>> })
>>
>> Thanks,
>> Vinti
>>
>> On Sun, Feb 21, 2016 at 9:31 AM, Jatin Kumar 
>> wrote:
>>
>>> Hello Vinti,
>>>
>>> One way to get this done is you split your input line into key and value
>>> tuple and then you can simply use groupByKey and handle the values the way
>>> you want. For example:
>>>
>>> Assuming you have already split the values into a 5 tuple:
>>> myDStream.map(record => (record._2, (record._3, record_4, record._5))
>>>  .groupByKey((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 +
>>> r2._3))
>>>
>>> I hope that helps.
>>>
>>> --
>>> Thanks
>>> Jatin Kumar | Rocket Scientist
>>> +91-7696741743 m
>>>
>>> On Sun, Feb 21, 2016 at 10:35 PM, Vinti Maheshwari >> > wrote:
>>>
 Hello,

 I have input lines like below

 *Input*
 t1, file1, 1, 1, 1
 t1, file1, 1, 2, 3
 t1, file2, 2, 2, 2, 2
 t2, file1, 5, 5, 5
 t2, file2, 1, 1, 2, 2

 and i want to achieve the output like below rows which is a vertical
 addition of the corresponding numbers.

 *Output*
 “file1” : [ 1+1+5, 1+2+5, 1+3+5 ]
 “file2” : [ 2+1, 2+1, 2+2, 2+2 ]

 I am in a spark streaming context and i am having a hard time trying to
 figure out the way to group by file name.

 It seems like i will need to use something like below, i am not sure
 how to get to the correct syntax. Any inputs will be helpful.

 myDStream.foreachRDD(rdd => rdd.groupBy())

 I know how to do the vertical sum of array of given numbers, but i am
 not sure how to feed that function to the group by.

   def compute_counters(counts : ArrayBuffer[List[Int]]) = {
   counts.toList.transpose.map(_.sum)
   }

 ~Thanks,
 Vinti

>>>
>>>
>>
>


Re: Stream group by

2016-02-21 Thread Vinti Maheshwari
Nevermind, seems like an executor level mutable map is not recommended as
stated in
http://blog.guillaume-pitel.fr/2015/06/spark-trick-shot-node-centered-aggregator/

On Sun, Feb 21, 2016 at 9:54 AM, Vinti Maheshwari 
wrote:

> Thanks for your reply Jatin. I changed my parsing logic to what you
> suggested:
>
> def parseCoverageLine(str: String) = {
>   val arr = str.split(",")
>   ...
>   ...
>   (arr(0), arr(1) :: count.toList)  // (test, [file, 1, 1, 2])
> }
>
> Then in the grouping, can i use a global hash map per executor / partition
> to aggregate the results?
>
> val globalMap:[String: List[Int]] = Map()
> val coverageDStream = inputStream.map(parseCoverageLine)
> coverageDStream.reduceByKey((fileAndCountsA, fileAndCountsB) => {
> // if exists in global map, append result else add new key
>
> // globalMap
> // { "file1": [1+1+5, 1+2+5, 1+3+5], "file2": [ 2, 2, 2, 2 ] }
> })
>
> Thanks,
> Vinti
>
> On Sun, Feb 21, 2016 at 9:31 AM, Jatin Kumar 
> wrote:
>
>> Hello Vinti,
>>
>> One way to get this done is you split your input line into key and value
>> tuple and then you can simply use groupByKey and handle the values the way
>> you want. For example:
>>
>> Assuming you have already split the values into a 5 tuple:
>> myDStream.map(record => (record._2, (record._3, record_4, record._5))
>>  .groupByKey((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 +
>> r2._3))
>>
>> I hope that helps.
>>
>> --
>> Thanks
>> Jatin Kumar | Rocket Scientist
>> +91-7696741743 m
>>
>> On Sun, Feb 21, 2016 at 10:35 PM, Vinti Maheshwari 
>> wrote:
>>
>>> Hello,
>>>
>>> I have input lines like below
>>>
>>> *Input*
>>> t1, file1, 1, 1, 1
>>> t1, file1, 1, 2, 3
>>> t1, file2, 2, 2, 2, 2
>>> t2, file1, 5, 5, 5
>>> t2, file2, 1, 1, 2, 2
>>>
>>> and i want to achieve the output like below rows which is a vertical
>>> addition of the corresponding numbers.
>>>
>>> *Output*
>>> “file1” : [ 1+1+5, 1+2+5, 1+3+5 ]
>>> “file2” : [ 2+1, 2+1, 2+2, 2+2 ]
>>>
>>> I am in a spark streaming context and i am having a hard time trying to
>>> figure out the way to group by file name.
>>>
>>> It seems like i will need to use something like below, i am not sure how
>>> to get to the correct syntax. Any inputs will be helpful.
>>>
>>> myDStream.foreachRDD(rdd => rdd.groupBy())
>>>
>>> I know how to do the vertical sum of array of given numbers, but i am
>>> not sure how to feed that function to the group by.
>>>
>>>   def compute_counters(counts : ArrayBuffer[List[Int]]) = {
>>>   counts.toList.transpose.map(_.sum)
>>>   }
>>>
>>> ~Thanks,
>>> Vinti
>>>
>>
>>
>


Re: Stream group by

2016-02-21 Thread Vinti Maheshwari
Thanks for your reply Jatin. I changed my parsing logic to what you
suggested:

def parseCoverageLine(str: String) = {
  val arr = str.split(",")
  ...
  ...
  (arr(0), arr(1) :: count.toList)  // (test, [file, 1, 1, 2])
}

Then in the grouping, can I use a global hash map per executor / partition
to aggregate the results?

val globalMap = scala.collection.mutable.Map[String, List[Int]]()
val coverageDStream = inputStream.map(parseCoverageLine)
coverageDStream.reduceByKey((fileAndCountsA, fileAndCountsB) => {
// if exists in global map, append result else add new key

// globalMap
// { "file1": [1+1+5, 1+2+5, 1+3+5], "file2": [ 2, 2, 2, 2 ] }
})

Thanks,
Vinti

On Sun, Feb 21, 2016 at 9:31 AM, Jatin Kumar 
wrote:

> Hello Vinti,
>
> One way to get this done is you split your input line into key and value
> tuple and then you can simply use groupByKey and handle the values the way
> you want. For example:
>
> Assuming you have already split the values into a 5 tuple:
> myDStream.map(record => (record._2, (record._3, record_4, record._5))
>  .groupByKey((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 +
> r2._3))
>
> I hope that helps.
>
> --
> Thanks
> Jatin Kumar | Rocket Scientist
> +91-7696741743 m
>
> On Sun, Feb 21, 2016 at 10:35 PM, Vinti Maheshwari 
> wrote:
>
>> Hello,
>>
>> I have input lines like below
>>
>> *Input*
>> t1, file1, 1, 1, 1
>> t1, file1, 1, 2, 3
>> t1, file2, 2, 2, 2, 2
>> t2, file1, 5, 5, 5
>> t2, file2, 1, 1, 2, 2
>>
>> and i want to achieve the output like below rows which is a vertical
>> addition of the corresponding numbers.
>>
>> *Output*
>> “file1” : [ 1+1+5, 1+2+5, 1+3+5 ]
>> “file2” : [ 2+1, 2+1, 2+2, 2+2 ]
>>
>> I am in a spark streaming context and i am having a hard time trying to
>> figure out the way to group by file name.
>>
>> It seems like i will need to use something like below, i am not sure how
>> to get to the correct syntax. Any inputs will be helpful.
>>
>> myDStream.foreachRDD(rdd => rdd.groupBy())
>>
>> I know how to do the vertical sum of array of given numbers, but i am not
>> sure how to feed that function to the group by.
>>
>>   def compute_counters(counts : ArrayBuffer[List[Int]]) = {
>>   counts.toList.transpose.map(_.sum)
>>   }
>>
>> ~Thanks,
>> Vinti
>>
>
>


Re: Stream group by

2016-02-21 Thread Jatin Kumar
Hello Vinti,

One way to get this done is to split each input line into a (key, value) tuple
and then simply use groupByKey and handle the grouped values the way you want.
For example:

Assuming you have already split the values into a 5-tuple:
myDStream.map(record => (record._2, (record._3, record._4, record._5)))
  .groupByKey()
  .mapValues(_.reduce((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 + r2._3)))

I hope that helps.
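For the variable-length count lists in the original question, a slightly fuller
sketch (the field positions are assumptions based on the sample input, and
myDStream is assumed to be a DStream[String] of raw lines):

val perFile = myDStream
  .map { line =>
    val fields = line.split(",").map(_.trim)
    (fields(1), fields.drop(2).map(_.toInt).toList)      // ("file1", List(1, 1, 1))
  }
  .reduceByKey { (a, b) =>
    a.zipAll(b, 0, 0).map { case (x, y) => x + y }       // element-wise sum
  }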

--
Thanks
Jatin Kumar | Rocket Scientist
+91-7696741743 m

On Sun, Feb 21, 2016 at 10:35 PM, Vinti Maheshwari 
wrote:

> Hello,
>
> I have input lines like below
>
> *Input*
> t1, file1, 1, 1, 1
> t1, file1, 1, 2, 3
> t1, file2, 2, 2, 2, 2
> t2, file1, 5, 5, 5
> t2, file2, 1, 1, 2, 2
>
> and i want to achieve the output like below rows which is a vertical
> addition of the corresponding numbers.
>
> *Output*
> “file1” : [ 1+1+5, 1+2+5, 1+3+5 ]
> “file2” : [ 2+1, 2+1, 2+2, 2+2 ]
>
> I am in a spark streaming context and i am having a hard time trying to
> figure out the way to group by file name.
>
> It seems like i will need to use something like below, i am not sure how
> to get to the correct syntax. Any inputs will be helpful.
>
> myDStream.foreachRDD(rdd => rdd.groupBy())
>
> I know how to do the vertical sum of array of given numbers, but i am not
> sure how to feed that function to the group by.
>
>   def compute_counters(counts : ArrayBuffer[List[Int]]) = {
>   counts.toList.transpose.map(_.sum)
>   }
>
> ~Thanks,
> Vinti
>


Specify number of executors in standalone cluster mode

2016-02-21 Thread Saiph Kappa
Hi,

I'm running a spark streaming application on a spark cluster that spans 6
machines/workers. I'm using spark standalone cluster mode. Each machine has
8 cores. Is there any way to specify that I want to run my application on
all 6 machines and just use 2 cores on each machine?

Thanks


Re: Behind the scene of RDD to DataFrame

2016-02-21 Thread Weiwei Zhang
Thanks a lot!

Best Regards,
Weiwei

On Sat, Feb 20, 2016 at 11:53 PM, Hemant Bhanawat 
wrote:

> toDF internally calls sqlcontext.createDataFrame which transforms the RDD
> to RDD[InternalRow]. This RDD[InternalRow] is then mapped to a dataframe.
>
> Type conversions (from scala types to catalyst types) are involved but no
> shuffling.
>
> Hemant Bhanawat 
> www.snappydata.io
>
> On Sun, Feb 21, 2016 at 11:48 AM, Weiwei Zhang 
> wrote:
>
>> Hi there,
>>
>> Could someone explain to me what is behind the scene of rdd.toDF()? More
>> importantly, will this step involve a lot of shuffles and cause the surge
>> of the size of intermediate files? Thank you.
>>
>> Best Regards,
>> Vivian
>>
>
>
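To illustrate Hemant's point, a minimal sketch (the Person case class is only an
example): both calls below end up in sqlContext.createDataFrame, and the work is a
per-partition conversion of Scala types to Catalyst's internal row format, so no
shuffle is triggered.

case class Person(name: String, age: Int)
import sqlContext.implicits._

val rdd = sc.parallelize(Seq(Person("a", 1), Person("b", 2)))
val df1 = rdd.toDF()                        // sugar for the line below
val df2 = sqlContext.createDataFrame(rdd)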


Re: Evaluating spark streaming use case

2016-02-21 Thread Ted Yu
w.r.t. cleaner TTL, please see:

[SPARK-7689] Remove TTL-based metadata cleaning in Spark 2.0

FYI

On Sun, Feb 21, 2016 at 4:16 AM, Gerard Maas  wrote:

>
> It sounds like another  window operation on top of the 30-min window will
> achieve the  desired objective.
> Just keep in mind that you'll need to set the clean TTL (spark.cleaner.ttl)
> to a long enough value and you will require enough resources (mem & disk)
> to keep the required data.
>
> -kr, Gerard.
>
> On Sun, Feb 21, 2016 at 12:54 PM, Jatin Kumar <
> jku...@rocketfuelinc.com.invalid> wrote:
>
>> Hello Spark users,
>>
>> I have to aggregate messages from kafka and at some fixed interval (say
>> every half hour) update a memory persisted RDD and run some computation.
>> This computation uses last one day data. Steps are:
>>
>> - Read from realtime Kafka topic X in spark streaming batches of 5 seconds
>> - Filter the above DStream messages and keep some of them
>> - Create windows of 30 minutes on above DStream and aggregate by Key
>> - Merge this 30 minute RDD with a memory persisted RDD say combinedRdd
>> - Maintain last N such RDDs in a deque persisting them on disk. While
>> adding new RDD, subtract oldest RDD from the combinedRdd.
>> - Final step consider last N such windows (of 30 minutes each) and do
>> final aggregation
>>
>> Does the above way of using spark streaming looks reasonable? Is there a
>> better way of doing the above?
>>
>> --
>> Thanks
>> Jatin
>>
>>
>


Stream group by

2016-02-21 Thread Vinti Maheshwari
Hello,

I have input lines like below

*Input*
t1, file1, 1, 1, 1
t1, file1, 1, 2, 3
t1, file2, 2, 2, 2, 2
t2, file1, 5, 5, 5
t2, file2, 1, 1, 2, 2

and i want to achieve the output like below rows which is a vertical
addition of the corresponding numbers.

*Output*
“file1” : [ 1+1+5, 1+2+5, 1+3+5 ]
“file2” : [ 2+1, 2+1, 2+2, 2+2 ]

I am in a spark streaming context and I am having a hard time trying to
figure out the way to group by file name.

It seems like I will need to use something like below, but I am not sure how to
get to the correct syntax. Any inputs will be helpful.

myDStream.foreachRDD(rdd => rdd.groupBy())

I know how to do the vertical sum of an array of given numbers, but I am not
sure how to feed that function to the group by.

  def compute_counters(counts : ArrayBuffer[List[Int]]) = {
  counts.toList.transpose.map(_.sum)
  }

~Thanks,
Vinti


Re: spark-xml can't recognize schema

2016-02-21 Thread Dave Moyers
Make sure the xml input file is well formed (check your end tags). 

Sent from my iPhone

> On Feb 21, 2016, at 8:14 AM, Prathamesh Dharangutte  
> wrote:
> 
> This is the code I am using for parsing xml file: 
> 
> 
> 
> import org.apache.spark.{SparkConf,SparkContext}
> import org.apache.spark.sql.{DataFrame,SQLContext}
> import com.databricks.spark.xml
> 
> 
> object XmlProcessing {
> 
> def main(args : Array[String]) = {
> 
> val conf = new SparkConf()
> .setAppName("XmlProcessing")
> .setMaster("local")
> 
> val sc = new SparkContext(conf)
> val sqlContext : SQLContext = new org.apache.spark.sql.SQLContext(sc)
> 
> loadXMLdata(sqlContext)
> 
> }
> 
> def loadXMLdata(sqlContext : SQLContext) = {
> 
> var df : DataFrame = null
> 
> var newDf : DataFrame = null
> 
> df = sqlContext.read
> .format("com.databricks.spark.xml")
> .option("rowTag","book")
> .load("/home/prathamsh/Workspace/Xml/datafiles/sample.xml")
> 
> df.printSchema()
> 
> 
> }
> 
> }
> 
> 
> 
> 
> 
> 
>> On Sun, Feb 21, 2016 at 7:10 PM, Sebastian Piu  
>> wrote:
>> Can you paste the code you are using?
>> 
>> 
>>> On Sun, 21 Feb 2016, 13:19 Prathamesh Dharangutte  
>>> wrote:
>>> I am trying to parse xml file using spark-xml. But for some reason when i 
>>> print schema it only shows  root instead of the hierarchy. I am using 
>>> sqlcontext to read the data. I am proceeding according to this video :
>>> https://www.youtube.com/watch?v=NemEp53yGbI
>>> 
>>> The structure of xml file is somewhat like this:
>>> 
>>> 
>>>   
>>>  
>>>  
>>>  
>>> 
>>>
>>>//Some more data
>>>
>>> 
>>> 
>>> For some books there,are multiple orders i.e. large number of orders while 
>>> for some it just occurs once as empty. I use the "rowtag" attribute as 
>>> book. How do i proceed or is there any other way to tackle this problem?  
>>> Help would be much appreciated. Thank you.
> 


Re: RDD[org.apache.spark.sql.Row] filter ERROR

2016-02-21 Thread Ted Yu
I tried the following in spark-shell:

scala> val df0 = Seq(("a", "b", "c", 3), ("c", "b", "a", 3)).toDF("A", "B",
"C", "num")
df0: org.apache.spark.sql.DataFrame = [A: string, B: string ... 2 more
fields]

scala> val idList = List("1", "2", "3")
idList: List[String] = List(1, 2, 3)

scala> val rdd0 = df0.rdd
rdd0: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] =
MapPartitionsRDD[2] at rdd at :24

scala> val schema = df0.schema
schema: org.apache.spark.sql.types.StructType =
StructType(StructField(A,StringType,true), StructField(B,StringType,true),
StructField(C,StringType,true), StructField(num,IntegerType,false))

scala> val rdd1 = rdd0.filter(r => !idList.contains(r(3)))
rdd1: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] =
MapPartitionsRDD[3] at filter at :28

scala> val df1 = sqlContext.createDataFrame(rdd1, schema)
df1: org.apache.spark.sql.DataFrame = [A: string, B: string ... 2 more
fields]

scala> df1.count()
res0: Long = 2

Can you disclose more of your code where the ClassNotFoundException can be
reproduced ?

On Sun, Feb 21, 2016 at 6:42 AM, Tenghuan He  wrote:

> Hi, everyone.
>
> I have a spark program, where df0 is a DataFrame
>
> val idList = List("1", "2", "3", ...)
> val rdd0 = df0.rdd
> val schema = df0.schema
> val rdd1 = rdd0.filter(r => !idList.contains(r(0)))
> val df1 = sc.createDataFrame(rdd1, schema)
>
> When I run df1.count(), I got the following Exception at the end.
>
>
> I found this 
> http://apache-spark-user-list.1001560.n3.nabble.com/about-rdd-filter-td4657.html,
>  but it doesn't work for me.
>
> If you enclose a field member of an object into a closure, not only this 
> field but also the whole outer object is enclosed into the closure. If the 
> outer object is not serializable, then RDD DAG serialization would fail. You 
> can simply reference the field member with a separate variable to workaround 
> this:
>
>
> Can anyone tell me why? Thanks in advance :)
>
>
> Thanks & Best regards
>
> Tenghuan He
>
>
> [Stage 11:>(0 + 7)
> / 19]org.apache.spark.SparkException: Job aborted due to stage failure:
> Task 12 in stage 11.0 failed 4 times, most recent failure: Lost task 12.3
> in stage 11.0 (TID 99, 127.0.0.1): java.lang.ClassNotFoundException:
> com.blop.Updater$$anonfun$11
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at
> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
> at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
> at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
> at 

Re: spark-xml can't recognize schema

2016-02-21 Thread Sebastian Piu
No because you didn't say that explicitly. Can you share a sample file too?

On Sun, 21 Feb 2016, 14:34 Prathamesh Dharangutte 
wrote:

> I am using spark 1.4.0 with scala 2.10.4 and 0.3.2 of spark-xml.
> Orderid is empty for some books and multiple entries of it for other
> books, did you include that in your xml file?
>
> *From: *Sebastian Piu
> *Sent: *Sunday, 21 February 2016 20:00
> *To: *Prathamesh Dharangutte
> *Cc: *user@spark.apache.org
> *Subject: *Re: spark-xml can't recognize schema
>
> Just ran that code and it works fine, here is the output:
>
> What version are you using?
>
> val ctx = SQLContext.getOrCreate(sc)
> val df = ctx.read.format("com.databricks.spark.xml").option("rowTag", 
> "book").load("file:///tmp/sample.xml")
> df.printSchema()
>
> root
>  |-- name: long (nullable = true)
>  |-- orderId: long (nullable = true)
>  |-- price: long (nullable = true)
>
>
>
> On Sun, Feb 21, 2016 at 2:14 PM Prathamesh Dharangutte <
> pratham.d...@gmail.com> wrote:
>
>> This is the code I am using for parsing xml file:
>>
>>
>>
>> import org.apache.spark.{SparkConf,SparkContext}
>> import org.apache.spark.sql.{DataFrame,SQLContext}
>> import com.databricks.spark.xml
>>
>>
>> object XmlProcessing {
>>
>> def main(args : Array[String]) = {
>>
>> val conf = new SparkConf()
>> .setAppName("XmlProcessing")
>> .setMaster("local")
>>
>> val sc = new SparkContext(conf)
>> val sqlContext : SQLContext = new org.apache.spark.sql.SQLContext(sc)
>>
>> loadXMLdata(sqlContext)
>>
>> }
>>
>> def loadXMLdata(sqlContext : SQLContext) = {
>>
>> var df : DataFrame = null
>>
>> var newDf : DataFrame = null
>>
>> df = sqlContext.read
>> .format("com.databricks.spark.xml")
>> .option("rowTag","book")
>> .load("/home/prathamsh/Workspace/Xml/datafiles/sample.xml")
>>
>> df.printSchema()
>>
>>
>>
>> }
>>
>> }
>>
>>
>>
>>
>>
>>
>> On Sun, Feb 21, 2016 at 7:10 PM, Sebastian Piu 
>> wrote:
>>
>>> Can you paste the code you are using?
>>>
>>> On Sun, 21 Feb 2016, 13:19 Prathamesh Dharangutte <
>>> pratham.d...@gmail.com> wrote:
>>>
 I am trying to parse xml file using spark-xml. But for some reason when
 i print schema it only shows  root instead of the hierarchy. I am using
 sqlcontext to read the data. I am proceeding according to this video :
 https://www.youtube.com/watch?v=NemEp53yGbI

 The structure of xml file is somewhat like this:

 
   
  
  
  
   

//Some more data

 

 For some books there,are multiple orders i.e. large number of orders
 while for some it just occurs once as empty. I use the "rowtag" attribute
 as book. How do i proceed or is there any other way to tackle this
 problem?  Help would be much appreciated. Thank you.

>>>
>>
>


Re: spark-xml can't recognize schema

2016-02-21 Thread Prathamesh Dharangutte
I am using spark 1.4.0 with scala 2.10.4 and 0.3.2 of spark-xml.
Orderid is empty for some books and multiple entries of it for other books,
did you include that in your xml file?

From: Sebastian Piu
Sent: Sunday, 21 February 2016 20:00
To: Prathamesh Dharangutte
Cc: user@spark.apache.org
Subject: Re: spark-xml can't recognize schema

[The quoted reply from Sebastian Piu and the earlier messages in this thread
were flattened by the archive; they are identical to the copies shown above.]







Re: spark-xml can't recognize schema

2016-02-21 Thread Sebastian Piu
Just ran that code and it works fine, here is the output:

What version are you using?

val ctx = SQLContext.getOrCreate(sc)
val df = ctx.read.format("com.databricks.spark.xml").option("rowTag",
"book").load("file:///tmp/sample.xml")
df.printSchema()

root
 |-- name: long (nullable = true)
 |-- orderId: long (nullable = true)
 |-- price: long (nullable = true)



On Sun, Feb 21, 2016 at 2:14 PM Prathamesh Dharangutte <
pratham.d...@gmail.com> wrote:

> This is the code I am using for parsing xml file:
>
>
>
> import org.apache.spark.{SparkConf,SparkContext}
> import org.apache.spark.sql.{DataFrame,SQLContext}
> import com.databricks.spark.xml
>
>
> object XmlProcessing {
>
> def main(args : Array[String]) = {
>
> val conf = new SparkConf()
> .setAppName("XmlProcessing")
> .setMaster("local")
>
> val sc = new SparkContext(conf)
> val sqlContext : SQLContext = new org.apache.spark.sql.SQLContext(sc)
>
> loadXMLdata(sqlContext)
>
> }
>
> def loadXMLdata(sqlContext : SQLContext) = {
>
> var df : DataFrame = null
>
> var newDf : DataFrame = null
>
> df = sqlContext.read
> .format("com.databricks.spark.xml")
> .option("rowTag","book")
> .load("/home/prathamsh/Workspace/Xml/datafiles/sample.xml")
>
> df.printSchema()
>
>
>
> }
>
> }
>
>
>
>
>
>
> On Sun, Feb 21, 2016 at 7:10 PM, Sebastian Piu 
> wrote:
>
>> Can you paste the code you are using?
>>
>> On Sun, 21 Feb 2016, 13:19 Prathamesh Dharangutte 
>> wrote:
>>
>>> I am trying to parse xml file using spark-xml. But for some reason when
>>> i print schema it only shows  root instead of the hierarchy. I am using
>>> sqlcontext to read the data. I am proceeding according to this video :
>>> https://www.youtube.com/watch?v=NemEp53yGbI
>>>
>>> The structure of xml file is somewhat like this:
>>>
>>> 
>>>   
>>>  
>>>  
>>>  
>>>   
>>>
>>>//Some more data
>>>
>>> 
>>>
>>> For some books there,are multiple orders i.e. large number of orders
>>> while for some it just occurs once as empty. I use the "rowtag" attribute
>>> as book. How do i proceed or is there any other way to tackle this
>>> problem?  Help would be much appreciated. Thank you.
>>>
>>
>


Re: spark-xml can't recognize schema

2016-02-21 Thread Prathamesh Dharangutte
This is the code I am using for parsing xml file:



import org.apache.spark.{SparkConf,SparkContext}
import org.apache.spark.sql.{DataFrame,SQLContext}
import com.databricks.spark.xml


object XmlProcessing {

def main(args : Array[String]) = {

val conf = new SparkConf()
.setAppName("XmlProcessing")
.setMaster("local")

val sc = new SparkContext(conf)
val sqlContext : SQLContext = new org.apache.spark.sql.SQLContext(sc)

loadXMLdata(sqlContext)

}

def loadXMLdata(sqlContext : SQLContext) = {

var df : DataFrame = null

var newDf : DataFrame = null

df = sqlContext.read
.format("com.databricks.spark.xml")
.option("rowTag","book")
.load("/home/prathamsh/Workspace/Xml/datafiles/sample.xml")

df.printSchema()


}

}






On Sun, Feb 21, 2016 at 7:10 PM, Sebastian Piu 
wrote:

> Can you paste the code you are using?
>
> On Sun, 21 Feb 2016, 13:19 Prathamesh Dharangutte 
> wrote:
>
>> I am trying to parse xml file using spark-xml. But for some reason when i
>> print schema it only shows  root instead of the hierarchy. I am using
>> sqlcontext to read the data. I am proceeding according to this video :
>> https://www.youtube.com/watch?v=NemEp53yGbI
>>
>> The structure of xml file is somewhat like this:
>>
>> 
>>   
>>  
>>  
>>  
>>   
>>
>>//Some more data
>>
>> 
>>
>> For some books there,are multiple orders i.e. large number of orders
>> while for some it just occurs once as empty. I use the "rowtag" attribute
>> as book. How do i proceed or is there any other way to tackle this
>> problem?  Help would be much appreciated. Thank you.
>>
>


Re: spark-xml can't recognize schema

2016-02-21 Thread Sebastian Piu
Can you paste the code you are using?

On Sun, 21 Feb 2016, 13:19 Prathamesh Dharangutte 
wrote:

> I am trying to parse xml file using spark-xml. But for some reason when i
> print schema it only shows  root instead of the hierarchy. I am using
> sqlcontext to read the data. I am proceeding according to this video :
> https://www.youtube.com/watch?v=NemEp53yGbI
>
> The structure of xml file is somewhat like this:
>
> 
>   
>  
>  
>  
>   
>
>//Some more data
>
> 
>
> For some books there,are multiple orders i.e. large number of orders while
> for some it just occurs once as empty. I use the "rowtag" attribute as
> book. How do i proceed or is there any other way to tackle this problem?
> Help would be much appreciated. Thank you.
>


spark-xml can't recognize schema

2016-02-21 Thread Prathamesh Dharangutte
I am trying to parse an xml file using spark-xml. But for some reason when I
print the schema it only shows root instead of the hierarchy. I am using
sqlcontext to read the data. I am proceeding according to this video:
https://www.youtube.com/watch?v=NemEp53yGbI

The structure of xml file is somewhat like this:

   [sample XML stripped by the mailing-list archive -- per the printSchema
   output later in the thread, each <book> element contains name, orderId
   and price child elements, followed by some more data]

For some books there are multiple orders, i.e. a large number of orders, while
for some it just occurs once as empty. I use the "rowTag" attribute as
book. How do I proceed, or is there any other way to tackle this problem?
Help would be much appreciated. Thank you.


Re: Evaluating spark streaming use case

2016-02-21 Thread Gerard Maas
It sounds like another window operation on top of the 30-min window will
achieve the desired objective.
Just keep in mind that you'll need to set the cleaner TTL (spark.cleaner.ttl)
to a long enough value, and you will require enough resources (mem & disk)
to keep the required data.

-kr, Gerard.
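A sketch of the window-on-window idea (the durations, key/value types and the
sum are illustrative; filteredStream is assumed to be a DStream[(String, Long)]):

import org.apache.spark.streaming.Minutes

// first window: aggregate by key over the last 30 minutes, every 30 minutes
val halfHourly = filteredStream
  .reduceByKeyAndWindow((a: Long, b: Long) => a + b, Minutes(30), Minutes(30))

// second window on top of the first: roll the half-hourly aggregates up to a day
val daily = halfHourly
  .reduceByKeyAndWindow((a: Long, b: Long) => a + b, Minutes(24 * 60), Minutes(30))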

On Sun, Feb 21, 2016 at 12:54 PM, Jatin Kumar <
jku...@rocketfuelinc.com.invalid> wrote:

> Hello Spark users,
>
> I have to aggregate messages from kafka and at some fixed interval (say
> every half hour) update a memory persisted RDD and run some computation.
> This computation uses last one day data. Steps are:
>
> - Read from realtime Kafka topic X in spark streaming batches of 5 seconds
> - Filter the above DStream messages and keep some of them
> - Create windows of 30 minutes on above DStream and aggregate by Key
> - Merge this 30 minute RDD with a memory persisted RDD say combinedRdd
> - Maintain last N such RDDs in a deque persisting them on disk. While
> adding new RDD, subtract oldest RDD from the combinedRdd.
> - Final step consider last N such windows (of 30 minutes each) and do
> final aggregation
>
> Does the above way of using spark streaming looks reasonable? Is there a
> better way of doing the above?
>
> --
> Thanks
> Jatin
>
>


Fwd: Evaluating spark streaming use case

2016-02-21 Thread Jatin Kumar
Hello Spark users,

I have to aggregate messages from kafka and at some fixed interval (say
every half hour) update a memory persisted RDD and run some computation.
This computation uses last one day data. Steps are:

- Read from realtime Kafka topic X in spark streaming batches of 5 seconds
- Filter the above DStream messages and keep some of them
- Create windows of 30 minutes on above DStream and aggregate by Key
- Merge this 30 minute RDD with a memory persisted RDD say combinedRdd
- Maintain last N such RDDs in a deque persisting them on disk. While
adding new RDD, subtract oldest RDD from the combinedRdd.
- Final step consider last N such windows (of 30 minutes each) and do final
aggregation

Does the above way of using spark streaming look reasonable? Is there a
better way of doing the above?

--
Thanks
Jatin


filter by dict() key in pySpark

2016-02-21 Thread Franc Carter
I have a DataFrame that has a Python dict() as one of the columns. I'd like
to filter the DataFrame for those Rows where the dict() contains a
specific value, e.g. something like this:-

DF2 = DF1.filter('name' in DF1.params)

but that gives me this error

ValueError: Cannot convert column into bool: please use '&' for 'and', '|'
for 'or', '~' for 'not' when building DataFrame boolean expressions.

How do I express this correctly ?

thanks

-- 
Franc


Re: Element appear in both 2 splits of RDD after using randomSplit

2016-02-21 Thread nguyen duc tuan
That's very useful information.
The reason for the weird problem is the non-determinism of the RDD
before applying randomSplit.
By caching the RDD, we make it deterministic, and so the problem is
solved.
Thank you for your help.
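For the archive, a minimal sketch of that fix, reusing the variable names from
the code quoted below: cache the parent RDD so both splits are derived from the
same, now-deterministic data (a fixed seed is optional but makes reruns
reproducible).

val userProducts = ratings.map(x => (x.user, x.product)).cache()

val splits = userProducts.randomSplit(Array(0.7, 0.3), seed = 11L)
val train  = splits(0)
val test   = splits(1)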

2016-02-21 11:12 GMT+07:00 Ted Yu :

> Have you looked at:
> SPARK-12662 Fix DataFrame.randomSplit to avoid creating overlapping splits
>
> Cheers
>
> On Sat, Feb 20, 2016 at 7:01 PM, tuan3w  wrote:
>
>> I'm training a model using MLLib. When I try to split data into training
>> and
>> test data, I found a weird problem. I can't figure what problem is
>> happening
>> here.
>>
>> Here is my code in experiment:
>>
>> val  logData = rdd.map(x => (x._1, x._2)).distinct()
>> val ratings: RDD[Rating] = logData.map(x => Rating(x._1, x._2, 1))
>> val userProducts = ratings.map(x => (x.user, x.product))
>> val splits = userProducts.randomSplit(Array(0.7, 0.3))
>> val train = splits(0)
>> train.count() // 1660895
>> val test = splits(1)
>> test.count() // 712306
>> // test if an element appear in both splits
>> train.map(x => (x._1 + "_" + x._2, 1)).join(test.map(x => (x._1 + "_" +
>> x._2, 2))).take(5)
>> //return res153: Array[(String, (Int, Int))] = Array((1172491_2899,(1,2)),
>> (1206777_1567,(1,2)), (91828_571,(1,2)), (329210_2435,(1,2)),
>> (24356_135,(1,2)))
>>
>> If I try to save to hdfs and load RDD from HDFS this problem doesn't
>> happen.
>>
>> userProducts.map(x => x._1 + ":" +
>> x._2).saveAsTextFile("/user/tuannd/test2.txt")
>> val userProducts = sc.textFile("/user/tuannd/test2.txt").map(x => {
>> val d =x.split(":")
>> (d(0).toInt(), d(1).toInt())
>> })
>> // other steps are as same as above
>>
>> I'm using spark 1.5.2.
>> Thanks for all your help.
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Element-appear-in-both-2-splits-of-RDD-after-using-randomSplit-tp26281.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


RE: how to set database in DataFrame.saveAsTable?

2016-02-21 Thread Mich Talebzadeh
I looked at the doc on this. It is not clear what goes on behind the scenes. There is
very little documentation on it.

 

First, in Hive a database has to exist before it can be used, so sql(“use
mytable”) will not create a database for you.

 

Also you cannot call your table mytable in database mytable!

 

Remember in Hive the hierarchy is the database followed by tables.

 

If you want to create a Hive database (I would not for every table) for this 
purpose you can do

 

scala> sql("create database if not exists mytable_db")

res10: org.apache.spark.sql.DataFrame = [result: string]

 

scala> sql("use mytable_db")

res12: org.apache.spark.sql.DataFrame = [result: string]

 

This puts you in the context of mytable_db database

 

If you do

 

hdfs dfs -ls /user/hive/warehouse

 

You will see a directory called mytable_db.db is created

 

/user/hive/warehouse/mytable_db.db

 

Then you can create a table in Hive in mytable_db if you wish.  The way I do it 
personally is to register your DF as a temporary table and do insert/select 
into Hive table

 

scala> sql("use mytable_db")

res21: org.apache.spark.sql.DataFrame = [result: string]

 

scala> """

 | CREATE TABLE mytable (

 | INVOICENUMBER  INT

 | ,PAYMENTDATEtimestamp

 | ,NETDECIMAL(20,2)

 | ,VATDECIMAL(20,2)

 | ,TOTAL  DECIMAL(20,2)

 | )

 |  COMMENT 'a test table'

 | STORED AS ORC

 | TBLPROPERTIES ( "orc.compress"="ZLIB" )

 | """

res22: String =

"

CREATE TABLE mytable (

INVOICENUMBER  INT

,PAYMENTDATEtimestamp

,NETDECIMAL(20,2)

,VATDECIMAL(20,2)

,TOTAL  DECIMAL(20,2)

)

COMMENT 'a test table'

STORED AS ORC

TBLPROPERTIES ( "orc.compress"="ZLIB" )

"

scala> sql(sqltext)

res6: org.apache.spark.sql.DataFrame = [result: string]

 

 

My DF is called “a” below so I register it as a temp table called tmp

 

a.toDF.registerTempTable("tmp")

 

Then just insert/select into Hive table mytable from tmp

 

scala> sqltext = "INSERT INTO mytable SELECT * FROM tmp"

sqltext: String = INSERT INTO mytable SELECT * FROM tmp

 

scala> sql(sqltext)

res10: org.apache.spark.sql.DataFrame = []

 

scala> sql("select count(1) from mytable").show

+---+

|_c0|

+---+

| 65|


 

 

HTH
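The same sequence condensed into one block for quick reference (same names as
above; "a" is the source DataFrame and sqlContext is Hive-enabled, as in the
shell session shown):

sql("CREATE DATABASE IF NOT EXISTS mytable_db")
sql("USE mytable_db")
// ... run the CREATE TABLE mytable ... statement shown above ...
a.toDF.registerTempTable("tmp")
sql("INSERT INTO mytable SELECT * FROM tmp")
sql("SELECT COUNT(1) FROM mytable").show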

 

Dr Mich Talebzadeh

 

LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

http://talebzadehmich.wordpress.com  

 


 

 

From: Glen [mailto:cng...@gmail.com] 
Sent: 21 February 2016 03:26
To: gen tang 
Cc: user@spark.apache.org
Subject: Re: how to set database in DataFrame.saveAsTable?

 

Any example code?

 

In pyspark:

sqlContex.sql("use mytable")

my_df.saveAsTable("tmp_spark_debug", mode="overwrite")

 

1. The code above seems not register the table in hive. I have to create table 
from hdfs in hive, it reports some format error: rcformat and parquet.

2. Rerun the saveAsTable using  mode="overwrite" in saveAsTable, it reports the 
table already exists.

3. Sometimes it creates a directory in  hive/warehouse/tmp_spark_debug, not in 
hive/warehouse/mytable/tmp_spark_debug.

 

 

My goal is simple:

df.saveAsTable('blablabla')  // create a hive table in some database, then it 
can be visited by hive.

 

I tried lots of time, it seems there are lots of bug in pyspark. Or my mehtod 
is wrong?

 

2016-02-21 10:04 GMT+08:00 gen tang :

Hi,

 

You can use 

sqlContext.sql("use ")

before use dataframe.saveAsTable

 

Hope it could be helpful

 

Cheers

Gen

 

 

On Sun, Feb 21, 2016 at 9:55 AM, Glen  wrote:

For dataframe in spark, so the table can be visited by hive.


 

-- 

Jacky Wang

 





 

-- 

Jacky Wang