Re: PySpark 1.6.1: 'builtin_function_or_method' object has no attribute '__code__' in Pickles

2016-07-30 Thread ayan guha
Hi

Glad that your problem is resolved. spark-submit is the recommended way of
submitting applications (pyspark internally uses spark-submit).

Yes, the process remains the same for a single node vs. multiple nodes.
However, I would suggest using one of the cluster modes instead of local
mode. On a single node, you can start a standalone master.

I would suggest going through the deployment section of the Spark documentation.
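
For example, a minimal invocation might look like this (the master URL and
file names are placeholders); note that spark-submit options such as
--py-files must come before the application script, otherwise they are
treated as arguments to the script itself:

spark-submit --master spark://<master-host>:7077 \
  --py-files dependency.py \
  myscript.py

With --py-files, dependency.py is shipped to the executors so it can be
imported there, rather than being available only on the driver.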

Best
Ayan

On Sat, Jul 30, 2016 at 4:32 PM, Bhaarat Sharma <bhaara...@gmail.com> wrote:

> That worked perfectly, thank you! Had to make a few modifications to my
> script but nothing major.
>
> What is the difference in my running this via "pyspark myscript.py" vs.
> "spark-submit myscript.py --py-files dependency.py" ? Is it that the
> dependency is on all executors with the latter?
>
> Additionally, I'm currently running spark on a single box. If I had a 10
> node cluster, would the process be the same? On a 10 node cluster will the
> processing of my job split across the nodes? I should also add that the
> bulk of the processing work is being done in dependency.py.
>
> I would appreciate any resources relevant to these questions.
>
> Thanks again.
>
>
>
> On Sat, Jul 30, 2016 at 1:42 AM, Bhaarat Sharma <bhaara...@gmail.com>
> wrote:
>
>> Great, let me give that a shot.
>>
>> On Sat, Jul 30, 2016 at 1:40 AM, ayan guha <guha.a...@gmail.com> wrote:
>>
>>> http://spark.apache.org/docs/latest/submitting-applications.html
>>>
>>> For Python, you can use the --py-files argument of spark-submit to add
>>> .py, .zip or .egg files to be distributed with your application. If you
>>> depend on multiple Python files we recommend packaging them into a .zip
>>>  or .egg.
>>>
>>>
>>>
>>> On Sat, Jul 30, 2016 at 3:37 PM, Bhaarat Sharma <bhaara...@gmail.com>
>>> wrote:
>>>
>>>> I'm very new to Spark. I'm running it on a single CentOS 7 box. How would
>>>> I add a test.py to spark-submit? A pointer to any resources would be great.
>>>> Thanks for your help.
>>>>
>>>> On Sat, Jul 30, 2016 at 1:28 AM, ayan guha <guha.a...@gmail.com> wrote:
>>>>
>>>>> I think you need to add test.py to spark-submit so that it gets
>>>>> shipped to all executors.
>>>>>
>>>>> On Sat, Jul 30, 2016 at 3:24 PM, Bhaarat Sharma <bhaara...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I am using PySpark 1.6.1. In my python program I'm using ctypes and
>>>>>> trying to load the liblept library via the liblept.so.4.0.2 file on my
>>>>>> system.
>>>>>>
>>>>>> While trying to load the library via
>>>>>> cdll.LoadLibrary("liblept.so.4.0.2") I get an error
>>>>>> : 'builtin_function_or_method' object has no attribute '__code__'
>>>>>>
>>>>>> Here are my files
>>>>>>
>>>>>> test.py
>>>>>>
>>>>>> from ctypes import *
>>>>>>
>>>>>> class FooBar:
>>>>>>     def __init__(self, options=None, **kwargs):
>>>>>>         if options is not None:
>>>>>>             self.options = options
>>>>>>
>>>>>>     def read_image_from_bytes(self, bytes):
>>>>>>         return "img"
>>>>>>
>>>>>>     def text_from_image(self, img):
>>>>>>         self.leptonica = cdll.LoadLibrary("liblept.so.4.0.2")
>>>>>>         return "test from foobar"
>>>>>>
>>>>>>
>>>>>> spark.py
>>>>>>
>>>>>> from pyspark import SparkContext
>>>>>> import test
>>>>>> import numpy as np
>>>>>> sc = SparkContext("local", "test")
>>>>>> foo = test.FooBar()
>>>>>>
>>>>>> def file_bytes(rawdata):
>>>>>>     return np.asarray(bytearray(rawdata), dtype=np.uint8)
>>>>>>
>>>>>> def do_some_with_bytes(bytes):
>>>>>>     return foo.do_something_on_image(foo.read_image_from_bytes(bytes))
>>>>>>
>>>>>> images = sc.binaryFiles("/myimages/*.jpg")
>>>>>> image_to_text = lambda rawdata: do_some_with_bytes(file_bytes(rawdata))
>>>>>> print images.values().map(image_to_text).take(1)  # this gives an error
>>>>>>
>>>>>>
>>>>>> What is the way to load this library?
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Best Regards,
>>>>> Ayan Guha
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Best Regards,
>>> Ayan Guha
>>>
>>
>>
>


-- 
Best Regards,
Ayan Guha


Re: PySpark 1.6.1: 'builtin_function_or_method' object has no attribute '__code__' in Pickles

2016-07-29 Thread ayan guha
I think you need to add test.py to spark-submit so that it gets shipped to
all executors.

On Sat, Jul 30, 2016 at 3:24 PM, Bhaarat Sharma <bhaara...@gmail.com> wrote:

> I am using PySpark 1.6.1. In my python program I'm using ctypes and trying
> to load the liblept library via the liblept.so.4.0.2 file on my system.
>
> While trying to load the library via cdll.LoadLibrary("liblept.so.4.0.2")
> I get an error : 'builtin_function_or_method' object has no attribute
> '__code__'
>
> Here are my files
>
> test.py
>
> from ctypes import *
>
> class FooBar:
>     def __init__(self, options=None, **kwargs):
>         if options is not None:
>             self.options = options
>
>     def read_image_from_bytes(self, bytes):
>         return "img"
>
>     def text_from_image(self, img):
>         self.leptonica = cdll.LoadLibrary("liblept.so.4.0.2")
>         return "test from foobar"
>
>
> spark.py
>
> from pyspark import SparkContext
> import test
> import numpy as np
> sc = SparkContext("local", "test")
> foo = test.FooBar()
>
> def file_bytes(rawdata):
>     return np.asarray(bytearray(rawdata), dtype=np.uint8)
>
> def do_some_with_bytes(bytes):
>     return foo.do_something_on_image(foo.read_image_from_bytes(bytes))
>
> images = sc.binaryFiles("/myimages/*.jpg")
> image_to_text = lambda rawdata: do_some_with_bytes(file_bytes(rawdata))
> print images.values().map(image_to_text).take(1)  # this gives an error
>
>
> What is the way to load this library?
>
>


-- 
Best Regards,
Ayan Guha


Re: Java Recipes for Spark

2016-07-29 Thread ayan guha
Hi

Is there anything similar for Python? If not, I can create one.

On Sat, Jul 30, 2016 at 2:19 PM, Shiva Ramagopal <tr.s...@gmail.com> wrote:

> +1 for the Java love :-)
>
> On 30-Jul-2016 4:39 AM, "Renato Perini" <renato.per...@gmail.com> wrote:
>
>> Not only very useful, but finally some Java love :-)
>>
>> Thank you.
>>
>>
>> Il 29/07/2016 22:30, Jean Georges Perrin ha scritto:
>>
>>> Sorry if this looks like a shameless self promotion, but some of you
>>> asked me to say when I'll have my Java recipes for Apache Spark updated.
>>> It's done here: http://jgp.net/2016/07/22/spark-java-recipes/ and in
>>> the GitHub repo.
>>>
>>> Enjoy / have a great week-end.
>>>
>>> jg
>>>
>>>
>>>
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


-- 
Best Regards,
Ayan Guha


Re: correct / efficient manner to upsert / update in hdfs (via spark / in general)

2016-07-29 Thread ayan guha
Thanks Sumit, please post back how your tests with HBase go.



On Fri, Jul 29, 2016 at 8:06 PM, Sumit Khanna <sumit.kha...@askme.in> wrote:

> Hey Ayan,
>
> A. Create a table TGT1 as (select key,info from delta UNION ALL select
> key,info from TGT where key not in (select key from SRC)). Rename TGT1 to
> TGT. Not in can be written other variations using Outer Join
> B. Assuming SRC and TGT have a timestamp,
>   B.1. Select latest records from UNION ALL(SRC,TGT) using RANK()
> OVER PARTITION BY (Key order by timestamp desc)
>   B.2. Create TGT1 from B.1. Rename TGT1 to TGT2
>
> Well, how we approached this was to broadcast the primary keys, since that
> is said to be better for a smaller table (we make sure that our run
> frequency is shrunk or enlarged based on traffic) so that the cardinality
> |unique delta primary keys| is a small, broadcastable number indeed. Then
> what follows is a filter function on each executor, which has the keys to
> be upserted against, all in memory (broadcast writes the keys into executor
> memory, isn't it?). That was the only optimization I could think of. With
> option A, as well as B, there are likely to be huge shuffle costs
> (shuffleHashJoins), right?
>
> 1.  if updates are fairly spred across keys, the scheme does not give much
> benefit as number of partition read ~= total number of partition.
> 2.  This scheme often shows long tail problem (Think 1 key changed in a
> partition).
>
> 1. is beyond doubt true, because any key, back in time / partition space,
> may get updated in the next run. So is 2, as in we make the entire
> partition pass through the filter only to update 1 or 2-3 affected keys.
>
> I do not think that, with the current use case, I can ensure that keys get
> partitioned well and the delta corresponds to just one partition; that
> would only happen if I maintained date-wise partitions and some concept of
> recency were observed. Let me see how HBase might efficiently tackle this
> classic upsert case.
>
> Thanks,
> Sumit
>
> On Fri, Jul 29, 2016 at 3:22 PM, ayan guha <guha.a...@gmail.com> wrote:
>
>> This is a classic case compared to hadoop vs DWH implmentation.
>>
>> Source (Delta table): SRC. Target: TGT
>>
>> Requirement: Pure Upsert, ie just keep the latest information for each
>> key.
>>
>> Options:
>>
>> A. Create a table TGT1 as (select key,info from delta UNION ALL select
>> key,info from TGT where key not in (select key from SRC)). Rename TGT1 to
>> TGT. Not in can be written other variations using Outer Join
>> B. Assuming SRC and TGT have a timestamp,
>>   B.1. Select latest records from UNION ALL(SRC,TGT) using RANK()
>> OVER PARTITION BY (Key order by timestamp desc)
>>   B.2. Create TGT1 from B.1. Rename TGT1 to TGT2
>>
>> Both options are costly. And essentially more effort can be introduced to
>> write complex manipulations by partitioning data based on key and read only
>> partitions which are "changed". 3 issues:
>> 1.  if updates are fairly spred across keys, the scheme does not give
>> much benefit as number of partition read ~= total number of partition.
>> 2.  This scheme often shows long tail problem (Think 1 key changed in a
>> partition).
>>
>> This may be good when partition is based on keys and keys increase
>> monotonically. This adds maintenance of adding more partitions but do well
>> well to contain number of partitions read.
>>
>> My advise: Give HBase a shot. It gives UPSERT out of box. If you want
>> history, just add timestamp in the key (in reverse). Computation engines
>> easily support HBase.
>>
>> Best
>> Ayan
>>
>> On Fri, Jul 29, 2016 at 5:03 PM, Sumit Khanna <sumit.kha...@askme.in>
>> wrote:
>>
>>> Just a note, I had the delta_df keys for the filter as in NOT
>>> INTERSECTION udf broadcasted to all the worker nodes. Which I think is an
>>> efficient move enough.
>>>
>>> Thanks,
>>>
>>> On Fri, Jul 29, 2016 at 12:19 PM, Sumit Khanna <sumit.kha...@askme.in>
>>> wrote:
>>>
>>>> Hey,
>>>>
>>>> the very first run :
>>>>
>>>> glossary :
>>>>
>>>> delta_df := current run / execution changes dataframe.
>>>>
>>>> def deduplicate :
>>>> apply windowing function and group by
>>>>
>>>> def partitionDataframe(delta_df) :
>>>> get unique keys of that data frame and then return an array of data
>>>> frames each co

Re: correct / efficient manner to upsert / update in hdfs (via spark / in general)

2016-07-29 Thread ayan guha
This is a classic case when comparing a Hadoop vs. a DWH implementation.

Source (Delta table): SRC. Target: TGT

Requirement: Pure Upsert, ie just keep the latest information for each key.

Options:

A. Create a table TGT1 as (select key,info from delta UNION ALL select
key,info from TGT where key not in (select key from SRC)). Rename TGT1 to
TGT. "Not in" can be rewritten in other variations using an outer join.
B. Assuming SRC and TGT have a timestamp:
  B.1. Select the latest records from UNION ALL(SRC,TGT) using RANK()
OVER (PARTITION BY key ORDER BY timestamp DESC).
  B.2. Create TGT1 from B.1. Rename TGT1 to TGT.

Both options are costly. Essentially, more effort can be invested in writing
complex manipulations that partition the data by key and read only the
partitions which are "changed". A few issues:
1. If updates are fairly spread across keys, the scheme does not give much
benefit, as the number of partitions read ~= the total number of partitions.
2. This scheme often shows a long-tail problem (think one key changed in a
partition).

This may work well when partitioning is based on keys and the keys increase
monotonically. It adds the maintenance of creating more partitions, but does
well to contain the number of partitions read.

My advice: give HBase a shot. It gives you UPSERT out of the box. If you want
history, just add a timestamp to the key (in reverse). Computation engines
easily support HBase.
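
As a rough illustration of option B in PySpark (a minimal sketch; src and
tgt are assumed DataFrames with columns key, info and ts):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Union the delta (SRC) with the current target (TGT) and keep only the
# latest record per key, ranked by timestamp descending.
w = Window.partitionBy("key").orderBy(F.col("ts").desc())
tgt1 = (src.unionAll(tgt)
           .withColumn("rnk", F.row_number().over(w))
           .filter(F.col("rnk") == 1)
           .drop("rnk"))
# Write tgt1 out and swap it in as the new TGT.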

Best
Ayan

On Fri, Jul 29, 2016 at 5:03 PM, Sumit Khanna <sumit.kha...@askme.in> wrote:

> Just a note, I had the delta_df keys for the filter as in NOT INTERSECTION
> udf broadcasted to all the worker nodes. Which I think is an efficient move
> enough.
>
> Thanks,
>
> On Fri, Jul 29, 2016 at 12:19 PM, Sumit Khanna <sumit.kha...@askme.in>
> wrote:
>
>> Hey,
>>
>> the very first run :
>>
>> glossary :
>>
>> delta_df := current run / execution changes dataframe.
>>
>> def deduplicate :
>> apply windowing function and group by
>>
>> def partitionDataframe(delta_df) :
>> get unique keys of that data frame and then return an array of data
>> frames each containing just that very same key as the column.
>> this will give the above dataframe partitoned as say by date column or
>> gender column or age group column etc etc.
>>
>> 0. deduplicate(delta_df : delta_df [ with all unique primary  /
>> deduplicating key column ]
>> 1. partitionDataframe(delta_df) : Array[delta_df(i to # partitons)]
>> 2. write the dataframe to corresponding parent hdfs path + partiton dir_
>>
>> subsequent runs :
>>
>> for each partition :
>> 0. partitionDataframe(delta_df) : Array[delta_df(i to # partitons)]
>> 1. load df from previous hdfs location of that partition
>> 2. filter the above df(p) where p is the partiton no. such that keys not
>> present in delta_df(p) of current run. i.e get df(p)[primary column] not in
>> delta_df(p). done via a basic ! in UDF.
>> 3. delta_df.unionAll(filtered df above).
>> 4. persist the output of 3. as df.write.mode.format.
>>
>> Is this the right way of doing the upserts partition-wise? All in all it
>> is taking 2 hours for inserting / upserting 500K records in parquet format
>> in some hdfs location where each location gets mapped to one partition.
>>
>> My spark conf specs are :
>>
>> yarn cluster mode. single node.
>> spark.executor.memory 8g
>> spark.rpc.netty.dispatcher.numThreads 2
>>
>> Thanks,
>> Sumit
>>
>>
>>
>


-- 
Best Regards,
Ayan Guha


Re: ORC v/s Parquet for Spark 2.0

2016-07-27 Thread ayan guha
 mind
>>>> that both formats work best if they are sorted on filter columns (which is
>>>> your responsibility) and if their optimatizations are correctly configured
>>>> (min max index, bloom filter, compression etc) .
>>>>
>>>> If you need to ingest sensor data you may want to store it first in
>>>> hbase and then batch process it in large files in Orc or parquet format.
>>>>
>>>> On 26 Jul 2016, at 04:09, janardhan shetty <janardhan...@gmail.com>
>>>> wrote:
>>>>
>>>> Just wondering advantages and disadvantages to convert data into ORC or
>>>> Parquet.
>>>>
>>>> In the documentation of Spark there are numerous examples of Parquet
>>>> format.
>>>>
>>>> Any strong reasons to chose Parquet over ORC file format ?
>>>>
>>>> Also : current data compression is bzip2
>>>>
>>>>
>>>> http://stackoverflow.com/questions/32373460/parquet-vs-orc-vs-orc-with-snappy
>>>> This seems like biased.
>>>>
>>>>
>>>
>>
>


-- 
Best Regards,
Ayan Guha


Re: spark sql aggregate function "Nth"

2016-07-26 Thread ayan guha
You can use rank with a window function. Rank = 1 is the same as calling
first().

Not sure how you would randomly pick records though, if there is no Nth
record. In your example, what happens if the data has only 2 rows?
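
A minimal sketch of the idea in PySpark (the group and ordering columns grp
and ts are assumptions for this example):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Number the rows within each group, then keep the Nth one (here the 3rd).
w = Window.partitionBy("grp").orderBy("ts")
third_per_group = (df.withColumn("rn", F.row_number().over(w))
                     .filter(F.col("rn") == 3)
                     .drop("rn"))
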
On 27 Jul 2016 00:57, "Alex Nastetsky" 
wrote:

> Spark SQL has a "first" function that returns the first item in a group.
> Is there a similar function, perhaps in a third party lib, that allows you
> to return an arbitrary (e.g. 3rd) item from the group? Was thinking of
> writing a UDAF for it, but didn't want to reinvent the wheel. My endgoal is
> to be able to select a random item from the group, using random number
> generator.
>
> Thanks.
>


Re: Executors assigned to STS and number of workers in Stand Alone Mode

2016-07-25 Thread ayan guha
STS works on YARN as a yarn-client application.

One issue: STS is not HA-supported, though there was some discussion about
making it HA, similar to Hive Server. So what we did is run STS on multiple
nodes and tie them to a load balancer.

On Tue, Jul 26, 2016 at 8:06 AM, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Correction.
>
> STS uses the same UI to display details about all processes running
> against it which is helpful but gets crowded
>
> :)
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 25 July 2016 at 22:26, Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>> We also should remember that STS is a pretty useful tool. With JDBC you
>> can use beeline, Zeppelin, Squirrel and other tools against it.
>>
>> One thing I like to change is the UI port that the thrift server listens
>> and you can change it at startup using spark.ui.port. This is fixed at
>> thrift startup and can only display one sql query at a time which is kind
>> not useful.
>>
>> As one can run multiple clients against STS, it is a
>> limitation that one cannot change the UI port at runtime.
>>
>> Cheers
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 25 July 2016 at 22:04, Jacek Laskowski <ja...@japila.pl> wrote:
>>
>>> On Mon, Jul 25, 2016 at 10:57 PM, Mich Talebzadeh
>>> <mich.talebza...@gmail.com> wrote:
>>>
>>> > Yarn promises the best resource management I believe. Having said that
>>> I have not used Mesos myself.
>>>
>>> I'm glad you've mentioned it.
>>>
>>> I think Cloudera (and Hortonworks?) guys are doing a great job with
>>> bringing all the features of YARN to Spark and I think Spark on YARN
>>> shines features-wise.
>>>
>>> I'm not in a position to compare YARN vs Mesos for their resource
>>> management, but Spark on Mesos is certainly lagging behind Spark on
>>> YARN regarding the features Spark uses off the scheduler backends --
>>> security, data locality, queues, etc. (or I might be simply biased
>>> after having spent months with Spark on YARN mostly?).
>>>
>>> Jacek
>>>
>>
>>
>


-- 
Best Regards,
Ayan Guha


Re: Hive and distributed sql engine

2016-07-25 Thread ayan guha
In order to use an existing pg UDF, you may create a view in pg and expose
the view to Hive.
Spark-to-database connections happen from each executor, so you must have a
connection, or a pool of connections, per worker. Executors of the same
worker can share a connection pool.
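
As a rough sketch of the connection-per-partition pattern in PySpark (the
psycopg2 calls, host and column names below are illustrative assumptions,
not something from this thread):

import psycopg2

def query_partition(rows):
    # One connection per partition, reused for every row in that partition.
    conn = psycopg2.connect(host="pg-host", dbname="mydb", user="spark")
    cur = conn.cursor()
    for row in rows:
        cur.execute("SELECT my_pg_udf(%s)", (row.some_col,))
        yield cur.fetchone()[0]
    cur.close()
    conn.close()

results = df.rdd.mapPartitions(query_partition)

A shared pool per worker (for example, a module-level singleton) is a
further refinement of the same idea.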

Best
Ayan
On 25 Jul 2016 16:48, "Marco Colombo"  wrote:

> Hi all!
> Among other use cases, I want to use spark as a distributed sql engine
> via thrift server.
> I have some tables in postegres and Cassandra: I need to expose them via
> hive for custom reporting.
> Basic implementation is simple and works, but I have some concerns and
> open question:
> - is there a better approach rather than mapping a temp table as a select
> of the full table?
> - What about query setup cost? I mean, is there a way to avoid db
> connection setup costs using a pre-created connection pool?
> - is it possibile from hiveql to use functions defined in the pg database
> or should I have to rewrite them as udaf?
>
> Thanks!
>
>
>
> --
> Ing. Marco Colombo
>


Re: calculate time difference between consecutive rows

2016-07-21 Thread ayan guha
Please post your code and results. Lag will be null for the first record.
Also, what data type are you using? Are you using a cast?
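
A minimal sketch of the lag approach in PySpark (assuming the column is
named Time1, holds HH:mm:ss strings, and all rows fall on the same day):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Convert the HH:mm:ss strings to seconds so consecutive rows can be subtracted.
secs = F.unix_timestamp(F.col("Time1"), "HH:mm:ss")
w = Window.orderBy(secs)
diff_df = df.withColumn("diff_seconds", secs - F.lag(secs, 1).over(w))
# The first row has no previous row, so its diff is null -- which is expected.
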
On 21 Jul 2016 14:28, "Divya Gehlot"  wrote:

> I have a dataset of time as shown below :
> Time1
> 07:30:23
> 07:34:34
> 07:38:23
> 07:39:12
> 07:45:20
>
> I need to find the diff between two consecutive rows
> I googled and found the *lag *function in *spark *helps in finding it .
> but its  giving me *null *in the result set.
>
> Would really appreciate the help.
>
>
> Thanks,
> Divya
>
>


Re: Is it good choice to use DAO to store results generated by spark application?

2016-07-20 Thread ayan guha
Just as a sanity check, saving data to HBase for analytics may not be the
best choice. Any specific reason for not using HDFS or Hive?
On 20 Jul 2016 20:57, "Rabin Banerjee"  wrote:

> Hi Wei ,
>
> You can do something like this ,
>
> foreachPartition((part) => {
>   val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
>   val table = conn.getTable(TableName.valueOf(tablename))
>   // part.foreach((inp) => { println(inp); table.put(inp) })  // line-by-line put
>   table.put(part.toList.asJava)
>   table.close()
>   conn.close()
> })
>
> Now, if you want to wrap it inside a DAO, it's up to you. Making a DAO will
> abstract things, but it is ultimately going to use the same code.
>
> Note: always use the HBase ConnectionFactory to get a connection, and dump
> data on a per-partition basis.
>
> Regards,
> Rabin Banerjee
>
>
> On Wed, Jul 20, 2016 at 12:06 PM, Yu Wei  wrote:
>
>> I need to write all data received from MQTT data into hbase for further
>> processing.
>>
>> They're not final result.  I also need to read the data from hbase for
>> analysis.
>>
>>
>> Is it good choice to use DAO in such situation?
>>
>>
>> Thx,
>>
>> Jared
>>
>>
>> --
>> *From:* Deepak Sharma 
>> *Sent:* Wednesday, July 20, 2016 12:34:07 PM
>> *To:* Yu Wei
>> *Cc:* spark users
>> *Subject:* Re: Is it good choice to use DAO to store results generated
>> by spark application?
>>
>>
>> I am using DAO in spark application to write the final computation to
>> Cassandra  and it performs well.
>> What kinds of issues you foresee using DAO for hbase ?
>>
>> Thanks
>> Deepak
>>
>> On 19 Jul 2016 10:04 pm, "Yu Wei"  wrote:
>>
>>> Hi guys,
>>>
>>>
>>> I write spark application and want to store results generated by spark
>>> application to hbase.
>>>
>>> Do I need to access hbase via java api directly?
>>>
>>> Or is it better choice to use DAO similar as traditional RDBMS?  I
>>> suspect that there is major performance downgrade and other negative
>>> impacts using DAO. However, I have little knowledge in this field.
>>>
>>>
>>> Any advice?
>>>
>>>
>>> Thanks,
>>>
>>> Jared
>>>
>>>
>>>
>>>
>


Re: Missing Exector Logs From Yarn After Spark Failure

2016-07-19 Thread ayan guha
If YARN log aggregation is enabled then logs will be moved to HDFS. You can
use "yarn logs -applicationId <application ID>" to view those logs.
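
For example, for one of the applications listed below (assuming aggregation
was enabled before the run):

yarn logs -applicationId application_1467068598418_0209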

On Wed, Jul 20, 2016 at 8:58 AM, Ted Yu <yuzhih...@gmail.com> wrote:

> What's the value for yarn.log-aggregation.retain-seconds
> and yarn.log-aggregation-enable ?
>
> Which hadoop release are you using ?
>
> Thanks
>
> On Tue, Jul 19, 2016 at 3:23 PM, Rachana Srivastava <
> rachana.srivast...@markmonitor.com> wrote:
>
>> I am trying to find the root cause of recent Spark application failure in
>> production. When the Spark application is running I can check NodeManager's
>> yarn.nodemanager.log-dir property to get the Spark executor container logs.
>>
>> The container has logs for both the running Spark applications
>>
>> Here is the view of the container logs: drwx--x--- 3 yarn yarn 51 Jul 19
>> 09:04 application_1467068598418_0209 drwx--x--- 5 yarn yarn 141 Jul 19
>> 09:04 application_1467068598418_0210
>>
>> But when the application is killed both the application logs are
>> automatically deleted. I have set all the log retention setting etc in Yarn
>> to a very large number. But still these logs are deleted as soon as the
>> Spark applications are crashed.
>>
>> Question: How can we retain these Spark application logs in Yarn for
>> debugging when the Spark application is crashed for some reason.
>>
>
>


-- 
Best Regards,
Ayan Guha


Re: Little idea needed

2016-07-19 Thread ayan guha
Well, this one keeps cropping up in every project, especially when Hadoop is
implemented alongside an MPP.
As a matter of fact, there is no reliable out-of-the-box update operation
available in HDFS, Hive or Spark.
Hence, one approach is what Mich suggested: do not update; rather, keep all
source records, timestamping their arrival.
Another way, if you think of a data warehouse with open and closed records,
is to create a partition in Hive only for the open records, so you can
refresh that partition in every run.
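
A minimal sketch of the first approach in PySpark (delta_df and the staging
path are placeholders):

from pyspark.sql import functions as F

# Append each run's delta with an arrival timestamp instead of updating in
# place; the latest version per key can later be picked with a window/rank.
delta_with_ts = delta_df.withColumn("load_ts", F.current_timestamp())
delta_with_ts.write.mode("append").parquet("/staging/mytable")
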
On 20 Jul 2016 06:08, "Mich Talebzadeh"  wrote:

> Well this is a classic.
>
> The initial load can be done through Sqoop (outside of Spark) or through
> JDBC connection in Spark. 10 million rows in nothing.
>
> Then you have to think of updates and deletes in addition to new rows.
>
> With Sqoop you can load from the last ID in the source table, assuming
> that you have a unique key in Your Oracle table.
>
> If you have 10 new roes and I assume you know how to load these rows from
> Oracle.
>
> I suggest that you add two additional columns to your HDFS/target table,
>
> ,op_type int
> ,op_time timestamp
>
> These two columns will specify the row type op_type = 1,2,3
> INSERT/UPDATE/DELETE and op_time = cast(from_unixtime(unix_timestamp())
> AS op_time) when the record was added.
>
> So you will end up with two additional columns in your HDFS table compared
> to Oracle table and that will be your staging table.
>
> Of course you can do real time analytics through Oracle GoldenGate that
> read the redolog of the source table in Oracle or better Sap Replication
> Server (SRS). You will achieve real-time integration between RDBMS tables
> and Big Data.
>
> Once you have you have the staging table (immutable) and the rest is
> pretty easy. You have the full Entity Life History in this case for records
> and you can do your queries on them.
>
> HTH
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 19 July 2016 at 20:27, Aakash Basu  wrote:
>
>> Hi all,
>>
>> I'm trying to pull a full table from oracle, which is huge with some 10
>> million records which will be the initial load to HDFS.
>>
>> Then I will do delta loads everyday in the same folder in HDFS.
>>
>> Now, my query here is,
>>
>> DAY 0 - I did the initial load (full dump).
>>
>> DAY 1 - I'll load only that day's data which has suppose 10 records (5
>> old with some column's value altered and 5 new).
>>
>> Here, my question is, how will I push this file to HDFS through Spark
>> code, if I do append, it will create duplicates (which i don't want), if i
>> keep separate files and while using it in other program am giving the path
>> of it as folder which contains all files /. But in this case also the
>> registerTempTable will have duplicates for those 5 old rows.
>>
>> What is the BEST logic to be applied here?
>>
>> I tried to resolve this by doing a search in that file of the records if
>> matching load the new ones by deleting the old, but this will be time
>> consuming for such a huge record, right?
>>
>> Please help!
>>
>> Thanks,
>> Aakash.
>>
>
>


Re: Inode for STS

2016-07-18 Thread ayan guha
Hi

Thanks for this. However, I am interested in regular deletion of temp files
while the server is up. Additionally, the link says it is of no use in a
multi-user environment. Any other ideas? Is there any variation of
cleaner.ttl?

On Mon, Jul 18, 2016 at 8:00 PM, Chanh Le <giaosu...@gmail.com> wrote:

> Hi Ayan,
> I seem like you mention this
> https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties#ConfigurationProperties-hive.start.cleanup.scratchdir
> Default it was set false by default.
>
>
>
>
>
> On Jul 13, 2016, at 5:01 PM, ayan guha <guha.a...@gmail.com> wrote:
>
> Thanks Christophe. Any comment from Spark dev community member would
> really helpful on the Jira.
>
> What I saw today is shutting down the thrift server process lead to a
> clean up. Also, we started removing any empty folders from /tmp. Is there
> any other or better method?
>
> On Wed, Jul 13, 2016 at 5:25 PM, Christophe Préaud <
> christophe.pre...@kelkoo.com> wrote:
>
>> Hi Ayan,
>>
>> I have opened a JIRA about this issues, but there are no answer so far:
>> SPARK-15401 <https://issues.apache.org/jira/browse/SPARK-15401>
>>
>> Regards,
>> Christophe.
>>
>>
>> On 13/07/16 05:54, ayan guha wrote:
>>
>> Hi
>>
>> We are running Spark Thrift Server as a long running application.
>> However,  it looks like it is filling up /tmp/hive folder with lots of
>> small files and directories with no file in them, blowing out inode limit
>> and preventing any connection with "No Space Left in Device" issue.
>>
>> What is the best way to clean up those small files periodically?
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>>
>>
>> --
>> Kelkoo SAS
>> Société par Actions Simplifiée
>> Au capital de € 4.168.964,30
>> Siège social : 158 Ter Rue du Temple 75003 Paris
>> 425 093 069 RCS Paris
>>
>> Ce message et les pièces jointes sont confidentiels et établis à
>> l'attention exclusive de leurs destinataires. Si vous n'êtes pas le
>> destinataire de ce message, merci de le détruire et d'en avertir
>> l'expéditeur.
>>
>
>
>
> --
> Best Regards,
> Ayan Guha
>
>
>


-- 
Best Regards,
Ayan Guha


Re: Dataframe Transformation with Inner fields in Complex Datatypes.

2016-07-17 Thread ayan guha
Hi

withColumn adds the column (or replaces an existing column with the same
name). If you want a different name, please use the .alias() function.
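
For what it is worth, one way to get a transformed inner field back into the
same shape is to rebuild the struct around it. A rough, untested sketch in
PySpark against the schema described below (the uppercase transform done
with a plain udf):

from pyspark.sql import functions as F

upper_udf = F.udf(lambda s: s.upper() if s is not None else None)

# Rebuild line1 with the transformed buildname, then overwrite 'address'.
new_line1 = F.struct(
    upper_udf(F.col("address.line1.buildname")).alias("buildname"),
    F.col("address.line1.stname").alias("stname"))
df2 = df.withColumn("address",
                    F.struct(new_line1.alias("line1"),
                             F.col("address.line2").alias("line2")))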

On Mon, Jul 18, 2016 at 2:16 AM, java bigdata <hadoopst...@gmail.com> wrote:

> Hi Team,
>
> I am facing a major issue while transforming dataframe containing complex
> datatype columns. I need to update the inner fields of complex datatype,
> for eg: converting one inner field to UPPERCASE letters, and return the
> same dataframe with new upper case values in it. Below is my issue
> description. Kindly suggest/guide me a way forward.
>
> *My suggestion: *can we have a new version of 
> *dataframe.withcolumn(,
> udf($innerfieldreference), )*,
> so that when this method gets executed, i get same dataframe with
> transformed values.
>
>
> *Issue Description:*
> Using dataframe.withColumn(,udf($colname)) for inner fields in
> struct/complex datatype, results in a new dataframe with the a new column
> appended to it. "colname" in the above argument is given as fullname with
> dot notation to access the struct/complex fields.
>
> For eg: hive table has columns: (id int, address struct buildname:string, stname:string>>, line2:string>)
>
> I need to update the inner field 'buildname'. I can select the inner field
> through dataframe as : df.select($"address.line1.buildname"), however when
> I use df.withColumn("address.line1.buildname",
> toUpperCaseUDF($"address.line1.buildname")), it is resulting in a new
> dataframe with new column: "address.line1.buildname" appended, with
> toUpperCaseUDF values from inner field buildname.
>
> How can I update the inner fields of the complex data types. Kindly
> suggest.
>
> Thanks in anticipation.
>
> Best Regards,
> Naveen Kumar.
>



-- 
Best Regards,
Ayan Guha


Re: Call http request from within Spark

2016-07-15 Thread ayan guha
Can you explain what you mean by "the count never stops"?
On 15 Jul 2016 00:53, "Amit Dutta"  wrote:

> Hi All,
>
>
> I have a requirement to call a rest service url for 300k customer ids.
>
> Things I have tried so far is
>
>
> custid_rdd = sc.textFile('file:Users/zzz/CustomerID_SC/Inactive User Hashed LCID List.csv')  # getting all the customer ids and building adds
>
> profile_rdd = custid_rdd.map(lambda r: getProfile(r.split(',')[0]))
>
> profile_rdd.count()
>
>
> #getprofile is the method to do the http call
>
> def getProfile(cust_id):
>     api_key = 'txt'
>     api_secret = 'yuyuy'
>     profile_uri = 'https://profile.localytics.com/x1/customers/{}'
>     customer_id = cust_id
>
>     if customer_id is not None:
>         data = requests.get(profile_uri.format(customer_id),
>                             auth=requests.auth.HTTPBasicAuth(api_key, api_secret))
>         # print json.dumps(data.json(), indent=4)
>         return data
>
>
> when I print the json dump of the data i see it returning results from the
> rest call. But the count never stops.
>
>
> Is there an efficient way of dealing this? Some post says we have to
> define a batch size etc but don't know how.
>
>
> Appreciate your help
>
>
> Regards,
>
> Amit
>
>


Re: Spark Thrift Server performance

2016-07-13 Thread ayan guha
Not really, that is not the primary intention. Our main goal is poor man's
high availability (as STS does not provide an HA mechanism the way HS2 does)
:). Additionally, we have made STS part of the Ambari AUTO_START group, so
Ambari brings STS back up if it goes down for some intermittent reason.



On Thu, Jul 14, 2016 at 1:38 AM, Michael Segel <msegel_had...@hotmail.com>
wrote:

> Hey, silly question?
>
> If you’re running a load balancer, are you trying to reuse the RDDs
> between jobs?
>
> TIA
> -Mike
>
> On Jul 13, 2016, at 9:08 AM, ayan guha <guha.a...@gmail.com> wrote:
>
> My 2 cents:
>
> Yes, we are running multiple STS (we are running on different nodes, but
> you can run on same node, different ports). Using Ambari, it is really
> convenient to manage.
>
> We have set up a nginx load balancer as well pointing to both services and
> all our external BI tools connect to the load balancer.
>
> STS works as an YARN Client application, where STS is the driver.
>
>
>
> On Wed, Jul 13, 2016 at 5:33 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Hi,
>>
>> I need some feedback on the performance of the Spark Thrift Server (STS)
>>
>> As far I can ascertain one can start STS passing the usual spark
>> parameters
>>
>> ${SPARK_HOME}/sbin/start-thriftserver.sh \
>> --master spark://50.140.197.217:7077 \
>> --hiveconf hive.server2.thrift.port=10055 \
>> --packages  \
>> --driver-memory 2G \
>> --num-executors 2 \
>> --executor-memory 2G \
>> --conf "spark.scheduler.mode=FAIR" \
>> --conf
>> "spark.executor.extraJavaOptions=-XX:+PrintGCDetails
>> -XX:+PrintGCTimeStamps" \
>> --jars  \
>> --conf "spark.ui.port=12345"
>>
>>
>>   And accessing it via beeline JDBC client
>>
>> beeline -u jdbc:hive2://rhes564:10055 -n hduser -p
>>
>> Now the questions I have
>>
>>
>>1. What is the limit on the number of users accessing the thrift
>>server.
>>2. Clearly the thrift server can start with resource configuration.
>>In a simple way does STS act as a gateway to Spark (meaning Spark apps can
>>use their own resources) or one is limited to resource that STS offers?
>>3. Can one start multiple thrift servers
>>
>> As far as I can see STS is equivalent to Spark SQL accessing Hive DW.
>> Indeed this is what it says:
>>
>> Connecting to jdbc:hive2://rhes564:10055
>> Connected to: Spark SQL (version 1.6.1)
>> Driver: Spark Project Core (version 1.6.1)
>> Transaction isolation: TRANSACTION_REPEATABLE_READ
>> Beeline version 1.6.1 by Apache Hive
>> 0: jdbc:hive2://rhes564:10055>
>>
>> Thanks
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>
>
>
> --
> Best Regards,
> Ayan Guha
>
>
>


-- 
Best Regards,
Ayan Guha


Re: Spark Thrift Server performance

2016-07-13 Thread ayan guha
My 2 cents:

Yes, we are running multiple STS (we are running on different nodes, but
you can run on same node, different ports). Using Ambari, it is really
convenient to manage.

We have set up a nginx load balancer as well pointing to both services and
all our external BI tools connect to the load balancer.

STS works as a YARN client application, where STS is the driver.



On Wed, Jul 13, 2016 at 5:33 PM, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Hi,
>
> I need some feedback on the performance of the Spark Thrift Server (STS)
>
> As far I can ascertain one can start STS passing the usual spark parameters
>
> ${SPARK_HOME}/sbin/start-thriftserver.sh \
> --master spark://50.140.197.217:7077 \
> --hiveconf hive.server2.thrift.port=10055 \
> --packages  \
> --driver-memory 2G \
> --num-executors 2 \
> --executor-memory 2G \
> --conf "spark.scheduler.mode=FAIR" \
> --conf
> "spark.executor.extraJavaOptions=-XX:+PrintGCDetails
> -XX:+PrintGCTimeStamps" \
> --jars  \
> --conf "spark.ui.port=12345"
>
>
>   And accessing it via beeline JDBC client
>
> beeline -u jdbc:hive2://rhes564:10055 -n hduser -p
>
> Now the questions I have
>
>
>1. What is the limit on the number of users accessing the thrift
>server.
>2. Clearly the thrift server can start with resource configuration. In
>a simple way does STS act as a gateway to Spark (meaning Spark apps can use
>their own resources) or one is limited to resource that STS offers?
>3. Can one start multiple thrift servers
>
> As far as I can see STS is equivalent to Spark SQL accessing Hive DW.
> Indeed this is what it says:
>
> Connecting to jdbc:hive2://rhes564:10055
> Connected to: Spark SQL (version 1.6.1)
> Driver: Spark Project Core (version 1.6.1)
> Transaction isolation: TRANSACTION_REPEATABLE_READ
> Beeline version 1.6.1 by Apache Hive
> 0: jdbc:hive2://rhes564:10055>
>
> Thanks
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>



-- 
Best Regards,
Ayan Guha


Spark 7736

2016-07-13 Thread ayan guha
Hi

I am facing the same issue reported in SPARK-7736
<https://issues.apache.org/jira/browse/SPARK-7736> on Spark 1.6.0. Is there
any way to reopen the JIRA?

Reproduction steps attached.



-- 
Best Regards,
Ayan Guha


Spark 7736.docx
Description: MS-Word 2007 document

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Re: Inode for STS

2016-07-13 Thread ayan guha
Thanks Christophe. Any comment from a Spark dev community member would be
really helpful on the JIRA.

What I saw today is that shutting down the thrift server process leads to a
cleanup. Also, we started removing any empty folders from /tmp. Is there any
other or better method?

On Wed, Jul 13, 2016 at 5:25 PM, Christophe Préaud <
christophe.pre...@kelkoo.com> wrote:

> Hi Ayan,
>
> I have opened a JIRA about this issues, but there are no answer so far:
> SPARK-15401 <https://issues.apache.org/jira/browse/SPARK-15401>
>
> Regards,
> Christophe.
>
>
> On 13/07/16 05:54, ayan guha wrote:
>
> Hi
>
> We are running Spark Thrift Server as a long running application. However,
>  it looks like it is filling up /tmp/hive folder with lots of small files
> and directories with no file in them, blowing out inode limit and
> preventing any connection with "No Space Left in Device" issue.
>
> What is the best way to clean up those small files periodically?
>
> --
> Best Regards,
> Ayan Guha
>
>
>
> --
> Kelkoo SAS
> Société par Actions Simplifiée
> Au capital de € 4.168.964,30
> Siège social : 158 Ter Rue du Temple 75003 Paris
> 425 093 069 RCS Paris
>
> Ce message et les pièces jointes sont confidentiels et établis à
> l'attention exclusive de leurs destinataires. Si vous n'êtes pas le
> destinataire de ce message, merci de le détruire et d'en avertir
> l'expéditeur.
>



-- 
Best Regards,
Ayan Guha


Inode for STS

2016-07-12 Thread ayan guha
Hi

We are running Spark Thrift Server as a long running application. However,
 it looks like it is filling up /tmp/hive folder with lots of small files
and directories with no file in them, blowing out inode limit and
preventing any connection with "No Space Left in Device" issue.

What is the best way to clean up those small files periodically?

-- 
Best Regards,
Ayan Guha


Fwd: Fast database with writes per second and horizontal scaling

2016-07-11 Thread ayan guha
HI

HBase is pretty neat itself. But speed is not the criterion for choosing
HBase over Cassandra (or vice versa). Slowness can very well be because of
design issues, and unfortunately changing technology will not help in that
case :)

I would suggest you quantify the "slow"-ness in the context of the
infrastructure you have, and I am sure good people here will help.

Best
Ayan

On Tue, Jul 12, 2016 at 3:01 PM, Ashok Kumar <ashok34...@yahoo.com.invalid>
wrote:

> Anyone in Spark as well
>
> My colleague has been using Cassandra. However, he says it is too slow
> and not user friendly/
> MongodDB as a doc databases is pretty neat but not fast enough
>
> May main concern is fast writes per second and good scaling.
>
>
> Hive on Spark or Tez?
>
> How about Hbase. or anything else
>
> Any expert advice warmly acknowledged..
>
> thanking yo
>
>
> On Monday, 11 July 2016, 17:24, Ashok Kumar <ashok34...@yahoo.com> wrote:
>
>
> Hi Gurus,
>
> Advice appreciated from Hive gurus.
>
> My colleague has been using Cassandra. However, he says it is too slow
> and not user friendly/
> MongodDB as a doc databases is pretty neat but not fast enough
>
> May main concern is fast writes per second and good scaling.
>
>
> Hive on Spark or Tez?
>
> How about Hbase. or anything else
>
> Any expert advice warmly acknowledged..
>
> thanking you
>
>
>


-- 
Best Regards,
Ayan Guha


Re: Using Spark on Hive with Hive also using Spark as its execution engine

2016-07-11 Thread ayan guha
tion of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 11 July 2016 at 19:25, Gopal Vijayaraghavan <gop...@apache.org> wrote:
>>
>>>
>>> > Status: Finished successfully in 14.12 seconds
>>> > OK
>>> > 1
>>> > Time taken: 14.38 seconds, Fetched: 1 row(s)
>>>
>>> That might be an improvement over MR, but that still feels far too slow.
>>>
>>>
>>> Parquet numbers are in general bad in Hive, but that's because the
>>> Parquet
>>> reader gets no actual love from the devs. The community, if it wants to
>>> keep using Parquet heavily needs a Hive dev to go over to Parquet-mr and
>>> cut a significant number of memory copies out of the reader.
>>>
>>> The Spark 2.0 build for instance, has a custom Parquet reader for
>>> SparkSQL
>>> which does this. SPARK-12854 does for Spark+Parquet what Hive 2.0 does
>>> for
>>> ORC (actually, it looks more like hive's VectorizedRowBatch than
>>> Tungsten's flat layouts).
>>>
>>> But that reader cannot be used in Hive-on-Spark, because it is not a
>>> public reader impl.
>>>
>>>
>>> Not to pick an arbitrary dataset, my workhorse example is a TPC-H
>>> lineitem
>>> at 10Gb scale with a single 16 box.
>>>
>>> hive(tpch_flat_orc_10)> select max(l_discount) from lineitem;
>>> Query ID = gopal_20160711175917_f96371aa-2721-49c8-99a0-f7c4a1eacfda
>>> Total jobs = 1
>>> Launching Job 1 out of 1
>>>
>>>
>>> Status: Running (Executing on YARN cluster with App id
>>> application_1466700718395_0256)
>>>
>>>
>>> ---
>>> ---
>>> VERTICES  MODESTATUS  TOTAL  COMPLETED  RUNNING
>>> PENDING  FAILED  KILLED
>>>
>>> ---
>>> ---
>>> Map 1 ..  llap SUCCEEDED 13 130
>>> 0   0   0
>>> Reducer 2 ..  llap SUCCEEDED  1  10
>>> 0   0   0
>>>
>>> ---
>>> ---
>>> VERTICES: 02/02  [==>>] 100%  ELAPSED TIME: 0.71
>>> s
>>>
>>>
>>> ---
>>> ---
>>> Status: DAG finished successfully in 0.71 seconds
>>>
>>> Query Execution Summary
>>>
>>> ---
>>> ---
>>> OPERATIONDURATION
>>>
>>> ---
>>> ---
>>> Compile Query   0.21s
>>> Prepare Plan0.13s
>>> Submit Plan 0.34s
>>> Start DAG   0.23s
>>> Run DAG 0.71s
>>>
>>> ---
>>> ---
>>>
>>> Task Execution Summary
>>>
>>> ---
>>> ---
>>>   VERTICES   DURATION(ms)  CPU_TIME(ms)  GC_TIME(ms)  INPUT_RECORDS
>>> OUTPUT_RECORDS
>>>
>>> ---
>>> ---
>>>  Map 1 604.00 00 59,957,438
>>>   13
>>>  Reducer 2 105.00 00 13
>>>0
>>>
>>> ---
>>> ---
>>>
>>> LLAP IO Summary
>>>
>>> ---
>>> ---
>>>   VERTICES ROWGROUPS  META_HIT  META_MISS  DATA_HIT  DATA_MISS
>>> ALLOCATION
>>> USED  TOTAL_IO
>>>
>>> ---
>>> ---
>>>  Map 1  6036 01460B68.86MB
>>> 491.00MB
>>> 479.89MB 7.94s
>>>
>>> ---
>>> ---
>>>
>>> OK
>>> 0.1
>>> Time taken: 1.669 seconds, Fetched: 1 row(s)
>>> hive(tpch_flat_orc_10)>
>>>
>>>
>>> This is running against a single 16 core box & I would assume it would
>>> take <1.4s to read twice as much (13 tasks is barely touching the load
>>> factors).
>>>
>>> It would probably be a bit faster if the cache had hits, but in general
>>> 14s to read a 100M rows is nearly a magnitude off where Hive 2.2.0 is.
>>>
>>> Cheers,
>>> Gopal
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>
>


-- 
Best Regards,
Ayan Guha


Re: How to run Zeppelin and Spark Thrift Server Together

2016-07-11 Thread ayan guha
Hi

When you say "Zeppelin and STS", I am assuming you mean "Spark Interpreter"
and "JDBC interpreter" respectively.

Through Zeppelin, you can either run your own Spark application (using
Zeppelin's own Spark context) via the Spark interpreter, OR you can access
STS, which is a separate Spark application (i.e. a separate SparkContext),
via the JDBC interpreter. There should not be any need for these two
contexts to coexist.

If you want to share data, save it to Hive from either context, and you
should be able to see the data from the other context.
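
As a small sketch of that hand-off (the table name my_results is a
placeholder):

# In the Zeppelin Spark interpreter: persist the result as a Hive table.
df.write.saveAsTable("my_results")

# From the JDBC interpreter, or any BI client connected to STS:
#   SELECT * FROM my_results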

Best
Ayan



On Mon, Jul 11, 2016 at 3:00 PM, Chanh Le <giaosu...@gmail.com> wrote:

> Hi Ayan,
> I tested It works fine but one more confuse is If my (technical) users
> want to write some code in zeppelin to apply thing into Hive table?
> Zeppelin and STS can’t share Spark Context that mean we need separated
> process? Is there anyway to use the same Spark Context of STS?
>
> Regards,
> Chanh
>
>
> On Jul 11, 2016, at 10:05 AM, Takeshi Yamamuro <linguin@gmail.com>
> wrote:
>
> Hi,
>
> ISTM multiple sparkcontexts are not recommended in spark.
> See: https://issues.apache.org/jira/browse/SPARK-2243
>
> // maropu
>
>
> On Mon, Jul 11, 2016 at 12:01 PM, ayan guha <guha.a...@gmail.com> wrote:
>
>> Hi
>>
>> Can you try using JDBC interpreter with STS? We are using Zeppelin+STS on
>> YARN for few months now without much issue.
>>
>> On Mon, Jul 11, 2016 at 12:48 PM, Chanh Le <giaosu...@gmail.com> wrote:
>>
>>> Hi everybody,
>>> We are using Spark to query big data and currently we’re using Zeppelin
>>> to provide a UI for technical users.
>>> Now we also need to provide a UI for business users so we use Oracle BI
>>> tools and set up a Spark Thrift Server (STS) for it.
>>>
>>> When I run both Zeppelin and STS throw error:
>>>
>>> INFO [2016-07-11 09:40:21,905] ({pool-2-thread-4}
>>> SchedulerFactory.java[jobStarted]:131) - Job
>>> remoteInterpretJob_1468204821905 started by scheduler
>>> org.apache.zeppelin.spark.SparkInterpreter835015739
>>>  INFO [2016-07-11 09:40:21,911] ({pool-2-thread-4}
>>> Logging.scala[logInfo]:58) - Changing view acls to: giaosudau
>>>  INFO [2016-07-11 09:40:21,912] ({pool-2-thread-4}
>>> Logging.scala[logInfo]:58) - Changing modify acls to: giaosudau
>>>  INFO [2016-07-11 09:40:21,912] ({pool-2-thread-4}
>>> Logging.scala[logInfo]:58) - SecurityManager: authentication disabled; ui
>>> acls disabled; users with view permissions: Set(giaosudau); users with
>>> modify permissions: Set(giaosudau)
>>>  INFO [2016-07-11 09:40:21,918] ({pool-2-thread-4}
>>> Logging.scala[logInfo]:58) - Starting HTTP Server
>>>  INFO [2016-07-11 09:40:21,919] ({pool-2-thread-4}
>>> Server.java[doStart]:272) - jetty-8.y.z-SNAPSHOT
>>>  INFO [2016-07-11 09:40:21,920] ({pool-2-thread-4}
>>> AbstractConnector.java[doStart]:338) - Started
>>> SocketConnector@0.0.0.0:54818
>>>  INFO [2016-07-11 09:40:21,922] ({pool-2-thread-4}
>>> Logging.scala[logInfo]:58) - Successfully started service 'HTTP class
>>> server' on port 54818.
>>>  INFO [2016-07-11 09:40:22,408] ({pool-2-thread-4}
>>> SparkInterpreter.java[createSparkContext]:233) - -- Create new
>>> SparkContext local[*] ---
>>>  WARN [2016-07-11 09:40:22,411] ({pool-2-thread-4}
>>> Logging.scala[logWarning]:70) - Another SparkContext is being constructed
>>> (or threw an exception in its constructor).  This may indicate an error,
>>> since only one SparkContext may be running in this JVM (see SPARK-2243).
>>> The other SparkContext was created at:
>>>
>>> Is that mean I need to setup allow multiple context? Because It’s only
>>> test in local with local mode If I deploy on mesos cluster what would
>>> happened?
>>>
>>> Need you guys suggests some solutions for that. Thanks.
>>>
>>> Chanh
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>
>
> --
> ---
> Takeshi Yamamuro
>
>
>


-- 
Best Regards,
Ayan Guha


Re: "client / server" config

2016-07-10 Thread ayan guha
Yes, it is expected to move on. If it looks like it is waiting for something,
my first instinct would be to check network connectivity: for example, your
cluster must have access back to your Mac to read the file (it is probably
waiting to time out).

On Mon, Jul 11, 2016 at 12:59 PM, Jean Georges Perrin <j...@jgp.net> wrote:

> Good for the file :)
>
> No it goes on... Like if it was waiting for something
>
> jg
>
>
> On Jul 10, 2016, at 22:55, ayan guha <guha.a...@gmail.com> wrote:
>
> Is this terminating the execution or spark application still runs after
> this error?
>
> One thing for sure, it is looking for local file on driver (ie your mac) @
> location: file:/Users/jgp/Documents/Data/restaurants-data.json
>
> On Mon, Jul 11, 2016 at 12:33 PM, Jean Georges Perrin <j...@jgp.net> wrote:
>
>>
>> I have my dev environment on my Mac. I have a dev Spark server on a
>> freshly installed physical Ubuntu box.
>>
>> I had some connection issues, but it is now all fine.
>>
>> In my code, running on the Mac, I have:
>>
>> 1 SparkConf conf = new SparkConf().setAppName("myapp").setMaster("
>> spark://10.0.100.120:7077");
>> 2 JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
>> 3 javaSparkContext.setLogLevel("WARN");
>> 4 SQLContext sqlContext = new SQLContext(javaSparkContext);
>> 5
>> 6 // Restaurant Data
>> 7 df = sqlContext.read().option("dateFormat", "-mm-dd").json(source
>> .getLocalStorage());
>>
>>
>> 1) Clarification question: This code runs on my mac, connects to the
>> server, but line #7 assumes the file is on my mac, not on the server, right?
>>
>> 2) On line 7, I get an exception:
>>
>> 16-07-10 22:20:04:143 DEBUG  - address: jgp-MacBook-Air.local/
>> 10.0.100.100 isLoopbackAddress: false, with host 10.0.100.100
>> jgp-MacBook-Air.local
>> 16-07-10 22:20:04:240 INFO
>> org.apache.spark.sql.execution.datasources.json.JSONRelation - Listing
>> file:/Users/jgp/Documents/Data/restaurants-data.json on driver
>> 16-07-10 22:20:04:288 DEBUG org.apache.hadoop.util.Shell - Failed to
>> detect a valid hadoop home directory
>> java.io.IOException: HADOOP_HOME or hadoop.home.dir are not set.
>> at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:225)
>> at org.apache.hadoop.util.Shell.(Shell.java:250)
>> at org.apache.hadoop.util.StringUtils.(StringUtils.java:76)
>> at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(
>> FileInputFormat.java:447)
>> at org.apache.spark.sql.execution.datasources.json.JSONRelation.org
>> $apache$spark$sql$execution$datasources$json$JSONRelation$$createBaseRdd(JSONRelation.scala:98)
>> at
>> org.apache.spark.sql.execution.datasources.json.JSONRelation$$anonfun$4$$anonfun$apply$1.apply(JSONRelation.scala:115)
>> at
>> org.apache.spark.sql.execution.datasources.json.JSONRelation$$anonfun$4$$anonfun$apply$1.apply(JSONRelation.scala:115)
>> at scala.Option.getOrElse(Option.scala:120)
>> at
>> org.apache.spark.sql.execution.datasources.json.JSONRelation$$anonfun$4.apply(JSONRelation.scala:115)
>> at
>> org.apache.spark.sql.execution.datasources.json.JSONRelation$$anonfun$4.apply(JSONRelation.scala:109)
>> at scala.Option.getOrElse(Option.scala:120)
>>
>> Do I have to install HADOOP on the server? - I imagine that from:
>> java.io.IOException: HADOOP_HOME or hadoop.home.dir are not set.
>>
>> TIA,
>>
>> jg
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>
>


-- 
Best Regards,
Ayan Guha


Re: How to run Zeppelin and Spark Thrift Server Together

2016-07-10 Thread ayan guha
Hi

Can you try using JDBC interpreter with STS? We are using Zeppelin+STS on
YARN for few months now without much issue.

On Mon, Jul 11, 2016 at 12:48 PM, Chanh Le <giaosu...@gmail.com> wrote:

> Hi everybody,
> We are using Spark to query big data and currently we’re using Zeppelin to
> provide a UI for technical users.
> Now we also need to provide a UI for business users so we use Oracle BI
> tools and set up a Spark Thrift Server (STS) for it.
>
> When I run both Zeppelin and STS throw error:
>
> INFO [2016-07-11 09:40:21,905] ({pool-2-thread-4}
> SchedulerFactory.java[jobStarted]:131) - Job
> remoteInterpretJob_1468204821905 started by scheduler
> org.apache.zeppelin.spark.SparkInterpreter835015739
>  INFO [2016-07-11 09:40:21,911] ({pool-2-thread-4}
> Logging.scala[logInfo]:58) - Changing view acls to: giaosudau
>  INFO [2016-07-11 09:40:21,912] ({pool-2-thread-4}
> Logging.scala[logInfo]:58) - Changing modify acls to: giaosudau
>  INFO [2016-07-11 09:40:21,912] ({pool-2-thread-4}
> Logging.scala[logInfo]:58) - SecurityManager: authentication disabled; ui
> acls disabled; users with view permissions: Set(giaosudau); users with
> modify permissions: Set(giaosudau)
>  INFO [2016-07-11 09:40:21,918] ({pool-2-thread-4}
> Logging.scala[logInfo]:58) - Starting HTTP Server
>  INFO [2016-07-11 09:40:21,919] ({pool-2-thread-4}
> Server.java[doStart]:272) - jetty-8.y.z-SNAPSHOT
>  INFO [2016-07-11 09:40:21,920] ({pool-2-thread-4}
> AbstractConnector.java[doStart]:338) - Started
> SocketConnector@0.0.0.0:54818
>  INFO [2016-07-11 09:40:21,922] ({pool-2-thread-4}
> Logging.scala[logInfo]:58) - Successfully started service 'HTTP class
> server' on port 54818.
>  INFO [2016-07-11 09:40:22,408] ({pool-2-thread-4}
> SparkInterpreter.java[createSparkContext]:233) - -- Create new
> SparkContext local[*] ---
>  WARN [2016-07-11 09:40:22,411] ({pool-2-thread-4}
> Logging.scala[logWarning]:70) - Another SparkContext is being constructed
> (or threw an exception in its constructor).  This may indicate an error,
> since only one SparkContext may be running in this JVM (see SPARK-2243).
> The other SparkContext was created at:
>
> Is that mean I need to setup allow multiple context? Because It’s only
> test in local with local mode If I deploy on mesos cluster what would
> happened?
>
> Need you guys suggests some solutions for that. Thanks.
>
> Chanh
> -----
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
Best Regards,
Ayan Guha


Re: "client / server" config

2016-07-10 Thread ayan guha
Is this terminating the execution, or does the Spark application still run
after this error?

One thing is for sure: it is looking for a local file on the driver (i.e.
your Mac) at location: file:/Users/jgp/Documents/Data/restaurants-data.json

On Mon, Jul 11, 2016 at 12:33 PM, Jean Georges Perrin <j...@jgp.net> wrote:

>
> I have my dev environment on my Mac. I have a dev Spark server on a
> freshly installed physical Ubuntu box.
>
> I had some connection issues, but it is now all fine.
>
> In my code, running on the Mac, I have:
>
> 1 SparkConf conf = new SparkConf().setAppName("myapp").setMaster("
> spark://10.0.100.120:7077");
> 2 JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
> 3 javaSparkContext.setLogLevel("WARN");
> 4 SQLContext sqlContext = new SQLContext(javaSparkContext);
> 5
> 6 // Restaurant Data
> 7 df = sqlContext.read().option("dateFormat", "-mm-dd").json(source
> .getLocalStorage());
>
>
> 1) Clarification question: This code runs on my mac, connects to the
> server, but line #7 assumes the file is on my mac, not on the server, right?
>
> 2) On line 7, I get an exception:
>
> 16-07-10 22:20:04:143 DEBUG  - address: jgp-MacBook-Air.local/10.0.100.100
> isLoopbackAddress: false, with host 10.0.100.100 jgp-MacBook-Air.local
> 16-07-10 22:20:04:240 INFO
> org.apache.spark.sql.execution.datasources.json.JSONRelation - Listing
> file:/Users/jgp/Documents/Data/restaurants-data.json on driver
> 16-07-10 22:20:04:288 DEBUG org.apache.hadoop.util.Shell - Failed to
> detect a valid hadoop home directory
> java.io.IOException: HADOOP_HOME or hadoop.home.dir are not set.
> at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:225)
> at org.apache.hadoop.util.Shell.(Shell.java:250)
> at org.apache.hadoop.util.StringUtils.(StringUtils.java:76)
> at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(
> FileInputFormat.java:447)
> at org.apache.spark.sql.execution.datasources.json.JSONRelation.org
> $apache$spark$sql$execution$datasources$json$JSONRelation$$createBaseRdd(JSONRelation.scala:98)
> at
> org.apache.spark.sql.execution.datasources.json.JSONRelation$$anonfun$4$$anonfun$apply$1.apply(JSONRelation.scala:115)
> at
> org.apache.spark.sql.execution.datasources.json.JSONRelation$$anonfun$4$$anonfun$apply$1.apply(JSONRelation.scala:115)
> at scala.Option.getOrElse(Option.scala:120)
> at
> org.apache.spark.sql.execution.datasources.json.JSONRelation$$anonfun$4.apply(JSONRelation.scala:115)
> at
> org.apache.spark.sql.execution.datasources.json.JSONRelation$$anonfun$4.apply(JSONRelation.scala:109)
> at scala.Option.getOrElse(Option.scala:120)
>
> Do I have to install HADOOP on the server? - I imagine that from:
> java.io.IOException: HADOOP_HOME or hadoop.home.dir are not set.
>
> TIA,
>
> jg
>
>


-- 
Best Regards,
Ayan Guha


Re: Spark as sql engine on S3

2016-07-07 Thread ayan guha
Yes, it can.

On Fri, Jul 8, 2016 at 3:03 PM, Ashok Kumar <ashok34...@yahoo.com> wrote:

> thanks so basically Spark Thrift Server runs on a port much like beeline
> that uses JDBC to connect to Hive?
>
> Can Spark thrift server access Hive tables?
>
> regards
>
>
> On Friday, 8 July 2016, 5:27, ayan guha <guha.a...@gmail.com> wrote:
>
>
> Spark Thrift Server works as a JDBC server. You can connect to it from
> any JDBC tool like SQuirreL.
>
> On Fri, Jul 8, 2016 at 3:50 AM, Ashok Kumar <ashok34...@yahoo.com.invalid>
> wrote:
>
> Hello gurus,
>
> We are storing data externally on Amazon S3
>
> What is the optimum or best way to use Spark as SQL engine to access data
> on S3?
>
> Any info/write up will be greatly appreciated.
>
> Regards
>
>
>
>
> --
> Best Regards,
> Ayan Guha
>
>
>


-- 
Best Regards,
Ayan Guha


Re: Any ways to connect BI tool to Spark without Hive

2016-07-07 Thread ayan guha
Yes, absolutely. You need to "save" the table using the saveAsTable function.
The underlying storage is HDFS or any other storage, and you are basically
using Spark's embedded Hive services (when you do not have Hadoop in the setup).

Think of STS as a JDBC server in front of your datastore. STS runs as a Spark
application, so you can also monitor it using the Spark master UI, assuming
you are using a standalone cluster.
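
For example, a minimal PySpark 1.6 sketch of the idea (the path and table
name below are invented for illustration):

from pyspark import SparkContext
from pyspark.sql import HiveContext

sc = SparkContext(appName="save-table-example")
sqlContext = HiveContext(sc)

# any DataFrame you have built, e.g. in a Zeppelin paragraph
df = sqlContext.read.parquet("/data/daily_metrics.parquet")

# saveAsTable registers the data in the (embedded or external) Hive
# metastore, so an STS instance pointing at the same metastore can query it
df.write.mode("overwrite").saveAsTable("daily_metrics")

An external tool connected to STS can then simply run "select * from
daily_metrics".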

On Fri, Jul 8, 2016 at 2:34 PM, Chanh Le <giaosu...@gmail.com> wrote:

> Hi Ayan,
>
> Thanks for replying. It sounds great. Let me check.
> One thing that confuses me: is there any way to share things between the
> two, i.e. Zeppelin and the Thrift Server? For example, I update or insert
> data into a table from Zeppelin, and an external tool connected through STS
> can see it.
>
> Thanks & regards,
> Chanh
>
> On Jul 8, 2016, at 11:21 AM, ayan guha <guha.a...@gmail.com> wrote:
>
> Hi
>
> Spark Thrift does not need Hive/hadoop. STS should be your first choice if
> you are planning to integrate BI tools with Spark. It works with Zeppelin
> as well. We do all our development using Zeppelin and STS.
>
> One thing to note: many BI tools like Qlik Sense and Tableau (not sure about
> the Oracle BI tool) query and then cache data on the client side. This works
> really well in real life.
>
>
> On Fri, Jul 8, 2016 at 1:58 PM, Chanh Le <giaosu...@gmail.com> wrote:
>
>> Hi Mich,
>> Thanks for replying. Currently we think we need to separate 2 groups of
>> user.
>> 1. Technical: Can write SQL
>> 2. Business: Can drag and drop fields or metrics and see the result.
>> Our stack uses Zeppelin and Spark SQL to query data from Alluxio. Our data
>> is currently stored in parquet files. *Zeppelin is using HiveContext but we
>> haven’t set up Hive and Hadoop yet*.
>>
>> I am a little bit confused about the Spark Thrift Server: the Thrift Server
>> in Spark can allow external tools to connect, but *does that require setting
>> up Hive and Hadoop*?
>>
>> Thanks and regards,
>> Chanh
>>
>>
>>
>> On Jul 8, 2016, at 10:49 AM, Mich Talebzadeh <mich.talebza...@gmail.com>
>> wrote:
>>
>> hi,
>>
>> I have not used Alluxio but it is a distributed file system much like an
>> IMDB say Oracle TimesTen. Spark is your query tool and Zeppelin is the GUI
>> interface to your Spark which basically allows you graphs with Spark
>> queries.
>>
>> You mentioned Hive so I assume your persistent storage is Hive?
>>
>> Your business are using Oracle BI tool. It is like Tableau. I assume
>> Oracle BI tool accesses a database of some sort say Oracle DW using native
>> connectivity and it may also have ODBC and JDBC connections to Hive etc.
>>
>> The issue I see here is your GUI tool Zeppelin which does the same thing
>> as Oracle BI tool. Can you please clarify below:
>>
>>
>>1. you use Hive as your database/persistent storage and use Alluxio
>>on top of Hive?
>>2. are users accessing Hive or a Data Warehouse like Oracle
>>3. Oracle BI tools are pretty mature. Zeppelin is not in the same
>>league so you have to decide which technology stack to follow
>>4. Spark should work with Oracle BI tool as well (need to check this)
>>as a fast query tool. In that case the users can use Oracle BI tool with
>>Spark as well.
>>
>> It seems to me that the issue is that users don't want to move from
>> Oracle BI tool. We had the same issue with Tableau. So you really need to
>> make that Oracle BI tool use Spark and Alluxio and leave Zeppelin at one
>> side.
>>
>> Zeppelin as I used it a while back may not do what Oracle BI tool does.
>> So the presentation layer has to be Oracle BI tool.
>>
>> HTH
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 8 July 2016 at 04:19, Chanh Le <giaosu...@gmail.com> wrote:
>>
>>> Hi everyone,
>>> Currently we use Zeppelin to analyze our data, and because it relies on SQL
>>> it’s hard to distribute to users. But users are using some kind of
>>> Oracle BI tool for analytics because it supports drag and drop and
>>> we can set up permissions for each user.
>>> Our architecture is Spark, Alluxio, Zeppelin. Because We want to share
>>> what we have done in Zeppelin to business users.
>>>
>>> Is there any way to do that?
>>>
>>> Thanks.
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>
>
>


-- 
Best Regards,
Ayan Guha


Re: Spark as sql engine on S3

2016-07-07 Thread ayan guha
Spark Thrift Server works as a JDBC server. You can connect to it from
any JDBC tool like SQuirreL.

On Fri, Jul 8, 2016 at 3:50 AM, Ashok Kumar <ashok34...@yahoo.com.invalid>
wrote:

> Hello gurus,
>
> We are storing data externally on Amazon S3
>
> What is the optimum or best way to use Spark as SQL engine to access data
> on S3?
>
> Any info/write up will be greatly appreciated.
>
> Regards
>



-- 
Best Regards,
Ayan Guha


Re: Any ways to connect BI tool to Spark without Hive

2016-07-07 Thread ayan guha
Hi

Spark Thrift does not need Hive/Hadoop. STS should be your first choice if
you are planning to integrate BI tools with Spark. It works with Zeppelin
as well. We do all our development using Zeppelin and STS.

One thing to note: many BI tools like Qlik Sense and Tableau (not sure about
the Oracle BI tool) query and then cache data on the client side. This works
really well in real life.


On Fri, Jul 8, 2016 at 1:58 PM, Chanh Le <giaosu...@gmail.com> wrote:

> Hi Mich,
> Thanks for replying. Currently we think we need to separate 2 groups of
> user.
> 1. Technical: Can write SQL
> 2. Business: Can drag and drop fields or metrics and see the result.
> Our stack uses Zeppelin and Spark SQL to query data from Alluxio. Our data
> is currently stored in parquet files. *Zeppelin is using HiveContext but we
> haven’t set up Hive and Hadoop yet*.
>
> I am a little bit confused about the Spark Thrift Server: the Thrift Server
> in Spark can allow external tools to connect, but *does that require setting
> up Hive and Hadoop*?
>
> Thanks and regards,
> Chanh
>
>
>
> On Jul 8, 2016, at 10:49 AM, Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
> hi,
>
> I have not used Alluxio but it is a distributed file system much like an
> IMDB say Oracle TimesTen. Spark is your query tool and Zeppelin is the GUI
> interface to your Spark which basically allows you graphs with Spark
> queries.
>
> You mentioned Hive so I assume your persistent storage is Hive?
>
> Your business are using Oracle BI tool. It is like Tableau. I assume
> Oracle BI tool accesses a database of some sort say Oracle DW using native
> connectivity and it may also have ODBC and JDBC connections to Hive etc.
>
> The issue I see here is your GUI tool Zeppelin which does the same thing
> as Oracle BI tool. Can you please clarify below:
>
>
>1. you use Hive as your database/persistent storage and use Alluxio on
>top of Hive?
>2. are users accessing Hive or a Data Warehouse like Oracle
>3. Oracle BI tools are pretty mature. Zeppelin is not in the same
>league so you have to decide which technology stack to follow
>4. Spark should work with Oracle BI tool as well (need to check this)
>as a fast query tool. In that case the users can use Oracle BI tool with
>Spark as well.
>
> It seems to me that the issue is that users don't want to move from Oracle
> BI tool. We had the same issue with Tableau. So you really need to make
> that Oracle BI tool use Spark and Alluxio and leave Zeppelin at one side.
>
> Zeppelin as I used it a while back may not do what Oracle BI tool does. So
> the presentation layer has to be Oracle BI tool.
>
> HTH
>
>
>
> Dr Mich Talebzadeh
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
> http://talebzadehmich.wordpress.com
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 8 July 2016 at 04:19, Chanh Le <giaosu...@gmail.com> wrote:
>
>> Hi everyone,
>> Currently we use Zeppelin to analyze our data, and because it relies on SQL
>> it’s hard to distribute to users. But users are using some kind of
>> Oracle BI tool for analytics because it supports drag and drop and
>> we can set up permissions for each user.
>> Our architecture is Spark, Alluxio, Zeppelin. Because We want to share
>> what we have done in Zeppelin to business users.
>>
>> Is there any way to do that?
>>
>> Thanks.
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>
>


-- 
Best Regards,
Ayan Guha


Re: Spark Left outer Join issue using programmatic sql joins

2016-07-06 Thread ayan guha
   return dept;
> }
> }
>
> Input
> Emp
> 1001 aba 10
> 1002 abs 20
> 1003 abd 10
> 1004 abf 30
> 1005 abg 10
> 1006 abh 20
> 1007 abj 10
> 1008 abk 30
> 1009 abl 20
> 1010 abq 10
>
> Dept
> 10 dev
> 20 Test
> 30 IT
>
> Output
> +------+------+------+------+--------+
> |deptid|    id|  name|deptid|deptname|
> +------+------+------+------+--------+
> |    10|  1001|   aba|    10|     dev|
> |    10|  1003|   abd|    10|     dev|
> |    10|  1005|   abg|    10|     dev|
> |    10|  1007|   abj|    10|     dev|
> |    10|  1010|   abq|    10|     dev|
> |    20|  1002|   abs|  null|    null|
> |    20|  1006|   abh|  null|    null|
> |    20|  1009|   abl|  null|    null|
> |    30|  1004|   abf|  null|    null|
> |    30|  1008|   abk|  null|    null|
> +------+------+------+------+--------+
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Left-outer-Join-issue-using-programmatic-sql-joins-tp27295.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
Best Regards,
Ayan Guha


Re: add multiple columns

2016-06-26 Thread ayan guha
Can you share an example? You may want to write a SQL statement to add the
columns.
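
Something along these lines, for instance: a rough PySpark sketch (assuming
you already have a DataFrame df and a sqlContext; the column names are made
up) that adds several columns in one pass instead of looping over withColumn:

df.registerTempTable("t")
df2 = sqlContext.sql("""
    select *,
           price * qty  as total,
           upper(name)  as name_uc,
           price * 0.1  as tax
    from t""")

# or, equivalently, without SQL text:
df2 = df.selectExpr("*", "price * qty as total",
                    "upper(name) as name_uc", "price * 0.1 as tax")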

On Sun, Jun 26, 2016 at 11:02 PM, <ndj...@gmail.com> wrote:

> Hi guy!
>
> I'm afraid you have to loop...The update of the Logical Plan is getting
> faster on Spark.
>
> Cheers,
> Ardo.
>
> Sent from my iPhone
>
> > On 26 Jun 2016, at 14:20, pseudo oduesp <pseudo20...@gmail.com> wrote:
> >
> > Hi, how can I add multiple columns to a data frame?
> >
> > withColumn allows adding one column, but when I have multiple columns, do
> > I have to loop over each column?
> >
> > thanks
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Best Regards,
Ayan Guha


Re: Partitioning in spark

2016-06-23 Thread ayan guha
You can change the parallelism like the following:

from pyspark import SparkConf, SparkContext

conf = SparkConf()
conf.set('spark.sql.shuffle.partitions', 10)
sc = SparkContext(conf=conf)
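
If the context already exists, the same setting can also be applied on the
SQL side (a small sketch, same effect for DataFrame shuffles):

sqlContext.setConf('spark.sql.shuffle.partitions', '10')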



On Fri, Jun 24, 2016 at 6:46 AM, Darshan Singh <darshan.m...@gmail.com>
wrote:

> Hi,
>
> My default parallelism is 100. Now I join 2 dataframes with 20 partitions
> each , joined dataframe has 100 partition. I want to know what is the way
> to keep it to 20 (except re-partition and coalesce.
>
> Also, when i join these 2 dataframes I am using 4 columns as joined
> columns. The dataframes are partitions based on first 2 columns of join and
> thus, in effect one partition should be joined corresponding joins and
> doesn't need to join with rest of partitions so why spark is shuffling all
> the data.
>
> Simialrly, when my dataframe is partitioned by col1,col2 and if i use
> group by on col1,col2,col3,col4 then why does it shuffle everything whereas
> it need to sort each partitions and then should grouping there itself.
>
> Bit confusing , I am using 1.5.1
>
> Is it fixed in future versions.
>
> Thanks
>



-- 
Best Regards,
Ayan Guha


Re: Silly question about Yarn client vs Yarn cluster modes...

2016-06-21 Thread ayan guha
I may be wrong here, but beeline is basically a client library. So you
"connect" to STS and/or HS2 using beeline.

Spark connecting over JDBC is a different discussion and is in no way related
to beeline. When you read data from a DB (Oracle, DB2, etc.), you do not use
beeline; you use a JDBC connection to the DB.

#2: agreed on the Thrift limitations, but I am not sure there is any other
mechanism to share data with other systems such as APIs/BI tools.
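
For reference, reading straight from a relational DB in Spark looks like the
following rough PySpark sketch (connection details are invented; the Oracle
JDBC driver jar must be on the classpath):

df = (sqlContext.read.format("jdbc")
      .options(url="jdbc:oracle:thin:@//dbhost:1521/ORCL",   # hypothetical
               dbtable="HR.EMPLOYEES",                       # hypothetical
               user="scott",
               password="tiger",
               driver="oracle.jdbc.OracleDriver")
      .load())

No beeline is involved here; beeline only comes into play as a client of HS2
or STS.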

On Wed, Jun 22, 2016 at 1:47 PM, Michael Segel <msegel_had...@hotmail.com>
wrote:

>
> Sorry, I think you misunderstood.
> Spark can read from JDBC sources so to say using beeline as a way to
> access data is not a spark application isn’t really true.  Would you say
> the same if you were pulling data in to spark from Oracle or DB2?
> There are a couple of different design patterns and use cases where data
> could be stored in Hive yet your only access method is via a JDBC or
> Thift/Rest service.  Think also of compute / storage cluster
> implementations.
>
> WRT to #2, not exactly what I meant, by exposing the data… and there are
> limitations to the thift service…
>
> On Jun 21, 2016, at 5:44 PM, ayan guha <guha.a...@gmail.com> wrote:
>
> 1. Yes, in the sense you control number of executors from spark
> application config.
> 2. Any IO will be done from executors (never ever on driver, unless you
> explicitly call collect()). For example, connection to a DB happens one for
> each worker (and used by local executors). Also, if you run a reduceByKey
> job and write to hdfs, you will find a bunch of files were written from
> various executors. What happens when you want to expose the data to world:
> Spark Thrift Server (STS), which is a long running spark application (ie
> spark context) which can serve data from RDDs.
>
> Suppose I have a data source… like a couple of hive tables and I access
> the tables via beeline. (JDBC)  -
> This is NOT a spark application, and there is no RDD created. Beeline is
> just a jdbc client tool. You use beeline to connect to HS2 or STS.
>
> In this case… Hive generates a map/reduce job and then would stream the
> result set back to the client node where the RDD result set would be built.
>  --
> This is never true. When you connect Hive from spark, spark actually reads
> hive metastore and streams data directly from HDFS. Hive MR jobs do not
> play any role here, making spark faster than hive.
>
> HTH
>
> Ayan
>
> On Wed, Jun 22, 2016 at 9:58 AM, Michael Segel <msegel_had...@hotmail.com>
> wrote:
>
>> Ok, its at the end of the day and I’m trying to make sure I understand
>> the locale of where things are running.
>>
>> I have an application where I have to query a bunch of sources, creating
>> some RDDs and then I need to join off the RDDs and some other lookup tables.
>>
>>
>> Yarn has two modes… client and cluster.
>>
>> I get it that in cluster mode… everything is running on the cluster.
>> But in client mode, the driver is running on the edge node while the
>> workers are running on the cluster.
>>
>> When I run a sparkSQL command that generates a new RDD, does the result
>> set live on the cluster with the workers, and gets referenced by the
>> driver, or does the result set get migrated to the driver running on the
>> client? (I’m pretty sure I know the answer, but its never safe to assume
>> anything…)
>>
>> The follow up questions:
>>
>> 1) If I kill the  app running the driver on the edge node… will that
>> cause YARN to free up the cluster’s resources? (In cluster mode… that
>> doesn’t happen) What happens and how quickly?
>>
>> 1a) If using the client mode… can I spin up and spin down the number of
>> executors on the cluster? (Assuming that when I kill an executor any
>> portion of the RDDs associated with that executor are gone, however the
>> spark context is still alive on the edge node? [again assuming that the
>> spark context lives with the driver.])
>>
>> 2) Any I/O between my spark job and the outside world… (e.g. walking
>> through the data set and writing out a data set to a file) will occur on
>> the edge node where the driver is located?  (This may seem kinda silly, but
>> what happens when you want to expose the result set to the world… ? )
>>
>> Now for something slightly different…
>>
>> Suppose I have a data source… like a couple of hive tables and I access
>> the tables via beeline. (JDBC)  In this case… Hive generates a map/reduce
>> job and then would stream the result set back to the client node where the
>> RDD result set would be built.  I realize that I could run Hive on top of
>>

Re: Silly question about Yarn client vs Yarn cluster modes...

2016-06-21 Thread ayan guha
1. Yes, in the sense that you control the number of executors from the Spark
application config.
2. Any I/O will be done from the executors (never on the driver, unless you
explicitly call collect()). For example, a connection to a DB is opened once
per worker (and used by the local executors). Also, if you run a reduceByKey
job and write to HDFS, you will find a bunch of files were written from
various executors. As for what happens when you want to expose the data to
the world: that is the Spark Thrift Server (STS), a long-running Spark
application (i.e. a Spark context) which can serve data from RDDs.

Suppose I have a data source… like a couple of hive tables and I access the
tables via beeline. (JDBC)  -
This is NOT a Spark application, and there is no RDD created. Beeline is
just a JDBC client tool. You use beeline to connect to HS2 or STS.

In this case… Hive generates a map/reduce job and then would stream the
result set back to the client node where the RDD result set would be built.
 --
This is never true. When you access Hive from Spark, Spark actually reads
the Hive metastore and streams data directly from HDFS. Hive MR jobs do not
play any role here, which makes Spark faster than Hive.
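
A small PySpark illustration of that path (the table name is made up): the
metastore is consulted only for metadata, and the rows are read straight from
the underlying HDFS files by the executors.

from pyspark import SparkContext
from pyspark.sql import HiveContext

sc = SparkContext(appName="hive-read-example")
sqlContext = HiveContext(sc)

# metadata comes from the metastore; data is streamed from HDFS by executors
df = sqlContext.sql("select status, count(*) as cnt from default.web_logs group by status")
df.show()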

HTH

Ayan

On Wed, Jun 22, 2016 at 9:58 AM, Michael Segel <msegel_had...@hotmail.com>
wrote:

> Ok, its at the end of the day and I’m trying to make sure I understand the
> locale of where things are running.
>
> I have an application where I have to query a bunch of sources, creating
> some RDDs and then I need to join off the RDDs and some other lookup tables.
>
>
> Yarn has two modes… client and cluster.
>
> I get it that in cluster mode… everything is running on the cluster.
> But in client mode, the driver is running on the edge node while the
> workers are running on the cluster.
>
> When I run a sparkSQL command that generates a new RDD, does the result
> set live on the cluster with the workers, and gets referenced by the
> driver, or does the result set get migrated to the driver running on the
> client? (I’m pretty sure I know the answer, but its never safe to assume
> anything…)
>
> The follow up questions:
>
> 1) If I kill the  app running the driver on the edge node… will that cause
> YARN to free up the cluster’s resources? (In cluster mode… that doesn’t
> happen) What happens and how quickly?
>
> 1a) If using the client mode… can I spin up and spin down the number of
> executors on the cluster? (Assuming that when I kill an executor any
> portion of the RDDs associated with that executor are gone, however the
> spark context is still alive on the edge node? [again assuming that the
> spark context lives with the driver.])
>
> 2) Any I/O between my spark job and the outside world… (e.g. walking
> through the data set and writing out a data set to a file) will occur on
> the edge node where the driver is located?  (This may seem kinda silly, but
> what happens when you want to expose the result set to the world… ? )
>
> Now for something slightly different…
>
> Suppose I have a data source… like a couple of hive tables and I access
> the tables via beeline. (JDBC)  In this case… Hive generates a map/reduce
> job and then would stream the result set back to the client node where the
> RDD result set would be built.  I realize that I could run Hive on top of
> spark, but that’s a separate issue. Here the RDD will reside on the client
> only.  (That is I could in theory run this as a single spark instance.)
> If I were to run this on the cluster… then the result set would stream
> thru the beeline gate way and would reside back on the cluster sitting in
> RDDs within each executor?
>
> I realize that these are silly questions but I need to make sure that I
> know the flow of the data and where it ultimately resides.  There really is
> a method to my madness, and if I could explain it… these questions really
> would make sense. ;-)
>
> TIA,
>
> -Mike
>
>
> -----
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Best Regards,
Ayan Guha


Re: Python to Scala

2016-06-18 Thread ayan guha
Post the code... someone would be able to help (yours truly included).

On Sat, Jun 18, 2016 at 4:13 PM, Yash Sharma <yash...@gmail.com> wrote:

> Couple of things that can work-
> - If you know the logic- just forget the python script and write it in
> java/scala from scratch
> - If you have python functions and libraries used- Pyspark is probably the
> best bet.
> - If you have specific questions on how to solve a particular
> implementation issue you are facing- ask it here :)
>
> Apart from that its really difficult to understand what you are asking.
>
> - Thanks, via mobile,  excuse brevity.
> On Jun 18, 2016 3:27 PM, "Aakash Basu" <raj2coo...@gmail.com> wrote:
>
>> I don't have a sound knowledge in Python and on the other hand we are
>> working on Spark on Scala, so I don't think it will be allowed to run
>> PySpark along with it, so the requirement is to convert the code to scala
>> and use it. But I'm finding it difficult.
>>
>> Did not find a better forum for help than ours. Hence this mail.
>> On 18-Jun-2016 10:39 AM, "Stephen Boesch" <java...@gmail.com> wrote:
>>
>>> What are you expecting us to do?  Yash provided a reasonable approach -
>>> based on the info you had provided in prior emails.  Otherwise you can
>>> convert it from python to spark - or find someone else who feels
>>> comfortable to do it.  That kind of inquiry would likelybe appropriate on a
>>> job board.
>>>
>>>
>>>
>>> 2016-06-17 21:47 GMT-07:00 Aakash Basu <raj2coo...@gmail.com>:
>>>
>>>> Hey,
>>>>
>>>> Our complete project is in Spark on Scala, I code in Scala for Spark,
>>>> though am new, but I know it and still learning. But I need help in
>>>> converting this code to Scala. I've nearly no knowledge in Python, hence,
>>>> requested the experts here.
>>>>
>>>> Hope you get me now.
>>>>
>>>> Thanks,
>>>> Aakash.
>>>> On 18-Jun-2016 10:07 AM, "Yash Sharma" <yash...@gmail.com> wrote:
>>>>
>>>>> You could use pyspark to run the python code on spark directly. That
>>>>> will cut the effort of learning scala.
>>>>>
>>>>> https://spark.apache.org/docs/0.9.0/python-programming-guide.html
>>>>>
>>>>> - Thanks, via mobile,  excuse brevity.
>>>>> On Jun 18, 2016 2:34 PM, "Aakash Basu" <raj2coo...@gmail.com> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I've a python code, which I want to convert to Scala for using it in
>>>>>> a Spark program. I'm not so well acquainted with python and learning 
>>>>>> scala
>>>>>> now. Any Python+Scala expert here? Can someone help me out in this 
>>>>>> please?
>>>>>>
>>>>>> Thanks & Regards,
>>>>>> Aakash.
>>>>>>
>>>>>
>>>


-- 
Best Regards,
Ayan Guha


Re: Spark SQL Errors

2016-05-31 Thread ayan guha
Unfortunately, I do not have it, as it is 3rd-party code :(

But essentially I am trying to overwrite data in a Hive table from a source.

On Tue, May 31, 2016 at 4:01 PM, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> ok what is the exact spark code that is causing the issue.
>
> can you show it in its entirety?
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 31 May 2016 at 06:31, ayan guha <guha.a...@gmail.com> wrote:
>
>> No there is no semicolon.
>>
>> This is the query:
>>
>> 16/05/31 14:34:29 INFO SparkExecuteStatementOperation: Running query
>> 'DESCRIBE EXTENDED `sds.unhealthy_om_delta`' with
>> e24282a8-43d1-4c3a-a3f3-2645761ed40f
>>
>>
>> On Tue, May 31, 2016 at 3:10 PM, Raju Bairishetti <raju@gmail.com>
>> wrote:
>>
>>>
>>>
>>> On Tue, May 31, 2016 at 1:02 PM, ayan guha <guha.a...@gmail.com> wrote:
>>>
>>>> Hi
>>>>
>>>> While running spark thrift, we are getting 2 issues.
>>>>
>>>> 1.. 16/05/31 14:36:18 WARN ThriftCLIService: Error executing statement:
>>>> org.apache.hive.service.cli.HiveSQLException:
>>>> org.apache.spark.sql.AnalysisException: Table not found:
>>>> sds.unhealthy_om_delta;
>>>>
>>>
>>> Are you using *;* (semi colon) at the end of query like
>>> *sqlcontext.sql(query;)*?   You should not mention *;* at the end of
>>> query
>>>
>>> at
>>>> org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org
>>>> $apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:
>>>> 246)
>>>>
>>>> Unfortunately, the table exists and I can see it from beeline.
>>>>
>>>> This error is happening from a front end, where the front end service
>>>> is launched by a different user. However, we do not restrict read access to
>>>> anybody.
>>>>
>>>> 2. org.apache.hive.service.cli.HiveSQLException:
>>>> java.lang.RuntimeException: [1.20] failure: end of input expected
>>>>
>>>> SHOW TABLES IN sds LIKE '.*'
>>>>^
>>>> at
>>>> org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org
>>>> $apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:
>>>> 246)
>>>>
>>>> It seems a pure Hive error, and looks like wrong syntax.. Any
>>>> suggestion what is the correct syntax?
>>>>
>>>> Both issues are coming while running a 3rd party tool (datameer)
>>>> connecting to Spark Thrift Server. Spark Version 1.6 on HDP 2.4.
>>>>
>>>>
>>>> TIA...
>>>>
>>>> --
>>>> Best Regards,
>>>> Ayan Guha
>>>>
>>>
>>>
>>>
>>> --
>>> Thanks,
>>> Raju Bairishetti,
>>>
>>> www.lazada.com
>>>
>>>
>>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>


-- 
Best Regards,
Ayan Guha


Re: Using Spark on Hive with Hive also using Spark as its execution engine

2016-05-23 Thread ayan guha
8:38,579 Stage-1 map = 91%,  reduce = 0%, Cumulative CPU
> 92.52 sec
>
> INFO  : 2016-05-23 00:28:38,579 Stage-1 map = 91%,  reduce = 0%,
> Cumulative CPU 92.52 sec
>
> 2016-05-23 00:28:44,759 Stage-1 map = 95%,  reduce = 0%, Cumulative CPU
> 97.35 sec
>
> INFO  : 2016-05-23 00:28:44,759 Stage-1 map = 95%,  reduce = 0%,
> Cumulative CPU 97.35 sec
>
> 2016-05-23 00:28:49,915 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU
> 99.6 sec
>
> INFO  : 2016-05-23 00:28:49,915 Stage-1 map = 100%,  reduce = 0%,
> Cumulative CPU 99.6 sec
>
> 2016-05-23 00:28:54,043 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU
> 101.4 sec
>
> MapReduce Total cumulative CPU time: 1 minutes 41 seconds 400 msec
>
> Ended Job = job_1463956731753_0005
>
> MapReduce Jobs Launched:
>
> Stage-Stage-1: Map: 22  Reduce: 1   Cumulative CPU: 101.4 sec   HDFS Read:
> 5318569 HDFS Write: 46 SUCCESS
>
> Total MapReduce CPU Time Spent: 1 minutes 41 seconds 400 msec
>
> OK
>
> INFO  : 2016-05-23 00:28:54,043 Stage-1 map = 100%,  reduce = 100%,
> Cumulative CPU 101.4 sec
>
> INFO  : MapReduce Total cumulative CPU time: 1 minutes 41 seconds 400 msec
>
> INFO  : Ended Job = job_1463956731753_0005
>
> INFO  : MapReduce Jobs Launched:
>
> INFO  : Stage-Stage-1: Map: 22  Reduce: 1   Cumulative CPU: 101.4 sec
> HDFS Read: 5318569 HDFS Write: 46 SUCCESS
>
> INFO  : Total MapReduce CPU Time Spent: 1 minutes 41 seconds 400 msec
>
> INFO  : Completed executing
> command(queryId=hduser_20160523002632_9f91d42a-ea46-4a66-a589-7d39c23b41dc);
> Time taken: 142.525 seconds
>
> INFO  : OK
>
> +-----+-----+-----------+-----------------------+
> | c0  | c1  |    c2     |          c3           |
> +-----+-----+-----------+-----------------------+
> | 1   | 1   | 5.0005E7  | 2.8867513459481288E7  |
> +-----+-----+-----------+-----------------------+
>
> 1 row selected (142.744 seconds)
>
>
>
> OK Hive on map-reduce engine took 142 seconds compared to 58 seconds with
> Hive on Spark. So you can obviously gain pretty well by using Hive on Spark.
>
>
>
> Please also note that I did not use any vendor's build for this purpose. I
> compiled Spark 1.3.1 myself.
>
>
>
> HTH
>
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com/
>
>
>
>


-- 
Best Regards,
Ayan Guha


Re: How to perform reduce operation in the same order as partition indexes

2016-05-19 Thread ayan guha
You can add the index from mapPartitionsWithIndex to the output and order
based on that in the merge step.
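
A rough sketch of the idea in PySpark (the original code is Scala, but the
approach is the same; compute_block is a placeholder for the per-partition
computation on shortrddFinal):

def with_index(idx, it):
    # emit (partition-index, block) so the merge can be ordered later
    yield (idx, compute_block(list(it)))

pairs = shortrddFinal.mapPartitionsWithIndex(with_index).collect()

# sort by partition index before stacking the per-partition blocks
matrix = [row for _, block in sorted(pairs) for row in block]
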
On 19 May 2016 13:22, "Pulasthi Supun Wickramasinghe" 
wrote:

> Hi Devs/All,
>
> I am pretty new to Spark. I have a program which does some map reduce
> operations with matrices. Here *shortrddFinal* is a of type "
> *RDD[Array[Short]]"* and consists of several partitions
>
> *var BC =
> shortrddFinal.mapPartitionsWithIndex(calculateBCInternal).reduce(mergeBC)*
>
> The map function produces a "Array[Array[Double]]" and at the reduce step
> i need to merge all the 2 dimensional double arrays produced for each
> partition into one big matrix. But i also need to keep the same order as
> the partitions. that is the 2D double array produced for partition 0 should
> be the first set of rows in the matrix and then the 2d double array
> produced for partition 1 and so on. Is there a way to enforce the order in
> the reduce step.
>
> Thanks in advance
>
> Best Regards,
> Pulasthi
> --
> Pulasthi S. Wickramasinghe
> Graduate Student  | Research Assistant
> School of Informatics and Computing | Digital Science Center
> Indiana University, Bloomington
> cell: 224-386-9035
>


Re: Tar File: On Spark

2016-05-19 Thread ayan guha
Hi

Thanks for the input. Would it be possible to write it in Python? I think I
can use FileUtil.unTar from the Hadoop jar, but can I do it from Python?
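
Along the lines of the suggestion quoted below, a rough PySpark sketch (all
paths are invented, and it assumes the hadoop CLI and tar are available on
every worker):

import os
import subprocess
from pyspark import SparkContext

sc = SparkContext(appName="untar-example")

def untar(path):
    # runs on a worker: copy the tar locally, unpack it, push the members back
    name = os.path.basename(path)
    work = "/tmp/untar_" + name
    out = os.path.join(work, "extracted")
    os.makedirs(out)
    local_tar = os.path.join(work, name)
    subprocess.check_call(["hadoop", "fs", "-get", path, local_tar])
    subprocess.check_call(["tar", "-xf", local_tar, "-C", out])
    subprocess.check_call(["hadoop", "fs", "-put", "-f", out, "/output/" + name])
    return path

# one HDFS tar path per line, prepared up front
sc.textFile("/tmp/tar_list.txt").map(untar).collect()
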
On 19 May 2016 16:57, "Sun Rui" <sunrise_...@163.com> wrote:

> 1. create a temp dir on HDFS, say “/tmp”
> 2. write a script to create in the temp dir one file for each tar file.
> Each file has only one line:
> 
> 3. Write a spark application. It is like:
>   val rdd = sc.textFile ()
>   rdd.map { line =>
>construct an untar command using the path information in “line” and
> launches the command
>   }
>
> > On May 19, 2016, at 14:42, ayan guha <guha.a...@gmail.com> wrote:
> >
> > Hi
> >
> > I have few tar files in HDFS in a single folder. each file has multiple
> files in it.
> >
> > tar1:
> >   - f1.txt
> >   - f2.txt
> > tar2:
> >   - f1.txt
> >   - f2.txt
> >
> > (each tar file will have exact same number of files, same name)
> >
> > I am trying to find a way (spark or pig) to extract them to their own
> folders.
> >
> > f1
> >   - tar1_f1.txt
> >   - tar2_f1.txt
> > f2:
> >- tar1_f2.txt
> >- tar1_f2.txt
> >
> > Any help?
> >
> >
> >
> > --
> > Best Regards,
> > Ayan Guha
>
>
>


Tar File: On Spark

2016-05-19 Thread ayan guha
Hi

I have a few tar files in HDFS in a single folder. Each tar file has multiple
files in it.

tar1:
  - f1.txt
  - f2.txt
tar2:
  - f1.txt
  - f2.txt

(each tar file will have the exact same number of files, with the same names)

I am trying to find a way (Spark or Pig) to extract them into their own
folders.

f1
  - tar1_f1.txt
  - tar2_f1.txt
f2:
   - tar1_f2.txt
   - tar1_f2.txt

Any help?



-- 
Best Regards,
Ayan Guha


Pyspark with non default hive table

2016-05-10 Thread ayan guha
Hi

Can we write to a non-default Hive table using PySpark?


Re: partitioner aware subtract

2016-05-09 Thread ayan guha
How about outer join?
On 9 May 2016 13:18, "Raghava Mutharaju"  wrote:

> Hello All,
>
> We have two PairRDDs (rdd1, rdd2) which are hash partitioned on key
> (number of partitions are same for both the RDDs). We would like to
> subtract rdd2 from rdd1.
>
> The subtract code at
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala
> seems to group the elements of both the RDDs using (x, null) where x is the
> element of the RDD and partition them. Then it makes use of
> subtractByKey(). This way, RDDs have to be repartitioned on x (which in our
> case, is both key and value combined). In our case, both the RDDs are
> already hash partitioned on the key of x. Can we take advantage of this by
> having a PairRDD/HashPartitioner-aware subtract? Is there a way to use
> mapPartitions() for this?
>
> We tried to broadcast rdd2 and use mapPartitions. But this turns out to be
> memory consuming and inefficient. We tried to do a local set difference
> between rdd1 and the broadcasted rdd2 (in mapPartitions of rdd1). We did
> use destroy() on the broadcasted value, but it does not help.
>
> The current subtract method is slow for us. rdd1 and rdd2 are around 700MB
> each and the subtract takes around 14 seconds.
>
> Any ideas on this issue is highly appreciated.
>
> Regards,
> Raghava.
>


Re: SparkSQL with large result size

2016-05-02 Thread ayan guha
How many executors are you running? Does your partition scheme ensure data
is distributed evenly? It is possible that your data is skewed and one of
the executors is failing. Maybe you can try reducing per-executor memory and
increasing the number of partitions.
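
For instance (the values here are purely illustrative):

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .set("spark.executor.memory", "6g")            # smaller heap per executor
        .set("spark.sql.shuffle.partitions", "400"))   # more, smaller partitions
sc = SparkContext(conf=conf)
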
On 2 May 2016 14:19, "Buntu Dev"  wrote:

> I got a 10g limitation on the executors and operating on parquet dataset
> with block size 70M with 200 blocks. I keep hitting the memory limits when
> doing a 'select * from t1 order by c1 limit 100' (ie, 1M). It works if
> I limit to say 100k. What are the options to save a large dataset without
> running into memory issues?
>
> Thanks!
>


Re: Sqoop on Spark

2016-04-05 Thread ayan guha
Thanks guys for feedback.

On Wed, Apr 6, 2016 at 3:44 PM, Jörn Franke <jornfra...@gmail.com> wrote:

> I do not think you can be more resource efficient. In the end you have to
> store the data anyway on HDFS . You have a lot of development effort for
> doing something like sqoop. Especially with error handling.
> You may create a ticket with the Sqoop guys to support Spark as an
> execution engine and maybe it is less effort to plug it in there.
> Maybe if your cluster is loaded then you may want to add more machines or
> improve the existing programs.
>
> On 06 Apr 2016, at 07:33, ayan guha <guha.a...@gmail.com> wrote:
>
> One of the reasons in my mind is to avoid a MapReduce application completely
> during ingestion, if possible. Also, I can then use a Spark standalone
> cluster to ingest, even if my Hadoop cluster is heavily loaded. What do you
> guys think?
>
> On Wed, Apr 6, 2016 at 3:13 PM, Jörn Franke <jornfra...@gmail.com> wrote:
>
>> Why do you want to reimplement something which is already there?
>>
>> On 06 Apr 2016, at 06:47, ayan guha <guha.a...@gmail.com> wrote:
>>
>> Hi
>>
>> Thanks for the reply. My use case is to query ~40 tables from Oracle (using
>> index and incremental extraction only) and add the data to existing Hive
>> tables. Also, it would be good to have an option to create Hive tables,
>> driven by job-specific configuration.
>>
>> What do you think?
>>
>> Best
>> Ayan
>>
>> On Wed, Apr 6, 2016 at 2:30 PM, Takeshi Yamamuro <linguin@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> It depends on your use case using sqoop.
>>> What's it like?
>>>
>>> // maropu
>>>
>>> On Wed, Apr 6, 2016 at 1:26 PM, ayan guha <guha.a...@gmail.com> wrote:
>>>
>>>> Hi All
>>>>
>>>> Asking opinion: is it possible/advisable to use spark to replace what
>>>> sqoop does? Any existing project done in similar lines?
>>>>
>>>> --
>>>> Best Regards,
>>>> Ayan Guha
>>>>
>>>
>>>
>>>
>>> --
>>> ---
>>> Takeshi Yamamuro
>>>
>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>
>


-- 
Best Regards,
Ayan Guha


Sqoop on Spark

2016-04-05 Thread ayan guha
Hi All

Asking for opinions: is it possible/advisable to use Spark to replace what
Sqoop does? Are there any existing projects along similar lines?

-- 
Best Regards,
Ayan Guha


Spark thrift issue 8659 (changing subject)

2016-03-23 Thread ayan guha
>
> Hi All
>
> I found this issue listed in Spark Jira -
> https://issues.apache.org/jira/browse/SPARK-8659
>
> I would love to know if there are any roadmap for this? Maybe someone from
> dev group can confirm?
>
> Thank you in advance
>
> Best
> Ayan
>
>


Re: Zeppelin Integration

2016-03-23 Thread ayan guha
Hi All

After spending a few more days on the issue, I finally found it listed in
the Spark JIRA: https://issues.apache.org/jira/browse/SPARK-8659

I would love to know whether there is any roadmap for this. Maybe someone
from the dev group can confirm?

Thank you in advance

Best
Ayan

On Thu, Mar 10, 2016 at 10:32 PM, ayan guha <guha.a...@gmail.com> wrote:

> Thanks guys for reply. Yes, Zeppelin with Spark is pretty compelling
> choice, for single user. Any pointers for using Zeppelin for multi user
> scenario? In essence, can we either (a) Use Zeppelin to connect to a long
> running Spark Application which has some pre-cached Dataframes? (b) Can
> Zeppelin user be passed down and use Ranger to implement Hive RBAC?
>
> I know I am sounding a little vague, but such is the problem state in my
> mind :) Any help will be appreciated.
>
> Best
> Ayan
>
> On Thu, Mar 10, 2016 at 9:51 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Zeppelin is pretty a good choice for Spark. It has a UI that allows you
>> to run your code. It has Interpreter where you change the connection
>> configuration. I made mine run on port 21999 (a deamon process on Linux
>> host where your spark master is running). It is pretty easy to set up and
>> run.
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 10 March 2016 at 10:26, Sabarish Sasidharan <
>> sabarish.sasidha...@manthan.com> wrote:
>>
>>> I believe you need to co-locate your Zeppelin on the same node where
>>> Spark is installed. You need to specify the SPARK HOME. The master I used
>>> was YARN.
>>>
>>> Zeppelin exposes a notebook interface. A notebook can have many
>>> paragraphs. You run the paragraphs. You can mix multiple contexts in the
>>> same notebook. So first paragraph can be scala, second can be sql that uses
>>> DF from first paragraph etc. If you use a select query, the output is
>>> automatically displayed as a chart.
>>>
>>> As RDDs are bound to the context that creates them, I don't think
>>> Zeppelin can use those RDDs.
>>>
>>> I don't know if notebooks can be reused within other notebooks. It would
>>> be a nice way of doing some common preparatory work (like building these
>>> RDDs).
>>>
>>> Regards
>>> Sab
>>>
>>> On Thu, Mar 10, 2016 at 2:28 PM, ayan guha <guha.a...@gmail.com> wrote:
>>>
>>>> Hi All
>>>>
>>>> I am writing this in order to get a fair understanding of how zeppelin
>>>> can be integrated with Spark.
>>>>
>>>> Our use case is to load few tables from a DB to Spark, run some
>>>> transformation. Once done, we want to expose data through Zeppelin for
>>>> analytics. I have few question around that to sound off any gross
>>>> architectural flaws.
>>>>
>>>> Questions:
>>>>
>>>> 1. How Zeppelin connects to Spark? Thriftserver? Thrift JDBC?
>>>>
>>>> 2. What is the scope of Spark application when it is used from
>>>> Zeppelin? For example, if I have few subsequent actions in zeppelin like
>>>> map,filter,reduceByKey, filter,collect. I assume this will translate to an
>>>> application and get submitted to Spark. However, If I want to use reuse
>>>> some part of the data (for example) after first map transformation in
>>>> earlier application. Can I do it? Or will it be another application and
>>>> another spark submit?
>>>>
>>>>  In our use case data will already be loaded in RDDs. So how Zeppelin
>>>> can access it?
>>>>
>>>> 3. How can I control access on specific rdds to specific users in
>>>> Zeppelin (assuming we have implemented some way of login mechanism in
>>>> Zeppelin and we have a mapping between Zeppelin users and their LDAP
>>>> accounts). Is it even possible?
>>>>
>>>> 4. If Zeppelin is not a good choice, yet, for the use case, what are
>>>> the other alternatives?
>>>>
>>>> appreciate any help/pointers/guidance.
>>>>
>>>>
>>>> --
>>>> Best Regards,
>>>> Ayan Guha
>>>>
>>>
>>>
>>>
>>> --
>>>
>>> Architect - Big Data
>>> Ph: +91 99805 99458
>>>
>>> Manthan Systems | *Company of the year - Analytics (2014 Frost and
>>> Sullivan India ICT)*
>>> +++
>>>
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>



-- 
Best Regards,
Ayan Guha


Re: Spark Thriftserver

2016-03-16 Thread ayan guha
Thank you, Jeff. However, I am looking more for fine-grained access control,
for example something like Ranger. Do you know if the Spark Thrift Server is
supported by Ranger or Sentry, or something similar? Much appreciated.

On Wed, Mar 16, 2016 at 1:49 PM, Jeff Zhang <zjf...@gmail.com> wrote:

> It's same as hive thrift server. I believe kerberos is supported.
>
> On Wed, Mar 16, 2016 at 10:48 AM, ayan guha <guha.a...@gmail.com> wrote:
>
>> so, how about implementing security? Any pointer will be helpful
>>
>> On Wed, Mar 16, 2016 at 1:44 PM, Jeff Zhang <zjf...@gmail.com> wrote:
>>
>>> The spark thrift server allow you to run hive query in spark engine. It
>>> can be used as jdbc server.
>>>
>>> On Wed, Mar 16, 2016 at 10:42 AM, ayan guha <guha.a...@gmail.com> wrote:
>>>
>>>> Sorry to be dumb-head today, but what is the purpose of spark
>>>> thriftserver then? In other words, should I view spark thriftserver as a
>>>> better version of hive one (with Spark as execution engine instead of
>>>> MR/Tez) OR should I see it as a JDBC server?
>>>>
>>>> On Wed, Mar 16, 2016 at 11:44 AM, Jeff Zhang <zjf...@gmail.com> wrote:
>>>>
>>>>> spark thrift server is very similar with hive thrift server. You can
>>>>> use hive jdbc driver to access spark thrift server. AFAIK, all the 
>>>>> features
>>>>> of hive thrift server are also available in spark thrift server.
>>>>>
>>>>> On Wed, Mar 16, 2016 at 8:39 AM, ayan guha <guha.a...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi All
>>>>>>
>>>>>> My understanding about thriftserver is to use it to expose pre-loaded
>>>>>> RDD/dataframes to tools who can connect through JDBC.
>>>>>>
>>>>>> Is there something like Spark JDBC server too? Does it do the same
>>>>>> thing? What is the difference between these two?
>>>>>>
>>>>>> How does Spark JDBC/Thrift supports security? Can we restrict certain
>>>>>> users to access certain dataframes and not the others?
>>>>>>
>>>>>> --
>>>>>> Best Regards,
>>>>>> Ayan Guha
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Best Regards
>>>>>
>>>>> Jeff Zhang
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Best Regards,
>>>> Ayan Guha
>>>>
>>>
>>>
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>>>
>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>
>
> --
> Best Regards
>
> Jeff Zhang
>



-- 
Best Regards,
Ayan Guha


Re: Zeppelin Integration

2016-03-10 Thread ayan guha
Thanks, guys, for the replies. Yes, Zeppelin with Spark is a pretty compelling
choice for a single user. Any pointers on using Zeppelin in a multi-user
scenario? In essence, can we either (a) use Zeppelin to connect to a
long-running Spark application which has some pre-cached DataFrames, or (b)
pass the Zeppelin user down and use Ranger to implement Hive RBAC?

I know I am sounding a little vague, but such is the problem state in my
mind :) Any help will be appreciated.

Best
Ayan

On Thu, Mar 10, 2016 at 9:51 PM, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Zeppelin is pretty a good choice for Spark. It has a UI that allows you to
> run your code. It has Interpreter where you change the connection
> configuration. I made mine run on port 21999 (a deamon process on Linux
> host where your spark master is running). It is pretty easy to set up and
> run.
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 10 March 2016 at 10:26, Sabarish Sasidharan <
> sabarish.sasidha...@manthan.com> wrote:
>
>> I believe you need to co-locate your Zeppelin on the same node where
>> Spark is installed. You need to specify the SPARK HOME. The master I used
>> was YARN.
>>
>> Zeppelin exposes a notebook interface. A notebook can have many
>> paragraphs. You run the paragraphs. You can mix multiple contexts in the
>> same notebook. So first paragraph can be scala, second can be sql that uses
>> DF from first paragraph etc. If you use a select query, the output is
>> automatically displayed as a chart.
>>
>> As RDDs are bound to the context that creates them, I don't think
>> Zeppelin can use those RDDs.
>>
>> I don't know if notebooks can be reused within other notebooks. It would
>> be a nice way of doing some common preparatory work (like building these
>> RDDs).
>>
>> Regards
>> Sab
>>
>> On Thu, Mar 10, 2016 at 2:28 PM, ayan guha <guha.a...@gmail.com> wrote:
>>
>>> Hi All
>>>
>>> I am writing this in order to get a fair understanding of how zeppelin
>>> can be integrated with Spark.
>>>
>>> Our use case is to load few tables from a DB to Spark, run some
>>> transformation. Once done, we want to expose data through Zeppelin for
>>> analytics. I have few question around that to sound off any gross
>>> architectural flaws.
>>>
>>> Questions:
>>>
>>> 1. How Zeppelin connects to Spark? Thriftserver? Thrift JDBC?
>>>
>>> 2. What is the scope of Spark application when it is used from Zeppelin?
>>> For example, if I have few subsequent actions in zeppelin like
>>> map,filter,reduceByKey, filter,collect. I assume this will translate to an
>>> application and get submitted to Spark. However, If I want to use reuse
>>> some part of the data (for example) after first map transformation in
>>> earlier application. Can I do it? Or will it be another application and
>>> another spark submit?
>>>
>>>  In our use case data will already be loaded in RDDs. So how Zeppelin
>>> can access it?
>>>
>>> 3. How can I control access on specific rdds to specific users in
>>> Zeppelin (assuming we have implemented some way of login mechanism in
>>> Zeppelin and we have a mapping between Zeppelin users and their LDAP
>>> accounts). Is it even possible?
>>>
>>> 4. If Zeppelin is not a good choice, yet, for the use case, what are the
>>> other alternatives?
>>>
>>> appreciate any help/pointers/guidance.
>>>
>>>
>>> --
>>> Best Regards,
>>> Ayan Guha
>>>
>>
>>
>>
>> --
>>
>> Architect - Big Data
>> Ph: +91 99805 99458
>>
>> Manthan Systems | *Company of the year - Analytics (2014 Frost and
>> Sullivan India ICT)*
>> +++
>>
>
>


-- 
Best Regards,
Ayan Guha


Zeppelin Integration

2016-03-10 Thread ayan guha
Hi All

I am writing this in order to get a fair understanding of how Zeppelin can
be integrated with Spark.

Our use case is to load a few tables from a DB into Spark and run some
transformations. Once done, we want to expose the data through Zeppelin for
analytics. I have a few questions around that, to sound out any gross
architectural flaws.

Questions:

1. How does Zeppelin connect to Spark? Thrift Server? Thrift JDBC?

2. What is the scope of the Spark application when it is used from Zeppelin?
For example, if I have a few subsequent operations in Zeppelin like map,
filter, reduceByKey, filter, collect, I assume this will translate to an
application and get submitted to Spark. However, if I want to reuse some
part of the data (for example, the result after the first map transformation)
from an earlier application, can I do it? Or will it be another application
and another spark-submit?

In our use case the data will already be loaded in RDDs, so how can Zeppelin
access it?

3. How can I control access to specific RDDs for specific users in Zeppelin
(assuming we have implemented some login mechanism in Zeppelin and we have a
mapping between Zeppelin users and their LDAP accounts)? Is it even possible?

4. If Zeppelin is not yet a good choice for this use case, what are the
other alternatives?

appreciate any help/pointers/guidance.


-- 
Best Regards,
Ayan Guha


Re: Output the data to external database at particular time in spark streaming

2016-03-08 Thread ayan guha
Yes, if it falls within the batch. But if the requirement is to flush
everything up to the 15th minute of the hour, then it should work.
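
A rough sketch of the idea in PySpark Streaming (windowed_stream and
write_to_db are placeholders; the once-per-hour guard is there to address
the batch-boundary concern):

from datetime import datetime

last_flushed_hour = [None]            # simple driver-side state

def flush_if_due(rdd):
    now = datetime.now()
    # flush once per hour, in the first batch at or after the 15th minute
    if now.minute >= 15 and last_flushed_hour[0] != now.hour:
        last_flushed_hour[0] = now.hour
        write_to_db(rdd.collect())    # collect() here only for illustration

windowed_stream.foreachRDD(flush_if_due)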
On 9 Mar 2016 04:01, "Ted Yu" <yuzhih...@gmail.com> wrote:

> That may miss the 15th minute of the hour (with non-trivial deviation),
> right ?
>
> On Tue, Mar 8, 2016 at 8:50 AM, ayan guha <guha.a...@gmail.com> wrote:
>
>> Why not compare the current time in every batch and, if it meets a certain
>> condition, emit the data?
>> On 9 Mar 2016 00:19, "Abhishek Anand" <abhis.anan...@gmail.com> wrote:
>>
>>> I have a spark streaming job where I am aggregating the data by doing
>>> reduceByKeyAndWindow with inverse function.
>>>
>>> I am keeping the data in memory for upto 2 hours and In order to output
>>> the reduced data to an external storage I conditionally need to puke the
>>> data to DB say at every 15th minute of the each hour.
>>>
>>> How can this be achieved.
>>>
>>>
>>> Regards,
>>> Abhi
>>>
>>
>


Re: Output the data to external database at particular time in spark streaming

2016-03-08 Thread ayan guha
Why not compare the current time in every batch and, if it meets a certain
condition, emit the data?
On 9 Mar 2016 00:19, "Abhishek Anand"  wrote:

> I have a spark streaming job where I am aggregating the data by doing
> reduceByKeyAndWindow with inverse function.
>
> I am keeping the data in memory for upto 2 hours and In order to output
> the reduced data to an external storage I conditionally need to puke the
> data to DB say at every 15th minute of the each hour.
>
> How can this be achieved.
>
>
> Regards,
> Abhi
>


Re: Job fails at saveAsHadoopDataset stage due to Lost Executor due to reason unknown so far

2016-03-04 Thread ayan guha
g 125 
>>>>>> non-empty blocks out of 3007 blocks
>>>>>> 16/02/24 11:09:47 INFO storage.ShuffleBlockFetcherIterator: Started 14 
>>>>>> remote fetches in 10 ms
>>>>>> 16/02/24 11:11:47 ERROR server.TransportChannelHandler: Connection to 
>>>>>> maprnode5 has been quiet for 12 ms while there are outstanding 
>>>>>> requests. Assuming connection is dead; please adjust 
>>>>>> spark.network.timeout if this is wrong.
>>>>>> 16/02/24 11:11:47 ERROR client.TransportResponseHandler: Still have 1 
>>>>>> requests outstanding when connection from maprnode5 is closed
>>>>>> 16/02/24 11:11:47 ERROR shuffle.OneForOneBlockFetcher: Failed while 
>>>>>> starting block fetches
>>>>>> java.io.IOException: Connection from maprnode5 closed
>>>>>> at 
>>>>>> org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:104)
>>>>>> at 
>>>>>> org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:91)
>>>>>> at 
>>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
>>>>>> at 
>>>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
>>>>>> at 
>>>>>> io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
>>>>>> at 
>>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
>>>>>> at 
>>>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
>>>>>> at 
>>>>>> io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
>>>>>> at 
>>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
>>>>>> at 
>>>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
>>>>>> at 
>>>>>> io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
>>>>>> at 
>>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
>>>>>> at 
>>>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
>>>>>> at 
>>>>>> io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739)
>>>>>> at 
>>>>>> io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659)
>>>>>> at 
>>>>>> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
>>>>>> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
>>>>>> at 
>>>>>> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>>>>>> at java.lang.Thread.run(Thread.java:744)
>>>>>> 16/02/24 11:11:47 INFO shuffle.RetryingBlockFetcher: Retrying fetch 
>>>>>> (1/3) for 6 outstanding blocks after 5000 ms
>>>>>> 16/02/24 11:11:52 INFO client.TransportClientFactory: Found inactive 
>>>>>> connection to maprnode5, creating a new one.
>>>>>> 16/02/24 11:12:16 WARN server.TransportChannelHandler: Exception in 
>>>>>> connection from maprnode5
>>>>>> java.io.IOException: Connection reset by peer
>>>>>> at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
>>>>>> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>>>>>> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
>>>>>> at sun.nio.ch.IOUtil.read(IOUtil.java:192)
>>>>>> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
>>>>>> at 
>>>>>> io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:313)
>>>>>> at 
>>>>>> io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
>>>>>> at 
>>>>>> io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242)
>>>>>> at 
>>>>>> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
>>>>>> at 
>>>>>> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>>>>>> at 
>>>>>> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>>>>>> at 
>>>>>> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>>>>>> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>>>>>> at 
>>>>>> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>>>>>> at java.lang.Thread.run(Thread.java:744)
>>>>>> 16/02/24 11:12:16 ERROR client.TransportResponseHandler: Still have 1 
>>>>>> requests outstanding when connection from maprnode5 is closed
>>>>>> 16/02/24 11:12:16 ERROR shuffle.OneForOneBlockFetcher: Failed while 
>>>>>> starting block fetches
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>>
>>>
>>
>>
>
>
>
>



-- 
Best Regards,
Ayan Guha


Re: Facing issue with floor function in spark SQL query

2016-03-04 Thread ayan guha
Most likely you are missing an import of org.apache.spark.sql.functions._.

In any case, you can write your own floor function and register it as a UDF.
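
A PySpark-flavored sketch of that UDF workaround (the quoted code below is
Java, but the idea is the same); the function name and the 300000 divisor
(5 minutes in milliseconds) are illustrative choices, not taken from the thread:

    from pyspark.sql.types import LongType

    # register a floor-like bucketing function usable from SQL
    sqlContext.registerFunction("my_floor", lambda ts: int(ts // 300000), LongType())

    logdf = sqlContext.read.json("my-json.gz")
    logdf.registerTempTable("logs")
    buckets = sqlContext.sql(
        "SELECT `user.timestamp` AS rawTimeStamp, `user.requestId` AS requestId, "
        "my_floor(`user.timestamp`) AS timeBucket FROM logs")

The same registered function can then be reused in the GROUP BY clause.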

On Fri, Mar 4, 2016 at 7:34 PM, ashokkumar rajendran <
ashokkumar.rajend...@gmail.com> wrote:

> Hi,
>
> I load json file that has timestamp (as long in milliseconds) and several
> other attributes. I would like to group them by 5 minutes and store them as
> separate file.
>
> I am facing couple of problems here..
> 1. Using Floor function at select clause (to bucket by 5mins) gives me
> error saying "java.util.NoSuchElementException: key not found: floor". How
> do I use floor function in select clause? I see that floor method is
> available in org.apache.spark.sql.functions clause but not sure why its not
> working here.
> 2. Can I use the same in Group by clause?
> 3. How do I store them as separate file after grouping them?
>
> String logPath = "my-json.gz";
> DataFrame logdf = sqlContext.read().json(logPath);
> logdf.registerTempTable("logs");
> DataFrame bucketLogs = sqlContext.sql("Select `user.timestamp` as
> rawTimeStamp, `user.requestId` as requestId,
> floor(`user.timestamp`/72000) as timeBucket FROM logs");
> bucketLogs.toJSON().saveAsTextFile("target_file");
>
> Regards
> Ashok
>



-- 
Best Regards,
Ayan Guha


Re: Does pyspark still lag far behind the Scala API in terms of features

2016-03-02 Thread ayan guha
g overhead?
>>>
>>>
>>> If you are a very advanced Python shop and if you’ve in-house libraries
>>> that you have written in Python that don’t exist in Scala or some ML libs
>>> that don’t exist in the Scala version and will require fair amount of
>>> porting and gap is too large, then perhaps it makes sense to stay put with
>>> Python.
>>>
>>> However, I believe, investing (or having some members of your group)
>>> learn and invest in Scala is worthwhile for few reasons. One, you will get
>>> the performance gain, especially now with Tungsten (not sure how it relates
>>> to Python, but some other knowledgeable people on the list, please chime
>>> in). Two, since Spark is written in Scala, it gives you an enormous
>>> advantage to read sources (which are well documented and highly readable)
>>> should you have to consult or learn nuances of certain API method or action
>>> not covered comprehensively in the docs. And finally, there’s a long term
>>> benefit in learning Scala for reasons other than Spark. For example,
>>> writing other scalable and distributed applications.
>>>
>>>
>>> Particularly, we will be using Spark Streaming. I know a couple of years
>>> ago that practically forced the decision to use Scala.  Is this still the
>>> case?
>>>
>>>
>>> You’ll notice that certain APIs call are not available, at least for
>>> now, in Python.
>>> http://spark.apache.org/docs/latest/streaming-programming-guide.html
>>>
>>>
>>> Cheers
>>> Jules
>>>
>>> --
>>> The Best Ideas Are Simple
>>> Jules S. Damji
>>> e-mail:dmat...@comcast.net
>>> e-mail:jules.da...@gmail.com
>>>
>>> ​
>



-- 
Best Regards,
Ayan Guha


Re: Spark Integration Patterns

2016-02-28 Thread ayan guha
I believe you are looking  for something like Spark Jobserver for running
jobs & JDBC server for accessing data? I am curious to know more about it,
any further discussion will be very helpful
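
For completeness, a minimal PySpark sketch of the same pattern Todd shows in
Scala further down the thread; the app name and master URL are placeholders,
and the client machine still needs the Spark Python libraries on its path:

    from pyspark import SparkConf, SparkContext

    conf = (SparkConf()
            .setAppName("remote-client-app")
            .setMaster("spark://SparkServerHost:7077"))   # standalone master URL
    sc = SparkContext(conf=conf)
    # this process now acts as the driver talking to the remote cluster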

On Mon, Feb 29, 2016 at 6:06 AM, Luciano Resende <luckbr1...@gmail.com>
wrote:

> One option we have used in the past is to expose spark application
> functionality via REST, this would enable python or any other client that
> is capable of doing a HTTP request to integrate with your Spark application.
>
> To get you started, this might be a useful reference
>
>
> http://blog.michaelhamrah.com/2013/06/scala-web-apis-up-and-running-with-spray-and-akka/
>
>
> On Sun, Feb 28, 2016 at 10:38 AM, moshir mikael <moshir.mik...@gmail.com>
> wrote:
>
>> Ok,
>> but what do I need for the program to run.
>> In python  sparkcontext  = SparkContext(conf) only works when you have
>> spark installed locally.
>> AFAIK there is no *pyspark *package for python that you can install
>> doing pip install pyspark.
>> You actually need to install spark to get it running (e.g :
>> https://github.com/KristianHolsheimer/pyspark-setup-guide).
>>
>> Does it mean you need to install spark on the box your applications runs
>> to benefit from pyspark and this is required to connect to another remote
>> spark cluster ?
>> Am I missing something obvious ?
>>
>>
>> Le dim. 28 févr. 2016 à 19:01, Todd Nist <tsind...@gmail.com> a écrit :
>>
>>> Define your SparkConfig to set the master:
>>>
>>>   val conf = new SparkConf().setAppName(AppName)
>>> .setMaster(SparkMaster)
>>> .set()
>>>
>>> Where SparkMaster = "spark://SparkServerHost:7077".  So if your spark
>>> server hostname it "RADTech" then it would be "spark://RADTech:7077".
>>>
>>> Then when you create the SparkContext, pass the SparkConf  to it:
>>>
>>> val sparkContext = new SparkContext(conf)
>>>
>>> Then use the sparkContext for interact with the SparkMaster / Cluster.
>>> Your program basically becomes the driver.
>>>
>>> HTH.
>>>
>>> -Todd
>>>
>>> On Sun, Feb 28, 2016 at 9:25 AM, mms <moshir.mik...@gmail.com> wrote:
>>>
>>>> Hi, I cannot find a simple example showing how a typical application
>>>> can 'connect' to a remote spark cluster and interact with it. Let's say I
>>>> have a Python web application hosted somewhere *outside *a spark
>>>> cluster, with just python installed on it. How can I talk to Spark without
>>>> using a notebook, or using ssh to connect to a cluster master node ? I know
>>>> of spark-submit and spark-shell, however forking a process on a remote host
>>>> to execute a shell script seems like a lot of effort What are the
>>>> recommended ways to connect and query Spark from a remote client ? Thanks
>>>> Thx !
>>>> --
>>>> View this message in context: Spark Integration Patterns
>>>> <http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Integration-Patterns-tp26354.html>
>>>> Sent from the Apache Spark User List mailing list archive
>>>> <http://apache-spark-user-list.1001560.n3.nabble.com/> at Nabble.com.
>>>>
>>>
>>>
>
>
> --
> Luciano Resende
> http://people.apache.org/~lresende
> http://twitter.com/lresende1975
> http://lresende.blogspot.com/
>



-- 
Best Regards,
Ayan Guha


Re: select * from mytable where column1 in (select max(column1) from mytable)

2016-02-26 Thread ayan guha
But can't I just use HiveContext and use hive's functionality, which does
support subqueries?
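
If you stay on the plain SQLContext, a hedged workaround sketch (PySpark-style)
is to compute the aggregate first and then filter, instead of using a WHERE
subquery; df stands for the DataFrame over mytable:

    from pyspark.sql import functions as F

    max_val = df.agg(F.max("column1")).collect()[0][0]   # single scalar back on the driver
    result = df.filter(df["column1"] == max_val)

A FROM-clause subquery joined back to the table is another option, since that
form is supported per the quoted reply.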

On Fri, Feb 26, 2016 at 4:28 PM, Mohammad Tariq <donta...@gmail.com> wrote:

> Spark doesn't support subqueries in WHERE clause, IIRC. It supports
> subqueries only in the FROM clause as of now. See this ticket
> <https://issues.apache.org/jira/browse/SPARK-4226> for more on this.
>
>
>
>
> Tariq, Mohammad
> about.me/mti
> <http://about.me/mti>
>
>
> On Fri, Feb 26, 2016 at 7:01 AM, ayan guha <guha.a...@gmail.com> wrote:
>
>> Why is this not working for you? Are you trying on dataframe? What error
>> are you getting?
>>
>> On Thu, Feb 25, 2016 at 10:23 PM, Ashok Kumar <
>> ashok34...@yahoo.com.invalid> wrote:
>>
>>> Hi,
>>>
>>> What is the equivalent of this in Spark please
>>>
>>> select * from mytable where column1 in (select max(column1) from mytable)
>>>
>>> Thanks
>>>
>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>


-- 
Best Regards,
Ayan Guha


Re: select * from mytable where column1 in (select max(column1) from mytable)

2016-02-25 Thread ayan guha
Why is this not working for you? Are you trying on dataframe? What error
are you getting?

On Thu, Feb 25, 2016 at 10:23 PM, Ashok Kumar <ashok34...@yahoo.com.invalid>
wrote:

> Hi,
>
> What is the equivalent of this in Spark please
>
> select * from mytable where column1 in (select max(column1) from mytable)
>
> Thanks
>



-- 
Best Regards,
Ayan Guha


Re: Stream group by

2016-02-21 Thread ayan guha
I believe the best way would be to use reduceByKey operation.
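
A minimal PySpark sketch of that reduceByKey approach for the quoted
"vertical sum per file" requirement; `lines` stands for whatever DStream (or
RDD) holds the raw comma-separated records:

    def parse(line):
        parts = [p.strip() for p in line.split(",")]
        # key on the file name, keep the remaining counters as a list of ints
        return parts[1], [int(x) for x in parts[2:]]

    summed = (lines.map(parse)
                   .reduceByKey(lambda a, b: [x + y for x, y in zip(a, b)]))

Note this sums within a batch only; carrying totals across batches would still
need updateStateByKey or an external store.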

On Mon, Feb 22, 2016 at 5:10 AM, Jatin Kumar <
jku...@rocketfuelinc.com.invalid> wrote:

> You will need to do a collect and update a global map if you want to.
>
> myDStream.map(record => (record._2, (record._3, record_4, record._5))
>  .groupByKey((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 +
> r2._3))
>  .foreachRDD(rdd => {
>rdd.collect().foreach((fileName, valueTuple) => <update global map here>)
>  })
>
> --
> Thanks
> Jatin Kumar | Rocket Scientist
> +91-7696741743 m
>
> On Sun, Feb 21, 2016 at 11:30 PM, Vinti Maheshwari <vinti.u...@gmail.com>
> wrote:
>
>> Nevermind, seems like an executor level mutable map is not recommended as
>> stated in
>> http://blog.guillaume-pitel.fr/2015/06/spark-trick-shot-node-centered-aggregator/
>>
>> On Sun, Feb 21, 2016 at 9:54 AM, Vinti Maheshwari <vinti.u...@gmail.com>
>> wrote:
>>
>>> Thanks for your reply Jatin. I changed my parsing logic to what you
>>> suggested:
>>>
>>> def parseCoverageLine(str: String) = {
>>>   val arr = str.split(",")
>>>   ...
>>>   ...
>>>   (arr(0), arr(1) :: count.toList)  // (test, [file, 1, 1, 2])
>>> }
>>>
>>> Then in the grouping, can i use a global hash map per executor /
>>> partition to aggregate the results?
>>>
>>> val globalMap:[String: List[Int]] = Map()
>>> val coverageDStream = inputStream.map(parseCoverageLine)
>>> coverageDStream.reduceByKey((fileAndCountsA, fileAndCountsB) => {
>>> // if exists in global map, append result else add new key
>>>
>>> // globalMap
>>> // { "file1": [1+1+5, 1+2+5, 1+3+5], "file2": [ 2, 2, 2, 2 ] }
>>> })
>>>
>>> Thanks,
>>> Vinti
>>>
>>> On Sun, Feb 21, 2016 at 9:31 AM, Jatin Kumar <jku...@rocketfuelinc.com>
>>> wrote:
>>>
>>>> Hello Vinti,
>>>>
>>>> One way to get this done is you split your input line into key and
>>>> value tuple and then you can simply use groupByKey and handle the values
>>>> the way you want. For example:
>>>>
>>>> Assuming you have already split the values into a 5 tuple:
>>>> myDStream.map(record => (record._2, (record._3, record_4, record._5))
>>>>  .groupByKey((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 +
>>>> r2._3))
>>>>
>>>> I hope that helps.
>>>>
>>>> --
>>>> Thanks
>>>> Jatin Kumar | Rocket Scientist
>>>> +91-7696741743 m
>>>>
>>>> On Sun, Feb 21, 2016 at 10:35 PM, Vinti Maheshwari <
>>>> vinti.u...@gmail.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I have input lines like below
>>>>>
>>>>> *Input*
>>>>> t1, file1, 1, 1, 1
>>>>> t1, file1, 1, 2, 3
>>>>> t1, file2, 2, 2, 2, 2
>>>>> t2, file1, 5, 5, 5
>>>>> t2, file2, 1, 1, 2, 2
>>>>>
>>>>> and i want to achieve the output like below rows which is a vertical
>>>>> addition of the corresponding numbers.
>>>>>
>>>>> *Output*
>>>>> “file1” : [ 1+1+5, 1+2+5, 1+3+5 ]
>>>>> “file2” : [ 2+1, 2+1, 2+2, 2+2 ]
>>>>>
>>>>> I am in a spark streaming context and i am having a hard time trying
>>>>> to figure out the way to group by file name.
>>>>>
>>>>> It seems like i will need to use something like below, i am not sure
>>>>> how to get to the correct syntax. Any inputs will be helpful.
>>>>>
>>>>> myDStream.foreachRDD(rdd => rdd.groupBy())
>>>>>
>>>>> I know how to do the vertical sum of array of given numbers, but i am
>>>>> not sure how to feed that function to the group by.
>>>>>
>>>>>   def compute_counters(counts : ArrayBuffer[List[Int]]) = {
>>>>>   counts.toList.transpose.map(_.sum)
>>>>>   }
>>>>>
>>>>> ~Thanks,
>>>>> Vinti
>>>>>
>>>>
>>>>
>>>
>>
>


-- 
Best Regards,
Ayan Guha


Re: Spark Streaming with Kafka DirectStream

2016-02-16 Thread ayan guha
Hi

You can always use the RDD's properties, which already include the partition information.

https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/performance_optimization/how_many_partitions_does_an_rdd_have.html
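
For example, a small PySpark check that prints the partition count of each
batch's RDD (the stream variable is whatever KafkaUtils.createDirectStream
returned):

    def show_partitions(rdd):
        print("partitions in this batch: %d" % rdd.getNumPartitions())

    directKafkaStream.foreachRDD(show_partitions)

With the direct approach this should come back equal to the number of
partitions of the Kafka topic.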


On Wed, Feb 17, 2016 at 2:36 PM, Cyril Scetbon <cyril.scet...@free.fr>
wrote:

> Your understanding is the right one (having re-read the documentation).
> Still wondering how I can verify that 5 partitions have been created. My
> job is reading from a topic in Kafka that has 5 partitions and sends the
> data to E/S. I can see that when there is one task to read from Kafka there
> are 5 tasks writing to E/S. So I'm supposing that the task reading from
> Kafka does it in // using 5 partitions and that's why there are then 5
> tasks to write to E/S. But I'm supposing ...
>
> On Feb 16, 2016, at 21:12, ayan guha <guha.a...@gmail.com> wrote:
>
> I have a slightly different understanding.
>
> Direct stream generates 1 RDD per batch, however, number of partitions in
> that RDD = number of partitions in kafka topic.
>
> On Wed, Feb 17, 2016 at 12:18 PM, Cyril Scetbon <cyril.scet...@free.fr>
> wrote:
>
>> Hi guys,
>>
>> I'm making some tests with Spark and Kafka using a Python script. I use
>> the second method that doesn't need any receiver (Direct Approach). It
>> should adapt the number of RDDs to the number of partitions in the topic.
>> I'm trying to verify it. What's the easiest way to verify it ? I also tried
>> to co-locate Yarn, Spark and Kafka to check if RDDs are created depending
>> on the leaders of partitions in a topic, and they are not. Can you confirm
>> that RDDs are not created depending on the location of partitions and that
>> co-locating Kafka with Spark is not a must-have or that Spark does not take
>> advantage of it ?
>>
>> As the parallelism is simplified (by creating as many RDDs as there are
>> partitions) I suppose that the biggest part of the tuning is playing with
>> KafKa partitions (not talking about network configuration or management of
>> Spark resources) ?
>>
>> Thank you
>> -----
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>
>
>


-- 
Best Regards,
Ayan Guha


Re: Spark Streaming with Kafka DirectStream

2016-02-16 Thread ayan guha
I have a slightly different understanding.

The direct stream generates one RDD per batch; however, the number of
partitions in that RDD equals the number of partitions in the Kafka topic.

On Wed, Feb 17, 2016 at 12:18 PM, Cyril Scetbon <cyril.scet...@free.fr>
wrote:

> Hi guys,
>
> I'm making some tests with Spark and Kafka using a Python script. I use
> the second method that doesn't need any receiver (Direct Approach). It
> should adapt the number of RDDs to the number of partitions in the topic.
> I'm trying to verify it. What's the easiest way to verify it ? I also tried
> to co-locate Yarn, Spark and Kafka to check if RDDs are created depending
> on the leaders of partitions in a topic, and they are not. Can you confirm
> that RDDs are not created depending on the location of partitions and that
> co-locating Kafka with Spark is not a must-have or that Spark does not take
> advantage of it ?
>
> As the parallelism is simplified (by creating as many RDDs as there are
> partitions) I suppose that the biggest part of the tuning is playing with
> KafKa partitions (not talking about network configuration or management of
> Spark resources) ?
>
> Thank you
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Best Regards,
Ayan Guha


Re: Trying to join a registered Hive table as temporary with two Oracle tables registered as temporary in Spark

2016-02-14 Thread ayan guha
Why can't you do the JDBC load from the HiveContext? I don't think sharing
registered temp tables across contexts is allowed.
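
A hedged sketch of that suggestion, i.e. doing the JDBC loads through the same
HiveContext so that all three temp tables live in one context (shown in
PySpark for brevity; the connection details are the placeholders from the
quoted mail below):

    from pyspark.sql import HiveContext

    hc = HiveContext(sc)
    s = hc.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM oraclehadoop.sales")
    s.registerTempTable("t_s")

    c = (hc.read.format("jdbc")
           .options(url="jdbc:oracle:thin:@rhes564:1521:mydb",
                    dbtable="(SELECT to_char(CHANNEL_ID) AS CHANNEL_ID, CHANNEL_DESC FROM sh.channels)",
                    user="sh", password="xxx")
           .load())
    c.registerTempTable("t_c")
    # repeat for the times table, after which hc.sql(...) can resolve t_s, t_t and t_c together
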
On 15 Feb 2016 07:22, "Mich Talebzadeh"  wrote:

> I am intending to get a table from Hive and register it as temporary table
> in Spark.
>
>
>
> I have created contexts for both Hive and Spark as below
>
>
>
> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>
> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>
> //
>
>
>
> I get the Hive table as below using HiveContext
>
>
>
> //Get the FACT table from Hive
>
> //
>
> var s = hiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM
> oraclehadoop.sales")
>
>
>
> s.registerTempTable("t_s")
>
>
>
> This works fine using HiveContext
>
>
>
> scala> hiveContext.sql("select count(1) from
> t_s").collect.foreach(println)
>
> [4991761]
>
>
>
> Now I use JDBC to get data from two Oracle tables and registar them as
> temporary tables using sqlContext
>
>
>
> val c = sqlContext.load("jdbc",
>
> Map("url" -> "jdbc:oracle:thin:@rhes564:1521:mydb",
>
> "dbtable" -> "(SELECT to_char(CHANNEL_ID) AS CHANNEL_ID, CHANNEL_DESC FROM
> sh.channels)",
>
> "user" -> "sh",
>
> "password" -> "xxx"))
>
>
>
> val t = sqlContext.load("jdbc",
>
> Map("url" -> "jdbc:oracle:thin:@rhes564:1521:mydb",
>
> "dbtable" -> "(SELECT to_char(TIME_ID) AS TIME_ID, CALENDAR_MONTH_DESC
> FROM sh.times)",
>
> "user" -> "sh",
>
> "password" -> "sxxx"))
>
>
>
> And register them as temporary tables
>
>
>
> c.registerTempTable("t_c")
>
> t.registerTempTable("t_t")
>
> //
>
>
>
> Now trying to do SQL on three tables using sqlContext. However it cannot
> see the hive table
>
>
>
> var sqltext : String = ""
>
> sqltext = """
>
> SELECT rs.Month, rs.SalesChannel, round(TotalSales,2)
>
> FROM
>
> (
>
> SELECT t_t.CALENDAR_MONTH_DESC AS Month, t_c.CHANNEL_DESC AS SalesChannel,
> SUM(t_s.AMOUNT_SOLD) AS TotalSales
>
> FROM t_s, t_t, t_c
>
> WHERE t_s.TIME_ID = t_t.TIME_ID
>
> AND   t_s.CHANNEL_ID = t_c.CHANNEL_ID
>
> GROUP BY t_t.CALENDAR_MONTH_DESC, t_c.CHANNEL_DESC
>
> ORDER by t_t.CALENDAR_MONTH_DESC, t_c.CHANNEL_DESC
>
> ) rs
>
> LIMIT 10
>
>
>
>
>
> sqlContext.sql(sqltext).collect.foreach(println)
>
>
>
> *org.apache.spark.sql.AnalysisException: no such table t_s; line 5 pos 10*
>
>
>
> I guess this is due to two  different Data Frame used. Is there any
> solution? For example can I transorm from HiveContext to sqlContext?
>
>
>
> Thanks
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> NOTE: The information in this email is proprietary and confidential. This
> message is for the designated recipient only, if you are not the intended
> recipient, you should destroy it immediately. Any information in this
> message shall not be understood as given or endorsed by Peridale Technology
> Ltd, its subsidiaries or their employees, unless expressly so stated. It is
> the responsibility of the recipient to ensure that this email is virus
> free, therefore neither Peridale Technology Ltd, its subsidiaries nor their
> employees accept any responsibility.
>
>
>
>
>


Re: Spark Error: Not enough space to cache partition rdd

2016-02-14 Thread ayan guha
Have you tried repartitioning to a larger number of partitions? Also, I would
suggest increasing the number of executors and giving each one a smaller
amount of memory.
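
A minimal PySpark sketch of the repartition part (the partition count and
input path are assumptions; executor count and memory are set through
spark-defaults.conf or spark-submit rather than in code):

    from pyspark import StorageLevel

    points = sc.textFile("hdfs:///kmeans/input")    # illustrative path
    points = points.repartition(200)                # more, smaller partitions
    points.persist(StorageLevel.MEMORY_AND_DISK)
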
On 15 Feb 2016 06:49, "gustavolacerdas"  wrote:

> I have a machine with 96GB and 24 cores. I'm trying to run a k-means
> algorithm with 30GB of input data. My spark-defaults.conf file are
> configured like that: spark.driver.memory 80g spark.executor.memory 70g
> spark.network.timeout 1200s spark.rdd.compress true
> spark.broadcast.compress true But i always get this error Spark Error: Not
> enough space to cache partition rdd I changed the k-means code to persist
> the rdd.cache(MEMORY_AND_DISK) but didn't work.
> --
> View this message in context: Spark Error: Not enough space to cache
> partition rdd
> 
> Sent from the Apache Spark User List mailing list archive
>  at Nabble.com.
>


Re: Spark Certification

2016-02-14 Thread ayan guha
Thanks. Do we have any forum or study group for certification aspirants? I
would like to join.
On 15 Feb 2016 05:53, "Olivier Girardot" 
wrote:

> It does not contain (as of yet) anything > 1.3 (for example in depth
> knowledge of the Dataframe API)
> but you need to know about all the modules (Core, Streaming, SQL, MLLib,
> GraphX)
>
> Regards,
>
> Olivier.
>
> 2016-02-11 19:31 GMT+01:00 Prem Sure :
>
>> I did recently. it includes MLib & Graphx too and I felt like exam
>> content covered all topics till 1.3 and not the > 1.3 versions of spark.
>>
>>
>> On Thu, Feb 11, 2016 at 9:39 AM, Janardhan Karri 
>> wrote:
>>
>>> I am planning to do that with databricks
>>> http://go.databricks.com/spark-certified-developer
>>>
>>> Regards,
>>> Janardhan
>>>
>>> On Thu, Feb 11, 2016 at 2:00 PM, Timothy Spann 
>>> wrote:
>>>
 I was wondering that as well.

 Also is it fully updated for 1.6?

 Tim
 http://airisdata.com/
 http://sparkdeveloper.com/


 From: naga sharathrayapati 
 Date: Wednesday, February 10, 2016 at 11:36 PM
 To: "user@spark.apache.org" 
 Subject: Spark Certification

 Hello All,

 I am planning on taking Spark Certification and I was wondering If one
 has to be well equipped with  MLib & GraphX as well or not ?

 Please advise

 Thanks

>>>
>>>
>>
>
>
> --
> *Olivier Girardot* | Associé
> o.girar...@lateral-thoughts.com
> +33 6 24 09 17 94
>


Re: AM creation in yarn client mode

2016-02-09 Thread ayan guha
It depends on whether you use yarn-client or yarn-cluster mode. In yarn-client
mode the driver runs in the client process that submitted the job, while YARN
decides where on the cluster to launch the AM, so the driver and AM are
separate processes. In yarn-cluster mode the driver runs inside the AM itself.

On Wed, Feb 10, 2016 at 3:42 PM, praveen S <mylogi...@gmail.com> wrote:

> Hi,
>
> I have 2 questions when running the spark jobs on yarn in client mode :
>
> 1) Where is the AM(application master) created :
>
> A) is it created on the client where the job was submitted? i.e driver and
> AM on the same client?
> Or
> B) yarn decides where the the AM should be created?
>
> 2) Driver and AM run in different processes : is my assumption correct?
>
> Regards,
> Praveen
>



-- 
Best Regards,
Ayan Guha


Re: is Hbase Scan really need thorough Get (Hbase+solr+spark)

2016-01-19 Thread ayan guha
It is not scanning HBase. What it is doing is looping through your list
of row keys and fetching the data for each one at a time.

Ex: Your solr result has 5 records, with Row Keys R1...R5.
Then list will be [R1,R2,...R5]

Then table.get(list) will do something like:

res = []
for k in list:
    v = getFromHbaseWithRowKey(k)   # just for illustration, there is no such function :)
    res.append(v)
return res

On Wed, Jan 20, 2016 at 10:09 AM, beeshma r <beeshm...@gmail.com> wrote:

> Hi
>
>  I trying to integrated Hbase-solr-spark.
> Solr  is indexing all the documents from Hbase through hbase-indexer .
> Through the Spark I am manipulating all datasets .Thing is after getting
> the solrdocuments from the solr query ,it has the  rowkey and rowvalues .So
> directly i got the rowkeys and corresponding values
>
> question is 'its really need once again scan Hbase table through Get with
> rowkey from solrdocument'?
>
> example code
>
> HTable table = new HTable(conf, "");
> Get get = null;
> List list = new ArrayList();
> String url =  " ";
> SolrServer server = new HttpSolrServer(url);
> SolrQuery query = new SolrQuery(" ");
> query.setStart(0);
> query.setRows(10);
> QueryResponse response = server.query(query);
> SolrDocumentList docs = response.getResults();
> for (SolrDocument doc : docs) {
> get = new Get(Bytes.toBytes((String) doc.getFieldValue("rowkey")));
>  list.add(get);
>
>   }
>
> *Result[] res = table.get(list);//This is really need? because it takes
> extra time to scan right?*
> This piece of code i got from
> http://www.programering.com/a/MTM5kDMwATI.html
>
> please correct if anything wrong :)
>
> Thanks
> Beesh
>
>


-- 
Best Regards,
Ayan Guha


Re: Read from AWS s3 with out having to hard-code sensitive keys

2016-01-12 Thread ayan guha
On EMR, you can add fs.* params in emrfs-site.xml.

On Tue, Jan 12, 2016 at 7:27 AM, Jonathan Kelly <jonathaka...@gmail.com>
wrote:

> Yes, IAM roles are actually required now for EMR. If you use Spark on EMR
> (vs. just EC2), you get S3 configuration for free (it goes by the name
> EMRFS), and it will use your IAM role for communicating with S3. Here is
> the corresponding documentation:
> http://docs.aws.amazon.com/ElasticMapReduce/latest/ManagementGuide/emr-fs.html
>
> On Mon, Jan 11, 2016 at 11:37 AM Matei Zaharia <matei.zaha...@gmail.com>
> wrote:
>
>> In production, I'd recommend using IAM roles to avoid having keys
>> altogether. Take a look at
>> http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html
>> .
>>
>> Matei
>>
>> On Jan 11, 2016, at 11:32 AM, Sabarish Sasidharan <
>> sabarish.sasidha...@manthan.com> wrote:
>>
>> If you are on EMR, these can go into your hdfs site config. And will work
>> with Spark on YARN by default.
>>
>> Regards
>> Sab
>> On 11-Jan-2016 5:16 pm, "Krishna Rao" <krishnanj...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> Is there a method for reading from s3 without having to hard-code keys?
>>> The only 2 ways I've found both require this:
>>>
>>> 1. Set conf in code e.g.:
>>> sc.hadoopConfiguration().set("fs.s3.awsAccessKeyId", "")
>>> sc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey",
>>> "")
>>>
>>> 2. Set keys in URL, e.g.:
>>> sc.textFile("s3n://@/bucket/test/testdata")
>>>
>>>
>>> Both if which I'm reluctant to do within production code!
>>>
>>>
>>> Cheers
>>>
>>
>>


-- 
Best Regards,
Ayan Guha


Re: pre-install 3-party Python package on spark cluster

2016-01-12 Thread ayan guha
2 cents:

1. You should use an environment management tool, such as Ansible, Puppet
or Chef, to handle this kind of use case (and a lot more, e.g. what if you
want to add more nodes or to replace one bad node).
2. There are options such as --py-files to provide a zip file of the dependencies.

On Tue, Jan 12, 2016 at 6:11 AM, Annabel Melongo <
melongo_anna...@yahoo.com.invalid> wrote:

> When you run spark submit in either client or cluster mode, you can either
> use the options --packages or -jars to automatically copy your packages to
> the worker machines.
>
> Thanks
>
>
> On Monday, January 11, 2016 12:52 PM, Andy Davidson
> <a...@santacruzintegration.com> wrote:
>
>
> I use https://code.google.com/p/parallel-ssh/ to upgrade all my slaves
>
>
>
> From: "taotao.li" <charles.up...@gmail.com>
> Date: Sunday, January 10, 2016 at 9:50 PM
> To: "user @spark" <user@spark.apache.org>
> Subject: pre-install 3-party Python package on spark cluster
>
> I have a spark cluster, from machine-1 to machine 100, and machine-1 acts
> as
> the master.
>
> Then one day my program need use a 3-party python package which is not
> installed on every machine of the cluster.
>
> so here comes my problem: to make that 3-party python package usable on
> master and slaves, should I manually ssh to every machine and use pip to
> install that package?
>
> I believe there should be some deploy scripts or other things to make this
> grace, but I can't find anything after googling.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/pre-install-3-party-Python-package-on-spark-cluster-tp25930.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>
>
>


-- 
Best Regards,
Ayan Guha


Re: How to merge two large table and remove duplicates?

2016-01-08 Thread ayan guha
One option you may want to explore is writing the event table to a NoSQL DB
such as HBase. One inherent problem in your approach is that you always need
to load either the full data set or a defined number of partitions to see
whether the event has already arrived (and there is no guarantee it is
foolproof, while it leads to unnecessary loading in most cases).
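
Relating to the quoted union/distinct approach below, a hedged sketch of
deduplicating on the key columns only and with a larger shuffle parallelism
(the paths, partition count and key columns are assumptions):

    sqlContext.setConf("spark.sql.shuffle.partitions", "2000")

    day1 = sqlContext.read.parquet("events/day1.parquet")
    day2 = sqlContext.read.parquet("events/day2.parquet")

    merged = (day1.unionAll(day2)
                  .dropDuplicates(["UserID", "EventType", "EventKey", "TimeStamp"]))
    merged.write.parquet("events/merged.parquet")

dropDuplicates on a subset of columns compares fewer fields than a full
distinct(), in the spirit of the groupBy suggestion quoted below.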

On Sat, Jan 9, 2016 at 12:56 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:

> Hey,
> Thank you for the answer. I checked the setting you mentioend they are all
> correct.  I noticed that in the job, there are always only 200 reducers for
> shuffle read, I believe it is setting in the sql shuffle parallism.
>
> In the doc, it mentions:
>
>- Automatically determine the number of reducers for joins and
>groupbys: Currently in Spark SQL, you need to control the degree of
>parallelism post-shuffle using “SET
>spark.sql.shuffle.partitions=[num_tasks];”.
>
>
> What would be the ideal number for this setting? Is it based on the
> hardware of cluster?
>
>
> Thanks,
>
> Gavin
>
> On Fri, Jan 8, 2016 at 2:48 PM, Benyi Wang <bewang.t...@gmail.com> wrote:
>
>>
>>- I assume your parquet files are compressed. Gzip or Snappy?
>>- What spark version did you use? It seems at least 1.4. If you use
>>spark-sql and tungsten, you might have better performance. but spark 1.5.2
>>gave me a wrong result when the data was about 300~400GB, just for a 
>> simple
>>group-by and aggregate.
>>- Did you use kyro serialization?
>>- you should have spark.shuffle.compress=true, verify it.
>>- How many tasks did you use? spark.default.parallelism=?
>>- What about this:
>>   - Read the data day by day
>>   - compute a bucket id from timestamp, e.g., the date and hour
>>   - Write into different buckets (you probably need a special writer
>>   to write data efficiently without shuffling the data).
>>   - distinct for each bucket. Because each bucket is small, spark
>>   can get it done faster than having everything in one run.
>>   - I think using groupBy (userId, timestamp) might be better than
>>   distinct. I guess distinct() will compare every field.
>>
>>
>> On Fri, Jan 8, 2016 at 2:31 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:
>>
>>> And the most frequent operation I am gonna do is find the UserID who
>>> have some events, then retrieve all the events associted with the UserID.
>>>
>>> In this case, how should I partition to speed up the process?
>>>
>>> Thanks.
>>>
>>> On Fri, Jan 8, 2016 at 2:29 PM, Gavin Yue <yue.yuany...@gmail.com>
>>> wrote:
>>>
>>>> hey Ted,
>>>>
>>>> Event table is like this: UserID, EventType, EventKey, TimeStamp,
>>>> MetaData.  I just parse it from Json and save as Parquet, did not change
>>>> the partition.
>>>>
>>>> Annoyingly, every day's incoming Event data having duplicates among
>>>> each other.  One same event could show up in Day1 and Day2 and probably
>>>> Day3.
>>>>
>>>> I only want to keep single Event table and each day it come so many
>>>> duplicates.
>>>>
>>>> Is there a way I could just insert into Parquet and if duplicate found,
>>>> just ignore?
>>>>
>>>> Thanks,
>>>> Gavin
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, Jan 8, 2016 at 2:18 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>
>>>>> Is your Parquet data source partitioned by date ?
>>>>>
>>>>> Can you dedup within partitions ?
>>>>>
>>>>> Cheers
>>>>>
>>>>> On Fri, Jan 8, 2016 at 2:10 PM, Gavin Yue <yue.yuany...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I tried on Three day's data.  The total input is only 980GB, but the
>>>>>> shuffle write Data is about 6.2TB, then the job failed during shuffle 
>>>>>> read
>>>>>> step, which should be another 6.2TB shuffle read.
>>>>>>
>>>>>> I think to Dedup, the shuffling can not be avoided. Is there anything
>>>>>> I could do to stablize this process?
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>>
>>>>>> On Fri, Jan 8, 2016 at 2:04 PM, Gavin Yue <yue.yuany...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hey,
>>>>>>>
>>>>>>> I got everyday's Event table and want to merge them into a single
>>>>>>> Event table. But there so many duplicates among each day's data.
>>>>>>>
>>>>>>> I use Parquet as the data source.  What I am doing now is
>>>>>>>
>>>>>>> EventDay1.unionAll(EventDay2).distinct().write.parquet("a new
>>>>>>> parquet file").
>>>>>>>
>>>>>>> Each day's Event is stored in their own Parquet file
>>>>>>>
>>>>>>> But it failed at the stage2 which keeps losing connection to one
>>>>>>> executor. I guess this is due to the memory issue.
>>>>>>>
>>>>>>> Any suggestion how I do this efficiently?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Gavin
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


-- 
Best Regards,
Ayan Guha


Re: pyspark dataframe: row with a minimum value of a column for each group

2016-01-05 Thread ayan guha
Yes there is. It is called window function over partitions.

Equivalent SQL would be:

select * from
 (select a,b,c, rank() over (partition by a order by b) r from df) x
where r = 1

You can register your DF as a temp table and use the SQL form. Or, on Spark
1.4+, you can use the window functions and their variants in the Spark SQL module.
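
A PySpark (1.4+) sketch equivalent to the SQL above, using the question's
column names (df stands for the dataframe in the question):

    from pyspark.sql import Window
    from pyspark.sql import functions as F

    w = Window.partitionBy("a").orderBy("b")
    result = (df.withColumn("r", F.rank().over(w))
                .filter(F.col("r") == 1)
                .drop("r"))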

HTH

On Wed, Jan 6, 2016 at 11:56 AM, Wei Chen <wei.chen.ri...@gmail.com> wrote:

> Hi,
>
> I am trying to retrieve the rows with a minimum value of a column for each
> group. For example: the following dataframe:
>
> a | b | c
> --
> 1 | 1 | 1
> 1 | 2 | 2
> 1 | 3 | 3
> 2 | 1 | 4
> 2 | 2 | 5
> 2 | 3 | 6
> 3 | 1 | 7
> 3 | 2 | 8
> 3 | 3 | 9
> --
>
> I group by 'a', and want the rows with the smallest 'b', that is, I want
> to return the following dataframe:
>
> a | b | c
> --
> 1 | 1 | 1
> 2 | 1 | 4
> 3 | 1 | 7
> --
>
> The dataframe I have is huge so get the minimum value of b from each group
> and joining on the original dataframe is very expensive. Is there a better
> way to do this?
>
>
> Thanks,
> Wei
>
>


-- 
Best Regards,
Ayan Guha


Re: copy/mv hdfs file to another directory by spark program

2016-01-04 Thread ayan guha
My guess is No, unless you are okay to read the data and write it back
again.

On Tue, Jan 5, 2016 at 2:07 PM, Zhiliang Zhu <zchl.j...@yahoo.com.invalid>
wrote:

>
> For some file on hdfs, it is necessary to copy/move it to some another
> specific hdfs  directory, and the directory name would keep unchanged.
> Just need finish it in spark program, but not hdfs commands.
> Is there any codes, it seems not to be done by searching spark doc ...
>
> Thanks in advance!
>



-- 
Best Regards,
Ayan Guha


Re: Merge rows into csv

2015-12-08 Thread ayan guha
reduceByKey would be a perfect fit for you
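
A minimal PySpark sketch, assuming the data is available as (id, state) pairs
like the example table:

    pairs = sc.parallelize([(1, "TX"), (1, "NY"), (1, "FL"), (2, "CA"), (2, "OH")])
    csv_states = pairs.reduceByKey(lambda a, b: a + "," + b)
    # [(1, 'TX,NY,FL'), (2, 'CA,OH')] -- the order of states within a group is not guaranteed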

On Wed, Dec 9, 2015 at 4:47 AM, Krishna <research...@gmail.com> wrote:

> Hi,
>
> what is the most efficient way to perform a group-by operation in Spark
> and merge rows into csv?
>
> Here is the current RDD
> -
> ID   STATE
> -
> 1   TX
> 1NY
> 1FL
> 2CA
> 2OH
> -
>
> This is the required output:
> -
> IDCSV_STATE
> -
> 1 TX,NY,FL
> 2 CA,OH
> -
>



-- 
Best Regards,
Ayan Guha


Re: how create hbase connect?

2015-12-07 Thread ayan guha
Kindly take a look https://github.com/nerdammer/spark-hbase-connector

On Mon, Dec 7, 2015 at 10:56 PM, censj <ce...@lotuseed.com> wrote:

> hi all,
>   I want to update row on base. how to create connecting base on Rdd?
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Best Regards,
Ayan Guha


RE: How to create dataframe from SQL Server SQL query

2015-12-07 Thread ayan guha
One more thing: for better maintainability, I would create a DB view and then
use the view in Spark. This avoids burying complicated SQL queries within the
application code.
On 8 Dec 2015 05:55, "Wang, Ningjun (LNG-NPV)" 
wrote:

> This is a very helpful article. Thanks for the help.
>
>
>
> Ningjun
>
>
>
> *From:* Sujit Pal [mailto:sujitatgt...@gmail.com]
> *Sent:* Monday, December 07, 2015 12:42 PM
> *To:* Wang, Ningjun (LNG-NPV)
> *Cc:* user@spark.apache.org
> *Subject:* Re: How to create dataframe from SQL Server SQL query
>
>
>
> Hi Ningjun,
>
>
>
> Haven't done this myself, saw your question and was curious about the
> answer and found this article which you might find useful:
>
>
> http://www.sparkexpert.com/2015/03/28/loading-database-data-into-spark-using-data-sources-api/
>
>
>
> According this article, you can pass in your SQL statement in the
> "dbtable" mapping, ie, something like:
>
>
>
> val jdbcDF = sqlContext.read.format("jdbc")
>
> .options(
>
> Map("url" -> "jdbc:postgresql:dbserver",
>
> "dbtable" -> "(select docid, title, docText from
> dbo.document where docid between 10 and 1000)"
>
> )).load
>
>
>
> -sujit
>
>
>
> On Mon, Dec 7, 2015 at 8:26 AM, Wang, Ningjun (LNG-NPV) <
> ningjun.w...@lexisnexis.com> wrote:
>
> How can I create a RDD from a SQL query against SQLServer database? Here
> is the example of dataframe
>
>
>
> http://spark.apache.org/docs/latest/sql-programming-guide.html#overview
>
>
>
>
>
> val jdbcDF = sqlContext.read.format("jdbc").options(
>
>   Map("url" -> "jdbc:postgresql:dbserver",
>
>   "dbtable" -> "schema.tablename")).load()
>
>
>
> This code create dataframe from a table. How can I create dataframe from a
> query, e.g. “select docid, title, docText from dbo.document where docid
> between 10 and 1000”?
>
>
>
> Ningjun
>
>
>
>
>


Re: Experiences about NoSQL databases with Spark

2015-12-06 Thread ayan guha
Hi

I have a general question. I want to do a real-time aggregation using
Spark. I have Kinesis as the source and plan to use ES as the data store.
There might be close to 2000 distinct events possible, and I want to keep a
running count of how many times each event occurs.

Currently, upon receiving an event, I look it up in the backend by the event
code (which is used as the document id, so it is a fast lookup) and add 1 to
the current value.

I am worried because this process is not idempotent. To solve it, I could
keep writing each event and let ES aggregate while querying, but this seems
wasteful. Am I correct in this assumption?

I know about updateStateByKey and the new track-state functions, but I was
wondering what the general approach to solving this issue is. Any pointer
would be very helpful.

Best
Ayan

On Sun, Dec 6, 2015 at 6:17 PM, Nick Pentreath <nick.pentre...@gmail.com>
wrote:

> I've had great success using Elasticsearch with Spark - the integration
> works well (both ways - reading and indexing) and ES + Kibana makes a
> powerful event / time-series storage, aggregation and data visualization
> stack.
>
>
> —
> Sent from Mailbox <https://www.dropbox.com/mailbox>
>
>
> On Sun, Dec 6, 2015 at 9:07 AM, manasdebashiskar <poorinsp...@gmail.com>
> wrote:
>
>> Depends on your need.
>> Have you looked at Elastic search, or Accumulo or Cassandra?
>> If post processing of your data is not your motive and you want to just
>> retrieve the data later greenplum(based on postgresql) can be an
>> alternative.
>>
>> in short there are many NOSQL out there with each having different
>> project
>> maturity and feature sets.
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Experiences-about-NoSQL-databases-with-Spark-tp25462p25594.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -----
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


-- 
Best Regards,
Ayan Guha


Re: sparkavro for PySpark 1.3

2015-12-05 Thread ayan guha
  at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
> at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> at
>
> org.apache.spark.sql.sources.ResolvedDataSource$.lookupDataSource(ddl.scala:265)
> at
> org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:305)
> at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1123)
> at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1108)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
> at
> py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
> at py4j.Gateway.invoke(Gateway.java:259)
> at
> py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
> at py4j.commands.CallCommand.execute(CallCommand.java:79)
> at py4j.GatewayConnection.run(GatewayConnection.java:207)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassNotFoundException:
> org.apache.spark.sql.sources.HadoopFsRelationProvider
> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>     ... 26 more
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/sparkavro-for-PySpark-1-3-tp25561p25574.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Best Regards,
Ayan Guha


Re: How to get the list of available Transformations and actions for a RDD in Spark-Shell

2015-12-04 Thread ayan guha
sortByKey() is only available on pair RDDs, as it requires key-value pairs to
work. In Scala the pair-RDD operations are added through an implicit conversion
once the RDD's elements are tuples, so they only show up for such RDDs.
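
The same point sketched in PySpark for illustration (the Scala shell behaves
analogously once the elements are tuples): sortByKey only shows up on an RDD
of key-value pairs.

    sales_rdd = sc.textFile("Data/Scala/phone_sales.txt")
    sales_pairs = sales_rdd.map(lambda line: (line.split(",")[0], int(line.split(",")[1])))
    sorted_sales = sales_pairs.sortByKey()   # available because the elements are (brand, count) pairs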

On Sat, Dec 5, 2015 at 12:01 AM, Gokula Krishnan D <email2...@gmail.com>
wrote:

> Hello All -
>
> In spark-shell when we press tab after . ; we could see the
> possible list of transformations and actions.
>
> But unable to see all the list. is there any other way to get the rest of
> the list. I'm mainly looking for sortByKey()
>
> val sales_RDD = sc.textFile("Data/Scala/phone_sales.txt")
> val sales_map = sales_RDD.map(sales=>{val x=sales.split(","); (x(0),x(1))})
>
> Layout of phone_sales.txt is (Brand, #.of Phones sold)
>
> I am mainly looking for SortByKey() but when I do Sales_map or sales_RDD,
> I could see only sortBy() but not SortByKey().
>
> By the way, I am using spark 1.3.0 with CDH 5.4
>
> [image: Inline image 1]
>
>
>
> Thanks
> Gokul
>
>


-- 
Best Regards,
Ayan Guha


Re: Low Latency SQL query

2015-12-01 Thread ayan guha
You can try query push-down by embedding the query when you create the
DataFrame/RDD from the JDBC source.
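
A hedged PySpark sketch of that push-down idea: the aggregation placed in
"dbtable" runs inside the database, so Spark only receives the summarised
rows. The URL, table and column names here are placeholders, not the poster's
schema:

    pushed = (sqlContext.read.format("jdbc")
              .options(url="jdbc:postgresql:dbserver",
                       dbtable="(SELECT product_family, SUM(cost) AS total_cost "
                               "FROM products GROUP BY product_family) AS t",
                       user="user", password="pwd")
              .load())

Spark still adds its own job-startup overhead, so for a single small
aggregation the database alone may remain faster, as discussed below.
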
On 2 Dec 2015 12:32, "Fengdong Yu"  wrote:

> It depends on many situations:
>
> 1) what’s your data format?  csv(text) or ORC/parquet?
> 2) Did you have Data warehouse to summary/cluster  your data?
>
>
> if your data is text or you query for the raw data, It should be slow,
> Spark cannot do much to optimize your job.
>
>
>
>
> On Dec 2, 2015, at 9:21 AM, Andrés Ivaldi  wrote:
>
> Mark, We have an application that use data from different kind of source,
> and we build a engine able to handle that, but cant scale with big data(we
> could but is to time expensive), and doesn't have Machine learning module,
> etc, we came across with Spark and it's looks like it have all we need,
> actually it does, but our latency is very low right now, and when we do
> some testing it took too long time for the same kind of results, always
> against RDBM which is our primary source.
>
> So, we want to expand our sources, to cvs, web service, big data, etc, we
> can extend our engine or use something like Spark, which give as power of
> clustering, different kind of source access, streaming, machine learning,
> easy extensibility and so on.
>
> On Tue, Dec 1, 2015 at 9:36 PM, Mark Hamstra 
> wrote:
>
>> I'd ask another question first: If your SQL query can be executed in a
>> performant fashion against a conventional (RDBMS?) database, why are you
>> trying to use Spark?  How you answer that question will be the key to
>> deciding among the engineering design tradeoffs to effectively use Spark or
>> some other solution.
>>
>> On Tue, Dec 1, 2015 at 4:23 PM, Andrés Ivaldi  wrote:
>>
>>> Ok, so latency problem is being generated because I'm using SQL as
>>> source? how about csv, hive, or another source?
>>>
>>> On Tue, Dec 1, 2015 at 9:18 PM, Mark Hamstra 
>>> wrote:
>>>
 It is not designed for interactive queries.


 You might want to ask the designers of Spark, Spark SQL, and
 particularly some things built on top of Spark (such as BlinkDB) about
 their intent with regard to interactive queries.  Interactive queries are
 not the only designed use of Spark, but it is going too far to claim that
 Spark is not designed at all to handle interactive queries.

 That being said, I think that you are correct to question the wisdom of
 expecting lowest-latency query response from Spark using SQL (sic,
 presumably a RDBMS is intended) as the datastore.

 On Tue, Dec 1, 2015 at 4:05 PM, Jörn Franke 
 wrote:

> Hmm it will never be faster than SQL if you use SQL as an underlying
> storage. Spark is (currently) an in-memory batch engine for iterative
> machine learning workloads. It is not designed for interactive queries.
> Currently hive is going into the direction of interactive queries.
> Alternatives are Hbase on Phoenix or Impala.
>
> On 01 Dec 2015, at 21:58, Andrés Ivaldi  wrote:
>
> Yes,
> The use case would be,
> Have spark in a service (I didnt invertigate this yet), through api
> calls of this service we perform some aggregations over data in SQL, We 
> are
> already doing this with an internal development
>
> Nothing complicated, for instance, a table with Product, Product
> Family, cost, price, etc. Columns like Dimension and Measures,
>
> I want to Spark for query that table to perform a kind of rollup, with
> cost as Measure and Prodcut, Product Family as Dimension
>
> Only 3 columns, it takes like 20s to perform that query and the
> aggregation, the  query directly to the database with a grouping at the
> columns takes like 1s
>
> regards
>
>
>
> On Tue, Dec 1, 2015 at 5:38 PM, Jörn Franke 
> wrote:
>
>> can you elaborate more on the use case?
>>
>> > On 01 Dec 2015, at 20:51, Andrés Ivaldi  wrote:
>> >
>> > Hi,
>> >
>> > I'd like to use spark to perform some transformations over data
>> stored inSQL, but I need low Latency, I'm doing some test and I run into
>> spark context creation and data query over SQL takes too long time.
>> >
>> > Any idea for speed up the process?
>> >
>> > regards.
>> >
>> > --
>> > Ing. Ivaldi Andres
>>
>
>
>
> --
> Ing. Ivaldi Andres
>
>

>>>
>>>
>>> --
>>> Ing. Ivaldi Andres
>>>
>>
>>
>
>
> --
> Ing. Ivaldi Andres
>
>
>


Re: spark rdd grouping

2015-12-01 Thread ayan guha
I believe reduceByKeyLocally was introduced for this purpose.

On Tue, Dec 1, 2015 at 10:21 PM, Jacek Laskowski <ja...@japila.pl> wrote:

> Hi Rajat,
>
> My quick test has showed that groupBy will preserve the partitions:
>
> scala>
> sc.parallelize(Seq(0,0,0,0,1,1,1,1),2).map((_,1)).mapPartitionsWithIndex
> { case (idx, iter) => val s = iter.toSeq; println(idx + " with " +
> s.size + " elements: " + s); s.toIterator
> }.groupBy(_._1).mapPartitionsWithIndex { case (idx, iter) => val s =
> iter.toSeq; println(idx + " with " + s.size + " elements: " + s);
> s.toIterator }.collect
>
> 1 with 4 elements: Stream((1,1), (1,1), (1,1), (1,1))
> 0 with 4 elements: Stream((0,1), (0,1), (0,1), (0,1))
>
> 0 with 1 elements: Stream((0,CompactBuffer((0,1), (0,1), (0,1), (0,1
> 1 with 1 elements: Stream((1,CompactBuffer((1,1), (1,1), (1,1), (1,1
>
> Do I miss anything?
>
> Pozdrawiam,
> Jacek
>
> --
> Jacek Laskowski | https://medium.com/@jaceklaskowski/ |
> http://blog.jaceklaskowski.pl
> Mastering Spark https://jaceklaskowski.gitbooks.io/mastering-apache-spark/
> Follow me at https://twitter.com/jaceklaskowski
> Upvote at http://stackoverflow.com/users/1305344/jacek-laskowski
>
>
> On Tue, Dec 1, 2015 at 2:46 AM, Rajat Kumar <rajatkumar10...@gmail.com>
> wrote:
> > Hi
> >
> > i have a javaPairRdd<K,V> rdd1. i want to group by rdd1 by keys but
> preserve
> > the partitions of original rdd only to avoid shuffle since I know all
> same
> > keys are already in same partition.
> >
> > PairRdd is basically constrcuted using kafka streaming low level consumer
> > which have all records with same key already in same partition. Can i
> group
> > them together with avoid shuffle.
> >
> > Thanks
> >
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Best Regards,
Ayan Guha


Re: How can you sort wordcounts by counts in stateful_network_wordcount.py example

2015-11-11 Thread ayan guha
how about this?

sorted_counts = running_counts.map(lambda t: (t[1], t[0])).sortByKey()

Basically swap key and value of the RDD and then sort?

On Thu, Nov 12, 2015 at 8:53 AM, Amir Rahnama <amirrahn...@gmail.com> wrote:

> Hey,
>
> Anybody knows how can one sort the result in the stateful example?
>
> Python would be prefered.
>
>
> https://github.com/apache/spark/blob/859dff56eb0f8c63c86e7e900a12340c199e6247/examples/src/main/python/streaming/stateful_network_wordcount.py
> --
> Thanks and Regards,
>
> Amir Hossein Rahnama
>
> *Tel: +46 (0) 761 681 102*
> Website: www.ambodi.com
> Twitter: @_ambodi <https://twitter.com/_ambodi>
>



-- 
Best Regards,
Ayan Guha


Re: How to analyze weather data in Spark?

2015-11-08 Thread ayan guha
Hi

Is it possible to elaborate a little more?

In order to consume a fixed-width file, the standard process would be (a
minimal sketch follows the list):

1. Write a map function which takes a line as a string and implements the
file spec to return a tuple of fields.
2. Load the files using sc.textFile (which reads each line as a string).
3. Pass the lines to the map function and get back an RDD of field tuples.
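
A minimal sketch of those steps in PySpark, with made-up character offsets;
the real positions come from the ish-format document referenced in the quoted
mail:

    def parse_fixed_width(line):
        # the offsets below are illustrative only -- take them from the format spec
        station = line[4:10]
        obs_date = line[15:23]
        air_temp = int(line[87:92])
        return (station, obs_date, air_temp)

    records = sc.textFile("hdfs:///noaa/isd/*").map(parse_fixed_width)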

Ayan

On Mon, Nov 9, 2015 at 3:20 PM, Hitoshi Ozawa <ozaw...@worksap.co.jp> wrote:

> There's a document describing the format of files in the parent directory.
> It
> seems like a fixed width file.
> ftp://ftp.ncdc.noaa.gov/pub/data/noaa/ish-format-document.pdf
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-analyze-weather-data-in-Spark-tp25256p25320.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Best Regards,
Ayan Guha


Re: Assign unique link ID

2015-10-31 Thread ayan guha
Can this be a solution?

1. Write a function which takes a string and converts it to an MD5 hash (a
small sketch follows below).
2. From your base table, generate a string out of all the columns you have
used for joining, so records 1 and 4 generate the same hash value.
3. Group by this new id (you have already linked the records) and pull out
the required fields.
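
A hedged sketch of the hashing step (PySpark-flavored; the column list is just
a subset from the sample rows and would have to match the real matching rule):

    import hashlib
    from pyspark.sql.functions import col, udf
    from pyspark.sql.types import StringType

    def link_id(*cols):
        key = "|".join("" if c is None else str(c) for c in cols)
        return hashlib.md5(key.encode("utf-8")).hexdigest()

    link_id_udf = udf(link_id, StringType())
    linked = src.withColumn("LINK_ID",
                            link_id_udf(col("TRN_NO"), col("DATE1"), col("DATE2"),
                                        col("BRANCH"), col("REF1"), col("AMOUNT")))

Because the hash depends only on the matching columns, both records of a pair
end up with the same LINK_ID without needing a join.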

Please let the group know if it works...

Best
Ayan

On Sat, Oct 31, 2015 at 6:44 PM, Sarath Chandra <
sarathchandra.jos...@algofusiontech.com> wrote:

> Hi All,
>
> I have a hive table where data from 2 different sources (S1 and S2) get
> accumulated. Sample data below -
>
>
> *RECORD_ID|SOURCE_TYPE|TRN_NO|DATE1|DATE2|BRANCH|REF1|REF2|REF3|REF4|REF5|REF6|DC_FLAG|AMOUNT|CURRENCY*
>
> *1|S1|55|19-Oct-2015|19-Oct-2015|25602|999||41106|47311|379|9|004|999||Cr|2672.00|INR*
>
> *2|S1|55|19-Oct-2015|19-Oct-2015|81201|999||41106|9|379|9|004|999||Dr|2672.00|INR*
>
> *3|S2|55|19-OCT-2015|19-OCT-2015|81201|999||41106|9|379|9|004|999||DR|2672|INR*
>
> *4|S2|55|19-OCT-2015|19-OCT-2015|25602|999||41106|47311|379|9|004|999||CR|2672|INR*
>
> I have a requirement to link similar records (same dates, branch and
> reference numbers) source wise and assign them unique ID linking the 2
> records. For example records 1 and 4 above should be linked with same ID.
>
> I've written code below to segregate data source wise and join them based
> on the similarities. But not knowing how to proceed further.
>
> *var hc = new org.apache.spark.sql.hive.HiveContext(sc);*
> *var src = hc.sql("select
> RECORD_ID,SOURCE,TRN_NO,DATE1,DATE2,BRANCH,REF1,REF2,REF3,REF4,REF5,REF6,DC_FLAG,AMOUNT,CURRENCY
> from src_table");*
>
> *var s1 = src.filter("source_type='S1'");*
>
> *var s2 = src.filter("source_type='S2'");*
> *var src_join = s1.as <http://s1.as>("S1").join(s2.as
> <http://s2.as>("S2")).filter("(S1.TRN_NO= S2.TRN_NO) and (S1.DATE1=
> S2.DATE1) and (S1.DATE2= S2.DATE2) and (S1.BRANCH= S2.BRANCH) and (S1.REF1=
> S2.REF1) and (S1.REF2= S2.REF2) and (S1.REF3= S2.REF3) and (S1.REF4=
> S2.REF4) and (S1.REF5= S2.REF5) and (S1.REF6= S2.REF6) and (S1.CURRENCY=
> S2.CURRENCY)");*
>
> Tried using a UDF which returns a random value or hashed string using
> record IDs of both sides and include it to schema using withColumn, but
> ended up getting duplicate link IDs.
>
> Also when I use a UDF I'm not able to refer to the columns using the alias
> in next steps. For example if I create a new DF using below line -
> *var src_link = src_join.as
> <http://src_join.as>("SJ").withColumn("LINK_ID",
> linkIDUDF(src_join("S1.RECORD_ID"),src("S2.RECORD_ID")));*
> Then in further lines I'm not able to refer to "s1" columns from
> "src_link" like -
> *var src_link_s1 = src_link.as
> <http://src_link.as>("SL").select($"S1.RECORD_ID");*
>
> Please guide me.
>
> Regards,
> Sarath.
>



-- 
Best Regards,
Ayan Guha


Re: Pivot Data in Spark and Scala

2015-10-31 Thread ayan guha
 Fanilo
>
> *De :* Adrian Tanase [mailto:atan...@adobe.com]
> *Envoyé :* vendredi 30 octobre 2015 11:50
> *À :* Deng Ching-Mallete; Ascot Moss
> *Cc :* User
> *Objet :* Re: Pivot Data in Spark and Scala
>
> Its actually a bit tougher as you’ll first need all the years. Also not
> sure how you would reprsent your “columns” given they are dynamic based on
> the input data.
>
> Depending on your downstream processing, I’d probably try to emulate it
> with a hash map with years as keys instead of the columns.
>
> There is probably a nicer solution using the data frames API but I’m not
> familiar with it.
>
> If you actually need vectors I think this article I saw recently on the
> data bricks blog will highlight some options (look for gather encoder)
>
> https://databricks.com/blog/2015/10/20/audience-modeling-with-spark-ml-pipelines.html
>
> -adrian
>
> *From: *Deng Ching-Mallete
> *Date: *Friday, October 30, 2015 at 4:35 AM
> *To: *Ascot Moss
> *Cc: *User
> *Subject: *Re: Pivot Data in Spark and Scala
>
> Hi,
>
> You could transform it into a pair RDD then use the combineByKey function.
>
> HTH,
> Deng
>
> On Thu, Oct 29, 2015 at 7:29 PM, Ascot Moss <ascot.m...@gmail.com> wrote:
> Hi,
>
> I have data as follows:
>
> A, 2015, 4
> A, 2014, 12
> A, 2013, 1
> B, 2015, 24
> B, 2013, 4
>
>
> I need to convert the data to a new format:
> A ,4,12,1
> B,   24,,4
>
> Any idea how to make it in Spark Scala?
>
> Thanks
>
>
>
> --
>
>
>
>
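
For reference, a minimal PySpark sketch of the pair-RDD + combineByKey idea
suggested above (the input path and comma-delimited layout are assumptions
taken from the sample; the same shape translates directly to Scala):

from pyspark import SparkContext

sc = SparkContext("local", "pivot-sketch")

# (key, (year, value)) pairs from lines like "A, 2015, 4"
rows = sc.textFile("data.csv").map(lambda l: [c.strip() for c in l.split(",")])
pairs = rows.map(lambda r: (r[0], (r[1], r[2])))

def to_map(yv):            # start a {year: value} map for the first record
    return {yv[0]: yv[1]}

def add_entry(acc, yv):    # fold one more (year, value) into the map
    acc[yv[0]] = yv[1]
    return acc

def merge_maps(a, b):      # merge partial maps from different partitions
    a.update(b)
    return a

pivoted = pairs.combineByKey(to_map, add_entry, merge_maps)

# Emit one CSV line per key, with a column for every year seen in the data.
years = sorted(rows.map(lambda r: r[1]).distinct().collect(), reverse=True)
lines = pivoted.map(lambda kv: ",".join([kv[0]] + [kv[1].get(y, "") for y in years]))
print(lines.collect())     # e.g. ['A,4,12,1', 'B,24,,4']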


-- 
Best Regards,
Ayan Guha


Re: Programatically create RDDs based on input

2015-10-31 Thread ayan guha
Yes, this can be done. A quick Python equivalent:

# In Driver
fileList = ["/file1.txt", "/file2.txt"]
rdds = []
for f in fileList:
    rdd = jsc.textFile(f)
    rdds.append(rdd)
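
If the goal is a single RDD over all the files rather than a list, two common
shortcuts (a sketch; the file names are made up) avoid keeping the list at all:

from pyspark import SparkContext

sc = SparkContext("local", "many-files-sketch")
fileList = ["/file1.txt", "/file2.txt"]

# textFile also accepts a comma-separated list of paths...
combined = sc.textFile(",".join(fileList))

# ...or build the per-file RDDs as above and union them.
rdds = [sc.textFile(f) for f in fileList]
combined2 = sc.union(rdds)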



On Sat, Oct 31, 2015 at 11:09 PM, amit tewari <amittewar...@gmail.com>
wrote:

> Hi
>
> I need the ability to create RDDs programmatically inside my
> program (e.g. based on a variable number of input files).
>
> Can this be done?
>
> I need this as I want to run the following statement inside an iteration:
>
> JavaRDD rdd1 = jsc.textFile("/file1.txt");
>
> Thanks
> Amit
>



-- 
Best Regards,
Ayan Guha


Re: Assign unique link ID

2015-10-31 Thread ayan guha
Hi

The way I see it, your dedup condition needs to be defined. If it is
variable, then the joining approach is no good either. You may want to stub
columns (e.g. put a default value in the joining clause) to achieve this. If
not, could you state the problem with all the other conditions so we can
discuss further?

Getting a partition key upfront will be important in your case to control
shuffle.
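
A hedged PySpark sketch of the md5-hash linking suggestion quoted below; it
assumes Spark 1.5+ (where md5, upper and concat_ws exist in
pyspark.sql.functions) and a Hive table named src_table as in the original
post:

from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql.functions import col, concat_ws, md5, upper

sc = SparkContext("local", "link-id-sketch")
hc = HiveContext(sc)
src = hc.table("src_table")

# Columns that define "the same" record across sources S1 and S2.
link_cols = ["TRN_NO", "DATE1", "DATE2", "BRANCH", "REF1", "REF2",
             "REF3", "REF4", "REF5", "REF6", "CURRENCY"]

# Hash the concatenation of the link columns; upper() smooths out the case
# differences seen in the sample data (19-Oct-2015 vs 19-OCT-2015).
linked = src.withColumn(
    "LINK_ID", md5(upper(concat_ws("|", *[col(c) for c in link_cols]))))

# Records 1 and 4 now carry the same LINK_ID and can be grouped or joined on it.
linked.groupBy("LINK_ID").count().show()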

Best
Ayan

On Sat, Oct 31, 2015 at 11:54 PM, Sarath Chandra <
sarathchandra.jos...@algofusiontech.com> wrote:

> Thanks for the reply Ayan.
>
> I got this idea earlier but the problem is the number of columns used for
> joining will be varying depending on the some data conditions. Also their
> data types will be different. So I'm not getting how to define the UDF as
> we need to upfront specify the argument count and their types.
>
> Any ideas how to tackle this?
>
> Regards,
> Sarath.
>
> On Sat, Oct 31, 2015 at 4:37 PM, ayan guha <guha.a...@gmail.com> wrote:
>
>> Can this be a solution?
>>
>> 1. Write a function which will take a string and convert it to an md5 hash.
>> 2. From your base table, generate a string out of all the columns you have
>> used for joining. So, records 1 and 4 should generate the same hash value.
>> 3. Group by using this new id (you have already linked the records) and
>> pull out the required fields.
>>
>> Please let the group know if it works...
>>
>> Best
>> Ayan
>>
>> On Sat, Oct 31, 2015 at 6:44 PM, Sarath Chandra <
>> sarathchandra.jos...@algofusiontech.com> wrote:
>>
>>> Hi All,
>>>
>>> I have a hive table where data from 2 different sources (S1 and S2) get
>>> accumulated. Sample data below -
>>>
>>>
>>> *RECORD_ID|SOURCE_TYPE|TRN_NO|DATE1|DATE2|BRANCH|REF1|REF2|REF3|REF4|REF5|REF6|DC_FLAG|AMOUNT|CURRENCY*
>>>
>>> *1|S1|55|19-Oct-2015|19-Oct-2015|25602|999||41106|47311|379|9|004|999||Cr|2672.00|INR*
>>>
>>> *2|S1|55|19-Oct-2015|19-Oct-2015|81201|999||41106|9|379|9|004|999||Dr|2672.00|INR*
>>>
>>> *3|S2|55|19-OCT-2015|19-OCT-2015|81201|999||41106|9|379|9|004|999||DR|2672|INR*
>>>
>>> *4|S2|55|19-OCT-2015|19-OCT-2015|25602|999||41106|47311|379|9|004|999||CR|2672|INR*
>>>
>>> I have a requirement to link similar records (same dates, branch and
>>> reference numbers) source-wise and assign them a unique ID linking the 2
>>> records. For example, records 1 and 4 above should be linked with the same ID.
>>>
>>> I've written the code below to segregate the data source-wise and join them
>>> based on the similarities, but I don't know how to proceed further.
>>>
>>> *var hc = new org.apache.spark.sql.hive.HiveContext(sc);*
>>> *var src = hc.sql("select
>>> RECORD_ID,SOURCE,TRN_NO,DATE1,DATE2,BRANCH,REF1,REF2,REF3,REF4,REF5,REF6,DC_FLAG,AMOUNT,CURRENCY
>>> from src_table");*
>>>
>>> *var s1 = src.filter("source_type='S1'");*
>>>
>>> *var s2 = src.filter("source_type='S2'");*
>>> *var src_join = s1.as("S1").join(s2.as("S2")).filter("(S1.TRN_NO= S2.TRN_NO)
>>> and (S1.DATE1= S2.DATE1) and (S1.DATE2= S2.DATE2) and (S1.BRANCH= S2.BRANCH)
>>> and (S1.REF1= S2.REF1) and (S1.REF2= S2.REF2) and (S1.REF3= S2.REF3) and
>>> (S1.REF4= S2.REF4) and (S1.REF5= S2.REF5) and (S1.REF6= S2.REF6) and
>>> (S1.CURRENCY= S2.CURRENCY)");*
>>>
>>> I tried using a UDF which returns a random value or a hashed string built
>>> from the record IDs of both sides and added it to the schema using
>>> withColumn, but ended up getting duplicate link IDs.
>>>
>>> Also, when I use a UDF I'm not able to refer to the columns using the
>>> alias in the next steps. For example, if I create a new DF using the line below -
>>> *var src_link = src_join.as("SJ").withColumn("LINK_ID",
>>> linkIDUDF(src_join("S1.RECORD_ID"),src("S2.RECORD_ID")));*
>>> Then in further lines I'm not able to refer to the "S1" columns from
>>> "src_link", like -
>>> *var src_link_s1 = src_link.as("SL").select($"S1.RECORD_ID");*
>>>
>>> Please guide me.
>>>
>>> Regards,
>>> Sarath.
>>>
>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>


-- 
Best Regards,
Ayan Guha


Re: Programatically create RDDs based on input

2015-10-31 Thread ayan guha
My Java knowledge is limited, but you may try a HashMap and put the RDDs in
it?

On Sun, Nov 1, 2015 at 4:34 AM, amit tewari <amittewar...@gmail.com> wrote:

> Thanks Ayan, that's something similar to what I am looking at, but trying the
> same in Java gives a compile error:
>
> JavaRDD jRDD[] = new JavaRDD[3];
>
> //Error: Cannot create a generic array of JavaRDD
>
> Thanks
> Amit
>
>
>
> On Sat, Oct 31, 2015 at 5:46 PM, ayan guha <guha.a...@gmail.com> wrote:
>
>> Corrected a typo...
>>
>> # In Driver
>> fileList=["/file1.txt","/file2.txt"]
>> rdds = []
>> for f in fileList:
>>  rdd = jsc.textFile(f)
>>  rdds.append(rdd)
>>
>>
>> On Sat, Oct 31, 2015 at 11:14 PM, ayan guha <guha.a...@gmail.com> wrote:
>>
>>> Yes, this can be done. quick python equivalent:
>>>
>>> # In Driver
>>> fileList=["/file1.txt","/file2.txt"]
>>> rdd = []
>>> for f in fileList:
>>>  rdd = jsc.textFile(f)
>>>  rdds.append(rdd)
>>>
>>>
>>>
>>> On Sat, Oct 31, 2015 at 11:09 PM, amit tewari <amittewar...@gmail.com>
>>> wrote:
>>>
>>>> Hi
>>>>
>>>> I need the ability to create RDDs programmatically inside my
>>>> program (e.g. based on a variable number of input files).
>>>>
>>>> Can this be done?
>>>>
>>>> I need this as I want to run the following statement inside an
>>>> iteration:
>>>>
>>>> JavaRDD rdd1 = jsc.textFile("/file1.txt");
>>>>
>>>> Thanks
>>>> Amit
>>>>
>>>
>>>
>>>
>>> --
>>> Best Regards,
>>> Ayan Guha
>>>
>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>


-- 
Best Regards,
Ayan Guha


Re: Using Hadoop Custom Input format in Spark

2015-10-27 Thread ayan guha
Mind sharing the error you are getting?
On 28 Oct 2015 03:53, "Balachandar R.A."  wrote:

> Hello,
>
>
> I have developed a Hadoop-based solution that processes a binary file. This
> uses the classic Hadoop MR technique. The binary file is about 10GB and divided
> into 73 HDFS blocks, and the business logic, written as a map process, operates
> on each of these 73 blocks. We have developed a custom InputFormat and
> custom RecordReader in Hadoop that return a key (IntWritable) and value
> (BytesWritable) to the map function. The value is nothing but the contents
> of an HDFS block (binary data). The business logic knows how to read this
> data.
>
> Now, I would like to port this code to Spark. I am a starter in Spark and
> could run simple examples (wordcount, pi example) in Spark. However, I could
> not find a straightforward example to process binary files in Spark. I see there
> are two solutions for this use case. In the first, avoid using a custom input
> format and record reader: find a method (approach) in Spark that creates an
> RDD for those HDFS blocks, and use a map-like method that feeds the HDFS block
> content to the business logic. If this is not possible, I would like to
> re-use the custom input format and custom reader using some methods such as
> HadoopAPI, HadoopRDD etc. My problem:- I do not know whether the first
> approach is possible or not. If possible, can anyone please provide some
> pointers that contain examples? I was trying the second approach but was highly
> unsuccessful. Here is the code snippet I used
>
> object Driver {
>   def myFunc(key: IntWritable, content: BytesWritable) = {
>     println("my business logic")
>     // printing key and content value/size is 0
>   }
>
>   def main(args: Array[String]) {
>     // create a spark context
>     val conf = new SparkConf().setAppName("Dummy").setMaster("spark://:7077")
>     val sc = new SparkContext(conf)
>     val rd = sc.newAPIHadoopFile("hdfs:///user/name/MyDataFile.dat",
>       classOf[RandomAccessInputFormat], classOf[IntWritable],
>       classOf[BytesWritable])
>     val count = rd.map(x => myFunc(x._1, x._2)).collect()
>   }
> }
>
> Can someone tell me where I am going wrong here? I think I am not using the
> API the right way, but I failed to find documentation/usage examples.
>
>
> Thanks in advance
>
> - bala
>


Re: spark multi tenancy

2015-10-07 Thread ayan guha
Can queues also be used to separate workloads?
On 7 Oct 2015 20:34, "Steve Loughran"  wrote:

>
> > On 7 Oct 2015, at 09:26, Dominik Fries 
> wrote:
> >
> > Hello Folks,
> >
> > We want to deploy several spark projects and want to use a unique project
> > user for each of them. Only the project user should start the spark
> > application and have the corresponding packages installed.
> >
> > Furthermore a personal user, which belongs to a specific project, should
> > start a spark application via the corresponding spark project user as
> proxy.
> > (Development)
> >
> > The Application is currently running with ipython / pyspark. (HDP 2.3 -
> > Spark 1.3.1)
> >
> > Is this possible or what is the best practice for a spark multi tenancy
> > environment ?
> >
> >
>
> Deploy on a kerberized YARN cluster and each application instance will be
> running as a different unix user in the cluster, with the appropriate
> access to HDFS —isolated.
>
> The issue then becomes "do workloads clash with each other?". If you want
> to isolate dev & production, using node labels to keep dev work off the
> production nodes is the standard technique.


Re: Does feature parity exist between Spark and PySpark

2015-10-06 Thread ayan guha
Hi

2 cents

1. It should not be true anymore if DataFrames are used: regardless of the
language, DataFrames use the same optimization engine behind the scenes (see
the sketch just after this list).
2. This is generally true, in the sense that the Python APIs are typically a
little behind the Scala/Java ones.
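
A sketch of point 1 (made-up DataFrame; explain() prints the plan produced by
the shared Catalyst optimizer, and the equivalent Scala code produces the same
plan):

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext("local", "parity-sketch")
sqlContext = SQLContext(sc)

df = sqlContext.createDataFrame([(1, "a"), (2, "b"), (2, "c")], ["id", "val"])

# The physical plan below is generated by Catalyst, not by the Python side.
df.groupBy("id").count().explain()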

Best
Ayan

On Wed, Oct 7, 2015 at 9:15 AM, dant <dan.tr...@gmail.com> wrote:

> Hi
>
> I'm hearing a common theme running that I should only do serious
> programming
> in Scala on Spark (1.5.1). Real power users use Scala. It is said that
> Python is great for analytics but in the end the code should be written to
> Scala to finalise. There are a number of reasons I'm hearing:
>
> 1. Spark is written in Scala so will always be faster than any other
> language implementation on top of it.
> 2. Spark releases always favour more features being visible and enabled for
> Scala API than Python API.
>
> Are there any truths to the above? I'm a little sceptical.
>
> Apologies for the duplication, my previous message was held up due to
> subscription issue. Reposting now.
>
> Thanks
> Dan
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Does-feature-parity-exist-between-Spark-and-PySpark-tp24963.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Best Regards,
Ayan Guha


Re: HDFS small file generation problem

2015-09-27 Thread ayan guha
I would suggest not writing small files to HDFS. Rather, you can hold them
in memory, maybe off-heap, and then flush them to HDFS using another job,
similar to https://github.com/ptgoetz/storm-hdfs (not sure if Spark already
has something like it).
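
For the nightly-merge idea in the quoted mail below, a minimal PySpark sketch
(the paths, date and partition count are made-up assumptions):

from pyspark import SparkContext

sc = SparkContext("local", "daily-merge-sketch")

# Read all of yesterday's small files and rewrite them as a few large ones;
# coalesce() controls how many output part-files are produced.
day = "2015-09-26"
small = sc.textFile("hdfs:///events/%s/*" % day)
small.coalesce(8).saveAsTextFile("hdfs:///events-merged/%s" % day)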

On Sun, Sep 27, 2015 at 11:36 PM, <nib...@free.fr> wrote:

> Hello,
> I'm still investigating my small file generation problem generated by my
> Spark Streaming jobs.
> Indeed, my Spark Streaming jobs are receiving a lot of small events (avg
> 10kb), and I have to store them inside HDFS in order to process them with
> PIG jobs on demand.
> The problem is the fact that I generate a lot of small files in HDFS
> (several millions) and it can be problematic.
> I looked into using HBase or archive files but in the end I don't want to
> do that.
> So, what about this solution:
> - Spark Streaming generates on the fly several millions of small files in
> HDFS
> - Each night I merge them inside a big daily file
> - I launch my PIG jobs on this big file?
>
> Other question I have:
> - Is it possible to append to the big (daily) file by adding my events on
> the fly?
>
> Tks a lot
> Nicolas
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Best Regards,
Ayan Guha


Re: Py4j issue with Python Kafka Module

2015-09-23 Thread ayan guha
Thanks guys.

On Wed, Sep 23, 2015 at 3:54 PM, Tathagata Das <t...@databricks.com> wrote:

> SPARK_CLASSPATH is I believe deprecated right now. So I am not surprised
> that there is some difference in the code paths.
>
> On Tue, Sep 22, 2015 at 9:45 PM, Saisai Shao <sai.sai.s...@gmail.com>
> wrote:
>
>> I think it is something related to the class loader; the behavior is
>> different for the classpath and --jars. If you want to know the details, I
>> think you'd better dig into the source code.
>>
>> Thanks
>> Jerry
>>
>> On Tue, Sep 22, 2015 at 9:10 PM, ayan guha <guha.a...@gmail.com> wrote:
>>
>>> I must have gone mad :) Thanks for pointing it out. I downloaded the
>>> 1.5.0 assembly jar and added it to SPARK_CLASSPATH.
>>>
>>> However, I am getting a new error now
>>>
>>> >>> kvs =
>>> KafkaUtils.createDirectStream(ssc,['spark'],{"metadata.broker.list":'l
>>> ocalhost:9092'})
>>>
>>>
>>> 
>>> 
>>>
>>>   Spark Streaming's Kafka libraries not found in class path. Try one of
>>> the foll
>>> owing.
>>>
>>>   1. Include the Kafka library and its dependencies with in the
>>>  spark-submit command as
>>>
>>>  $ bin/spark-submit --packages
>>> org.apache.spark:spark-streaming-kafka:1.5.0
>>> ...
>>>
>>>   2. Download the JAR of the artifact from Maven Central
>>> http://search.maven.org
>>> /,
>>>  Group Id = org.apache.spark, Artifact Id =
>>> spark-streaming-kafka-assembly,
>>> Version = 1.5.0.
>>>  Then, include the jar in the spark-submit command as
>>>
>>>  $ bin/spark-submit --jars  ...
>>>
>>>
>>> 
>>> 
>>>
>>>
>>>
>>> Traceback (most recent call last):
>>>   File "", line 1, in 
>>>   File
>>> "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\pyspark
>>> \streaming\kafka.py", line 130, in createDirectStream
>>> raise e
>>> py4j.protocol.Py4JJavaError: An error occurred while calling
>>> o30.loadClass.
>>> : java.lang.ClassNotFoundException:
>>> org.apache.spark.streaming.kafka.KafkaUtilsP
>>> ythonHelper
>>> at java.net.URLClassLoader.findClass(Unknown Source)
>>> at java.lang.ClassLoader.loadClass(Unknown Source)
>>> at java.lang.ClassLoader.loadClass(Unknown Source)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown
>>> Source)
>>> at java.lang.reflect.Method.invoke(Unknown Source)
>>> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
>>> at
>>> py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
>>> at py4j.Gateway.invoke(Gateway.java:259)
>>> at
>>> py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
>>> at py4j.commands.CallCommand.execute(CallCommand.java:79)
>>> at py4j.GatewayConnection.run(GatewayConnection.java:207)
>>> at java.lang.Thread.run(Unknown Source)
>>>
>>> >>> os.environ['SPARK_CLASSPATH']
>>> 'D:\\sw\\spark-streaming-kafka-assembly_2.10-1.5.0'
>>> >>>
>>>
>>>
>>> So I launched pyspark with --jars with the assembly jar. Now it is
>>> working.
>>>
>>> THANK YOU for help.
>>>
>>> Curiosity: why did adding it to SPARK_CLASSPATH not work?
>>>
>>> Best
>>> Ayan
>>>
>>> On Wed, Sep 23, 2015 at 2:25 AM, Saisai Shao <sai.sai.s...@gmail.com>
>>> wrote:
>>>
>>>> I think you're using the wrong version of the Kafka assembly jar. The
>>>> Python API for the direct Kafka stream is not supported for Spark 1.3.0, so
>>>> you'd better change to version 1.5.0. It looks like you're using Spark 1.5.0,
>>>> so why did you choose the Kafka assembly for 1.3.0?
>>>>
>>>>
>>>> D:\sw\spark-streaming-kafka-assembly_2.10-1.3.0.jar
>>>>
>>>>
>>>>
>>>> On Tu

Re: Join over many small files

2015-09-23 Thread ayan guha
I think this can be a good case for using the sequence file format to pack the
many small files into a few sequence files, with the file name as key and the
content as value. Then read it as an RDD and produce tuples like you mentioned
(key=fileno+id, value=value). After that, it is a simple map operation to
generate the diff (broadcasting the special file is the right idea).
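
A rough PySpark sketch of that plan (the paths and the pipe-delimited parsing
are assumptions based on the sample below; writing each result out as CSV is
left out):

from pyspark import SparkContext

sc = SparkContext("local", "diff-sketch")

def parse(content):
    # "ID | VALUE" lines -> {id: value}, skipping the header row
    out = {}
    for line in content.strip().splitlines()[1:]:
        k, v = [c.strip() for c in line.split("|")]
        out[k] = int(v)
    return out

# Broadcast the small "Special" file to every executor.
special = parse(open("/path/special.txt").read())
b_special = sc.broadcast(special)

# (filename, whole content) pairs; for tens of thousands of tiny files,
# packing them into sequence files first and using sc.sequenceFile() scales better.
files = sc.wholeTextFiles("/path/files/*")

def diff(name_content):
    name, content = name_content
    rows = parse(content)
    return name, {k: b_special.value.get(k, 0) - v for k, v in rows.items()}

results = files.map(diff)
print(results.take(1))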

On Thu, Sep 24, 2015 at 7:31 AM, Tracewski, Lukasz <
lukasz.tracew...@credit-suisse.com> wrote:

> Hi all,
>
>
>
> I would like to ask you for advice on how to efficiently do a join
> operation in Spark with tens of thousands of tiny files. A single file has
> a few KB and ~50 rows. In another scenario they might have 200 KB and 2000
> rows.
>
>
>
> To give you impression how they look like:
>
>
>
> File 01
>
> ID | VALUE
>
> 01 | 10
>
> 02 | 12
>
> 03 | 55
>
> …
>
>
>
> File 02
>
> ID | VALUE
>
> 01 | 33
>
> 02 | 21
>
> 03 | 53
>
> …
>
>
>
> and so on… ID is unique in a file, but repeats in every file. There is
> also a Special file which has the same form:
>
>
>
> File Special
>
> ID | VALUE
>
> 01 | 21
>
> 02 | 23
>
> 03 | 54
>
> …
>
>
>
> What I would like to get is a join of File 01..1 with File Special to
> get a difference between values:
>
>
>
> File Result 01 = File Special – File 01
>
> ID | VALUE
>
> 01 | 21-10
>
> 02 | 23-12
>
> 03 | 54-53
>
> …
>
>
>
> And save result to a csv, meaning 1 new files. What’s the best way of
> doing this?
>
>
>
> My idea was the following:
>
> 1.   Read all Files with wholeTextFiles, each to a separate partition
>
> 2.   Perform map-side join with broadcast variable inside
> mapPartitions (the “Special” file will be broadcasted).
>
>
>
> I am on Spark 1.3, but it can be upgraded if needed. Perhaps this could be
> done better in a dataframe? Then I would create one large dataframe, with
> additional “filename” key, i.e.:
>
> File | ID | Value
>
> 01 | 01 | 10
>
> 01 | 02 | 12
>
> 01 | 03 | 55
>
> 02 | 01 | 21
>
> 02 | 02 | 23
>
> …
>
>
>
> What would be then a way to make an efficient query over such dataframe?
>
>
>
> Any advice will be appreciated.
>
>
>
> Best regards,
>
> Lucas
>
>
>
>



-- 
Best Regards,
Ayan Guha


Re: Py4j issue with Python Kafka Module

2015-09-22 Thread ayan guha
I must have gone mad :) Thanks for pointing it out. I downloaded the 1.5.0
assembly jar and added it to SPARK_CLASSPATH.

However, I am getting a new error now

>>> kvs =
KafkaUtils.createDirectStream(ssc,['spark'],{"metadata.broker.list":'l
ocalhost:9092'})




  Spark Streaming's Kafka libraries not found in class path. Try one of the
foll
owing.

  1. Include the Kafka library and its dependencies with in the
 spark-submit command as

 $ bin/spark-submit --packages
org.apache.spark:spark-streaming-kafka:1.5.0
...

  2. Download the JAR of the artifact from Maven Central
http://search.maven.org
/,
 Group Id = org.apache.spark, Artifact Id =
spark-streaming-kafka-assembly,
Version = 1.5.0.
 Then, include the jar in the spark-submit command as

 $ bin/spark-submit --jars  ...






Traceback (most recent call last):
  File "", line 1, in 
  File
"D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\pyspark
\streaming\kafka.py", line 130, in createDirectStream
raise e
py4j.protocol.Py4JJavaError: An error occurred while calling o30.loadClass.
: java.lang.ClassNotFoundException:
org.apache.spark.streaming.kafka.KafkaUtilsP
ythonHelper
at java.net.URLClassLoader.findClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Unknown Source)

>>> os.environ['SPARK_CLASSPATH']
'D:\\sw\\spark-streaming-kafka-assembly_2.10-1.5.0'
>>>


So I launched pyspark with --jars with the assembly jar. Now it is working.

THANK YOU for help.

Curiosity: why did adding it to SPARK_CLASSPATH not work?

Best
Ayan

On Wed, Sep 23, 2015 at 2:25 AM, Saisai Shao <sai.sai.s...@gmail.com> wrote:

> I think you're using the wrong version of the Kafka assembly jar. The
> Python API for the direct Kafka stream is not supported for Spark 1.3.0, so
> you'd better change to version 1.5.0. It looks like you're using Spark 1.5.0,
> so why did you choose the Kafka assembly for 1.3.0?
>
>
> D:\sw\spark-streaming-kafka-assembly_2.10-1.3.0.jar
>
>
>
> On Tue, Sep 22, 2015 at 6:41 AM, ayan guha <guha.a...@gmail.com> wrote:
>
>> Hi
>>
>> I have added spark assembly jar to SPARK CLASSPATH
>>
>> >>> print os.environ['SPARK_CLASSPATH']
>> D:\sw\spark-streaming-kafka-assembly_2.10-1.3.0.jar
>>
>>
>> Now  I am facing below issue with a test topic
>>
>> >>> ssc = StreamingContext(sc, 2)
>> >>> kvs =
>> KafkaUtils.createDirectStream(ssc,['spark'],{"metadata.broker.list":'l
>> ocalhost:9092'})
>> Traceback (most recent call last):
>>   File "", line 1, in 
>>   File
>> "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\pyspark
>> \streaming\kafka.py", line 126, in createDirectStream
>> jstream = helper.createDirectStream(ssc._jssc, kafkaParams,
>> set(topics), jfr
>> omOffsets)
>>   File
>> "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\lib\py4
>> j-0.8.2.1-src.zip\py4j\java_gateway.py", line 538, in __call__
>>   File
>> "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\pyspark
>> \sql\utils.py", line 36, in deco
>> return f(*a, **kw)
>>   File
>> "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\lib\py4
>> j-0.8.2.1-src.zip\py4j\protocol.py", line 304, in get_return_value
>> py4j.protocol.Py4JError: An error occurred while calling
>> o22.createDirectStream.
>>  Trace:
>> py4j.Py4JException: Method createDirectStream([class
>> org.apache.spark.streaming.
>> api.java.JavaStreamingContext, class java.util.HashMap, class
>> java.util.HashSet,
>>  class java.util.HashMap]) does not exist
>> at
>> py4j.reflection.ReflectionEngine.getMethod(R

Py4j issue with Python Kafka Module

2015-09-22 Thread ayan guha
Hi

I have added spark assembly jar to SPARK CLASSPATH

>>> print os.environ['SPARK_CLASSPATH']
D:\sw\spark-streaming-kafka-assembly_2.10-1.3.0.jar


Now  I am facing below issue with a test topic

>>> ssc = StreamingContext(sc, 2)
>>> kvs =
KafkaUtils.createDirectStream(ssc,['spark'],{"metadata.broker.list":'l
ocalhost:9092'})
Traceback (most recent call last):
  File "", line 1, in 
  File
"D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\pyspark
\streaming\kafka.py", line 126, in createDirectStream
jstream = helper.createDirectStream(ssc._jssc, kafkaParams,
set(topics), jfr
omOffsets)
  File
"D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\lib\py4
j-0.8.2.1-src.zip\py4j\java_gateway.py", line 538, in __call__
  File
"D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\pyspark
\sql\utils.py", line 36, in deco
return f(*a, **kw)
  File
"D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\lib\py4
j-0.8.2.1-src.zip\py4j\protocol.py", line 304, in get_return_value
py4j.protocol.Py4JError: An error occurred while calling
o22.createDirectStream.
 Trace:
py4j.Py4JException: Method createDirectStream([class
org.apache.spark.streaming.
api.java.JavaStreamingContext, class java.util.HashMap, class
java.util.HashSet,
 class java.util.HashMap]) does not exist
at
py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)

at
py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)

at py4j.Gateway.invoke(Gateway.java:252)
at
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Unknown Source)


>>>

Am I doing something wrong?


-- 
Best Regards,
Ayan Guha


Re: Spark Ingestion into Relational DB

2015-09-21 Thread ayan guha
No, it does not require Hadoop.

However, I doubt this is a good use case for Spark. You would probably be
better off, and gain better performance, with SQL*Loader.


On Tue, Sep 22, 2015 at 3:13 PM, Sri <sriesh.subb...@gmail.com> wrote:

> Hi,
>
> We have a use case where we get data from different systems and finally the
> data will be consolidated into an Oracle database. Is Spark a valid
> choice for this scenario? Currently we also don't have any big data
> component. In case we go with Spark to ingest the data, does it require
> Hadoop?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Ingestion-into-Relational-DB-tp24761.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Best Regards,
Ayan Guha


Re: Relational Log Data

2015-09-15 Thread ayan guha
Spark functions are lazy, so none of them actually do anything until an
action is encountered. And no, your code will NOT read the file multiple
times.
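
A small PySpark sketch of the second part of the quoted question (dropping keys
that only appear once) that avoids the distinct/subtract round trip; the
"INTERESTING" filter and the thread-number parsing are made-up assumptions:

from pyspark import SparkContext

sc = SparkContext("local", "log-sketch")

lines = sc.textFile("log.txt").filter(lambda l: "INTERESTING" in l)
pairs = lines.map(lambda l: (l.split()[0], l)).cache()   # (thread_no, line)

# Count lines per thread, keep only threads seen more than once,
# then join back to drop the single-occurrence keys.
counts = pairs.mapValues(lambda _: 1).reduceByKey(lambda a, b: a + b)
multi = counts.filter(lambda kv: kv[1] > 1)
kept = pairs.join(multi).mapValues(lambda v: v[0])

print(kept.take(5))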

On Tue, Sep 15, 2015 at 7:33 PM, 328d95 <20500...@student.uwa.edu.au> wrote:

> I am trying to read logs which have many irrelevant lines and whose lines
> are
> related by a thread number in each line.
>
> Firstly, if I read from a text file using the textFile function and then
> call multiple filter functions on that file will Spark apply all of the
> filters using one read pass?
>
> Eg will the second filter incur another read of log.txt?
> val file = sc.textFile("log.txt")
> val test = file.filter(some condition)
> val test1 = file.filter(some other condition)
>
> Secondly, if there are multiple reads I was thinking that I could apply a
> filter that gets rid of all of the lines that I do not need and cache that
> in a PairRDD. From that PairRDD I would need to remove keys that only
> appear
> once, is there a recommended strategy for this? I was thinking about using
> distinct to create another PairRDD and then using subtract, but this seems
> inefficient.
>
> Thanks
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Relational-Log-Data-tp24696.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Best Regards,
Ayan Guha


Re: Spark Streaming Suggestion

2015-09-15 Thread ayan guha
I think you need to make up your mind about storm vs spark. Using both in
this context does not make much sense to me.
On 15 Sep 2015 22:54, "David Morales"  wrote:

> Hi there,
>
> This is exactly our goal in Stratio Sparkta, a real-time aggregation
> engine fully developed with spark streaming (and fully open source).
>
> Take a look at:
>
>
>- the docs: http://docs.stratio.com/modules/sparkta/development/
>- the repository: https://github.com/Stratio/sparkta
>- and some slides explaining how sparkta was born and what it makes:
>http://www.slideshare.net/Stratio/strata-sparkta
>
>
> Feel free to ask us anything about the project.
>
>
>
>
>
>
>
>
> 2015-09-15 8:10 GMT+02:00 srungarapu vamsi :
>
>> The batch approach I had implemented takes about 10 minutes to complete
>> all the pre-computation tasks for one hour's worth of data. When I went
>> through my code, I figured out that the most time-consuming tasks are the
>> ones which read data from Cassandra and the places where I perform
>> sparkContext.union(Array[RDD]).
>> Now the ask is to get the pre-computation tasks near real time, so I am
>> exploring the streaming approach.
>>
>> My pre computation tasks not only include just finding the unique numbers
>> for a given device every minute, every hour, every day but it also includes
>> the following tasks:
>> 1. Find the number of unique numbers across a set of devices every
>> minute, every hour, every day
>> 2. Find the number of unique numbers which are commonly occurring across
>> a set of devices every minute, every hour, every day
>> 3. Find (total time a number occurred across a set of devices)/(total
>> unique numbers occurred across the set of devices)
>> The above mentioned pre computation tasks are just a few of what i will
>> be needing and there are many more coming towards me :)
>> I see all these problems need more of data parallel approach and hence i
>> am interested to do this on the spark streaming end.
>>
>>
>> On Tue, Sep 15, 2015 at 11:04 AM, Jörn Franke 
>> wrote:
>>
>>> Why did you not stay with the batch approach? For me the architecture
>>> looks very complex for a simple thing you want to achieve. Why don't you
>>> process the data already in storm ?
>>>
>>> Le mar. 15 sept. 2015 à 6:20, srungarapu vamsi 
>>> a écrit :
>>>
 I am pretty new to spark. Please suggest a better model for the
 following use case.

 I have few (about 1500) devices in field which keep emitting about
 100KB of data every minute. The nature of data sent by the devices is just
 a list of numbers.
 As of now, we have Storm is in the architecture which receives this
 data, sanitizes it and writes to cassandra.
 Now, i have a requirement to process this data. The processing includes
 finding unique numbers emitted by one or more devices for every minute,
 every hour, every day, every month.
 I had implemented this processing part as a batch job execution and now
 i am interested in making it a streaming application. i.e calculating the
 processed data as and when devices emit the data.

 I have the following two approaches:
 1. Storm writes the actual data to cassandra and writes a message on
 Kafka bus that data corresponding to device D and minute M has been written
 to cassandra

 Then Spark streaming reads this message from kafka , then reads the
 data of Device D at minute M from cassandra and starts processing the data.

 2. Storm writes the data to both Cassandra and Kafka; Spark reads the
 actual data from Kafka, processes the data and writes to Cassandra.
 The second approach avoids the additional hit of reading from Cassandra
 every minute a device has written data to Cassandra, at the cost of
 putting the actual heavy messages, instead of light events, on Kafka.

 I am a bit confused among the two approaches. Please suggest which one
 is better and if both are bad, how can i handle this use case?


 --
 /Vamsi

>>>
>>
>>
>> --
>> /Vamsi
>>
>
>
>
> --
>
> David Morales de Frías  ::  +34 607 010 411 :: @dmoralesdf
> 
>
>
> 
> Vía de las dos Castillas, 33, Ática 4, 3ª Planta
> 28224 Pozuelo de Alarcón, Madrid
> Tel: +34 91 828 6473 // www.stratio.com // *@stratiobd
> *
>


Re: Directly reading data from S3 to EC2 with PySpark

2015-09-15 Thread ayan guha
Also, you can set the Hadoop conf through the underlying JavaSparkContext's
Hadoop configuration. Do a dir(sc) to see the exact property name.
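
For reference, the pattern usually looks like the sketch below in PySpark (the
_jsc accessor and the s3n key names are assumptions from common usage, not
something confirmed in this thread):

from pyspark import SparkContext

sc = SparkContext("local", "s3-conf-sketch")

# Set the AWS credentials on the underlying Hadoop configuration.
hconf = sc._jsc.hadoopConfiguration()
hconf.set("fs.s3n.awsAccessKeyId", "YOUR_ACCESS_KEY")
hconf.set("fs.s3n.awsSecretAccessKey", "YOUR_SECRET_KEY")

rdd = sc.textFile("s3n://your-bucket/path/part-*")
print(rdd.count())
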
On 15 Sep 2015 22:43, "Gourav Sengupta"  wrote:

> Hi,
>
> If you start your EC2 nodes with correct roles (default in most cases
> depending on your needs) you should be able to work on S3 and all other AWS
> resources without giving any keys.
>
> I have been doing that for some time now and I have not faced any issues
> yet.
>
>
> Regards,
> Gourav
>
>
>
> On Tue, Sep 15, 2015 at 12:54 PM, Cazen  wrote:
>
>> Good day junHyeok
>>
>> Did you set HADOOP_CONF_DIR? It seems that spark cannot find AWS key
>> properties
>>
>> If it doesn't work after set, How about export AWS_ACCESS_KEY_ID,
>> AWS_SECRET_ACCESS_KEY before running py-spark shell?
>>
>> BR
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Directly-reading-data-from-S3-to-EC2-with-PySpark-tp24638p24698.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>

