Re: Invalidating/Remove complete mapWithState state

2017-04-17 Thread Matthias Niehoff
That would be the hard way, but if possible I want to clear the cache
without stopping the application, maybe triggered by a message in the
stream.
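
One way to get this effect without a restart is lazy invalidation. This is an untested sketch and not something confirmed in this thread: keep an epoch number next to each key's state, bump a global epoch when the reset message arrives, and treat any state written under an older epoch as absent. The plain-Python model below only illustrates the bookkeeping. EpochState, update and reset_all are hypothetical names, and how the current epoch would reach a real mapWithState function is left open.

```python
class EpochState:
    """Plain-Python model of per-key state with a constant-time 'clear all'."""

    def __init__(self):
        self.epoch = 0      # bumped whenever the reset message arrives
        self._store = {}    # key -> (epoch_written, value)

    def get(self, key, default=None):
        entry = self._store.get(key)
        # State written before the last reset is treated as absent.
        if entry is None or entry[0] != self.epoch:
            return default
        return entry[1]

    def update(self, key, value):
        self._store[key] = (self.epoch, value)

    def reset_all(self):
        # No per-key deletion: stale entries are ignored on read and
        # overwritten the next time that key is updated.
        self.epoch += 1


# Hypothetical usage: a running word count that is reset mid-stream.
state = EpochState()
for word in ["a", "b", "a"]:
    state.update(word, state.get(word, 0) + 1)
count_before = state.get("a")

state.reset_all()                       # the "reset" message arrived
count_after = state.get("a", 0)
state.update("a", state.get("a", 0) + 1)
```

Stale entries still occupy memory until they are overwritten, so in a real job this would probably need to be combined with a state timeout so that untouched keys eventually expire.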

On 17 April 2017 at 19:41, ayan guha wrote:

> It sounds like you want to stop the stream process, wipe out the check
> point and restart?
>
> On Mon, 17 Apr 2017 at 10:13 pm, Matthias Niehoff <
> matthias.nieh...@codecentric.de> wrote:
>
>> Hi everybody,
>>
>> is there a way to completely invalidate or remove the state used by
>> mapWithState, not only for a given key using State#remove()?
>>
>> Deleting the state key by key is not an option, as a) not all possible
>> keys are known (this could be worked around, of course) and b) the number
>> of keys is too big, so it would take too long.
>>
>> I tried to unpersist the RDD retrieved by stateSnapshot (
>> stateSnapshots().transform(_.unpersist()) ) , but this did not work as
>> expected.
>>
>> Thank you,
>>
>> Matthias
> --
> Best Regards,
> Ayan Guha
>



-- 
Matthias Niehoff | IT-Consultant | Agile Software Factory | Consulting
codecentric AG | Hochstraße 11 | 42697 Solingen | Deutschland
telefon: +49 (0) 1721702676
www.codecentric.de | blog.codecentric.de | www.meettheexperts.de



Re: Shall I use Apache Zeppelin for data analytics & visualization?

2017-04-17 Thread Jayant Shekhar
Hello Gaurav,

Pre-calculating the results, as Jorn suggested, is a good idea: load them
into a serving store and serve your customers from there.

Run the job every hour or every day, depending on your requirements.

Zeppelin (as mentioned by Ayan) would not be a good tool for this use case,
as it is more for interactive data exploration.

You can hand-code your Spark jobs, use SQL if it does the job, or use a
drag-and-drop tool to create the workflows for your reports and/or
incorporate ML into them.

Jayant




On Mon, Apr 17, 2017 at 7:17 AM, ayan guha  wrote:

> Zeppelin is more useful for interactive data exploration. If the reports
> are known beforehand, then any good reporting tool should work, such as
> Tableau, Qlik, Power BI etc. Zeppelin is not a fit for this use case.
>
> On Mon, 17 Apr 2017 at 6:57 pm, Gaurav Pandya 
> wrote:
>
>> Thanks Jorn. Yes, I will precalculate the results. Do you think Zeppelin
>> can work here?
>>
>> On Mon, Apr 17, 2017 at 1:41 PM, Jörn Franke 
>> wrote:
>>
>>> Processing through Spark is fine, but I do not recommend that each of
>>> the users triggers a Spark query. So either you precalculate the reports in
>>> Spark so that the reports themselves do not trigger Spark queries or you
>>> have a database that serves the report. For the latter case there are tons
>>> of commercial tools. Depending on the type of report you can also use a
>>> custom report tool or write your own dashboard with D3.js visualizations.
>>>
>>> On 17. Apr 2017, at 09:49, Gaurav Pandya 
>>> wrote:
>>>
>>> Thanks for the revert Jorn.
>>> In my case, I am going to put the analysis on e-commerce website so
>>> naturally users will be more and it will keep growing when e-commerce
>>> website captures market. Users will not be doing any analysis here. Reports
>>> will show their purchasing behaviour and pattern (kind of Machine learning
>>> stuff).
>>> Please note that all processing will be done in Spark here. Please share
>>> your thoughts. Thanks again.
>>>
>>> On Mon, Apr 17, 2017 at 12:58 PM, Jörn Franke 
>>> wrote:
>>>
>>>> I think it highly depends on your requirements. There are various tools
>>>> for analyzing and visualizing data. How many concurrent users do you have?
>>>> What analysis do they do? How much data is involved? Do they have to
>>>> process the data all the time or can they live with sampling which
>>>> increases performance and response time significantly.
>>>> In lambda architecture terms you may want to think about different
>>>> technologies in the serving layer.
>>>>
>>>> > On 17. Apr 2017, at 06:55, Gaurav1809 wrote:
>>>> >
>>>> > Hi All, I am looking for a data visualization (and analytics) tool. My
>>>> > processing is done through Spark. There are many tools available around us.
>>>> > I got some suggestions on Apache Zeppelin too? Can anybody throw some light
>>>> > on its power and capabilities when it comes to data analytics and
>>>> > visualization? If there are any better options than this, do suggest too.
>>>> > One of the options came to me was Kibana (from ELK stack). Thanks.
>>>> >

>>>
>>>
>> --
> Best Regards,
> Ayan Guha
>


Re: Spark SQL (Pyspark) - Parallel processing of multiple datasets

2017-04-17 Thread Amol Patil
@Ayan - I am creating the temp table name dynamically, based on the dataset
name. I will explore the df.saveAsTable option.


On Mon, Apr 17, 2017 at 9:53 PM, Ryan  wrote:

> It shouldn't be a problem then. We've done a similar thing in Scala. I
> don't have much experience with Python threads, but maybe the code that
> reads and writes the temp table isn't thread-safe.
>
> On Mon, Apr 17, 2017 at 9:45 PM, Amol Patil  wrote:
>
>> Thanks Ryan,
>>
>> Each dataset has separate hive table. All hive tables belongs to same
>> hive database.
>>
>> The idea is to ingest data in parallel in respective hive tables.
>>
>> If I run code sequentially for each data source, it works fine but I will
>> take lot of time. We are planning to process around 30-40 different data
>> sources.
>>
>> Please advise.
>>
>> Thank you,
>> Amol
>>
>>
>>
>> On Monday, April 17, 2017, Ryan  wrote:
>>
>>> I don't think you can parallel insert into a hive table without dynamic
>>> partition, for hive locking please refer to
>>> https://cwiki.apache.org/confluence/display/Hive/Locking.
>>>
>>> Other than that, it should work.
>>>
>>> On Mon, Apr 17, 2017 at 6:52 AM, Amol Patil 
>>> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I'm writing a generic PySpark program to process multiple datasets using
>>>> Spark SQL, for example Traffic Data, Crime Data, Weather Data. Each dataset
>>>> will be in CSV format, and its size may vary from *1 GB* to *10 GB*. Each
>>>> dataset will be available at a different timeframe (weekly, monthly,
>>>> quarterly).
>>>>
>>>> My requirement is to process all the datasets in parallel by triggering
>>>> the job only once.
>>>>
>>>> In the current implementation we are using the Spark CSV package for
>>>> reading CSV files and the Python threading mechanism to trigger multiple
>>>> threads:
>>>> --
>>>> import threading
>>>>
>>>> jobs = []
>>>> for dict_key, dict_val in config_dict.items():
>>>>     # one thread per dataset; each gets its own config dict
>>>>     t = threading.Thread(target=task, args=(sqlContext, dict_val))
>>>>     jobs.append(t)
>>>>     t.start()
>>>>
>>>> # wait for all datasets to finish
>>>> for x in jobs:
>>>>     x.join()
>>>> ---
>>>> I defined a task method to process each dataset based on its config
>>>> values dict:
>>>>
>>>> -
>>>> def task(sqlContex, data_source_dict):
>>>> ..
>>>> ...
>>>> -
>>>>
>>>> The task method:
>>>> 1. creates a dataframe from the CSV file,
>>>> 2. creates a temporary table from that dataframe,
>>>> 3. finally ingests the data into the Hive table.
>>>>
>>>> *Issue:*
>>>> 1. If I process two datasets in parallel, one dataset goes through
>>>> successfully, but for the other dataset I get the error "*u'temp_table
>>>> not found*" while ingesting data into the Hive table. It happens
>>>> consistently, with either dataset A or dataset B:
>>>> sqlContext.sql('INSERT INTO TABLE '+hivetablename+' SELECT * from '+temp_table_name)
>>>>
>>>> I tried the following:
>>>> 1. I'm creating the dataframe name and temporary table name dynamically,
>>>> based on the dataset name.
>>>> 2. Enabled Spark dynamic allocation (--conf
>>>> spark.dynamicAllocation.enabled=true)
>>>> 3. Set spark.scheduler.mode to FAIR
>>>>
>>>>
>>>> I would appreciate advice on:
>>>> 1. Is anything wrong in the above implementation?
>>>> 2. Is it a good idea to process those big datasets in parallel in one job?
>>>> 3. Is there any other solution to process multiple datasets in parallel?
>>>>
>>>> Thank you,
>>>> Amol Patil
>>>>
>>>
>>>
>


Re: Spark SQL (Pyspark) - Parallel processing of multiple datasets

2017-04-17 Thread Ryan
It shouldn't be a problem then. We've done a similar thing in Scala. I
don't have much experience with Python threads, but maybe the code that
reads and writes the temp table isn't thread-safe.

On Mon, Apr 17, 2017 at 9:45 PM, Amol Patil  wrote:

> Thanks Ryan,
>
> Each dataset has separate hive table. All hive tables belongs to same hive
> database.
>
> The idea is to ingest data in parallel in respective hive tables.
>
> If I run code sequentially for each data source, it works fine but I will
> take lot of time. We are planning to process around 30-40 different data
> sources.
>
> Please advise.
>
> Thank you,
> Amol
>
>
>
> On Monday, April 17, 2017, Ryan  wrote:
>
>> I don't think you can parallel insert into a hive table without dynamic
>> partition, for hive locking please refer to
>> https://cwiki.apache.org/confluence/display/Hive/Locking.
>>
>> Other than that, it should work.
>>
>> On Mon, Apr 17, 2017 at 6:52 AM, Amol Patil 
>> wrote:
>>
>>> Hi All,
>>>
>>> I'm writing generic pyspark program to process multiple datasets using
>>> Spark SQL. For example Traffic Data, Crime Data, Weather Data. Dataset will
>>> be in csv format & size may vary from *1 GB* to *10 GB*. Each dataset
>>> will be available at different timeframe (weekly,monthly,quarterly).
>>>
>>> My requirement is to process all the datasets in parallel by triggering
>>> job only once.
>>>
>>> In Current implementation we are using Spark CSV package for reading csv
>>> files & using python treading mechanism to trigger multiple threads
>>> --
>>> jobs = []
>>> for dict_key, dict_val in config_dict.items():
>>>     t = threading.Thread(target=task,args=(sqlContext,dict_val))
>>>     jobs.append(t)
>>>     t.start()
>>>
>>> for x in jobs:
>>>     x.join()
>>> ---
>>> And Defind task mehtod to process each dataset based config values dict
>>>
>>> -
>>> def task(sqlContex, data_source_dict):
>>> ..
>>> ...
>>> -
>>>
>>> task method,
>>> 1. create dataframe from csv file
>>> 2. then create temporary table from that dataframe.
>>> 3. finally it ingest data in to Hive table.
>>>
>>> *Issue:*
>>> 1. If I process two datasets in parallel, one dataset goes through
>>> successfully but for other dataset I'm getting error "*u'temp_table not
>>> found*" while ingesting data in to hive table. Its happening
>>> consistently either with dataset A or Dataset B
>>> sqlContext.sql('INSERT INTO TABLE '+hivetablename+' SELECT * from
>>> '+temp_table_name)
>>>
>>> I tried below things
>>> 1. I'm creating dataframe name & temporary tabel name dynamically based
>>> in dataset name.
>>> 2. Enabled Spark Dynamic allocation (--conf
>>> spark.dynamicAllocation.enabled=true)
>>> 3. Set spark.scheduler.mode to FAIR
>>>
>>>
>>> I appreciate advise on
>>> 1. Is anything wrong in above implementation?
>>> 2. Is it good idea to process those big datasets in parallel in one job?
>>> 3. Any other solution to process multiple datasets in parallel?
>>>
>>> Thank you,
>>> Amol Patil
>>>
>>
>>


Application not found in RM

2017-04-17 Thread Mohammad Tariq
Dear fellow Spark users,

*Use case:* I have written a small Java client which launches multiple
Spark jobs through *SparkLauncher* and captures the jobs' metrics during the
course of their execution.

*Issue:* Sometimes the client fails with:
*Caused by:
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException):
Application with id 'application_APP_ID' doesn't exist in RM.*

I am using *YarnClient.getApplicationReport(ApplicationID ID)* to get the
desired metrics. I forced the threads to sleep for some time so that the
applications actually get started before I query for these metrics. Most
of the time it works. However, I feel this is not the correct approach.

What would be the ideal way to handle such a situation?

Thank you so much for your valuable time!
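
A common alternative to a fixed sleep is to poll with a timeout and a growing delay, treating "application not found" as "not registered yet" until a deadline passes. The sketch below is plain Python and only illustrates the control flow; AppNotFound, wait_for_report and fetch_report are hypothetical stand-ins (in the Java client the same loop would wrap YarnClient.getApplicationReport and catch ApplicationNotFoundException).

```python
import time


class AppNotFound(Exception):
    """Stand-in for the 'application not found' error from the cluster."""


def wait_for_report(fetch_report, app_id, timeout_s=60.0, first_delay_s=0.5,
                    max_delay_s=8.0, sleep=time.sleep, clock=time.monotonic):
    """Call fetch_report(app_id) until it succeeds or the deadline passes."""
    deadline = clock() + timeout_s
    delay = first_delay_s
    while True:
        try:
            return fetch_report(app_id)
        except AppNotFound:
            # Give up (re-raise) if the next wait would pass the deadline.
            if clock() + delay > deadline:
                raise
            sleep(delay)
            delay = min(delay * 2, max_delay_s)  # exponential backoff, capped


# Hypothetical usage: a fake fetcher that fails twice before succeeding.
attempts = []


def fake_fetch(app_id):
    attempts.append(app_id)
    if len(attempts) < 3:
        raise AppNotFound(app_id)
    return {"id": app_id, "state": "RUNNING"}


# sleep is replaced by a no-op so the demo finishes immediately.
report = wait_for_report(fake_fetch, "application_APP_ID", sleep=lambda s: None)
```

The deadline keeps a genuinely missing application from being polled forever, and the capped backoff avoids hammering the ResourceManager. SparkLauncher.startApplication with a SparkAppHandle listener may be another option worth checking, since it reports state changes instead of requiring polling.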






Tariq, Mohammad
about.me/mti



Re: Is there a way to tell if a receiver is a Reliable Receiver?

2017-04-17 Thread Charles O. Bajomo
The easiest way I found was to take a look at the source. Any receiver that 
calls the version of store that requires an iterator is considered reliable. A 
definitive list would be nice.

Kind Regards

- Original Message -
From: "Justin Pihony" 
To: "user" 
Sent: Monday, 17 April, 2017 20:34:21
Subject: Is there a way to tell if a receiver is a Reliable Receiver?

I can't seem to find anywhere that would let a user know if the receiver they
are using is reliable or not. Even better would be a list of known reliable
receivers. Are any of these things possible? Or do you just have to research
your receiver beforehand?






Is there a way to tell if a receiver is a Reliable Receiver?

2017-04-17 Thread Justin Pihony
I can't seem to find anywhere that would let a user know if the receiver they
are using is reliable or not. Even better would be a list of known reliable
receivers. Are any of these things possible? Or do you just have to research
your receiver beforehand?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-a-way-to-tell-if-a-receiver-is-a-Reliable-Receiver-tp28609.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: isin query

2017-04-17 Thread Koert Kuipers
I don't see this behavior in the current Spark master:

scala> val df = Seq("m_123", "m_111", "m_145", "m_098", "m_666").toDF("msrid")
df: org.apache.spark.sql.DataFrame = [msrid: string]

scala> df.filter($"msrid".isin("m_123")).count
res0: Long = 1

scala> df.filter($"msrid".isin("m_123","m_111","m_145")).count
res1: Long = 3
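
For reference, the semantics one would expect from isin can be modelled in plain Python as an exact-match set-membership filter. This is only an illustrative sketch, not Spark code; count_isin is a hypothetical helper and the five sample values mirror the session above.

```python
msrid = ["m_123", "m_111", "m_145", "m_098", "m_666"]


def count_isin(values, *wanted):
    """Count the values that are exactly equal to any of the wanted items."""
    wanted_set = set(wanted)
    return sum(1 for v in values if v in wanted_set)


one = count_isin(msrid, "m_123")
three = count_isin(msrid, "m_123", "m_111", "m_145")
```

Under this model, adding more values to the list can only keep or raise the count, so a result that drops from a positive count to 0 suggests a problem outside the query itself, such as the Spark version in use.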



On Mon, Apr 17, 2017 at 10:50 AM, nayan sharma 
wrote:

> Thanks for responding.
> df.filter($"msrid"==="m_123" || $"msrid"==="m_111")
>
> There are lots of workarounds for my problem, but can you let me know what
> is wrong with the "isin" query?
>
> Regards,
> Nayan
>
> Begin forwarded message:
>
> *From: *ayan guha 
> *Subject: **Re: isin query*
> *Date: *17 April 2017 at 8:13:24 PM IST
> *To: *nayan sharma , user@spark.apache.org
>
> How about using OR operator in filter?
>
> On Tue, 18 Apr 2017 at 12:35 am, nayan sharma 
> wrote:
>
>> Dataframe (df) having column msrid(String) having values
>> m_123,m_111,m_145,m_098,m_666
>>
>> I wanted to filter out rows which are having values m_123,m_111,m_145
>>
>> df.filter($"msrid".isin("m_123","m_111","m_145")).count
>> count =0
>> while
>> df.filter($"msrid".isin("m_123")).count
>> count=121212
>> I have tried using queries like
>> df.filter($"msrid" isin (List("m_123","m_111","m_145"):_*))
>> count =0
>> but
>>
>> df.filter($"msrid" isin (List("m_123"):_*))
>> count=121212
>>
>> Any suggestion will do a great help to me.
>>
>> Thanks,
>> Nayan
>>
> --
> Best Regards,
> Ayan Guha
>
>
>


Handling skewed data

2017-04-17 Thread Vishnu Viswanath
Hello All,

Does anyone know if the skew handling code mentioned in this talk
https://www.youtube.com/watch?v=bhYV0JOPd9Y was added to Spark?

If so, where can I find more information: a JIRA ticket or a pull request?

Thanks in advance.
Regards,
Vishnu Viswanath.


Fwd: isin query

2017-04-17 Thread nayan sharma
Thanks for responding.
df.filter($"msrid"==="m_123" || $"msrid"==="m_111")

There are lots of workarounds for my problem, but can you let me know what is
wrong with the "isin" query?

Regards,
Nayan

> Begin forwarded message:
> 
> From: ayan guha 
> Subject: Re: isin query
> Date: 17 April 2017 at 8:13:24 PM IST
> To: nayan sharma , user@spark.apache.org
> 
> How about using OR operator in filter? 
> 
> On Tue, 18 Apr 2017 at 12:35 am, nayan sharma wrote:
> Dataframe (df) having column msrid(String) having values 
> m_123,m_111,m_145,m_098,m_666
> 
> I wanted to filter out rows which are having values m_123,m_111,m_145
> 
> df.filter($"msrid".isin("m_123","m_111","m_145")).count 
> count =0
> while 
> df.filter($"msrid".isin("m_123")).count 
> count=121212
> I have tried using queries like 
> df.filter($"msrid" isin (List("m_123","m_111","m_145"):_*))
> count =0
> but 
> 
> df.filter($"msrid" isin (List("m_123"):_*))
> count=121212
> 
> Any suggestion will do a great help to me.
> 
> Thanks,
> Nayan
> -- 
> Best Regards,
> Ayan Guha



Re: isin query

2017-04-17 Thread ayan guha
How about using the OR operator in the filter?

On Tue, 18 Apr 2017 at 12:35 am, nayan sharma 
wrote:

> Dataframe (df) having column msrid(String) having values
> m_123,m_111,m_145,m_098,m_666
>
> I wanted to filter out rows which are having values m_123,m_111,m_145
>
> df.filter($"msrid".isin("m_123","m_111","m_145")).count
> count =0
> while
> df.filter($"msrid".isin("m_123")).count
> count=121212
> I have tried using queries like
> df.filter($"msrid" isin (List("m_123","m_111","m_145"):_*))
> count =0
> but
>
> df.filter($"msrid" isin (List("m_123"):_*))
> count=121212
>
> Any suggestion will do a great help to me.
>
> Thanks,
> Nayan
>
-- 
Best Regards,
Ayan Guha


filter operation using isin

2017-04-17 Thread nayan sharma
I have a DataFrame (df) with a String column msrid containing the values
m_123, m_111, m_145, m_098, m_666.

I want to keep only the rows whose msrid is m_123, m_111 or m_145.

df.filter($"msrid".isin("m_123","m_111","m_145")).count
returns 0, while
df.filter($"msrid".isin("m_123")).count
returns 121212.

I have also tried
df.filter($"msrid" isin (List("m_123","m_111","m_145"):_*))
which again gives a count of 0, but
df.filter($"msrid" isin (List("m_123"):_*))
gives 121212.

Any suggestion would be a great help.

Best Regards,
Nayan

Re: Reply: How to store 10M records in HDFS to speed up further filtering?

2017-04-17 Thread Jörn Franke
Yes, 5 MB is a difficult size: too small for HDFS, too big for Parquet/ORC.
Maybe you can put the data in a HAR (Hadoop archive) and store (id, path) in
ORC/Parquet.

> On 17. Apr 2017, at 10:52, 莫涛  wrote:
> 
> Hi Jörn,
> 
> 
> 
> I do think a 5 MB column is odd but I don't have any other idea before asking 
> this question. The binary data is a short video and the maximum size is no 
> more than 50 MB.
> 
> 
> 
> Hadoop archive sounds very interesting and I'll try it first to check whether 
> filtering is fast on it.
> 
> 
> To my best knowledge, HBase works best for record around hundreds of KB and 
> it requires extra work of the cluster administrator. So this would be the 
> last option.
> 
> 
> Thanks!
> 
> 
> 
> Mo Tao
> 
> From: Jörn Franke
> Sent: 17 April 2017 15:59:28
> To: 莫涛
> Cc: user@spark.apache.org
> Subject: Re: How to store 10M records in HDFS to speed up further filtering?
>  
> You need to sort the data by id, otherwise a situation can occur where the
> index does not work. Aside from this, it sounds odd to store a 5 MB column
> in those formats; it will also not be very efficient.
> What is in the 5 MB binary data?
> You could use HAR or maybe HBase to store this kind of data (if it does not
> get much larger than 5 MB).
> 
> > On 17. Apr 2017, at 08:23, MoTao  wrote:
> > 
> > Hi all,
> > 
> > I have 10M (ID, BINARY) record, and the size of each BINARY is 5MB on
> > average.
> > In my daily application, I need to filter out 10K BINARY according to an ID
> > list.
> > How should I store the whole data to make the filtering faster?
> > 
> > I'm using DataFrame in Spark 2.0.0 and I've tried row-based format (avro)
> > and column-based format (orc).
> > However, both of them require to scan almost ALL records, making the
> > filtering stage very very slow.
> > The code block for filtering looks like:
> > 
> > val IDSet: Set[String] = ...
> > val checkID = udf { ID: String => IDSet(ID) }
> > spark.read.orc("/path/to/whole/data")
> >  .filter(checkID($"ID"))
> >  .select($"ID", $"BINARY")
> >  .write...
> > 
> > Thanks for any advice!
> > 
> > 
> > 
> > 


isin query

2017-04-17 Thread nayan sharma
I have a DataFrame (df) with a String column msrid containing the values
m_123, m_111, m_145, m_098, m_666.

I want to keep only the rows whose msrid is m_123, m_111 or m_145.

df.filter($"msrid".isin("m_123","m_111","m_145")).count
returns 0, while
df.filter($"msrid".isin("m_123")).count
returns 121212.

I have also tried
df.filter($"msrid" isin (List("m_123","m_111","m_145"):_*))
which again gives a count of 0, but
df.filter($"msrid" isin (List("m_123"):_*))
gives 121212.

Any suggestion would be a great help.

Thanks,
Nayan

Re: How to store 10M records in HDFS to speed up further filtering?

2017-04-17 Thread ayan guha
One possibility is to use Hive, with the table bucketed on the id column.

Another option: build the index in HBase, i.e. store the id and the HDFS path
in HBase. This way your scans will be fast, and once you have the HDFS path
pointers you can read the actual data from HDFS.
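
The id-to-path index idea can be sketched in plain Python, with a dict standing in for the HBase table. This is an illustration under assumptions only: build_index and paths_for are hypothetical helpers, the paths are made up, and no HBase or HDFS client is involved.

```python
def build_index(records):
    """Map each id to the HDFS path that holds its binary payload."""
    return {rec_id: path for rec_id, path in records}


def paths_for(index, wanted_ids):
    """Return (found, missing): paths for indexed ids, and ids not indexed."""
    found = {i: index[i] for i in wanted_ids if i in index}
    missing = [i for i in wanted_ids if i not in index]
    return found, missing


# Hypothetical index contents; the paths are illustrative only.
index = build_index([
    ("id-001", "/data/videos/part-0/id-001.bin"),
    ("id-002", "/data/videos/part-0/id-002.bin"),
    ("id-003", "/data/videos/part-1/id-003.bin"),
])

found, missing = paths_for(index, ["id-002", "id-999"])
```

The point of the design is that the lookup touches only the small (id, path) records, so the 10K wanted binaries can then be read directly by path instead of scanning all 10M records.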

On Mon, 17 Apr 2017 at 6:52 pm, 莫涛  wrote:

> Hi Jörn,
>
>
> I do think a 5 MB column is odd but I don't have any other idea before
> asking this question. The binary data is a short video and the maximum
> size is no more than 50 MB.
>
>
> Hadoop archive sounds very interesting and I'll try it first to check
> whether filtering is fast on it.
>
>
> To my best knowledge, HBase works best for record around hundreds of KB
> and it requires extra work of the cluster administrator. So this would be
> the last option.
>
>
> Thanks!
>
>
> Mo Tao
> --
> *From:* Jörn Franke
> *Sent:* 17 April 2017 15:59:28
> *To:* 莫涛
> *Cc:* user@spark.apache.org
>
> *Subject:* Re: How to store 10M records in HDFS to speed up further filtering?
> You need to sort the data by id, otherwise a situation can occur where the
> index does not work. Aside from this, it sounds odd to store a 5 MB column
> in those formats; it will also not be very efficient.
> What is in the 5 MB binary data?
> You could use HAR or maybe HBase to store this kind of data (if it does
> not get much larger than 5 MB).
>
> > On 17. Apr 2017, at 08:23, MoTao  wrote:
> >
> > Hi all,
> >
> > I have 10M (ID, BINARY) record, and the size of each BINARY is 5MB on
> > average.
> > In my daily application, I need to filter out 10K BINARY according to an
> ID
> > list.
> > How should I store the whole data to make the filtering faster?
> >
> > I'm using DataFrame in Spark 2.0.0 and I've tried row-based format (avro)
> > and column-based format (orc).
> > However, both of them require to scan almost ALL records, making the
> > filtering stage very very slow.
> > The code block for filtering looks like:
> >
> > val IDSet: Set[String] = ...
> > val checkID = udf { ID: String => IDSet(ID) }
> > spark.read.orc("/path/to/whole/data")
> >  .filter(checkID($"ID"))
> >  .select($"ID", $"BINARY")
> >  .write...
> >
> > Thanks for any advice!
> >
> >
> >
> >
>
-- 
Best Regards,
Ayan Guha


Re: Shall I use Apache Zeppelin for data analytics & visualization?

2017-04-17 Thread ayan guha
Zeppelin is more useful for interactive data exploration. If the reports
are known beforehand, then any good reporting tool should work, such as
Tableau, Qlik, Power BI etc. Zeppelin is not a fit for this use case.

On Mon, 17 Apr 2017 at 6:57 pm, Gaurav Pandya 
wrote:

> Thanks Jorn. Yes, I will precalculate the results. Do you think Zeppelin
> can work here?
>
> On Mon, Apr 17, 2017 at 1:41 PM, Jörn Franke  wrote:
>
>> Processing through Spark is fine, but I do not recommend that each of the
>> users triggers a Spark query. So either you precalculate the reports in
>> Spark so that the reports themselves do not trigger Spark queries or you
>> have a database that serves the report. For the latter case there are tons
>> of commercial tools. Depending on the type of report you can also use a
>> custom report tool or write your own dashboard with D3.js visualizations.
>>
>> On 17. Apr 2017, at 09:49, Gaurav Pandya  wrote:
>>
>> Thanks for the revert Jorn.
>> In my case, I am going to put the analysis on e-commerce website so
>> naturally users will be more and it will keep growing when e-commerce
>> website captures market. Users will not be doing any analysis here. Reports
>> will show their purchasing behaviour and pattern (kind of Machine learning
>> stuff).
>> Please note that all processing will be done in Spark here. Please share
>> your thoughts. Thanks again.
>>
>> On Mon, Apr 17, 2017 at 12:58 PM, Jörn Franke 
>> wrote:
>>
>>> I think it highly depends on your requirements. There are various tools
>>> for analyzing and visualizing data. How many concurrent users do you have?
>>> What analysis do they do? How much data is involved? Do they have to
>>> process the data all the time or can they live with sampling which
>>> increases performance and response time significantly.
>>> In lambda architecture terms you may want to think about different
>>> technologies in the serving layer.
>>>
>>> > On 17. Apr 2017, at 06:55, Gaurav1809  wrote:
>>> >
>>> > Hi All, I am looking for a data visualization (and analytics) tool. My
>>> > processing is done through Spark. There are many tools available
>>> around us.
>>> > I got some suggestions on Apache Zeppelin too? Can anybody throw some
>>> light
>>> > on its power and capabilities when it comes to data analytics and
>>> > visualization? If there are any better options than this, do suggest
>>> too.
>>> > One of the options came to me was Kibana (from ELK stack). Thanks.
>>> >
>>> >
>>> >
>>>
>>
>>
> --
Best Regards,
Ayan Guha


Re: Invalidating/Remove complete mapWithState state

2017-04-17 Thread ayan guha
It sounds like you want to stop the stream process, wipe out the check
point and restart?

On Mon, 17 Apr 2017 at 10:13 pm, Matthias Niehoff <
matthias.nieh...@codecentric.de> wrote:

> Hi everybody,
>
> is there a way to completely invalidate or remove the state used by
> mapWithState, not only for a given key using State#remove()?
>
> Deleting the state key by key is not an option, as a) not all possible
> keys are known (this could be worked around, of course) and b) the number
> of keys is too big, so it would take too long.
>
> I tried to unpersist the RDD retrieved by stateSnapshot (
> stateSnapshots().transform(_.unpersist()) ) , but this did not work as
> expected.
>
> Thank you,
>
> Matthias
-- 
Best Regards,
Ayan Guha


Re: Spark SQL (Pyspark) - Parallel processing of multiple datasets

2017-04-17 Thread ayan guha
What happens if you do not use the temp table, but directly call
df.saveAsTable with mode append? If I have to guess without looking at the
code of your task function, I would think the name of the temp table is
evaluated statically, so all threads are referring to the same table. In other
words, your app is not thread-safe.
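
The guess about a statically evaluated name can be illustrated without Spark. In the sketch below, each thread derives its temp table name from its own argument rather than from a shared variable. The names temp_table_name, registry and the config entries are hypothetical, and the dict merely stands in for the temp-table catalog.

```python
import threading


def temp_table_name(dataset_name):
    """Derive a name unique to this dataset (hypothetical naming scheme)."""
    return "tmp_" + dataset_name.lower().replace(" ", "_")


def task(registry, lock, source):
    # Everything is computed from this thread's own argument, not from a
    # module-level variable that another thread could overwrite.
    name = temp_table_name(source["name"])
    with lock:
        registry[name] = source["hive_table"]


# Hypothetical per-dataset configuration.
config_dict = {
    "traffic": {"name": "Traffic Data", "hive_table": "traffic"},
    "crime": {"name": "Crime Data", "hive_table": "crime"},
    "weather": {"name": "Weather Data", "hive_table": "weather"},
}

registry, lock = {}, threading.Lock()
jobs = [threading.Thread(target=task, args=(registry, lock, val))
        for val in config_dict.values()]
for t in jobs:
    t.start()
for t in jobs:
    t.join()
```

If the real task function instead reads the table name from a global that is reassigned per dataset, two threads can end up registering or querying the same name, which would be consistent with the "temp_table not found" error described in the thread.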

Best
Ayan
On Mon, 17 Apr 2017 at 11:45 pm, Amol Patil  wrote:

> Thanks Ryan,
>
> Each dataset has separate hive table. All hive tables belongs to same hive
> database.
>
> The idea is to ingest data in parallel in respective hive tables.
>
> If I run code sequentially for each data source, it works fine but I will
> take lot of time. We are planning to process around 30-40 different data
> sources.
>
> Please advise.
>
> Thank you,
> Amol
>
>
>
> On Monday, April 17, 2017, Ryan  wrote:
>
>> I don't think you can parallel insert into a hive table without dynamic
>> partition, for hive locking please refer to
>> https://cwiki.apache.org/confluence/display/Hive/Locking.
>>
>> Other than that, it should work.
>>
>> On Mon, Apr 17, 2017 at 6:52 AM, Amol Patil 
>> wrote:
>>
>>> Hi All,
>>>
>>> I'm writing generic pyspark program to process multiple datasets using
>>> Spark SQL. For example Traffic Data, Crime Data, Weather Data. Dataset will
>>> be in csv format & size may vary from *1 GB* to *10 GB*. Each dataset
>>> will be available at different timeframe (weekly,monthly,quarterly).
>>>
>>> My requirement is to process all the datasets in parallel by triggering
>>> job only once.
>>>
>>> In Current implementation we are using Spark CSV package for reading csv
>>> files & using python treading mechanism to trigger multiple threads
>>> --
>>> jobs = []
>>> for dict_key, dict_val in config_dict.items():
>>>     t = threading.Thread(target=task,args=(sqlContext,dict_val))
>>>     jobs.append(t)
>>>     t.start()
>>>
>>> for x in jobs:
>>>     x.join()
>>> ---
>>> And Defind task mehtod to process each dataset based config values dict
>>>
>>> -
>>> def task(sqlContex, data_source_dict):
>>> ..
>>> ...
>>> -
>>>
>>> task method,
>>> 1. create dataframe from csv file
>>> 2. then create temporary table from that dataframe.
>>> 3. finally it ingest data in to Hive table.
>>>
>>> *Issue:*
>>> 1. If I process two datasets in parallel, one dataset goes through
>>> successfully but for other dataset I'm getting error "*u'temp_table not
>>> found*" while ingesting data in to hive table. Its happening
>>> consistently either with dataset A or Dataset B
>>> sqlContext.sql('INSERT INTO TABLE '+hivetablename+' SELECT * from
>>> '+temp_table_name)
>>>
>>> I tried below things
>>> 1. I'm creating dataframe name & temporary tabel name dynamically based
>>> in dataset name.
>>> 2. Enabled Spark Dynamic allocation (--conf
>>> spark.dynamicAllocation.enabled=true)
>>> 3. Set spark.scheduler.mode to FAIR
>>>
>>>
>>> I appreciate advise on
>>> 1. Is anything wrong in above implementation?
>>> 2. Is it good idea to process those big datasets in parallel in one job?
>>> 3. Any other solution to process multiple datasets in parallel?
>>>
>>> Thank you,
>>> Amol Patil
>>>
>>
>> --
Best Regards,
Ayan Guha


Re: Spark SQL (Pyspark) - Parallel processing of multiple datasets

2017-04-17 Thread Amol Patil
Thanks Ryan,

Each dataset has a separate hive table. All hive tables belong to the same hive
database.

The idea is to ingest data in parallel into the respective hive tables.

If I run the code sequentially for each data source, it works fine but it will
take a lot of time. We are planning to process around 30-40 different data
sources.

Please advise.

Thank you,
Amol



On Monday, April 17, 2017, Ryan  wrote:

> I don't think you can parallel insert into a hive table without dynamic
> partition, for hive locking please refer to
> https://cwiki.apache.org/confluence/display/Hive/Locking.
>
> Other than that, it should work.
>
> On Mon, Apr 17, 2017 at 6:52 AM, Amol Patil wrote:
>
>> Hi All,
>>
>> I'm writing generic pyspark program to process multiple datasets using
>> Spark SQL. For example Traffic Data, Crime Data, Weather Data. Dataset will
>> be in csv format & size may vary from *1 GB* to *10 GB*. Each dataset
>> will be available at different timeframe (weekly,monthly,quarterly).
>>
>> My requirement is to process all the datasets in parallel by triggering
>> job only once.
>>
>> In the current implementation we are using the Spark CSV package for reading csv
>> files & the python threading mechanism to trigger multiple threads
>> --
>> jobs = []
>> for dict_key, dict_val in config_dict.items():
>>     t = threading.Thread(target=task, args=(sqlContext, dict_val))
>>     jobs.append(t)
>>     t.start()
>>
>> for x in jobs:
>>     x.join()
>> ---
>> And defined a task method to process each dataset based on the config values dict
>>
>> -
>> def task(sqlContex, data_source_dict):
>> ..
>> ...
>> -
>>
>> task method,
>> 1. create dataframe from csv file
>> 2. then create temporary table from that dataframe.
>> 3. finally it ingest data in to Hive table.
>>
>> *Issue:*
>> 1. If I process two datasets in parallel, one dataset goes through
>> successfully but for other dataset I'm getting error "*u'temp_table not
>> found*" while ingesting data in to hive table. Its happening
>> consistently either with dataset A or Dataset B
>> sqlContext.sql('INSERT INTO TABLE '+hivetablename+' SELECT * from
>> '+temp_table_name)
>>
>> I tried below things
>> 1. I'm creating dataframe name & temporary tabel name dynamically based
>> in dataset name.
>> 2. Enabled Spark Dynamic allocation (--conf
>> spark.dynamicAllocation.enabled=true)
>> 3. Set spark.scheduler.mode to FAIR
>>
>>
>> I appreciate advise on
>> 1. Is anything wrong in above implementation?
>> 2. Is it good idea to process those big datasets in parallel in one job?
>> 3. Any other solution to process multiple datasets in parallel?
>>
>> Thank you,
>> Amol Patil
>>
>
>


Re: how to add new column using regular expression within pyspark dataframe

2017-04-17 Thread Павел
On Mon, Apr 17, 2017 at 3:25 PM, Zeming Yu  wrote:
> I've got a dataframe with a column looking like this:
>
> display(flight.select("duration").show())
>
> +--------+
> |duration|
> +--------+
> |  15h10m|
> |   17h0m|
> |  21h25m|
> |  14h25m|
> |  14h30m|
> +--------+
> only showing top 20 rows
>
>
>
> I need to extract the hour as a number and store it as an additional column
> within the same dataframe. What's the best way to do that?

You don't actually need to switch to the RDD context or use Python
regexps here, which are slow. I'd suggest trying the "split" dataframe
SQL function and the "getItem" column method. Bear in mind the
boundary case when the duration is less than 1 hour, i.e. it might be
either 30m or 0h30m.
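
A minimal sketch of the split/getItem approach, assuming the column is named duration as in the question and that values look like 15h10m, 0h30m or 30m. The names add_hours_column and hours_from_duration are hypothetical; the second is only a plain-Python reference for the intended result, so the boundary cases can be checked without a cluster.

```python
import re


def hours_from_duration(duration):
    """Plain-Python reference: whole hours from '15h10m'; 0 for '30m'."""
    match = re.match(r"(\d+)h", duration)
    return int(match.group(1)) if match else 0


def add_hours_column(df, source_col="duration", target_col="hours"):
    """Add an integer hours column using only built-in column functions."""
    # Imported lazily so the reference function above works without Spark.
    from pyspark.sql import functions as F

    col = F.col(source_col)
    # split("15h10m", "h") gives ["15", "10m"]; item 0 is the hour part.
    hour_part = F.split(col, "h").getItem(0).cast("int")
    # Values without an "h" (for example "30m") are treated as 0 hours.
    return df.withColumn(
        target_col, F.when(col.like("%h%"), hour_part).otherwise(F.lit(0))
    )
```

Usage would be something like flight = add_hours_column(flight), which keeps everything in the dataframe API and avoids the round trip through an RDD.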

--
Pavel Knoblokh

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Memory problems with simple ETL in Pyspark

2017-04-17 Thread ayan guha
Good to know it worked. If some of the jobs still fail, that can indicate
skew in your dataset. You may want to think about a custom partitioning function.

Also, do you still see containers killed by YARN? If so, at what point? You
should see a message saying that your app is trying to use x GB while YARN can
provide only y GB. You have the option to go a little higher on executor memory,
maybe up to 18G with 2G overhead. Finally, you may want to tweak the memory
fraction settings a little to see if you can salvage the failed jobs.
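
To make the settings above concrete, here is a small sketch that renders them as spark-submit arguments. The 18g and 2 GB figures come from the suggestion above; the memory fraction value is only an illustrative starting point (0.6 is the Spark 2.x default), and the helper name spark_submit_conf_args is made up for this example.

```python
def spark_submit_conf_args(conf):
    """Turn a dict of Spark properties into ['--conf', 'k=v', ...]."""
    args = []
    for key in sorted(conf):
        args += ["--conf", "{}={}".format(key, conf[key])]
    return args


executor_conf = {
    "spark.executor.memory": "18g",
    # Off-heap overhead per executor on YARN, in MB (Spark 2.1 property name).
    "spark.yarn.executor.memoryOverhead": "2048",
    # Assumption: adjust in small steps and watch the failed jobs.
    "spark.memory.fraction": "0.6",
}

if __name__ == "__main__":
    print(" ".join(spark_submit_conf_args(executor_conf)))
```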

Best
Ayan


On Mon, 17 Apr 2017 at 5:45 am, Patrick McCarthy 
wrote:

> The partitions helped!
>
> I added repartition() and my function looks like this now:
>
> feature_df = (alldat_idx
> .withColumn('label',alldat_idx['label_val'].cast('double'))
> .groupBy('id','label')
>
> .agg(sort_array(collect_list('indexedSegs')).alias('collect_list_is'))
> .repartition(1000)
> .withColumn('num_feat',lit(feature_vec_len))
>
> .withColumn('features',list_to_sparse_udf('collect_list_is','num_feat'))
> .drop('collect_list_is')
> .drop('num_feat'))
>
> I got a few failed containers for memory overflow, but the job was able to
> finish successfully. I tried upping the repartition as high as 4000 but a
> few still failed.
>
> For posterity's sake, where would I look for the footprint you have in
> mind? On the executor tab?
>
> Since the audience part of the task finished successfully and the failure
> was on a df that didn't touch it, it shouldn't've made a difference.
>
> Thank you!
>
> On Sat, Apr 15, 2017 at 9:07 PM, ayan guha  wrote:
>
>> What i missed is try increasing number of partitions using repartition
>>
>> On Sun, 16 Apr 2017 at 11:06 am, ayan guha  wrote:
>>
>>> It does not look like scala vs python thing. How big is your audience
>>> data store? Can it be broadcasted?
>>>
>>> What is the memory footprint you are seeing? At what point yarn is
>>> killing? Depeneding on that you may want to tweak around number of
>>> partitions of input dataset and increase number of executors
>>>
>>> Ayan
>>>
>>>
>>> On Sat, 15 Apr 2017 at 2:10 am, Patrick McCarthy <
>>> pmccar...@dstillery.com> wrote:
>>>
 Hello,

 I'm trying to build an ETL job which takes in 30-100gb of text data and
 prepares it for SparkML. I don't speak Scala so I've been trying to
 implement in PySpark on YARN, Spark 2.1.

 Despite the transformations being fairly simple, the job always fails
 by running out of executor memory.

 The input table is long (~6bn rows) but composed of three simple values:

 #
 all_data_long.printSchema()

 root
 |-- id: long (nullable = true)
 |-- label: short (nullable = true)
 |-- segment: string (nullable = true)

 #

 First I join it to a table of particular segments of interests and do
 an aggregation,

 #

 audiences.printSchema()

 root
  |-- entry: integer (nullable = true)
  |-- descr: string (nullable = true)


 print("Num in adl: {}".format(str(all_data_long.count())))

 aud_str = audiences.select(audiences['entry'].cast('string'),
 audiences['descr'])

 alldata_aud = all_data_long.join(aud_str,
 all_data_long['segment']==aud_str['entry'],
 'left_outer')

 str_idx  = StringIndexer(inputCol='segment',outputCol='indexedSegs')

 idx_df   = str_idx.fit(alldata_aud)
 label_df =
 idx_df.transform(alldata_aud).withColumnRenamed('label','label_val')

 id_seg = (label_df
 .filter(label_df.descr.isNotNull())
 .groupBy('id')
 .agg(collect_list('descr')))

 id_seg.write.saveAsTable("hive.id_seg")

 #

 Then, I use that StringIndexer again on the first data frame to
 featurize the segment ID

 #

 alldat_idx =
 idx_df.transform(all_data_long).withColumnRenamed('label','label_val')

 #


 My ultimate goal is to make a SparseVector, so I group the indexed
 segments by id and try to cast it into a vector

 #

 list_to_sparse_udf = udf(lambda l, maxlen: Vectors.sparse(maxlen,
 {v:1.0 for v in l}),VectorUDT())

 alldat_idx.cache()

 feature_vec_len = (alldat_idx.select(max('indexedSegs')).first()[0] + 1)

 print("alldat_dix: {}".format(str(alldat_idx.count())))

 

how to add new column using regular expression within pyspark dataframe

2017-04-17 Thread Zeming Yu
I've got a dataframe with a column looking like this:

display(flight.select("duration").show())

+--------+
|duration|
+--------+
|  15h10m|
|   17h0m|
|  21h25m|
|  14h30m|
|  24h50m|
|  26h10m|
|  14h30m|
|   23h5m|
|  21h30m|
|  11h50m|
|  16h10m|
|  15h15m|
|  21h25m|
|  14h25m|
|  14h40m|
|   16h0m|
|  24h20m|
|  14h30m|
|  14h25m|
|  14h30m|
+--------+
only showing top 20 rows



I need to extract the hour as a number and store it as an additional column
within the same dataframe. What's the best way to do that?


I tried the following, but it failed:

import re
def getHours(x):
  return re.match('([0-9]+(?=h))', x)
temp = flight.select("duration").rdd.map(lambda x:getHours(x[0])).toDF()
temp.select("duration").show()


error message:


---Py4JJavaError
Traceback (most recent call
last) in ()  2 def
getHours(x):  3   return re.match('([0-9]+(?=h))', x)> 4 temp
= flight.select("duration").rdd.map(lambda x:getHours(x[0])).toDF()
  5 temp.select("duration").show()
C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\session.py
in toDF(self, schema, sampleRatio) 55 [Row(name=u'Alice',
age=1)] 56 """---> 57 return
sparkSession.createDataFrame(self, schema, sampleRatio) 58  59
RDD.toDF = toDF
C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\session.py
in createDataFrame(self, data, schema, samplingRatio, verifySchema)
518 519 if isinstance(data, RDD):--> 520 rdd,
schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
   521 else:522 rdd, schema =
self._createFromLocal(map(prepare, data), schema)
C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\session.py
in _createFromRDD(self, rdd, schema, samplingRatio)358 """
   359 if schema is None or isinstance(schema, (list,
tuple)):--> 360 struct = self._inferSchema(rdd,
samplingRatio)361 converter =
_create_converter(struct)362 rdd = rdd.map(converter)
C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\session.py
in _inferSchema(self, rdd, samplingRatio)329 :return:
:class:`pyspark.sql.types.StructType`330 """--> 331
 first = rdd.first()332 if not first:333
raise ValueError("The first row in RDD is empty, "
C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\rdd.py
in first(self)   1359 ValueError: RDD is empty   1360
"""-> 1361 rs = self.take(1)   1362 if rs:   1363
   return rs[0]
C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\rdd.py
in take(self, num)   13411342 p = range(partsScanned,
min(partsScanned + numPartsToTry, totalParts))-> 1343 res
= self.context.runJob(self, takeUpToNumLeft, p)   13441345
items += res
C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\context.py
in runJob(self, rdd, partitionFunc, partitions, allowLocal)963
# SparkContext#runJob.964 mappedRDD =
rdd.mapPartitions(partitionFunc)--> 965 port =
self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd,
partitions)966 return list(_load_from_socket(port,
mappedRDD._jrdd_deserializer))967
C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\lib\py4j-0.10.4-src.zip\py4j\java_gateway.py
in __call__(self, *args)   1131 answer =
self.gateway_client.send_command(command)   1132 return_value
= get_return_value(-> 1133 answer, self.gateway_client,
self.target_id, self.name)   11341135 for temp_arg in
temp_args:
C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\utils.py
in deco(*a, **kw) 61 def deco(*a, **kw): 62
try:---> 63 return f(*a, **kw) 64 except
py4j.protocol.Py4JJavaError as e: 65 s =
e.java_exception.toString()
C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\lib\py4j-0.10.4-src.zip\py4j\protocol.py
in get_return_value(answer, gateway_client, target_id, name)317
 raise Py4JJavaError(318 "An error
occurred while calling {0}{1}{2}.\n".--> 319
format(target_id, ".", name), value)320 else:321
  raise Py4JError(
Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure:
Task 0 in stage 75.0 failed 1 times, most recent failure: Lost task
0.0 in stage 75.0 (TID 1035, localhost, executor driver):
org.apache.spark.api.python.PythonException: Traceback (most recent
call last):
  File 
"C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py",
line 174, in main
  File 

Invalidating/Remove complete mapWithState state

2017-04-17 Thread Matthias Niehoff
Hi everybody,

is there a way to completely invalidate or remove the state used by
mapWithState, not only for a given key using State#remove()?

Deleting the state key by key is not an option, as a) not all possible keys
are known (this could be worked around, of course) and b) the number of keys is
too big, so it would take too long.

I tried to unpersist the RDD retrieved by stateSnapshots (
stateSnapshots().transform(_.unpersist()) ), but this did not work as
expected.

Thank you,

Matthias
-- 
Matthias Niehoff | IT-Consultant | Agile Software Factory | Consulting
codecentric AG | Hochstraße 11 | 42697 Solingen | Deutschland
telefon: +49 (0) 1721702676
www.codecentric.de | blog.codecentric.de | www.meettheexperts.de



Spark-shell's performance

2017-04-17 Thread Richard Hanson
I am playing with some data using (stand alone) spark-shell (Spark version 
1.6.0) by executing `spark-shell`. The flow is simple; a bit like cp - 
basically moving local 100k files (the max size is 190k) to S3. Memory is 
configured as below


export SPARK_DRIVER_MEMORY=8192M
export SPARK_WORKER_CORES=1
export SPARK_WORKER_MEMORY=8192M
export SPARK_EXECUTOR_CORES=4
export SPARK_EXECUTOR_MEMORY=2048M


But moving those files to S3 took roughly 30 minutes in total. The
resident memory I found is roughly 3.820g (checking with top -p ). It
seems to me there is still room to speed it up, though this is only for
testing purposes. So I would like to know whether there are any other parameters
I can change to improve spark-shell's performance. Is the memory setup above correct?


Thanks. 

Re: Shall I use Apache Zeppelin for data analytics & visualization?

2017-04-17 Thread Gaurav Pandya
Thanks Jorn. Yes, I will precalculate the results. Do you think Zeppelin
can work here?

On Mon, Apr 17, 2017 at 1:41 PM, Jörn Franke  wrote:

> Processing through Spark is fine, but I do not recommend that each of the
> users triggers a Spark query. So either you precalculate the reports in
> Spark so that the reports themselves do not trigger Spark queries or you
> have a database that serves the report. For the latter case there are tons
> of commercial tools. Depending on the type of report you can also use a
> custom report tool or write your own dashboard with d3.js visualizations.
>
> On 17. Apr 2017, at 09:49, Gaurav Pandya  wrote:
>
> Thanks for the revert Jorn.
> In my case, I am going to put the analysis on e-commerce website so
> naturally users will be more and it will keep growing when e-commerce
> website captures market. Users will not be doing any analysis here. Reports
> will show their purchasing behaviour and pattern (kind of Machine learning
> stuff).
> Please note that all processing will be done in Spark here. Please share
> your thoughts. Thanks again.
>
> On Mon, Apr 17, 2017 at 12:58 PM, Jörn Franke 
> wrote:
>
>> I think it highly depends on your requirements. There are various tools
>> for analyzing and visualizing data. How many concurrent users do you have?
>> What analysis do they do? How much data is involved? Do they have to
>> process the data all the time or can they live with sampling which
>> increases performance and response time significantly.
>> In lambda architecture terms you may want to think about different
>> technologies in the serving layer.
>>
>> > On 17. Apr 2017, at 06:55, Gaurav1809  wrote:
>> >
>> > Hi All, I am looking for a data visualization (and analytics) tool. My
>> > processing is done through Spark. There are many tools available around
>> us.
>> > I got some suggestions on Apache Zeppelin too? Can anybody throw some
>> light
>> > on its power and capabilities when it comes to data analytics and
>> > visualization? If there are any better options than this, do suggest
>> too.
>> > One of the options came to me was Kibana (from ELK stack). Thanks.
>> >
>> >
>> >
>> > --
>> > View this message in context:
>> > http://apache-spark-user-list.1001560.n3.nabble.com/Shall-I-use-Apache-Zeppelin-for-data-analytics-visualization-tp28604.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> > -
>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >
>>
>
>


Re: How to store 10M records in HDFS to speed up further filtering?

2017-04-17 Thread 莫涛
Hi Jörn,


I do think a 5 MB column is odd, but I didn't have any other idea before asking
this question. The binary data is a short video and the maximum size is no more
than 50 MB.


Hadoop archive sounds very interesting and I'll try it first to check whether 
filtering is fast on it.


To the best of my knowledge, HBase works best for records of around hundreds of
KB, and it requires extra work from the cluster administrator. So this would be
the last option.


Thanks!


Mo Tao


From: Jörn Franke
Sent: 2017-04-17 15:59:28
To: 莫涛
Cc: user@spark.apache.org
Subject: Re: How to store 10M records in HDFS to speed up further filtering?

You need to sort the data by id, otherwise a situation can occur where the index
does not work. Aside from this, it sounds odd to put a 5 MB column in those
formats. This will also not be very efficient.
What is in the 5 MB binary data?
You could use HAR or maybe Hbase to store this kind of data (if it does not get 
much larger than 5 MB).

> On 17. Apr 2017, at 08:23, MoTao  wrote:
>
> Hi all,
>
> I have 10M (ID, BINARY) record, and the size of each BINARY is 5MB on
> average.
> In my daily application, I need to filter out 10K BINARY according to an ID
> list.
> How should I store the whole data to make the filtering faster?
>
> I'm using DataFrame in Spark 2.0.0 and I've tried row-based format (avro)
> and column-based format (orc).
> However, both of them require to scan almost ALL records, making the
> filtering stage very very slow.
> The code block for filtering looks like:
>
> val IDSet: Set[String] = ...
> val checkID = udf { ID: String => IDSet(ID) }
> spark.read.orc("/path/to/whole/data")
>  .filter(checkID($"ID"))
>  .select($"ID", $"BINARY")
>  .write...
>
> Thanks for any advice!
>
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-store-10M-records-in-HDFS-to-speed-up-further-filtering-tp28605.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>


Re: Re: Re: How to store 10M records in HDFS to speed up further filtering?

2017-04-17 Thread Ryan
How about the event timeline on the executors? It seems adding more executors
could help.

1. I found a JIRA (https://issues.apache.org/jira/browse/SPARK-11621) that
states that predicate pushdown (ppd) should work. And I think "only for matched
ones the binary data is read" is true if a proper index is configured. The row
group wouldn't be read if the index shows that the predicate isn't satisfied.

2. It is absolutely true that the performance gain depends on the id
distribution...
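
One thing that may be worth trying for point 1, as a sketch only: a UDF is opaque to the optimizer, so a filter written as a UDF cannot be pushed down, whereas a built-in isin() expression can at least be considered for it. The snippet below is PySpark rather than the Scala of the original code, assumes a SparkSession named `spark` and the ID/BINARY columns from the question, and uses a made-up function name filter_by_ids. Whether row groups are really skipped still depends on the Spark version, the ORC indexes and how the ids are sorted.

```python
def filter_by_ids(spark, path, wanted_ids):
    """Read ORC data and keep only rows whose ID is in wanted_ids."""
    # Ask Spark to push supported filters down to the ORC reader.
    spark.conf.set("spark.sql.orc.filterPushdown", "true")
    df = spark.read.orc(path)
    # isin() is a built-in expression, unlike a UDF, so the optimizer can see it.
    return df.filter(df["ID"].isin(list(wanted_ids))).select("ID", "BINARY")
```

Comparing the "input size" of the stage with and without this change would show whether the pushdown has any effect on this dataset.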

On Mon, Apr 17, 2017 at 4:23 PM, 莫涛  wrote:

> Hi Ryan,
>
>
> The attachment is a screen shot for the spark job and this is the only
> stage for this job.
>
> I've changed the partition size to 1GB by "--conf
> spark.sql.files.maxPartitionBytes=1073741824".
>
>
> 1. spark-orc seems not that smart. The input size is almost the whole
> data. I guess "only for matched ones the binary data is read" is not true
> as orc does not know the offset of each BINARY so things like seek could
> not happen
>
> 2. I've tried orc and it does skip the partition that has no hit. This
> could be a solution but the performance depends on the distribution of the
> given ID list. No partition could be skipped in the worst case.
>
>
> Mo Tao
>
>
>
> --
> *From:* Ryan
> *Sent:* 2017-04-17 15:42:46
> *To:* 莫涛
> *Cc:* user
> *Subject:* Re: Re: How to store 10M records in HDFS to speed up further
> filtering?
>
> 1. Per my understanding, for orc files, it should push down the filters,
> which means all id columns will be scanned but only for matched ones the
> binary data is read. I haven't dig into spark-orc reader though..
>
> 2. orc itself have row group index and bloom filter index. you may try
> configurations like 'orc.bloom.filter.columns' on the source table first.
> From the spark side, with mapPartitions, it's possible to build sort of
> index for each partition.
>
> And could you check how many tasks does the filter stage have? maybe
> there's too few partitions..
>
> On Mon, Apr 17, 2017 at 3:01 PM, 莫涛  wrote:
>
>> Hi Ryan,
>>
>>
>> 1. "expected qps and response time for the filter request"
>>
>> I expect that only the requested BINARY are scanned instead of all
>> records, so the response time would be "10K * 5MB / disk read speed", or
>> several times of this.
>>
>> In practice, our cluster has 30 SAS disks and scanning all the 10M * 5MB
>> data takes about 6 hours now. It should becomes several minutes as expected.
>>
>>
>> 2. "build a search tree using ids within each partition to act like an
>> index, or create a bloom filter to see if current partition would have any
>> hit"
>>
>> Sounds like the thing I'm looking for!
>>
>> Could you kindly provide some links for reference? I found nothing in
>> spark document about index or bloom filter working inside partition.
>>
>>
>> Thanks very much!
>>
>>
>> Mo Tao
>>
>> --
>> *From:* Ryan
>> *Sent:* 2017-04-17 14:32:00
>> *To:* 莫涛
>> *Cc:* user
>> *Subject:* Re: How to store 10M records in HDFS to speed up further filtering?
>>
>> you can build a search tree using ids within each partition to act like
>> an index, or create a bloom filter to see if current partition would have
>> any hit.
>>
>> What's your expected qps and response time for the filter request?
>>
>>
>> On Mon, Apr 17, 2017 at 2:23 PM, MoTao  wrote:
>>
>>> Hi all,
>>>
>>> I have 10M (ID, BINARY) record, and the size of each BINARY is 5MB on
>>> average.
>>> In my daily application, I need to filter out 10K BINARY according to an
>>> ID
>>> list.
>>> How should I store the whole data to make the filtering faster?
>>>
>>> I'm using DataFrame in Spark 2.0.0 and I've tried row-based format (avro)
>>> and column-based format (orc).
>>> However, both of them require to scan almost ALL records, making the
>>> filtering stage very very slow.
>>> The code block for filtering looks like:
>>>
>>> val IDSet: Set[String] = ...
>>> val checkID = udf { ID: String => IDSet(ID) }
>>> spark.read.orc("/path/to/whole/data")
>>>   .filter(checkID($"ID"))
>>>   .select($"ID", $"BINARY")
>>>   .write...
>>>
>>> Thanks for any advice!
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-store-10M-records-in-HDFS-to-speed-up-further-filtering-tp28605.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>
>


Re: Re: How to store 10M records in HDFS to speed up further filtering?

2017-04-17 Thread 莫涛
Hi Ryan,


The attachment is a screen shot for the spark job and this is the only stage 
for this job.

I've changed the partition size to 1GB by "--conf 
spark.sql.files.maxPartitionBytes=1073741824".


1. spark-orc does not seem that smart. The input size is almost the whole data. I
guess "only for matched ones the binary data is read" is not true, as orc does
not know the offset of each BINARY, so things like seek could not happen.

2. I've tried orc and it does skip the partition that has no hit. This could be 
a solution but the performance depends on the distribution of the given ID 
list. No partition could be skipped in the worst case.


Mo Tao




From: Ryan
Sent: 2017-04-17 15:42:46
To: 莫涛
Cc: user
Subject: Re: Re: How to store 10M records in HDFS to speed up further filtering?

1. Per my understanding, for orc files, it should push down the filters, which 
means all id columns will be scanned but only for matched ones the binary data 
is read. I haven't dig into spark-orc reader though..

2. orc itself have row group index and bloom filter index. you may try 
configurations like 'orc.bloom.filter.columns' on the source table first. From 
the spark side, with mapPartitions, it's possible to build sort of index for 
each partition.

And could you check how many tasks does the filter stage have? maybe there's 
too few partitions..

On Mon, Apr 17, 2017 at 3:01 PM, 莫涛 wrote:

Hi Ryan,


1. "expected qps and response time for the filter request"

I expect that only the requested BINARY are scanned instead of all records, so 
the response time would be "10K * 5MB / disk read speed", or several times of 
this.

In practice, our cluster has 30 SAS disks and scanning all the 10M * 5MB data 
takes about 6 hours now. It should becomes several minutes as expected.


2. "build a search tree using ids within each partition to act like an index, 
or create a bloom filter to see if current partition would have any hit"

Sounds like the thing I'm looking for!

Could you kindly provide some links for reference? I found nothing in spark 
document about index or bloom filter working inside partition.


Thanks very much!


Mo Tao


From: Ryan
Sent: 2017-04-17 14:32:00
To: 莫涛
Cc: user
Subject: Re: How to store 10M records in HDFS to speed up further filtering?

you can build a search tree using ids within each partition to act like an 
index, or create a bloom filter to see if current partition would have any hit.
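
To illustrate the bloom filter idea, here is a small self-contained sketch in plain Python. It is not a Spark API: the class BloomFilter, the helper partition_may_match, and the sizes chosen are all assumptions for illustration. One filter would be built per partition from its ids and stored next to the data; at query time a partition is only read if its filter reports a possible hit.

```python
import hashlib


class BloomFilter:
    """Tiny bloom filter: no false negatives, rare false positives."""

    def __init__(self, num_bits=1 << 16, num_hashes=4):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(num_bits // 8 + 1)

    def _positions(self, item):
        # Derive several bit positions by salting one hash function.
        for seed in range(self.num_hashes):
            data = "{}:{}".format(seed, item).encode("utf-8")
            yield int(hashlib.md5(data).hexdigest(), 16) % self.num_bits

    def add(self, item):
        for pos in self._positions(item):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, item):
        return all(
            self.bits[pos // 8] & (1 << (pos % 8))
            for pos in self._positions(item)
        )


def partition_may_match(partition_filter, wanted_ids):
    """True if the partition could hold at least one of the wanted ids."""
    return any(partition_filter.might_contain(i) for i in wanted_ids)
```

A partition whose filter returns False for every requested id can be skipped safely; a True answer only means the partition has to be scanned to be sure.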

What's your expected qps and response time for the filter request?


On Mon, Apr 17, 2017 at 2:23 PM, MoTao wrote:
Hi all,

I have 10M (ID, BINARY) record, and the size of each BINARY is 5MB on
average.
In my daily application, I need to filter out 10K BINARY according to an ID
list.
How should I store the whole data to make the filtering faster?

I'm using DataFrame in Spark 2.0.0 and I've tried row-based format (avro)
and column-based format (orc).
However, both of them require to scan almost ALL records, making the
filtering stage very very slow.
The code block for filtering looks like:

val IDSet: Set[String] = ...
val checkID = udf { ID: String => IDSet(ID) }
spark.read.orc("/path/to/whole/data")
  .filter(checkID($"ID"))
  .select($"ID", $"BINARY")
  .write...

Thanks for any advice!




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-store-10M-records-in-HDFS-to-speed-up-further-filtering-tp28605.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: 
user-unsubscr...@spark.apache.org





Re: Shall I use Apache Zeppelin for data analytics & visualization?

2017-04-17 Thread Jörn Franke
Processing through Spark is fine, but I do not recommend that each of the users 
triggers a Spark query. So either you precalculate the reports in Spark so that 
the reports themselves do not trigger Spark queries or you have a database that 
serves the report. For the latter case there are tons of commercial tools. 
Depending on the type of report you can also use a custom report tool or write 
your own dashboard with d3.js visualizations.

> On 17. Apr 2017, at 09:49, Gaurav Pandya  wrote:
> 
> Thanks for the revert Jorn.
> In my case, I am going to put the analysis on e-commerce website so naturally 
> users will be more and it will keep growing when e-commerce website captures 
> market. Users will not be doing any analysis here. Reports will show their 
> purchasing behaviour and pattern (kind of Machine learning stuff).
> Please note that all processing will be done in Spark here. Please share your 
> thoughts. Thanks again.
> 
>> On Mon, Apr 17, 2017 at 12:58 PM, Jörn Franke  wrote:
>> I think it highly depends on your requirements. There are various tools for 
>> analyzing and visualizing data. How many concurrent users do you have? What 
>> analysis do they do? How much data is involved? Do they have to process the 
>> data all the time or can they live with sampling which increases performance 
>> and response time significantly.
>> In lambda architecture terms you may want to think about different 
>> technologies in the serving layer.
>> 
>> > On 17. Apr 2017, at 06:55, Gaurav1809  wrote:
>> >
>> > Hi All, I am looking for a data visualization (and analytics) tool. My
>> > processing is done through Spark. There are many tools available around us.
>> > I got some suggestions on Apache Zeppelin too? Can anybody throw some light
>> > on its power and capabilities when it comes to data analytics and
>> > visualization? If there are any better options than this, do suggest too.
>> > One of the options came to me was Kibana (from ELK stack). Thanks.
>> >
>> >
>> >
>> > --
>> > View this message in context: 
>> > http://apache-spark-user-list.1001560.n3.nabble.com/Shall-I-use-Apache-Zeppelin-for-data-analytics-visualization-tp28604.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> > -
>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >
> 


Re: How to store 10M records in HDFS to speed up further filtering?

2017-04-17 Thread Jörn Franke
You need to sort the data by id, otherwise a situation can occur where the index
does not work. Aside from this, it sounds odd to put a 5 MB column in those
formats. This will also not be very efficient.
What is in the 5 MB binary data? 
You could use HAR or maybe Hbase to store this kind of data (if it does not get 
much larger than 5 MB).

> On 17. Apr 2017, at 08:23, MoTao  wrote:
> 
> Hi all,
> 
> I have 10M (ID, BINARY) record, and the size of each BINARY is 5MB on
> average.
> In my daily application, I need to filter out 10K BINARY according to an ID
> list.
> How should I store the whole data to make the filtering faster?
> 
> I'm using DataFrame in Spark 2.0.0 and I've tried row-based format (avro)
> and column-based format (orc).
> However, both of them require to scan almost ALL records, making the
> filtering stage very very slow.
> The code block for filtering looks like:
> 
> val IDSet: Set[String] = ...
> val checkID = udf { ID: String => IDSet(ID) }
> spark.read.orc("/path/to/whole/data")
>  .filter(checkID($"ID"))
>  .select($"ID", $"BINARY")
>  .write...
> 
> Thanks for any advice!
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-store-10M-records-in-HDFS-to-speed-up-further-filtering-tp28605.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 



Re: Shall I use Apache Zeppelin for data analytics & visualization?

2017-04-17 Thread Gaurav Pandya
Thanks for the reply, Jörn.
In my case, I am going to put the analysis on an e-commerce website, so
naturally there will be many users, and the number will keep growing as the
website gains market share. Users will not be doing any analysis themselves.
Reports will show their purchasing behaviour and patterns (machine-learning
kind of output).
Please note that all processing will be done in Spark here. Please share
your thoughts. Thanks again.

On Mon, Apr 17, 2017 at 12:58 PM, Jörn Franke  wrote:

> I think it highly depends on your requirements. There are various tools
> for analyzing and visualizing data. How many concurrent users do you have?
> What analysis do they do? How much data is involved? Do they have to
> process the data all the time or can they live with sampling which
> increases performance and response time significantly.
> In lambda architecture terms you may want to think about different
> technologies in the serving layer.
>
> > On 17. Apr 2017, at 06:55, Gaurav1809  wrote:
> >
> > Hi All, I am looking for a data visualization (and analytics) tool. My
> > processing is done through Spark. There are many tools available around
> us.
> > I got some suggestions on Apache Zeppelin too? Can anybody throw some
> light
> > on its power and capabilities when it comes to data analytics and
> > visualization? If there are any better options than this, do suggest too.
> > One of the options came to me was Kibana (from ELK stack). Thanks.
> >
> >
> >
> >
>


Re: Reply: How to store 10M records in HDFS to speed up further filtering?

2017-04-17 Thread Ryan
1. Per my understanding, for ORC files the filters should be pushed down,
which means all ID columns will be scanned, but the binary data is read only
for the matching ones. I haven't dug into the Spark ORC reader, though.

2. ORC itself has a row group index and a bloom filter index. You may try
configurations like 'orc.bloom.filter.columns' on the source table first.
From the Spark side, with mapPartitions, it's possible to build a sort of
index for each partition.

Also, could you check how many tasks the filter stage has? Maybe
there are too few partitions.

On Mon, Apr 17, 2017 at 3:01 PM, 莫涛  wrote:

> Hi Ryan,
>
>
> 1. "expected qps and response time for the filter request"
>
> I expect that only the requested BINARY are scanned instead of all
> records, so the response time would be "10K * 5MB / disk read speed", or
> several times of this.
>
> In practice, our cluster has 30 SAS disks and scanning all the 10M * 5MB
> data takes about 6 hours now. It should becomes several minutes as expected.
>
>
> 2. "build a search tree using ids within each partition to act like an
> index, or create a bloom filter to see if current partition would have any
> hit"
>
> Sounds like the thing I'm looking for!
>
> Could you kindly provide some links for reference? I found nothing in
> spark document about index or bloom filter working inside partition.
>
>
> Thanks very much!
>
>
> Mo Tao
>
> --
> *From:* Ryan 
> *Sent:* 17 April 2017 14:32:00
> *To:* 莫涛
> *Cc:* user
> *Subject:* Re: How to store 10M records in HDFS to speed up further filtering?
>
> you can build a search tree using ids within each partition to act like an
> index, or create a bloom filter to see if current partition would have any
> hit.
>
> What's your expected qps and response time for the filter request?
>
>
> On Mon, Apr 17, 2017 at 2:23 PM, MoTao  wrote:
>
>> Hi all,
>>
>> I have 10M (ID, BINARY) record, and the size of each BINARY is 5MB on
>> average.
>> In my daily application, I need to filter out 10K BINARY according to an
>> ID
>> list.
>> How should I store the whole data to make the filtering faster?
>>
>> I'm using DataFrame in Spark 2.0.0 and I've tried row-based format (avro)
>> and column-based format (orc).
>> However, both of them require to scan almost ALL records, making the
>> filtering stage very very slow.
>> The code block for filtering looks like:
>>
>> val IDSet: Set[String] = ...
>> val checkID = udf { ID: String => IDSet(ID) }
>> spark.read.orc("/path/to/whole/data")
>>   .filter(checkID($"ID"))
>>   .select($"ID", $"BINARY")
>>   .write...
>>
>> Thanks for any advice!
>>
>>
>>
>>
>>
>>
>


Re: Shall I use Apache Zeppelin for data analytics & visualization?

2017-04-17 Thread Jörn Franke
I think it highly depends on your requirements. There are various tools for 
analyzing and visualizing data. How many concurrent users do you have? What 
analysis do they do? How much data is involved? Do they have to process the 
data all the time, or can they live with sampling, which improves performance 
and response time significantly?
In lambda architecture terms, you may want to think about different technologies 
in the serving layer.

> On 17. Apr 2017, at 06:55, Gaurav1809  wrote:
> 
> Hi All, I am looking for a data visualization (and analytics) tool. My
> processing is done through Spark. There are many tools available around us.
> I got some suggestions on Apache Zeppelin too? Can anybody throw some light
> on its power and capabilities when it comes to data analytics and
> visualization? If there are any better options than this, do suggest too.
> One of the options came to me was Kibana (from ELK stack). Thanks.
> 
> 
> 



Reply: How to store 10M records in HDFS to speed up further filtering?

2017-04-17 Thread 莫涛
Hi Ryan,


1. "expected qps and response time for the filter request"

I expect that only the requested BINARY values are scanned instead of all records, so 
the response time would be "10K * 5MB / disk read speed", or a few times 
that.

In practice, our cluster has 30 SAS disks, and scanning all of the 10M * 5MB data 
currently takes about 6 hours. It should come down to several minutes, as expected.
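
As a rough back-of-the-envelope check of these numbers (a sketch only; it
assumes decimal units, i.e. 1 TB = 1,000,000 MB, and that the 6-hour scan
reflects the sustained aggregate read throughput of the cluster):

```python
# Figures from the message above; decimal units are an assumption.
total_records = 10_000_000
wanted_records = 10_000
record_mb = 5
full_scan_hours = 6

total_mb = total_records * record_mb      # 50,000,000 MB, about 50 TB
wanted_mb = wanted_records * record_mb    # 50,000 MB, about 50 GB

# Aggregate throughput implied by the observed 6-hour full scan.
throughput_mb_s = total_mb / (full_scan_hours * 3600)

# Ideal time if only the wanted records were read at that throughput.
ideal_seconds = wanted_mb / throughput_mb_s

print(f"throughput ~ {throughput_mb_s:.0f} MB/s")
print(f"ideal read time ~ {ideal_seconds:.1f} s")
```

Under those assumptions the ideal case is about 22 seconds, so "several
minutes" still leaves room for seek overhead and scheduling.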


2. "build a search tree using ids within each partition to act like an index, 
or create a bloom filter to see if current partition would have any hit"

Sounds like the thing I'm looking for!

Could you kindly provide some links for reference? I found nothing in the Spark 
documentation about an index or bloom filter working inside a partition.


Thanks very much!


Mo Tao


From: Ryan 
Sent: 17 April 2017 14:32:00
To: 莫涛
Cc: user
Subject: Re: How to store 10M records in HDFS to speed up further filtering?

you can build a search tree using ids within each partition to act like an 
index, or create a bloom filter to see if current partition would have any hit.

What's your expected qps and response time for the filter request?


On Mon, Apr 17, 2017 at 2:23 PM, MoTao wrote:
Hi all,

I have 10M (ID, BINARY) record, and the size of each BINARY is 5MB on
average.
In my daily application, I need to filter out 10K BINARY according to an ID
list.
How should I store the whole data to make the filtering faster?

I'm using DataFrame in Spark 2.0.0 and I've tried row-based format (avro)
and column-based format (orc).
However, both of them require to scan almost ALL records, making the
filtering stage very very slow.
The code block for filtering looks like:

val IDSet: Set[String] = ...
val checkID = udf { ID: String => IDSet(ID) }
spark.read.orc("/path/to/whole/data")
  .filter(checkID($"ID"))
  .select($"ID", $"BINARY")
  .write...

Thanks for any advice!








Re: Spark SQL (Pyspark) - Parallel processing of multiple datasets

2017-04-17 Thread Ryan
I don't think you can insert into a Hive table in parallel without dynamic
partitioning. For Hive locking, please refer to
https://cwiki.apache.org/confluence/display/Hive/Locking.

Other than that, it should work.
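
On the threading side, here is a rough sketch in plain Python. It does not call
Spark: process_dataset is a hypothetical stand-in for the task method, and the
config values and paths are made up. It shows two things: deriving a separate
temp table name for every dataset, and collecting per-dataset errors, which a
bare threading.Thread does not propagate to the caller on join():

```python
from concurrent.futures import ThreadPoolExecutor

def process_dataset(name, config):
    """Hypothetical stand-in for the real task; returns the temp table name it would use."""
    # One temp table name per dataset, so parallel tasks never share a name.
    temp_table = "tmp_" + name.lower().replace(" ", "_")
    if not config.get("path"):
        raise ValueError("no input path configured for " + name)
    return temp_table

# Hypothetical config, keyed by dataset name.
config_dict = {
    "Traffic Data": {"path": "/data/traffic.csv"},
    "Crime Data": {"path": "/data/crime.csv"},
    "Weather Data": {"path": ""},
}

results, errors = {}, {}
with ThreadPoolExecutor(max_workers=3) as pool:
    futures = {name: pool.submit(process_dataset, name, cfg)
               for name, cfg in config_dict.items()}
    for name, future in futures.items():
        try:
            # result() re-raises any exception raised inside the worker thread.
            results[name] = future.result()
        except Exception as exc:
            errors[name] = str(exc)

print(results)
print(errors)
```

This way a failure in one dataset shows up next to its name instead of only
in the logs of one thread.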

On Mon, Apr 17, 2017 at 6:52 AM, Amol Patil  wrote:

> Hi All,
>
> I'm writing a generic pyspark program to process multiple datasets using
> Spark SQL, for example Traffic Data, Crime Data, Weather Data. Each dataset will
> be in csv format, and its size may vary from *1 GB* to *10 GB*. Each dataset
> will be available on a different schedule (weekly, monthly, quarterly).
>
> My requirement is to process all the datasets in parallel by triggering the
> job only once.
>
> In the current implementation we are using the Spark CSV package for reading csv
> files and Python's threading mechanism to trigger multiple threads:
> --
> jobs = []
> for dict_key, dict_val in config_dict.items():
>     t = threading.Thread(target=task, args=(sqlContext, dict_val))
>     jobs.append(t)
>     t.start()
>
> for x in jobs:
>     x.join()
> ---
> And I defined a task method to process each dataset based on the config values dict:
>
> -
> def task(sqlContext, data_source_dict):
>     ..
>     ...
> -
>
> The task method:
> 1. creates a dataframe from the csv file,
> 2. then creates a temporary table from that dataframe,
> 3. and finally ingests the data into a Hive table.
>
> *Issue:*
> 1. If I process two datasets in parallel, one dataset goes through
> successfully, but for the other dataset I get the error "*u'temp_table not
> found*" while ingesting data into the Hive table. It happens consistently
> with either dataset A or dataset B:
> sqlContext.sql('INSERT INTO TABLE '+hivetablename+' SELECT * from '+temp_table_name)
>
> I tried the following:
> 1. Creating the dataframe name and temporary table name dynamically based on
> the dataset name.
> 2. Enabling Spark dynamic allocation (--conf spark.dynamicAllocation.enabled=true)
> 3. Setting spark.scheduler.mode to FAIR
>
>
> I would appreciate advice on:
> 1. Is anything wrong in the above implementation?
> 2. Is it a good idea to process those big datasets in parallel in one job?
> 3. Is there any other solution to process multiple datasets in parallel?
>
> Thank you,
> Amol Patil
>


Re: How to store 10M records in HDFS to speed up further filtering?

2017-04-17 Thread Ryan
You can build a search tree using the ids within each partition to act like an
index, or create a bloom filter to see if the current partition would have any
hit.
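
A minimal sketch of the bloom filter idea in plain Python (not a Spark API;
the BloomFilter class, its sizes, the partition layout and the ids are all
made up for illustration):

```python
import hashlib

class BloomFilter:
    """Tiny Bloom filter: may give false positives, never false negatives."""

    def __init__(self, num_bits=1 << 16, num_hashes=4):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(num_bits // 8)

    def _positions(self, key):
        # Derive several bit positions from salted SHA-256 digests of the key.
        for i in range(self.num_hashes):
            digest = hashlib.sha256(f"{i}:{key}".encode("utf-8")).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, key):
        for pos in self._positions(key):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, key):
        return all(self.bits[pos // 8] & (1 << (pos % 8))
                   for pos in self._positions(key))


# Hypothetical partitions: partition index -> ids stored in it.
partitions = {
    0: ["id-001", "id-002", "id-003"],
    1: ["id-101", "id-102", "id-103"],
}

# Build one filter per partition (in practice it would be stored next to the data).
filters = {}
for part, ids in partitions.items():
    bf = BloomFilter()
    for record_id in ids:
        bf.add(record_id)
    filters[part] = bf

wanted = ["id-102"]
# Only partitions whose filter reports a possible hit need to be read.
candidates = [p for p, bf in filters.items()
              if any(bf.might_contain(w) for w in wanted)]
print(candidates)
```

A partition that is not in candidates can be skipped without reading any of
its binary data; a partition that is listed still has to be checked, because
a bloom filter can report false positives.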

What's your expected qps and response time for the filter request?


On Mon, Apr 17, 2017 at 2:23 PM, MoTao  wrote:

> Hi all,
>
> I have 10M (ID, BINARY) record, and the size of each BINARY is 5MB on
> average.
> In my daily application, I need to filter out 10K BINARY according to an ID
> list.
> How should I store the whole data to make the filtering faster?
>
> I'm using DataFrame in Spark 2.0.0 and I've tried row-based format (avro)
> and column-based format (orc).
> However, both of them require to scan almost ALL records, making the
> filtering stage very very slow.
> The code block for filtering looks like:
>
> val IDSet: Set[String] = ...
> val checkID = udf { ID: String => IDSet(ID) }
> spark.read.orc("/path/to/whole/data")
>   .filter(checkID($"ID"))
>   .select($"ID", $"BINARY")
>   .write...
>
> Thanks for any advice!
>
>
>
>
>
>


How to store 10M records in HDFS to speed up further filtering?

2017-04-17 Thread MoTao
Hi all,

I have 10M (ID, BINARY) records, and the size of each BINARY is 5MB on
average.
In my daily application, I need to pick out 10K BINARY values according to an ID
list.
How should I store the whole data to make the filtering faster?

I'm using DataFrame in Spark 2.0.0 and I've tried a row-based format (avro)
and a column-based format (orc).
However, both of them require scanning almost ALL records, making the
filtering stage very slow.
The code block for filtering looks like:

val IDSet: Set[String] = ...
val checkID = udf { ID: String => IDSet(ID) }
spark.read.orc("/path/to/whole/data")
  .filter(checkID($"ID"))
  .select($"ID", $"BINARY")
  .write...

Thanks for any advice!



