Re: Best way to process this dataset

2018-06-19 Thread Matteo Cossu
Single machine? Any other framework will perform better than Spark
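For a file of that size on a 2GB-RAM machine, chunked pandas (as Georg
suggests below) is usually enough. A minimal sketch, assuming a headerless CSV
and illustrative column names that are not taken from the thread:

# Process the CSV in chunks so the whole 3.6GB file never has to fit in RAM.
from collections import Counter
import pandas as pd

cols = ["user_id", "item_id", "category_id", "behavior_type", "timestamp"]
behavior_counts = Counter()

for chunk in pd.read_csv("user_behavior.csv", names=cols, chunksize=1_000_000):
    # aggregate each chunk and merge the partial counts
    behavior_counts.update(chunk["behavior_type"].value_counts().to_dict())

print(behavior_counts)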

On Tue, 19 Jun 2018 at 09:40, Aakash Basu 
wrote:

> Georg, just asking, can Pandas handle such a big dataset? And what if that
> data is then passed into any of the sklearn modules?
>
> On Tue, Jun 19, 2018 at 10:35 AM, Georg Heiler 
> wrote:
>
>> use pandas or dask
>>
>> If you do want to use Spark, store the dataset as Parquet/ORC and then
>> run your analytical queries on that dataset.
>>
>> Raymond Xie wrote on Tue, 19 June 2018 at 04:29:
>>
>>> I have a 3.6GB CSV dataset (4 columns, 100,150,807 rows); my environment
>>> has a 20GB SSD hard disk and 2GB RAM.
>>>
>>> The dataset comes with
>>> User ID: 987,994
>>> Item ID: 4,162,024
>>> Category ID: 9,439
>>> Behavior type ('pv', 'buy', 'cart', 'fav')
>>> Unix Timestamp: spanning November 25 to December 3, 2017
>>>
>>> I would like to hear any suggestions from you on how I should process the
>>> dataset with my current environment.
>>>
>>> Thank you.
>>>
>>> Sincerely yours,
>>>
>>> Raymond
>>>
>>
>


Re: Help explaining explain() after DataFrame join reordering

2018-06-05 Thread Matteo Cossu
Hello,

as explained here, the join order can be changed by the optimizer. The
difference introduced in Spark 2.2 is that the reordering is based on
statistics instead of heuristics, which can appear "random" and in some cases
decrease performance.
If you want more control over the join order, you can define your own
optimizer Rule; there is an example here.


Best,

Matteo


On 1 June 2018 at 18:31, Mohamed Nadjib MAMI 
wrote:

> Dear Sparkers,
>
> I'm loading into DataFrames data from 5 sources (using official
> connectors): Parquet, MongoDB, Cassandra, MySQL and CSV. I'm then joining
> those DataFrames in two different orders.
> - mongo * cassandra * jdbc * parquet * csv (random order).
> - parquet * csv * cassandra * jdbc * mongodb (optimized order).
>
> The first follows a random order, whereas the second I decided based on
> some optimization techniques (I can provide details for interested readers
> here if needed).
>
> After evaluating on increasing sizes of data, the optimization techniques I
> developed didn't improve the performance very noticeably. I inspected the
> logical/physical plan of the final joined DataFrame (using `explain(true)`).
> The 1st order was respected, whereas the 2nd order, it turned out, wasn't,
> and MongoDB was queried first.
>
> However, that is just how it seemed to me; I'm not very confident reading
> the plans (returned by explain(true)). Could someone help explain the
> `explain(true)` output? (pasted in this gist
> ). Is
> there a way we could enforce the given order?
>
> I'm using Spark 2.1, so I think it doesn't include the new cost-based
> optimizations (introduced in Spark 2.2).
>
> Regards, Grüße, Cordialement, Recuerdos, Saluti, προσρήσεις, 问候, تحياتي.
> Mohamed Nadjib Mami
> Research Associate @ Fraunhofer IAIS - PhD Student @ Bonn University
> About me!
> LinkedIn
>


Re: OneHotEncoderEstimator - java.lang.NoSuchMethodError: org.apache.spark.sql.Dataset.withColumns

2018-05-18 Thread Matteo Cossu
Hi,

are you sure Dataset has a method withColumns?

On 15 May 2018 at 16:58, Mina Aslani  wrote:

> Hi,
>
> I get the below error when I try to run the OneHotEncoderEstimator example:
> https://github.com/apache/spark/blob/b74366481cc87490adf4e69d26389ec737548c15/examples/src/main/java/org/apache/spark/examples/ml/JavaOneHotEncoderEstimatorExample.java#L67
>
> which corresponds to this line of the code:
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoderEstimator.scala#L348
>
> Exception in thread "streaming-job-executor-0" java.lang.NoSuchMethodError: 
> org.apache.spark.sql.Dataset.withColumns(Lscala/collection/Seq;Lscala/collection/Seq;)Lorg/apache/spark/sql/Dataset;
>   at 
> org.apache.spark.ml.feature.OneHotEncoderModel.transform(OneHotEncoderEstimator.scala:348)
>
>
> Can you please let me know, what is the cause? Any workaround?
>
> Looking at the example in the repo, it seems it was running fine at some
> point, but now it's not working. Also, OneHotEncoder is deprecated.
>
> I really appreciate your quick response.
>
> Regards,
> Mina
>
>


Re: How Spark can solve this example

2018-05-18 Thread Matteo Cossu
Hello Esa,
all the steps that you described can be performed with Spark. I don't know
about CEP, but Spark Streaming should be enough.

Best,

Matteo

On 18 May 2018 at 09:20, Esa Heikkinen  wrote:

> Hi
>
>
>
> I have attached a fictive example (PDF file) about processing event traces
> from data streams (or batch data). I hope the picture in the attachment is
> clear and understandable.
>
>
>
> I would be very interested in how best to solve it with Spark, or whether
> it is possible at all. If it is possible, can it be solved, for example, by CEP?
>
>
>
> A little explanation: the data processing reads three different, parallel
> streams (or batches): A, B and C. Each of them has events (records) with
> different keys and values (like K1-K4).
>
>
>
> I want to find all event traces which have certain dependencies or patterns
> between streams (or batches). To find a pattern there are three steps:
>
> 1)  Search for an event that has value "X" in K1 in stream A; if it is
> found, store it in global data for later use and continue to the next step
>
> 2)  Search for an event that has value A(K1) in K2 in stream B; if it is
> found, store it in global data for later use and continue to the next step
>
> 3)  Search for an event that has value A(K1) in K1 and value B(K3) in K2 in
> stream C; if it is found, continue to the next step (back to step 1)
>
>
>
> If that is not possible with Spark, do you have any idea of tools which
> can solve this?
>
>
>
> Best, Esa
>
>
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>


Re: Why doesn't spark use broadcast join?

2018-04-18 Thread Matteo Cossu
Can you check the value for spark.sql.autoBroadcastJoinThreshold?
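If the threshold looks fine but the estimated size of T2 is unknown, you can
also request the broadcast explicitly. A hedged PySpark sketch, assuming T1
and T2 are the DataFrames behind the tables in the query (only the names come
from the question, the rest is illustrative):

# The threshold is in bytes; -1 disables automatic broadcast joins.
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))

from pyspark.sql.functions import broadcast

result = (
    T1.join(broadcast(T2), T1["id"] == T2["id"], "left_anti")
      .where("f1 = 'bla' AND f2 = 'bla2' AND some_date >= date_sub(current_date(), 1)")
      .limit(100)
)
result.explain(True)  # the plan should now show a broadcast join

In SQL syntax, a broadcast hint such as /*+ BROADCAST(T2) */ right after the
SELECT keyword should have the same effect on Spark 2.2+, if I remember
correctly.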

On 29 March 2018 at 14:41, Vitaliy Pisarev 
wrote:

> I am looking at the physical plan for the following query:
>
> SELECT f1,f2,f3,...
> FROM T1
> LEFT ANTI JOIN T2 ON T1.id = T2.id
> WHERE  f1 = 'bla'
>AND f2 = 'bla2'
>AND some_date >= date_sub(current_date(), 1)
> LIMIT 100
>
> An important detail: the table 'T1' can be very large (hundreds of
> thousands of rows), but table T2 is rather small, at most in the thousands.
> In this particular case, the table T2 has 2 rows.
>
> In the physical plan, I see that a SortMergeJoin is performed, despite this
> being a perfect candidate for a broadcast join.
>
> What could be the reason for this?
> Is there a way to hint the optimizer to perform a broadcast join in the
> sql syntax?
>
> I am writing this in pyspark and the query itself is over parquets stored
> in Azure blob storage.
>
>
>


Re: Broadcasting huge array or persisting on HDFS to read on executors - both not working

2018-04-12 Thread Matteo Cossu
I don't think it's trivial. The naive solution would be a cross join between
users and items, but this can be very, very expensive. I once encountered a
similar problem; here is how I solved it:

   - create a new RDD with (itemID, index), where the index is a unique
   integer between 0 and the number of items
   - for every user, sample n items by randomly generating n distinct
   integers between 0 and the number of items (e.g. with rand.randint()), so
   you have a new RDD (userID, [sampled_items])
   - flatten all the lists in the previously created RDD and join them back
   with the (itemID, index) RDD, using the index as the join attribute

You can do the same thing with DataFrames using UDFs.
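A hedged PySpark sketch of these steps (all variable names are illustrative,
not from the thread; items_rdd and users_rdd are assumed to hold the item IDs
and user IDs):

import random

n_items = items_rdd.count()
# (index, itemID) pairs, with a unique index per item
indexed_items = items_rdd.zipWithIndex().map(lambda x: (x[1], x[0]))

def sample_indices(user_id, n_samples, n_items):
    # n_samples distinct item indices for this user, keyed by index for the join
    return [(idx, user_id) for idx in random.sample(range(n_items), n_samples)]

user_index_pairs = users_rdd.flatMap(lambda u: sample_indices(u, 10, n_items))

# join on the index, then keep (userID, itemID) pairs
sampled = user_index_pairs.join(indexed_items) \
                          .map(lambda kv: (kv[1][0], kv[1][1]))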

On 11 April 2018 at 23:01, surender kumar <skiit...@yahoo.co.uk> wrote:

> Right, this is what I did when I said I tried to persist it and create an
> RDD out of it to sample from. But how do I do that for each user?
> You have one RDD of users on one hand and an RDD of items on the other. How
> do I go from here? Am I missing something trivial?
>
>
> On Thursday, 12 April, 2018, 2:10:51 AM IST, Matteo Cossu <
> elco...@gmail.com> wrote:
>
>
> Why broadcasting this list then? You should use an RDD or DataFrame. For
> example, RDD has a method sample() that returns a random sample from it.
>
> On 11 April 2018 at 22:34, surender kumar <skiit...@yahoo.co.uk.invalid>
> wrote:
>
> I'm using PySpark.
> I have a list of 1 million items (all float values) and 1 million users. For
> each user I want to randomly sample some items from the item list.
> Broadcasting the item list results in an OutOfMemory error on the driver; I
> tried setting driver memory up to 10G. I tried to persist this array on disk
> but I'm not able to figure out a way to read it on the workers.
>
> Any suggestion would be appreciated.
>
>
>


Re: Broadcasting huge array or persisting on HDFS to read on executors - both not working

2018-04-11 Thread Matteo Cossu
Why broadcasting this list then? You should use an RDD or DataFrame. For
example, RDD has a method sample() that returns a random sample from it.
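A minimal hedged sketch (variable names are illustrative): sample() draws the
random subset on the executors, so nothing large is collected on, or broadcast
from, the driver.

items_rdd = sc.parallelize(all_items)  # or load the items from a file on HDFS
sampled = items_rdd.sample(withReplacement=False, fraction=0.01, seed=42)
print(sampled.take(5))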

On 11 April 2018 at 22:34, surender kumar 
wrote:

> I'm using PySpark.
> I have a list of 1 million items (all float values) and 1 million users. For
> each user I want to randomly sample some items from the item list.
> Broadcasting the item list results in an OutOfMemory error on the driver; I
> tried setting driver memory up to 10G. I tried to persist this array on disk
> but I'm not able to figure out a way to read it on the workers.
>
> Any suggestion would be appreciated.
>


Re: can udaf's return complex types?

2018-02-13 Thread Matteo Cossu
Hello,

yes, sure they can return complex types. For example, the functions
collect_list and collect_set return an ArrayType.
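As a concrete, hedged illustration in PySpark: the built-in aggregates already
produce the shape in the question without a custom UDAF (only the column names
come from the example; a struct is used instead of a Map because the two
values have different types):

from pyspark.sql import functions as F

out = (
    df.groupBy("id")
      .agg(F.sum("amount").alias("total"),
           F.collect_list("name").alias("names"))
      .select("id", F.struct("total", "names").alias("my_sum_along_with_names"))
)
out.show(truncate=False)

Roughly the same should work in a raw SQL query:
SELECT id, struct(sum(amount), collect_list(name)) AS my_sum_along_with_names
FROM my_table GROUP BY id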

On 10 February 2018 at 14:28, kant kodali  wrote:

> Hi All,
>
> Can UDAFs return complex types? Like, say, a Map with an Integer key and an
> Array of strings as the value?
>
> For Example say I have the following *input dataframe*
>
> id | name | amount
> -------------------
> 1  | foo  | 10
> 2  | bar  | 15
> 1  | car  | 20
> 1  | bus  | 20
>
> and my *target/output data frame* is
>
> id | my_sum_along_with_names
> ------------------------------------------
> 1  | Map(key -> 50, value -> [foo, car, bus])
> 2  | Map(key -> 15, value -> [bar])
>
> I am looking for a UDAF solution so I can use it in my raw sql query.
>
> Thanks!
>


Re: [Spark DataFrame]: Passing DataFrame to custom method results in NullPointerException

2018-01-22 Thread Matteo Cossu
Hello,
I did not understand your question very well.
However, I can tell you that if you call .collect() on an RDD you are
collecting all the data on the driver node. For this reason, you should use
it only when the RDD is very small.
Your function "validate_hostname" depends on a DataFrame, and it is not
possible to refer to a DataFrame from a worker node; that's why that operation
doesn't work. The other case works because that "map" runs in the driver on
the collected data, not as an RDD method.
In these cases you could use broadcast variables, but I have the intuition
that, in general, you are using the wrong approach to the problem.
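A hedged PySpark sketch of the broadcast-variable idea (the original app is in
Scala, and hosts_df, lines_rdd and the column name are assumptions, not taken
from the thread): collect the small lookup DataFrame once on the driver,
broadcast the plain values, and filter the RDD against the broadcast copy
instead of the DataFrame.

valid_hosts = {r["hostname"] for r in hosts_df.select("hostname").collect()}
bc_hosts = sc.broadcast(valid_hosts)

def validate_hostname(record):
    # executors read the broadcast value; no DataFrame reference is shipped
    return record.hostname in bc_hosts.value

filtered_count = lines_rdd.filter(validate_hostname).count()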

Best Regards,

Matteo Cossu


On 15 January 2018 at 12:56, <abdul.h.huss...@bt.com> wrote:

> Hi,
>
>
>
> My Spark app is mapping lines from a text file to case classes stored
> within an RDD.
>
>
>
> When I run the following code on this rdd:
>
> .collect.map(line => if(validate_hostname(line, data_frame))
> line).foreach(println)
>
>
>
> It correctly calls the method validate_hostname, passing the case class and
> another data_frame defined within the main method. Unfortunately the above
> map only returns a TraversableLike collection, so I can't do transformations
> and joins on this data structure, so I tried to apply a filter on the RDD
> with the following code:
>
> .filter(line => validate_hostname(line, data_frame)).count()
>
>
>
> Unfortunately, when filtering the RDD this way, the data_frame is not
> passed, so I get a NullPointerException, although the case class (which I
> print within the method) is passed correctly.
>
>
>
> Where am I going wrong?
>
>
>
> When
>
>
>
> Regards,
>
> Abdul Haseeb Hussain
>


Re: How to convert Array of Json rows into Dataset of specific columns in Spark 2.2.0?

2017-10-07 Thread Matteo Cossu
Hello,
I think you should use from_json from spark.sql.functions to parse the JSON
string and convert it to a StructType. Afterwards, you can create a new
Dataset by selecting the columns you want.
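A hedged PySpark sketch of that approach (the question asks for Java or Scala,
where the equivalent from_json function and schema classes exist as well; this
assumes one JSON object per row of the value column):

from pyspark.sql import functions as F
from pyspark.sql.types import (StructType, StructField, StringType,
                               ArrayType, IntegerType)

schema = StructType([
    StructField("name", StringType()),
    StructField("address", StructType([
        StructField("state", StringType()),
        StructField("country", StringType()),
    ])),
    StructField("docs", ArrayType(StructType([
        StructField("subject", StringType()),
        StructField("year", IntegerType()),
    ]))),
])

parsed = ds.select(F.from_json(F.col("value"), schema).alias("j"))
result = parsed.select("j.name", "j.address", "j.docs")
result.printSchema()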

On 7 October 2017 at 09:30, kant kodali  wrote:

> I have a Dataset ds which consists of json rows.
>
> *Sample Json Row (This is just an example of one row in the dataset)*
>
> [
>   {"name": "foo", "address": {"state": "CA", "country": "USA"}, "docs": [{"subject": "english", "year": 2016}]},
>   {"name": "bar", "address": {"state": "OH", "country": "USA"}, "docs": [{"subject": "math", "year": 2017}]}
> ]
>
> ds.printSchema()
>
> root
>  |-- value: string (nullable = true)
>
> Now I want to convert into the following dataset using Spark 2.2.0
>
> name  | address                           | docs
> ---------------------------------------------------------------------------------
> "foo" | {"state": "CA", "country": "USA"} | [{"subject": "english", "year": 2016}]
> "bar" | {"state": "OH", "country": "USA"} | [{"subject": "math", "year": 2017}]
>
> Preferably Java, but Scala is also fine as long as the functions are
> available in the Java API.
>


Re: Trying to connect Spark 1.6 to Hive

2017-08-09 Thread Matteo Cossu
Hello,
try to use these options when starting Spark:

*--conf "spark.driver.userClassPathFirst=true" --conf
"spark.executor.userClassPathFirst=true"  *
In this way you will be sure that the executor and the driver of Spark will
use the classpath you define.

Best Regards,
Matteo Cossu


On 5 August 2017 at 23:04, toletum <tole...@toletum.org> wrote:

> Hi everybody
>
> I'm trying to connect Spark to Hive.
>
> Hive uses Derby Server for metastore_db.
>
> $SPARK_HOME/conf/hive-site.xml
>
> <configuration>
>   <property>
>     <name>javax.jdo.option.ConnectionURL</name>
>     <value>jdbc:derby://derby:1527/metastore_db;create=true</value>
>     <description>JDBC connect string for a JDBC metastore</description>
>   </property>
>
>   <property>
>     <name>javax.jdo.option.ConnectionDriverName</name>
>     <value>org.apache.derby.jdbc.ClientDriver</value>
>     <description>Driver class name for a JDBC metastore</description>
>   </property>
> </configuration>
>
> I have copied derby.jar, derbyclient.jar and derbytools.jar to $SPARK_HOME/lib
>
> I added the 3 jars to the CLASSPATH too:
>
> $SPARK_HOME/lib/derby.jar:$SPARK_HOME/lib/derbytools.jar:$SPARK_HOME/lib/derbyclient.jar
>
> But spark-sql says:
>
> org.datanucleus.store.rdbms.connectionpool.DatastoreDriverNotFoundException:
> The specified datastore driver ("org.apache.derby.jdbc.ClientDriver") was
> not found in the CLASSPATH. Please check your CLASSPATH specification, and
> the name of the driver.
>
> Java finds the class:
>
> java org.apache.derby.jdbc.ClientDriver
> Error: Main method not found in class org.apache.derby.jdbc.ClientDriver,
> please define the main method as:
>public static void main(String[] args)
> or a JavaFX application class must extend javafx.application.Application
>
> It seems Spark can't find the driver
>


Re: Reading Hive tables Parallel in Spark

2017-07-18 Thread Matteo Cossu
The context you use for calling Spark SQL can be used only in the driver.
Moreover, collect() works because it brings the RDD into the driver's local
memory, but it should be used only for debugging (95% of the time); if all
your data fits into a single machine's memory you shouldn't use Spark at all,
but a normal database.
For your problem, if you still want to use Spark SQL, just use threads; if
instead you want to use parallelize() or foreach, you should avoid calling
anything that needs to stay in the driver.
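A hedged PySpark sketch of the threaded approach (table names and the output
path are illustrative): each thread issues its own Spark SQL job from the
driver, so several conversions can run concurrently inside one application.

from concurrent.futures import ThreadPoolExecutor

tables = ["table1", "table2", "table3"]  # the Hive tables to convert

def convert_to_parquet(table):
    # runs in the driver; Spark schedules the resulting jobs concurrently
    spark.table(table).write.mode("overwrite").parquet("/archive/" + table)

with ThreadPoolExecutor(max_workers=4) as pool:
    list(pool.map(convert_to_parquet, tables))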

On 17 July 2017 at 17:46, Fretz Nuson  wrote:

> I was getting a NullPointerException when trying to call Spark SQL from
> foreach. After debugging, I found out that the Spark session is not
> available in the executors and could not be passed to them successfully.
>
> What I am doing is tablesRDD.foreach.collect(), and it works but runs
> sequentially.
>
> On Mon, Jul 17, 2017 at 5:58 PM, Simon Kitching <
> simon.kitch...@unbelievable-machine.com> wrote:
>
>> Have you tried simply making a list with your tables in it, then using
>> SparkContext.makeRDD(Seq)? i.e.
>>
>> val tablenames = List("table1", "table2", "table3", ...)
>> val tablesRDD = sc.makeRDD(tablenames, nParallelTasks)
>> tablesRDD.foreach { tablename => /* convert this table */ }
>>
>> > On 17.07.2017 at 14:12, FN wrote:
>> >
>> > Hi
>> > I am currently trying to parallelize reading multiple tables from Hive.
>> > As part of an archival framework, I need to convert a few hundred tables
>> > which are in txt format to Parquet. For now I am calling Spark SQL inside
>> > a for loop for the conversion, but this executes sequentially and the
>> > entire process takes a long time to finish.
>> >
>> > I tried submitting 4 different Spark jobs (each having a set of tables
>> > to be converted), and it did give me some parallelism, but I would like
>> > to do this in a single Spark job due to a few limitations of our cluster
>> > and process.
>> >
>> > Any help will be greatly appreciated.
>> >
>> >
>> >
>> >
>> >
>> > --
>> > View this message in context:
>> > http://apache-spark-user-list.1001560.n3.nabble.com/Reading-Hive-tables-Parallel-in-Spark-tp28869.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> > -
>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >
>>
>>
>


Re: Reading Hive tables Parallel in Spark

2017-07-17 Thread Matteo Cossu
Hello,
have you tried to use threads instead of the loop?

On 17 July 2017 at 14:12, FN  wrote:

> Hi
> I am currently trying to parallelize reading multiple tables from Hive. As
> part of an archival framework, I need to convert a few hundred tables which
> are in txt format to Parquet. For now I am calling Spark SQL inside a for
> loop for the conversion, but this executes sequentially and the entire
> process takes a long time to finish.
>
> I tried submitting 4 different Spark jobs (each having a set of tables to
> be converted), and it did give me some parallelism, but I would like to do
> this in a single Spark job due to a few limitations of our cluster and
> process.
>
> Any help will be greatly appreciated.
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Reading-Hive-tables-Parallel-in-Spark-tp28869.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>