Re: Broadcast Join and Inner Join giving different result on same DataFrame

2017-01-03 Thread Patrick
Hi,

An update on this question: in local[*] mode the code works fine. The
broadcast size is 200 MB, but on YARN the broadcast join gives an empty
result, even though the SQL query in the UI does show BroadcastHint.

Thanks


On Fri, Dec 30, 2016 at 9:15 PM, titli batali  wrote:

> Hi,
>
> I have two DataFrames with a common column, Product_Id, on which I need
> to perform a join.
>
> val transactionDF = readCSVToDataFrame(sqlCtx: SQLContext,
> pathToReadTransactions: String, transactionSchema: StructType)
> val productDF = readCSVToDataFrame(sqlCtx: SQLContext,
> pathToReadProduct:String, productSchema: StructType)
>
> Since the transaction data is very large but the product data is small, I
> would ideally do a broadcast join in which I broadcast productDF:
>
>  val productBroadcastDF =  broadcast(productDF)
>  val broadcastJoin = transactionDF.join(productBroadcastDF,
> "productId")
>
> Or simply, val innerJoin = transactionDF.join(productDF, "productId")
> should give the same result as above.
>
> But if I use the simple inner join I get a DataFrame with the joined
> values, whereas with the broadcast join I get an empty DataFrame. I am
> not able to explain this behavior. Ideally both should give the same
> result.
>
> What could have gone wrong? Has anyone faced a similar issue?
>
>
> Thanks,
> Prateek
>
>
>
>
>


groupByKey vs mapPartitions for efficient grouping within a Partition

2017-01-16 Thread Patrick
Hi,

Does groupByKey have any built-in intelligence, such that if all the
keys reside in the same partition it does not do the shuffle?

Or should the user write mapPartitions (with Scala groupBy code inside)?

Which would be more efficient, and what are the memory considerations?


Thanks
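
[Editorial sketch] On the question above: as far as I know, RDD groupByKey
skips the shuffle only when the RDD already carries a partitioner equal to
the one groupByKey would use; it does not inspect where the keys actually
live. If the data layout already guarantees that each key sits in a single
partition, a per-partition grouping function passed to mapPartitions avoids
the shuffle. The following is a minimal plain-Python sketch of such a
function, not a definitive implementation. The name group_within_partition
is hypothetical, and a local iterator stands in for one partition.

```python
from collections import defaultdict


def group_within_partition(records):
    """Group (key, value) pairs from one partition without any shuffle.

    `records` is an iterator over the pairs of a single partition. All values
    of the partition are buffered in memory before anything is emitted.
    """
    groups = defaultdict(list)
    for key, value in records:
        groups[key].append(value)
    # Yield one (key, values) pair per distinct key in this partition.
    for key, values in groups.items():
        yield key, values


# Local stand-in for one partition of an RDD of (key, value) pairs.
partition = iter([("a", 1), ("b", 2), ("a", 3)])
grouped = dict(group_within_partition(partition))
print(grouped)
```

In PySpark the function would be passed as
rdd.mapPartitions(group_within_partition). The memory consideration is
visible in the sketch: the whole partition is held in a dict before the first
group is emitted, so each partition has to fit in executor memory.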


Efficient Spark-Sql queries when only nth Column changes

2017-02-18 Thread Patrick
Hi,

I have read 5 columns from Parquet into a DataFrame. My queries on the
Parquet table are of the following type:

val df1 = sqlContext.sql("select col1, col2, count(*) from table group by col1, col2")
val df2 = sqlContext.sql("select col1, col3, count(*) from table group by col1, col3")
val df3 = sqlContext.sql("select col1, col4, count(*) from table group by col1, col4")
val df4 = sqlContext.sql("select col1, col5, count(*) from table group by col1, col5")

I then need to union the results from df1 to df4 into a single DataFrame.


So basically only the second column changes. Is there an efficient way to
write the above queries in Spark SQL, instead of writing 4 different
queries (or a loop) and doing a union to get the result?


Thanks


Re: Efficient Spark-Sql queries when only nth Column changes

2017-02-19 Thread Patrick
Hi,

Thanks all,

I tried both approaches. Grouping sets worked better for me, because I
didn't want to cache the data: I am assigning a large fraction of memory
to shuffle operations.
However, I could only use grouping sets through HiveContext. I am using Spark
1.5 and I think SQLContext doesn't have this functionality, so anyone who
wants to use SQLContext needs to stick to the cache option.
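
[Editorial sketch] The grouping sets approach mentioned above can be
illustrated by building the query text. This is a sketch under assumptions,
not the exact query used in the thread: the helper name grouping_sets_query,
the table name my_table and the cnt alias are made up for illustration. The
SQL follows the Hive-style GROUP BY ... GROUPING SETS syntax.

```python
def grouping_sets_query(table, fixed_col, varying_cols):
    """Build one GROUP BY ... GROUPING SETS query covering every column pair."""
    all_cols = [fixed_col] + list(varying_cols)
    # One grouping set per (fixed column, varying column) pair.
    sets = ", ".join("({}, {})".format(fixed_col, c) for c in varying_cols)
    return (
        "SELECT {cols}, count(*) AS cnt FROM {table} "
        "GROUP BY {cols} GROUPING SETS ({sets})"
    ).format(cols=", ".join(all_cols), table=table, sets=sets)


query = grouping_sets_query("my_table", "col1", ["col2", "col3", "col4", "col5"])
print(query)
```

The resulting string would be handed to the sql() method of a HiveContext.
Note that the output has all five columns plus the count, and the columns
that are not part of a row's grouping set come back as NULL, so the shape
differs from a union of four two-column results.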


Thanks

On Sun, Feb 19, 2017 at 3:02 AM, Yong Zhang  wrote:

> If you only need the group by within the same hierarchy, you can group
> by at the lowest level and cache the result, then use the cached DF to
> derive the higher levels. Spark will then scan the original table only
> once and reuse the cache for the later queries.
>
>
> val df_base = sqlContext.sql(
>   "select col1, col2, col3, col4, col5, count(*) as cnt from table " +
>   "group by col1, col2, col3, col4, col5").cache
>
> df_base.registerTempTable("df_base")
>
> // Sum the cached counts; count(*) here would count groups, not rows.
> val df1 = sqlContext.sql(
>   "select col1, col2, sum(cnt) as cnt from df_base group by col1, col2")
>
> val df2 = // similar logic
>
> Yong


Querying on Deeply Nested JSON Structures

2017-07-15 Thread Patrick
Hi,

We need to query a deeply nested JSON structure. However, the query is on a
single field at a nested level, such as mean, median, or mode.

I am aware of the SQL explode function:

df = df_nested.withColumn('exploded', explode(top))

But this is too slow.

Is there any other strategy that would give better performance when
querying nested JSON in a Spark Dataset?


Thanks


Complex JSON Handling in Spark 2.1

2017-07-24 Thread Patrick
Hi,

On reading a complex JSON document, Spark infers the following schema:

root
 |-- header: struct (nullable = true)
 |    |-- deviceId: string (nullable = true)
 |    |-- sessionId: string (nullable = true)
 |-- payload: struct (nullable = true)
 |    |-- deviceObjects: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- additionalPayload: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- data: struct (nullable = true)
 |    |    |    |    |    |    |-- *a: struct (nullable = true)*
 |    |    |    |    |    |    |    |-- address: string (nullable = true)

When we save the above JSON as Parquet using Spark SQL, we get only two
top-level columns, "header" and "payload", in the Parquet file.

We now want to calculate a mean over the element *a: struct (nullable =
true)*.

With reference to the Databricks blog for handling complex JSON
https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html

*"when using Parquet, all struct columns will receive the same treatment as
top-level columns. Therefore, if you have filters on a nested field, you
will get the same benefits as a top-level column."*

Given the above statement, will Parquet treat *a: struct (nullable =
true)* as a top-level struct column, and will a SQL query on the Dataset
be optimized accordingly?

If not, do we need to impose the schema ourselves by exploding the complex
type before writing to Parquet in order to get the top-level column benefit?
What can we do with Spark 2.1 to get the best performance over a nested
structure such as *a: struct (nullable = true)*?

Thanks


Re: Complex JSON Handling in Spark 2.1

2017-07-24 Thread Patrick
To avoid confusion, the query I am referring to above is over a numeric
element inside *a: struct (nullable = true)*.



Re: Nested JSON Handling in Spark 2.1

2017-07-25 Thread Patrick
Hi,

I would appreciate some suggestions on how to get top-level struct
treatment for nested JSON when it is stored in Parquet format, or any other
approach that gives the best performance with Spark 2.1.

Thanks in advance




Re: Complex types projection handling with Spark 2 SQL and Parquet

2017-07-27 Thread Patrick
Hi ,

I am having the same issue. Has anyone found a solution to this?

When I convert the nested JSON to Parquet, I don't see the projection
working correctly.
It still reads all the nested structure columns, even though Parquet does
support nested column projection.

Does Spark 2 SQL provide column projection for nested fields? Does
predicate push-down work for nested columns?

How can we optimize things in this case? Am I using the wrong API?

Thanks in advance.




On Sun, Jan 29, 2017 at 12:39 AM, Antoine HOM  wrote:

> Hello everybody,
>
> I have been trying to use complex types (stored in parquet) with spark
> SQL and ended up having an issue that I can't seem to be able to solve
> cleanly.
> I was hoping, through this mail, to get some insights from the
> community, maybe I'm just missing something obvious in the way I'm
> using spark :)
>
> It seems that Spark only pushes down projections for columns at the root
> level of the records.
> This is a big I/O issue depending on how much you use complex types,
> in my samples I ended up reading 100GB of data when using only a
> single field out of a struct (It should most likely have read only
> 1GB).
>
> I already saw this PR which sounds promising:
> https://github.com/apache/spark/pull/16578
>
> However it seems that it won't be applicable if you have multiple
> array nesting level, the main reason is that I can't seem to find how
> to reference to fields deeply nested in arrays in a Column expression.
> I can do everything within lambdas but I think the optimizer won't
> drill into it to understand that I'm only accessing a few fields.
>
> If I take the following (simplified) example:
>
> {
>   trip_legs: [{
>     from: "LHR",
>     to: "NYC",
>     taxes: [{
>       type: "gov",
>       amount: 12,
>       currency: "USD"
>     }]
>   }]
> }
>
> col(trip_legs.from) will return an Array of all the from fields for
> each trip_leg object.
> col(trip_legs.taxes.type) will throw an exception.
>
> So my questions are:
>   * Is there a way to reference to these deeply nested fields without
> having to specify an array index with a Column expression?
>   * If not, is there an API to force the projection of a given set of
> fields so that parquet only read this specific set of columns?
>
> In addition, regarding the handling of arrays of struct within spark sql:
>   * Has it been discussed to have a way to "reshape" an array of
> structs without using lambdas? (Like the $map/$filter/etc.. operators
> of mongodb for example)
>   * If not, and I'm willing to dedicate time to coding these
> features, could someone familiar with the code base tell me how
> disruptive this would be, and whether it would be a welcome change?
> (most likely more appropriate for the dev mailing list though)
>
> Regards,
> Antoine
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Projection Pushdown and Predicate Pushdown in Parquet for Nested Column

2017-08-02 Thread Patrick
Hi,

I would like to know whether Spark supports projection pushdown and
predicate pushdown in Parquet for nested columns.

I can see two JIRA tasks with PRs:

https://issues.apache.org/jira/browse/SPARK-17636

https://issues.apache.org/jira/browse/SPARK-4502


If not, can we expect these features in the Spark 2.3 release?

Thanks


Collecting Multiple Aggregation query result on one Column as collectAsMap

2017-08-28 Thread Patrick
Hi

I have two lists:


   - List one: contains the names of the columns on which I want to run
   aggregate operations.
   - List two: contains the aggregate operations that I want to perform
   on each column, e.g. (min, max, mean).

I am trying to use the Spark 2.0 Dataset API to achieve this. Spark provides
an agg() to which you can pass a Map (of column name to the respective
aggregate operation) as input. However, I want to perform different
aggregate operations on the same column of the data and collect the
result in a Map where the key is the aggregate operation and the value is
the result for that column. If I add different aggregations for the same
column, the key is overwritten with the latest value.

I also can't find any collectAsMap() operation that returns a map with the
aggregated column name as key and the result as value. There is
collectAsList(), but I don't know the order in which the agg() operations
run, so how do I match each list value to its agg operation? I am able to
see the result using .show(), but how can I collect the result in this case?

Is it possible to run different aggregations on the same column in one
job (i.e. with only one collect operation) using agg()?


Thanks in advance.


Re: Collecting Multiple Aggregation query result on one Column as collectAsMap

2017-08-28 Thread Patrick
Ah, does it work with the Dataset API, or do I need to convert to an RDD first?

On Mon, Aug 28, 2017 at 10:40 PM, Georg Heiler 
wrote:

> What about the RDD StatCounter?
> https://spark.apache.org/docs/0.6.2/api/core/spark/util/StatCounter.html


Re: Collecting Multiple Aggregation query result on one Column as collectAsMap

2017-08-28 Thread Patrick
OK, I see there is a describe() function that computes statistics on a
Dataset, similar to StatCounter. However, I don't want to restrict my
aggregations to the standard mean, stddev, etc.: I may want to generate some
custom stats, or run only a subset of the predefined stats on a
particular column.
I was thinking that we may need to write some custom code that does this in
one action (job); that would work for me.
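
[Editorial sketch] The "custom code in one action" idea above can be
sketched as a mergeable accumulator, in the spirit of StatCounter. This is a
plain-Python illustration under assumptions, not Spark's implementation: the
class name ColumnStats and its methods are hypothetical, and two local lists
stand in for two partitions of one column.

```python
class ColumnStats:
    """Running min, max and mean of one numeric column, mergeable across partitions."""

    def __init__(self):
        self.count = 0
        self.total = 0.0
        self.minimum = float("inf")
        self.maximum = float("-inf")

    def add(self, value):
        # Fold a single value into the running statistics.
        self.count += 1
        self.total += value
        self.minimum = min(self.minimum, value)
        self.maximum = max(self.maximum, value)
        return self

    def merge(self, other):
        # Combine the partial results of two partitions.
        self.count += other.count
        self.total += other.total
        self.minimum = min(self.minimum, other.minimum)
        self.maximum = max(self.maximum, other.maximum)
        return self

    def as_map(self, wanted):
        # Return only the requested statistics, keyed by operation name.
        available = {
            "min": self.minimum,
            "max": self.maximum,
            "mean": self.total / self.count if self.count else None,
        }
        return {name: available[name] for name in wanted}


# Two lists stand in for two partitions of the same column.
left = ColumnStats()
for v in [3.0, 1.0]:
    left.add(v)
right = ColumnStats()
for v in [5.0, 7.0]:
    right.add(v)
result = left.merge(right).as_map(["min", "mean"])
print(result)
```

On an RDD, add and merge have the shape of the seqOp and combOp arguments of
RDD.aggregate, so all requested statistics come back from a single action,
keyed by operation name. Within the Dataset API, another option is to pass
several aliased Column expressions for the same column to a single agg()
call, so that each result column has an unambiguous name.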



On Tue, Aug 29, 2017 at 12:02 AM, Georg Heiler 
wrote:

> Rdd only


Builder Pattern used by Spark source code architecture

2017-09-18 Thread Patrick
Hi,

A lot of the Spark code base is built on the Builder pattern, so I was
wondering what benefits the Builder pattern brings to Spark.

Some things that come to mind: it is easy on garbage collection and it
makes for user-friendly APIs.

Are there any other advantages for code running on a distributed system
that explain why the Spark source code uses the Builder pattern so heavily?


Any thoughts on the source code design of Spark are welcome.

Thanks in advance


Out of memory Error when using Collection Accumulator Spark 2.2

2018-02-26 Thread Patrick
Hi,

We are getting an OOM error when accumulating the results from each
worker. We were trying to avoid collecting the data to the driver node, so
we used an accumulator instead, as in the code snippet below.

Is there any Spark config for accumulator settings, or am I going about
collecting this huge data set the wrong way?

CollectionAccumulator<String> accumulate;
Dataset<Row> bin;

// Each task appends one pipe-delimited string per row to the accumulator.
bin.foreach((ForeachFunction<Row>) row -> {
  accumulate.add(row.get(0) + "|" + row.get(1) + "|" + row.get(2));
});

// value() copies the whole accumulated list on the driver.
accumulate.value().forEach(element -> {
  String[] arr = element.split("\\|");
  String count = arr[2];
  double percentage =
      (total == 0.0) ? 0.0 : (Double.valueOf(count) / total);
  PayloadBin payload = new PayloadBin(arr[0],
      arr[1], 0, Long.valueOf(count), percentage, sortBy, sortOrder);
  binArray.add(payload);
});


18/02/21 17:35:23 INFO storage.BlockManagerInfo: Added taskresult_5050 in memory on rhlhddfrd225.fairisaac.com:41640 (size: 3.7 MB, free: 8.3 GB)
18/02/21 17:35:24 INFO storage.BlockManagerInfo: Removed taskresult_5034 on rhlhddfrd218.fairisaac.com:46584 in memory (size: 3.7 MB, free: 8.4 GB)
18/02/21 17:35:25 INFO scheduler.TaskSetManager: Finished task 59.0 in stage 20.0 (TID 5034) in 9908 ms on rhlhddfrd218.fairisaac.com (executor 92) (14/200)
Exception in thread "dag-scheduler-event-loop" java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3181)
    at java.util.ArrayList.toArray(ArrayList.java:376)
    at java.util.Collections$SynchronizedCollection.toArray(Collections.java:2024)
    at java.util.ArrayList.<init>(ArrayList.java:177)
    at org.apache.spark.util.CollectionAccumulator.value(AccumulatorV2.scala:470)


Spark Mllib logistic regression setWeightCol illegal argument exception

2020-01-09 Thread Patrick
Hi Spark Users,

I am trying to solve a class imbalance problem. I found that Spark
supports setting a weight column in its API, but I get an
IllegalArgumentException saying the weight column does not exist, even
though it does exist in the dataset. Any recommendations on how to go about
this problem? I am using the Pipeline API with a logistic regression model
and TrainValidationSplit.

LogisticRegression l;
l.setWeightCol()


Caused by: java.lang.IllegalArgumentException: Field "weight" does not exist.
    at org.apache.spark.sql.types.StructType$$anonfun$apply$1.apply(StructType.scala:267)
    at org.apache.spark.sql.types.StructType$$anonfun$apply$1.apply(StructType.scala:267)
    at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
    at scala.collection.AbstractMap.getOrElse(Map.scala:59)
    at org.apache.spark.sql.types.StructType.apply(StructType.scala:266)
    at org.apache.spark.ml.util.SchemaUtils$.checkNumericType(SchemaUtils.scala:71)
    at org.apache.spark.ml.PredictorParams$class.validateAndTransformSchema(Predictor.scala:58)
    at org.apache.spark.ml.classification.Classifier.org$apache$spark$ml$classification$ClassifierParams$$super$validateAndTransformSchema(Classifier.scala:58)
    at org.apache.spark.ml.classification.ClassifierParams$class.validateAndTransformSchema(Classifier.scala:42)
    at org.apache.spark.ml.classification.ProbabilisticClassifier.org$apache$spark$ml$classification$ProbabilisticClassifierParams$$super$validateAndTransformSchema(ProbabilisticClassifier.scala:53)
    at org.apache.spark.ml.classification.ProbabilisticClassifierParams$class.validateAndTransformSchema(ProbabilisticClassifier.scala:37)
    at org.apache.spark.ml.classification.LogisticRegression.org$apache$spark$ml$classification$LogisticRegressionParams$$super$validateAndTransformSchema(LogisticRegression.scala:278)
    at org.apache.spark.ml.classification.LogisticRegressionParams$class.validateAndTransformSchema(LogisticRegression.scala:265)
    at org.apache.spark.ml.classification.LogisticRegression.validateAndTransformSchema(LogisticRegression.scala:278)
    at org.apache.spark.ml.Predictor.transformSchema(Predictor.scala:144)
    at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:184)
    at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:184)
    at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
    at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
    at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:186)
    at org.apache.spark.ml.Pipeline.transformSchema(Pipeline.scala:184)
    at org.apache.spark.ml.tuning.ValidatorParams$class.transformSchemaImpl(ValidatorParams.scala:77)
    at org.apache.spark.ml.tuning.TrainValidationSplit.transformSchemaImpl(TrainValidationSplit.scala:67)
    at org.apache.spark.ml.tuning.TrainValidationSplit.transformSchema(TrainValidationSplit.scala:180)
    at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:74)
    at org.apache.spark.ml.tuning.TrainValidationSplit.fit(TrainValidationSplit.scala:121)


Thanks in advance,


Building Spark 3.0.0 for Hive 1.2

2020-07-10 Thread Patrick McCarthy
I'm trying to build Spark 3.0.0 for my Yarn cluster, with Hadoop 2.7.3 and
Hive 1.2.1. I downloaded the source and created a runnable dist with

./dev/make-distribution.sh --name custom-spark --pip --r --tgz -Psparkr
-Phive-1.2 -Phadoop-2.7 -Pyarn

We're running Spark 2.4.0 in production so I copied the hive-site.xml,
spark-env.sh and spark-defaults.conf from there.

When I try to create a SparkSession in a normal Python REPL, I get the
following uninformative error. How can I debug this? I can run the
spark-shell and get to a scala prompt with Hive access seemingly without
error.

Python 3.6.3 (default, Apr 10 2018, 16:07:04)
[GCC 4.8.3 20140911 (Red Hat 4.8.3-9)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import os
>>> import sys
>>> os.environ['SPARK_HOME'] = '/home/pmccarthy/custom-spark-3'
>>> sys.path.insert(0,os.path.join(os.environ['SPARK_HOME'],'python','lib','py4j-src.zip'))
>>> sys.path.append(os.path.join(os.environ['SPARK_HOME'],'python'))
>>> import pyspark
>>> from pyspark.sql import SparkSession
>>> spark = (SparkSession.builder.enableHiveSupport().config('spark.master','local').getOrCreate())

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/pmccarthy/custom-spark-3/python/pyspark/sql/session.py", line 191, in getOrCreate
    session._jsparkSession.sessionState().conf().setConfString(key, value)
  File "/home/pmccarthy/custom-spark-3/python/lib/py4j-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/home/pmccarthy/custom-spark-3/python/pyspark/sql/utils.py", line 137, in deco
    raise_from(converted)
  File "<string>", line 3, in raise_from
pyspark.sql.utils.IllegalArgumentException: 


-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


Re: Issue in parallelization of CNN model using spark

2020-07-14 Thread Patrick McCarthy
Please don't advocate for piracy, this book is not freely available.

I own it and it's wonderful, Mr. Géron deserves to benefit from it.

On Mon, Jul 13, 2020 at 9:59 PM Anwar AliKhan 
wrote:

>  link to a free book  which may be useful.
>
> Hands-On Machine Learning with Scikit-Learn, Keras, and Tensorflow
> Concepts, Tools, and Techniques to Build Intelligent Systems by Aurélien
> Géron
>
> https://bit.ly/2zxueGt
>
>
>
>
>
> On 13 Jul 2020, 15:18, Sean Owen wrote:
>
>> There is a multilayer perceptron implementation in Spark ML, but
>> that's not what you're looking for.
>> To parallelize model training developed using standard libraries like
>> Keras, use Horovod from Uber.
>> https://horovod.readthedocs.io/en/stable/spark_include.html
>>
>> On Mon, Jul 13, 2020 at 6:59 AM Mukhtaj Khan  wrote:
>> >
>> > Dear Spark User
>> >
>> > I am trying to parallelize a CNN (convolutional neural network) model
>> > using Spark. I have developed the model using Python and the Keras
>> > library. The model works fine on a single machine, but when we try it on
>> > multiple machines the execution time remains the same as sequential.
>> > Could you please tell me whether there is any built-in library for
>> > parallelizing a CNN in the Spark framework? MLlib does not have any
>> > support for CNNs.
>> > Best regards
>> > Mukhtaj
>> >
>> >
>> >
>> >


Re: [Spark SQL]: Can't write DataFrame after using explode function on multiple columns.

2020-08-03 Thread Patrick McCarthy
This seems like a very expensive operation. Why do you want to write out
all the exploded values? If you just want all combinations of values, could
you instead do it at read-time with a UDF or something?

On Sat, Aug 1, 2020 at 8:34 PM hesouol  wrote:

> I forgot to add one piece of information. By "can't write" I mean that it
> keeps processing and nothing happens. The job runs for hours even with a
> very small file and I have to force it to stop.
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/


Re: [Spark SQL]: Can't write DataFrame after using explode function on multiple columns.

2020-08-03 Thread Patrick McCarthy
If you use pandas_udfs in 2.4 they should be quite performant (or at least
won't suffer serialization overhead), might be worth looking into.

I didn't run your code but one consideration is that the while loop might
be making the DAG a lot bigger than it has to be. You might see if defining
those columns with list comprehensions forming a single select() statement
makes for a smaller DAG.
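
[Editorial sketch] The single-select() suggestion above can be sketched as
follows. This is only an illustration under assumptions: the helper name
with_columns_in_one_select is hypothetical, df is assumed to be a PySpark
DataFrame, and new_columns a dict mapping each output name to a Column
expression. The helper relies only on DataFrame.select() and Column.alias().

```python
def with_columns_in_one_select(df, new_columns):
    """Add every column in `new_columns` with a single select() call.

    `df` is expected to be a pyspark.sql.DataFrame and `new_columns` a dict
    mapping the output column name to a pyspark Column expression.
    """
    # Build all aliased expressions first, then project them in one step.
    projected = [expr.alias(name) for name, expr in new_columns.items()]
    return df.select("*", *projected)


# Usage sketch (requires pyspark; the column names are made up):
#   from pyspark.sql import functions as F
#   wide = with_columns_in_one_select(
#       df, {"a_len": F.length("a"), "b_up": F.upper("b")})
#
# The loop it replaces stacks one projection per iteration:
#   for name, expr in new_columns.items():
#       df = df.withColumn(name, expr)
```

One difference to keep in mind: withColumn replaces an existing column of
the same name, while select("*", ...) would produce a duplicate column name,
so the new names should not collide with existing ones.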

On Mon, Aug 3, 2020 at 10:06 AM Henrique Oliveira  wrote:

> Hi Patrick, thank you for your quick response.
> That's exactly what I think. Actually, the result of this processing is an
> intermediate table that is going to be used for other views generation.
> Another approach I'm trying now is to move the "explosion" step to this
> "view generation" step, so that I don't need to explode every column but
> just those used by the final client.
>
> P.S. I was avoiding UDFs for now because I'm still on Spark 2.4 and the
> python udfs I tried had very bad performance, but I will give it a try in
> this case. It can't be worse.
> Thanks again!


Re: regexp_extract regex for extracting the columns from string

2020-08-10 Thread Patrick McCarthy
Can you simply do a string split on space, and then another on '='?
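
[Editorial sketch] The two-level split suggested above, shown in plain
Python on the sample line from the question. The function name
parse_key_values is hypothetical, and the sketch assumes that neither keys
nor values contain spaces.

```python
def parse_key_values(line):
    """Split a '<n> k1=v1 k2=v2 ...' line into a dict of its key/value pairs."""
    fields = {}
    for token in line.split(" "):
        # Skip the leading "<1000>" marker and any empty tokens.
        if "=" not in token:
            continue
        # Split on the first '=' only, so values may themselves contain '='.
        key, value = token.split("=", 1)
        fields[key] = value
    return fields


sample = (
    "<1000> date=2020-08-01 time=20:50:04 name=processing id=123 session=new "
    "packt=20 orgin=null address=null dest=fgjglgl"
)
parsed = parse_key_values(sample)
print(parsed["date"], parsed["time"], parsed["dest"])
```

Spark SQL has built-in split and str_to_map functions that follow the same
two-level split without a UDF; the exact column expression is left out here
because it depends on the real data.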

On Sun, Aug 9, 2020 at 12:00 PM anbutech  wrote:

> Hi All,
>
> I have the following info in the data column:
>
> <1000> date=2020-08-01 time=20:50:04 name=processing id=123 session=new
> packt=20 orgin=null address=null dest=fgjglgl
>
> I want to create a separate column for each of the above key/value pairs,
> which follow the integer <1000> and are separated by spaces.
> Is there any way to achieve this using the built-in regexp_extract
> function? I don't want to use a UDF.
>
>
> Thanks
>


Re: Hive using Spark engine vs native spark with hive integration.

2020-10-07 Thread Patrick McCarthy
I think a lot will depend on what the scripts do. I've seen some legacy
Hive scripts which were written in an awkward way (e.g. lots of subqueries,
nested explodes) because pre-Spark it was the only way to express certain
logic. For fairly straightforward operations I expect Catalyst would reduce
both versions to similar plans.

On Tue, Oct 6, 2020 at 12:07 PM Manu Jacob 
wrote:

> Hi All,
>
> Not sure if I need to ask this question on the Spark community or the Hive
> community.
>
> We have a set of Hive scripts that run on EMR (Tez engine). We would like
> to experiment by moving some of them onto Spark. We are planning to
> experiment with two options.
>
> 1. Use the current code based on HQL, with the engine set to Spark.
> 2. Write pure Spark code in Scala/Python using Spark SQL and Hive
>    integration.
>
> The first approach helps us transition to Spark quickly, but we are not sure
> if it is the best approach in terms of performance. We could not find any
> reasonable comparisons of these two approaches. It looks like writing pure
> Spark code gives us more control to add logic and also to control some of the
> performance features, for example things like caching/evicting etc.
>
> Any advice on this is much appreciated.
>
> Thanks,
>
> -Manu
>



Re: [Spark Core] Vectorizing very high-dimensional data sourced in long format

2020-10-30 Thread Patrick McCarthy
That's a very large vector. Is it sparse? Perhaps you'd have better luck
performing an aggregate instead of a pivot, and assembling the vector using
a UDF.
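
A rough sketch of the aggregate-then-assemble idea on toy data, in plain Python so it runs without a cluster. The function name `to_sparse_rows` and the sample rows are made up for illustration. In Spark the grouping would presumably be a `groupBy("entity_id")` that collects (attribute_id, event_count) pairs, with a UDF building the sparse vector from them.

```python
from collections import defaultdict


def to_sparse_rows(long_rows):
    """Group (entity_id, attribute_id, event_count) rows into one sparse
    vector per entity: a pair of sorted index and value lists."""
    grouped = defaultdict(dict)
    for entity_id, attribute_id, event_count in long_rows:
        if event_count:                      # drop nulls and zeros
            grouped[entity_id][attribute_id] = event_count
    sparse = {}
    for entity_id, cells in grouped.items():
        indices = sorted(cells)
        sparse[entity_id] = (indices, [cells[i] for i in indices])
    return sparse


long_rows = [
    (1, 40, 3),
    (1, 7, 1),
    (2, 129_999_999, 5),
    (2, 7, None),
]
vectors = to_sparse_rows(long_rows)
print(vectors[1])
print(vectors[2])
```

The point is that only the non-empty cells are ever materialized, so the ~130 million possible columns never exist as dataframe columns.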

On Thu, Oct 29, 2020 at 10:19 PM Daniel Chalef
 wrote:

> Hello,
>
> I have a very large long-format dataframe (several billion rows) that I'd
> like to pivot and vectorize (using the VectorAssembler), with the aim to
> reduce dimensionality using something akin to TF-IDF. Once pivoted, the
> dataframe will have ~130 million columns.
>
> The source, long-format schema looks as follows:
>
> root
>  |-- entity_id: long (nullable = false)
>  |-- attribute_id: long (nullable = false)
>  |-- event_count: integer (nullable = true)
>
> Pivoting as per the following fails, exhausting executor and driver
> memory. I am unsure whether increasing memory limits would be successful
> here as my sense is that pivoting and then using a VectorAssembler isn't
> the right approach to solving this problem.
>
> wide_frame = (
>     long_frame.groupBy("entity_id")
>     .pivot("attribute_id")
>     .agg(F.first("event_count"))
> )
>
> Are there other Spark patterns that I should attempt in order to achieve
> my end goal of a vector of attributes for every entity?
>
> Thanks, Daniel
>



Re: Issue while installing dependencies Python Spark

2020-12-17 Thread Patrick McCarthy
I'm not very familiar with the environments on cloud clusters, but in
general I'd be reluctant to lean on setuptools or other Python install
mechanisms. In the worst case, /usr/bin/pip might not have permission to
install new packages, or, even if it does, a package might require
something you can't change, like a libc dependency.

Perhaps you can install the .whl and its dependencies to the virtualenv on
a local machine, and then *after* the install process, package that venv?

If possible, I like conda for this approach over a vanilla venv because it
will contain all the non-python dependencies (like libc) if they're needed.


Another thing - I think there are several ways to do this, but I've had the
most success including the .zip containing my environment in
`spark.yarn.dist.archives` and then using a relative path:

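import os
from pyspark.sql import SparkSession

# Interpreter path is relative to the archive alias (the part after '#').
os.environ['PYSPARK_PYTHON'] = './py37minimal_env/py37minimal/bin/python'

dist_archives = 'hdfs:///user/pmccarthy/conda/py37minimal.zip#py37minimal_env'

SparkSession.builder.
...
 .config('spark.yarn.dist.archives', dist_archives)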


On Thu, Dec 17, 2020 at 10:32 AM Sachit Murarka 
wrote:

> Hi Users,
>
> I have a wheel file; while creating it I listed its dependencies in the
> setup.py file.
> Now I have 2 virtual envs: one was already there, and another one I created
> just now.
>
> I have switched to the new virtual env, and I want Spark to download the
> dependencies while doing spark-submit using the wheel.
>
> Could you please help me with this?
>
> It is not downloading the dependencies; instead it points to the older
> virtual env and proceeds with the execution of the Spark job.
>
> Please note I have tried setting the env variables also.
> I have also tried the following options in spark-submit:
>
> --conf spark.pyspark.virtualenv.enabled=true  --conf
> spark.pyspark.virtualenv.type=native --conf
> spark.pyspark.virtualenv.requirements=requirements.txt  --conf
> spark.pyspark.python= /path/to/venv/bin/python3 --conf
> spark.pyspark.driver.python=/path/to/venv/bin/python3
>
> This did not help either.
>
> Kind Regards,
> Sachit Murarka
>



Re: Getting error message

2020-12-17 Thread Patrick McCarthy
'Job aborted due to stage failure: Task 1 in stage 39.0 failed 1 times'

You may want to raise the number of allowed task failures to something like
4. A single task failure should be tolerable, especially if you're on a
shared cluster where resources can be preempted.

It seems that a node dies or goes off the network, so perhaps you can also
check the logs on the failing node to see why it disappears and prevent the
failures in the first place.
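
For reference, a small sketch of where that retry limit lives. As far as I know the cluster-wide knob is `spark.task.maxFailures`, while a local master takes the limit as a second value in `local[K,F]`. The helper name `local_master` is made up for this example, and the value 4 is just the number suggested above.

```python
def local_master(threads="*", max_failures=4):
    """Build a local-mode master URL of the form local[K,F], where K is the
    number of worker threads and F is maxFailures for a task."""
    return f"local[{threads},{max_failures}]"


# Settings one might pass to spark-submit or SparkSession.builder.
settings = {
    "spark.master": local_master(),      # for local runs
    "spark.task.maxFailures": "4",       # for cluster runs
}
for key, value in settings.items():
    print(f"--conf {key}={value}")
```

Only one of the two settings applies to a given run, depending on whether the job is local or on a cluster.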

On Thu, Dec 17, 2020 at 1:27 PM Vikas Garg  wrote:

> "my-domain" is a placeholder I substituted while pasting the logs.
>
> Also, there are multiple class files in my project. If I run any 1 or 2
> at a time, they usually run fine, though sometimes they too give this error.
> But running all the classes at the same time always gives this error.
>
> Once this error occurs, I can't run any program; after restarting the
> system, programs run fine again.
> This error goes away on
>
> On Thu, 17 Dec 2020, 23:50 Patrick McCarthy, 
> wrote:
>
>> my-domain.com/192.168.166.8:63534 probably isn't a valid address on your
>> network, is it?
>>
>> On Thu, Dec 17, 2020 at 3:03 AM Vikas Garg  wrote:
>>
>>> Hi,
>>>
>>> For the last few days, I have been getting an error message while running
>>> my project. I have searched Google for a solution but didn't find any help.
>>>
>>> Can someone help me to figure out how I could mitigate this issue?
>>>
>>>
>>> 20/12/17 13:26:57 ERROR RetryingBlockFetcher: Exception while beginning
>>> fetch of 1 outstanding blocks
>>>
>>> *java.io.IOException*: Failed to connect to
>>> my-domain.com/192.168.166.8:63534
>>>
>>> at
>>> org.apache.spark.network.client.TransportClientFactory.createClient(
>>> *TransportClientFactory.java:253*)
>>>
>>> at
>>> org.apache.spark.network.client.TransportClientFactory.createClient(
>>> *TransportClientFactory.java:195*)
>>>
>>> at
>>> org.apache.spark.network.netty.NettyBlockTransferService$$anon$2.createAndStart(
>>> *NettyBlockTransferService.scala:122*)
>>>
>>> at
>>> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(
>>> *RetryingBlockFetcher.java:141*)
>>>
>>> at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(
>>> *RetryingBlockFetcher.java:121*)
>>>
>>> at
>>> org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(
>>> *NettyBlockTransferService.scala:143*)
>>>
>>> at org.apache.spark.network.BlockTransferService.fetchBlockSync(
>>> *BlockTransferService.scala:103*)
>>>
>>> at
>>> org.apache.spark.storage.BlockManager.fetchRemoteManagedBuffer(
>>> *BlockManager.scala:1010*)
>>>
>>> at
>>> org.apache.spark.storage.BlockManager.$anonfun$getRemoteBlock$8(
>>> *BlockManager.scala:954*)
>>>
>>> at scala.Option.orElse(*Option.scala:447*)
>>>
>>> at org.apache.spark.storage.BlockManager.getRemoteBlock(
>>> *BlockManager.scala:954*)
>>>
>>> at org.apache.spark.storage.BlockManager.getRemoteBytes(
>>> *BlockManager.scala:1092*)
>>>
>>> at
>>> org.apache.spark.scheduler.TaskResultGetter$$anon$3.$anonfun$run$1(
>>> *TaskResultGetter.scala:88*)
>>>
>>> at
>>> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>>>
>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(
>>> *Utils.scala:1932*)
>>>
>>> at org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(
>>> *TaskResultGetter.scala:63*)
>>>
>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(
>>> *ThreadPoolExecutor.java:1149*)
>>>
>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
>>> *ThreadPoolExecutor.java:624*)
>>>
>>> at java.lang.Thread.run(*Thread.java:748*)
>>>
>>> Caused by: *io.netty.channel.AbstractChannel$AnnotatedSocketException*:
>>> Permission denied: no further information:
>>> my-domain.com/192.168.166.8:63534
>>>
>>> Caused by: *java.net.SocketException*: Permission denied: no further
>>> information
>>>
>>> at sun.nio.ch.SocketChannelImpl.checkConnect(*Native Method*)
>>>
>>> at sun.nio.ch.SocketChannelImpl.finishConnect(
>>> *SocketC

Re: Getting error message

2020-12-17 Thread Patrick McCarthy
Possibly. In that case maybe you should step back from spark and see if
there are OS-level tools to understand what's going on, like looking for
evidence of the OOM killer -
https://docs.memset.com/other/linux-s-oom-process-killer

On Thu, Dec 17, 2020 at 1:45 PM Vikas Garg  wrote:

> I am running the code on a local, single-node machine.
>
> Looking into the logs, it looks like the host is killed. This is happening
> very frequently and I am unable to find the reason for it.
>
> Could low memory be the reason?
>
> On Fri, 18 Dec 2020, 00:11 Patrick McCarthy, 
> wrote:
>
>> 'Job aborted due to stage failure: Task 1 in stage 39.0 failed 1 times'
>>
>> You may want to change the number of failures to a higher number like 4.
>> A single failure on a task should be able to be tolerated, especially if
>> you're on a shared cluster where resources can be preempted.
>>
>>  It seems that a node dies or goes off the network, so perhaps you can
>> also debug the logs on the failing node to see why it disappears and
>> prevent the failures in the first place.
>>
>> On Thu, Dec 17, 2020 at 1:27 PM Vikas Garg  wrote:
>>
>>> Mydomain is named by me while pasting the logs
>>>
>>> Also,  there are multiple class files in my project, if I run any 1 or 2
>>> at a time,  then they run fine,  sometimes they too give this error. But
>>> running all the classes at the same time always give this error.
>>>
>>> Once this error come, I can't run any program and on restarting the
>>> system, program starts running fine.
>>> This error goes away on
>>>
>>> On Thu, 17 Dec 2020, 23:50 Patrick McCarthy, 
>>> wrote:
>>>
>>>> my-domain.com/192.168.166.8:63534 probably isn't a valid address on
>>>> your network, is it?
>>>>
>>>> On Thu, Dec 17, 2020 at 3:03 AM Vikas Garg  wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Since last few days, I am getting error message while running my
>>>>> project. I have searched Google for the solution but didn't got any help.
>>>>>
>>>>> Can someone help me to figure out how I could mitigate this issue?
>>>>>
>>>>>
>>>>> 20/12/17 13:26:57 ERROR RetryingBlockFetcher: Exception while
>>>>> beginning fetch of 1 outstanding blocks
>>>>>
>>>>> *java.io.IOException*: Failed to connect to
>>>>> my-domain.com/192.168.166.8:63534
>>>>>
>>>>> at
>>>>> org.apache.spark.network.client.TransportClientFactory.createClient(
>>>>> *TransportClientFactory.java:253*)
>>>>>
>>>>> at
>>>>> org.apache.spark.network.client.TransportClientFactory.createClient(
>>>>> *TransportClientFactory.java:195*)
>>>>>
>>>>> at
>>>>> org.apache.spark.network.netty.NettyBlockTransferService$$anon$2.createAndStart(
>>>>> *NettyBlockTransferService.scala:122*)
>>>>>
>>>>> at
>>>>> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(
>>>>> *RetryingBlockFetcher.java:141*)
>>>>>
>>>>> at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(
>>>>> *RetryingBlockFetcher.java:121*)
>>>>>
>>>>> at
>>>>> org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(
>>>>> *NettyBlockTransferService.scala:143*)
>>>>>
>>>>> at
>>>>> org.apache.spark.network.BlockTransferService.fetchBlockSync(
>>>>> *BlockTransferService.scala:103*)
>>>>>
>>>>> at
>>>>> org.apache.spark.storage.BlockManager.fetchRemoteManagedBuffer(
>>>>> *BlockManager.scala:1010*)
>>>>>
>>>>> at
>>>>> org.apache.spark.storage.BlockManager.$anonfun$getRemoteBlock$8(
>>>>> *BlockManager.scala:954*)
>>>>>
>>>>> at scala.Option.orElse(*Option.scala:447*)
>>>>>
>>>>> at org.apache.spark.storage.BlockManager.getRemoteBlock(
>>>>> *BlockManager.scala:954*)
>>>>>
>>>>> at org.apache.spark.storage.BlockManager.getRemoteBytes(
>>>>> *BlockManager.scala:1092*)
>>>>>
>>>>> at
>>

Re: Issue while installing dependencies Python Spark

2020-12-18 Thread Patrick McCarthy
At the risk of repeating myself, this is what I was hoping to avoid when I
suggested deploying a full, zipped, conda venv.

What is your motivation for running an install process on the nodes and
risking the process failing, instead of pushing a validated environment
artifact and not having that risk? In either case you move about the same
number of bytes around.

On Fri, Dec 18, 2020 at 3:04 PM Sachit Murarka 
wrote:

> Hi Patrick/Users,
>
> I am exploring wheel-format packages for this, as it seems simple:
>
>
> https://bytes.grubhub.com/managing-dependencies-and-artifacts-in-pyspark-7641aa89ddb7
>
> However, I am facing another issue: I am using pandas, which needs
> numpy, and numpy is raising an error:
>
>
> ImportError: Unable to import required dependencies:
> numpy:
>
> IMPORTANT: PLEASE READ THIS FOR ADVICE ON HOW TO SOLVE THIS ISSUE!
>
> Importing the numpy C-extensions failed. This error can happen for
> many reasons, often due to issues with your setup or how NumPy was
> installed.
>
> We have compiled some common reasons and troubleshooting tips at:
>
> https://numpy.org/devdocs/user/troubleshooting-importerror.html
>
> Please note and check the following:
>
>   * The Python version is: Python3.7 from "/usr/bin/python3"
>   * The NumPy version is: "1.19.4"
>
> and make sure that they are the versions you expect.
> Please carefully study the documentation linked above for further help.
>
> Original error was: No module named 'numpy.core._multiarray_umath'
>
>
>
> Kind Regards,
> Sachit Murarka
>
>
> On Thu, Dec 17, 2020 at 9:24 PM Patrick McCarthy 
> wrote:
>
>> I'm not very familiar with the environments on cloud clusters, but in
>> general I'd be reluctant to lean on setuptools or other python install
>> mechanisms. In the worst case, you might encounter /usr/bin/pip not having
>> permissions to install new packages, or even if you do a package might
>> require something you can't change like a libc dependency.
>>
>> Perhaps you can install the .whl and its dependencies to the virtualenv
>> on a local machine, and then *after* the install process, package that
>> venv?
>>
>> If possible, I like conda for this approach over a vanilla venv because
>> it will contain all the non-python dependencies (like libc) if they're
>> needed.
>>
>>
>> Another thing - I think there are several ways to do this, but I've had
>> the most success including the .zip containing my environment in
>> `spark.yarn.dist.archives` and then using a relative path:
>>
>> os.environ['PYSPARK_PYTHON'] = './py37minimal_env/py37minimal/bin/python'
>>
>> dist_archives =
>> 'hdfs:///user/pmccarthy/conda/py37minimal.zip#py37minimal_env'
>>
>> SparkSession.builder.
>> ...
>>  .config('spark.yarn.dist.archives', dist_archives)
>>
>>
>> On Thu, Dec 17, 2020 at 10:32 AM Sachit Murarka 
>> wrote:
>>
>>> Hi Users
>>>
>>> I have a wheel file , while creating it I have mentioned dependencies in
>>> setup.py file.
>>> Now I have 2 virtual envs, 1 was already there . another one I created
>>> just now.
>>>
>>> I have switched to new virtual env, I want spark to download the
>>> dependencies while doing spark-submit using wheel.
>>>
>>> Could you please help me on this?
>>>
>>> It is not downloading dependencies , instead it is pointing to older
>>> version of  virtual env and proceeding with the execution of spark job.
>>>
>>> Please note I have tried setting the env variables also.
>>> Also I have tried following options as well in spark submit
>>>
>>> --conf spark.pyspark.virtualenv.enabled=true  --conf
>>> spark.pyspark.virtualenv.type=native --conf
>>> spark.pyspark.virtualenv.requirements=requirements.txt  --conf
>>> spark.pyspark.python= /path/to/venv/bin/python3 --conf
>>> spark.pyspark.driver.python=/path/to/venv/bin/python3
>>>
>>> This did not help too..
>>>
>>> Kind Regards,
>>> Sachit Murarka
>>>
>>
>


Profiling options for PandasUDF (2.4.7 on yarn)

2021-05-28 Thread Patrick McCarthy
I'm trying to do a very large aggregation of sparse matrices in which my
source data looks like

root
 |-- device_id: string (nullable = true)
 |-- row_id: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- column_id: array (nullable = true)
 |    |-- element: integer (containsNull = true)



I assume each row to reflect a sparse matrix where each combination of
(row_id, column_id) has value of 1. I have a PandasUDF which performs a
GROUPED_MAP that transforms every row into a scipy.sparse.csr_matrix and,
within the group, sums the matrices before returning columns of (count,
row_id, column_id).

It works at small scale but gets unstable as I scale up. Is there a way to
profile this function in a spark session or am I limited to profiling on
pandas data frames without spark?
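
One option that needs no cluster is to profile the grouping logic on a single group outside Spark. The sketch below is only an assumption about the shape of that function: `sum_group` is a hypothetical stand-in that uses a `Counter` keyed by (row_id, column_id) instead of `scipy.sparse.csr_matrix`, it assumes row_id and column_id are paired by position, and the sample group is invented. `cProfile` and `pstats` are from the Python standard library.

```python
import cProfile
import io
import pstats
from collections import Counter


def sum_group(rows):
    """Sum the implied 0/1 matrices of one group.

    Each row is (row_ids, column_ids); every (row_id, column_id) pair
    at the same position contributes 1 to that cell."""
    totals = Counter()
    for row_ids, column_ids in rows:
        totals.update(zip(row_ids, column_ids))
    # Same output shape as described above: (count, row_id, column_id).
    return sorted((count, r, c) for (r, c), count in totals.items())


group = [([0, 1, 2], [5, 5, 9]), ([0, 2], [5, 9]), ([1], [5])]

profiler = cProfile.Profile()
profiler.enable()
result = sum_group(group)
profiler.disable()

report = io.StringIO()
pstats.Stats(profiler, stream=report).sort_stats("cumulative").print_stats()
print(result)
print(report.getvalue())
```

This only measures the Python side of one group; it says nothing about serialization or memory pressure inside the executors.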


[Spark Sql] Global Setting for Case-Insensitive String Compare

2022-11-12 Thread Patrick Tucci
Hello,

Is there a way to set string comparisons to be case-insensitive globally? I
understand LOWER() can be used, but my codebase contains 27k lines of SQL
and many string comparisons. I would need to apply LOWER() to each string
literal in the code base. I'd also need to change all the ETL/import code
to apply LOWER() to each string value on import.

Current behavior:

SELECT 'ABC' = 'abc';
false
Time taken: 5.466 seconds, Fetched 1 row(s)

SELECT 'ABC' IN ('AbC', 'abc');
false
Time taken: 5.498 seconds, Fetched 1 row(s)

SELECT 'ABC' like 'Ab%'
false
Time taken: 5.439 seconds, Fetched 1 row(s)

Desired behavior would be true for all of the above with the proposed
case-insensitive flag set.

Thanks,

Patrick


RE: [Spark Sql] Global Setting for Case-Insensitive String Compare

2022-11-21 Thread Patrick Tucci

Is this the wrong list for this type of question?

On 2022/11/12 16:34:48 Patrick Tucci wrote:
> Hello,
>
> Is there a way to set string comparisons to be case-insensitive globally? I
> understand LOWER() can be used, but my codebase contains 27k lines of SQL
> and many string comparisons. I would need to apply LOWER() to each string
> literal in the code base. I'd also need to change all the ETL/import code
> to apply LOWER() to each string value on import.
>
> Current behavior:
>
> SELECT 'ABC' = 'abc';
> false
> Time taken: 5.466 seconds, Fetched 1 row(s)
>
> SELECT 'ABC' IN ('AbC', 'abc');
> false
> Time taken: 5.498 seconds, Fetched 1 row(s)
>
> SELECT 'ABC' like 'Ab%'
> false
> Time taken: 5.439 seconds, Fetched 1 row(s)
>
> Desired behavior would be true for all of the above with the proposed
> case-insensitive flag set.
>
> Thanks,
>
> Patrick
>



RE: Re: [Spark Sql] Global Setting for Case-Insensitive String Compare

2022-11-22 Thread Patrick Tucci

Thanks. How would I go about formally submitting a feature request for this?

On 2022/11/21 23:47:16 Andrew Melo wrote:
> I think this is the right place, just a hard question :) As far as I
> know, there's no "case insensitive flag", so YMMV
>
> On Mon, Nov 21, 2022 at 5:40 PM Patrick Tucci  wrote:
> >
> > Is this the wrong list for this type of question?
> >
> > On 2022/11/12 16:34:48 Patrick Tucci wrote:
> > > Hello,
> > >
> > > Is there a way to set string comparisons to be case-insensitive
> > > globally? I understand LOWER() can be used, but my codebase contains
> > > 27k lines of SQL and many string comparisons. I would need to apply
> > > LOWER() to each string literal in the code base. I'd also need to
> > > change all the ETL/import code to apply LOWER() to each string value
> > > on import.
> > >
> > > Current behavior:
> > >
> > > SELECT 'ABC' = 'abc';
> > > false
> > > Time taken: 5.466 seconds, Fetched 1 row(s)
> > >
> > > SELECT 'ABC' IN ('AbC', 'abc');
> > > false
> > > Time taken: 5.498 seconds, Fetched 1 row(s)
> > >
> > > SELECT 'ABC' like 'Ab%'
> > > false
> > > Time taken: 5.439 seconds, Fetched 1 row(s)
> > >
> > > Desired behavior would be true for all of the above with the proposed
> > > case-insensitive flag set.
> > >
> > > Thanks,
> > >
> > > Patrick
> > >


Re: [PySpark] Getting the best row from each group

2022-12-19 Thread Patrick Tucci
Window functions don't work like traditional GROUP BYs. They allow you to
partition data and pull any relevant column, whether it's used in the
partition or not.

I'm not sure what the syntax is for PySpark, but the standard SQL would be
something like this:

WITH InputData AS
(
  SELECT 'USA' Country, 'New York' City, 900 Population
  UNION
  SELECT 'USA' Country, 'Miami', 620 Population
  UNION
  SELECT 'Ukraine' Country, 'Kyiv', 300 Population
  UNION
  SELECT 'Ukraine' Country, 'Kharkiv', 140 Population
)

 SELECT *, ROW_NUMBER() OVER(PARTITION BY Country ORDER BY Population DESC)
PopulationRank
 FROM InputData;

Results would be something like this:

Country  City      Population  PopulationRank
Ukraine  Kyiv      300         1
Ukraine  Kharkiv   140         2
USA      New York  900         1
USA      Miami     620         2

Which you could further filter in another CTE or subquery where
PopulationRank = 1.

As I mentioned, I'm not sure how this translates into PySpark, but that's
the general concept in SQL.
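
To make the last step concrete, here is essentially the same query with the outer filter added, run against an in-memory SQLite database through Python's `sqlite3` module purely so that it executes without a cluster (this assumes the bundled SQLite is 3.25 or newer, which is when window functions arrived). It is a sketch, not PySpark; Spark SQL should accept the same query shape. The CTE name `Ranked` is just a label chosen for this example.

```python
import sqlite3

QUERY = """
WITH InputData AS
(
  SELECT 'USA' AS Country, 'New York' AS City, 900 AS Population
  UNION ALL
  SELECT 'USA', 'Miami', 620
  UNION ALL
  SELECT 'Ukraine', 'Kyiv', 300
  UNION ALL
  SELECT 'Ukraine', 'Kharkiv', 140
),
Ranked AS
(
  SELECT *, ROW_NUMBER() OVER (PARTITION BY Country ORDER BY Population DESC)
            AS PopulationRank
  FROM InputData
)
SELECT Country, City, Population
FROM Ranked
WHERE PopulationRank = 1
ORDER BY Country;
"""

connection = sqlite3.connect(":memory:")
largest = connection.execute(QUERY).fetchall()
connection.close()
print(largest)
```

The filter on PopulationRank keeps exactly one row per country, including the city name.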

On Mon, Dec 19, 2022 at 12:01 PM Oliver Ruebenacker <
oliv...@broadinstitute.org> wrote:

> If we only wanted to know the biggest population, max function would
> suffice. The problem is I also want the name of the city with the biggest
> population.
>
> On Mon, Dec 19, 2022 at 11:58 AM Sean Owen  wrote:
>
>> As Mich says, isn't this just max by population partitioned by country in
>> a window function?
>>
>> On Mon, Dec 19, 2022, 9:45 AM Oliver Ruebenacker <
>> oliv...@broadinstitute.org> wrote:
>>
>>>
>>>  Hello,
>>>
>>>   Thank you for the response!
>>>
>>>   I can think of two ways to get the largest city by country, but both
>>> seem to be inefficient:
>>>
>>>   (1) I could group by country, sort each group by population, add the
>>> row number within each group, and then retain only cities with a row number
>>> equal to 1. But it seems wasteful to sort everything when I only want the
>>> largest of each country
>>>
>>>   (2) I could group by country, get the maximum city population for each
>>> country, join that with the original data frame, and then retain only
>>> cities with population equal to the maximum population in the country. But
>>> that seems also expensive because I need to join.
>>>
>>>   Am I missing something?
>>>
>>>   Thanks!
>>>
>>>  Best, Oliver
>>>
>>> On Mon, Dec 19, 2022 at 10:59 AM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> In Spark you can use windowing functions to achieve this.
>>>>
>>>> HTH
>>>>
>>>> https://en.everybodywiki.com/Mich_Talebzadeh
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>> arising from such loss, damage or destruction.
>>>>
>>>> On Mon, 19 Dec 2022 at 15:28, Oliver Ruebenacker <
>>>> oliv...@broadinstitute.org> wrote:
>>>>
>>>>>
>>>>>  Hello,
>>>>>
>>>>>   How can I retain from each group only the row for which one value is
>>>>> the maximum of the group? For example, imagine a DataFrame containing all
>>>>> major cities in the world, with three columns: (1) City name (2) Country
>>>>> (3) population. How would I get a DataFrame that only contains the largest
>>>>> city in each country? Thanks!
>>>>>
>>>>>  Best, Oliver
>>>>>
>>>>> --
>>>>> Oliver Ruebenacker, Ph.D. (he)
>>>>> Senior Software Engineer, Knowledge Portal Network, Flannick Lab,
>>>>> Broad Institute
>>>>>
>>>>
>>>
>>
>


Spark-Sql - Slow Performance With CTAS and Large Gzipped File

2023-06-26 Thread Patrick Tucci
Hello,

I'm using Spark 3.4.0 in standalone mode with Hadoop 3.3.5. The master node
has 2 cores and 8GB of RAM. There is a single worker node with 8 cores and
64GB of RAM.

I'm trying to process a large pipe delimited file that has been compressed
with gzip (9.2GB zipped, ~58GB unzipped, ~241m records, ~85 columns). I
uploaded the gzipped file to HDFS and created an external table using the
attached script. I tried two simpler queries on the same table, and they
finished in ~5 and ~10 minutes respectively:

SELECT COUNT(*) FROM ClaimsImport;
SELECT COUNT(*) FROM ClaimsImport WHERE ClaimLineID = 1;

However, when I tried to create a table stored as ORC using this table as
the input, the query ran for almost 4 hours:

CREATE TABLE Claims STORED AS ORC
AS
SELECT *
FROM ClaimsImport
--Exclude the header record
WHERE ClaimID <> 'ClaimID';

Why is there such a speed disparity between these different operations? I
understand that this job cannot be parallelized because the file is
compressed with gzip. I also understand that creating an ORC table from the
input will take more time than a simple COUNT(*). But it doesn't feel like
the CREATE TABLE operation should take more than 24x longer than a simple
SELECT COUNT(*) statement.

Thanks for any help. Please let me know if I can provide any additional
information.

Patrick


Create Table.sql
Description: Binary data


Re: Spark-Sql - Slow Performance With CTAS and Large Gzipped File

2023-06-26 Thread Patrick Tucci
Hi Mich,

Thanks for the reply. I started running ANALYZE TABLE on the external
table, but the progress was very slow. The stage had only read about 275MB
in 10 minutes. That equates to about 5.5 hours just to analyze the table.

This might just be the reality of trying to process a 240m record file with
80+ columns, unless there's an obvious issue with my setup that someone
sees. The solution is likely going to involve increasing parallelization.

To that end, I extracted this file and recompressed it with bzip2. Since
bzip2 is splittable and gzip is not, Spark can process the bzip2 file in
parallel. The same CTAS query only took about 45 minutes. This is still a bit
slower than I had hoped, but the import from bzip2 fully utilized all
available cores, so we can give the cluster more resources if we need the
process to go faster.
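
For anyone repeating this, a minimal sketch of the recompression step using only Python's standard `gzip` and `bz2` modules. It streams in chunks so the ~58GB of uncompressed data never has to fit in memory. The function name, file names and chunk size are placeholders chosen for this example, and a command-line pipeline would do the same job.

```python
import bz2
import gzip
import os
import shutil
import tempfile


def recompress_gzip_to_bzip2(src_path, dst_path, chunk_bytes=16 * 1024 * 1024):
    """Stream-decompress a .gz file and write it back out as .bz2."""
    with gzip.open(src_path, "rb") as src, bz2.open(dst_path, "wb") as dst:
        shutil.copyfileobj(src, dst, chunk_bytes)


# Tiny round trip on a temporary file to show the call.
workdir = tempfile.mkdtemp()
gz_path = os.path.join(workdir, "claims.txt.gz")
bz_path = os.path.join(workdir, "claims.txt.bz2")
with gzip.open(gz_path, "wb") as handle:
    handle.write(b"ClaimID|ClaimLineID\n1|1\n")

recompress_gzip_to_bzip2(gz_path, bz_path)

with bz2.open(bz_path, "rb") as handle:
    round_trip = handle.read()
print(round_trip)
shutil.rmtree(workdir)
```

The recompression itself is single-threaded, so it is a one-time cost paid before the file is uploaded to HDFS.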

Patrick

On Mon, Jun 26, 2023 at 12:52 PM Mich Talebzadeh 
wrote:

> OK for now have you analyzed statistics in Hive external table
>
> spark-sql (default)> ANALYZE TABLE test.stg_t2 COMPUTE STATISTICS FOR ALL
> COLUMNS;
> spark-sql (default)> DESC EXTENDED test.stg_t2;
>
> Hive external tables have little optimization
>
> HTH
>
>
>
> Mich Talebzadeh,
> Solutions Architect/Engineering Lead
> Palantir Technologies Limited
> London
> United Kingdom
>
>
>
> On Mon, 26 Jun 2023 at 16:33, Patrick Tucci 
> wrote:
>
>> Hello,
>>
>> I'm using Spark 3.4.0 in standalone mode with Hadoop 3.3.5. The master
>> node has 2 cores and 8GB of RAM. There is a single worker node with 8 cores
>> and 64GB of RAM.
>>
>> I'm trying to process a large pipe delimited file that has been
>> compressed with gzip (9.2GB zipped, ~58GB unzipped, ~241m records, ~85
>> columns). I uploaded the gzipped file to HDFS and created an external table
>> using the attached script. I tried two simpler queries on the same table,
>> and they finished in ~5 and ~10 minutes respectively:
>>
>> SELECT COUNT(*) FROM ClaimsImport;
>> SELECT COUNT(*) FROM ClaimsImport WHERE ClaimLineID = 1;
>>
>> However, when I tried to create a table stored as ORC using this table as
>> the input, the query ran for almost 4 hours:
>>
>> CREATE TABLE Claims STORED AS ORC
>> AS
>> SELECT *
>> FROM ClaimsImport
>> --Exclude the header record
>> WHERE ClaimID <> 'ClaimID';
>>
>> Why is there such a speed disparity between these different operations? I
>> understand that this job cannot be parallelized because the file is
>> compressed with gzip. I also understand that creating an ORC table from the
>> input will take more time than a simple COUNT(*). But it doesn't feel like
>> the CREATE TABLE operation should take more than 24x longer than a simple
>> SELECT COUNT(*) statement.
>>
>> Thanks for any help. Please let me know if I can provide any additional
>> information.
>>
>> Patrick
>>
>
>


Spark-SQL - Concurrent Inserts Into Same Table Throws Exception

2023-07-29 Thread Patrick Tucci
Hello,

I'm building an application on Spark SQL. The cluster is set up in
standalone mode with HDFS as storage. The only Spark application running is
the Spark Thrift Server using FAIR scheduling mode. Queries are submitted
to Thrift Server using beeline.

I have multiple queries that insert rows into the same table (EventClaims).
These queries work fine when run sequentially; however, some individual
queries don't fully utilize the resources available on the cluster. I would
like to run all of these queries concurrently to more fully utilize
available resources. When I attempt to do this, tasks eventually begin to
fail. The stack trace is pretty long, but here are what look like the most
relevant parts:

org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:788)

org.apache.hive.service.cli.HiveSQLException: Error running query:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 28
in stage 128.0 failed 4 times, most recent failure: Lost task 28.3 in stage
128.0 (TID 6578) (10.0.50.2 executor 0): org.apache.spark.SparkException:
[TASK_WRITE_FAILED] Task failed while writing rows to
hdfs://10.0.50.1:8020/user/spark/warehouse/eventclaims.

Is it possible to have multiple concurrent writers to the same table with
Spark SQL? Is there any way to make this work?

Thanks for the help.

Patrick


Re: Spark-SQL - Concurrent Inserts Into Same Table Throws Exception

2023-07-30 Thread Patrick Tucci
Hi Mich and Pol,

Thanks for the feedback. The database layer is Hadoop 3.3.5. The cluster
restarted so I lost the stack trace in the application UI. In the snippets
I saved, it looks like the exception being thrown was from Hive. Given the
feedback you've provided, I suspect the issue is with how the Hive
components are handling concurrent writes.

While using a different format would likely help with this issue, I think I
have found an easier solution for now. Currently I have many individual
scripts that perform logic and insert the results separately. Instead of
each script performing an insert, each script can instead create a view.
After the views are created, one single script can perform one single
INSERT, combining the views with UNION ALL statements.

-- Old logic --
-- Script 1
INSERT INTO EventClaims
/*Long, complicated query 1*/

-- Script N
INSERT INTO EventClaims
/*Long, complicated query N*/

-- New logic --
-- Script 1
CREATE VIEW Q1 AS
/*Long, complicated query 1*/

-- Script N
CREATE VIEW QN AS
/*Long, complicated query N*/

-- Final script --
INSERT INTO EventClaims
SELECT * FROM Q1 UNION ALL
SELECT * FROM QN

The old approach had almost two dozen stages with relatively few tasks.
The new approach requires only 3 stages. With fewer stages and more tasks,
cluster utilization is much higher.
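
A self-contained sketch of the "views first, one INSERT at the end" pattern
follows. It uses Python's built-in sqlite3 module purely as a stand-in engine
so the example runs anywhere; the SourceClaims table, its columns, the view
definitions and the sample rows are all hypothetical, and nothing about Spark
stages, tasks or concurrency is reproduced here.

```python
import sqlite3

# SQLite is only a stand-in engine (assumption); in the setup described in
# this thread the statements would be submitted to Thrift Server via beeline.
conn = sqlite3.connect(":memory:")
cur = conn.cursor()

# Hypothetical source data and target table layout.
cur.execute("CREATE TABLE SourceClaims (ClaimID INTEGER, EventType TEXT)")
cur.executemany(
    "INSERT INTO SourceClaims VALUES (?, ?)",
    [(1, "ER"), (2, "Inpatient"), (3, "ER"), (4, "Outpatient")],
)
cur.execute("CREATE TABLE EventClaims (ClaimID INTEGER, EventType TEXT)")

# Each former INSERT script now only defines a view (no write happens here).
cur.execute(
    "CREATE VIEW Q1 AS "
    "SELECT ClaimID, EventType FROM SourceClaims WHERE EventType = 'ER'"
)
cur.execute(
    "CREATE VIEW Q2 AS "
    "SELECT ClaimID, EventType FROM SourceClaims WHERE EventType = 'Inpatient'"
)

# The final script is the only writer: one INSERT over all views.
views = ["Q1", "Q2"]
insert_sql = "INSERT INTO EventClaims " + " UNION ALL ".join(
    f"SELECT * FROM {view}" for view in views
)
cur.execute(insert_sql)
conn.commit()

inserted_rows = cur.execute(
    "SELECT ClaimID, EventType FROM EventClaims ORDER BY ClaimID"
).fetchall()
print(insert_sql)
print(inserted_rows)
```

Because only the final statement writes to EventClaims, there is a single
writer no matter how many views feed it, which is the property the workaround
relies on.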

Thanks again for your feedback. I suspect better concurrent writes will be
valuable for my project in the future, so this is good information to have
ready.

Thanks,

Patrick

On Sun, Jul 30, 2023 at 5:30 AM Pol Santamaria  wrote:

> Hi Patrick,
>
> You can have multiple writers simultaneously writing to the same table in
> HDFS by utilizing an open table format with concurrency control. Several
> formats, such as Apache Hudi, Apache Iceberg, Delta Lake, and Qbeast
> Format, offer this capability. All of them provide advanced features that
> will work better in different use cases according to the writing pattern,
> type of queries, data characteristics, etc.
>
> *Pol Santamaria*
>
>
> On Sat, Jul 29, 2023 at 4:28 PM Mich Talebzadeh 
> wrote:
>
>> It is not Spark SQL that throws the error. It is the underlying database
>> or layer that throws the error.
>>
>> Spark acts as an ETL tool. What is the underlying DB where the table
>> resides? Is concurrency supported? Please send the error to this list.
>> HTH
>>
>> Mich Talebzadeh,
>> Solutions Architect/Engineering Lead
>> Palantir Technologies Limited
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>


Spark-SQL - Query Hanging, How To Troubleshoot

2023-08-10 Thread Patrick Tucci
Hello,

I'm attempting to run a query on Spark 3.4.0 through the Spark
ThriftServer. The cluster has 64 cores, 250GB RAM, and operates in
standalone mode using HDFS for storage.

The query is as follows:

SELECT ME.*, MB.BenefitID
FROM MemberEnrollment ME
JOIN MemberBenefits MB
ON ME.ID = MB.EnrollmentID
WHERE MB.BenefitID = 5
LIMIT 10

The tables are defined as follows:

-- Contains about 3M rows
CREATE TABLE MemberEnrollment
(
ID INT
, MemberID VARCHAR(50)
, StartDate DATE
, EndDate DATE
-- Other columns, but these are the most important
) STORED AS ORC;

-- Contains about 25m rows
CREATE TABLE MemberBenefits
(
EnrollmentID INT
, BenefitID INT
) STORED AS ORC;

When I execute the query, it runs a single broadcast exchange stage, which
completes after a few seconds. Then everything just hangs. The JDBC/ODBC
tab in the UI shows the query state as COMPILED, but no stages or tasks are
executing or pending:

[image: image.png]

I've let the query run for as long as 30 minutes with no additional stages,
progress, or errors. I'm not sure where to start troubleshooting.

Thanks for your help,

Patrick


Re: Spark-SQL - Query Hanging, How To Troubleshoot

2023-08-10 Thread Patrick Tucci
Hi Mich,

Thanks for the reply. Unfortunately I don't have Hive set up on my cluster.
I can explore this if there are no other ways to troubleshoot.

I'm using beeline to run commands against the Thrift server. Here's the
command I use:

~/spark/bin/beeline -u jdbc:hive2://10.0.50.1:1 -n hadoop -f command.sql

Thanks again for your help.

Patrick


On Thu, Aug 10, 2023 at 2:24 PM Mich Talebzadeh 
wrote:

> Can you run this sql query through hive itself?
>
> Are you using this command or similar for your thrift server?
>
> beeline -u jdbc:hive2:///1/default
> org.apache.hive.jdbc.HiveDriver -n hadoop -p xxx
>
> HTH
>


Re: Spark-SQL - Query Hanging, How To Troubleshoot

2023-08-10 Thread Patrick Tucci
Hi Mich,

I don't believe Hive is installed. I set up this cluster from scratch. I
installed Hadoop and Spark by downloading them from their project websites.
If Hive isn't bundled with Hadoop or Spark, I don't believe I have it. I'm
running the Thrift server distributed with Spark, like so:

~/spark/sbin/start-thriftserver.sh --master spark://10.0.50.1:7077

I can look into installing Hive, but it might take some time. I tried to
set up Hive when I first started evaluating distributed data processing
solutions, but I encountered many issues. Spark was much simpler, which was
part of the reason why I chose it.

Thanks again for the reply, I truly appreciate your help.

Patrick

On Thu, Aug 10, 2023 at 3:43 PM Mich Talebzadeh 
wrote:

> sorry host is 10.0.50.1
>
> On Thu, 10 Aug 2023 at 20:41, Mich Talebzadeh 
> wrote:
>
>> Hi Patrick,
>>
>> That beeline on port 1 is a Hive Thrift server running on your Hive
>> on host 10.0.50.1:1.
>>
>> If you can access that host, you should be able to log into Hive by
>> typing hive. The OS user is hadoop in your case, and it sounds like there
>> is no password!
>>
>> Once inside that host, Hive logs are kept, in your case, in
>> /tmp/hadoop/hive.log, or go to /tmp and do
>>
>> /tmp> find ./ -name hive.log. It should be under /tmp/hive.log
>>
>> Try running the SQL inside Hive and see what it says.
>>
>> HTH
>>

Re: Spark-SQL - Query Hanging, How To Troubleshoot

2023-08-11 Thread Patrick Tucci
Thanks for the reply, Stephen and Mich.

Stephen, you're right, it feels like Spark is waiting for something, but
I'm not sure what. I'm the only user on the cluster and there are plenty of
resources (60+ cores, 250+ GB RAM). I even tried restarting Hadoop, Spark
and the host servers to make sure nothing was lingering in the background.

Mich, thank you so much, your suggestion worked. Storing the tables as
Parquet solves the issue.

Interestingly, I found that only the MemberEnrollment table needs to be
Parquet. The ID field in MemberEnrollment is an int calculated during load
by a ROW_NUMBER() function. Further testing found that if I hard code a 0
as MemberEnrollment.ID instead of using the ROW_NUMBER() function, the
query works without issue even if both tables are ORC.

Should I infer from this issue that the Hive components prefer Parquet over
ORC? Furthermore, should I consider using a different table storage
framework, like Delta Lake, instead of the Hive components? Given this
issue and other issues I've had with Hive, I'm starting to think a
different solution might be more robust and stable. The main condition is
that my application operates solely through Thrift server, so I need to be
able to connect to Spark through Thrift server and have it write tables
using Delta Lake instead of Hive. From this StackOverflow question, it
looks like this is possible:
https://stackoverflow.com/questions/69862388/how-to-run-spark-sql-thrift-server-in-local-mode-and-connect-to-delta-using-jdbc

Thanks again to everyone who replied for their help.

Patrick


On Fri, Aug 11, 2023 at 2:14 AM Mich Talebzadeh 
wrote:

> Steve may have a valid point. You raised an issue with concurrent writes
> before, if I recall correctly. That limitation may be due to the Hive
> metastore. By default Spark uses Apache Derby for its database
> persistence. *However it is limited to only one Spark session at any time
> for the purposes of metadata storage.*  That may be the cause here as
> well. Does this happen if the underlying tables are created as PARQUET as
> opposed to ORC?
>
> HTH
>
> On Fri, 11 Aug 2023 at 01:33, Stephen Coy 
> wrote:
>
>> Hi Patrick,
>>
>> When this has happened to me in the past (admittedly via spark-submit) it
>> has been because another job was still running and had already claimed some
>> of the resources (cores and memory).
>>
>> I think this can also happen if your configuration tries to claim
>> resources that will never be available.
>>
>> Cheers,
>>
>> SteveC
>>
>>

Re: Spark-SQL - Query Hanging, How To Troubleshoot

2023-08-12 Thread Patrick Tucci
Hi Mich,

Thanks for the feedback. My original intention after reading your response
was to stick to Hive for managing tables. Unfortunately, I'm running into
another case of SQL scripts hanging. Since all tables are already Parquet,
I'm out of troubleshooting options. I'm going to migrate to Delta Lake and
see if that solves the issue.

Thanks again for your feedback.

Patrick

On Fri, Aug 11, 2023 at 10:09 AM Mich Talebzadeh 
wrote:

> Hi Patrick,
>
> There is nothing wrong with Hive. On-premise, it is the best data
> warehouse there is.
>
> Hive handles both ORC and Parquet formats well. They are both columnar
> implementations of the relational model. What you are seeing is the Spark
> API to Hive, which prefers Parquet. I found this out a few years ago.
>
> From your point of view, I suggest you stick to the Parquet format with Hive
> specific to Spark. As far as I know you don't have a fully independent Hive
> DB as yet.
>
> Anyway, stick to Hive for now, as you never know what issues you may
> face when moving to Delta Lake.
>
> You can also use compression
>
> STORED AS PARQUET
> TBLPROPERTIES ("parquet.compression"="SNAPPY")
>
> ALSO
>
> ANALYZE TABLE  COMPUTE STATISTICS FOR COLUMNS
>
> HTH
>

Re: Spark-SQL - Query Hanging, How To Troubleshoot

2023-08-12 Thread Patrick Tucci
Yes, on premise.

Unfortunately after installing Delta Lake and re-writing all tables as
Delta tables, the issue persists.

On Sat, Aug 12, 2023 at 11:34 AM Mich Talebzadeh 
wrote:

> ok sure.
>
> Is this Delta Lake going to be on-premise?
>

Re: Spark-SQL - Query Hanging, How To Troubleshoot

2023-08-13 Thread Patrick Tucci
I attempted to install Hive yesterday. The experience was similar to other
attempts at installing Hive: it took a few hours and at the end of the
process, I didn't have a working setup. The latest stable release would not
run. I never discovered the cause, but similar StackOverflow questions
suggest it might be a Java incompatibility issue. Since I didn't want to
downgrade or install an additional Java version, I attempted to use the
latest alpha as well. This appears to have worked, although I couldn't
figure out how to get it to use the metastore_db from Spark.

After turning my attention back to Spark, I determined the issue. After
much troubleshooting, I discovered that if I performed a COUNT(*) using the
same JOINs, the problem query worked. I removed all the columns from the
SELECT statement and added them one by one until I found the culprit. It's
a text field on one of the tables. When the query SELECTs this column, or
attempts to filter on it, the query hangs and never completes. If I remove
all explicit references to this column, the query works fine. Since I need
this column in the results, I went back to the ETL and extracted the values
to a dimension table. I replaced the text column in the source table with
an integer ID column and the query worked without issue.
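
For anyone repeating this kind of search, the "add columns one by one" step
can be sketched in Python as below. This is only an illustration under stated
assumptions: the column names are made up, and query_completes is a
hypothetical callback that would, in practice, run the query against Thrift
server with a timeout and report whether it finished. Here it is replaced by a
stub so the sketch runs on its own.

```python
from typing import Callable, List, Optional


def find_culprit_column(
    columns: List[str],
    query_completes: Callable[[List[str]], bool],
) -> Optional[str]:
    """Grow the SELECT list one column at a time.

    Returns the first column whose addition makes the query stop completing,
    or None if every column can be selected without a problem.
    """
    selected: List[str] = []
    for column in columns:
        candidate = selected + [column]
        if query_completes(candidate):
            selected = candidate  # still fine, keep the column and continue
        else:
            return column  # adding this column is what breaks the query
    return None


def stub_query_completes(selected_columns: List[str]) -> bool:
    # Stand-in for "run the query with a timeout" (assumption): pretend any
    # query that touches the hypothetical Notes column never finishes.
    return "Notes" not in selected_columns


# Hypothetical column list; the real table and column names were not posted.
all_columns = ["ID", "MemberID", "Notes", "StartDate", "EndDate"]
culprit = find_culprit_column(all_columns, stub_query_completes)
print(culprit)
```

The loop stops at the first failing column, so it needs at most one query per
column and never re-tests columns that are already known to be safe.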

On the topic of Hive, does anyone have any detailed resources for how to
set up Hive from scratch, other than the official site? Those
instructions didn't work for me. I'm starting to feel uneasy about building
my process around Spark. There really shouldn't be any instances where I
ask Spark to run legal ANSI SQL code and it just does nothing. In the past
4 days I've run into 2 of these instances, and the solution was more voodoo
and magic than examining errors/logs and fixing code. I feel that I should
have a contingency plan in place for when I run into an issue with Spark
that can't be resolved.

Thanks everyone.


On Sat, Aug 12, 2023 at 2:18 PM Mich Talebzadeh 
wrote:

> OK you would not have known unless you went through the process so to
> speak.
>
> Let us do something revolutionary here 😁
>
> Install hive and its metastore. You already have hadoop anyway
>
> https://cwiki.apache.org/confluence/display/hive/adminmanual+installation
>
> hive metastore
>
>
> https://data-flair.training/blogs/apache-hive-metastore/#:~:text=What%20is%20Hive%20Metastore%3F,by%20using%20metastore%20service%20API
> .
>
> choose one of these
>
> derby  hive  mssql  mysql  oracle  postgres
>
> Mine is an oracle. postgres is good as well.
>
> HTH
>

Re: Spark-SQL - Query Hanging, How To Troubleshoot

2023-08-17 Thread Patrick Tucci
Hi Everyone,

I just wanted to follow up on this issue. This issue has continued since
our last correspondence. Today I had a query hang and couldn't resolve the
issue. I decided to upgrade my Spark install from 3.4.0 to 3.4.1. After
doing so, instead of the query hanging, I got an error message that the
driver didn't have enough memory to broadcast objects. After increasing the
driver memory, the query runs without issue.
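
As a rough sketch of what "increasing the driver memory" can look like for
the Thrift server started earlier in this thread, the small Python helper
below assembles the start command. The 8g value is an assumption for
illustration only (the actual setting used was not posted), and the helper
name is hypothetical. --driver-memory is a standard spark-submit option, which
start-thriftserver.sh accepts; spark.sql.autoBroadcastJoinThreshold is the
Spark SQL setting that controls automatic broadcast joins, and -1 disables
them.

```python
from typing import List, Optional


def thriftserver_command(
    master_url: str,
    driver_memory: str,
    broadcast_threshold_bytes: Optional[int] = None,
) -> List[str]:
    """Build the argument list for starting Spark Thrift Server."""
    cmd = [
        "~/spark/sbin/start-thriftserver.sh",
        "--master", master_url,
        "--driver-memory", driver_memory,
    ]
    if broadcast_threshold_bytes is not None:
        # Optional: cap (or, with -1, disable) automatic broadcast joins.
        cmd += [
            "--conf",
            f"spark.sql.autoBroadcastJoinThreshold={broadcast_threshold_bytes}",
        ]
    return cmd


# 8g is an illustrative value, not the one used in this thread.
command = thriftserver_command("spark://10.0.50.1:7077", "8g")
print(" ".join(command))
```

Passing -1 as the third argument gives a variant that turns broadcast joins
off entirely, which may be a useful comparison when the driver cannot hold
the broadcast table.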

I hope this can be helpful to someone else in the future. Thanks again for
the support,

Patrick

On Sun, Aug 13, 2023 at 7:52 AM Mich Talebzadeh 
wrote:

> OK I use Hive 3.1.1
>
> My suggestion is to put your hive issues to u...@hive.apache.org and for
> JAVA version compatibility
>
> They will give you better info.
>
> HTH
>
> On Sun, 13 Aug 2023 at 11:48, Patrick Tucci 
> wrote:
>
>> I attempted to install Hive yesterday. The experience was similar to
>> other attempts at installing Hive: it took a few hours and at the end of
>> the process, I didn't have a working setup. The latest stable release would
>> not run. I never discovered the cause, but similar StackOverflow questions
>> suggest it might be a Java incompatibility issue. Since I didn't want to
>> downgrade or install an additional Java version, I attempted to use the
>> latest alpha as well. This appears to have worked, although I couldn't
>> figure out how to get it to use the metastore_db from Spark.
>>
>> After turning my attention back to Spark, I determined the issue. After
>> much troubleshooting, I discovered that if I performed a COUNT(*) using
>> the same JOINs, the problem query worked. I removed all the columns from
>> the SELECT statement and added them one by one until I found the culprit.
>> It's a text field on one of the tables. When the query SELECTs this column,
>> or attempts to filter on it, the query hangs and never completes. If I
>> remove all explicit references to this column, the query works fine. Since
>> I need this column in the results, I went back to the ETL and extracted the
>> values to a dimension table. I replaced the text column in the source table
>> with an integer ID column and the query worked without issue.
>>
>> On the topic of Hive, does anyone have any detailed resources for how to
>> set up Hive from scratch? Aside from the official site, since those
>> instructions didn't work for me. I'm starting to feel uneasy about building
>> my process around Spark. There really shouldn't be any instances where I
>> ask Spark to run legal ANSI SQL code and it just does nothing. In the past
>> 4 days I've run into 2 of these instances, and the solution was more voodoo
>> and magic than examining errors/logs and fixing code. I feel that I should
>> have a contingency plan in place for when I run into an issue with Spark
>> that can't be resolved.
>>
>> Thanks everyone.
>>
>>
>> On Sat, Aug 12, 2023 at 2:18 PM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> OK you would not have known unless you went through the process so to
>>> speak.
>>>
>>> Let us do something revolutionary here 😁
>>>
>>> Install hive and its metastore. You already have hadoop anyway
>>>
>>> https://cwiki.apache.org/confluence/display/hive/adminmanual+installation
>>>
>>> hive metastore
>>>
>>>
>>> https://data-flair.training/blogs/apache-hive-metastore/#:~:text=What%20is%20Hive%20Metastore%3F,by%20using%20metastore%20service%20API
>>> .
>>>
>>> choose one of these
>>>
>>> derby  hive  mssql  mysql  oracle  postgres
>>>
>>> Mine is an oracle. postgres is good as well.
>>>
>>> HTH
>>>
>>> Mich Talebzadeh,
>>> Solutions Architect/Engineering Lead
>>> London
>>> United Kingdom
>>>

Re: Spark-SQL - Query Hanging, How To Troubleshoot

2023-08-17 Thread Patrick Tucci
Hi Mich,

Yes, that's the sequence of events. I think the big breakthrough is that
(for now at least) Spark is throwing errors instead of the queries hanging,
which is a big step forward. I can at least troubleshoot issues if I know
what they are.

When I reflect on the issues I faced and the solutions, my issue may have
been driver memory all along. I just couldn't determine that was the issue
because I never saw any errors. In one case, converting a LEFT JOIN to an
inner JOIN caused the query to run. In another case, replacing a text field
with an int ID and JOINing on the ID column worked. Per your advice,
changing file formats from ORC to Parquet solved one issue. These
interventions could have changed the way Spark needed to broadcast data to
execute the query, thereby reducing demand on the memory-constrained driver.
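
A minimal sketch of the size check behind automatic broadcast joins may help picture this. It is a simplified plain-Python model, not Spark code: the function name and the example table sizes are assumptions made up for illustration. The only Spark facts it relies on are that spark.sql.autoBroadcastJoinThreshold defaults to 10 MB and that setting it to -1 disables automatic broadcasting.

```python
# Simplified model of the size check behind automatic broadcast joins.
# This is NOT Spark code; the names and sizes below are illustrative assumptions.

MB = 1024 * 1024

# Documented default of spark.sql.autoBroadcastJoinThreshold (10 MB).
DEFAULT_THRESHOLD_BYTES = 10 * MB


def would_auto_broadcast(estimated_size_bytes, threshold_bytes=DEFAULT_THRESHOLD_BYTES):
    """Return True if a join side of this estimated size qualifies for auto-broadcast."""
    if threshold_bytes < 0:
        # A threshold of -1 turns automatic broadcast joins off entirely.
        return False
    return 0 <= estimated_size_bytes <= threshold_bytes


if __name__ == "__main__":
    # Hypothetical size estimates for the smaller side of a join.
    candidates = {
        "dimension table with a wide text column": 50 * MB,
        "dimension table keyed by an integer ID": 8 * MB,
    }
    for label, size in candidates.items():
        print(label, "->", would_auto_broadcast(size))
```

Anything that qualifies has to be collected on the driver before it is sent to the executors, which is why a change in the estimated size of one join side could plausibly change how much driver memory a query needs.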

Fingers crossed this is the solution. I will reply to this thread if the
issue comes up again (hopefully it doesn't!).

Thanks again,

Patrick

On Thu, Aug 17, 2023 at 1:54 PM Mich Talebzadeh 
wrote:

> Hi Patrick,
>
> Glad that you have managed to sort this problem out. Hopefully it will go
> away for good.
>
> Still, we are in the dark about why this problem keeps going away and
> coming back :( As I recall, the chronology of events was as follows:
>
>
>    1. The issue with the hanging Spark job was reported.
>    2. Concurrency on the Hive metastore (single-threaded Derby DB) was
>    identified as a possible cause.
>    3. You changed the underlying Hive table format from ORC to Parquet
>    and somehow it worked.
>    4. The issue was reported again.
>    5. You upgraded Spark from 3.4.0 to 3.4.1 (in case of an underlying
>    issue) and encountered a driver memory limitation.
>    6. You allocated more memory to the driver and it is running OK for
>    now.
>    7. It appears that you are joining a large dataset with a smaller
>    dataset. Spark decides to do a broadcast join by collecting the smaller
>    dataset into driver memory and broadcasting it to all executors. That
>    is where you hit the memory limit on the driver. In the absence of a
>    broadcast join, Spark needs to perform a shuffle, which is an expensive
>    process.
>       1. You can raise the broadcast join limit by setting the conf
>       parameter "spark.sql.autoBroadcastJoinThreshold" in bytes (check the
>       manual).
>       2. You can also disable the broadcast join by setting
>       "spark.sql.autoBroadcastJoinThreshold" to -1 to see what is happening.
>
>
> So you still need to find a resolution to this issue. Maybe 3.4.1 has
> managed to fix some underlying issues.
>
> HTH
>
> Mich Talebzadeh,
> Solutions Architect/Engineering Lead
> London
> United Kingdom
>
>
> On Thu, 17 Aug 2023 at 17:17, Patrick Tucci 
> wrote:
>
>> Hi Everyone,
>>
>> I just wanted to follow up on this issue. This issue has continued since
>> our last correspondence. Today I had a query hang and couldn't resolve the
>> issue. I decided to upgrade my Spark install from 3.4.0 to 3.4.1. After
>> doing so, instead of the query hanging, I got an error message that the
>> driver didn't have enough memory to broadcast objects. After increasing the
>> driver memory, the query runs without issue.
>>
>> I hope this can be helpful to someone else in the future. Thanks again
>> for the support,
>>
>> Patrick
>>
>> On Sun, Aug 13, 2023 at 7:52 AM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> OK I use Hive 3.1.1
>>>
>>> My suggestion is to put your hive issues to u...@hive.apache.org and
>>> for JAVA version compatibility
>>>
>>> They will give you better info.
>>>
>>> HTH
>>>
>>> Mich Talebzadeh,
>>> Solutions Architect/Engineering Lead
>>> London
>>> United Kingdom
>>>

Re: Spark-SQL - Query Hanging, How To Troubleshoot

2023-08-17 Thread Patrick Tucci
Hi Mich,

Here are my config values from spark-defaults.conf:

spark.eventLog.enabled true
spark.eventLog.dir hdfs://10.0.50.1:8020/spark-logs
spark.history.provider org.apache.spark.deploy.history.FsHistoryProvider
spark.history.fs.logDirectory hdfs://10.0.50.1:8020/spark-logs
spark.history.fs.update.interval 10s
spark.history.ui.port 18080
spark.sql.warehouse.dir hdfs://10.0.50.1:8020/user/spark/warehouse
spark.executor.cores 4
spark.executor.memory 16000M
spark.sql.legacy.createHiveTableByDefault false
spark.driver.host 10.0.50.1
spark.scheduler.mode FAIR
spark.driver.memory 4g #added 2023-08-17

The only application that runs on the cluster is the Spark Thrift server,
which I launch like so:

~/spark/sbin/start-thriftserver.sh --master spark://10.0.50.1:7077

The cluster runs in standalone mode and does not use Yarn for resource
management. As a result, the Spark Thrift server acquires all available
cluster resources when it starts. This is okay; as of right now, I am the
only user of the cluster. If I add more users, they will also be SQL users,
submitting queries through the Thrift server.

Let me know if you have any other questions or thoughts.

Thanks,

Patrick

On Thu, Aug 17, 2023 at 3:09 PM Mich Talebzadeh 
wrote:

> Hello Patrick,
>
> As a matter of interest, what parameters and their respective values do
> you use in spark-submit? I assume it is running in YARN mode.
>
> HTH
>
> Mich Talebzadeh,
> Solutions Architect/Engineering Lead
> London
> United Kingdom
>
>
> On Thu, 17 Aug 2023 at 19:36, Patrick Tucci 
> wrote:
>
>> Hi Mich,
>>
>> Yes, that's the sequence of events. I think the big breakthrough is that
>> (for now at least) Spark is throwing errors instead of the queries hanging.
>> Which is a big step forward. I can at least troubleshoot issues if I know
>> what they are.
>>
>> When I reflect on the issues I faced and the solutions, my issue may have
>> been driver memory all along. I just couldn't determine that was the issue
>> because I never saw any errors. In one case, converting a LEFT JOIN to an
>> inner JOIN caused the query to run. In another case, replacing a text field
>> with an int ID and JOINing on the ID column worked. Per your advice,
>> changing file formats from ORC to Parquet solved one issue. These
>> interventions could have changed the way Spark needed to broadcast data to
>> execute the query, thereby reducing demand on the memory-constrained driver.
>>
>> Fingers crossed this is the solution. I will reply to this thread if the
>> issue comes up again (hopefully it doesn't!).
>>
>> Thanks again,
>>
>> Patrick
>>
>> On Thu, Aug 17, 2023 at 1:54 PM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Hi Patrick,
>>>
>>> glad that you have managed to sort this problem out. Hopefully it will
>>> go away for good.
>>>
>>> Still we are in the dark about how this problem is going away and coming
>>> back :( As I recall the chronology of events were as follows:
>>>
>>>
>>>1. The Issue with hanging Spark job reported
>>>2. concurrency on Hive metastore (single threaded Derby DB) was
>>>identified as a possible cause
>>>3. You changed the underlying Hive table formats from ORC to Parquet
>>>and somehow it worked
>>>4. The issue was reported again
>>>5. You upgraded the spark version from 3.4.0 to 3.4.1 (as a possible
>>>underlying issue) and encountered driver memory limitation.
>>>6. you allocated more memory to the driver and it is running ok for
>>>now,
>>>7. It appears that you are doing some join between a large dataset
>>>and a smaller dataset. Spark decides to do broadcast join by taking the
>>>smaller dataset, fit it into the driver memory and broadcasting it to all
>>>executors.  That is where you had this issue with the memory limit on the
>>>driver. In the absence of Broadcast join, spark needs to perform a 
>>> shuffle
>>>which is an expensive process.
>>>   1

Re: Spark-SQL - Query Hanging, How To Troubleshoot

2023-08-17 Thread Patrick Tucci
No, the driver memory was not set explicitly. So it was likely the default
value, which appears to be 1GB.

On Thu, Aug 17, 2023, 16:49 Mich Talebzadeh 
wrote:

> One question, what was the driver memory before setting it to 4G? Did you
> have it set at all before?
>
> HTH
>
> Mich Talebzadeh,
> Solutions Architect/Engineering Lead
> London
> United Kingdom
>
>
> On Thu, 17 Aug 2023 at 21:01, Patrick Tucci 
> wrote:
>
>> Hi Mich,
>>
>> Here are my config values from spark-defaults.conf:
>>
>> spark.eventLog.enabled true
>> spark.eventLog.dir hdfs://10.0.50.1:8020/spark-logs
>> spark.history.provider org.apache.spark.deploy.history.FsHistoryProvider
>> spark.history.fs.logDirectory hdfs://10.0.50.1:8020/spark-logs
>> spark.history.fs.update.interval 10s
>> spark.history.ui.port 18080
>> spark.sql.warehouse.dir hdfs://10.0.50.1:8020/user/spark/warehouse
>> spark.executor.cores 4
>> spark.executor.memory 16000M
>> spark.sql.legacy.createHiveTableByDefault false
>> spark.driver.host 10.0.50.1
>> spark.scheduler.mode FAIR
>> spark.driver.memory 4g #added 2023-08-17
>>
>> The only application that runs on the cluster is the Spark Thrift server,
>> which I launch like so:
>>
>> ~/spark/sbin/start-thriftserver.sh --master spark://10.0.50.1:7077
>>
>> The cluster runs in standalone mode and does not use Yarn for resource
>> management. As a result, the Spark Thrift server acquires all available
>> cluster resources when it starts. This is okay; as of right now, I am the
>> only user of the cluster. If I add more users, they will also be SQL users,
>> submitting queries through the Thrift server.
>>
>> Let me know if you have any other questions or thoughts.
>>
>> Thanks,
>>
>> Patrick
>>
>> On Thu, Aug 17, 2023 at 3:09 PM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Hello Patrick,
>>>
>>> As a matter of interest, what parameters and their respective values do
>>> you use in spark-submit? I assume it is running in YARN mode.
>>>
>>> HTH
>>>
>>> Mich Talebzadeh,
>>> Solutions Architect/Engineering Lead
>>> London
>>> United Kingdom
>>>
>>>
>>> On Thu, 17 Aug 2023 at 19:36, Patrick Tucci 
>>> wrote:
>>>
>>>> Hi Mich,
>>>>
>>>> Yes, that's the sequence of events. I think the big breakthrough is
>>>> that (for now at least) Spark is throwing errors instead of the queries
>>>> hanging. Which is a big step forward. I can at least troubleshoot issues if
>>>> I know what they are.
>>>>
>>>> When I reflect on the issues I faced and the solutions, my issue may
>>>> have been driver memory all along. I just couldn't determine that was the
>>>> issue because I never saw any errors. In one case, converting a LEFT JOIN
>>>> to an inner JOIN caused the query to run. In another case, replacing a text
>>>> field with an int ID and JOINing on the ID column worked. Per your advice,
>>>> changing file formats from ORC to Parquet solved one issue. These
>>>> interventions could have changed the way Spark needed to broadcast data to
>>>> execute the query, thereby reducing demand on the memory-constrained 
>>>> driver.
>>>>
>>>> Fingers crossed this is the 

Re: Spark stand-alone mode

2023-09-15 Thread Patrick Tucci
I use Spark in standalone mode. It works well, and the instructions on the
site are accurate for the most part. The only thing that didn't work for me
was the start-all.sh script. Instead, I use a simple script that starts the
master node, then uses SSH to connect to the worker machines and start the
worker nodes.
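
A rough sketch of what such a script could look like, written in Python rather than shell. This is not the actual script: the install path /opt/spark and the worker host names are hypothetical, and it assumes passwordless SSH to each worker plus the start-master.sh and start-worker.sh scripts that ship in Spark's sbin directory. It only builds the commands; running them is left to the caller.

```python
import shlex


def build_start_commands(spark_home, master_host, workers, master_port=7077):
    """Build the commands that start a standalone master and its workers.

    Returns a list of argv lists: the first starts the master locally, the
    rest start one worker per host over SSH.
    """
    master_url = f"spark://{master_host}:{master_port}"
    commands = [[f"{spark_home}/sbin/start-master.sh"]]
    for host in workers:
        # The remote command is one string because ssh hands it to the
        # remote shell for parsing.
        remote = f"{spark_home}/sbin/start-worker.sh {shlex.quote(master_url)}"
        commands.append(["ssh", host, remote])
    return commands


if __name__ == "__main__":
    # Hypothetical install path and worker names, for illustration only.
    for cmd in build_start_commands("/opt/spark", "10.0.50.1", ["worker1", "worker2"]):
        print(" ".join(cmd))
        # To actually run it: subprocess.run(cmd, check=True)
```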

All the nodes will need access to the same data, so you will need some sort
of shared file system. You could use an NFS share mounted to the same point
on each machine, S3, or HDFS.

Standalone also acquires all resources when an application is submitted, so
by default only one application may be run at a time. You can limit the
resources allocated to each application to allow multiple concurrent
applications, or you could configure dynamic allocation to scale the
resources up and down per application as needed.

On Fri, Sep 15, 2023 at 5:56 AM Ilango  wrote:

>
> Hi all,
>
> We have 4 HPC nodes and have installed Spark individually on each node.
>
> Spark is used in local mode (each driver/executor has 8 cores and 65
> GB) from sparklyr/PySpark using RStudio/Posit Workbench. Slurm is used as
> the scheduler.
>
> As this is local mode, we are facing performance issues (as there is only
> one executor) when dealing with large datasets.
>
> Can I convert these 4 nodes into a Spark standalone cluster? We don't have
> Hadoop, so YARN mode is out of scope.
>
> Should I follow the official documentation for setting up a standalone
> cluster? Will it work? Do I need to be aware of anything else?
> Can you please share your thoughts?
>
> Thanks,
> Elango
>


Re: Spark stand-alone mode

2023-09-19 Thread Patrick Tucci
Multiple applications can run at once, but you need to either configure
Spark or your applications to allow that. In stand-alone mode, each
application attempts to take all resources available by default. This
section of the documentation has more details:

https://spark.apache.org/docs/latest/spark-standalone.html#resource-scheduling

Explicitly setting the resources per application limits the resources to
the configured values for the lifetime of the application. You can use
dynamic allocation to allow Spark to scale the resources up and down per
application based on load, but the configuration is relatively more complex:

https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation
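
To make the default behaviour concrete, here is a small sketch of how cores get handed out when applications are submitted one after another. It is a simplified plain-Python model, not Spark's scheduler: it ignores memory, executor sizing and cluster-wide defaults, and the 32-core cluster size is an assumption (4 nodes with 8 cores each). The one real setting it mirrors is spark.cores.max, which caps the total cores a standalone application may take.

```python
def grant_cores(total_cores, cores_max_per_app):
    """Model how a standalone cluster hands out cores in submission order.

    cores_max_per_app lists each application's spark.cores.max value, with
    None meaning the setting is not configured (take everything available).
    Returns the number of cores each application ends up with.
    """
    free = total_cores
    granted = []
    for cores_max in cores_max_per_app:
        share = free if cores_max is None else min(free, cores_max)
        granted.append(share)
        free -= share
    return granted


if __name__ == "__main__":
    # Assumed cluster size: 4 nodes x 8 cores.
    print(grant_cores(32, [None, None]))  # second app is left waiting for cores
    print(grant_cores(32, [16, 16]))      # two apps share the cluster
```

With no cap, the first application leaves nothing for the second; with a cap of 16 cores each, two applications can run side by side.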

On Mon, Sep 18, 2023 at 3:53 PM Ilango  wrote:

>
> Thanks all for your suggestions. Noted with thanks.
> I just wanted to share a few more details about the environment:
> 1. We use NFS for data storage and the data is in Parquet format.
> 2. All HPC nodes are connected and already work as a cluster for Studio
> workbench. I can set up passwordless SSH if it does not exist already.
> 3. We will stick with NFS and standalone mode for now, and may explore
> HDFS and YARN later.
>
> Can you please confirm whether multiple users can run Spark jobs at the
> same time?
> If so, I will start working on it and let you know how it goes.
>
> Mich, the link to Hadoop is not working. Can you please check and let me
> know the correct link? I would like to explore the Hadoop option as well.
>
>
> Thanks,
> Elango
>
> On Sat, Sep 16, 2023, 4:20 AM Bjørn Jørgensen 
> wrote:
>
>> You need to set up SSH without a password; use a key instead. How to
>> connect without password using SSH (passwordless)
>>
>>
>> On Fri, 15 Sep 2023 at 20:55, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Can these 4 nodes talk to each other through ssh as trusted hosts (on
>>> top of the network that Sean already mentioned)? Otherwise you need to set
>>> it up. You can install a LAN if you have another free port at the back of
>>> your HPC nodes. They should
>>>
>>> You ought to be able to set up a Hadoop cluster pretty easily. Check this
>>> old article of mine for Hadoop set-up.
>>>
>>>
>>> https://www.linkedin.com/pulse/diy-festive-season-how-install-configure-big-data-so-mich/?trackingId=z7n5tx7tQOGK9tcG9VClkw%3D%3D
>>>
>>> Hadoop will provide you with a common storage layer (HDFS) that these
>>> nodes will be able to share. YARN is your best bet as the resource
>>> manager with the reasonably powerful hosts you have. However, for now
>>> standalone mode will do. Make sure that the metastore you choose is
>>> something respectable like a Postgres DB that can handle multiple
>>> concurrent Spark jobs (by default it will use the Hive metastore on
>>> Derby :( ).
>>>
>>> HTH
>>>
>>>
>>> Mich Talebzadeh,
>>> Distinguished Technologist, Solutions Architect & Engineer
>>> London
>>> United Kingdom
>>>
>>>
>>> On Fri, 15 Sept 2023 at 07:04, Ilango  wrote:
>>>

>>>> Hi all,
>>>>
>>>> We have 4 HPC nodes and have installed Spark individually on each node.
>>>>
>>>> Spark is used in local mode (each driver/executor has 8 cores and 65
>>>> GB) from sparklyr/PySpark using RStudio/Posit Workbench. Slurm is used
>>>> as the scheduler.
>>>>
>>>> As this is local mode, we are facing performance issues (as there is
>>>> only one executor) when dealing with large datasets.
>>>>
>>>> Can I convert these 4 nodes into a Spark standalone cluster? We don't
>>>> have Hadoop, so YARN mode is out of scope.
>>>>
>>>> Should I follow the official documentation for setting up a standalone
>>>> cluster? Will it work? Do I need to be aware of anything else?
>>>> Can you please share your thoughts?
>>>>
>>>> Thanks,
>>>> Elango

>>>
>>
>> --
>> Bjørn Jørgensen
>> Vestre Aspehaug 4, 6010 Ålesund
>> Norge
>>
>> +47 480 94 297
>>
>


Re: Spark join produce duplicate rows in resultset

2023-10-22 Thread Patrick Tucci
Hi Meena,

It's not impossible, but it's unlikely that there's a bug in Spark SQL
randomly duplicating rows. The most likely explanation is there are more
records in the item table that match your sys/custumer_id/scode criteria
than you expect.

In your original query, try changing select rev.* to select I.*. This will
show you the records from item that the join produces. If the first part of
the code only returns one record, I expect you will see 4 distinct records
returned here.
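
A minimal sketch of the effect, using Python's built-in sqlite3 module instead of Spark so it runs anywhere. The table contents are made up for illustration, and only the three join columns from the original query are modelled (the amount and item_name columns are hypothetical). One rev row joined to four matching item rows comes back four times, and grouping item by the join keys shows how many matches each key has.

```python
import sqlite3

# In-memory database with made-up rows; only the join columns matter here.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE rev  (sys TEXT, custumer_id TEXT, scode TEXT, amount INTEGER);
CREATE TABLE item (sys TEXT, custumer_id TEXT, scode TEXT, item_name TEXT);

INSERT INTO rev VALUES ('A', '123456789', 'S1', 100);

INSERT INTO item VALUES
  ('A', '123456789', 'S1', 'item-1'),
  ('A', '123456789', 'S1', 'item-2'),
  ('A', '123456789', 'S1', 'item-3'),
  ('A', '123456789', 'S1', 'item-4');
""")

# Selecting only rev.* hides the item columns, so the repeated rows look
# like duplicates even though each one comes from a different item row.
joined = conn.execute("""
SELECT rev.*
FROM rev
LEFT JOIN item I
  ON rev.sys = I.sys
 AND rev.custumer_id = I.custumer_id
 AND rev.scode = I.scode
WHERE rev.custumer_id = '123456789'
""").fetchall()

# Diagnostic: how many item rows share each combination of join keys?
matches_per_key = conn.execute("""
SELECT sys, custumer_id, scode, COUNT(*) AS n
FROM item
GROUP BY sys, custumer_id, scode
HAVING COUNT(*) > 1
""").fetchall()

if __name__ == "__main__":
    print(len(joined), "rows from the join")
    print(matches_per_key)
```

The same GROUP BY ... HAVING COUNT(*) > 1 check can be run against the real item table to see which key combinations match more than once.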

Thanks,

Patrick


On Sun, Oct 22, 2023 at 1:29 AM Meena Rajani  wrote:

> Hello all:
>
> I am using Spark SQL to join two tables. To my surprise, I am
> getting redundant rows. What could be the cause?
>
>
> select rev.* from rev
> inner join customer c
> on rev.custumer_id = c.id
> inner join product p
> on rev.sys = p.sys
> and rev.prin = p.prin
> and rev.scode = p.bcode
>
> left join item I
> on rev.sys = I.sys
> and rev.custumer_id = I.custumer_id
> and rev.scode = I.scode
>
> where rev.custumer_id = '123456789'
>
> The first part of the code returns one row:
>
> select rev.* from rev
> inner join customer c
> on rev.custumer_id = c.id
> inner join product p
> on rev.sys = p.sys
> and rev.prin = p.prin
> and rev.scode = p.bcode
>
>
> The item table has two rows with common attributes, and the *final join
> should result in 2 rows. But I am seeing 4 rows instead.*
>
> left join item I
> on rev.sys = I.sys
> and rev.custumer_id = I.custumer_id
> and rev.scode = I.scode
>
>
>
> Regards,
> Meena
>
>
>


unsubscribe

2023-11-09 Thread Duflot Patrick
unsubscribe


Re: Fully in-memory shuffles

2015-06-10 Thread Patrick Wendell
In many cases the shuffle will actually hit the OS buffer cache and
not ever touch spinning disk if it is a size that is less than memory
on the machine.

- Patrick

On Wed, Jun 10, 2015 at 5:06 PM, Corey Nolet  wrote:
> So with this... to help my understanding of Spark under the hood-
>
> Is this statement correct "When data needs to pass between multiple JVMs, a
> shuffle will always hit disk"?
>
> On Wed, Jun 10, 2015 at 10:11 AM, Josh Rosen  wrote:
>>
>> There's a discussion of this at https://github.com/apache/spark/pull/5403
>>
>>
>>
>> On Wed, Jun 10, 2015 at 7:08 AM, Corey Nolet  wrote:
>>>
>>> Is it possible to configure Spark to do all of its shuffling FULLY in
>>> memory (given that I have enough memory to store all the data)?
>>>
>>>
>>>
>>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



[ANNOUNCE] Announcing Spark 1.4

2015-06-11 Thread Patrick Wendell
Hi All,

I'm happy to announce the availability of Spark 1.4.0! Spark 1.4.0 is
the fifth release on the API-compatible 1.X line. It is Spark's
largest release ever, with contributions from 210 developers and more
than 1,000 commits!

A huge thanks goes to all of the individuals and organizations involved
in development and testing of this release.

Visit the release notes [1] to read about the new features, or
download [2] the release today.

For errata in the contributions or release notes, please e-mail me
*directly* (not on-list).

Thanks to everyone who helped work on this release!

[1] http://spark.apache.org/releases/spark-release-1-4-0.html
[2] http://spark.apache.org/downloads.html




Re: Fully in-memory shuffles

2015-06-11 Thread Patrick Wendell
Hey Corey,

Yes, when shuffles are smaller than available memory to the OS, most
often the outputs never get stored to disk. I believe this holds the same
for the YARN shuffle service, because the write path is actually the
same, i.e. we don't fsync the writes and force them to disk. I would
guess in such shuffles the bottleneck is serializing the data rather
than raw IO, so I'm not sure explicitly buffering the data in the JVM
process would yield a large improvement.

Writing shuffle to an explicitly pinned memory filesystem is also
possible (per Davies' suggestion), but it's brittle because the job
will fail if shuffle output exceeds memory.
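
The point about not fsync-ing writes can be sketched outside Spark. This is a plain-Python illustration, not Spark's shuffle writer: the function and file names are made up. A normal write returns once the data has been handed to the operating system, which may keep it in the buffer cache for a while; only an explicit fsync asks the OS to push it to the storage device before continuing.

```python
import os
import tempfile


def write_shuffle_like(path, payload, force_to_disk=False):
    """Write payload to path and return the resulting file size in bytes.

    With force_to_disk=False the data may live only in the OS buffer cache
    for some time. With force_to_disk=True the call blocks until the OS
    reports the data as written to the device.
    """
    with open(path, "wb") as f:
        f.write(payload)
        if force_to_disk:
            f.flush()              # move Python's buffer into the OS
            os.fsync(f.fileno())   # ask the OS to write it to the device
    return os.path.getsize(path)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "shuffle_0.data")  # hypothetical file name
        print(write_shuffle_like(path, b"x" * 1024))
        print(write_shuffle_like(path, b"x" * 1024, force_to_disk=True))
```

A reader that opens the file right after the unforced write still sees all the bytes, because reads are served from the same cache.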

- Patrick

On Wed, Jun 10, 2015 at 9:50 PM, Davies Liu  wrote:
> If you have enough memory, you can put the temporary work directory in
> tmpfs (an in-memory file system).
>
> On Wed, Jun 10, 2015 at 8:43 PM, Corey Nolet  wrote:
>> Ok so it is the case that small shuffles can be done without hitting any
>> disk. Is this the same case for the aux shuffle service in yarn? Can that be
>> done without hitting disk?
>>
>> On Wed, Jun 10, 2015 at 9:17 PM, Patrick Wendell  wrote:
>>>
>>> In many cases the shuffle will actually hit the OS buffer cache and
>>> not ever touch spinning disk if it is a size that is less than memory
>>> on the machine.
>>>
>>> - Patrick
>>>
>>> On Wed, Jun 10, 2015 at 5:06 PM, Corey Nolet  wrote:
>>> > So with this... to help my understanding of Spark under the hood-
>>> >
>>> > Is this statement correct "When data needs to pass between multiple
>>> > JVMs, a
>>> > shuffle will always hit disk"?
>>> >
>>> > On Wed, Jun 10, 2015 at 10:11 AM, Josh Rosen 
>>> > wrote:
>>> >>
>>> >> There's a discussion of this at
>>> >> https://github.com/apache/spark/pull/5403
>>> >>
>>> >>
>>> >>
>>> >> On Wed, Jun 10, 2015 at 7:08 AM, Corey Nolet  wrote:
>>> >>>
>>> >>> Is it possible to configure Spark to do all of its shuffling FULLY in
>>> >>> memory (given that I have enough memory to store all the data)?
>>> >>>
>>> >>>
>>> >>>
>>> >>
>>> >
>>
>>




Dynamic allocator requests -1 executors

2015-06-12 Thread Patrick Woody
Hey all,

I've recently run into an issue where spark dynamicAllocation has asked for
-1 executors from YARN. Unfortunately, this raises an exception that kills
the executor-allocation thread and the application can't request more
resources.

Has anyone seen this before? It happens intermittently and the application
usually works, but when this is hit the application becomes unusable because
it gets stuck at its minimum YARN resources.

Stacktrace below.

Thanks!
-Pat

ERROR [2015-06-12 16:44:39,724] org.apache.spark.util.Utils: Uncaught exception in thread spark-dynamic-executor-allocation-0
! java.lang.IllegalArgumentException: Attempted to request a negative number of executor(s) -1 from the cluster manager. Please specify a positive number!
! at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.requestTotalExecutors(CoarseGrainedSchedulerBackend.scala:338) ~[spark-core_2.10-1.3.1.jar:1.
! at org.apache.spark.SparkContext.requestTotalExecutors(SparkContext.scala:1137) ~[spark-core_2.10-1.3.1.jar:1.3.1]
! at org.apache.spark.ExecutorAllocationManager.addExecutors(ExecutorAllocationManager.scala:294) ~[spark-core_2.10-1.3.1.jar:1.3.1]
! at org.apache.spark.ExecutorAllocationManager.addOrCancelExecutorRequests(ExecutorAllocationManager.scala:263) ~[spark-core_2.10-1.3.1.jar:1.3.1]
! at org.apache.spark.ExecutorAllocationManager.org$apache$spark$ExecutorAllocationManager$$schedule(ExecutorAllocationManager.scala:230) ~[spark-core_2.10-1.3.1.j
! at org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply$mcV$sp(ExecutorAllocationManager.scala:189) ~[spark-core_2.10-1.3.1.jar:1.3.1]
! at org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply(ExecutorAllocationManager.scala:189) ~[spark-core_2.10-1.3.1.jar:1.3.1]
! at org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply(ExecutorAllocationManager.scala:189) ~[spark-core_2.10-1.3.1.jar:1.3.1]
! at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618) ~[spark-core_2.10-1.3.1.jar:1.3.1]
! at org.apache.spark.ExecutorAllocationManager$$anon$1.run(ExecutorAllocationManager.scala:189) [spark-core_2.10-1.3.1.jar:1.3.1]
! at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_71]
! at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_71]
! at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_71]
! at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_71]
! at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
! at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]


Re: Dynamic allocator requests -1 executors

2015-06-13 Thread Patrick Woody
Hey Sandy,

I'll test it out on 1.4. Do you have a bug number or PR that I could reference 
as well?

Thanks!
-Pat

Sent from my iPhone

> On Jun 13, 2015, at 11:38 AM, Sandy Ryza  wrote:
> 
> Hi Patrick,
> 
> I'm noticing that you're using Spark 1.3.1.  We fixed a bug in dynamic 
> allocation in 1.4 that permitted requesting negative numbers of executors.  
> Any chance you'd be able to try with the newer version and see if the problem 
> persists?
> 
> -Sandy
> 
>> On Fri, Jun 12, 2015 at 7:42 PM, Patrick Woody  
>> wrote:
>> Hey all,
>> 
>> I've recently run into an issue where spark dynamicAllocation has asked for 
>> -1 executors from YARN. Unfortunately, this raises an exception that kills 
>> the executor-allocation thread and the application can't request more 
>> resources.
>> 
>> Has anyone seen this before? It is spurious and the application usually 
>> works, but when this gets hit it becomes unusable when getting stuck at 
>> minimum YARN resources.
>> 
>> Stacktrace below.
>> 
>> Thanks!
>> -Pat
>> 
>> 470 ERROR [2015-06-12 16:44:39,724] org.apache.spark.util.Utils: Uncaught 
>> exception in thread spark-dynamic-executor-allocation-0
>> 471 ! java.lang.IllegalArgumentException: Attempted to request a negative 
>> number of executor(s) -1 from the cluster manager. Please specify a positive 
>> number!
>> 472 ! at 
>> org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.requestTotalExecutors(CoarseGrainedSchedulerBackend.scala:338)
>>  ~[spark-core_2.10-1.3.1.jar:1.
>> 473 ! at 
>> org.apache.spark.SparkContext.requestTotalExecutors(SparkContext.scala:1137) 
>> ~[spark-core_2.10-1.3.1.jar:1.3.1]
>> 474 ! at 
>> org.apache.spark.ExecutorAllocationManager.addExecutors(ExecutorAllocationManager.scala:294)
>>  ~[spark-core_2.10-1.3.1.jar:1.3.1]
>> 475 ! at 
>> org.apache.spark.ExecutorAllocationManager.addOrCancelExecutorRequests(ExecutorAllocationManager.scala:263)
>>  ~[spark-core_2.10-1.3.1.jar:1.3.1]
>> 476 ! at 
>> org.apache.spark.ExecutorAllocationManager.org$apache$spark$ExecutorAllocationManager$$schedule(ExecutorAllocationManager.scala:230)
>>  ~[spark-core_2.10-1.3.1.j
>> 477 ! at 
>> org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply$mcV$sp(ExecutorAllocationManager.scala:189)
>>  ~[spark-core_2.10-1.3.1.jar:1.3.1]
>> 478 ! at 
>> org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply(ExecutorAllocationManager.scala:189)
>>  ~[spark-core_2.10-1.3.1.jar:1.3.1]
>> 479 ! at 
>> org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply(ExecutorAllocationManager.scala:189)
>>  ~[spark-core_2.10-1.3.1.jar:1.3.1]
>> 480 ! at 
>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618) 
>> ~[spark-core_2.10-1.3.1.jar:1.3.1]
>> 481 ! at 
>> org.apache.spark.ExecutorAllocationManager$$anon$1.run(ExecutorAllocationManager.scala:189)
>>  [spark-core_2.10-1.3.1.jar:1.3.1]
>> 482 ! at 
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) 
>> [na:1.7.0_71]
>> 483 ! at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) 
>> [na:1.7.0_71]
>> 484 ! at 
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>>  [na:1.7.0_71]
>> 485 ! at 
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>  [na:1.7.0_71]
>> 486 ! at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>  [na:1.7.0_71]
>> 487 ! at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>  [na:1.7.0_71]
> 


Get Spark version before starting context

2015-07-04 Thread Patrick Woody
Hey all,

Is it possible to reliably get the version string of a Spark cluster prior
to trying to connect via the SparkContext on the client side? Most of the
errors I've seen on mismatched versions have been cryptic, so it would be
helpful if I could throw an exception earlier.

I know it is contained in the HTML of the master, but an API endpoint would also
be helpful. Does this exist?

Thanks!
-Pat


Re: Get Spark version before starting context

2015-07-04 Thread Patrick Woody
To somewhat answer my own question - it looks like an empty request to the
REST API will throw an error which returns the version in JSON as well.
Still not ideal though. Would there be any objection to adding a simple
version endpoint to the API?
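
A minimal sketch of the early check described above, in plain Scala with no Spark dependency. It assumes the JSON error body carries the version in a field named "serverSparkVersion"; that field name, the SparkVersionProbe object and both method names are assumptions for illustration, not something confirmed in this thread. Fetching the body over HTTP is left out on purpose.

```scala
object SparkVersionProbe {
  // Assumption: the version sits in a string field called "serverSparkVersion".
  private val VersionField = "\"serverSparkVersion\"\\s*:\\s*\"([^\"]+)\"".r

  /** Returns the version string if the field is present in the JSON text. */
  def extractVersion(json: String): Option[String] =
    VersionField.findFirstMatchIn(json).map(_.group(1))

  /** Throws early when the reported version differs from the expected one. */
  def requireVersion(json: String, expected: String): Unit =
    extractVersion(json) match {
      case Some(v) if v == expected => ()
      case Some(v) =>
        throw new IllegalStateException(s"Cluster runs Spark $v, client expects $expected")
      case None =>
        throw new IllegalStateException("No Spark version found in response")
    }
}
```

Calling requireVersion before constructing the SparkContext would turn a cryptic mismatch into an explicit exception. A regex is used only to keep the sketch dependency-free; a real JSON parser would be more robust.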

On Sat, Jul 4, 2015 at 4:00 PM, Patrick Woody 
wrote:

> Hey all,
>
> Is it possible to reliably get the version string of a Spark cluster prior
> to trying to connect via the SparkContext on the client side? Most of the
> errors I've seen on mismatched versions have been cryptic, so it would be
> helpful if I could throw an exception earlier.
>
> I know it is contained in the HTML of the master, but an API endpoint would also
> be helpful. Does this exist?
>
> Thanks!
> -Pat
>


SparkHub: a new community site for Apache Spark

2015-07-10 Thread Patrick Wendell
Hi All,

Today, I'm happy to announce SparkHub
(http://sparkhub.databricks.com), a service for the Apache Spark
community to easily find the most relevant Spark resources on the web.

SparkHub is a curated list of Spark news, videos and talks, package
releases, upcoming events around the world, and a Spark Meetup
directory to help you find a meetup close to you.

We will continue to expand the site in the coming months and add more
content. I hope SparkHub can help you find Spark related information
faster and more easily than is currently possible. Everything is
sourced from the Spark community, and we welcome input from you as
well!

- Patrick

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Announcing Spark 1.4.1!

2015-07-15 Thread Patrick Wendell
Hi All,

I'm happy to announce the Spark 1.4.1 maintenance release.
We recommend all users on the 1.4 branch upgrade to
this release, which contains several important bug fixes.

Download Spark 1.4.1 - http://spark.apache.org/downloads.html
Release notes - http://spark.apache.org/releases/spark-release-1-4-1.html
Comprehensive list of fixes - http://s.apache.org/spark-1.4.1

Thanks to the 85 developers who worked on this release!

Please contact me directly for errata in the release notes.

- Patrick




Streaming WriteAheadLogBasedBlockHandler disallows parallelism via StorageLevel replication factor

2016-04-13 Thread Patrick McGloin
Hi all,

If I am using a Custom Receiver with the storage level set to
StorageLevel.MEMORY_ONLY_SER_2 and the WAL enabled, I get this warning in the logs:

16/04/13 14:03:15 WARN WriteAheadLogBasedBlockHandler: Storage level
replication 2 is unnecessary when write ahead log is enabled, change
to replication 1
16/04/13 14:03:15 WARN WriteAheadLogBasedBlockHandler: User defined
storage level StorageLevel(false, true, false, false, 2) is changed to
effective storage level StorageLevel(false, true, false, false, 1)
when write ahead log is enabled


My application is running on 4 Executors with 4 cores each, and 1
Receiver.  Because the data is not replicated the processing runs on only
one Executor:

[image: Inline images 1]

Instead of 16 cores processing the Streaming data only 4 are being used.

We cannot repartition the DStream to distribute data to more Executors since
if you call repartition on an RDD which is only located on one node, the new
partitions are only created on that node, which doesn't help.  This theory
that repartitioning doesn't help can be tested with this simple example,
which tries to go from one partition on a single node to many on many
nodes.  What you find when you look at the multiplePartitions RDD in
the UI is that its 6 partitions are on the same Executor.

scala> val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5, 6), 4).cache.setName("rdd")
rdd: org.apache.spark.rdd.RDD[Int] = rdd ParallelCollectionRDD[0] at
parallelize at <console>:27

scala> rdd.count()
res0: Long = 6

scala> val singlePartition = rdd.repartition(1).cache.setName("singlePartition")
singlePartition: org.apache.spark.rdd.RDD[Int] = singlePartition
MapPartitionsRDD[4] at repartition at <console>:29

scala> singlePartition.count()
res1: Long = 6

scala> val multiplePartitions =
singlePartition.repartition(6).cache.setName("multiplePartitions")
multiplePartitions: org.apache.spark.rdd.RDD[Int] = multiplePartitions
MapPartitionsRDD[8] at repartition at <console>:31

scala> multiplePartitions.count()
res2: Long = 6

Am I correct in my understanding of repartition, that the data does not get
shuffled if it is all on one Executor?

Shouldn't I be allowed to set the Receiver replication factor to two
when the WAL is enabled so that multiple Executors can work on the
Streaming input data?

We will look into creating 4 Receivers so that the data gets distributed
more evenly.  But won't that "waste" 4 cores in our example, where one
would do?

Best regards,
Patrick


Re: Behaviour of RDD sampling

2016-05-31 Thread Patrick Baier
I would assume that the driver has to count the number of lines in the json
file anyway.
Otherwise, how could it tell the workers which lines they should work on?
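
For what it's worth, as far as I know rdd.sample without replacement does not need the total count: each record is kept independently with probability equal to the fraction, so the result is only approximately 30% and every record still has to be read. A plain-Scala sketch of that idea (no Spark involved; BernoulliSampleSketch and its sample method are hypothetical names for illustration, not Spark's implementation):

```scala
import scala.util.Random

object BernoulliSampleSketch {
  /** Keeps each record independently with probability `fraction`.
    * Returns the kept records and the number of records inspected. */
  def sample[T](records: Iterator[T], fraction: Double, seed: Long): (Vector[T], Int) = {
    val rng = new Random(seed)
    var inspected = 0
    val kept = Vector.newBuilder[T]
    records.foreach { r =>
      inspected += 1                        // every record is visited once
      if (rng.nextDouble() < fraction) kept += r
    }
    (kept.result(), inspected)
  }
}
```

The inspected counter always equals the input size, which would be consistent with sampling taking about as long as loading the whole file.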



2016-05-31 10:03 GMT+02:00 Gavin Yue :

> If not reading the whole dataset, how do you know the total number of
> records? If not knowing total number, how do you choose 30%?
>
>
>
> > On May 31, 2016, at 00:45, pbaier  wrote:
> >
> > Hi all,
> >
> > I have the following use case:
> > I have around 10k of jsons that I want to use for learning.
> > The jsons are all stored in one file.
> >
> > For learning a ML model, however, I only need around 30% of the jsons
> > (the rest is not needed at all).
> > So, my idea was to load all data into an RDD and then use the rdd.sample
> > method to get my fraction of the data.
> > I implemented this, and in the end it took as long as loading the whole
> > data set.
> > So I was wondering if Spark is still loading the whole dataset from disk
> > and does the filtering afterwards?
> > If this is the case, why does Spark not push down the filtering and load
> > only a fraction of data from the disk?
> > Cheers,
> >
> > Patrick
> >
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Behaviour-of-RDD-sampling-tp27052.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> >
>



-- 

Patrick Baier

Payment Analytics



*E-Mail: patrick.ba...@zalando.de *


Create external table with partitions using sqlContext.createExternalTable

2016-06-14 Thread Patrick Duin
Hi,

I'm trying to use sqlContext.createExternalTable("my_table",
"/tmp/location/", "orc") to create tables. This is working fine for
non-partitioned tables. I'd like to create a partitioned table though, how
do I do that?

Can I add some information in the options: Map[String, String] parameter?

Thanks,
 Patrick


Re: Create external table with partitions using sqlContext.createExternalTable

2016-06-14 Thread Patrick Duin
Thanks, yes I have something similar working as "the alternative solution".
:)

I was hoping to get away with not having to specify my schema so the
sqlContext.createExternalTable seemed like a nice clean approach.

2016-06-14 13:59 GMT+02:00 Mich Talebzadeh :

> Try this; it will work:
>
> sql("use test")
> sql("drop table if exists test.orctype")
> var sqltext: String = ""
> sqltext = """
> CREATE EXTERNAL TABLE test.orctype(
>  prod_id bigint,
>  cust_id bigint,
>  time_id timestamp,
>  channel_id bigint,
>  promo_id bigint,
>  quantity_sold decimal(10,0),
>  amount_sold decimal(10,0))
> PARTITIONED BY (
>year int,
>month int)
> CLUSTERED BY (
>  prod_id,
>  cust_id,
>  time_id,
>  channel_id,
>  promo_id)
> INTO 256 BUCKETS
> STORED AS ORC
> LOCATION '/tmp'
> TBLPROPERTIES ( "orc.compress"="SNAPPY",
>   "orc.create.index"="true",
>
> "orc.bloom.filter.columns"="PROD_ID,CUST_ID,TIME_ID,CHANNEL_ID,PROMO_ID",
>   "orc.bloom.filter.fpp"="0.05",
>   "orc.stripe.size"="268435456",
>   "orc.row.index.stride"="1" )
> """
> sql(sqltext)
> sql("select count(1) from test.orctype").show
>
> res2: org.apache.spark.sql.DataFrame = [result: string]
> +---+
> |_c0|
> +---+
> |  0|
> +---+
>
> HTH
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn:
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 14 June 2016 at 11:39, Patrick Duin  wrote:
>
>> Hi,
>>
>> I'm trying to use sqlContext.createExternalTable("my_table",
>> "/tmp/location/", "orc") to create tables. This is working fine for
>> non-partitioned tables. I'd like to create a partitioned table though, how
>> do I do that?
>>
>> Can I add some information in the options: Map[String, String] parameter?
>>
>> Thanks,
>>  Patrick
>>
>
>


Spark 1.6.2 short circuit AND filter broken

2016-07-07 Thread Patrick Woody
Hey all,

I hit a pretty nasty bug on 1.6.2 that I can't reproduce on 2.0. Here is
the code/logical plan http://pastebin.com/ULnHd1b6. I have filterPushdown
disabled, so when I call collect here it hits the Exception in my UDF
before doing a null check on the input.

I believe it is a symptom of how the DataSourceStrategy splits the
predicates and recombines since it happens at the Physical planning, but I
haven't gone deeper. This doesn't reproduce if I simply use case classes
and sqlContext.createDataFrame.

Is there going to be a 1.6.3 release where this bug can be fixed? I'm happy
to dig further and send up a PR.

Thanks!
-Pat


DataFrame partitionBy to a single Parquet file (per partition)

2016-01-14 Thread Patrick McGloin
Hi,

I would like to repartition / coalesce my data so that it is saved into one
Parquet file per partition. I would also like to use the Spark SQL
partitionBy API. So I could do that like this:

df.coalesce(1).write.partitionBy("entity", "year", "month", "day",
"status").mode(SaveMode.Append).parquet(s"$location")

I've tested this and it doesn't seem to perform well. This is because there
is only one partition to work on in the dataset and all the partitioning,
compression and saving of files has to be done by one CPU core.

I could rewrite this to do the partitioning manually (using filter with the
distinct partition values for example) before calling coalesce.

But is there a better way to do this using the standard Spark SQL API?

Best regards,

Patrick


Re: DataFrame partitionBy to a single Parquet file (per partition)

2016-01-15 Thread Patrick McGloin
I will try this on Monday. Thanks for the tip.
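
For reference, a rough sketch of how I read the tip quoted below, assuming Spark 1.6 where DataFrame.repartition accepts Column expressions. The PartitionedParquetWriter object and its write method are hypothetical names, and the sketch leaves out the leading coalesce(1) from the quoted snippet on the assumption that the repartition alone is enough to bring all rows for one output directory into one task. Untested against a cluster.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.functions.col

object PartitionedParquetWriter {
  // Partition columns taken from the thread.
  val partitionCols = Seq("entity", "year", "month", "day", "status")

  /** Shuffles rows by the partition columns, then writes one directory per value combination. */
  def write(df: DataFrame, location: String): Unit =
    df.repartition(partitionCols.map(col): _*) // rows with equal keys end up in the same task
      .write
      .partitionBy(partitionCols: _*)
      .mode(SaveMode.Append)
      .parquet(location)
}
```

Several key combinations can still hash to the same task, so the number of tasks does not have to match the number of output directories.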

On Fri, 15 Jan 2016, 18:58 Cheng Lian  wrote:

> You may try DataFrame.repartition(partitionExprs: Column*) to shuffle all
> data belonging to a single (data) partition into a single (RDD) partition:
>
> df.coalesce(1).repartition("entity", "year", "month", "day", 
> "status").write.partitionBy("entity", "year", "month", "day", 
> "status").mode(SaveMode.Append).parquet(s"$location")
>
> (Unfortunately the naming here can be quite confusing.)
>
>
> Cheng
>
>
> On 1/14/16 11:48 PM, Patrick McGloin wrote:
>
> Hi,
>
> I would like to repartition / coalesce my data so that it is saved into one
> Parquet file per partition. I would also like to use the Spark SQL
> partitionBy API. So I could do that like this:
>
> df.coalesce(1).write.partitionBy("entity", "year", "month", "day", 
> "status").mode(SaveMode.Append).parquet(s"$location")
>
> I've tested this and it doesn't seem to perform well. This is because
> there is only one partition to work on in the dataset and all the
> partitioning, compression and saving of files has to be done by one CPU
> core.
>
> I could rewrite this to do the partitioning manually (using filter with
> the distinct partition values for example) before calling coalesce.
>
> But is there a better way to do this using the standard Spark SQL API?
>
> Best regards,
>
> Patrick
>
>
>
>


Spark Streaming Write Ahead Log (WAL) not replaying data after restart

2016-01-21 Thread Patrick McGloin
Hi all,

To have a simple way of testing the Spark Streaming Write Ahead Log I
created a very simple Custom Input Receiver, which will generate strings
and store those:

class InMemoryStringReceiver extends
    Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) {

  val batchID = System.currentTimeMillis()

  def onStart() {
    new Thread("InMemoryStringReceiver") {
      override def run(): Unit = {
        var i = 0
        while (true) {
          // http://spark.apache.org/docs/latest/streaming-custom-receivers.html
          // To implement a reliable receiver, you have to use
          // store(multiple-records) to store data.
          store(ArrayBuffer(s"$batchID-$i"))
          println(s"Stored => [$batchID-$i)]")
          Thread.sleep(1000L)
          i = i + 1
        }
      }
    }.start()
  }

  def onStop() {}
}

I then created a simple Application which will use the Custom Receiver to
stream the data and process it:

object DStreamResilienceTest extends App {

  val conf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("DStreamResilienceTest")
    .set("spark.streaming.receiver.writeAheadLog.enable", "true")
  val ssc = new StreamingContext(conf, Seconds(1))
  ssc.checkpoint("hdfs://myhdfsserver/user/spark/checkpoint/DStreamResilienceTest")
  val customReceiverStream: ReceiverInputDStream[String] =
    ssc.receiverStream(new InMemoryStringReceiver())
  customReceiverStream.foreachRDD { (rdd: RDD[String]) =>
    println(s"processed => [${rdd.collect().toList}]")
    Thread.sleep(2000L)
  }
  ssc.start()
  ssc.awaitTermination()

}

As you can see the processing of each received RDD has a sleep of 2 seconds
while the Strings are stored every second. This creates a backlog and the
new strings pile up, and should be stored in the WAL. Indeed, I can see the
files in the checkpoint dirs getting updated. Running the app I get output
like this:

[info] Stored => [1453374654941-0)]
[info] processed => [List(1453374654941-0)]
[info] Stored => [1453374654941-1)]
[info] Stored => [1453374654941-2)]
[info] processed => [List(1453374654941-1)]
[info] Stored => [1453374654941-3)]
[info] Stored => [1453374654941-4)]
[info] processed => [List(1453374654941-2)]
[info] Stored => [1453374654941-5)]
[info] Stored => [1453374654941-6)]
[info] processed => [List(1453374654941-3)]
[info] Stored => [1453374654941-7)]
[info] Stored => [1453374654941-8)]
[info] processed => [List(1453374654941-4)]
[info] Stored => [1453374654941-9)]
[info] Stored => [1453374654941-10)]

As you would expect, the storing is outpacing the processing. So I kill
the application and restart it. This time I commented out the sleep in the
foreachRDD so that the processing can clear any backlog:

[info] Stored => [1453374753946-0)]
[info] processed => [List(1453374753946-0)]
[info] Stored => [1453374753946-1)]
[info] processed => [List(1453374753946-1)]
[info] Stored => [1453374753946-2)]
[info] processed => [List(1453374753946-2)]
[info] Stored => [1453374753946-3)]
[info] processed => [List(1453374753946-3)]
[info] Stored => [1453374753946-4)]
[info] processed => [List(1453374753946-4)]

As you can see the new events are processed but none from the previous
batch. The old WAL logs are cleared and I see log messages like this but
the old data does not get processed.

INFO WriteAheadLogManager : Recovered 1 write ahead log files from
hdfs://myhdfsserver/user/spark/checkpoint/DStreamResilienceTest/receivedData/0

What am I doing wrong? I am using Spark 1.5.2.

Best regards,

Patrick


Getting top distinct strings from arraylist

2016-01-25 Thread Patrick Plaatje
Hi, 

I’m quite new to Spark and MR, but have a requirement to get all distinct 
values with their respective counts from a transactional file. Let’s assume the 
following file format:

0 1 2 3 4 5 6 7
1 3 4 5 8 9
9 10 11 12 13 14 15 16 17 18
1 4 7 11 12 13 19 20
3 4 7 11 15 20 21 22 23
1 2 5 9 11 12 16

Given this, I would like an ArrayList back, where the String 
is the item identifier and the Integer the count of that item identifier in the 
file. The following is what I came up with to map the values, but can’t figure 
out how to do the counting :(

// create an RDD of ArrayLists of strings
JavaRDD<ArrayList<String>> transactions = sc.textFile(dataPath).map(
    new Function<String, ArrayList<String>>() {
        private static final long serialVersionUID = 1L;

        @Override
        public ArrayList<String> call(String s) {
            return Lists.newArrayList(s.split(" "));
        }
    }
);


Any ideas?

Thanks!
Patrick
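
One possible shape for the counting step, sketched with plain Scala collections rather than the Spark API so it can be run without a cluster. ItemCounts, count and top are hypothetical names for illustration. On an RDD the same idea would normally be a flatMap over the split lines followed by a pair mapping and reduceByKey, which is an assumption about how one would port it, not something tested here.

```scala
object ItemCounts {
  /** Splits every transaction line on whitespace and counts each item identifier. */
  def count(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.trim.split("\\s+"))
      .filter(_.nonEmpty)
      .groupBy(identity)
      .map { case (item, occurrences) => item -> occurrences.size }

  /** Items ordered by descending count, ties broken by identifier. */
  def top(lines: Seq[String], n: Int): Seq[(String, Int)] =
    count(lines).toSeq.sortBy { case (item, c) => (-c, item) }.take(n)
}
```

With the six sample lines above, items "1", "4" and "11" each occur four times.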



Re: Spark Streaming Write Ahead Log (WAL) not replaying data after restart

2016-01-26 Thread Patrick McGloin
Thank you Shixiong, that is what I was missing.
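
For anyone finding this later, a minimal sketch of the getOrCreate pattern referred to in the quoted reply below, following the shape shown in the streaming programming guide. The object name DStreamRecoverySketch and the socketTextStream stand-in input (host and port included) are assumptions so that the snippet does not depend on the custom receiver. It has not been run against the setup in this thread.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStreamRecoverySketch {
  // Checkpoint directory as used earlier in the thread.
  val checkpointDir = "hdfs://myhdfsserver/user/spark/checkpoint/DStreamResilienceTest"

  // Builds a fresh context. The whole DStream graph is defined in here,
  // because after a restart the graph is restored from the checkpoint instead.
  def createContext(): StreamingContext = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("DStreamResilienceTest")
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint(checkpointDir)
    // Stand-in input (assumption); the thread uses ssc.receiverStream(new InMemoryStringReceiver()).
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.foreachRDD { rdd => println(s"processed => [${rdd.collect().toList}]") }
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Recovers from the checkpoint if one exists, otherwise calls createContext().
    val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext())
    ssc.start()
    ssc.awaitTermination()
  }
}
```

The difference from the original test application is that the context is no longer constructed unconditionally with new StreamingContext on every start.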

On 26 January 2016 at 00:27, Shixiong(Ryan) Zhu 
wrote:

> You need to define a create function and use StreamingContext.getOrCreate.
> See the example here:
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#how-to-configure-checkpointing
>
> On Thu, Jan 21, 2016 at 3:32 AM, Patrick McGloin <
> mcgloin.patr...@gmail.com> wrote:
>
>> Hi all,
>>
>> To have a simple way of testing the Spark Streaming Write Ahead Log I
>> created a very simple Custom Input Receiver, which will generate strings
>> and store those:
>>
>> class InMemoryStringReceiver extends 
>> Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) {
>>
>>   val batchID = System.currentTimeMillis()
>>
>>   def onStart() {
>> new Thread("InMemoryStringReceiver") {
>>   override def run(): Unit = {
>> var i = 0
>> while(true) {
>>   
>> //http://spark.apache.org/docs/latest/streaming-custom-receivers.html
>>   //To implement a reliable receiver, you have to use 
>> store(multiple-records) to store data.
>>   store(ArrayBuffer(s"$batchID-$i"))
>>   println(s"Stored => [$batchID-$i)]")
>>   Thread.sleep(1000L)
>>   i = i + 1
>> }
>>   }
>> }.start()
>>   }
>>
>>   def onStop() {}
>> }
>>
>> I then created a simple Application which will use the Custom Receiver to
>> stream the data and process it:
>>
>> object DStreamResilienceTest extends App {
>>
>>   val conf = new 
>> SparkConf().setMaster("local[*]").setAppName("DStreamResilienceTest").set("spark.streaming.receiver.writeAheadLog.enable",
>>  "true")
>>   val ssc = new StreamingContext(conf, Seconds(1))
>>   
>> ssc.checkpoint("hdfs://myhdfsserver/user/spark/checkpoint/DStreamResilienceTest")
>>   val customReceiverStream: ReceiverInputDStream[String] = 
>> ssc.receiverStream(new InMemoryStringReceiver())
>>   customReceiverStream.foreachRDD { (rdd: RDD[String]) =>
>> println(s"processed => [${rdd.collect().toList}]")
>> Thread.sleep(2000L)
>>   }
>>   ssc.start()
>>   ssc.awaitTermination()
>>
>> }
>>
>> As you can see the processing of each received RDD has sleep of 2 seconds
>> while the Strings are stored every second. This creates a backlog and the
>> new strings pile up, and should be stored in the WAL. Indeed, I can see the
>> files in the checkpoint dirs getting updated. Running the app I get output
>> like this:
>>
>> [info] Stored => [1453374654941-0)]
>> [info] processed => [List(1453374654941-0)]
>> [info] Stored => [1453374654941-1)]
>> [info] Stored => [1453374654941-2)]
>> [info] processed => [List(1453374654941-1)]
>> [info] Stored => [1453374654941-3)]
>> [info] Stored => [1453374654941-4)]
>> [info] processed => [List(1453374654941-2)]
>> [info] Stored => [1453374654941-5)]
>> [info] Stored => [1453374654941-6)]
>> [info] processed => [List(1453374654941-3)]
>> [info] Stored => [1453374654941-7)]
>> [info] Stored => [1453374654941-8)]
>> [info] processed => [List(1453374654941-4)]
>> [info] Stored => [1453374654941-9)]
>> [info] Stored => [1453374654941-10)]
>>
>> As you would expect, the storing is out pacing the processing. So I kill
>> the application and restart it. This time I commented out the sleep in the
>> foreachRDD so that the processing can clear any backlog:
>>
>> [info] Stored => [1453374753946-0)]
>> [info] processed => [List(1453374753946-0)]
>> [info] Stored => [1453374753946-1)]
>> [info] processed => [List(1453374753946-1)]
>> [info] Stored => [1453374753946-2)]
>> [info] processed => [List(1453374753946-2)]
>> [info] Stored => [1453374753946-3)]
>> [info] processed => [List(1453374753946-3)]
>> [info] Stored => [1453374753946-4)]
>> [info] processed => [List(1453374753946-4)]
>>
>> As you can see the new events are processed but none from the previous
>> batch. The old WAL logs are cleared and I see log messages like this but
>> the old data does not get processed.
>>
>> INFO WriteAheadLogManager : Recovered 1 write ahead log files from 
>> hdfs://myhdfsserver/user/spark/checkpoint/DStreamResilienceTest/receivedData/0
>>
>> What am I doing wrong? I am using Spark 1.5.2.
>>
>> Best regards,
>>
>> Patrick
>>
>
>


“java.io.IOException: Class not found” on long running Streaming application

2016-01-28 Thread Patrick McGloin
I am getting the exception below on a long running Spark Streaming
application. The exception could occur after a few minutes, but it also
may not happen for days. This is with pretty consistent input data.

I have seen this Jira ticket
(https://issues.apache.org/jira/browse/SPARK-6152) but I don't think it is
the same issue. That is java.lang.IllegalArgumentException and this is
java.io.IOException: Class not found.

My application is streaming data and writing to Parquet using Spark SQL.

I am using Spark 1.5.2. Any ideas?

28-01-2016 09:36:00 ERROR JobScheduler:96 - Error generating jobs for
time 145397376 ms
java.io.IOException: Class not found
at 
com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.a(Unknown
Source)
at 
com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.<init>(Unknown
Source)
at 
org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:40)
at 
org.apache.spark.util.ClosureCleaner$.getInnerClosureClasses(ClosureCleaner.scala:81)
at 
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:187)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2032)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:318)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:317)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
at org.apache.spark.rdd.RDD.map(RDD.scala:317)
at 
org.apache.spark.streaming.dstream.MappedDStream$$anonfun$compute$1.apply(MappedDStream.scala:35)
at 
org.apache.spark.streaming.dstream.MappedDStream$$anonfun$compute$1.apply(MappedDStream.scala:35)
at scala.Option.map(Option.scala:145)
at 
org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at 
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
at scala.Option.orElse(Option.scala:257)
at 
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
at 
org.apache.spark.streaming.dstream.FilteredDStream.compute(FilteredDStream.scala:35)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at 
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStrea


Understanding Spark Task failures

2016-01-28 Thread Patrick McGloin
I am trying to understand what will happen when Spark has an exception
during processing, especially while streaming.

If I have a small code snippet like this:

myDStream.foreachRDD { (rdd: RDD[String]) =>
  println(s"processed => [${rdd.collect().toList}]")
  throw new Exception("User exception...")
}

If I run this I will get output like this:

[info] processed => [List(Item1)]
[error] 28-01-2016 17:41:18 ERROR JobScheduler:96 - Error running job
streaming job 1453999278000 ms.0
[error] java.lang.Exception: User exception...
...
[info] processed => [List(Item2)]
[error] 28-01-2016 17:41:19 ERROR JobScheduler:96 - Error running job
streaming job 1453999279000 ms.0
[error] java.lang.Exception: User exception...

First "Item1" is processed, and it fails (of course). In the next batch
"Item2" is processed. The record "Item1" has now been lost.

If I change my code so that the exception occurs inside a task:

myDStream.foreachRDD { (rdd: RDD[String]) =>
  println(s"processed => [${rdd.collect().toList}]")
  rdd.map{case x => throw new Exception("User exception...") }.collect()
}

Then the map closure will be retried, but once it has failed enough times
the record is discarded and processing continues to the next record.

Is it possible to ensure that records are not discarded, even if this means
stopping the application? I have the WAL enabled.


Re: Understanding Spark Task failures

2016-01-28 Thread Patrick McGloin
Hi Tathagata,

Thanks for the response.  I can add in a try catch myself and handle user
exceptions, that's true, so maybe my example wasn't a very good one.  I'm
more worried about OOM exceptions and other run-time exceptions (that could
happen outside my try catch).
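
To make the try/catch idea concrete, here is a small plain-Scala sketch of "retry a few times, then surface the failure" that could wrap the body of a foreachRDD. FailFastBatch and attempt are hypothetical names, and how the caller reacts to the final failure (for example stopping the application) is left open. Note that scala.util.Try only captures non-fatal exceptions, so an OutOfMemoryError would still propagate past it.

```scala
import scala.util.Try

object FailFastBatch {
  /** Runs `body` up to `maxAttempts` times; returns the last failure if none succeed. */
  def attempt[A](maxAttempts: Int)(body: => A): Try[A] = {
    require(maxAttempts >= 1, "maxAttempts must be at least 1")
    var result: Try[A] = Try(body)
    var attemptsMade = 1
    while (result.isFailure && attemptsMade < maxAttempts) {
      result = Try(body) // re-run the by-name block
      attemptsMade += 1
    }
    result
  }
}
```

A Failure coming back from attempt means the batch was never processed successfully, so the caller can decide to halt instead of silently moving on to the next batch.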

For example, I have this periodic "java.io.IOException: Class not found"
exception at the moment:

https://forums.databricks.com/questions/6601/javaioioexception-class-not-found-on-long-running.html

After this happens I lose data even though I have the WAL setup.  With the
WAL I can ensure that the data is safely stored when it has come into the
system from an external source, and I only ACK the external source after it
has been stored.  But it seems that there is no guarantee that the data is
successfully processed?

I assume I am right in what I am saying about losing data with the WAL
setup correctly.  The WAL works when stopping and starting the application,
etc.  But something is not handling the run time exception well.  This was
the start of my investigation into what is going wrong, so of course there
could be another reason for what I'm seeing.



On 28 January 2016 at 21:43, Tathagata Das 
wrote:

> That is hard to guarantee by the system, and it is up to the app developer
> to ensure that this is not . For example, if the data in a message is
> corrupted, unless the app code is robust towards handling such data, the
> system will fail every time it retries that app code.
>
> On Thu, Jan 28, 2016 at 8:51 AM, Patrick McGloin <
> mcgloin.patr...@gmail.com> wrote:
>
>> I am trying to understand what will happen when Spark has an exception
>> during processing, especially while streaming.
>>
>> If I have a small code snippet like this:
>>
>> myDStream.foreachRDD { (rdd: RDD[String]) =>
>>   println(s"processed => [${rdd.collect().toList}]")
>>   throw new Exception("User exception...")
>> }
>>
>> If I run this I will get output like this:
>>
>> [info] processed => [List(Item1)]
>> [error] 28-01-2016 17:41:18 ERROR JobScheduler:96 - Error running job 
>> streaming job 1453999278000 ms.0
>> [error] java.lang.Exception: User exception...
>> ...
>> [info] processed => [List(Item2)]
>> [error] 28-01-2016 17:41:19 ERROR JobScheduler:96 - Error running job 
>> streaming job 1453999279000 ms.0
>> [error] java.lang.Exception: User exception...
>>
>> First "Item1" is processed, and it fails (of course). In the next batch
>> "Item2" is processed. The record "Item1" has now been lost.
>>
>> If I change my code so that the exception occurs inside a task:
>>
>> myDStream.foreachRDD { (rdd: RDD[String]) =>
>>   println(s"processed => [${rdd.collect().toList}]")
>>   rdd.map{case x => throw new Exception("User exception...") }.collect()
>> }
>>
>> Then the map closure will be retried, but once it has failed enough times
>> the record is discarded and processing continues to the next record.
>>
>> Is it possible to ensure that records are not discarded, even if this
>> means stopping the application? I have the WAL enabled.
>>
>
>


Re: Recommended storage solution for my setup (~5M items, 10KB pr.)

2016-02-04 Thread Patrick Skjennum

(Am I doing this mailinglist thing right? Never used this ...)

I do not have a cluster.

Initially I tried to set up hadoop+hbase+spark, but after spending a week 
trying to get it to work, I gave up. I had a million problems with mismatching 
versions, and things working locally on the server, but not 
programmatically through my client computer, and vice versa. There was 
/always something/ that did not work, one way or another.


And since I had to actually get things /done/ rather than becoming an 
expert in clustering, I gave up and just used simple serializing.


Now I'm going to make a second attempt, but this time around I'll ask 
for help:p


--
mvh
Patrick Skjennum


On 04.02.2016 22.14, Ted Yu wrote:

bq. had a hard time setting it up

Mind sharing your experience in more detail :-)
If you already have a hadoop cluster, it should be relatively 
straightforward to set up.


Tuning needs extra effort.

On Thu, Feb 4, 2016 at 12:58 PM, habitats <m...@habitats.no> wrote:


Hello

I have ~5 million text documents, each around 10-15KB in size, and split
into ~15 columns. I intend to do machine learning, and thus I need to
extract all of the data at the same time, and potentially update everything
on every run.

So far I've just used json serializing, or simply cached the RDD to disk.
However, I feel like there must be a better way.

I have tried HBase, but I had a hard time setting it up and getting it to
work properly. It also felt like a lot of work for my simple requirements. I
want something /simple/.

Any suggestions?



--
View this message in context:

http://apache-spark-user-list.1001560.n3.nabble.com/Recommended-storage-solution-for-my-setup-5M-items-10KB-pr-tp26150.html
Sent from the Apache Spark User List mailing list archive at
Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
<mailto:user-unsubscr...@spark.apache.org>
For additional commands, e-mail: user-h...@spark.apache.org
<mailto:user-h...@spark.apache.org>






Re: newbie unable to write to S3 403 forbidden error

2016-02-13 Thread Patrick Plaatje
Not sure if it’s related, but in our Hadoop configuration we’re also setting 

sc.hadoopConfiguration().set("fs.s3.impl","org.apache.hadoop.fs.s3native.NativeS3FileSystem");

Cheers,
-patrick

From:  Andy Davidson 
Date:  Friday, 12 February 2016 at 17:34
To:  Igor Berman 
Cc:  "user @spark" 
Subject:  Re: newbie unable to write to S3 403 forbidden error

Hi Igor

So I assume you are able to use s3 from spark? 

Do you use rdd.saveAsTextFile() ?

How did you create your cluster? I.E. Did you use the spark-1.6.0/spark-ec2 
script, EMR, or something else?


I tried several version of the url including no luck :-(

The bucket name is ‘com.pws.twitter’. It has a folder ‘json’

We have a developer support contract with Amazon; however, our case has been 
unassigned for several days now

Thanks

Andy

P.s. In general debugging permission problems is always difficult from the 
client side. Secure servers do not want to make it easy for hackers

From:  Igor Berman 
Date:  Friday, February 12, 2016 at 4:53 AM
To:  Andrew Davidson 
Cc:  "user @spark" 
Subject:  Re: newbie unable to write to S3 403 forbidden error

 String dirPath = "s3n://s3-us-west-1.amazonaws.com/com.pws.twitter/json" 

not sure, but 
can you try to remove s3-us-west-1.amazonaws.com from path ?

On 11 February 2016 at 23:15, Andy Davidson  
wrote:
I am using spark 1.6.0 in a cluster created using the spark-ec2 script. I am 
using the standalone cluster manager

My java streaming app is not able to write to s3. It appears to be some form of 
permission problem. 

Any idea what the problem might be?

I tried using the IAM simulator to test the policy. Everything seems okay. Any 
idea how I can debug this problem?

Thanks in advance

Andy

JavaSparkContext jsc = new JavaSparkContext(conf);

// I did not include the full key in my email
// the keys do not contain ‘\’
// these are the keys used to create the cluster. They belong to the IAM user andy
jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "AKIAJREX");
jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "uBh9v1hdUctI23uvq9qR");

private static void saveTweets(JavaDStream<String> jsonTweets, String outputURI) {
    jsonTweets.foreachRDD(new VoidFunction2<JavaRDD<String>, Time>() {
        private static final long serialVersionUID = 1L;

        @Override
        public void call(JavaRDD<String> rdd, Time time) throws Exception {
            if (!rdd.isEmpty()) {
                // bucket name is ‘com.pws.twitter’ it has a folder ‘json’
                String dirPath =
                    "s3n://s3-us-west-1.amazonaws.com/com.pws.twitter/json" + "-" + time.milliseconds();
                rdd.saveAsTextFile(dirPath);
            }
        }
    });
}




Bucket name: com.pws.twitter
Bucket policy (I replaced the account id)

{
"Version": "2012-10-17",
"Id": "Policy1455148808376",
"Statement": [
{
"Sid": "Stmt1455148797805",
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::123456789012:user/andy"
},
"Action": "s3:*",
"Resource": "arn:aws:s3:::com.pws.twitter/*"
}
]
}






RE: Submitting Jobs Programmatically

2016-02-21 Thread Patrick Mi
Hi there,

I had a similar problem in Java with the standalone cluster on Linux but got
that working by passing the following option

-Dspark.jars=file:/path/to/sparkapp.jar

sparkapp.jar has the launch application

Hope that helps.

Regards,

Patrick



-Original Message-
From: Arko Provo Mukherjee [mailto:arkoprovomukher...@gmail.com]
Sent: Saturday, 20 February 2016 4:27 p.m.
To: Ted Yu
Cc: Holden Karau; user
Subject: Re: Submitting Jobs Programmatically

Hello,

Thanks much. I could start the service.

When I run my program, the launcher is not able to find the app class:

java.lang.ClassNotFoundException: SparkSubmitter
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
Spark job complete. Exit code:101
at java.lang.Class.forName(Class.java:274)
at org.apache.spark.util.Utils$.classForName(Utils.scala:173)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:639)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

My launch code is as follows:
val spark = new SparkLauncher()
.setSparkHome("C:\\spark-1.5.1-bin-hadoop2.6")

.setAppResource("C:\\SparkService\\Scala\\RequestSubmitter\\target\\scala-2.10\\spark-submitter_2.10-0.0.1.jar")
.setMainClass("SparkSubmitter")
.addAppArgs(inputQuery)
.setMaster("spark://157.54.189.70:7077")
.launch()
spark.waitFor()

I added the spark-submitter_2.10-0.0.1.jar in the classpath as well
but that didn't help.

Thanks & regards
Arko

On Fri, Feb 19, 2016 at 6:49 PM, Ted Yu  wrote:
> Cycling old bits:
>
> http://search-hadoop.com/m/q3RTtHrxMj2abwOk2
>
> On Fri, Feb 19, 2016 at 6:40 PM, Arko Provo Mukherjee
>  wrote:
>>
>> Hi,
>>
>> Thanks for your response. Is there a similar link for Windows? I am
>> not sure the .sh scripts would run on windows.
>>
>> By default the start-all.sh doesn't work and I don't see anything at
>> localhost:8080
>>
>> I will do some more investigation and come back.
>>
>> Thanks again for all your help!
>>
>> Thanks & regards
>> Arko
>>
>>
>> On Fri, Feb 19, 2016 at 6:35 PM, Ted Yu  wrote:
>> > Please see https://spark.apache.org/docs/latest/spark-standalone.html
>> >
>> > On Fri, Feb 19, 2016 at 6:27 PM, Arko Provo Mukherjee
>> >  wrote:
>> >>
>> >> Hi,
>> >>
>> >> Thanks for your response, that really helped.
>> >>
>> >> However, I don't believe the job is being submitted. When I run spark
>> >> from the shell, I don't need to start it up explicitly. Do I need to
>> >> start up Spark on my machine before running this program?
>> >>
>> >> I see the following in the SPARK_HOME\bin directory:
>> >> Name
>> >> 
>> >> beeline.cmd
>> >> load-spark-env.cmd
>> >> pyspark.cmd
>> >> pyspark2.cmd
>> >> run-example.cmd
>> >> run-example2.cmd
>> >> spark-class.cmd
>> >> spark-class2.cmd
>> >> spark-shell.cmd
>> >> spark-shell2.cmd
>> >> spark-submit.cmd
>> >> spark-submit2.cmd
>> >> sparkR.cmd
>> >> sparkR2.cmd
>> >>
>> >> Do I need to run anyone of them before submitting the job via the
>> >> program?
>> >>
>> >> Thanks & regards
>> >> Arko
>> >>
>> >> On Fri, Feb 19, 2016 at 6:01 PM, Holden Karau 
>> >> wrote:
>> >> > How are you trying to launch your application? Do you have the Spark
>> >> > jars on
>> >> > your class path?
>> >> >
>> >> >
>> >> > On Friday, February 19, 2016, Arko Provo Mukherjee
>> >> >  wrote:
>> >> >>
>> >> >> Hello,
>> >> >>
>> >> >> I am trying to submit a spark job via a program.
>> >> >>
>> >> 

Re: Spark UI consuming lots of memory

2015-10-27 Thread Patrick McGloin
Hi Nicholas,

I think you are right about the issue relating to SPARK-11126, I'm seeing
it as well.

Did you find any workaround?  Looking at the pull request for the fix it
doesn't look possible.

Best regards,
Patrick

On 15 October 2015 at 19:40, Nicholas Pritchard <
nicholas.pritch...@falkonry.com> wrote:

> Thanks for your help, most likely this is the memory leak you are fixing
> in https://issues.apache.org/jira/browse/SPARK-11126.
> -Nick
>
> On Mon, Oct 12, 2015 at 9:00 PM, Shixiong Zhu  wrote:
>
>> In addition, you cannot turn off JobListener and SQLListener now...
>>
>> Best Regards,
>> Shixiong Zhu
>>
>> 2015-10-13 11:59 GMT+08:00 Shixiong Zhu :
>>
>>> Is your query very complicated? Could you provide the output of
>>> `explain` your query that consumes an excessive amount of memory? If this
>>> is a small query, there may be a bug that leaks memory in SQLListener.
>>>
>>> Best Regards,
>>> Shixiong Zhu
>>>
>>> 2015-10-13 11:44 GMT+08:00 Nicholas Pritchard <
>>> nicholas.pritch...@falkonry.com>:
>>>
>>>> As an update, I did try disabling the ui with "spark.ui.enabled=false",
>>>> but the JobListener and SQLListener still consume a lot of memory, leading
>>>> to OOM error. Has anyone encountered this before? Is the only solution just
>>>> to increase the driver heap size?
>>>>
>>>> Thanks,
>>>> Nick
>>>>
>>>> On Mon, Oct 12, 2015 at 8:42 PM, Nicholas Pritchard <
>>>> nicholas.pritch...@falkonry.com> wrote:
>>>>
>>>>> I set those configurations by passing to spark-submit script:
>>>>> "bin/spark-submit --conf spark.ui.retainedJobs=20 ...". I have verified
>>>>> that these configurations are being passed correctly because they are
>>>>> listed in the environments tab and also by counting the number of
>>>>> job/stages that are listed. The "spark.sql.ui.retainedExecutions=0"
>>>>> only applies to the number of "completed" executions; there will always be
>>>>> a "running" execution. For some reason, I have one execution that consumes
>>>>> an excessive amount of memory.
>>>>>
>>>>> Actually, I am not interested in the SQL UI, as I find the Job/Stages
>>>>> UI to have sufficient information. I am also using Spark Standalone
>>>>> cluster manager so have not had to use the history server.
>>>>>
>>>>>
>>>>> On Mon, Oct 12, 2015 at 8:17 PM, Shixiong Zhu 
>>>>> wrote:
>>>>>
>>>>>> Could you show how you set the configurations? You need to set
>>>>>> these configurations before creating SparkContext and SQLContext.
>>>>>>
>>>>>> Moreover, the history server doesn't support SQL UI. So
>>>>>> "spark.eventLog.enabled=true" doesn't work now.
>>>>>>
>>>>>> Best Regards,
>>>>>> Shixiong Zhu
>>>>>>
>>>>>> 2015-10-13 2:01 GMT+08:00 pnpritchard <
>>>>>> nicholas.pritch...@falkonry.com>:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> In my application, the Spark UI is consuming a lot of memory,
>>>>>>> especially the SQL tab. I have set the following configurations to
>>>>>>> reduce the memory consumption:
>>>>>>> - spark.ui.retainedJobs=20
>>>>>>> - spark.ui.retainedStages=40
>>>>>>> - spark.sql.ui.retainedExecutions=0
>>>>>>>
>>>>>>> However, I still get OOM errors in the driver process with the
>>>>>>> default 1GB heap size. The following link is a screen shot of a heap
>>>>>>> dump report, showing the SQLListener instance having a retained size
>>>>>>> of 600MB.
>>>>>>>
>>>>>>> https://cloud.githubusercontent.com/assets/5124612/10404379/20fbdcfc-6e87-11e5-9415-27e25193a25c.png
>>>>>>>
>>>>>>> Rather than just increasing the allotted heap size, does anyone have
>>>>>>> any other ideas? Is it possible to disable the SQL tab specifically? I
>>>>>>> also thought about serving the UI from disk rather than memory with
>>>>>>> "spark.eventLog.enabled=true" and "spark.ui.enabled=false". Has
>>>>>>> anyone tried this before?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Nick
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> View this message in context:
>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-UI-consuming-lots-of-memory-tp25033.html
>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>> Nabble.com.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


Data in one partition after reduceByKey

2015-11-20 Thread Patrick McGloin
Hi,

I have a Spark application which contains the following segment:

val reparitioned = rdd.repartition(16)
val filtered: RDD[(MyKey, myData)] = MyUtils.filter(reparitioned,
startDate, endDate)
val mapped: RDD[(DateTime, myData)] =
filtered.map(kv => (kv._1.processingTime, kv._2))
val reduced: RDD[(DateTime, myData)] = mapped.reduceByKey(_+_)

When I run this with some logging this is what I see:

reparitioned ==> [List(2536, 2529, 2526, 2520, 2519, 2514, 2512,
2508, 2504, 2501, 2496, 2490, 2551, 2547, 2543, 2537)]
filtered ==> [List(2081, 2063, 2043, 2040, 2063, 2050, 2081, 2076,
2042, 2066, 2032, 2001, 2031, 2101, 2050, 2068)]
mapped ==> [List(2081, 2063, 2043, 2040, 2063, 2050, 2081, 2076,
2042, 2066, 2032, 2001, 2031, 2101, 2050, 2068)]
reduced ==> [List(0, 0, 0, 0, 0, 0, 922, 0, 0, 0, 0, 0, 0, 0, 0, 0)]

My logging is done using these two lines:

val sizes: RDD[Int] = rdd.mapPartitions(iter =>
Array(iter.size).iterator, true)
log.info(s"rdd ==> [${sizes.collect.toList}]")

My question is why does my data end up in one partition after the
reduceByKey? After the filter it can be seen that the data is evenly
distributed, but the reduceByKey results in data in only one partition.

Thanks,

Patrick


Re: Data in one partition after reduceByKey

2015-11-23 Thread Patrick McGloin
I will answer my own question, since I figured it out.  Here is my answer
in case anyone else has the same issue.

My DateTimes were all without seconds and milliseconds since I wanted to
group data belonging to the same minute. The difference between the hashCode()
values of Joda DateTimes which are one minute apart is a constant:

scala> val now = DateTime.now
now: org.joda.time.DateTime = 2015-11-23T11:14:17.088Z

scala> now.withSecondOfMinute(0).withMillisOfSecond(0).hashCode -
now.minusMinutes(1).withSecondOfMinute(0).withMillisOfSecond(0).hashCode
res42: Int = 6

As can be seen by this example, if the hashCode values are similarly
spaced, they can end up in the same partition:

scala> val nums = for(i <- 0 to 100) yield ((i*20 % 1000), i)
nums: scala.collection.immutable.IndexedSeq[(Int, Int)] =
Vector((0,0), (20,1), (40,2), (60,3), (80,4), (100,5), (120,6),
(140,7), (160,8), (180,9), (200,10), (220,11), (240,12), (260,13),
(280,14), (300,15), (320,16), (340,17), (360,18), (380,19), (400,20),
(420,21), (440,22), (460,23), (480,24), (500,25), (520,26), (540,27),
(560,28), (580,29), (600,30), (620,31), (640,32), (660,33), (680,34),
(700,35), (720,36), (740,37), (760,38), (780,39), (800,40), (820,41),
(840,42), (860,43), (880,44), (900,45), (920,46), (940,47), (960,48),
(980,49), (0,50), (20,51), (40,52), (60,53), (80,54), (100,55),
(120,56), (140,57), (160,58), (180,59), (200,60), (220,61), (240,62),
(260,63), (280,64), (300,65), (320,66), (340,67), (360,68), (380,69),
(400,70), (420,71), (440,72), (460,73), (480,74), (500...

scala> val rddNum = sc.parallelize(nums)
rddNum: org.apache.spark.rdd.RDD[(Int, Int)] =
ParallelCollectionRDD[0] at parallelize at <console>:23

scala> val reducedNum = rddNum.reduceByKey(_+_)
reducedNum: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[1] at
reduceByKey at <console>:25

scala> reducedNum.mapPartitions(iter => Array(iter.size).iterator,
true).collect.toList

res2: List[Int] = List(50, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0)
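
The same effect can be reproduced without a cluster. Below is a minimal Python sketch, assuming the partition index is the non-negative remainder of the key's hash divided by the number of partitions (which is how Spark's HashPartitioner treats Int keys) and assuming 20 partitions, inferred from the length of the list printed above. hash_partition and partition_sizes are illustrative names, not Spark APIs.

```python
from collections import Counter


def hash_partition(key_hash, num_partitions):
    # Python's % already yields a non-negative result for a positive divisor.
    return key_hash % num_partitions


def partition_sizes(keys, num_partitions):
    # Count distinct keys per partition, as after a reduceByKey.
    counts = Counter(hash_partition(k, num_partitions) for k in set(keys))
    return [counts.get(p, 0) for p in range(num_partitions)]


keys = [(i * 20) % 1000 for i in range(101)]   # same keys as in the shell session
sizes_20 = partition_sizes(keys, 20)
sizes_19 = partition_sizes(keys, 19)
print(sizes_20)   # every key is a multiple of 20, so all 50 land in partition 0
print(sizes_19)   # 19 shares no factor with 20, so the keys spread out
```

In this sketch, simply choosing a partition count that shares no factor with the key spacing already evens things out, which may be a lighter alternative to a custom partitioner in some cases.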

To distribute my data more evenly across the partitions I created my own
custom Partitioner:

import org.apache.spark.Partitioner
import org.joda.time.DateTime

class JodaPartitioner(rddNumPartitions: Int) extends Partitioner {
  def numPartitions: Int = rddNumPartitions
  def getPartition(key: Any): Int = {
    key match {
      case dateTime: DateTime =>
        val sum = dateTime.getYear + dateTime.getMonthOfYear +
          dateTime.getDayOfMonth + dateTime.getMinuteOfDay +
          dateTime.getSecondOfDay
        sum % numPartitions
      case _ => 0
    }
  }
}
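
To see why this spreads per-minute keys, here is a rough Python model of the partitioner above, assuming keys truncated to whole minutes and 16 partitions as in the original question. Moving forward one minute adds 1 to the minute-of-day and 60 to the second-of-day, so the sum grows by 61, and 61 shares no factor with 16. joda_like_partition is a hypothetical stand-in and uses Python's datetime rather than Joda.

```python
from datetime import datetime, timedelta


def joda_like_partition(dt, num_partitions):
    # Mirror the fields summed by the Scala partitioner above.
    minute_of_day = dt.hour * 60 + dt.minute
    second_of_day = minute_of_day * 60 + dt.second
    total = dt.year + dt.month + dt.day + minute_of_day + second_of_day
    return total % num_partitions


start = datetime(2015, 11, 23, 11, 14)
minute_keys = [start + timedelta(minutes=i) for i in range(16)]
partitions = [joda_like_partition(k, 16) for k in minute_keys]
print(sorted(partitions))   # 16 consecutive minutes cover all 16 partitions
```

The even spread depends on the partition count: with a count that divides 61 the keys would collapse again, so this is worth re-checking if the number of partitions changes.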


On 20 November 2015 at 17:17, Patrick McGloin 
wrote:

> Hi,
>
> I have Spark application which contains the following segment:
>
> val reparitioned = rdd.repartition(16)
> val filtered: RDD[(MyKey, myData)] = MyUtils.filter(reparitioned, startDate, 
> endDate)
> val mapped: RDD[(DateTime, myData)] = filtered.map(kv => (kv._1.processingTime, 
> kv._2))
> val reduced: RDD[(DateTime, myData)] = mapped.reduceByKey(_+_)
>
> When I run this with some logging this is what I see:
>
> reparitioned ==> [List(2536, 2529, 2526, 2520, 2519, 2514, 2512, 2508, 
> 2504, 2501, 2496, 2490, 2551, 2547, 2543, 2537)]
> filtered ==> [List(2081, 2063, 2043, 2040, 2063, 2050, 2081, 2076, 2042, 
> 2066, 2032, 2001, 2031, 2101, 2050, 2068)]
> mapped ==> [List(2081, 2063, 2043, 2040, 2063, 2050, 2081, 2076, 2042, 
> 2066, 2032, 2001, 2031, 2101, 2050, 2068)]
> reduced ==> [List(0, 0, 0, 0, 0, 0, 922, 0, 0, 0, 0, 0, 0, 0, 0, 0)]
>
> My logging is done using these two lines:
>
> val sizes: RDD[Int] = rdd.mapPartitions(iter => Array(iter.size).iterator, true)
> log.info(s"rdd ==> [${sizes.collect.toList}]")
>
> My question is why does my data end up in one partition after the
> reduceByKey? After the filter it can be seen that the data is evenly
> distributed, but the reduceByKey results in data in only one partition.
>
> Thanks,
>
> Patrick
>


Driver Hangs before starting Job

2015-12-01 Thread Patrick Brown
Hi,

I am building a Spark app which aggregates sensor data stored in Cassandra.
After I submit my app to Spark, the driver and application show up quickly.
Then, before any Spark job shows up in the application UI, there is a huge
lag, on the order of minutes to sometimes hours. Once the Spark job itself
gets added, processing is very fast (ms to possibly a few seconds).

While this hang is occurring there is not a high load on my driver (cpu
etc. through top) or my Cassandra cluster (disk/network etc. seen through
top and Datastax Opscenter). Once the job does start you can see the load
spike.

I am accessing Cassandra using the Datastax Spark Cassandra connector. I do
have a lot of Cassandra partitions accessed in each job (7 days * 24 hours
* 50+ sensors) and I can see these Cassandra partitions in the DAG graph. I
have tried coalesce, which seems to help somewhat but the lag is still
orders of magnitude larger than any processing time.

Thanks,

Patrick


Class weights and prediction probabilities in random forest?

2015-07-23 Thread Patrick Crenshaw
I was just wondering if there were plans to implement class weights and
prediction probabilities in random forest? Is anyone working on this?



Re: Twitter4J streaming question

2015-07-23 Thread Patrick McCarthy
How can I tell if it's the sample stream or full stream ?
Thanks

Sent from my iPhone

On Jul 23, 2015, at 4:17 PM, Enno Shioji <eshi...@gmail.com> wrote:

You are probably listening to the sample stream, and THEN filtering. This means 
you listen to 1% of the twitter stream, and then look for the tweet by 
Bloomberg, so there is a very good chance you don't see the particular tweet.

In order to get all Bloomberg related tweets, you must connect to twitter using 
the filter API and not the sample API: 
https://dev.twitter.com/streaming/reference/post/statuses/filter

On Thu, Jul 23, 2015 at 8:23 PM, pjmccarthy <pmccar...@eatonvance.com> wrote:
Hopefully this is an easy one.  I am trying to filter a twitter dstream by
user ScreenName - my code is as follows
val stream = TwitterUtils.createStream(ssc, None)
.filter(_.getUser.getScreenName.contains("markets"))

however nothing gets returned and I can see that Bloomberg has tweeted.  If
I remove the filter I get tweets
If I change the code to look for English or French tweets that works

Is there a better way to do it ?

Can anyone assist ?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Twitter4J-streaming-question-tp23974.html
Sent from the Apache Spark User List mailing list archive at 
Nabble.com.





Re: Twitter4J streaming question

2015-07-23 Thread Patrick McCarthy
Ahh
Makes sense - thanks for the help

Sent from my iPhone

On Jul 23, 2015, at 4:29 PM, Enno Shioji <eshi...@gmail.com> wrote:

You need to pay a lot of money to get the full stream, so unless you are doing 
that, it's the sample stream!

On Thu, Jul 23, 2015 at 9:26 PM, Patrick McCarthy <pmccar...@eatonvance.com> wrote:
How can I tell if it's the sample stream or full stream ?
Thanks

Sent from my iPhone

On Jul 23, 2015, at 4:17 PM, Enno Shioji <eshi...@gmail.com> wrote:

You are probably listening to the sample stream, and THEN filtering. This means 
you listen to 1% of the twitter stream, and then look for the tweet by 
Bloomberg, so there is a very good chance you don't see the particular tweet.

In order to get all Bloomberg related tweets, you must connect to twitter using 
the filter API and not the sample API: 
https://dev.twitter.com/streaming/reference/post/statuses/filter

On Thu, Jul 23, 2015 at 8:23 PM, pjmccarthy <pmccar...@eatonvance.com> wrote:
Hopefully this is an easy one.  I am trying to filter a twitter dstream by
user ScreenName - my code is as follows
val stream = TwitterUtils.createStream(ssc, None)
.filter(_.getUser.getScreenName.contains("markets"))

however nothing gets returned and I can see that Bloomberg has tweeted.  If
I remove the filter I get tweets
If I change the code to look for English or French tweets that works

Is there a better way to do it ?

Can anyone assist ?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Twitter4J-streaming-question-tp23974.html
Sent from the Apache Spark User List mailing list archive at 
Nabble.com.






Re: Extremely poor predictive performance with RF in mllib

2015-08-04 Thread Patrick Lam
Yes, I rechecked and the label is correct. As you can see in the code
posted, I used the exact same features for all the classifiers so unless rf
somehow switches the labels, it should be correct.

I have posted a sample dataset and sample code to reproduce what I'm
getting here:

https://github.com/pkphlam/spark_rfpredict

On Tue, Aug 4, 2015 at 6:42 AM, Yanbo Liang  wrote:

> It looks like the predicted result is just the opposite of what is expected,
> so could you check whether the label is right?
> Or could you share some data which can help to reproduce this output?
>
> 2015-08-03 19:36 GMT+08:00 Barak Gitsis :
>
>> hi,
>> I've run into some poor RF behavior, although not as pronounced as you..
>> would be great to get more insight into this one
>>
>> Thanks!
>>
>> On Mon, Aug 3, 2015 at 8:21 AM pkphlam  wrote:
>>
>>> Hi,
>>>
>>> This might be a long shot, but has anybody run into very poor predictive
>>> performance using RandomForest with Mllib? Here is what I'm doing:
>>>
>>> - Spark 1.4.1 with PySpark
>>> - Python 3.4.2
>>> - ~30,000 Tweets of text
>>> - 12289 1s and 15956 0s
>>> - Whitespace tokenization and then hashing trick for feature selection
>>> using 10,000 features
>>> - Run RF with 100 trees and maxDepth of 4 and then predict using the
>>> features from all the 1s observations.
>>>
>>> So in theory, I should get predictions of close to 12289 1s (especially
>>> if the model overfits). But I'm getting exactly 0 1s, which sounds
>>> ludicrous to me and makes me suspect something is wrong with my code or
>>> I'm missing something. I notice similar behavior (although not as
>>> extreme) if I play around with the settings. But I'm getting normal
>>> behavior with other classifiers, so I don't think it's my setup that's
>>> the problem.
>>>
>>> For example:
>>>
>>> >>> lrm = LogisticRegressionWithSGD.train(lp, iterations=10)
>>> >>> logit_predict = lrm.predict(predict_feat)
>>> >>> logit_predict.sum()
>>> 9077
>>>
>>> >>> nb = NaiveBayes.train(lp)
>>> >>> nb_predict = nb.predict(predict_feat)
>>> >>> nb_predict.sum()
>>> 10287.0
>>>
>>> >>> rf = RandomForest.trainClassifier(lp, numClasses=2,
>>> >>> categoricalFeaturesInfo={}, numTrees=100, seed=422)
>>> >>> rf_predict = rf.predict(predict_feat)
>>> >>> rf_predict.sum()
>>> 0.0
>>>
>>> This code was all run back to back so I didn't change anything in
>>> between.
>>> Does anybody have a possible explanation for this?
>>>
>>> Thanks!
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Extremely-poor-predictive-performance-with-RF-in-mllib-tp24112.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>>
>>> --
>> *-Barak*
>>
>
>


-- 
Patrick Lam
Institute for Quantitative Social Science, Harvard University
http://www.patricklam.org


Can't use RandomForestClassificationModel.predict(Vector v) in scala

2016-12-15 Thread Patrick Chen
Hi All
I'm writing Java to use spark 2.0 RandomForestClassificationModel.
After I trained the model, I can use below code to predict :

RandomForestClassificationModel rfModel =
RandomForestClassificationModel.load(modelPath);
Vector v =
Vectors.sparse(FeatureIndex.TOTAL_INDEX.getIndex(), indexes, values);
double result = rfModel.predict(v);

But when I changed to Scala, I couldn't use the predict method in Classifier
anymore.
How should I use the RandomForestClassificationModel in Scala?

Thanks
BR

Patrick


Re: FP growth - Items in a transaction must be unique

2017-02-02 Thread Patrick Plaatje
Hi,

 

This indicates you have duplicate products per row in your dataframe. The FP 
implementation only allows unique products per row, so you will need to 
deduplicate the products in each row before running the FPGrowth algorithm.
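
As a rough illustration of that dedupe step, here is a plain Python sketch (not Spark code; to_transaction is a hypothetical helper). It splits a row on commas, trims whitespace, and keeps only the first occurrence of each item. Note that some product names themselves contain commas, so a plain comma split also produces fragments such as "4G LTE", which is one way the repeats in the quoted error can arise.

```python
def to_transaction(row, sep=","):
    """Split a row into items and drop repeated items, keeping first-seen order."""
    items = (part.strip() for part in row.split(sep))
    # dict.fromkeys preserves insertion order and discards later duplicates.
    return list(dict.fromkeys(item for item in items if item))


row = "Samsung Galaxy S7 Single Sim - 32GB,  4G LTE, Gold, Huawei P8 Lite 16GB,  4G LTE"
transaction = to_transaction(row)
print(transaction)
```

In the Scala job the equivalent would be to make each split array distinct before passing it to FPGrowth; the exact call is left to the reader since it depends on how the rows are built.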

 

Best,

Patrick

 

From: "Devi P.V" 
Date: Thursday, 2 February 2017 at 07:17
To: "user @spark" 
Subject: FP growth - Items in a transaction must be unique

 

Hi all,

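I am trying to run the FP growth algorithm using Spark and Scala. A sample input 
dataframe follows:

+-------------------------------------------------------------------------------------------+
|productName                                                                                |
+-------------------------------------------------------------------------------------------+
|Apple Iphone 7 128GB Jet Black with Facetime                                               |
|Levi’s Blue Slim Fit Jeans- L5112,Rimmel London Lasting Finish Matte by Kate Moss 101 Dusky|
|Iphone 6 Plus (5.5",Limited Stocks, TRA Oman Approved)                                     |
+-------------------------------------------------------------------------------------------+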

Each row contains unique items.

 

I converted it into rdd like following
val transactions = names.as[String].rdd.map(s =>s.split(","))

val fpg = new FPGrowth().
  setMinSupport(0.3).
  setNumPartitions(100)


val model = fpg.run(transactions)
But I got error

WARN TaskSetManager: Lost task 2.0 in stage 27.0 (TID 622, localhost):
org.apache.spark.SparkException: 
Items in a transaction must be unique but got WrappedArray(
Huawei GR3 Dual Sim 16GB 13MP 5Inch 4G,
 Huawei G8 Gold 32GB,  4G,  
5.5 Inches, HTC Desire 816 (Dual Sim, 3G, 8GB),
 Samsung Galaxy S7 Single Sim - 32GB,  4G LTE,  
Gold, Huawei P8 Lite 16GB,  4G LTE, Huawei Y625, 
Samsung Galaxy Note 5 - 32GB,  4G LTE, 
Samsung Galaxy S7 Dual Sim - 32GB)

How to solve this?

Thanks



 

 



Re: Market Basket Analysis by deploying FP Growth algorithm

2017-04-05 Thread Patrick Plaatje
Hi Arun,

We have been running into the same issue (having only 1000 unique items, in 
100MM transactions), but have not investigated the root cause of this. We 
decided to run this on a cluster instead (4*16 / 64GB Ram), after which the OOM 
issue went away. However, we ran into the issue that the FPGrowth 
implementation starts spilling over to disk, and we had to increase the /tmp 
partition.

Hope it helps.

BR,
-patrick



On 05/04/2017, 10:29, "asethia"  wrote:

Hi,

We are currently working on a Market Basket Analysis by deploying FP Growth
algorithm on Spark to generate association rules for product recommendation.
We are running on close to 24 million invoices over an assortment of more
than 100k products. However, whenever we relax the support threshold below a
certain level, the stack overflows. We are using Spark 1.6.2 but can somehow
invoke 1.6.3 to counter this error. The problem though is even when we
invoke Spark 1.6.3 and increase the stack size to 100M we are running out of
memory. We believe the tree grows exponentially and is stored in memory
which causes this problem. Can anyone suggest a solution to this issue
please?

Thanks
Arun



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Market-Basket-Analysis-by-deploying-FP-Growth-algorithm-tp28569.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.



Fwd: ERROR Dropping SparkListenerEvent

2017-04-13 Thread Patrick Gomes
Hey all,

I was wondering if anyone could point me where to start debugging the
following error:

ERROR Dropping SparkListenerEvent because no remaining room in event
queue. This likely means one of the SparkListeners is too slow and
cannot keep up with the rate at which tasks are being started by the
scheduler.

This happens when I call the fit method of Count Vectorizer on a fairly
small dataset (< 20 GB).

Running on a cluster with 5 nodes (c3.8xlarge), Spark 2.1, and Hadoop 2.7.
If there is anything else that would be helpful to know just let me know
and I can include it.

Best,
Patrick


Memory problems with simple ETL in Pyspark

2017-04-14 Thread Patrick McCarthy
Hello,

I'm trying to build an ETL job which takes in 30-100gb of text data and
prepares it for SparkML. I don't speak Scala so I've been trying to
implement in PySpark on YARN, Spark 2.1.

Despite the transformations being fairly simple, the job always fails by
running out of executor memory.

The input table is long (~6bn rows) but composed of three simple values:

#
all_data_long.printSchema()

root
|-- id: long (nullable = true)
|-- label: short (nullable = true)
|-- segment: string (nullable = true)

#

First I join it to a table of particular segments of interests and do an
aggregation,

#

audiences.printSchema()

root
 |-- entry: integer (nullable = true)
 |-- descr: string (nullable = true)


print("Num in adl: {}".format(str(all_data_long.count())))

aud_str = audiences.select(audiences['entry'].cast('string'),
audiences['descr'])

alldata_aud = all_data_long.join(aud_str,
all_data_long['segment']==aud_str['entry'],
'left_outer')

str_idx  = StringIndexer(inputCol='segment',outputCol='indexedSegs')

idx_df   = str_idx.fit(alldata_aud)
label_df = idx_df.transform(alldata_aud).withColumnRenamed('label','label_val')

id_seg = (label_df
.filter(label_df.descr.isNotNull())
.groupBy('id')
.agg(collect_list('descr')))

id_seg.write.saveAsTable("hive.id_seg")

#

Then, I use that StringIndexer again on the first data frame to featurize
the segment ID

#

alldat_idx = idx_df.transform(all_data_long).withColumnRenamed('label','label_val')

#


My ultimate goal is to make a SparseVector, so I group the indexed segments
by id and try to cast it into a vector

#

list_to_sparse_udf = udf(lambda l, maxlen: Vectors.sparse(maxlen, {v:1.0
for v in l}),VectorUDT())

alldat_idx.cache()

feature_vec_len = (alldat_idx.select(max('indexedSegs')).first()[0] + 1)

print("alldat_idx: {}".format(str(alldat_idx.count())))

feature_df = (alldat_idx
.withColumn('label',alldat_idx['label_val'].cast('double'))
.groupBy('id','label')

.agg(sort_array(collect_list('indexedSegs')).alias('collect_list_is'))
.withColumn('num_feat',lit(feature_vec_len))

.withColumn('features',list_to_sparse_udf('collect_list_is','num_feat'))
.drop('collect_list_is')
.drop('num_feat'))

feature_df.cache()
print("Num in featuredf: {}".format(str(feature_df.count())))  ## <- failure occurs here

#

Here, however, I always run out of memory on the executors (I've twiddled
driver and executor memory to check) and YARN kills off my containers. I've
gone as high as --executor-memory 15g but it still doesn't help.

Given the number of segments is at most 50,000 I'm surprised that a
smallish row-wise operation is enough to blow up the process.
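
For a sense of how much work the UDF does per row, here is a minimal sketch of the same list-to-sparse logic in plain Python, without Spark. The function name and the returned tuple layout are made up for illustration; the real UDF returns a Vectors.sparse object instead.

```python
def list_to_sparse(indices, size):
    """Return (size, sorted unique indices, values) for a list of feature indices.

    Mirrors the dict comprehension in the UDF: duplicate indices collapse
    to a single entry with value 1.0.
    """
    unique = sorted({int(i) for i in indices})
    return int(size), unique, [1.0] * len(unique)


# StringIndexer emits doubles, so indices and size arrive as floats.
print(list_to_sparse([3.0, 1.0, 3.0], 5.0))  # (5, [1, 3], [1.0, 1.0])
```

The cost per row grows with the number of segments collected for that id, so if a handful of ids carry very long lists, the tasks holding them may need far more memory than the average row suggests.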


Is it really the UDF that's killing me? Do I have to rewrite it in Scala?





Query plans for the failing stage:

#


== Parsed Logical Plan ==
Aggregate [count(1) AS count#265L]
+- Project [id#0L, label#183, features#208]
   +- Project [id#0L, label#183, num_feat#202, features#208]
  +- Project [id#0L, label#183, collect_list_is#197, num_feat#202,
(collect_list_is#197, num_feat#202) AS features#208]
 +- Project [id#0L, label#183, collect_list_is#197, 56845.0 AS
num_feat#202]
+- Aggregate [id#0L, label#183], [id#0L, label#183,
sort_array(collect_list(indexedSegs#93, 0, 0), true) AS collect_list_is#197]
   +- Project [id#0L, label_val#99, segment#2, indexedSegs#93,
cast(label_val#99 as double) AS label#183]
  +- Project [id#0L, label#1 AS label_val#99, segment#2,
indexedSegs#93]
 +- Project [id#0L, label#1, segment#2,
UDF(cast(segment#2 as string)) AS indexedSegs#93]
+- MetastoreRelation pmccarthy, all_data_long

== Analyzed Logical Plan ==
count: bigint
Aggregate [count(1) AS count#265L]
+- Project [id#0L, label#183, features#208]
   +- Project [id#0L, label#183, num_feat#202, features#208]
  +- Project [id#0L, label#183, collect_list_is#197, num_feat#202,
(collect_list_is#197, num_feat#202) AS features#208]
 +- Project [id#0L, label#183, collect_list_is#197, 56845.0 AS
num_feat#202]
+- Aggregate [id#0L, label#183], [id#0L, label#183,
sort_array(collect_list(indexedSegs#93, 0, 0), true) AS collect_list_is#197]
   +- Project [id#0L, label_val#99, segment#2, indexedSegs#93,
cast(label_val#99 as double)

Re: Memory problems with simple ETL in Pyspark

2017-04-16 Thread Patrick McCarthy
The partitions helped!

I added repartition() and my function looks like this now:

feature_df = (alldat_idx
.withColumn('label',alldat_idx['label_val'].cast('double'))
.groupBy('id','label')

.agg(sort_array(collect_list('indexedSegs')).alias('collect_list_is'))
.repartition(1000)
.withColumn('num_feat',lit(feature_vec_len))

.withColumn('features',list_to_sparse_udf('collect_list_is','num_feat'))
.drop('collect_list_is')
.drop('num_feat'))

I got a few failed containers for memory overflow, but the job was able to
finish successfully. I tried upping the repartition as high as 4000 but a
few still failed.

For posterity's sake, where would I look for the footprint you have in
mind? On the executor tab?

Since the audience part of the task finished successfully and the failure
was on a df that didn't touch it, it shouldn't've made a difference.

Thank you!

On Sat, Apr 15, 2017 at 9:07 PM, ayan guha  wrote:

> What I missed: try increasing the number of partitions using repartition.
>
> On Sun, 16 Apr 2017 at 11:06 am, ayan guha  wrote:
>
>> It does not look like a Scala vs Python thing. How big is your audience
>> data store? Can it be broadcast?
>>
>> What is the memory footprint you are seeing? At what point is YARN
>> killing the containers? Depending on that, you may want to tweak the
>> number of partitions of the input dataset and increase the number of executors.
>>
>> Ayan

Maximum Partitioner size

2017-04-20 Thread Patrick GRANDJEAN
Hi,
I have implemented a custom Partitioner (org.apache.spark.Partitioner) that 
contains a medium-sized object (some megabytes). Unfortunately Spark (2.1.0) 
fails with a StackOverflowError, and I suspect it is because of the size of the 
partitioner that needs to be serialized. My question is, what is the maximum 
size of a Partitioner accepted by Spark?
Thanks!





Structured Streaming + initialState

2017-05-05 Thread Patrick McGloin
Hi all,

With Spark Structured Streaming, is there a possibility to set an "initial
state" for a query?

Using a join between a streaming Dataset and a static Dataset does not
support full joins.

Using mapGroupsWithState to create a GroupState does not support an
initialState (as the Spark Streaming StateSpec did).

Are there any plans to add support for initial states?  Or is there already
a way to do so?

Best regards,
Patrick


Re: Structured Streaming + initialState

2017-05-06 Thread Patrick McGloin
The initial state is stored in a Parquet file which is effectively a static
Dataset.  I see there is a Jira open for full joins on streaming plus
static Datasets for Structured Streaming (SPARK-20002
<https://issues.apache.org/jira/browse/SPARK-20002>).  So once that Jira is
completed it would be possible.

For mapGroupsWithState it would be great if you could provide an
initialState Dataset with Key -> State initial values.

On 5 May 2017 at 23:49, Tathagata Das  wrote:

> Can you explain how your initial state is stored? Is it a file, or is it in
> a database?
> If it's in a database, then when you initialize the GroupState, you can fetch
> it from the database.


Re: Is there a Kafka sink for Spark Structured Streaming

2017-05-19 Thread Patrick McGloin
# Write key-value data from a DataFrame to a Kafka topic specified in an option
query = df \
  .selectExpr("CAST(userId AS STRING) AS key", "to_json(struct(*)) AS value") \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "topic1") \
  .option("checkpointLocation", "/path/to/HDFS/dir") \
  .start()

Described here:

https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html



On 19 May 2017 at 10:45,  wrote:

> Is there a Kafka sink for Spark Structured Streaming ?
>
> Sent from my iPhone
>


Re: GC overhead exceeded

2017-08-18 Thread Patrick Alwell
+1, what is the executor memory? You may need to adjust executor memory and 
cores. As a rule of thumb, each executor should have 5 cores and can then run 5 
concurrent tasks. So if your cluster has 100 cores, you’d have 20 executors. And 
if your cluster memory is 500 GB, each executor would have 25 GB of memory.
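
The arithmetic above can be written down as a small helper. This is only a sketch; the function name and the 5-cores-per-executor default are assumptions taken from the numbers in this message, not a Spark API.

```python
def size_executors(total_cores, total_memory_gb, cores_per_executor=5):
    """Split a cluster's cores and memory evenly across executors.

    Returns (number of executors, memory per executor in GB).
    """
    num_executors = total_cores // cores_per_executor
    memory_per_executor_gb = total_memory_gb / num_executors
    return num_executors, memory_per_executor_gb


print(size_executors(100, 500))  # (20, 25.0)
```

Treat the result as an upper bound: in practice you would leave some headroom for the driver and for YARN container overhead rather than handing every core and gigabyte to executors.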

What’s more, you can use tools like the Spark UI or Ganglia to determine which 
step is failing and why. What is the overall cluster size? How many executors 
do you have? Is it an appropriate count for this cluster’s cores? I’m assuming 
you are using YARN?

-Pat

From: KhajaAsmath Mohammed 
Date: Friday, August 18, 2017 at 5:30 AM
To: Pralabh Kumar 
Cc: "user @spark" 
Subject: Re: GC overhead exceeded

It is just a SQL query on a Hive table with a transformation adding 10 more 
columns calculated for currency. The input for this query is 2 months of data, 
around 450 GB.

I added persist but it didn't help. Also the executor memory is 8g. Any 
suggestions please?

Sent from my iPhone

On Aug 17, 2017, at 11:43 PM, Pralabh Kumar wrote:
What is your executor memory? Please share the code also.

On Fri, Aug 18, 2017 at 10:06 AM, KhajaAsmath Mohammed wrote:

HI,

I am getting below error when running spark sql jobs. This error is thrown 
after running 80% of tasks. any solution?

spark.storage.memoryFraction=0.4
spark.sql.shuffle.partitions=2000
spark.default.parallelism=100
#spark.eventLog.enabled=false
#spark.scheduler.revive.interval=1s
spark.driver.memory=8g


java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.ArrayList.subList(ArrayList.java:955)
at java.lang.String.split(String.java:2311)
at sun.net.util.IPAddressUtil.textToNumericFormatV4(IPAddressUtil.java:47)
at java.net.InetAddress.getAllByName(InetAddress.java:1129)
at java.net.InetAddress.getAllByName(InetAddress.java:1098)
at java.net.InetAddress.getByName(InetAddress.java:1048)
at org.apache.hadoop.net.NetUtils.normalizeHostName(NetUtils.java:562)
at org.apache.hadoop.net.NetUtils.normalizeHostNames(NetUtils.java:579)
at org.apache.hadoop.net.CachedDNSToSwitchMapping.resolve(CachedDNSToSwitchMapping.java:109)
at org.apache.hadoop.yarn.util.RackResolver.coreResolve(RackResolver.java:101)
at org.apache.hadoop.yarn.util.RackResolver.resolve(RackResolver.java:81)
at org.apache.spark.scheduler.cluster.YarnScheduler.getRackForHost(YarnScheduler.scala:37)
at org.apache.spark.scheduler.TaskSetManager.dequeueTask(TaskSetManager.scala:380)
at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:433)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:276)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
at org.apache.spark.scheduler.TaskSchedulerImpl.org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:271)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4$$anonfun$apply$9.apply(TaskSchedulerImpl.scala:357)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4$$anonfun$apply$9.apply(TaskSchedulerImpl.scala:355)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4.apply(TaskSchedulerImpl.scala:355)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4.apply(TaskSchedulerImpl.scala:352)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:352)
at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.org$apache$spark$scheduler$cluster$CoarseGrainedSchedulerBackend$DriverEndpoint$$makeOffers(CoarseGrainedSchedulerBackend.scala:222)




Re: How to authenticate to ADLS from within spark job on the fly

2017-08-19 Thread Patrick Alwell
This might help; I’ve built a REST API with livyServer: 
https://livy.incubator.apache.org/



From: Steve Loughran 
Date: Saturday, August 19, 2017 at 7:05 AM
To: Imtiaz Ahmed 
Cc: "user@spark.apache.org" 
Subject: Re: How to authenticate to ADLS from within spark job on the fly


On 19 Aug 2017, at 02:42, Imtiaz Ahmed wrote:

Hi All,
I am building a spark library which developers will use when writing their 
spark jobs to get access to data on Azure Data Lake. But the authentication 
will depend on the dataset they ask for. I need to call a rest API from within 
spark job to get credentials and authenticate to read data from ADLS. Is that 
even possible? I am new to spark.
E.g, from inside a spark job a user will say:

MyCredentials myCredentials = MyLibrary.getCredentialsForPath(userId, 
"/some/path/on/azure/datalake");

then before spark.read.json("adl://examples/src/main/resources/people.json")
I need to authenticate the user to be able to read that path using the 
credentials fetched above.

Any help is appreciated.

Thanks,
Imtiaz

The ADL filesystem supports addDelegationTokens(), allowing the caller to 
collect the delegation tokens of the current authenticated user and then pass 
them along with the request, which is exactly what Spark should be doing in 
spark-submit.

If you want to do it yourself, look in SparkHadoopUtil (I think; IDE is closed 
right now) and see how the tokens are picked up and then passed around 
(marshalled over the job request, unmarshalled after and picked up, with bits of 
the UserGroupInformation class doing the low-level work).

Java code snippet to write to the path tokenFile:

FileSystem fs = FileSystem.get(conf);
Credentials cred = new Credentials();
Token tokens[] = fs.addDelegationTokens(renewer, cred);
cred.writeTokenStorageFile(tokenFile, conf);

You can then read that file in elsewhere, and then (somehow) get the FS to use 
those tokens.

Otherwise, ADL supports OAuth, so you may be able to use any OAuth libraries 
for this. hadoop-azure-datalake pulls in okhttp for that:

<dependency>
  <groupId>com.squareup.okhttp</groupId>
  <artifactId>okhttp</artifactId>
  <version>2.4.0</version>
</dependency>

-Steve



Re: Apache Spark: Parallelization of Multiple Machine Learning ALgorithm

2017-09-05 Thread Patrick McCarthy
You might benefit from watching this JIRA issue -
https://issues.apache.org/jira/browse/SPARK-19071

On Sun, Sep 3, 2017 at 5:50 PM, Timsina, Prem  wrote:

> Is there a way to parallelize multiple ML algorithms in Spark. My use case
> is something like this:
>
> A) Run multiple machine learning algorithm (Naive Bayes, ANN, Random
> Forest, etc.) in parallel.
>
> 1) Validate each algorithm using 10-fold cross-validation
>
> B) Feed the output of step A) in second layer machine learning algorithm.
>
> My question is:
>
> Can we run multiple machine learning algorithm in step A in parallel?
>
> Can we do cross-validation in parallel? Like, run 10 iterations of Naive
> Bayes training in parallel?
>
>
>
> I was not able to find any way to run the different algorithm in parallel.
> And it seems cross-validation also can not be done in parallel.
>
> I appreciate any suggestion to parallelize this use case.
>
>
>
> Prem
>


Re: CSV write to S3 failing silently with partial completion

2017-09-07 Thread Patrick Alwell
Sounds like an S3 bug. Can you replicate locally with HDFS?

Try using the s3a protocol too; there is a jar you can leverage like so:
spark-submit --packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.3 my_spark_program.py

EMR can sometimes be buggy. :/

You could also try leveraging EC2 nodes and manually creating a cluster with 
passwordless SSH.

But I feel your pain man, I’ve had weird issues with Redshift and EMR as well.

Let me know if you can or can’t replicate locally; and I can bring it up with 
our S3 team for the next release of HDP and we can file a bug with AWS.

-Pat

On 9/7/17, 2:59 AM, "JG Perrin"  wrote:

Are you assuming that all partitions are of equal size? Did you try with 
more partitions (like repartitioning)? Does the error always happen with the 
last (or smaller) file? If you are sending to redshift, why not use the JDBC 
driver?

-Original Message-
From: abbim [mailto:ab...@amazon.com] 
Sent: Thursday, September 07, 2017 1:02 AM
To: user@spark.apache.org
Subject: CSV write to S3 failing silently with partial completion

Hi all,
My team has been experiencing a recurring unpredictable bug where only a 
partial write to CSV in S3 on one partition of our Dataset is performed. For 
example, in a Dataset of 10 partitions written to CSV in S3, we might see 9 of 
the partitions as 2.8 GB in size, but one of them as 1.6 GB. However, the job 
does not exit with an error code.

This becomes problematic in the following ways:
1. When we copy the data to Redshift, we get a bad decrypt error on the 
partial file, suggesting that the failure occurred at a weird byte in the file. 
2. We lose data - sometimes as much as 10%.

We don't see this problem with parquet format, which we also use, but 
moving all of our data to parquet is not currently feasible. We're using the 
Java API with Spark 2.2 and Amazon EMR 5.8; the code is as simple as this:
df.write().csv("s3://some-bucket/some_location"). We're experiencing the 
issue 1-3x/week on a daily job and are unable to reliably reproduce the 
problem.

Any thoughts on why we might be seeing this and how to resolve?
Thanks in advance.








[Spark Dataframe] How can I write a correct filter so the Hive table partitions are pruned correctly

2017-09-13 Thread Patrick Duin
Hi Spark users,

I've got an issue where I wrote a filter on a Hive table using dataframes
and despite setting:
spark.sql.hive.metastorePartitionPruning=true no partitions are being
pruned.

In short:

Doing this: table.filter("partition=x or partition=y") will result in Spark
fetching all partition metadata from the Hive metastore and doing the
filtering after fetching the partitions.

On the other hand if my filter is "simple":
table.filter("partition=x ")
Spark does a call to the metastore that passes along the filter and fetches
just the ones it needs.

Our case is where we have a lot of partitions on a table and the calls that
result in all the partitions take minutes as well as causing us memory
issues. Is this a bug or is there a better way of doing the filter call?

Thanks,
 Patrick

PS:
Sorry for crossposting I wasn't sure if the user list was the correct place
to ask and I understood to go via stackoverflow first so my question is
also here in more detail:
https://stackoverflow.com/questions/46152526/how-should-i-configure-spark-to-correctly-prune-hive-metastore-partitions


PySpark - Expand rows into dataframes via function

2017-10-02 Thread Patrick McCarthy
Hello,

I'm trying to map ARIN registry files into more explicit IP ranges. They
provide a number of IPs in the range (here it's 8192) and a starting IP,
and I'm trying to map it into all the included /24 subnets. For example,

Input:

array(['arin', 'US', 'ipv4', '23.239.160.0', 8192, 20131104.0, 'allocated',

   'ff26920a408f15613096aa7fe0ddaa57'], dtype=object)


Output:

array([['23', '239', '160', 'ff26920a408f15613096aa7fe0ddaa57'],
   ['23', '239', '161', 'ff26920a408f15613096aa7fe0ddaa57'],
   ['23', '239', '162', 'ff26920a408f15613096aa7fe0ddaa57'],

...


I have the input lookup table in a pyspark DF, and a python function
to do the conversion into the mapped output. I think to produce the
full mapping I need a UDTF but this concept doesn't seem to exist in
PySpark. What's the best approach to do this mapping and recombine
into a new DataFrame?
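
For reference, the per-row conversion itself can be sketched in plain Python with the standard ipaddress module. The function name is made up for illustration, and the sketch assumes the starting IP sits on a /24 boundary, as in the example row above.

```python
import ipaddress


def expand_to_slash24(start_ip, num_ips, tag):
    """Return one [octet1, octet2, octet3, tag] row per /24 covered by the range."""
    start = int(ipaddress.IPv4Address(start_ip))
    rows = []
    # Step through the range 256 addresses (one /24) at a time.
    for offset in range(0, num_ips, 256):
        octets = str(ipaddress.IPv4Address(start + offset)).split('.')
        rows.append(octets[:3] + [tag])
    return rows


rows = expand_to_slash24('23.239.160.0', 8192,
                         'ff26920a408f15613096aa7fe0ddaa57')
print(len(rows), rows[0], rows[-1])
```

A function like this returns a list per input row, so in PySpark it could plausibly be applied with rdd.flatMap and turned back into a DataFrame, or wrapped in a udf returning an ArrayType and passed through explode. Which of the two performs better on this data is not something the sketch settles.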


Thanks,

Patrick

