New JIRA - [SQL] Can't remove columns from DataFrame or save DataFrame from a join due to duplicate columns

2015-04-27 Thread Don Drake
https://issues.apache.org/jira/browse/SPARK-7182

Can anyone suggest a workaround for the above issue?

Thanks.

-Don

-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
http://www.MailLaunder.com/


Re: Spark sql and csv data processing question

2015-05-16 Thread Don Drake
Your parenthesis don't look right as you're embedding the filter on the
Row.fromSeq().

Try this:

  val trainRDD  = rawTrainData
 .filter(!_.isEmpty)
 .map(rawRow = Row.fromSeq(rawRow.split(,)))
 .filter(_.length == 15)
 .map(_.toString).map(_.trim)


-Don

On Fri, May 15, 2015 at 11:17 PM, Mike Frampton mike_framp...@hotmail.com
wrote:

 Hi

 Im getting the following error when trying to process a csv based data
 file.

 Exception in thread main org.apache.spark.SparkException: Job aborted
 due to stage failure: Task 1 in stage 10.0 failed 4 times, most recent
 failure: Lost task 1.3 in stage 10.0 (TID 262,
 hc2r1m3.semtech-solutions.co.nz):*
 java.lang.ArrayIndexOutOfBoundsException: 0*
 at
 org.apache.spark.sql.catalyst.expressions.GenericRow.apply(Row.scala:142)
 at
 org.apache.spark.sql.catalyst.expressions.BoundReference.eval(BoundAttribute.scala:37)
 at
 org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:68)
 at
 org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:52)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)

 I have made sure that none of my data rows are empty and that they all
 have 15 records. I have also physically checked the
 data. The error occurs when I run the actual spark sql on the last line.
 The script is as follows.

   val server= hdfs://hc2nn.semtech-solutions.co.nz:8020
   val path  = /data/spark/h2o/

   val train_csv =  server + path + adult.train.data // 32,562 rows
   val test_csv  =  server + path + adult.test.data  // 16,283 rows

   // load the data

   val rawTrainData = sparkCxt.textFile(train_csv)
   val rawTestData  = sparkCxt.textFile(test_csv)

   // create a spark sql schema for the row

   val schemaString = age workclass fnlwgt education educationalnum
 maritalstatus +
   occupation relationship race gender capitalgain
 capitalloss +
   hoursperweek nativecountry income

   val schema = StructType( schemaString.split( )
   .map(fieldName = StructField(fieldName, StringType, false)))

   // create an RDD from the raw training data

   val trainRDD  = rawTrainData
  .filter(!_.isEmpty)
  .map(rawRow = Row.fromSeq(rawRow.split(,)
  .filter(_.length == 15)
  .map(_.toString).map(_.trim) ))

   println(  Raw Training Data Count =  + trainRDD.count() )

   val testRDD   = rawTestData
  .filter(!_.isEmpty)
  .map(rawRow  = Row.fromSeq(rawRow.split(,)
  .filter(_.length == 15)
  .map(_.toString).map(_.trim) ))

   println(  Raw Testing Data Count =  + testRDD.count() )

   // create a schema RDD

   val trainSchemaRDD = sqlContext.applySchema(trainRDD, schema)
   val testSchemaRDD  = sqlContext.applySchema(testRDD,  schema)

   // register schema RDD as a table

   trainSchemaRDD.registerTempTable(trainingTable)
   testSchemaRDD.registerTempTable(testingTable)

   println(  Schema RDD Training Data Count =  +
 trainSchemaRDD.count() )
   println(  Schema RDD Testing Data Count  =  +
 testSchemaRDD.count() )

   // now run sql against the table to filter the data





 *  val schemaRddTrain = sqlContext.sql(SELECT +
 age,workclass,education,maritalstatus,occupation,relationship,race,+
 gender,hoursperweek,nativecountry,income +FROM trainingTable LIMIT
 5000)*

   println(  Training Data Count =  + schemaRddTrain.count() )

 Any advice is appreciated :)




-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
http://www.MailLaunder.com/
http://www.DrudgeSiren.com/
http://plu.gd/
800-733-2143


GradientBoostedTrees.trainRegressor with categoricalFeaturesInfo

2015-05-20 Thread Don Drake
I'm running Spark v1.3.1 and when I run the following against my dataset:

model = GradientBoostedTrees.trainRegressor(trainingData,
categoricalFeaturesInfo=catFeatu
res, maxDepth=6, numIterations=3)

The job will fail with the following message:
Traceback (most recent call last):
  File /Users/drake/fd/spark/mltest.py, line 73, in module
model = GradientBoostedTrees.trainRegressor(trainingData,
categoricalFeaturesInfo=catFeatures, maxDepth=6, numIterations=3)
  File
/Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/pyspark/mllib/tree.py,
line 553, in trainRegressor
loss, numIterations, learningRate, maxDepth)
  File
/Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/pyspark/mllib/tree.py,
line 438, in _train
loss, numIterations, learningRate, maxDepth)
  File
/Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/pyspark/mllib/common.py,
line 120, in callMLlibFunc
return callJavaFunc(sc, api, *args)
  File
/Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/pyspark/mllib/common.py,
line 113, in callJavaFunc
return _java2py(sc, func(*args))
  File
/Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py,
line 538, in __call__
  File
/Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py,
line 300, in get_return_value
15/05/20 16:40:12 INFO BlockManager: Removing block rdd_32_95
py4j.protocol.Py4JJavaError: An error occurred while calling
o69.trainGradientBoostedTreesModel.
: java.lang.IllegalArgumentException: requirement failed: DecisionTree
requires maxBins (= 32) = max categories in categorical features (= 1895)
at scala.Predef$.require(Predef.scala:233)
at
org.apache.spark.mllib.tree.impl.DecisionTreeMetadata$.buildMetadata(DecisionTreeMetadata.scala:128)
at org.apache.spark.mllib.tree.RandomForest.run(RandomForest.scala:138)
at org.apache.spark.mllib.tree.DecisionTree.run(DecisionTree.scala:60)
at
org.apache.spark.mllib.tree.GradientBoostedTrees$.org$apache$spark$mllib$tree$GradientBoostedTrees$$boost(GradientBoostedTrees.scala:150)
at
org.apache.spark.mllib.tree.GradientBoostedTrees.run(GradientBoostedTrees.scala:63)
at
org.apache.spark.mllib.tree.GradientBoostedTrees$.train(GradientBoostedTrees.scala:96)
at
org.apache.spark.mllib.api.python.PythonMLLibAPI.trainGradientBoostedTreesModel(PythonMLLibAPI.scala:595)

So, it's complaining about the maxBins, if I provide maxBins=1900 and
re-run it:

model = GradientBoostedTrees.trainRegressor(trainingData,
categoricalFeaturesInfo=catFeatu
res, maxDepth=6, numIterations=3, maxBins=1900)

Traceback (most recent call last):
  File /Users/drake/fd/spark/mltest.py, line 73, in module
model = GradientBoostedTrees.trainRegressor(trainingData,
categoricalFeaturesInfo=catF
eatures, maxDepth=6, numIterations=3, maxBins=1900)
TypeError: trainRegressor() got an unexpected keyword argument 'maxBins'

It now says it knows nothing of maxBins.

If I run the same command against DecisionTree or RandomForest (with
maxBins=1900) it works just fine.

Seems like a bug in GradientBoostedTrees.

Suggestions?

-Don

-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
800-733-2143


Re: GradientBoostedTrees.trainRegressor with categoricalFeaturesInfo

2015-05-20 Thread Don Drake
JIRA created: https://issues.apache.org/jira/browse/SPARK-7781

Joseph, I agree, I'm debating removing this feature altogether, but I'm
putting the model through its paces.

Thanks.

-Don

On Wed, May 20, 2015 at 7:52 PM, Joseph Bradley jos...@databricks.com
wrote:

 One more comment: That's a lot of categories for a feature.  If it makes
 sense for your data, it will run faster if you can group the categories or
 split the 1895 categories into a few features which have fewer categories.

 On Wed, May 20, 2015 at 3:17 PM, Burak Yavuz brk...@gmail.com wrote:

 Could you please open a JIRA for it? The maxBins input is missing for the
 Python Api.

 Is it possible if you can use the current master? In the current master,
 you should be able to use trees with the Pipeline Api and DataFrames.

 Best,
 Burak

 On Wed, May 20, 2015 at 2:44 PM, Don Drake dondr...@gmail.com wrote:

 I'm running Spark v1.3.1 and when I run the following against my dataset:

 model = GradientBoostedTrees.trainRegressor(trainingData,
 categoricalFeaturesInfo=catFeatu
 res, maxDepth=6, numIterations=3)

 The job will fail with the following message:
 Traceback (most recent call last):
   File /Users/drake/fd/spark/mltest.py, line 73, in module
 model = GradientBoostedTrees.trainRegressor(trainingData,
 categoricalFeaturesInfo=catFeatures, maxDepth=6, numIterations=3)
   File
 /Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/pyspark/mllib/tree.py,
 line 553, in trainRegressor
 loss, numIterations, learningRate, maxDepth)
   File
 /Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/pyspark/mllib/tree.py,
 line 438, in _train
 loss, numIterations, learningRate, maxDepth)
   File
 /Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/pyspark/mllib/common.py,
 line 120, in callMLlibFunc
 return callJavaFunc(sc, api, *args)
   File
 /Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/pyspark/mllib/common.py,
 line 113, in callJavaFunc
 return _java2py(sc, func(*args))
   File
 /Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py,
 line 538, in __call__
   File
 /Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py,
 line 300, in get_return_value
 15/05/20 16:40:12 INFO BlockManager: Removing block rdd_32_95
 py4j.protocol.Py4JJavaError: An error occurred while calling
 o69.trainGradientBoostedTreesModel.
 : java.lang.IllegalArgumentException: requirement failed: DecisionTree
 requires maxBins (= 32) = max categories in categorical features (= 1895)
 at scala.Predef$.require(Predef.scala:233)
 at
 org.apache.spark.mllib.tree.impl.DecisionTreeMetadata$.buildMetadata(DecisionTreeMetadata.scala:128)
 at org.apache.spark.mllib.tree.RandomForest.run(RandomForest.scala:138)
 at org.apache.spark.mllib.tree.DecisionTree.run(DecisionTree.scala:60)
 at
 org.apache.spark.mllib.tree.GradientBoostedTrees$.org$apache$spark$mllib$tree$GradientBoostedTrees$$boost(GradientBoostedTrees.scala:150)
 at
 org.apache.spark.mllib.tree.GradientBoostedTrees.run(GradientBoostedTrees.scala:63)
 at
 org.apache.spark.mllib.tree.GradientBoostedTrees$.train(GradientBoostedTrees.scala:96)
 at
 org.apache.spark.mllib.api.python.PythonMLLibAPI.trainGradientBoostedTreesModel(PythonMLLibAPI.scala:595)

 So, it's complaining about the maxBins, if I provide maxBins=1900 and
 re-run it:

 model = GradientBoostedTrees.trainRegressor(trainingData,
 categoricalFeaturesInfo=catFeatu
 res, maxDepth=6, numIterations=3, maxBins=1900)

 Traceback (most recent call last):
   File /Users/drake/fd/spark/mltest.py, line 73, in module
 model = GradientBoostedTrees.trainRegressor(trainingData,
 categoricalFeaturesInfo=catF
 eatures, maxDepth=6, numIterations=3, maxBins=1900)
 TypeError: trainRegressor() got an unexpected keyword argument 'maxBins'

 It now says it knows nothing of maxBins.

 If I run the same command against DecisionTree or RandomForest (with
 maxBins=1900) it works just fine.

 Seems like a bug in GradientBoostedTrees.

 Suggestions?

 -Don

 --
 Donald Drake
 Drake Consulting
 http://www.drakeconsulting.com/
 800-733-2143






-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
http://www.MailLaunder.com/
http://www.DrudgeSiren.com/
http://plu.gd/
800-733-2143


Problem reading Parquet from 1.2 to 1.3

2015-06-03 Thread Don Drake
As part of upgrading a cluster from CDH 5.3.x to CDH 5.4.x I noticed that
Spark is behaving differently when reading Parquet directories that contain
a .metadata directory.

It seems that in spark 1.2.x, it would just ignore the .metadata directory,
but now that I'm using Spark 1.3, reading these files causes the following
exceptions:

scala val d = sqlContext.parquetFile(/user/ddrak/parq_dir)

SLF4J: Failed to load class org.slf4j.impl.StaticLoggerBinder.

SLF4J: Defaulting to no-operation (NOP) logger implementation

SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further
details.

scala.collection.parallel.CompositeThrowable: Multiple exceptions thrown
during a parallel computation: java.lang.RuntimeException:
hdfs://nameservice1/user/ddrak/parq_dir/.metadata/schema.avsc is not a
Parquet file. expected magic number at tail [80, 65, 82, 49] but found
[116, 34, 10, 125]

parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:427)

parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:398)

org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:276)

org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:275)

scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)

scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)

scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)

scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)

scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)

scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)

.

.

.



java.lang.RuntimeException:
hdfs://nameservice1/user/ddrak/parq_dir/.metadata/schemas/1.avsc is not a
Parquet file. expected magic number at tail [80, 65, 82, 49] but found
[116, 34, 10, 125]

parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:427)

parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:398)

org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:276)

org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:275)

scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)

scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)

scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)

scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)

scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)

scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)

.

.

.



java.lang.RuntimeException:
hdfs://nameservice1/user/ddrak/parq_dir/.metadata/descriptor.properties is
not a Parquet file. expected magic number at tail [80, 65, 82, 49] but
found [117, 101, 116, 10]

parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:427)

parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:398)

org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:276)

org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:275)

scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)

scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)

scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)

scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)

scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)

scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)

.

.

.

at
scala.collection.parallel.package$$anon$1.alongWith(package.scala:87)

at
scala.collection.parallel.Task$class.mergeThrowables(Tasks.scala:86)

at
scala.collection.parallel.mutable.ParArray$Map.mergeThrowables(ParArray.scala:650)

at scala.collection.parallel.Task$class.tryMerge(Tasks.scala:72)

at
scala.collection.parallel.mutable.ParArray$Map.tryMerge(ParArray.scala:650)

at
scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:190)

at
scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:514)

at
scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:162)

at
scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)

at
scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)

at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

at

Re: Problem reading Parquet from 1.2 to 1.3

2015-06-07 Thread Don Drake
Thanks Cheng,  we have a workaround in place for Spark 1.3 (remove
.metadata directory), good to know it will be resolved in 1.4.

-Don

On Sun, Jun 7, 2015 at 8:51 AM, Cheng Lian lian.cs@gmail.com wrote:

  This issue has been fixed recently in Spark 1.4
 https://github.com/apache/spark/pull/6581

 Cheng


 On 6/5/15 12:38 AM, Marcelo Vanzin wrote:

 I talked to Don outside the list and he says that he's seeing this issue
 with Apache Spark 1.3 too (not just CDH Spark), so it seems like there is a
 real issue here.

 On Wed, Jun 3, 2015 at 1:39 PM, Don Drake dondr...@gmail.com wrote:

 As part of upgrading a cluster from CDH 5.3.x to CDH 5.4.x I noticed that
 Spark is behaving differently when reading Parquet directories that contain
 a .metadata directory.

  It seems that in spark 1.2.x, it would just ignore the .metadata
 directory, but now that I'm using Spark 1.3, reading these files causes the
 following exceptions:

  scala val d = sqlContext.parquetFile(/user/ddrak/parq_dir)

 SLF4J: Failed to load class org.slf4j.impl.StaticLoggerBinder.

 SLF4J: Defaulting to no-operation (NOP) logger implementation

 SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for
 further details.

 scala.collection.parallel.CompositeThrowable: Multiple exceptions thrown
 during a parallel computation: java.lang.RuntimeException:
 hdfs://nameservice1/user/ddrak/parq_dir/.metadata/schema.avsc is not a
 Parquet file. expected magic number at tail [80, 65, 82, 49] but found
 [116, 34, 10, 125]

 parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:427)

 parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:398)


 org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:276)


 org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:275)

 scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)


 scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)

 scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)

 scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)

 scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)

 scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)

 .

 .

 .



 java.lang.RuntimeException:
 hdfs://nameservice1/user/ddrak/parq_dir/.metadata/schemas/1.avsc is not
 a Parquet file. expected magic number at tail [80, 65, 82, 49] but found
 [116, 34, 10, 125]

 parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:427)

 parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:398)


 org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:276)


 org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:275)

 scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)


 scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)

 scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)

 scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)

 scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)

 scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)

 .

 .

 .



 java.lang.RuntimeException:
 hdfs://nameservice1/user/ddrak/parq_dir/.metadata/descriptor.properties
 is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but
 found [117, 101, 116, 10]

 parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:427)

 parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:398)


 org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:276)


 org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:275)

 scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)


 scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)

 scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)

 scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)

 scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)

 scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)

 .

 .

 .

 at
 scala.collection.parallel.package$$anon$1.alongWith(package.scala:87)

 at
 scala.collection.parallel.Task$class.mergeThrowables(Tasks.scala:86)

 at
 scala.collection.parallel.mutable.ParArray$Map.mergeThrowables(ParArray.scala:650)

 at scala.collection.parallel.Task$class.tryMerge(Tasks.scala:72)

 at
 scala.collection.parallel.mutable.ParArray$Map.tryMerge(ParArray.scala:650)

 at
 scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:190

Re: importerror using external library with pyspark

2015-06-04 Thread Don Drake
I would try setting PYSPARK_DRIVER_PYTHON environment variable to the
location of your python binary, especially if you are using a virtual
environment.

-Don

On Wed, Jun 3, 2015 at 8:24 PM, AlexG swift...@gmail.com wrote:

 I have libskylark installed on both machines in my two node cluster in the
 same locations, and checked that the following code, which calls
 libskylark,
 works on both nodes with 'pyspark rfmtest.py':

 import re
 import numpy
 import skylark.ml.kernels
 import random
 import os

 from pyspark import SparkContext
 sc = SparkContext(appName=test)

 SIGMA = 10
 NUM_RF = 500
 numfeatures = 100
 numpoints = 1000
 kernel = skylark.ml.kernels.Gaussian(numfeatures, SIGMA)
 S = kernel.rft(NUM_RF)

 rows = sc.parallelize(numpy.random.rand(numpoints, numfeatures).tolist(),
 6)
 sketched_rows = rows.map(lambda row : S /
 numpy.ndarray(shape=(1,numfeatures), buffer=numpy.array(row)).copy())

 os.system(rm -rf spark_out)
 sketched_rows.saveAsTextFile('spark_out')

 However, when I try to run the same code on the cluster with 'spark-submit
 --master spark://master:7077 rfmtest.py', I get an ImportError saying that
 skylark.sketch does not exist:

 15/06/04 01:21:50 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory
 on master:40244 (size: 67.5 KB, free: 265.3 MB)
 15/06/04 01:21:51 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory
 on node001:45690 (size: 67.5 KB, free: 265.3 MB)
 15/06/04 01:21:51 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
 master): org.apache.spark.api.python.PythonException: Traceback (most
 recent
 call last):
   File /opt/Spark/python/pyspark/worker.py, line 88, in main
 command = pickleSer._read_with_length(infile)
   File /opt/Spark/python/pyspark/serializers.py, line 156, in
 _read_with_length
 return self.loads(obj)
   File /opt/Spark/python/pyspark/serializers.py, line 405, in loads
 return cPickle.loads(obj)
 ImportError: No module named skylark.sketch

 at
 org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:135)
 at
 org.apache.spark.api.python.PythonRDD$$anon$1.init(PythonRDD.scala:176)
 at
 org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:94)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:64)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)

 Any ideas what might be going on?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/importerror-using-external-library-with-pyspark-tp23145.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
http://www.MailLaunder.com/
800-733-2143


Re: Parsing a tsv file with key value pairs

2015-06-25 Thread Don Drake
Use this package:

https://github.com/databricks/spark-csv

and change the delimiter to a tab.

The documentation is pretty straightforward, you'll get a Dataframe back
from the parser.

-Don

On Thu, Jun 25, 2015 at 4:39 AM, Ravikant Dindokar ravikant.i...@gmail.com
wrote:

 So I have a file where each line represents an edge in the graph  has two
 values separated by a tab. Both values are vertex id's (source and sink). I
 want to parse this file as dictionary in spark RDD.
 So my question is get these values in the form of dictionary in RDD?
 sample file :
 12
 15
 23

 expected output : RDD (1,2,1,5,2,3)

 Thanks
 Ravikant

 On Thu, Jun 25, 2015 at 2:59 PM, anshu shukla anshushuk...@gmail.com
 wrote:

 Can you be more specific Or can you provide sample file .

 On Thu, Jun 25, 2015 at 11:00 AM, Ravikant Dindokar 
 ravikant.i...@gmail.com wrote:

 Hi Spark user,

 I am new to spark so forgive me for asking a basic question. I'm trying
 to import my tsv file into spark. This file has key and value separated by
 a \t per line. I want to import this file as dictionary of key value pairs
 in Spark.

 I came across this code to do the same for csv file:

 import csv
 import StringIO
 ...
 def loadRecord(line):
 Parse a CSV line
   input = StringIO.StringIO(line)
   reader = csv.DictReader(input, fieldnames=[name,
 favouriteAnimal])
   return reader.next()
 input = sc.textFile(inputFile).map(loadRecord)

 Can you point out the changes required to parse a tsv file?

 After following operation :

 split_lines = lines.map(_.split(\t))

 what should I do to read the key values in dictionary?


 Thanks

 Ravikant





 --
 Thanks  Regards,
 Anshu Shukla





-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
http://www.MailLaunder.com/
800-733-2143


Re: --packages Failed to load class for data source v1.4

2015-06-14 Thread Don Drake
I looked at this again, and when I use the Scala spark-shell and load a CSV
using the same package it works just fine, so this seems specific to
pyspark.

I've created the following JIRA:
https://issues.apache.org/jira/browse/SPARK-8365

-Don

On Sat, Jun 13, 2015 at 11:46 AM, Don Drake dondr...@gmail.com wrote:

 I downloaded the pre-compiled Spark 1.4.0 and attempted to run an existing
 Python Spark application against it and got the following error:

 py4j.protocol.Py4JJavaError: An error occurred while calling o90.save.
 : java.lang.RuntimeException: Failed to load class for data source:
 com.databricks.spark.csv

 I pass the following on the command-line to my spark-submit:
 --packages com.databricks:spark-csv_2.10:1.0.3

 This worked fine on 1.3.1, but not in 1.4.

 I was able to replicate it with the following pyspark:

 a = {'a':1.0, 'b':'asdf'}
 rdd = sc.parallelize([a])
 df = sqlContext.createDataFrame(rdd)
 df.save(/tmp/d.csv, com.databricks.spark.csv)


 Even using the new
 df.write.format('com.databricks.spark.csv').save('/tmp/d.csv') gives the
 same error.

 I see it was added in the web UI:
 file:/Users/drake/.ivy2/jars/com.databricks_spark-csv_2.10-1.0.3.jarAdded
 By 
 Userfile:/Users/drake/.ivy2/jars/org.apache.commons_commons-csv-1.1.jarAdded
 By User
 http://10.0.0.222:56871/jars/com.databricks_spark-csv_2.10-1.0.3.jarAdded
 By 
 Userhttp://10.0.0.222:56871/jars/org.apache.commons_commons-csv-1.1.jarAdded
 By User
 Thoughts?

 -Don



 Gory details:

 $ pyspark --packages com.databricks:spark-csv_2.10:1.0.3
 Python 2.7.6 (default, Sep  9 2014, 15:04:36)
 [GCC 4.2.1 Compatible Apple LLVM 6.0 (clang-600.0.39)] on darwin
 Type help, copyright, credits or license for more information.
 Ivy Default Cache set to: /Users/drake/.ivy2/cache
 The jars for the packages stored in: /Users/drake/.ivy2/jars
 :: loading settings :: url =
 jar:file:/Users/drake/spark/spark-1.4.0-bin-hadoop2.6/lib/spark-assembly-1.4.0-hadoop2.6.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
 com.databricks#spark-csv_2.10 added as a dependency
 :: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
 confs: [default]
 found com.databricks#spark-csv_2.10;1.0.3 in central
 found org.apache.commons#commons-csv;1.1 in central
 :: resolution report :: resolve 590ms :: artifacts dl 17ms
 :: modules in use:
 com.databricks#spark-csv_2.10;1.0.3 from central in [default]
 org.apache.commons#commons-csv;1.1 from central in [default]
 -
 |  |modules||   artifacts   |
 |   conf   | number| search|dwnlded|evicted|| number|dwnlded|
 -
 |  default |   2   |   0   |   0   |   0   ||   2   |   0   |
 -
 :: retrieving :: org.apache.spark#spark-submit-parent
 confs: [default]
 0 artifacts copied, 2 already retrieved (0kB/15ms)
 Using Spark's default log4j profile:
 org/apache/spark/log4j-defaults.properties
 15/06/13 11:06:08 INFO SparkContext: Running Spark version 1.4.0
 2015-06-13 11:06:08.921 java[19233:2145789] Unable to load realm info from
 SCDynamicStore
 15/06/13 11:06:09 WARN NativeCodeLoader: Unable to load native-hadoop
 library for your platform... using builtin-java classes where applicable
 15/06/13 11:06:09 WARN Utils: Your hostname, Dons-MacBook-Pro-2.local
 resolves to a loopback address: 127.0.0.1; using 10.0.0.222 instead (on
 interface en0)
 15/06/13 11:06:09 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to
 another address
 15/06/13 11:06:09 INFO SecurityManager: Changing view acls to: drake
 15/06/13 11:06:09 INFO SecurityManager: Changing modify acls to: drake
 15/06/13 11:06:09 INFO SecurityManager: SecurityManager: authentication
 disabled; ui acls disabled; users with view permissions: Set(drake); users
 with modify permissions: Set(drake)
 15/06/13 11:06:10 INFO Slf4jLogger: Slf4jLogger started
 15/06/13 11:06:10 INFO Remoting: Starting remoting
 15/06/13 11:06:10 INFO Remoting: Remoting started; listening on addresses
 :[akka.tcp://sparkDriver@10.0.0.222:56870]
 15/06/13 11:06:10 INFO Utils: Successfully started service 'sparkDriver'
 on port 56870.
 15/06/13 11:06:10 INFO SparkEnv: Registering MapOutputTracker
 15/06/13 11:06:10 INFO SparkEnv: Registering BlockManagerMaster
 15/06/13 11:06:10 INFO DiskBlockManager: Created local directory at
 /private/var/folders/7_/k5h82ws97b95v5f5h8wf9j0hgn/T/spark-f36f39f5-7f82-42e0-b3e0-9eb1e1cc0816/blockmgr-a1412b71-fe56-429c-a193-ce3fb95d2ffd
 15/06/13 11:06:10 INFO MemoryStore: MemoryStore started with capacity
 265.4 MB
 15/06/13 11:06:10 INFO HttpFileServer: HTTP File server directory is
 /private/var/folders/7_/k5h82ws97b95v5f5h8wf9j0hgn/T/spark-f36f39f5-7f82-42e0-b3e0-9eb1e1cc0816/httpd-84d178da-7e60-4eed-8031-e6a0c465bd4c
 15/06/13 11:06:10 INFO HttpServer: Starting HTTP Server

Re: --packages Failed to load class for data source v1.4

2015-06-17 Thread Don Drake
I don't think this is the same issue as it works just fine in pyspark
v1.3.1.

Are you aware of any workaround? I was hoping to start testing one of my
apps in Spark 1.4 and I use the CSV exports as a safety valve to easily
debug my data flow.

-Don


On Sun, Jun 14, 2015 at 7:18 PM, Burak Yavuz brk...@gmail.com wrote:

 Hi Don,
 This seems related to a known issue, where the classpath on the driver is
 missing the related classes. This is a bug in py4j as py4j uses the System
 Classloader rather than Spark's Context Classloader. However, this problem
 existed in 1.3.0 as well, therefore I'm curious whether it's the same
 issue. Thanks for opening the Jira, I'll take a look.

 Best,
 Burak
 On Jun 14, 2015 2:40 PM, Don Drake dondr...@gmail.com wrote:


 I looked at this again, and when I use the Scala spark-shell and load a
 CSV using the same package it works just fine, so this seems specific to
 pyspark.

 I've created the following JIRA:
 https://issues.apache.org/jira/browse/SPARK-8365

 -Don

 On Sat, Jun 13, 2015 at 11:46 AM, Don Drake dondr...@gmail.com wrote:

 I downloaded the pre-compiled Spark 1.4.0 and attempted to run an
 existing Python Spark application against it and got the following error:

 py4j.protocol.Py4JJavaError: An error occurred while calling o90.save.
 : java.lang.RuntimeException: Failed to load class for data source:
 com.databricks.spark.csv

 I pass the following on the command-line to my spark-submit:
 --packages com.databricks:spark-csv_2.10:1.0.3

 This worked fine on 1.3.1, but not in 1.4.

 I was able to replicate it with the following pyspark:

 a = {'a':1.0, 'b':'asdf'}
 rdd = sc.parallelize([a])
 df = sqlContext.createDataFrame(rdd)
 df.save(/tmp/d.csv, com.databricks.spark.csv)


 Even using the new
 df.write.format('com.databricks.spark.csv').save('/tmp/d.csv') gives the
 same error.

 I see it was added in the web UI:
 file:/Users/drake/.ivy2/jars/com.databricks_spark-csv_2.10-1.0.3.jarAdded
 By User
 file:/Users/drake/.ivy2/jars/org.apache.commons_commons-csv-1.1.jarAdded
 By User
 http://10.0.0.222:56871/jars/com.databricks_spark-csv_2.10-1.0.3.jarAdded
 By User
 http://10.0.0.222:56871/jars/org.apache.commons_commons-csv-1.1.jarAdded
 By User
 Thoughts?

 -Don



 Gory details:

 $ pyspark --packages com.databricks:spark-csv_2.10:1.0.3
 Python 2.7.6 (default, Sep  9 2014, 15:04:36)
 [GCC 4.2.1 Compatible Apple LLVM 6.0 (clang-600.0.39)] on darwin
 Type help, copyright, credits or license for more information.
 Ivy Default Cache set to: /Users/drake/.ivy2/cache
 The jars for the packages stored in: /Users/drake/.ivy2/jars
 :: loading settings :: url =
 jar:file:/Users/drake/spark/spark-1.4.0-bin-hadoop2.6/lib/spark-assembly-1.4.0-hadoop2.6.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
 com.databricks#spark-csv_2.10 added as a dependency
 :: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
 confs: [default]
 found com.databricks#spark-csv_2.10;1.0.3 in central
 found org.apache.commons#commons-csv;1.1 in central
 :: resolution report :: resolve 590ms :: artifacts dl 17ms
 :: modules in use:
 com.databricks#spark-csv_2.10;1.0.3 from central in [default]
 org.apache.commons#commons-csv;1.1 from central in [default]
 -
 |  |modules||   artifacts   |
 |   conf   | number| search|dwnlded|evicted|| number|dwnlded|
 -
 |  default |   2   |   0   |   0   |   0   ||   2   |   0   |
 -
 :: retrieving :: org.apache.spark#spark-submit-parent
 confs: [default]
 0 artifacts copied, 2 already retrieved (0kB/15ms)
 Using Spark's default log4j profile:
 org/apache/spark/log4j-defaults.properties
 15/06/13 11:06:08 INFO SparkContext: Running Spark version 1.4.0
 2015-06-13 11:06:08.921 java[19233:2145789] Unable to load realm info
 from SCDynamicStore
 15/06/13 11:06:09 WARN NativeCodeLoader: Unable to load native-hadoop
 library for your platform... using builtin-java classes where applicable
 15/06/13 11:06:09 WARN Utils: Your hostname, Dons-MacBook-Pro-2.local
 resolves to a loopback address: 127.0.0.1; using 10.0.0.222 instead (on
 interface en0)
 15/06/13 11:06:09 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to
 another address
 15/06/13 11:06:09 INFO SecurityManager: Changing view acls to: drake
 15/06/13 11:06:09 INFO SecurityManager: Changing modify acls to: drake
 15/06/13 11:06:09 INFO SecurityManager: SecurityManager: authentication
 disabled; ui acls disabled; users with view permissions: Set(drake); users
 with modify permissions: Set(drake)
 15/06/13 11:06:10 INFO Slf4jLogger: Slf4jLogger started
 15/06/13 11:06:10 INFO Remoting: Starting remoting
 15/06/13 11:06:10 INFO Remoting: Remoting started; listening on
 addresses :[akka.tcp://sparkDriver@10.0.0.222:56870]
 15/06/13 11:06

Re: [Spark] What is the most efficient way to do such a join and column manipulation?

2015-06-13 Thread Don Drake
Take a look at https://github.com/databricks/spark-csv to read in the
tab-delimited file (change the default delimiter)

and once you have that as a DataFrame, SQL can do the rest.

https://spark.apache.org/docs/latest/sql-programming-guide.html

-Don


On Fri, Jun 12, 2015 at 8:46 PM, Rex X dnsr...@gmail.com wrote:

 Hi,

 I want to use spark to select N columns, top M rows of all csv files under
 a folder.

 To be concrete, say we have a folder with thousands of tab-delimited csv
 files with following attributes format (each csv file is about 10GB):

 idnameaddresscity...
 1Mattadd1LA...
 2Willadd2LA...
 3Lucyadd3SF...
 ...

 And we have a lookup table based on name above

 namegender
 MattM
 LucyF
 ...

 Now we are interested to output from top 100K rows of each csv file into
 following format:

 idnamegender
 1MattM
 ...

 Can we use pyspark to efficiently handle this?





-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
http://www.MailLaunder.com/
800-733-2143


--packages Failed to load class for data source v1.4

2015-06-13 Thread Don Drake
I downloaded the pre-compiled Spark 1.4.0 and attempted to run an existing
Python Spark application against it and got the following error:

py4j.protocol.Py4JJavaError: An error occurred while calling o90.save.
: java.lang.RuntimeException: Failed to load class for data source:
com.databricks.spark.csv

I pass the following on the command-line to my spark-submit:
--packages com.databricks:spark-csv_2.10:1.0.3

This worked fine on 1.3.1, but not in 1.4.

I was able to replicate it with the following pyspark:

a = {'a':1.0, 'b':'asdf'}
rdd = sc.parallelize([a])
df = sqlContext.createDataFrame(rdd)
df.save(/tmp/d.csv, com.databricks.spark.csv)


Even using the new
df.write.format('com.databricks.spark.csv').save('/tmp/d.csv') gives the
same error.

I see it was added in the web UI:
file:/Users/drake/.ivy2/jars/com.databricks_spark-csv_2.10-1.0.3.jarAdded
By Userfile:/Users/drake/.ivy2/jars/org.apache.commons_commons-csv-1.1.jarAdded
By Userhttp://10.0.0.222:56871/jars/com.databricks_spark-csv_2.10-1.0.3.jarAdded
By Userhttp://10.0.0.222:56871/jars/org.apache.commons_commons-csv-1.1.jarAdded
By User
Thoughts?

-Don



Gory details:

$ pyspark --packages com.databricks:spark-csv_2.10:1.0.3
Python 2.7.6 (default, Sep  9 2014, 15:04:36)
[GCC 4.2.1 Compatible Apple LLVM 6.0 (clang-600.0.39)] on darwin
Type help, copyright, credits or license for more information.
Ivy Default Cache set to: /Users/drake/.ivy2/cache
The jars for the packages stored in: /Users/drake/.ivy2/jars
:: loading settings :: url =
jar:file:/Users/drake/spark/spark-1.4.0-bin-hadoop2.6/lib/spark-assembly-1.4.0-hadoop2.6.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
com.databricks#spark-csv_2.10 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
confs: [default]
found com.databricks#spark-csv_2.10;1.0.3 in central
found org.apache.commons#commons-csv;1.1 in central
:: resolution report :: resolve 590ms :: artifacts dl 17ms
:: modules in use:
com.databricks#spark-csv_2.10;1.0.3 from central in [default]
org.apache.commons#commons-csv;1.1 from central in [default]
-
|  |modules||   artifacts   |
|   conf   | number| search|dwnlded|evicted|| number|dwnlded|
-
|  default |   2   |   0   |   0   |   0   ||   2   |   0   |
-
:: retrieving :: org.apache.spark#spark-submit-parent
confs: [default]
0 artifacts copied, 2 already retrieved (0kB/15ms)
Using Spark's default log4j profile:
org/apache/spark/log4j-defaults.properties
15/06/13 11:06:08 INFO SparkContext: Running Spark version 1.4.0
2015-06-13 11:06:08.921 java[19233:2145789] Unable to load realm info from
SCDynamicStore
15/06/13 11:06:09 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
15/06/13 11:06:09 WARN Utils: Your hostname, Dons-MacBook-Pro-2.local
resolves to a loopback address: 127.0.0.1; using 10.0.0.222 instead (on
interface en0)
15/06/13 11:06:09 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to
another address
15/06/13 11:06:09 INFO SecurityManager: Changing view acls to: drake
15/06/13 11:06:09 INFO SecurityManager: Changing modify acls to: drake
15/06/13 11:06:09 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(drake); users
with modify permissions: Set(drake)
15/06/13 11:06:10 INFO Slf4jLogger: Slf4jLogger started
15/06/13 11:06:10 INFO Remoting: Starting remoting
15/06/13 11:06:10 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sparkDriver@10.0.0.222:56870]
15/06/13 11:06:10 INFO Utils: Successfully started service 'sparkDriver' on
port 56870.
15/06/13 11:06:10 INFO SparkEnv: Registering MapOutputTracker
15/06/13 11:06:10 INFO SparkEnv: Registering BlockManagerMaster
15/06/13 11:06:10 INFO DiskBlockManager: Created local directory at
/private/var/folders/7_/k5h82ws97b95v5f5h8wf9j0hgn/T/spark-f36f39f5-7f82-42e0-b3e0-9eb1e1cc0816/blockmgr-a1412b71-fe56-429c-a193-ce3fb95d2ffd
15/06/13 11:06:10 INFO MemoryStore: MemoryStore started with capacity 265.4
MB
15/06/13 11:06:10 INFO HttpFileServer: HTTP File server directory is
/private/var/folders/7_/k5h82ws97b95v5f5h8wf9j0hgn/T/spark-f36f39f5-7f82-42e0-b3e0-9eb1e1cc0816/httpd-84d178da-7e60-4eed-8031-e6a0c465bd4c
15/06/13 11:06:10 INFO HttpServer: Starting HTTP Server
15/06/13 11:06:10 INFO Utils: Successfully started service 'HTTP file
server' on port 56871.
15/06/13 11:06:10 INFO SparkEnv: Registering OutputCommitCoordinator
15/06/13 11:06:11 WARN Utils: Service 'SparkUI' could not bind on port
4040. Attempting port 4041.
15/06/13 11:06:11 INFO Utils: Successfully started service 'SparkUI' on
port 4041.
15/06/13 11:06:11 INFO SparkUI: Started SparkUI at 

Re: Does feature parity exist between Spark and PySpark

2015-10-06 Thread Don Drake
If you are using Dataframes in PySpark, then the performance will be the
same as Scala.  However, if you need to implement your own UDF, or run a
map() against a DataFrame in Python, then you will pay the penalty for
performance when executing those functions since all of your data has to go
through a gateway to Python and back.

In regards to API features, Scala does get better treatment, but things are
much better in the Python API than it was even 10 months ago.

-Don


On Tue, Oct 6, 2015 at 5:15 PM, dant  wrote:

> Hi
>
> I'm hearing a common theme running that I should only do serious
> programming
> in Scala on Spark (1.5.1). Real power users use Scala. It is said that
> Python is great for analytics but in the end the code should be written to
> Scala to finalise. There are a number of reasons I'm hearing:
>
> 1. Spark is written in Scala so will always be faster than any other
> language implementation on top of it.
> 2. Spark releases always favour more features being visible and enabled for
> Scala API than Python API.
>
> Are there any truth's to the above? I'm a little sceptical.
>
> Apologies for the duplication, my previous message was held up due to
> subscription issue. Reposting now.
>
> Thanks
> Dan
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Does-feature-parity-exist-between-Spark-and-PySpark-tp24963.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
https://twitter.com/dondrake 
800-733-2143


Utility for PySpark DataFrames - smartframes

2015-10-05 Thread Don Drake
I would like to announce a Python package that makes creating rows in
DataFrames in PySpark as easy as creating an object.

Code is available on GitHub, PyPi, and soon to be on spark-packages.org.


https://github.com/dondrake/smartframes

Motivation

Spark DataFrames provide a nice interface to datasets that have a schema.
Getting data from your code into a DataFrame in Python means creating a
Row() object with field names and respective values. Given that you already
have a schema with data types per field, it would be nice to easily take an
object that represents the row and create the Row() object automatically.

Smartframes allow you to define a class by just creating the schema that
represents the fields and datatypes. You can then create an object and set
the values like any other Python class. When you are ready to store that in
a DataFrame, just call the createRow() method.

The createRow() method will coerce any values into the correct data types,
for example, if a field is defined as an IntegerType and the value set in
the class is a String, it will attempt to convert the string to an Integer
before creating the Row().

This was written when creating Row()'s with Long datatypes and finding that
Spark did not handle converting integers as longs when passing values to
the JVM. I needed a consistent manner to create Row() for all of my
DataFrames.
Installation

pip install smartframes


Any feedback is appreciated.

-Don

-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
800-733-2143


Re: Jupyter configuration

2015-12-02 Thread Don Drake
Here's what I set in a shell script to start the notebook:

export PYSPARK_PYTHON=~/anaconda/bin/python
export PYSPARK_DRIVER_PYTHON=~/anaconda/bin/ipython
export PYSPARK_DRIVER_PYTHON_OPTS='notebook'

If you want to use HiveContext w/CDH:

export HADOOP_CONF_DIR=/etc/hive/conf

Then just run pyspark:
pyspark --master yarn-client --driver-memory 4G --executor-memory 2G
--num-executors 10


-Don


On Wed, Dec 2, 2015 at 6:11 AM, Roberto Pagliari 
wrote:

> Does anyone have a pointer to Jupyter configuration with pyspark? The
> current material on python inotebook is out of date, and jupyter ignores
> ipython profiles.
>
> Thank you,
>
>


-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
https://twitter.com/dondrake 
800-733-2143


Re: copy/mv hdfs file to another directory by spark program

2016-01-04 Thread Don Drake
You will need to use the HDFS API to do that.

Try something like:

val conf = sc.hadoopConfiguration
val fs = org.apache.hadoop.fs.FileSystem.get(conf)
fs.rename(new org.apache.hadoop.fs.Path("/path/on/hdfs/file.txt"), new
org.apache.hadoop.fs.Path("/path/on/hdfs/other/file.txt"))

Full API for FileSystem is here:
https://hadoop.apache.org/docs/r2.6.2/api/org/apache/hadoop/fs/FileSystem.html

-Don


On Mon, Jan 4, 2016 at 9:07 PM, Zhiliang Zhu 
wrote:

>
> For some file on hdfs, it is necessary to copy/move it to some another
> specific hdfs  directory, and the directory name would keep unchanged.
> Just need finish it in spark program, but not hdfs commands.
> Is there any codes, it seems not to be done by searching spark doc ...
>
> Thanks in advance!
>



-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
https://twitter.com/dondrake 
800-733-2143


df.partitionBy().parquet() java.lang.OutOfMemoryError: GC overhead limit exceeded

2015-11-28 Thread Don Drake
I have a 2TB dataset that I have in a DataFrame that I am attempting to
partition by 2 fields and my YARN job seems to write the partitioned
dataset successfully.  I can see the output in HDFS once all Spark tasks
are done.

After the spark tasks are done, the job appears to be running for over an
hour, until I get the following (full stack trace below):

java.lang.OutOfMemoryError: GC overhead limit exceeded
at
org.apache.parquet.format.converter.ParquetMetadataConverter.toParquetStatistics(ParquetMetadataConverter.java:238)

I had set the driver memory to be 20GB.

I attempted to read in the partitioned dataset and got another error saying
the /_metadata directory was not a parquet file.  I removed the _metadata
directory and was able to query the data, but it appeared to not use the
partitioned directory when I attempted to filter the data (it read every
directory).

This is Spark 1.5.2 and I saw the same problem when running the code in
both Scala and Python.

Any suggestions are appreciated.

-Don

15/11/25 00:00:19 ERROR datasources.InsertIntoHadoopFsRelation: Aborting
job.
java.lang.OutOfMemoryError: GC overhead limit exceeded
at
org.apache.parquet.format.converter.ParquetMetadataConverter.toParquetStatistics(ParquetMetadataConverter.java:238)
at
org.apache.parquet.format.converter.ParquetMetadataConverter.addRowGroup(ParquetMetadataConverter.java:167)
at
org.apache.parquet.format.converter.ParquetMetadataConverter.toParquetMetadata(ParquetMetadataConverter.java:79)
at
org.apache.parquet.hadoop.ParquetFileWriter.serializeFooter(ParquetFileWriter.java:405)
at
org.apache.parquet.hadoop.ParquetFileWriter.writeMetadataFile(ParquetFileWriter.java:433)
at
org.apache.parquet.hadoop.ParquetFileWriter.writeMetadataFile(ParquetFileWriter.java:423)
at
org.apache.parquet.hadoop.ParquetOutputCommitter.writeMetaDataFile(ParquetOutputCommitter.java:58)
at
org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
at
org.apache.spark.sql.execution.datasources.BaseWriterContainer.commitJob(WriterContainer.scala:208)
at
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:151)
at
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
at
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
at
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
at
org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at
org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:933)
at
org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:933)
at
org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:197)
at
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146)
at
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:137)
at
org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:304)
at com.dondrake.qra.ScalaApp$.main(ScalaApp.scala:53)
at com.dondrake.qra.ScalaApp.main(ScalaApp.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
15/11/25 00:00:20 ERROR actor.ActorSystemImpl: exception on LARS? timer
thread
java.lang.OutOfMemoryError: GC overhead limit exceeded
at
akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:409)
at
akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
at java.lang.Thread.run(Thread.java:745)
15/11/25 00:00:20 ERROR akka.ErrorMonitor: exception on LARS? timer thread
java.lang.OutOfMemoryError: GC overhead limit exceeded
at
akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:409)
at

Re: newbie : why are thousands of empty files being created on HDFS?

2015-11-23 Thread Don Drake
I'm seeing similar slowness in saveAsTextFile(), but only in Python.

I'm sorting data in a dataframe, then transform it and get a RDD, and then
coalesce(1).saveAsTextFile().

I converted the Python to Scala and the run-times were similar, except for
the saveAsTextFile() stage.  The scala version was much faster.

When looking at the executor logs during that stage, I see the following
when running the Scala code:

15/11/23 20:51:26 INFO storage.ShuffleBlockFetcherIterator: Getting 600
non-empty blocks out of 600 blocks

15/11/23 20:51:26 INFO storage.ShuffleBlockFetcherIterator: Started 184
remote fetches in 64 ms

15/11/23 20:51:30 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort
data of 146.0 MB to disk (0  time so far)

15/11/23 20:51:35 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort
data of 146.0 MB to disk (1  time so far)

15/11/23 20:51:40 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort
data of 146.0 MB to disk (2  times so far)

15/11/23 20:51:45 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort
data of 146.0 MB to disk (3  times so far)

15/11/23 20:51:50 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort
data of 146.0 MB to disk (4  times so far)

15/11/23 20:51:54 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort
data of 146.0 MB to disk (5  times so far)

15/11/23 20:51:59 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort
data of 146.0 MB to disk (6  times so far)

15/11/23 20:52:04 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort
data of 146.0 MB to disk (7  times so far)

15/11/23 20:52:09 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort
data of 146.0 MB to disk (8  times so far)



When running the Python version during the saveAsTextFile() stage, I see:

15/11/23 21:04:03 INFO python.PythonRunner: Times: total = 16190, boot = 5,
init = 144, finish = 16041

15/11/23 21:04:03 INFO storage.ShuffleBlockFetcherIterator: Getting 300
non-empty blocks out of 300 blocks

15/11/23 21:04:03 INFO storage.ShuffleBlockFetcherIterator: Started 231
remote fetches in 82 ms

15/11/23 21:04:15 INFO python.PythonRunner: Times: total = 12180, boot =
-415, init = 447, finish = 12148

15/11/23 21:04:15 INFO storage.ShuffleBlockFetcherIterator: Getting 300
non-empty blocks out of 300 blocks

15/11/23 21:04:15 INFO storage.ShuffleBlockFetcherIterator: Started 231
remote fetches in 129 ms

15/11/23 21:04:27 INFO python.PythonRunner: Times: total = 11450, boot =
-372, init = 398, finish = 11424

15/11/23 21:04:27 INFO storage.ShuffleBlockFetcherIterator: Getting 300
non-empty blocks out of 300 blocks

15/11/23 21:04:27 INFO storage.ShuffleBlockFetcherIterator: Started 231
remote fetches in 70 ms

15/11/23 21:04:42 INFO python.PythonRunner: Times: total = 14480, boot =
-378, init = 403, finish = 14455

15/11/23 21:04:42 INFO storage.ShuffleBlockFetcherIterator: Getting 300
non-empty blocks out of 300 blocks

15/11/23 21:04:42 INFO storage.ShuffleBlockFetcherIterator: Started 231
remote fetches in 62 ms

15/11/23 21:04:54 INFO python.PythonRunner: Times: total = 11868, boot =
-366, init = 381, finish = 11853

15/11/23 21:04:54 INFO storage.ShuffleBlockFetcherIterator: Getting 300
non-empty blocks out of 300 blocks

15/11/23 21:04:54 INFO storage.ShuffleBlockFetcherIterator: Started 231
remote fetches in 59 ms

15/11/23 21:05:10 INFO python.PythonRunner: Times: total = 15375, boot =
-392, init = 403, finish = 15364

15/11/23 21:05:10 INFO storage.ShuffleBlockFetcherIterator: Getting 300
non-empty blocks out of 300 blocks

15/11/23 21:05:10 INFO storage.ShuffleBlockFetcherIterator: Started 231
remote fetches in 48 ms


The python version is approximately 10 times slower than the Scala
version.  Any ideas why?


-Don

On Mon, Nov 23, 2015 at 4:31 PM, Andy Davidson <
a...@santacruzintegration.com> wrote:

> Hi Xiao and Sabarish
>
> Using the Stage tab on the UI. It turns out you can see how many
> partitions there are. If I did nothing I would have 228155 partition.
> (This confirms what Sabarish said). I tried coalesce(3). RDD.count()
> fails. I though given I have 3 workers and 1/3 of the data would easily
> fit into memory this would be a good choice.
>
> If I use coalesce(30) count works. How ever it still seems slow. It took
> 2.42 min to read 4720 records. My total data set size is 34M.
>
> Any suggestions how to choose the number of partitions.?
>
>  ('spark.executor.memory', '2G¹) ('spark.driver.memory', '2G')
>
>
> The data was originally collected using spark stream. I noticed that the
> number of default partitions == the number of files create on hdfs. I bet
> each file is one spark streaming mini-batchI suspect if I concatenate
> these into a small number of files things will run much faster. I suspect
> I would not need to call coalesce() and that coalesce() is taking a lot of
> time. Any suggestions how to choose the file number of files.
>
> Kind regards
>
> Andy
>
>
> From:  Xiao Li 
> Date:  Monday, November 23, 2015 at 

Re: Spark 1.6 and Application History not working correctly

2016-01-13 Thread Don Drake
I noticed a similar problem going from 1.5.x to 1.6.0 on YARN.

I resolved it be setting the following command-line parameters:

spark.eventLog.enabled=true
spark.eventLog.dir=

-Don

On Wed, Jan 13, 2016 at 8:29 AM, Darin McBeath 
wrote:

> I tried using Spark 1.6 in a stand-alone cluster this morning.
>
> I submitted 2 jobs (and they both executed fine).  In fact, they are the
> exact same jobs with just some different parameters.
>
> I was able to view the application history for the first job.
>
> However, when I tried to view the second job, I get the following error
> message.
>
> Application history not found (app-20160113140054-0001)
> No event logs found for application SparkSync Application in
> file:///root/spark/applicationHistory. Did you specify the correct logging
> directory?
>
>
> Everything works fine with Spark 1.5.  I'm able to view the application
> history for both jobs.
>
> Has anyone else noticed this issue?  Any suggestions?
>
> Thanks.
>
> Darin.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
https://twitter.com/dondrake 
800-733-2143


Re: output the datas(txt)

2016-02-28 Thread Don Drake
If you use the spark-csv package:

$ spark-shell  --packages com.databricks:spark-csv_2.11:1.3.0



scala> val df =
sc.parallelize(Array(Array(1,2,3),Array(2,3,4),Array(3,4,6))).map(x =>
(x(0), x(1), x(2))).toDF()
df: org.apache.spark.sql.DataFrame = [_1: int, _2: int, _3: int]

scala> df.write.format("com.databricks.spark.csv").option("delimiter", "
").save("1.txt")

$ cat 1.txt/*
1 2 3
2 3 4
3 4 6


-Don


On Sat, Feb 27, 2016 at 7:20 PM, Bonsen  wrote:

> I get results from RDDs,
> like :
> Array(Array(1,2,3),Array(2,3,4),Array(3,4,6))
> how can I output them to 1.txt
> like :
> 1 2 3
> 2 3 4
> 3 4 6
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/output-the-datas-txt-tp26350.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
https://twitter.com/dondrake 
800-733-2143


Building a REST Service with Spark back-end

2016-03-01 Thread Don Drake
I'm interested in building a REST service that utilizes a Spark SQL Context
to return records from a DataFrame (or IndexedRDD?) and even add/update
records.

This will be a simple REST API, with only a few end-points.  I found this
example:

https://github.com/alexmasselot/spark-play-activator

which looks close to what I am interested in doing.

Are there any other ideas or options if I want to run this in a YARN
cluster?

Thanks.

-Don

-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
https://twitter.com/dondrake 
800-733-2143


Re: AVRO vs Parquet

2016-03-03 Thread Don Drake
My tests show Parquet has better performance than Avro in just about every
test.  It really shines when you are querying a subset of columns in a wide
table.

-Don

On Wed, Mar 2, 2016 at 3:49 PM, Timothy Spann 
wrote:

> Which format is the best format for SparkSQL adhoc queries and general
> data storage?
>
> There are lots of specialized cases, but generally accessing some but not
> all the available columns with a reasonable subset of the data.
>
> I am learning towards Parquet as it has great support in Spark.
>
> I also have to consider any file on HDFS may be accessed from other tools
> like Hive, Impala, HAWQ.
>
> Suggestions?
> —
> airis.DATA
> Timothy Spann, Senior Solutions Architect
> C: 609-250-5894
> http://airisdata.com/
> http://meetup.com/nj-datascience
>
>
>


-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
https://twitter.com/dondrake 
800-733-2143


Spark 2.0 Aggregator problems

2016-04-23 Thread Don Drake
I've downloaded a nightly build of Spark 2.0 (from today 4/23) and was
attempting to create an aggregator that will create a Seq[Rows], or
specifically a Seq[Class1], my custom class.

When I attempt to run the following code in a spark-shell, it errors out:

Gist: https://gist.github.com/dondrake/be6b92aff71433e9fb627b478b78b839

Code:
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder,Row}
import org.apache.spark.sql.functions._
import java.util.Calendar

case class C1(f1: String, f2: String, f3: String, f4:java.sql.Date, f5:
Double)
val teams = sc.parallelize(Seq(
  C1("hash1", "NLC", "Cubs", java.sql.Date.valueOf("2016-01-23"), 3253.21),
  C1("hash1", "NLC", "Cubs", java.sql.Date.valueOf("2014-01-23"), 353.88),
  C1("hash3", "NLW", "Dodgers", java.sql.Date.valueOf("2013-08-15"),
4322.12),
  C1("hash4", "NLE", "Red Sox", java.sql.Date.valueOf("2010-03-14"),
10283.72)
  )).toDS

//
https://docs.cloud.databricks.com/docs/spark/1.6/examples/Dataset%20Aggregator.html
object C1Agg extends Aggregator[C1, Seq[C1], Seq[C1]]  {
  def zero: Seq[C1] = null
  def reduce(b: Seq[C1], a: C1): Seq[C1] = b :+ a
  def merge(b1: Seq[C1], b2: Seq[C1]): Seq[C1] = b1 ++ b2
  def finish(r: Seq[C1]): Seq[C1] = r

  override def bufferEncoder: Encoder[Seq[C1]] = ExpressionEncoder()
  override def outputEncoder: Encoder[Seq[C1]] = ExpressionEncoder()
}

val g_c1 = teams.select(C1Agg.toColumn)


scala> val g_c1 = teams.select(C1Agg.toColumn)
scala.ScalaReflectionException: object $line37.$read not found.
  at
scala.reflect.internal.Mirrors$RootsBase.staticModule(Mirrors.scala:162)
  at scala.reflect.internal.Mirrors$RootsBase.staticModule(Mirrors.scala:22)
  at $typecreator1$1.apply(:45)
  at
scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:232)
  at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:232)
  at
org.apache.spark.sql.SQLImplicits$$typecreator9$1.apply(SQLImplicits.scala:125)
  at
scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:232)
  at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:232)
  at
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:50)
  at
org.apache.spark.sql.SQLImplicits.newProductSeqEncoder(SQLImplicits.scala:125)
  ... 52 elided

If I tweak my teams to be a DataFrame instead of a DataSet, and leave
everything else the same, I get a different error:

scala> val g_c1 = teams.select(C1Agg.toColumn)
org.apache.spark.sql.AnalysisException: unresolved operator 'Aggregate
[(C1Agg(unknown),mode=Complete,isDistinct=false) AS
c1agg(staticinvoke(class scala.collection.mutable.WrappedArray$,
ObjectType(interface scala.collection.Seq), make,
mapobjects(lambdavariable(MapObjects_loopValue26, MapObjects_loopIsNull27,
StructField(f1,StringType,true), StructField(f2,StringType,true),
StructField(f3,StringType,true), StructField(f4,DateType,true),
StructField(f5,DoubleType,false)), if
(isnull(lambdavariable(MapObjects_loopValue26, MapObjects_loopIsNull27,
StructField(f1,StringType,true), StructField(f2,StringType,true),
StructField(f3,StringType,true), StructField(f4,DateType,true),
StructField(f5,DoubleType,false null else newInstance(class C1),
upcast(value, ArrayType(StructType(StructField(f1,StringType,true),
StructField(f2,StringType,true), StructField(f3,StringType,true),
StructField(f4,DateType,true), StructField(f5,DoubleType,false)),true), -
root class: "scala.collection.Seq")).array, true))#63];
  at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:39)
  at
org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:54)
  at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:289)
  at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:51)
  at
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:125)
  at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:51)
  at
org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:54)
  at
org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:61)
  at org.apache.spark.sql.Dataset.org
$apache$spark$sql$Dataset$$withPlan(Dataset.scala:2443)
  at org.apache.spark.sql.Dataset.select(Dataset.scala:935)
  ... 52 elided

I'm not sure how to diagnose those errors.  Thoughts?

-Don

-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
https://twitter.com/dondrake 
800-733-2143


Wide Datasets (v1.6.1)

2016-05-20 Thread Don Drake
I have been working to create a Dataframe that contains a nested
structure.  The first attempt is to create an array of structures.   I've
written previously on this list how it doesn't work in Dataframes in 1.6.1,
but it does in 2.0.

I've continued my experimenting and have it working in Datasets in 1.6.1,
using ds.groupBy($"col").groupMaps().  This works great when the number
of columns is less than the maximum for a case class (22 in scala 2.10, 254
in scala 2.11).  However, while using a custom written case class of 200+
fields, I did run into a Catalyst/Janino stack overflow exception (during
runtime, it as attempting to compile my large class) so that doesn't work.
I can provide an example/open a Jira if there is a chance this will be
fixed.

My question is the following: Datasets rely on case classes, if I have a
dataset with more than 254 fields (and I have a lot of them), how am I
supposed to use Datasets with these wide tables?  Am I forced to use
Dataframes?

Thanks.

-Don

-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
https://twitter.com/dondrake 
800-733-2143


Re: Wide Datasets (v1.6.1)

2016-05-21 Thread Don Drake
I was able to verify the similar exceptions occur in Spark 2.0.0-preview.
I have create this JIRA: https://issues.apache.org/jira/browse/SPARK-15467

You mentioned using beans instead of case classes, do you have an example
(or test case) that I can see?

-Don

On Fri, May 20, 2016 at 3:49 PM, Michael Armbrust 
wrote:

> I can provide an example/open a Jira if there is a chance this will be
>> fixed.
>>
>
> Please do!  Ping me on it.
>
> Michael
>



-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
https://twitter.com/dondrake 
800-733-2143


Re: Evenly balance the number of items in each RDD partition

2016-05-10 Thread Don Drake
You can call rdd.coalesce(10, shuffle = true) and the returning rdd will be
evenly balanced.  This obviously triggers a shuffle, so be advised it could
be an expensive operation depending on your RDD size.

-Don

On Tue, May 10, 2016 at 12:38 PM, Ayman Khalil 
wrote:

> Hello,
>
> I have 50,000 items parallelized into an RDD with 10 partitions, I would
> like to evenly split the items over the partitions so:
> 50,000/10 = 5,000 in each RDD partition.
>
> What I get instead is the following (partition index, partition count):
> [(0, 4096), (1, 5120), (2, 5120), (3, 5120), (4, 5120), (5, 5120), (6,
> 5120), (7, 5120), (8, 5120), (9, 4944)]
>
> the total is correct (4096 + 4944 + 8*5120 = 50,000) but the partitions
> are imbalanced.
>
> Is there a way to do that?
>
> Thank you,
> Ayman
>



-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
https://twitter.com/dondrake 
800-733-2143


Re: Evenly balance the number of items in each RDD partition

2016-05-10 Thread Don Drake
Well, for Python, it should be rdd.coalesce(10, shuffle=True)

I have had good success with this using the Scala API in Spark 1.6.1.

-Don

On Tue, May 10, 2016 at 3:15 PM, Ayman Khalil <aymkhali...@gmail.com> wrote:

> And btw, I'm using the Python API if this makes any difference.
>
> On Tue, May 10, 2016 at 11:14 PM, Ayman Khalil <aymkhali...@gmail.com>
> wrote:
>
>> Hi Don,
>>
>> This didn't help. My original rdd is already created using 10 partitions.
>> As a matter of fact, after trying with rdd.coalesce(10, shuffle =
>> true) out of curiosity, the rdd partitions became even more imbalanced:
>> [(0, 5120), (1, 5120), (2, 5120), (3, 5120), (4, *3920*), (5, 4096), (6,
>> 5120), (7, 5120), (8, 5120), (9, *6144*)]
>>
>>
>> On Tue, May 10, 2016 at 10:16 PM, Don Drake <dondr...@gmail.com> wrote:
>>
>>> You can call rdd.coalesce(10, shuffle = true) and the returning rdd will
>>> be evenly balanced.  This obviously triggers a shuffle, so be advised it
>>> could be an expensive operation depending on your RDD size.
>>>
>>> -Don
>>>
>>> On Tue, May 10, 2016 at 12:38 PM, Ayman Khalil <aymkhali...@gmail.com>
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> I have 50,000 items parallelized into an RDD with 10 partitions, I
>>>> would like to evenly split the items over the partitions so:
>>>> 50,000/10 = 5,000 in each RDD partition.
>>>>
>>>> What I get instead is the following (partition index, partition count):
>>>> [(0, 4096), (1, 5120), (2, 5120), (3, 5120), (4, 5120), (5, 5120), (6,
>>>> 5120), (7, 5120), (8, 5120), (9, 4944)]
>>>>
>>>> the total is correct (4096 + 4944 + 8*5120 = 50,000) but the partitions
>>>> are imbalanced.
>>>>
>>>> Is there a way to do that?
>>>>
>>>> Thank you,
>>>> Ayman
>>>>
>>>
>>>
>>>
>>> --
>>> Donald Drake
>>> Drake Consulting
>>> http://www.drakeconsulting.com/
>>> https://twitter.com/dondrake <http://www.MailLaunder.com/>
>>> 800-733-2143
>>>
>>
>>
>


-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
https://twitter.com/dondrake <http://www.MailLaunder.com/>
800-733-2143


Outer Explode needed

2016-07-24 Thread Don Drake
I have a nested data structure (array of structures) that I'm using the DSL
df.explode() API to flatten the data.  However, when the array is empty,
I'm not getting the rest of the row in my output as it is skipped.

This is the intended behavior, and Hive supports a SQL "OUTER explode()" to
generate the row when the explode would not yield any output.

https://cwiki.apache.org/confluence/display/Hive/LanguageManual+LateralView

Can we get this same outer explode in the DSL?  I have to jump through some
outer join hoops to get the rows where the array is empty.

Thanks.

-Don

-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
https://twitter.com/dondrake 
800-733-2143


Re: how to order data in descending order in spark dataset

2016-07-30 Thread Don Drake
Try:

ts.groupBy("b").count().orderBy(col("count").desc());

-Don

On Sat, Jul 30, 2016 at 1:30 PM, Tony Lane  wrote:

> just to clarify I am try to do this in java
>
> ts.groupBy("b").count().orderBy("count");
>
>
>
> On Sun, Jul 31, 2016 at 12:00 AM, Tony Lane 
> wrote:
>
>> ts.groupBy("b").count().orderBy("count");
>>
>> how can I order this data in descending order of count
>> Any suggestions
>>
>> -Tony
>>
>
>


-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
https://twitter.com/dondrake 
800-733-2143


Re: Parameterized types and Datasets - Spark 2.1.0

2017-02-01 Thread Don Drake
Thanks for the reply.   I did give that syntax a try [A : Encoder]
yesterday, but I kept getting this exception in a spark-shell and Zeppelin
browser.

scala> import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Encoder

scala>

scala> case class RawTemp(f1: String, f2: String, temp: Long, created_at:
java.sql.Timestamp, data_filename: String)
defined class RawTemp

scala>

scala> import spark.implicits._
import spark.implicits._

scala>

scala> abstract class RawTable[A : Encoder](inDir: String) {
 | import spark.implicits._
 | def load() = {
 | import spark.implicits._
 | spark.read
 | .option("header", "true")
 | .option("mode", "FAILFAST")
 | .option("escape", "\"")
 | .option("nullValue", "")
 | .option("indferSchema", "true")
 | .csv(inDir)
 | .as[A]
 | }
 | }
:13: error: not found: type Encoder
   abstract class RawTable[A : Encoder](inDir: String) {
   ^
:24: error: Unable to find encoder for type stored in a Dataset.
Primitive types (Int, String, etc) and Product types (case classes) are
supported by importing spark.implicits._  Support for serializing other
types will be added in future releases.
   .as[A]


I gave it a try today in a Scala application and it seems to work.  Is this
a known issue in a spark-shell?

In my Scala application, this is being defined in a separate file, etc.
without direct access to a Spark session.

I had to add the following code snippet so the import spark.implicits._
would take effect:

// ugly hack to get around Encoder can't be found compile time errors

private object myImplicits extends SQLImplicits {

  protected override def _sqlContext: SQLContext =
MySparkSingleton.getCurrentSession().sqlContext

}

import myImplicits._

I found that in about the hundredth SO post I searched for this problem.
Is this the best way to let implicits do its thing?

Thanks.

-Don



On Wed, Feb 1, 2017 at 3:16 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> You need to enforce that an Encoder is available for the type A using a 
> context
> bound <http://docs.scala-lang.org/tutorials/FAQ/context-bounds>.
>
> import org.apache.spark.sql.Encoder
> abstract class RawTable[A : Encoder](inDir: String) {
>   ...
> }
>
> On Tue, Jan 31, 2017 at 8:12 PM, Don Drake <dondr...@gmail.com> wrote:
>
>> I have a set of CSV that I need to perform ETL on, with the plan to
>> re-use a lot of code between each file in a parent abstract class.
>>
>> I tried creating the following simple abstract class that will have a
>> parameterized type of a case class that represents the schema being read in.
>>
>> This won't compile, it just complains about not being able to find an
>> encoder, but I'm importing the implicits and don't believe this error.
>>
>>
>> scala> import spark.implicits._
>> import spark.implicits._
>>
>> scala>
>>
>> scala> case class RawTemp(f1: String, f2: String, temp: Long, created_at:
>> java.sql.Timestamp, data_filename: String)
>> defined class RawTemp
>>
>> scala>
>>
>> scala> abstract class RawTable[A](inDir: String) {
>>  | def load() = {
>>  | spark.read
>>  | .option("header", "true")
>>  | .option("mode", "FAILFAST")
>>  | .option("escape", "\"")
>>  | .option("nullValue", "")
>>  | .option("indferSchema", "true")
>>  | .csv(inDir)
>>  | .as[A]
>>  | }
>>  | }
>> :27: error: Unable to find encoder for type stored in a
>> Dataset.  Primitive types (Int, String, etc) and Product types (case
>> classes) are supported by importing spark.implicits._  Support for
>> serializing other types will be added in future releases.
>>.as[A]
>>
>> scala> class TempTable extends RawTable[RawTemp]("/user/drake/t.csv")
>> :13: error: not found: type RawTable
>>class TempTable extends RawTable[RawTemp]("/user/drake/t.csv")
>>   ^
>>
>> What's odd is that this output looks okay:
>>
>> scala> val RTEncoder = Encoders.product[RawTemp]
>> RTEncoder: org.apache.spark.sql.Encoder[RawTemp] = class[f1[0]: string,
>> f2[0]: string, temp[0]: bigint, created_at[0]: timestamp, da

Spark 2 - Creating datasets from dataframes with extra columns

2017-02-02 Thread Don Drake
In 1.6, when you created a Dataset from a Dataframe that had extra columns,
the columns not in the case class were dropped from the Dataset.

For example in 1.6, the column c4 is gone:

scala> case class F(f1: String, f2: String, f3:String)

defined class F


scala> import sqlContext.implicits._

import sqlContext.implicits._


scala> val df = Seq(("a","b","c","x"), ("d", "e", "f","y"), ("h", "i",
"j","z")).toDF("f1", "f2", "f3", "c4")

df: org.apache.spark.sql.DataFrame = [f1: string, f2: string, f3: string,
c4: string]


scala> val ds = df.as[F]

ds: org.apache.spark.sql.Dataset[F] = [f1: string, f2: string, f3: string]


scala> ds.show

+---+---+---+

| f1| f2| f3|

+---+---+---+

|  a|  b|  c|

|  d|  e|  f|

|  h|  i|  j|


This seems to have changed in Spark 2.0 and also 2.1:

Spark 2.1.0:

scala> case class F(f1: String, f2: String, f3:String)
defined class F

scala> import spark.implicits._
import spark.implicits._

scala> val df = Seq(("a","b","c","x"), ("d", "e", "f","y"), ("h", "i",
"j","z")).toDF("f1", "f2", "f3", "c4")
df: org.apache.spark.sql.DataFrame = [f1: string, f2: string ... 2 more
fields]

scala> val ds = df.as[F]
ds: org.apache.spark.sql.Dataset[F] = [f1: string, f2: string ... 2 more
fields]

scala> ds.show
+---+---+---+---+
| f1| f2| f3| c4|
+---+---+---+---+
|  a|  b|  c|  x|
|  d|  e|  f|  y|
|  h|  i|  j|  z|
+---+---+---+---+

Is there a way to get a Dataset that conforms to the case class in Spark
2.1.0?  Basically, I'm attempting to use the case class to define an output
schema, and these extra columns are getting in the way.

Thanks.

-Don

-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
https://twitter.com/dondrake 
800-733-2143


Re: Parameterized types and Datasets - Spark 2.1.0

2017-02-01 Thread Don Drake
I imported that as my first command in my previous email.  I'm using a
spark-shell.

scala> import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Encoder

scala>


Any comments regarding importing implicits in an application?

Thanks.

-Don

On Wed, Feb 1, 2017 at 6:10 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> This is the error, you are missing an import:
>
> :13: error: not found: type Encoder
>abstract class RawTable[A : Encoder](inDir: String) {
>
> Works for me in a REPL.
> <https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/204687029790319/2840265927289860/latest.html>
>
> On Wed, Feb 1, 2017 at 3:34 PM, Don Drake <dondr...@gmail.com> wrote:
>
>> Thanks for the reply.   I did give that syntax a try [A : Encoder]
>> yesterday, but I kept getting this exception in a spark-shell and Zeppelin
>> browser.
>>
>> scala> import org.apache.spark.sql.Encoder
>> import org.apache.spark.sql.Encoder
>>
>> scala>
>>
>> scala> case class RawTemp(f1: String, f2: String, temp: Long, created_at:
>> java.sql.Timestamp, data_filename: String)
>> defined class RawTemp
>>
>> scala>
>>
>> scala> import spark.implicits._
>> import spark.implicits._
>>
>> scala>
>>
>> scala> abstract class RawTable[A : Encoder](inDir: String) {
>>  | import spark.implicits._
>>  | def load() = {
>>  | import spark.implicits._
>>  | spark.read
>>  | .option("header", "true")
>>  | .option("mode", "FAILFAST")
>>  | .option("escape", "\"")
>>  | .option("nullValue", "")
>>  | .option("indferSchema", "true")
>>  | .csv(inDir)
>>  | .as[A]
>>  | }
>>  | }
>> :13: error: not found: type Encoder
>>abstract class RawTable[A : Encoder](inDir: String) {
>>^
>> :24: error: Unable to find encoder for type stored in a
>> Dataset.  Primitive types (Int, String, etc) and Product types (case
>> classes) are supported by importing spark.implicits._  Support for
>> serializing other types will be added in future releases.
>>.as[A]
>>
>>
>> I gave it a try today in a Scala application and it seems to work.  Is
>> this a known issue in a spark-shell?
>>
>> In my Scala application, this is being defined in a separate file, etc.
>> without direct access to a Spark session.
>>
>> I had to add the following code snippet so the import spark.implicits._
>> would take effect:
>>
>> // ugly hack to get around Encoder can't be found compile time errors
>>
>> private object myImplicits extends SQLImplicits {
>>
>>   protected override def _sqlContext: SQLContext =
>> MySparkSingleton.getCurrentSession().sqlContext
>>
>> }
>>
>> import myImplicits._
>>
>> I found that in about the hundredth SO post I searched for this problem.
>> Is this the best way to let implicits do its thing?
>>
>> Thanks.
>>
>> -Don
>>
>>
>>
>> On Wed, Feb 1, 2017 at 3:16 PM, Michael Armbrust <mich...@databricks.com>
>> wrote:
>>
>>> You need to enforce that an Encoder is available for the type A using a 
>>> context
>>> bound <http://docs.scala-lang.org/tutorials/FAQ/context-bounds>.
>>>
>>> import org.apache.spark.sql.Encoder
>>> abstract class RawTable[A : Encoder](inDir: String) {
>>>   ...
>>> }
>>>
>>> On Tue, Jan 31, 2017 at 8:12 PM, Don Drake <dondr...@gmail.com> wrote:
>>>
>>>> I have a set of CSV that I need to perform ETL on, with the plan to
>>>> re-use a lot of code between each file in a parent abstract class.
>>>>
>>>> I tried creating the following simple abstract class that will have a
>>>> parameterized type of a case class that represents the schema being read 
>>>> in.
>>>>
>>>> This won't compile, it just complains about not being able to find an
>>>> encoder, but I'm importing the implicits and don't believe this error.
>>>>
>>>>
>>>> scala> import spark.implicits._
>>>> import spark.implicits._
>>>>
>>&

Parameterized types and Datasets - Spark 2.1.0

2017-01-31 Thread Don Drake
I have a set of CSV that I need to perform ETL on, with the plan to re-use
a lot of code between each file in a parent abstract class.

I tried creating the following simple abstract class that will have a
parameterized type of a case class that represents the schema being read in.

This won't compile, it just complains about not being able to find an
encoder, but I'm importing the implicits and don't believe this error.


scala> import spark.implicits._
import spark.implicits._

scala>

scala> case class RawTemp(f1: String, f2: String, temp: Long, created_at:
java.sql.Timestamp, data_filename: String)
defined class RawTemp

scala>

scala> abstract class RawTable[A](inDir: String) {
 | def load() = {
 | spark.read
 | .option("header", "true")
 | .option("mode", "FAILFAST")
 | .option("escape", "\"")
 | .option("nullValue", "")
 | .option("indferSchema", "true")
 | .csv(inDir)
 | .as[A]
 | }
 | }
:27: error: Unable to find encoder for type stored in a Dataset.
Primitive types (Int, String, etc) and Product types (case classes) are
supported by importing spark.implicits._  Support for serializing other
types will be added in future releases.
   .as[A]

scala> class TempTable extends RawTable[RawTemp]("/user/drake/t.csv")
:13: error: not found: type RawTable
   class TempTable extends RawTable[RawTemp]("/user/drake/t.csv")
  ^

What's odd is that this output looks okay:

scala> val RTEncoder = Encoders.product[RawTemp]
RTEncoder: org.apache.spark.sql.Encoder[RawTemp] = class[f1[0]: string,
f2[0]: string, temp[0]: bigint, created_at[0]: timestamp, data_filename[0]:
string]

scala> RTEncoder.schema
res4: org.apache.spark.sql.types.StructType =
StructType(StructField(f1,StringType,true),
StructField(f2,StringType,true), StructField(temp,LongType,false),
StructField(created_at,TimestampType,true),
StructField(data_filename,StringType,true))

scala> RTEncoder.clsTag
res5: scala.reflect.ClassTag[RawTemp] = RawTemp

Any ideas?

-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
https://twitter.com/dondrake 
800-733-2143


Re: Converting timezones in Spark

2017-01-31 Thread Don Drake
So, to follow up on this.

A few lessons learned, when you print a timestamp, it will only show the
date/time in your current timezone, regardless of any conversions you
applied to it.

The trick is to convert it (cast) to a Long, and then the Java8 java.time.*
functions can translate to any timezone and generate a string representing
the timestamp.

Here's a working example:

scala> :paste
// Entering paste mode (ctrl-D to finish)

import java.time.Instant
import java.time.ZonedDateTime
import java.time.ZoneId
import java.time.format.DateTimeFormatter
import org.apache.spark.sql.functions.udf

def convertToTZ(col: Long, zone: String, formatter:
DateTimeFormatter):String = {

  val i = Instant.ofEpochSecond(col)
  val z = ZonedDateTime.ofInstant(i, ZoneId.of(zone))

  z.format(formatter)

}

def convertToTZFullTimestamp = udf((col: Long, zone:String) =>
convertToTZ(col, zone, DateTimeFormatter.ofPattern("-MM-dd HH:mm:ss
z")) )

val df = Seq((1L, "2016-09-14 16:46:32 UTC"), (2L, "not a timestamp"), (3L,
"2016-09-14 16:59:57 UTC"), (4L, "2016-11-30 12:00:01 UTC")).toDF("id",
"dts")

val df2 = df.withColumn("created_at", unix_timestamp($"dts", "-MM-dd
HH:mm:ss Z").cast("timestamp")).withColumn("EST_tz",
convertToTZFullTimestamp($"created_at".cast("long"),
lit("America/New_York")))

df2.show(4, false)


// Exiting paste mode, now interpreting.

+---+---+-+---+
|id |dts|created_at   |EST_tz |
+---+---+-+---+
|1  |2016-09-14 16:46:32 UTC|2016-09-14 11:46:32.0|2016-09-14 12:46:32 EDT|
|2  |not a timestamp|null |null   |
|3  |2016-09-14 16:59:57 UTC|2016-09-14 11:59:57.0|2016-09-14 12:59:57 EDT|
|4  |2016-11-30 12:00:01 UTC|2016-11-30 06:00:01.0|2016-11-30 07:00:01 EST|
+---+---+-+---+

import java.time.Instant
import java.time.ZonedDateTime
import java.time.ZoneId
import java.time.format.DateTimeFormatter
import org.apache.spark.sql.functions.udf
convertToTZ: (col: Long, zone: String, formatter:
java.time.format.DateTimeFormatter)String
convertToTZFullTimestamp:
org.apache.spark.sql.expressions.UserDefinedFunction
df: org.apache.spark.sql.DataFrame = [id: bigint, dts: string]
df2: org.apache.spark.sql.DataFrame = [id: bigint, dts: string ... 2 more
fields]

scala>



On Fri, Jan 27, 2017 at 12:01 PM, Don Drake <dondr...@gmail.com> wrote:

> I'm reading CSV with a timestamp clearly identified in the UTC timezone,
> and I need to store this in a parquet format and eventually read it back
> and convert to different timezones as needed.
>
> Sounds straightforward, but this involves some crazy function calls and
> I'm seeing strange results as I build a test case.
>
> See my example below.  Why are the values for est_ts and cst_ts the same
> in rows 1 and 3 (wrong), but different and correct in row 4?  I have a
> feeling it has to do with daylight savings time, but I'm not sure where to
> resolve it.
>
> Please note that I'm in the Central timezone.
>
> Is there a better method to do this?
>
>
> Spark context available as 'sc' (master = local[*], app id =
> local-1485539128193).
>
> Spark session available as 'spark'.
>
> Welcome to
>
>     __
>
>  / __/__  ___ _/ /__
>
> _\ \/ _ \/ _ `/ __/  '_/
>
>/___/ .__/\_,_/_/ /_/\_\   version 2.1.0
>
>   /_/
>
>
> Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java
> 1.8.0_60)
>
> Type in expressions to have them evaluated.
>
> Type :help for more information.
>
>
> scala> :paste
>
> // Entering paste mode (ctrl-D to finish)
>
>
> import org.apache.spark.sql.Column
>
> def stringts_to_tz(col:Column, tz:String) = {
>
> from_utc_timestamp(to_utc_timestamp(from_unixtime(unix_timestamp(col,
> "-MM-dd HH:mm:ss Z")), "CST"), tz)
>
> }
>
>
> val df = Seq((1L, "2016-09-14 16:46:32 UTC"), (2L, "not a timestamp"),
> (3L, "2016-09-14 16:59:57 UTC"), (4L, "2016-11-31 12:00:01
> UTC")).toDF("id", "dts")
>
> val df2 = df.withColumn("created_at", unix_timestamp($"dts", "-MM-dd
> HH:mm:ss Z").cast("timestamp"))
>
> .withColumn("unix_ts", unix_timestamp($"dts", "-MM-dd H

Re: Spark 2 - Creating datasets from dataframes with extra columns

2017-02-08 Thread Don Drake
Please see: https://issues.apache.org/jira/browse/SPARK-19477

Thanks.

-Don

On Wed, Feb 8, 2017 at 6:51 AM, 萝卜丝炒饭 <1427357...@qq.com> wrote:

> i checked it, it seems is a bug. do you create a jira  now plesae?
>
> ---Original---
> *From:* "Don Drake"<dondr...@gmail.com>
> *Date:* 2017/2/7 01:26:59
> *To:* "user"<user@spark.apache.org>;
> *Subject:* Re: Spark 2 - Creating datasets from dataframes with extra
> columns
>
> This seems like a bug to me, the schemas should match.
>
> scala> import org.apache.spark.sql.Encoders
> import org.apache.spark.sql.Encoders
>
> scala> val fEncoder = Encoders.product[F]
> fEncoder: org.apache.spark.sql.Encoder[F] = class[f1[0]: string, f2[0]:
> string, f3[0]: string]
>
> scala> fEncoder.schema == ds.schema
> res2: Boolean = false
>
> scala> ds.schema
> res3: org.apache.spark.sql.types.StructType = 
> StructType(StructField(f1,StringType,true),
> StructField(f2,StringType,true), StructField(f3,StringType,true),
> StructField(c4,StringType,true))
>
> scala> fEncoder.schema
> res4: org.apache.spark.sql.types.StructType = 
> StructType(StructField(f1,StringType,true),
> StructField(f2,StringType,true), StructField(f3,StringType,true))
>
> I'll open a JIRA.
>
> -Don
>
> On Thu, Feb 2, 2017 at 2:46 PM, Don Drake <dondr...@gmail.com> wrote:
>
>> In 1.6, when you created a Dataset from a Dataframe that had extra
>> columns, the columns not in the case class were dropped from the Dataset.
>>
>> For example in 1.6, the column c4 is gone:
>>
>> scala> case class F(f1: String, f2: String, f3:String)
>>
>> defined class F
>>
>>
>> scala> import sqlContext.implicits._
>>
>> import sqlContext.implicits._
>>
>>
>> scala> val df = Seq(("a","b","c","x"), ("d", "e", "f","y"), ("h", "i",
>> "j","z")).toDF("f1", "f2", "f3", "c4")
>>
>> df: org.apache.spark.sql.DataFrame = [f1: string, f2: string, f3: string,
>> c4: string]
>>
>>
>> scala> val ds = df.as[F]
>>
>> ds: org.apache.spark.sql.Dataset[F] = [f1: string, f2: string, f3:
>> string]
>>
>>
>> scala> ds.show
>>
>> +---+---+---+
>>
>> | f1| f2| f3|
>>
>> +---+---+---+
>>
>> |  a|  b|  c|
>>
>> |  d|  e|  f|
>>
>> |  h|  i|  j|
>>
>>
>> This seems to have changed in Spark 2.0 and also 2.1:
>>
>> Spark 2.1.0:
>>
>> scala> case class F(f1: String, f2: String, f3:String)
>> defined class F
>>
>> scala> import spark.implicits._
>> import spark.implicits._
>>
>> scala> val df = Seq(("a","b","c","x"), ("d", "e", "f","y"), ("h", "i",
>> "j","z")).toDF("f1", "f2", "f3", "c4")
>> df: org.apache.spark.sql.DataFrame = [f1: string, f2: string ... 2 more
>> fields]
>>
>> scala> val ds = df.as[F]
>> ds: org.apache.spark.sql.Dataset[F] = [f1: string, f2: string ... 2 more
>> fields]
>>
>> scala> ds.show
>> +---+---+---+---+
>> | f1| f2| f3| c4|
>> +---+---+---+---+
>> |  a|  b|  c|  x|
>> |  d|  e|  f|  y|
>> |  h|  i|  j|  z|
>> +---+---+---+---+
>>
>> Is there a way to get a Dataset that conforms to the case class in Spark
>> 2.1.0?  Basically, I'm attempting to use the case class to define an output
>> schema, and these extra columns are getting in the way.
>>
>> Thanks.
>>
>> -Don
>>
>> --
>> Donald Drake
>> Drake Consulting
>> http://www.drakeconsulting.com/
>> https://twitter.com/dondrake <http://www.MailLaunder.com/>
>> 800-733-2143
>>
>
>
>
> --
> Donald Drake
> Drake Consulting
> http://www.drakeconsulting.com/
> https://twitter.com/dondrake <http://www.MailLaunder.com/>
> 800-733-2143 <(800)%20733-2143>
>



-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
https://twitter.com/dondrake <http://www.MailLaunder.com/>
800-733-2143


Re: Spark 2 - Creating datasets from dataframes with extra columns

2017-02-06 Thread Don Drake
This seems like a bug to me, the schemas should match.

scala> import org.apache.spark.sql.Encoders
import org.apache.spark.sql.Encoders

scala> val fEncoder = Encoders.product[F]
fEncoder: org.apache.spark.sql.Encoder[F] = class[f1[0]: string, f2[0]:
string, f3[0]: string]

scala> fEncoder.schema == ds.schema
res2: Boolean = false

scala> ds.schema
res3: org.apache.spark.sql.types.StructType =
StructType(StructField(f1,StringType,true),
StructField(f2,StringType,true), StructField(f3,StringType,true),
StructField(c4,StringType,true))

scala> fEncoder.schema
res4: org.apache.spark.sql.types.StructType =
StructType(StructField(f1,StringType,true),
StructField(f2,StringType,true), StructField(f3,StringType,true))

I'll open a JIRA.

-Don

On Thu, Feb 2, 2017 at 2:46 PM, Don Drake <dondr...@gmail.com> wrote:

> In 1.6, when you created a Dataset from a Dataframe that had extra
> columns, the columns not in the case class were dropped from the Dataset.
>
> For example in 1.6, the column c4 is gone:
>
> scala> case class F(f1: String, f2: String, f3:String)
>
> defined class F
>
>
> scala> import sqlContext.implicits._
>
> import sqlContext.implicits._
>
>
> scala> val df = Seq(("a","b","c","x"), ("d", "e", "f","y"), ("h", "i",
> "j","z")).toDF("f1", "f2", "f3", "c4")
>
> df: org.apache.spark.sql.DataFrame = [f1: string, f2: string, f3: string,
> c4: string]
>
>
> scala> val ds = df.as[F]
>
> ds: org.apache.spark.sql.Dataset[F] = [f1: string, f2: string, f3: string]
>
>
> scala> ds.show
>
> +---+---+---+
>
> | f1| f2| f3|
>
> +---+---+---+
>
> |  a|  b|  c|
>
> |  d|  e|  f|
>
> |  h|  i|  j|
>
>
> This seems to have changed in Spark 2.0 and also 2.1:
>
> Spark 2.1.0:
>
> scala> case class F(f1: String, f2: String, f3:String)
> defined class F
>
> scala> import spark.implicits._
> import spark.implicits._
>
> scala> val df = Seq(("a","b","c","x"), ("d", "e", "f","y"), ("h", "i",
> "j","z")).toDF("f1", "f2", "f3", "c4")
> df: org.apache.spark.sql.DataFrame = [f1: string, f2: string ... 2 more
> fields]
>
> scala> val ds = df.as[F]
> ds: org.apache.spark.sql.Dataset[F] = [f1: string, f2: string ... 2 more
> fields]
>
> scala> ds.show
> +---+---+---+---+
> | f1| f2| f3| c4|
> +---+---+---+---+
> |  a|  b|  c|  x|
> |  d|  e|  f|  y|
> |  h|  i|  j|  z|
> +---+---+---+---+
>
> Is there a way to get a Dataset that conforms to the case class in Spark
> 2.1.0?  Basically, I'm attempting to use the case class to define an output
> schema, and these extra columns are getting in the way.
>
> Thanks.
>
> -Don
>
> --
> Donald Drake
> Drake Consulting
> http://www.drakeconsulting.com/
> https://twitter.com/dondrake <http://www.MailLaunder.com/>
> 800-733-2143 <(800)%20733-2143>
>



-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
https://twitter.com/dondrake <http://www.MailLaunder.com/>
800-733-2143


Converting timezones in Spark

2017-01-27 Thread Don Drake
I'm reading CSV with a timestamp clearly identified in the UTC timezone,
and I need to store this in a parquet format and eventually read it back
and convert to different timezones as needed.

Sounds straightforward, but this involves some crazy function calls and I'm
seeing strange results as I build a test case.

See my example below.  Why are the values for est_ts and cst_ts the same in
rows 1 and 3 (wrong), but different and correct in row 4?  I have a feeling
it has to do with daylight savings time, but I'm not sure where to resolve
it.

Please note that I'm in the Central timezone.

Is there a better method to do this?


Spark context available as 'sc' (master = local[*], app id =
local-1485539128193).

Spark session available as 'spark'.

Welcome to

    __

 / __/__  ___ _/ /__

_\ \/ _ \/ _ `/ __/  '_/

   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0

  /_/


Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java
1.8.0_60)

Type in expressions to have them evaluated.

Type :help for more information.


scala> :paste

// Entering paste mode (ctrl-D to finish)


import org.apache.spark.sql.Column

def stringts_to_tz(col:Column, tz:String) = {

from_utc_timestamp(to_utc_timestamp(from_unixtime(unix_timestamp(col,
"-MM-dd HH:mm:ss Z")), "CST"), tz)

}


val df = Seq((1L, "2016-09-14 16:46:32 UTC"), (2L, "not a timestamp"), (3L,
"2016-09-14 16:59:57 UTC"), (4L, "2016-11-31 12:00:01 UTC")).toDF("id",
"dts")

val df2 = df.withColumn("created_at", unix_timestamp($"dts", "-MM-dd
HH:mm:ss Z").cast("timestamp"))

.withColumn("unix_ts", unix_timestamp($"dts", "-MM-dd HH:mm:ss Z"))

.withColumn("local_hour", hour($"created_at"))

.withColumn("s2", from_unixtime($"unix_ts"))

.withColumn("s3", to_utc_timestamp($"s2", "CST"))

.withColumn("s4", from_utc_timestamp($"s3", "EST"))

.withColumn("utc_ts", stringts_to_tz($"dts", "UTC"))

.withColumn("est_ts", stringts_to_tz($"dts", "CST"))

.withColumn("cst_ts", stringts_to_tz($"dts", "EST"))

df2.show(4,false)

df2.printSchema



// Exiting paste mode, now interpreting.


+---+---+-+--+--+---+-+-+-+-+-+

|id |dts|created_at   |unix_ts   |local_hour|s2
|s3   |s4   |utc_ts
  |est_ts   |cst_ts   |

+---+---+-+--+--+---+-+-+-+-+-+

|1  |2016-09-14 16:46:32 UTC|2016-09-14 11:46:32.0|1473871592|11
 |2016-09-14 11:46:32|2016-09-14 16:46:32.0|2016-09-14
11:46:32.0|2016-09-14 16:46:32.0|2016-09-14 11:46:32.0|2016-09-14
11:46:32.0|

|2  |not a timestamp|null |null  |null
 |null   |null |null |null
|null |null |

|3  |2016-09-14 16:59:57 UTC|2016-09-14 11:59:57.0|1473872397|11
 |2016-09-14 11:59:57|2016-09-14 16:59:57.0|2016-09-14
11:59:57.0|2016-09-14 16:59:57.0|2016-09-14 11:59:57.0|2016-09-14
11:59:57.0|

|4  |2016-11-31 12:00:01 UTC|2016-12-01 06:00:01.0|1480593601|6
|2016-12-01 06:00:01|2016-12-01 12:00:01.0|2016-12-01 07:00:01.0|2016-12-01
12:00:01.0|2016-12-01 06:00:01.0|2016-12-01 07:00:01.0|

+---+---+-+--+--+---+-+-+-+-+-+


root

 |-- id: long (nullable = false)

 |-- dts: string (nullable = true)

 |-- created_at: timestamp (nullable = true)

 |-- unix_ts: long (nullable = true)

 |-- local_hour: integer (nullable = true)

 |-- s2: string (nullable = true)

 |-- s3: timestamp (nullable = true)

 |-- s4: timestamp (nullable = true)

 |-- utc_ts: timestamp (nullable = true)

 |-- est_ts: timestamp (nullable = true)

 |-- cst_ts: timestamp (nullable = true)


import org.apache.spark.sql.Column

stringts_to_tz: (col: org.apache.spark.sql.Column, tz:
String)org.apache.spark.sql.Column

df: org.apache.spark.sql.DataFrame = [id: bigint, dts: string]

df2: org.apache.spark.sql.DataFrame = [id: bigint, dts: string ... 9 more
fields]


scala>

-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
https://twitter.com/dondrake 
800-733-2143


Re: Spark 2.0 - Parquet data with fields containing periods "."

2016-08-31 Thread Don Drake
Yes, I just tested it against the nightly build from 8/31.  Looking at the
PR, I'm happy the test added verifies my issue.

Thanks.

-Don

On Wed, Aug 31, 2016 at 6:49 PM, Hyukjin Kwon <gurwls...@gmail.com> wrote:

> Hi Don, I guess this should be fixed from 2.0.1.
>
> Please refer this PR. https://github.com/apache/spark/pull/14339
>
> On 1 Sep 2016 2:48 a.m., "Don Drake" <dondr...@gmail.com> wrote:
>
>> I am in the process of migrating a set of Spark 1.6.2 ETL jobs to Spark
>> 2.0 and have encountered some interesting issues.
>>
>> First, it seems the SQL parsing is different, and I had to rewrite some
>> SQL that was doing a mix of inner joins (using where syntax, not inner) and
>> outer joins to get the SQL to work.  It was complaining about columns not
>> existing.  I can't reproduce that one easily and can't share the SQL.  Just
>> curious if anyone else is seeing this?
>>
>> I do have a showstopper problem with Parquet dataset that have fields
>> containing a "." in the field name.  This data comes from an external
>> provider (CSV) and we just pass through the field names.  This has worked
>> flawlessly in Spark 1.5 and 1.6, but now spark can't seem to read these
>> parquet files.
>>
>> I've reproduced a trivial example below. Jira created:
>> https://issues.apache.org/jira/browse/SPARK-17341
>>
>>
>> Spark context available as 'sc' (master = local[*], app id =
>> local-1472664486578).
>> Spark session available as 'spark'.
>> Welcome to
>>     __
>>  / __/__  ___ _/ /__
>> _\ \/ _ \/ _ `/ __/  '_/
>>/___/ .__/\_,_/_/ /_/\_\   version 2.0.0
>>   /_/
>>
>> Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java
>> 1.7.0_51)
>> Type in expressions to have them evaluated.
>> Type :help for more information.
>>
>> scala> val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i
>> * i)).toDF("value", "squared.value")
>> 16/08/31 12:28:44 WARN ObjectStore: Version information not found in
>> metastore. hive.metastore.schema.verification is not enabled so
>> recording the schema version 1.2.0
>> 16/08/31 12:28:44 WARN ObjectStore: Failed to get database default,
>> returning NoSuchObjectException
>> squaresDF: org.apache.spark.sql.DataFrame = [value: int, squared.value:
>> int]
>>
>> scala> squaresDF.take(2)
>> res0: Array[org.apache.spark.sql.Row] = Array([1,1], [2,4])
>>
>> scala> squaresDF.write.parquet("squares")
>> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
>> SLF4J: Defaulting to no-operation (NOP) logger implementation
>> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for
>> further details.
>> Aug 31, 2016 12:29:08 PM INFO: org.apache.parquet.hadoop.codec.CodecConfig:
>> Compression: SNAPPY
>> Aug 31, 2016 12:29:08 PM INFO: org.apache.parquet.hadoop.codec.CodecConfig:
>> Compression: SNAPPY
>> Aug 31, 2016 12:29:08 PM INFO: org.apache.parquet.hadoop.codec.CodecConfig:
>> Compression: SNAPPY
>> Aug 31, 2016 12:29:08 PM INFO: org.apache.parquet.hadoop.codec.CodecConfig:
>> Compression: SNAPPY
>> Aug 31, 2016 12:29:08 PM INFO: org.apache.parquet.hadoop.codec.CodecConfig:
>> Compression: SNAPPY
>> Aug 31, 2016 12:29:08 PM INFO: org.apache.parquet.hadoop.codec.CodecConfig:
>> Compression: SNAPPY
>> Aug 31, 2016 12:29:08 PM INFO: org.apache.parquet.hadoop.codec.CodecConfig:
>> Compression: SNAPPY
>> Aug 31, 2016 12:29:08 PM INFO: org.apache.parquet.hadoop.codec.CodecConfig:
>> Compression: SNAPPY
>> Aug 31, 2016 12:29:08 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat:
>> Parquet block size to 134217728
>> Aug 31, 2016 12:29:08 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat:
>> Parquet block size to 134217728
>> Aug 31, 2016 12:29:08 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat:
>> Parquet page size to 1048576
>> Aug 31, 2016 12:29:08 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat:
>> Parquet block size to 134217728
>> Aug 31, 2016 12:29:08 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat:
>> Parquet block size to 134217728
>> Aug 31, 2016 12:29:08 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat:
>> Parquet block size to 134217728
>> Aug 31, 2016 12:29:08 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat:
>> Parquet page size to 1048576
>> Aug 31, 2016 12:29:08 PM INFO: org.apache.parquet.hadoop.ParquetOutputFormat:
>> Parquet page size to 1048576

Spark 2.0.0 - SQL - Running query with outer join from 1.6 fails

2016-09-01 Thread Don Drake
So I was able to reproduce in a simple case the issue I'm seeing with a
query from Spark 1.6.2 that would run fine that is no longer working on
Spark 2.0.

Example code:
https://gist.github.com/dondrake/c136d61503b819f0643f8c02854a9cdf

Here's the code for Spark 2.0 that doesn't run (this runs fine in Spark
1.6.2):

case class C1(f1: String, f2: String, f3: String, f4: String)
case class C2(g1: String, g2: String, g3: String, g4: String)
case class C3(h1: String, h2: String, h3: String, h4: String)

val sqlContext = spark.sqlContext

val c1 = sc.parallelize(Seq(
  C1("h1", "c1a1", "c1b1", "c1c1"),
  C1("h2", "c1a2", "c1b2", "c1c2"),
  C1(null, "c1a3", "c1b3", "c1c3")
  )).toDF
c1.createOrReplaceTempView("c1")

val c2 = sc.parallelize(Seq(
  C2("h1", "c2a1", "c2b1", "c2c1"),
  C2("h2", "c2a2", "c2b2", "c2c2"),
  C2(null, "c2a3", "c2b3", "c2c3"),
  C2(null, "c2a4", "c2b4", "c2c4"),
  C2("h333", "c2a333", "c2b333", "c2c333")
  )).toDF
c2.createOrReplaceTempView("c2")

val c3 = sc.parallelize(Seq(
  C3("h1", "c3a1", "c3b1", "c3c1"),
  C3("h2", "c3a2", "c3b2", "c3c2"),
  C3(null, "c3a3", "c3b3", "c3c3")
  )).toDF
c3.createOrReplaceTempView("c3")

// doesn't work in Spark 2.0, works in Spark 1.6
val bad_df = sqlContext.sql("""
  select *
  from c1, c3
  left outer join c2 on (c1.f1 = c2.g1)
  where c1.f1 = c3.h1
""").show()

// works in both
val works_df = sqlContext.sql("""
  select *
  from c1
  left outer join c2 on (c1.f1 = c2.g1),
  c3
  where c1.f1 = c3.h1
""").show()


Here's the output after running bad_df in Spark 2.0:

scala> val bad_df = sqlContext.sql("""
 |   select *
 |   from c1, c3
 |   left outer join c2 on (c1.f1 = c2.g1)
 |   where c1.f1 = c3.h1
 | """).show()
org.apache.spark.sql.AnalysisException: cannot resolve '`c1.f1`' given
input columns: [h3, g3, h4, g2, g4, h2, h1, g1]; line 4 pos 25
  at
org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
  at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:77)
  at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:74)
  at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:301)
  at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:301)
  at
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
  at
org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:300)
  at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:298)
  at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:298)
  at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:321)
  at
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:179)
  at
org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:319)
  at
org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:298)
  at
org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionUp$1(QueryPlan.scala:190)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.org
$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:201)
  at
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$5.apply(QueryPlan.scala:209)
  at
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:179)
  at
org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:209)
  at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:74)
  at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)
  at
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
  at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
  at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:125)
  at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
  at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:125)
  at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
  at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:125)
  at

Spark 2.0 - Parquet data with fields containing periods "."

2016-08-31 Thread Don Drake
I am in the process of migrating a set of Spark 1.6.2 ETL jobs to Spark 2.0
and have encountered some interesting issues.

First, it seems the SQL parsing is different, and I had to rewrite some SQL
that was doing a mix of inner joins (using where syntax, not inner) and
outer joins to get the SQL to work.  It was complaining about columns not
existing.  I can't reproduce that one easily and can't share the SQL.  Just
curious if anyone else is seeing this?

I do have a showstopper problem with Parquet dataset that have fields
containing a "." in the field name.  This data comes from an external
provider (CSV) and we just pass through the field names.  This has worked
flawlessly in Spark 1.5 and 1.6, but now spark can't seem to read these
parquet files.

I've reproduced a trivial example below. Jira created:
https://issues.apache.org/jira/browse/SPARK-17341


Spark context available as 'sc' (master = local[*], app id =
local-1472664486578).
Spark session available as 'spark'.
Welcome to
    __
 / __/__  ___ _/ /__
_\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.0.0
  /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java
1.7.0_51)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i *
i)).toDF("value", "squared.value")
16/08/31 12:28:44 WARN ObjectStore: Version information not found in
metastore. hive.metastore.schema.verification is not enabled so recording
the schema version 1.2.0
16/08/31 12:28:44 WARN ObjectStore: Failed to get database default,
returning NoSuchObjectException
squaresDF: org.apache.spark.sql.DataFrame = [value: int, squared.value: int]

scala> squaresDF.take(2)
res0: Array[org.apache.spark.sql.Row] = Array([1,1], [2,4])

scala> squaresDF.write.parquet("squares")
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further
details.
Aug 31, 2016 12:29:08 PM INFO: org.apache.parquet.hadoop.codec.CodecConfig:
Compression: SNAPPY
Aug 31, 2016 12:29:08 PM INFO: org.apache.parquet.hadoop.codec.CodecConfig:
Compression: SNAPPY
Aug 31, 2016 12:29:08 PM INFO: org.apache.parquet.hadoop.codec.CodecConfig:
Compression: SNAPPY
Aug 31, 2016 12:29:08 PM INFO: org.apache.parquet.hadoop.codec.CodecConfig:
Compression: SNAPPY
Aug 31, 2016 12:29:08 PM INFO: org.apache.parquet.hadoop.codec.CodecConfig:
Compression: SNAPPY
Aug 31, 2016 12:29:08 PM INFO: org.apache.parquet.hadoop.codec.CodecConfig:
Compression: SNAPPY
Aug 31, 2016 12:29:08 PM INFO: org.apache.parquet.hadoop.codec.CodecConfig:
Compression: SNAPPY
Aug 31, 2016 12:29:08 PM INFO: org.apache.parquet.hadoop.codec.CodecConfig:
Compression: SNAPPY
Aug 31, 2016 12:29:08 PM INFO:
org.apache.parquet.hadoop.ParquetOutputFormat: Parquet block size to
134217728
Aug 31, 2016 12:29:08 PM INFO:
org.apache.parquet.hadoop.ParquetOutputFormat: Parquet block size to
134217728
Aug 31, 2016 12:29:08 PM INFO:
org.apache.parquet.hadoop.ParquetOutputFormat: Parquet page size to 1048576
Aug 31, 2016 12:29:08 PM INFO:
org.apache.parquet.hadoop.ParquetOutputFormat: Parquet block size to
134217728
Aug 31, 2016 12:29:08 PM INFO:
org.apache.parquet.hadoop.ParquetOutputFormat: Parquet block size to
134217728
Aug 31, 2016 12:29:08 PM INFO:
org.apache.parquet.hadoop.ParquetOutputFormat: Parquet block size to
134217728
Aug 31, 2016 12:29:08 PM INFO:
org.apache.parquet.hadoop.ParquetOutputFormat: Parquet page size to 1048576
Aug 31, 2016 12:29:08 PM INFO:
org.apache.parquet.hadoop.ParquetOutputFormat: Parquet page size to 1048576
Aug 31, 2016 12:29:08 PM INFO:
org.apache.parquet.hadoop.ParquetOutputFormat: Parquet block size to
134217728
Aug 31, 2016 12:29:08 PM INFO:
org.apache.parquet.hadoop.ParquetOutputFormat: Parquet block size to
134217728
Aug 31, 2016 12:29:08 PM INFO:
org.apache.parquet.hadoop.ParquetOutputFormat: Parquet block size to
134217728
Aug 31, 2016 12:29:08 PM INFO:
org.apache.parquet.hadoop.ParquetOutputFormat: Parquet page size to 1048576
Aug 31, 2016 12:29:08 PM INFO:
org.apache.parquet.hadoop.ParquetOutputFormat: Parquet page size to 1048576
Aug 31, 2016 12:29:08 PM INFO:
org.apache.parquet.hadoop.ParquetOutputFormat: Parquet dictionary page size
to 1048576
Aug 31, 2016 12:29:08 PM INFO:
org.apache.parquet.hadoop.ParquetOutputFormat: Parquet dictionary page size
to 1048576
Aug 31, 2016 12:29:08 PM INFO:
org.apache.parquet.hadoop.ParquetOutputFormat: Dictionary is on
Aug 31, 2016 12:29:08 PM INFO:
org.apache.parquet.hadoop.ParquetOutputFormat: Parquet dictionary page size
to 1048576
Aug 31, 2016 12:29:08 PM INFO:
org.apache.parquet.hadoop.ParquetOutputFormat: Parquet page size to 1048576
Aug 31, 2016 12:29:08 PM INFO:
org.apache.parquet.hadoop.ParquetOutputFormat: Parquet page size to 1048576
Aug 31, 2016 12:29:08 PM INFO:

Re: Sqoop vs spark jdbc

2016-09-21 Thread Don Drake
We just had this conversation at work today.  We have a long sqoop pipeline
and I argued to keep it in sqoop since we can take advantage of OraOop
(direct mode) for performance and spark can't match that AFAIK.  Sqoop also
allows us to write directly into parquet format, which then Spark can read
optimally.

In regards to the MB_MILLIS_MAP exception, I ran into this same exception a
while ago running Titan DB map/reduce jobs.  The cause was a mismatch in
.jars in the distribution.  Your CLASSPATH probably contains some old .jar
files.

-Don

On Wed, Sep 21, 2016 at 6:17 PM, Mich Talebzadeh 
wrote:

> I do not know why this happening.
>
> Trying to load an Hbase table at command line
>
> hbase org.apache.hadoop.hbase.mapreduce.ImportTsv
> -Dimporttsv.separator=',' -Dimporttsv.columns="HBASE_ROW_KEY,c1,c2" t2
> hdfs://rhes564:9000/tmp/crap.txt
>
> Comes back with this error
>
>
> 2016-09-22 00:12:46,576 INFO  [main] mapreduce.JobSubmitter: Submitting
> tokens for job: job_1474455325627_0052
> 2016-09-22 00:12:46,755 INFO  [main] impl.YarnClientImpl: Submitted
> application application_1474455325627_0052 to ResourceManager at rhes564/
> 50.140.197.217:8032
> 2016-09-22 00:12:46,783 INFO  [main] mapreduce.Job: The url to track the
> job: http://http://rhes564:8088/proxy/application_1474455325627_0052/
> 2016-09-22 00:12:46,783 INFO  [main] mapreduce.Job: Running job:
> job_1474455325627_0052
> 2016-09-22 00:12:55,913 INFO  [main] mapreduce.Job: Job
> job_1474455325627_0052 running in uber mode : false
> 2016-09-22 00:12:55,915 INFO  [main] mapreduce.Job:  map 0% reduce 0%
> 2016-09-22 00:13:01,994 INFO  [main] mapreduce.Job:  map 100% reduce 0%
> 2016-09-22 00:13:03,008 INFO  [main] mapreduce.Job: Job
> job_1474455325627_0052 completed successfully
> Exception in thread "main" java.lang.IllegalArgumentException: No enum
> constant org.apache.hadoop.mapreduce.JobCounter.MB_MILLIS_MAPS
> at java.lang.Enum.valueOf(Enum.java:238)
> at org.apache.hadoop.mapreduce.counters.
> FrameworkCounterGroup.valueOf(FrameworkCounterGroup.java:148)
> at org.apache.hadoop.mapreduce.counters.FrameworkCounterGroup.
> findCounter(FrameworkCounterGroup.java:182)
> at org.apache.hadoop.mapreduce.counters.AbstractCounters.
> findCounter(AbstractCounters.java:154)
> at org.apache.hadoop.mapreduce.TypeConverter.fromYarn(
> TypeConverter.java:240)
> at org.apache.hadoop.mapred.ClientServiceDelegate.getJobCounters(
> ClientServiceDelegate.java:370)
> at org.apache.hadoop.mapred.YARNRunner.getJobCounters(
> YARNRunner.java:511)
> at org.apache.hadoop.mapreduce.Job$7.run(Job.java:756)
> at org.apache.hadoop.mapreduce.Job$7.run(Job.java:753)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at org.apache.hadoop.security.UserGroupInformation.doAs(
> UserGroupInformation.java:1491)
> at org.apache.hadoop.mapreduce.Job.getCounters(Job.java:753)
> at org.apache.hadoop.mapreduce.Job.monitorAndPrintJob(Job.
> java:1361)
> at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.
> java:1289)
> at org.apache.hadoop.hbase.mapreduce.ImportTsv.run(
> ImportTsv.java:680)
> at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
> at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
> at org.apache.hadoop.hbase.mapreduce.ImportTsv.main(
> ImportTsv.java:684)
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 21 September 2016 at 21:47, Jörn Franke  wrote:
>
>> I think there might be still something messed up with the classpath. It
>> complains in the logs about deprecated jars and deprecated configuration
>> files.
>>
>> On 21 Sep 2016, at 22:21, Mich Talebzadeh 
>> wrote:
>>
>> Well I am left to use Spark for importing data from RDBMS table to Hadoop.
>>
>> You may argue why and it is because Spark does it in one process and no
>> errors
>>
>> With sqoop I am getting this error message which leaves the RDBMS table
>> data on HDFS file but stops there.
>>
>> 2016-09-21 21:00:15,084 [myid:] - INFO  [main:OraOopLog@103] - Data
>> Connector for Oracle and Hadoop is disabled.
>> 2016-09-21 21:00:15,095 [myid:] - INFO  [main:SqlManager@98] - Using
>> default fetchSize of 1000
>> 2016-09-21 

Re: sbt shenanigans for a Spark-based project

2016-11-13 Thread Don Drake
I would upgrade your Scala version to 2.11.8 as Spark 2.0 uses Scala 2.11
by default.

On Sun, Nov 13, 2016 at 3:01 PM, Marco Mistroni  wrote:

> HI all
>  i have a small Spark-based project which at the moment depends on jar
> from Spark 1.6.0
> The project has few Spark examples plus one which depends on Flume
> libraries
>
>
> I am attempting to move to Spark 2.0, but i am having issues with
> my dependencies
> The stetup below works fine when compiled against 1.6.0 dependencies
>
> name := "SparkExamples"
> version := "1.0"
> scalaVersion := "2.10.5"
> val sparkVersion = "1.6.0"
>
>
> // Add a single dependency
> libraryDependencies += "junit" % "junit" % "4.8" % "test"
> libraryDependencies ++= Seq("org.slf4j" % "slf4j-api" % "1.7.5",
> "org.slf4j" % "slf4j-simple" % "1.7.5",
> "org.clapper" %% "grizzled-slf4j" % "1.0.2")
> libraryDependencies += "org.apache.spark"%%"spark-core"   % sparkVersion
> libraryDependencies += "org.apache.spark"%%"spark-streaming"   %
> sparkVersion
> libraryDependencies += "org.apache.spark"%%"spark-mllib"   % sparkVersion
> libraryDependencies += "org.apache.spark"%%"spark-streaming-flume" %
> "1.3.0"
> libraryDependencies += "org.apache.spark"%%"spark-sql"   % sparkVersion
>
>
> resolvers += "softprops-maven" at "http://dl.bintray.com/
> content/softprops/maven"
>
>
>
> This is the build.sbt version for using Spark 2 dependencies
>
> name := "SparkExamples"
> version := "1.0"
> scalaVersion := "2.10.6"
> val sparkVersion = "2.0.1"
>
>
> // Add a single dependency
> libraryDependencies += "junit" % "junit" % "4.8" % "test"
> libraryDependencies ++= Seq("org.slf4j" % "slf4j-api" % "1.7.5",
> "org.slf4j" % "slf4j-simple" % "1.7.5",
> "org.clapper" %% "grizzled-slf4j" % "1.0.2")
> libraryDependencies += "org.apache.spark"%%"spark-core"   % sparkVersion
> libraryDependencies += "org.apache.spark"%%"spark-streaming"   %
> sparkVersion
> libraryDependencies += "org.apache.spark"%%"spark-mllib"   % sparkVersion
> libraryDependencies += "org.apache.spark"%%"spark-streaming-flume-sink" %
> "2.0.1"
> libraryDependencies += "org.apache.spark"%%"spark-sql"   % sparkVersion
> resolvers += "softprops-maven" at "http://dl.bintray.com/
> content/softprops/maven"
>
> but the sbt compile fails miserably...below few of the errors (it actually
> compiles like i forgot all the depencencies as it is complaining on all
> org.apache.spark.ml and mllib packages
>
> [warn] Multiple dependencies with the same organization/name but different
> versions. To avoid conflict, pick one version:
> [warn]  * org.apache.spark:spark-core_2.10:(1.6.1, 2.0.1)
> [warn]  * org.apache.spark:spark-streaming_2.10:(1.6.1, 2.0.1)
> [warn]  * org.apache.spark:spark-sql_2.10:(1.6.1, 2.0.1)
> [warn]  * org.apache.spark:spark-mllib_2.10:(1.6.1, 2.0.1)
> [info] Resolving org.scala-lang#scala-library;2.10.6 ...
> .
> [warn] * org.apache.spark:spark-mllib_2.10:1.6.1 -> 2.0.1
> [warn] * org.apache.spark:spark-sql_2.10:1.6.1 -> 2.0.1
> [warn] * org.apache.spark:spark-streaming_2.10:1.6.1 -> 2.0.1
> [warn] * org.apache.spark:spark-core_2.10:1.6.1 -> 2.0.1
> [warn] Run 'evicted' to see detailed eviction warnings
> [info] Compiling 18 Scala sources to C:\Users\marco\SparkExamples\
> target\scala-2.10\classes...
> [error] C:\Users\marco\SparkExamples\src\main\scala\
> AnotherDecisionTreeExample.scala:2: object mllib is not a member of
> package org.apache.spark
> [error] import org.apache.spark.mllib.linalg.{ Vector, Vectors }
> [error] ^
> [error] C:\Users\marco\SparkExamples\src\main\scala\
> AnotherDecisionTreeExample.scala:3: object mllib is not a member of
> package org.apache.spark
> [error] import org.apache.spark.mllib.regression.LabeledPoint
> [error] ^
> [error] C:\Users\marco\SparkExamples\src\main\scala\
> AnotherDecisionTreeExample.scala:4: object classification is not a member
> of package org.apache.spark.ml
> [error] import org.apache.spark.ml.classification._
> [error]^
> [error] C:\Users\marco\SparkExamples\src\main\scala\
> AnotherDecisionTreeExample.scala:5: object mllib is not a member of
> package org.apache.spark
> [error] import org.apache.spark.mllib.tree.DecisionTree
> [error] ^
> [error] C:\Users\marco\SparkExamples\src\main\scala\
> AnotherDecisionTreeExample.scala:6: object mllib is not a member of
> package org.apache.spark
> [error] import org.apache.spark.mllib.tree.model.DecisionTreeModel
> [error] ^
> [error] C:\Users\marco\SparkExamples\src\main\scala\
> AnotherDecisionTreeExample.scala:7: object mllib is not a member of
> package org.apache.spark
> [error] import org.apache.spark.mllib.util.MLUtils
> [error] ^
> [error] C:\Users\marco\SparkExamples\src\main\scala\
> 

Re: sbt shenanigans for a Spark-based project

2016-11-14 Thread Don Drake
I would remove your entire local Maven repo (~/.m2/repo in linux) and try
again. I'm able to compile sample code with your build.sbt and sbt
v.0.13.12.

-Don

On Mon, Nov 14, 2016 at 3:11 PM, Marco Mistroni <mmistr...@gmail.com> wrote:

> uhm.sorry.. still same issues. this is hte new version
>
> name := "SparkExamples"
> version := "1.0"
> scalaVersion := "2.11.8"
> val sparkVersion = "2.0.1"
>
> // Add a single dependency
> libraryDependencies += "junit" % "junit" % "4.8" % "test"
> libraryDependencies ++= Seq("org.slf4j" % "slf4j-api" % "1.7.5",
> "org.slf4j" % "slf4j-simple" % "1.7.5",
> "org.clapper" %% "grizzled-slf4j" % "1.0.2")
> libraryDependencies += "org.apache.spark"%%"spark-core"   % sparkVersion
> libraryDependencies += "org.apache.spark"%%"spark-streaming"   %
> sparkVersion
> libraryDependencies += "org.apache.spark"%%"spark-mllib"   % sparkVersion
> libraryDependencies += "org.apache.spark"%%"spark-streaming-flume-sink" %
> "2.0.1"
> libraryDependencies += "org.apache.spark"%%"spark-sql"   % sparkVersion
>
>
> resolvers += "softprops-maven" at "http://dl.bintray.com/
> content/softprops/maven"
>
> Still seeing these kinds of errors  which seems to lead to the fact that
> somehow sbt is getting confused..
>
> C:\Users\marco\SparkExamples\src\main\scala\DecisionTreeExampleML.scala:2:
> object mllib is not a member of package org.apache.spark
> [error] import org.apache.spark.mllib.linalg.{ Vector, Vectors }
> [error] ^
> [error] 
> C:\Users\marco\SparkExamples\src\main\scala\DecisionTreeExampleML.scala:3:
> object mllib is not a member of package org.apache.spark
> [error] import org.apache.spark.mllib.regression.LabeledPoint
> [error] ^
> [error] 
> C:\Users\marco\SparkExamples\src\main\scala\DecisionTreeExampleML.scala:4:
> object classification is not a member of package org.apache.spark.ml
> [error] import org.apache.spark.ml.classification.{
> RandomForestClassifier, RandomForestClassificationModel }
> [error]^
> [error] 
> C:\Users\marco\SparkExamples\src\main\scala\DecisionTreeExampleML.scala:6:
> object feature is not a member of package org.apache.spark.ml
> [error] import org.apache.spark.ml.feature.{ StringIndexer, IndexToString,
> VectorIndexer, VectorAssembler }
> [error]^
> [error] 
> C:\Users\marco\SparkExamples\src\main\scala\DecisionTreeExampleML.scala:7:
> object evaluation is not a member of package org.apache.spark.ml
> [error] import org.apache.spark.ml.evaluation.{ RegressionEvaluator,
> MulticlassClassificationEvaluator }
> [error]^
> [error] 
> C:\Users\marco\SparkExamples\src\main\scala\DecisionTreeExampleML.scala:8:
> object classification is not a member of package org.apache.spark.ml
> [error] import org.apache.spark.ml.classification._
> [error]^
> [error] 
> C:\Users\marco\SparkExamples\src\main\scala\DecisionTreeExampleML.scala:9:
> object tuning is not a member of package org.apache.spark.ml
> [error] import org.apache.spark.ml.tuning.{ CrossValidator,
> ParamGridBuilder }
> [error]^
> [error] C:\Users\marco\SparkExamples\src\main\scala\
> DecisionTreeExampleML.scala:10: object tuning is not a member of package
> org.apache.spark.ml
> [error] import org.apache.spark.ml.tuning.{ ParamGridBuilder,
> TrainValidationSplit }
> [error]^
> [error] C:\Users\marco\SparkExamples\src\main\scala\
> DecisionTreeExampleML.scala:16: object Pipeline is not a member of
> package org.apache.spark.ml
> [error] import org.apache.spark.ml.{ Pipeline, PipelineModel }
>
> any other hints?
>
> thanks and regarsd
>  marco
>
>
>
>
> On Sun, Nov 13, 2016 at 10:52 PM, Don Drake <dondr...@gmail.com> wrote:
>
>> I would upgrade your Scala version to 2.11.8 as Spark 2.0 uses Scala 2.11
>> by default.
>>
>> On Sun, Nov 13, 2016 at 3:01 PM, Marco Mistroni <mmistr...@gmail.com>
>> wrote:
>>
>>> HI all
>>>  i have a small Spark-based project which at the moment depends on jar
>>> from Spark 1.6.0
>>> The project has few Spark examples plus one which depends on Flume
>>> libraries
>>>
>>>
>>> I am attempting to move to Spark 2.0, but i am havi

Re: how can I set the log configuration file for spark history server ?

2016-12-08 Thread Don Drake
You can update $SPARK_HOME/spark-env.sh by setting the environment
variable SPARK_HISTORY_OPTS.

See
http://spark.apache.org/docs/latest/monitoring.html#spark-configuration-options
for options (spark.history.fs.logDirectory) you can set.

There is log rotation built in (by time, not size) to the history server,
you need to enable/configure it.

Hope that helps.

-Don

On Thu, Dec 8, 2016 at 9:20 PM, John Fang 
wrote:

> ./start-history-server.sh
> starting org.apache.spark.deploy.history.HistoryServer,
> logging to /home/admin/koala/data/versions/0/SPARK/2.0.2/
> spark-2.0.2-bin-hadoop2.6/logs/spark-admin-org.apache.
> spark.deploy.history.HistoryServer-1-v069166214.sqa.zmf.out
>
> Then the history will print all log to the XXX.sqa.zmf.out, so i can't
> limit the file max size.  I want limit the size of the log file
>



-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
https://twitter.com/dondrake 
800-733-2143


Re: Dynamically working out upperbound in JDBC connection to Oracle DB

2017-05-29 Thread Don Drake
Try passing maxID.toString, I think it wants the number as a string.

On Mon, May 29, 2017 at 3:12 PM, Mich Talebzadeh 
wrote:

> thanks Gents but no luck!
>
> scala> val s = HiveContext.read.format("jdbc").options(
>  | Map("url" -> _ORACLEserver,
>  | "dbtable" -> "(SELECT ID, CLUSTERED, SCATTERED, RANDOMISED,
> RANDOM_STRING, SMALL_VC, PADDING FROM scratchpad.dummy)",
>  | "partitionColumn" -> "ID",
>  | "lowerBound" -> "1",
>  | "upperBound" -> maxID,
>  | "numPartitions" -> "4",
>  |  "user" -> _username,
>  | "password" -> _password)).load
> :34: error: overloaded method value options with alternatives:
>   (options: java.util.Map[String,String])org.apache.spark.sql.DataFrameReader
> 
>   (options: scala.collection.Map[String,String])org.apache.spark.sql.
> DataFrameReader
>  cannot be applied to (scala.collection.immutable.Map[String,Comparable[_
> >: java.math.BigDecimal with String <: Comparable[_ >: java.math.BigDecimal
> with String <: java.io.Serializable] with java.io.Serializable] with
> java.io.Serializable])
>val s = HiveContext.read.format("jdbc").options(
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 29 May 2017 at 20:12, ayan guha  wrote:
>
>> You are using maxId as a string literal. Try removing the quotes around
>> maxId
>>
>> On Tue, 30 May 2017 at 2:56 am, Jörn Franke  wrote:
>>
>>> I think you need to remove the hyphen around maxid
>>>
>>> On 29. May 2017, at 18:11, Mich Talebzadeh 
>>> wrote:
>>>
>>> Hi,
>>>
>>> This JDBC connection works with Oracle table with primary key ID
>>>
>>> val s = HiveContext.read.format("jdbc").options(
>>> Map("url" -> _ORACLEserver,
>>> "dbtable" -> "(SELECT ID, CLUSTERED, SCATTERED, RANDOMISED,
>>> RANDOM_STRING, SMALL_VC, PADDING FROM scratchpad.dummy)",
>>> "partitionColumn" -> "ID",
>>>
>>> *"lowerBound" -> "1","upperBound" -> "1",*
>>> "numPartitions" -> "4",
>>> "user" -> _username,
>>> "password" -> _password)).load
>>>
>>> Note that both lowerbound and upperbound for ID column are fixed.
>>>
>>> However, Itried to workout upperbound dynamically as follows:
>>>
>>> //
>>> // Get maxID first
>>> //
>>> scala> val maxID = HiveContext.read.format("jdbc").options(Map("url" ->
>>> _ORACLEserver,"dbtable" -> "(SELECT MAX(ID) AS maxID FROM
>>> scratchpad.dummy)",
>>>  | "user" -> _username, "password" -> _password)).load().collect.app
>>> ly(0).getDecimal(0)
>>> maxID: java.math.BigDecimal = 1.00
>>>
>>> and this fails
>>>
>>> scala> val s = HiveContext.read.format("jdbc").options(
>>>  | Map("url" -> _ORACLEserver,
>>>  | "dbtable" -> "(SELECT ID, CLUSTERED, SCATTERED, RANDOMISED,
>>> RANDOM_STRING, SMALL_VC, PADDING FROM scratchpad.dummy)",
>>>  | "partitionColumn" -> "ID",
>>>
>>>
>>> *| "lowerBound" -> "1", | "upperBound" -> "maxID",* |
>>> "numPartitions" -> "4",
>>>  | "user" -> _username,
>>>  | "password" -> _password)).load
>>> java.lang.NumberFormatException: For input string: "maxID"
>>>   at java.lang.NumberFormatException.forInputString(NumberFormatE
>>> xception.java:65)
>>>   at java.lang.Long.parseLong(Long.java:589)
>>>   at java.lang.Long.parseLong(Long.java:631)
>>>   at scala.collection.immutable.StringLike$class.toLong(StringLik
>>> e.scala:276)
>>>   at scala.collection.immutable.StringOps.toLong(StringOps.scala:29)
>>>   at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelation
>>> Provider.createRelation(JdbcRelationProvider.scala:42)
>>>   at org.apache.spark.sql.execution.datasources.DataSource.
>>> resolveRelation(DataSource.scala:330)
>>>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.
>>> scala:152)
>>>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.
>>> scala:125)
>>>   ... 56 elided
>>>
>>>
>>> Any ideas how this can work!
>>>
>>> Thanks
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> --
>> Best Regards,
>> Ayan Guha
>>
>
>


-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
https://twitter.com/dondrake 
800-733-2143


flatMap() returning large class

2017-12-14 Thread Don Drake
I'm looking for some advice when I have a flatMap on a Dataset that is
creating and returning a sequence of a new case class
(Seq[BigDataStructure]) that contains a very large amount of data, much
larger than the single input record (think images).

In python, you can use generators (yield) to bypass creating a large list
of structures and returning the list.

I'm programming this is in Scala and was wondering if there are any similar
tricks to optimally return a list of classes?? I found the for/yield
semantics, but it appears the compiler is just creating a sequence for you
and this will blow through my Heap given the number of elements in the list
and the size of each element.

Is there anything else I can use?

Thanks.

-Don

-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
https://twitter.com/dondrake 
800-733-2143


Re: flatMap() returning large class

2017-12-17 Thread Don Drake
Hey Richard,

Good to hear from you as well.  I thought I would ask if there was
something Scala specific I was missing in handling these large classes.

I can tweak my job to do a map() and then only one large object will be
created at a time and returned, which should allow me to lower my executor
memory size.

Thanks.

-Don


On Thu, Dec 14, 2017 at 2:58 PM, Richard Garris <rlgar...@databricks.com>
wrote:

> Hi Don,
>
> Good to hear from you. I think the problem is that regardless of whether
> you use yield or a generator - Spark internally will produce the entire
> result as a single large JVM object which will blow up your heap space.
>
> Would it be possible to shrink the overall size of the image object
> storing it as a vector or Array vs a large Java class object?
>
> That might be the more prudent approach.
>
> -RG
>
> Richard Garris
>
> Principal Architect
>
> Databricks, Inc
>
> 650.200.0840 <(650)%20200-0840>
>
> rlgar...@databricks.com
>
> On December 14, 2017 at 10:23:00 AM, Marcelo Vanzin (van...@cloudera.com)
> wrote:
>
> This sounds like something mapPartitions should be able to do, not
> sure if there's an easier way.
>
> On Thu, Dec 14, 2017 at 10:20 AM, Don Drake <dondr...@gmail.com> wrote:
> > I'm looking for some advice when I have a flatMap on a Dataset that is
> > creating and returning a sequence of a new case class
> > (Seq[BigDataStructure]) that contains a very large amount of data, much
> > larger than the single input record (think images).
> >
> > In python, you can use generators (yield) to bypass creating a large
> list of
> > structures and returning the list.
> >
> > I'm programming this is in Scala and was wondering if there are any
> similar
> > tricks to optimally return a list of classes?? I found the for/yield
> > semantics, but it appears the compiler is just creating a sequence for
> you
> > and this will blow through my Heap given the number of elements in the
> list
> > and the size of each element.
> >
> > Is there anything else I can use?
> >
> > Thanks.
> >
> > -Don
> >
> > --
> > Donald Drake
> > Drake Consulting
> > http://www.drakeconsulting.com/
> > https://twitter.com/dondrake
> > 800-733-2143 <(800)%20733-2143>
>
>
>
> --
> Marcelo
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
https://twitter.com/dondrake <http://www.MailLaunder.com/>
800-733-2143