SparkSql newbie problems with nested selects

2014-07-13 Thread Andy Davidson
Hi 

I am running into trouble with a nested query using python. To try and debug
it, I first wrote the query I want using sqlite3

select freq.docid, freqTranspose.docid, sum(freq.count *
freqTranspose.count) from
   Frequency as freq,
   (select term, docid, count from Frequency) as freqTranspose
   where freq.term = freqTranspose.term
   group by freq.docid, freqTranspose.docid
   ;


SparkSQL has trouble parsing the "(select ) as freqTranspose" line.

Here is what my input data looks like
$ head -n 3 reuters.db.csv
docid,term,count
1_txt_earn,net,1
1_txt_earn,rogers,4

The output from sqlite3 is
$ head -n 6 3hSimilarityMatrix.slow.sql.out
freq.docid  freqTranspose.docid  sum(freq.count * freqTranspose.count)
--  ---  -
1_txt_earn  1_txt_earn   127
1_txt_earn  10054_txt_earn   33
1_txt_earn  10080_txt_crude  146
1_txt_earn  10088_txt_acq    11
$ 


My code example pretty much follows
http://spark.apache.org/docs/latest/sql-programming-guide.html

dataFile = sc.textFile("reuters.db.csv")
lines = dataFile.map(lambda l: l.split(","))
def mapLines(line) :
    ret = {}
    ret['docid'] = line[0]
    ret['term'] = line[1]
    ret['count'] = line[2]
    return ret
frequency = lines.map(mapLines)
schemaFrequency = sqlContext.inferSchema(frequency)
schemaFrequency.registerAsTable("frequency")

Okay here is where I run into trouble

sqlCmd = "select \
freq.docid, \
freqTranspose.docid \
  from \
  frequency as freq, \
  (select term, docid, count from frequency)"

similarities = sqlContext.sql(sqlCmd)


/Users/andy/workSpace/dataBricksIntroToApacheSpark/USBStick/spark/python/lib
/py4j-0.8.1-src.zip/py4j/protocol.py in get_return_value(answer,
gateway_client, target_id, name)
298 raise Py4JJavaError(
299 'An error occurred while calling {0}{1}{2}.\n'.
-- 300 format(target_id, '.', name), value)
301 else:
302 raise Py4JError(

Py4JJavaError: An error occurred while calling o40.sql.
: java.lang.RuntimeException: [1.153] failure: ``('' expected but `from'
found

select freq.docid, freqTranspose.docid
from   frequency as freq,   (select term, docid,
count from frequency)
   

Simple SQL seems to "parse", i.e. select freq.docid from frequency as freq

Any suggestions would be greatly appreciated.

Andy

P.s. I should note, I think I am using version 1.0 ?






Re: SparkSql newbie problems with nested selects

2014-07-13 Thread Andy Davidson
Hi Michael

Changing my col name to something other than 'count' fixed the parse error.

Many thanks, 

Andy

From:  Michael Armbrust mich...@databricks.com
Reply-To:  user@spark.apache.org
Date:  Sunday, July 13, 2014 at 1:18 PM
To:  user@spark.apache.org
Cc:  u...@spark.incubator.apache.org u...@spark.incubator.apache.org
Subject:  Re: SparkSql newbie problems with nested selects

 Hi Andy,
 
 The SQL parser is pretty basic (we plan to improve this for the 1.2 release).
 In this case I think part of the problem is that one of your variables is
 count, which is a reserved word.  Unfortunately, we don't have the ability
 to escape identifiers at this point.
 
 However, I did manage to get your query to parse using the HiveQL parser,
 provided by HiveContext.
 
 hiveCtx.hql("""
 select freq.docid, freqTranspose.docid, sum(freq.count * freqTranspose.count)
 from 
    Frequency freq JOIN
    (select term, docid, count from Frequency) freqTranspose
    where freq.term = freqTranspose.term
    group by freq.docid, freqTranspose.docid""")
 
 Michael
 
 




how to report documentation bug?

2014-09-16 Thread Andy Davidson

http://spark.apache.org/docs/latest/quick-start.html#standalone-applications

Click on the Java tab. There is a bug in the Maven section:

  <version>1.1.0-SNAPSHOT</version>

Should be

  <version>1.1.0</version>

Hope this helps

Andy




spark-1.1.0-bin-hadoop2.4 java.lang.NoClassDefFoundError: org/codehaus/jackson/annotate/JsonClass

2014-09-17 Thread Andy Davidson
Hi, I am new to Spark.

I am trying to write a simple Java program that processes tweets that were
collected and stored in a file. I figured the simplest thing to do would be
to convert the JSON string into a Java map. When I submit my jar file I keep
getting the following error:

java.lang.NoClassDefFoundError: org/codehaus/jackson/annotate/JsonClass


For the life of me I can not figure out what the problem is. I am using the
maven shade plugin and checked that the missing class is in my uber jar.

Any suggestions would be greatly appreciated.

Andy

P.s. I should mention that I am running everything on my local machine.

<properties>
    <jackson.version>1.8.8</jackson.version> <!-- 1.9.13 -->
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
    <dependency>
        <groupId>org.codehaus.jackson</groupId>
        <artifactId>jackson-core-asl</artifactId>
        <version>${jackson.version}</version>
    </dependency>
    <dependency>
        <groupId>org.codehaus.jackson</groupId>
        <artifactId>jackson-mapper-asl</artifactId>
        <version>${jackson.version}</version>
    </dependency>
</dependencies>



I am using shade to build an uber jar.

$ jar -tvf target/myUber.jar
...
   580 Wed Sep 17 16:17:36 PDT 2014 org/codehaus/jackson/annotate/JsonClass.class
...



14/09/17 16:13:24 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)

java.lang.NoClassDefFoundError: org/codehaus/jackson/annotate/JsonClass

at 
org.codehaus.jackson.map.introspect.JacksonAnnotationIntrospector.findDeseri
alizationType(JacksonAnnotationIntrospector.java:524)

at 
org.codehaus.jackson.map.deser.BasicDeserializerFactory.modifyTypeByAnnotati
on(BasicDeserializerFactory.java:732)

at 
org.codehaus.jackson.map.deser.BasicDeserializerFactory.createMapDeserialize
r(BasicDeserializerFactory.java:337)

at 
org.codehaus.jackson.map.deser.StdDeserializerProvider._createDeserializer(S
tdDeserializerProvider.java:377)

at 
org.codehaus.jackson.map.deser.StdDeserializerProvider._createAndCache2(StdD
eserializerProvider.java:307)

at 
org.codehaus.jackson.map.deser.StdDeserializerProvider._createAndCacheValueD
eserializer(StdDeserializerProvider.java:287)

at 
org.codehaus.jackson.map.deser.StdDeserializerProvider.findValueDeserializer
(StdDeserializerProvider.java:136)

at 
org.codehaus.jackson.map.deser.StdDeserializerProvider.findTypedValueDeseria
lizer(StdDeserializerProvider.java:157)

at 
org.codehaus.jackson.map.ObjectMapper._findRootDeserializer(ObjectMapper.jav
a:2468)

at 
org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapper.java:240
2)

at org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1609)

at com.santacruzintegration.spark.JSONHelper.toJsonMap(JSONHelper.java:40)





public class JSONHelper {

    public static final ObjectMapper mapper = new ObjectMapper();

    static {
        mapper.configure(Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        //mapper.configure(Feature.USE_ANNOTATIONS, false);
    }

    ...

    public static Map<String, String> toJsonMap(String json) {
        System.err.println("AEDWIP toJsonMap: " + json);
        try {
            TypeReference<HashMap<String, String>> typeRef =
                    new TypeReference<HashMap<String, String>>() {
            };

            //return mapper.readValue(json, new TypeReference<HashMap<String,String>>(){});
            return mapper.readValue(json, typeRef);
            //return mapper.convertValue(json, typeRef);

            //HashMap<String,String> ret =
            //    new ObjectMapper().readValue(json, HashMap.class);
        } catch (Exception e) {
            throw new IOError(e);
        }
    }









Re: spark-1.1.0-bin-hadoop2.4 java.lang.NoClassDefFoundError: org/codehaus/jackson/annotate/JsonClass

2014-09-18 Thread Andy Davidson
After lots of hacking I figured out how to resolve this problem. It is not a
great solution, since it severely cripples Jackson, but at least for now I am unblocked.

1) turn off annotations.
mapper.configure(Feature.USE_ANNOTATIONS, false);


2) in maven set the jackson dependencies as provided.

<properties>
    <jackson.version>1.9.13</jackson.version> <!-- 1.8.8 1.9.13 2.3.4-Final -->
</properties>



<dependencies>
    <dependency>
        <groupId>org.codehaus.jackson</groupId>
        <artifactId>jackson-core-asl</artifactId>
        <scope>provided</scope>
        <version>${jackson.version}</version>
    </dependency>
    <dependency>
        <groupId>org.codehaus.jackson</groupId>
        <artifactId>jackson-mapper-asl</artifactId>
        <scope>provided</scope>
        <version>${jackson.version}</version>
    </dependency>
</dependencies>



Hope this helps someone else
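
For reference, a minimal sketch of an ObjectMapper configured this way (assuming
the Jackson 1.x codehaus packages from the stack trace above; the class name is
hypothetical):

import java.util.HashMap;
import java.util.Map;

import org.codehaus.jackson.map.DeserializationConfig;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;

// Hypothetical helper: parses a JSON string into a Map using Jackson 1.x
// with annotation processing turned off, matching step 1 above.
public class TweetJsonParser {

    private static final ObjectMapper mapper = new ObjectMapper();

    static {
        mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        mapper.configure(DeserializationConfig.Feature.USE_ANNOTATIONS, false);
    }

    public static Map<String, String> toMap(String json) throws Exception {
        TypeReference<HashMap<String, String>> typeRef =
                new TypeReference<HashMap<String, String>>() {};
        return mapper.readValue(json, typeRef);
    }
}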



andy






RDD pipe example. Is this a bug or a feature?

2014-09-19 Thread Andy Davidson
Hi

I wrote a little Java job to try and figure out how RDD pipe works.
Below is my test shell script. If I turn debugging on in the script, I see
output in my console. If debugging is turned off in the shell script, I do
not see anything in my console. Is this a bug or a feature?

I am running the job locally on a Mac

Thanks

Andy


Here is my Java

rdd.pipe(pwd + "/src/main/bin/RDDPipe.sh").collect();



#!/bin/sh 



#

# Use this shell script to figure out how spark RDD pipe() works

#



set -x # turns shell debugging on

#set +x # turns shell debugging off



while read x ;
do
    echo RDDPipe.sh $x ;
done



Here is the output if debugging is turned on

$ !grep

grep RDDPipe run.sh.out

+ echo RDDPipe.sh 0

+ echo RDDPipe.sh 0

+ echo RDDPipe.sh 2

+ echo RDDPipe.sh 0

+ echo RDDPipe.sh 3

+ echo RDDPipe.sh 0

+ echo RDDPipe.sh 0

$ 
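
For reference, a sketch of how the echoed lines can be surfaced even with
debugging turned off (assuming the same script path as in the Java snippet
above): pipe() captures the child process's stdout as the elements of the
returned RDD, while the set -x trace goes to stderr, which Spark passes
through to the worker's console. Collecting the piped RDD in the driver and
printing it shows the echoed lines either way.

// pipe() returns an RDD whose elements are the lines the script writes to stdout.
// The `set -x` trace goes to stderr instead, which is why it shows on the console
// while the echoed lines do not, unless we collect and print them ourselves.
JavaRDD<String> piped = rdd.pipe(pwd + "/src/main/bin/RDDPipe.sh");
for (String line : piped.collect()) {
    System.out.println("piped: " + line);
}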




spark-ec2 ERROR: Line magic function `%matplotlib` not found

2014-09-25 Thread Andy Davidson
Hi

I am running into trouble using the IPython notebook on my cluster. I use the
following command to set the cluster up:

$ ./spark-ec2 --key-pair=$KEY_PAIR --identity-file=$KEY_FILE
--region=$REGION --slaves=$NUM_SLAVES launch $CLUSTER_NAME


On master I launch python as follows
$ IPYTHON_OPTS="notebook --pylab inline --no-browser --port=7000"
$SPARK_HOME/bin/pyspark


It looks like the problem is that the cluster is using an old version of Python
and IPython. Any idea how I can easily upgrade? The following version works
on my mac.

Thanks

Andy

{'commit_hash': '681fd77',
 'commit_source': 'installation',
 'default_encoding': 'UTF-8',
 'ipython_path': '/Library/Python/2.7/site-packages/IPython',
 'ipython_version': '2.1.0',
 'os_name': 'posix',
 'platform': 'Darwin-13.3.0-x86_64-i386-64bit',
 'sys_executable': '/usr/bin/python',
 'sys_platform': 'darwin',
 'sys_version': '2.7.5 (default, Mar  9 2014, 22:15:05) \n[GCC 4.2.1
Compatible Apple LLVM 5.0 (clang-500.0.68)]'}







problem with spark-ec2 launch script Re: spark-ec2 ERROR: Line magic function `%matplotlib` not found

2014-09-26 Thread Andy Davidson
Hi Davies

The real issue is about cluster management. I am new to the Spark world and
am not a system administrator. It seems like the problem is with the
spark-ec2 launch script. It is installing an old version of Python.

In the meantime I am trying to figure out how I can manually install the
correct version on all the machines in my cluster.

Thanks

Andy

From:  Davies Liu dav...@databricks.com
Date:  Thursday, September 25, 2014 at 9:58 PM
To:  Andrew Davidson a...@santacruzintegration.com
Cc:  user@spark.apache.org user@spark.apache.org
Subject:  Re: spark-ec2 ERROR: Line magic function `%matplotlib` not found

 Maybe you have Python 2.7 on the master but Python 2.6 in the cluster;
 you should upgrade Python to 2.7 in the cluster, or use Python 2.6 on the
 master by setting PYSPARK_PYTHON=python2.6
 




Re: problem with spark-ec2 launch script Re: spark-ec2 ERROR: Line magic function `%matplotlib` not found

2014-09-26 Thread Andy Davidson
Many many thanks

Andy

From:  Nicholas Chammas nicholas.cham...@gmail.com
Date:  Friday, September 26, 2014 at 11:24 AM
To:  Andrew Davidson a...@santacruzintegration.com
Cc:  Davies Liu dav...@databricks.com, user@spark.apache.org
user@spark.apache.org
Subject:  Re: problem with spark-ec2 launch script Re: spark-ec2 ERROR: Line
magic function `%matplotlib` not found

 Are you able to use the regular PySpark shell on your EC2 cluster? That would
 be the first thing to confirm is working.
 
 I don’t know whether the version of Python on the cluster would affect whether
 IPython works or not, but if you want to try manually upgrading Python on a
 cluster launched by spark-ec2, there are some instructions in the comments
 here https://issues.apache.org/jira/browse/SPARK-922  for doing so.
 
 Nick
 
 ​
 
 




iPython notebook ec2 cluster matlabplot not found?

2014-09-27 Thread Andy Davidson
Hi 

I am having a heck of a time trying to get Python to work correctly on my
cluster created using the spark-ec2 script.

The following link was really helpful
https://issues.apache.org/jira/browse/SPARK-922


I am still running into problems with matplotlib (it works fine on my Mac).
I can not figure out how to get the libagg, freetype, or qhull dependencies
installed.

Has anyone else run into this problem?

Thanks

Andy

sudo yum install freetype-devel

sudo yum install libpng-devel

sudo pip2.7 install six

sudo pip2.7 install python-dateutil

sudo pip2.7 install pyparsing

sudo pip2.7 install pycxx



sudo pip2.7 install matplotlib


[ec2-user@ip-172-31-15-87 ~]$ sudo pip2.7 install matplotlib

Downloading/unpacking matplotlib

  Downloading matplotlib-1.4.0.tar.gz (51.2MB): 51.2MB downloaded

  Running setup.py (path:/tmp/pip_build_root/matplotlib/setup.py) egg_info
for package matplotlib




Edit setup.cfg to change the build options



BUILDING MATPLOTLIB

matplotlib: yes [1.4.0]

python: yes [2.7.5 (default, Sep 15 2014, 17:30:20)
[GCC

4.8.2 20140120 (Red Hat 4.8.2-16)]]

  platform: yes [linux2]



REQUIRED DEPENDENCIES AND EXTENSIONS

 numpy: yes [version 1.9.0]

   six: yes [using six version 1.8.0]

  dateutil: yes [using dateutil version 2.2]

   tornado: yes [using tornado version 4.0.2]

 pyparsing: yes [using pyparsing version 2.0.2]

 pycxx: yes [Couldn't import.  Using local copy.]

libagg: yes [pkg-config information for 'libagg' could
not

be found. Using local copy.]

  freetype: no  [Requires freetype2 2.4 or later.  Found

2.3.11.]

   png: yes [version 1.2.49]

 qhull: yes [pkg-config information for 'qhull' could
not be

found. Using local copy.]



OPTIONAL SUBPACKAGES

   sample_data: yes [installing]

  toolkits: yes [installing]

 tests: yes [using nose version 1.3.4 / mock is required
to

run the matplotlib test suite.  pip/easy_install
may

attempt to install it after matplotlib.]

toolkits_tests: yes [using nose version 1.3.4 / mock is required
to

run the matplotlib test suite.  pip/easy_install
may

attempt to install it after matplotlib.]



OPTIONAL BACKEND EXTENSIONS

macosx: no  [Mac OS-X only]

qt5agg: no  [PyQt5 not found]

qt4agg: no  [PyQt4 not found]

pyside: no  [PySide not found]

   gtk3agg: no  [Requires pygobject to be installed.]

 gtk3cairo: no  [Requires cairocffi or pycairo to be
installed.]

gtkagg: no  [Requires pygtk]

 tkagg: no  [TKAgg requires Tkinter.]

 wxagg: no  [requires wxPython]

   gtk: no  [Requires pygtk]

   agg: yes [installing]

 cairo: no  [cairocffi or pycairo not found]

 windowing: no  [Microsoft Windows only]



OPTIONAL LATEX DEPENDENCIES

dvipng: no

   ghostscript: yes [version 8.70]

 latex: yes [version 3.141592]

   pdftops: no






* The following required packages can not be
built:

* freetype

Complete output from command python setup.py egg_info:






Edit setup.cfg to change the build options







BUILDING MATPLOTLIB



matplotlib: yes [1.4.0]



python: yes [2.7.5 (default, Sep 15 2014, 17:30:20)  [GCC



4.8.2 20140120 (Red Hat 4.8.2-16)]]



  platform: yes [linux2]







REQUIRED DEPENDENCIES AND EXTENSIONS



 numpy: yes [version 1.9.0]



   six: yes [using six version 1.8.0]



  dateutil: yes [using dateutil version 2.2]



   tornado: yes [using tornado version 4.0.2]



 pyparsing: yes [using pyparsing version 2.0.2]



 pycxx: yes [Couldn't import.  Using local copy.]



libagg: yes [pkg-config information for 'libagg' could not



be found. Using local copy.]



  freetype: no  [Requires freetype2 2.4 or later.  Found




Re: iPython notebook ec2 cluster matlabplot not found?

2014-09-29 Thread Andy Davidson
Hi Nicholas

Yes, out of the box PySpark works. My problem is I am using the IPython notebook
and matplotlib is not found. It seems that out of the box the cluster has an
old version of Python and IPython notebook. It was suggested I upgrade
IPython because the new version includes matplotlib. This upgrade requires
going to Python 2.7. The Python upgrade and IPython upgrade seemed to work,
however I am still getting my original problem:

ERROR: Line magic function `%matplotlib` not found

I also posted to the ipython-dev mailing list. So far I have not found a
solution. Maybe I'll have to switch to a different graphing package.

Thanks

Andy

From:  Nicholas Chammas nicholas.cham...@gmail.com
Date:  Saturday, September 27, 2014 at 4:49 PM
To:  Andrew Davidson a...@santacruzintegration.com
Cc:  user@spark.apache.org user@spark.apache.org
Subject:  Re: iPython notebook ec2 cluster matlabplot not found?

 Can you first confirm that the regular PySpark shell works on your cluster?
 Without upgrading to 2.7. That is, you log on to your master using spark-ec2
 login and run bin/pyspark successfully without any special flags.
 
 And as far as I can tell, you should be able to use IPython at 2.6, so I’d
 next confirm that that is working before throwing the 2.7 upgrade into the
 mix.
 
 Also, when upgrading or installing things, try doing so for all the nodes in
 your cluster using pssh. If you install stuff just on the master without
 somehow transferring it to the slaves, that will be problematic.
 
 Finally, there is an open pull request
 https://github.com/apache/spark/pull/2554  related to IPython that may be
 relevant, though I haven’t looked at it too closely.
 
 Nick
 
 ​
 

Re: iPython notebook ec2 cluster matlabplot not found?

2014-09-29 Thread Andy Davidson
Hi Nicholas

I wrote some test code and found a way to get my matplotlib script to work
with the out of the box cluster created by spark-ec2:

1. I commented out the python inline magic
#%matplotlib inline

2. Replaced

 clear_output(wait=True)

with

 clear_output(True)


The instructions for upgrading Python you posted are very helpful.
https://issues.apache.org/jira/browse/SPARK-922

I got a response from the ipython-dev mailing list about the matplotlib upgrade
problem:

“The issue is that we are requiring freetype 2.4 when we only need 2.3.
This has been fixed on both the maintenance branch and master and will
be included in 1.4.1 (which we hope to get out in the next week).

See https://github.com/matplotlib/matplotlib/issues/3413.

Tom”


Here is some back ground info about the versions installed out of the box

##
## python version on mac
##
import IPython
print IPython.sys_info()
'ipython_path': '/Library/Python/2.7/site-packages/IPython',
 'ipython_version': '2.1.0',
sys_version': '2.7.5

import matplotlib
matplotlib.__version__
'1.1.1'

matplotlib.__file__
'/System/Library/Frameworks/Python.framework/Versions/2.7/Extras/lib/python/
matplotlib/__init__.pyc'


##
## ec2 cluster version before upgrade
##
import IPython
print IPython.sys_info()

 'ipython_version': '0.13.2',
 'sys_executable': '/usr/bin/python2.6',
 'sys_version': '2.6.8 (unknown, Mar 14 2013, 09:31:22) \n[GCC 4.6.2
20111027 (Red Hat 4.6.2-2)]'}

import matplotlib
matplotlib.__version__

'0.99.1.1'


matplotlib.__file__

'/usr/lib64/python2.6/site-packages/matplotlib/__init__.pyc'



newbie system architecture problem, trouble using streaming and RDD.pipe()

2014-09-29 Thread Andy Davidson
Hello

I am trying to build a system that does a very simple calculation on a
stream and displays the results in a graph that I want to update every
second or so. I think I have a fundamental misunderstanding about how
streams and rdd.pipe() work. I want to do the data visualization part using
an IPython notebook. It is really easy to graph, animate, and share the page. I
understand streaming does not work in Python yet. Googling around, it
appears you can use RDD.pipe() to get the streaming data into Python.

I have a little IPython notebook I have been experimenting with. It uses
rdd.pipe() to run the following Java job. The pseudocode is:

Main() {
    JavaStreamingContext ssc = new JavaStreamingContext(jsc, new Duration(1000));

    JavaDStream<String> logs = createStream(ssc);
    JavaDStream<String> msg = logs.filter(selectMsgLevel);

    JavaDStream<Long> count = msg.count();

    logs.print();
    ssc.start();
    ssc.awaitTermination();
}



I do not understand how I can pass any data back to Python. If my
understanding is correct, everything runs on a worker; there is no way for
the driver to get the value of the mini batch count and write it out to
standard out.


The Streaming documentation on 'design patterns for using foreachRDD'
demonstrates how the slaves/workers can send data to other systems. So does
the overall architecture of my little demo need to be something like:

Process a) IPython notebook rdd.pipe(myReader.sh)

Process b) myReader.sh is basically some little daemon process that the
workers can connect to. It will just write whatever it receives to standard
out.

Process c) is Java Spark Streaming code

Do I really need the "daemon in the middle"?

Any insights would be greatly appreciated

Andy


P.s. I assume that if I wanted to use aggregates I would need to use
JavaDStream.wrapRDD(). Is this correct? Is it expensive?






how to get actual count from as long from JavaDStream ?

2014-09-30 Thread Andy Davidson
Hi 

I have a simple streaming app. All I want to do is figure out how many lines
I have received in the current mini batch. If numLines were a JavaRDD I could
simply call count(). How do you do something similar in Streaming?



Here is my pseudocode:

JavaDStream<String> msg = logs.filter(selectINFO);

JavaDStream<Long> numLines = msg.count();

Long totalCount = numLines ???





Here is what I am really trying to do. I have a Python script that generates
a graph of totalCount vs time. Python does not support streaming. As a work
around I have a Java program that does the streaming. I want to pass the data
back to the Python script. It has been suggested I can use rdd.pipe().



In Python I call rdd.pipe("scriptToStartJavaSteam.sh")



All I need to do, for each mini batch, is figure out how to get the count
of the current mini batch and write it to standard out. Seems like this
should be simple.



Maybe streams do not work the way I think? In a Spark core app, I am able to
get values like count in my driver and do whatever I want with the local
value. With streams I know I am getting mini batches because print() displays
the first 10 lines of my stream. I assume that somehow print is executed in
my driver, so somehow data was sent from the workers back to the driver.



Any comments or suggestions would be greatly appreciated.



Andy



P.s. Should I be asking a different question?












Re: how to get actual count from as long from JavaDStream ?

2014-09-30 Thread Andy Davidson
Hi Jon

Thanks, foreachRDD seems to work. I am running on a 4 machine cluster. It
seems like the Function executed by foreachRDD is running on my driver; I used
logging to check. This is exactly what I want. I need to write my final
results back to stdout so RDD.pipe() will work. I do not have any evidence
that anything ever ran on any of the workers. I wonder if things are working
only because I do not have a lot of data?

I need to do some more testing.

Andy
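
For reference, a minimal Java sketch of the foreachRDD pattern discussed in
this thread (assumptions: the Spark 1.x Java API, where foreachRDD takes an
org.apache.spark.api.java.function.Function<JavaRDD<T>, Void>; selectINFO is
the filter from the original question):

JavaDStream<String> msg = logs.filter(selectINFO);

msg.foreachRDD(new Function<JavaRDD<String>, Void>() {
    @Override
    public Void call(JavaRDD<String> rdd) {
        // count() here is an action on the mini batch's RDD, so the result is
        // a plain long available in the driver, not another DStream
        long count = rdd.count();
        System.out.println("mini batch count: " + count);
        return null;
    }
});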

From:  Jon Gregg jonrgr...@gmail.com
Date:  Tuesday, September 30, 2014 at 1:22 PM
To:  Andrew Davidson a...@santacruzintegration.com
Cc:  user@spark.apache.org user@spark.apache.org
Subject:  Re: how to get actual count from as long from JavaDStream ?

 Hi Andy
 
 I'm new to Spark and have been working with Scala not Java but I see there's a
 dstream() method to convert from JavaDStream to DStream.  Then within DStream
 http://people.apache.org/~pwendell/spark-1.1.0-rc4-docs/api/java/org/apache/s
 park/streaming/dstream/DStream.html  there is a foreachRDD() method that
 allows you to do things like:
 
 msgConvertedToDStream.foreachRDD(rdd => println("The count is: " +
 rdd.count().toInt))
 
 The syntax for the casting should be changed for Java and probably the
 function argument syntax is wrong too, but hopefully there's enough there to
 help.
 
 Jon
 
 
 On Tue, Sep 30, 2014 at 3:42 PM, Andy Davidson a...@santacruzintegration.com
 wrote:
 Hi 
 
 I have a simple streaming app. All I want to do is figure out how many lines
 I have received in the current mini batch. If numLines was a JavaRDD I could
 simply call count(). How do you do something similar in Streaming?
 
 
 
 Here is my psudo code
 
 
 
 JavaDStreamString msg = logs.filter(selectINFO);
 
 JavaDStreamLong numLines  = msg.count()
 
 
 
 Long totalCount = numLines ???
 
 
 
 
 
 Here is what I am really trying to do. I have a python script that generated
 a graph of totalCount vs time. Python does not support streaming. As a work
 around I have a java program that does the steaming. I want to pass the data
 back to the python script. It has been suggested I can use rdd.pipe().
 
 
 
 In python I call rdd.pipe(scriptToStartJavaSteam.sh)
 
 
 
 All I need to do is for each mini batch figure out how to get the the count
 of the current mini batch and write it to standard out. Seems like this
 should be simple.
 
 
 
 Maybe Streams do not work the way I think? In a spark core app, I am able to
 get values like count in my driver and do what ever I want with the local
 value. With streams I know I am getting mini patches because print() display
 the first 10 lines of my steam. I assume that some how print is executed in
 my driver so somehow  data was sent from the workers back to the driver.
 
 
 
 Any comments or suggestions would be greatly appreciated.
 
 
 
 Andy
 
 
 
 P.s. Should I be asking a different question?
 
 
 
 
 
 
 
 
 




Re: how to get actual count from as long from JavaDStream ?

2014-10-01 Thread Andy Davidson
Hi Sean

I guess I am missing something.

JavaDStream<String> foo = ...
JavaDStream<Long> c = foo.count()

This is circular. I need to get the count as an actual scalar value, not a
JavaDStream. Someone else posted pseudocode that used foreachRDD(). This
seems to work for me.

Thanks

Andy


From:  Sean Owen so...@cloudera.com
Date:  Wednesday, October 1, 2014 at 2:32 AM
To:  Andrew Davidson a...@santacruzintegration.com
Cc:  user@spark.apache.org user@spark.apache.org
Subject:  Re: how to get actual count from as long from JavaDStream ?

 It's much easier than all this. Spark Streaming gives you a DStream of
 RDDs. You want the count for each RDD. DStream.count() gives you
 exactly that: a DStream of Longs which are the counts of events in
 each mini batch.
 
 




can I think of JavaDStream foreachRDD() as being 'for each mini batch' ?

2014-10-01 Thread Andy Davidson
Hi 

I am new to Spark Streaming. Can I think of JavaDStream foreachRDD() as
being 'for each mini batch'? The java doc does not say much about this
function.

Here is the background. I am writing a little test program to figure out how
to use streams. At some point I wanted to calculate an aggregate. In my
first attempt my driver did a couple of transformations and tried to get the
count(). I wrote this code like I would a Spark core app and added some
logging. My log statements were only executed once, however
JavaDStream print() appears to run on each mini batch.

When I put my logging and aggregation code inside foreachRDD() things work
as expected; my aggregate and logging appear to be executed on each
mini batch.

I am running on a 4 machine cluster. I create a message with the value of
each mini batch aggregate and use logging and System.out. Given the message
shows up on my console, is it safe to assume that this output code is
executing in my driver? The IP shows it is running on my master.

I thought maybe the message is showing up here because I do not have enough
data in the stream to force load onto the workers?


Any insights would be greatly appreciated.

Andy




Re: can I think of JavaDStream foreachRDD() as being 'for each mini batch' ?

2014-10-01 Thread Andy Davidson
Hi Sean

Many many thanks. This really clears a lot up for me

Andy

From:  Sean Owen so...@cloudera.com
Date:  Wednesday, October 1, 2014 at 11:27 AM
To:  Andrew Davidson a...@santacruzintegration.com
Cc:  user@spark.apache.org user@spark.apache.org
Subject:  Re: can I think of JavaDStream foreachRDD() as being 'for each
mini batch' ?

 Yes, foreachRDD will do your something for each RDD, which is what you
 get for each mini-batch of input.
 
 The operations you express on a DStream (or JavaDStream) are all,
 really, for each RDD, including print(). Logging is a little harder
 to reason about since the logging will happen on a potentially remote
 receiver. I am not sure if this explains your observed behavior; it
 depends on what you were logging.
 




problem with user@spark.apache.org spam filter

2014-10-03 Thread Andy Davidson
Any idea why my email was returned with the following error message?

Thanks

Andy


This is the mail system at host smtprelay06.hostedemail.com.

I'm sorry to have to inform you that your message could not
be delivered to one or more recipients. It's attached below.

For further assistance, please send mail to postmaster.

If you do so, please include this problem report. You can
delete your own text from the attached returned message.

   The mail system

user@spark.apache.org: host mx1.us.apache.org[140.211.11.136] said: 552
spam
score (6.6) exceeded threshold

(HTML_MESSAGE,MANY_SPAN_IN_TEXT,MIME_QP_LONG_LINE,RCVD_IN_DNSWL_NONE,SPF_NEU
TRAL
(in reply to end of DATA command)





bug with IPython notebook?

2014-10-07 Thread Andy Davidson
Hi

I think I found a bug in the iPython notebook integration. I am not sure how
to report it

I am running spark-1.1.0-bin-hadoop2.4 on an AWS ec2 cluster. I start the
cluster using the launch script provided by spark

I start iPython notebook on my cluster master as follows and use an ssh
tunnel to open the notebook in a browser running on my local computer

[ec2-user@ip-172-31-20-107 ~]$ IPYTHON_OPTS="notebook --pylab inline
--no-browser --port=7000" /root/spark/bin/pyspark


Below is the code my notebook executes.


Bug list:
1. Why do I need to create a SparkContext? If I run pyspark interactively
the context is created automatically for me.
2. The print statement causes the output to be displayed in the terminal
where I started pyspark, not in the notebook's output.
Any comments or suggestions would be greatly appreciated

Thanks

Andy


import sys
from operator import add

from pyspark import SparkContext

# only stand alone jobs should create a SparkContext
sc = SparkContext(appName="pyStreamingSparkRDDPipe")

data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)

def echo(data):
    # output winds up in the shell console on my cluster (i.e. the machine I launched pyspark from)
    print "python received: %s" % (data)

rdd.foreach(echo)
print "we are done"






small bug in pyspark

2014-10-10 Thread Andy Davidson
Hi

I am running spark on an ec2 cluster. I need to update python to 2.7. I have
been following the directions on
http://nbviewer.ipython.org/gist/JoshRosen/6856670
https://issues.apache.org/jira/browse/SPARK-922



I noticed that when I start a shell using pyspark, I correctly got
python2.7, however when I tried to start a notebook I got python2.6.





change

exec ipython $IPYTHON_OPTS

to

exec ipython2 $IPYTHON_OPTS



One clean way to resolve this would be to add another environmental variable
like PYSPARK_PYTHON



Andy





P.s. Matplotlib does not upgrade because of dependency problems. I'll let
you know once I get this resolved.








Re: Getting spark to use more than 4 cores on Amazon EC2

2014-10-22 Thread Andy Davidson
On a related note, how are you submitting your job?

I have a simple streaming proof of concept and noticed that everything runs
on my master. I wonder if I do not have enough load for spark to push tasks
to the slaves. 

Thanks

Andy

From:  Daniel Mahler dmah...@gmail.com
Date:  Monday, October 20, 2014 at 5:22 PM
To:  Nicholas Chammas nicholas.cham...@gmail.com
Cc:  user user@spark.apache.org
Subject:  Re: Getting spark to use more than 4 cores on Amazon EC2

 I am using globs though
 
 raw = sc.textFile(/path/to/dir/*/*)
 
 and I have tons of files so 1 file per partition should not be a problem.
 
 On Mon, Oct 20, 2014 at 7:14 PM, Nicholas Chammas nicholas.cham...@gmail.com
 wrote:
 The biggest danger with gzipped files is this:
 >>> raw = sc.textFile("/path/to/file.gz", 8)
 >>> raw.getNumPartitions()
 1
 You think you’re telling Spark to parallelize the reads on the input, but
 Spark cannot parallelize reads against gzipped files. So 1 gzipped file gets
 assigned to 1 partition.
 
 It might be a nice user hint if Spark warned when parallelism is disabled by
 the input format.
 
 Nick
 
 ​
 
 On Mon, Oct 20, 2014 at 6:53 PM, Daniel Mahler dmah...@gmail.com wrote:
 Hi Nicholas,
 
Gzipping is an impressive guess! Yes, they are.
 My data sets are too large to make repartitioning viable, but I could try it
 on a subset.
 I generally have many more partitions than cores.
This was happening before I started setting those configs.
 
 thanks
 Daniel
 
 
 On Mon, Oct 20, 2014 at 5:37 PM, Nicholas Chammas
 nicholas.cham...@gmail.com wrote:
 Are you dealing with gzipped files by any chance? Does explicitly
 repartitioning your RDD to match the number of cores in your cluster help
 at all? How about if you don't specify the configs you listed and just go
 with defaults all around?
 
 On Mon, Oct 20, 2014 at 5:22 PM, Daniel Mahler dmah...@gmail.com wrote:
 I launch the cluster using vanilla spark-ec2 scripts.
 I just specify the number of slaves and instance type
 
 On Mon, Oct 20, 2014 at 4:07 PM, Daniel Mahler dmah...@gmail.com wrote:
 I usually run interactively from the spark-shell.
 My data definitely has more than enough partitions to keep all the
 workers busy.
 When I first launch the cluster I first do:
 
 +
 cat <<EOF >> ~/spark/conf/spark-defaults.conf
 spark.serializer  org.apache.spark.serializer.KryoSerializer
 spark.rdd.compress  true
 spark.shuffle.consolidateFiles  true
 spark.akka.frameSize  20
 EOF

 copy-dir /root/spark/conf
 spark/sbin/stop-all.sh
 sleep 5
 spark/sbin/start-all.sh
 +
 
 before starting the spark-shell or running any jobs.
 
 
 
 
 On Mon, Oct 20, 2014 at 2:57 PM, Nicholas Chammas
 nicholas.cham...@gmail.com wrote:
 Perhaps your RDD is not partitioned enough to utilize all the cores in
 your system.
 
 Could you post a simple code snippet and explain what kind of
 parallelism you are seeing for it? And can you report on how many
 partitions your RDDs have?
 
 On Mon, Oct 20, 2014 at 3:53 PM, Daniel Mahler dmah...@gmail.com
 wrote:
 
 I am launching EC2 clusters using the spark-ec2 scripts.
 My understanding is that this configures spark to use the available
 resources.
 I can see that spark will use the available memory on larger instance
 types.
 However I have never seen spark running at more than 400% (using 100% on
 4 cores)
 on machines with many more cores.
 Am I misunderstanding the docs? Is it just that high end ec2 instances
 get I/O starved when running spark? It would be strange if that
 consistently produced a 400% hard limit though.
 
 thanks
 Daniel
 
 
 
 
 
 
 




java how to configure streaming.dstream.DStream<> saveAsTextFiles() to work with hdfs?

2015-10-24 Thread Andy Davidson
Hi 

I am using Spark Streaming in Java. One of the problems I have is that I need
to save twitter statuses in JSON format as I receive them.

When I run the following code on my local machine it works, however all the
output files are created in the current directory of the driver program.
Clearly not a good cluster solution. Any idea how I can configure Spark so
that it will write the output to HDFS?

JavaDStream tweets = TwitterFilterQueryUtils.createStream(ssc, twitterAuth);

DStream dStream = tweets.dstream();

String prefix = "MyPrefix";

String suffix = "json";

// this works; when I test locally the files are created in the current
// directory of the driver program
dStream.saveAsTextFiles(prefix, suffix);



Is there something I need to set on my SparkConf object?



Kind regards



andy
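
One approach worth trying (a sketch only; the namenode host, port, and path
below are hypothetical) is to make the prefix a fully qualified hdfs:// URI,
since saveAsTextFiles() treats the prefix as a path and names each batch's
output "prefix-TIME_IN_MS.suffix":

// Hypothetical HDFS namenode address and output path; with a fully qualified
// hdfs:// prefix the per-batch output should land in HDFS instead of the
// driver's current working directory.
String prefix = "hdfs://namenode-host:9000/user/ec2-user/tweets/MyPrefix";
String suffix = "json";
dStream.saveAsTextFiles(prefix, suffix);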




spark streaming 1.51. uses very old version of twitter4j

2015-10-21 Thread Andy Davidson
While digging around the spark source today I discovered it depends on
version 3.0.3 of twitter4j. This version was released on dec 2 2012. I
noticed that the current version is 4.0.4 and was released on 6/23/2015

I am not aware of any particular problems. Are there any plans to upgrade?
What is the spark policy on upgrading dependencies in general?

Kind regards

Andy

https://github.com/yusuke/twitter4j/releases

http://twitter4j.org/en/index.html#sourceCode


Maven Integration
You can integrate the latest Twitter4J build easily by just including the
following lines in your pom.xml.

<dependencies>
    <dependency>
        <groupId>org.twitter4j</groupId>
        <artifactId>twitter4j-core</artifactId>
        <version>[4.0,)</version>
    </dependency>
    ...
</dependencies>




Re: newbie trouble submitting java app to AWS cluster I created using spark-ec2 script from spark-1.5.1-bin-hadoop2.6 distribution

2015-10-29 Thread Andy Davidson
Hi Robin and Sabarish

I figured out what the problem was.

To submit my Java app so that it runs in cluster mode (i.e. I can close my
laptop and go home) I need to do the following:

1. Make sure my jar file is available on all the slaves. spark-submit will
cause my driver to run on a slave; it will not automatically copy my jar
file to the slaves. I found placing the jar in HDFS the easiest way to handle
this.
2. Pass the command argument --deploy-mode cluster.
3. Specify the path to the jar file as an hdfs:// URL.

Here are two tricks to figure out the correct URL for master
1. If you know the name of your cluster you can find the public DNS name for
your master. By default use port 7077
cd spark-1.5.1-bin-hadoop2.6/ec2

$ spark-ec2 get-master --region=us-west-1 streamingDC
Searching for existing cluster streamingDC in region us-west-1...
Found 1 master, 3 slaves.
ec2-54-251-207-123.us-west-1.compute.amazonaws.com
$ 
2. If you know the public DNS name of the master, go to
http://mastername..compute.amazonaws.com:8080 . The title should be the
correct URL (i.e. port 7077).


On master
/root/ephemeral-hdfs/bin/hadoop fs -mkdir /home/ec2-user/sparkExamples
/root/ephemeral-hdfs/bin/hadoop fs -put sparkPi-1.0-SNAPSHOT.jar
/home/ec2-user/sparkExamples
/root/ephemeral-hdfs/bin/hadoop fs -ls /home/ec2-user/sparkExamples
$SPARK_ROOT/bin/spark-submit --class org.apache.spark.examples.JavaSparkPi
--master spark://ec2-54-215-207-132.us-west-1.compute.amazonaws.com:7077
--deploy-mode cluster
hdfs:///home/ec2-user/sparkExamples/sparkPi-1.0-SNAPSHOT.jar 100

Running Spark using the REST application submission protocol.
15/10/29 16:39:08 INFO rest.RestSubmissionClient: Submitting a request to
launch an application in
spark://ec2-54-215-207-132.us-west-1.compute.amazonaws.com:7077.
15/10/29 16:39:09 WARN rest.RestSubmissionClient: Unable to connect to
server spark://ec2-54-215-207-132.us-west-1.compute.amazonaws.com:7077.
Warning: Master endpoint
spark://ec2-54-215-207-132.us-west-1.compute.amazonaws.com:7077 was not a
REST server. Falling back to legacy submission gateway instead.
[ec2-user@ip-172-31-29-60 ~]$
I really appreciate everyone's help

Andy




newbie trouble submitting java app to AWS cluster I created using spark-ec2 script from spark-1.5.1-bin-hadoop2.6 distribution

2015-10-28 Thread Andy Davidson
Hi



I just created a new cluster using the spark-ec2 script from the
spark-1.5.1-bin-hadoop2.6 distribution. The master and slaves seem to be up
and running. I am having a heck of a time figuring out how to submit apps. As
a test I compiled the sample JavaSparkPi example. I have copied my jar file
to the master and want to run the application in cluster mode. My real app
will take a long time to complete. I do not want to wait around.



Any idea what the issue is?



Kind regards



Andy





http://spark.apache.org/docs/latest/submitting-applications.html


The following command works fine on my Mac; however, when I run it on my
master I get the following warning. The app still works correctly.

[ec2-user@ip-172-31-29-60 ~]$ $SPARK_ROOT/bin/spark-submit --class
org.apache.spark.examples.JavaSparkPi --master local[4]
sparkPi-1.0-SNAPSHOT.jar 2>&1 | tee pi.out

15/10/28 21:07:10 INFO spark.SparkContext: Running Spark version 1.5.1

15/10/28 21:07:11 WARN spark.SparkConf:

SPARK_WORKER_INSTANCES was detected (set to '1').

This is deprecated in Spark 1.0+.



Please instead use:

 - ./spark-submit with --num-executors to specify the number of executors

 - Or set SPARK_EXECUTOR_INSTANCES

 - spark.executor.instances to configure the number of instances in the
spark config.



Adding --num-executors I still get the same warning. The app works correctly.



 $SPARK_ROOT/bin/spark-submit --class org.apache.spark.examples.JavaSparkPi
--master local[4] --num-executors 4 sparkPi-1.0-SNAPSHOT.jar 2>&1 | tee
pi.numExecutor4.out

15/10/28 21:09:41 INFO spark.SparkContext: Running Spark version 1.5.1

15/10/28 21:09:41 WARN spark.SparkConf:

SPARK_WORKER_INSTANCES was detected (set to '1').

This is deprecated in Spark 1.0+.



Please instead use:

 - ./spark-submit with --num-executors to specify the number of executors

 - Or set SPARK_EXECUTOR_INSTANCES

 - spark.executor.instances to configure the number of instances in the
spark config.



I also tried variations on [ec2-user@ip-172-31-29-60 ~]$
$SPARK_ROOT/bin/spark-submit --class org.apache.spark.examples.JavaSparkPi
--master spark://172.31.29.60:7077 --num-executors 4
sparkPi-1.0-SNAPSHOT.jar

15/10/28 21:14:48 INFO spark.SparkContext: Running Spark version 1.5.1

15/10/28 21:14:48 WARN spark.SparkConf:

SPARK_WORKER_INSTANCES was detected (set to '1').

This is deprecated in Spark 1.0+.



Please instead use:

 - ./spark-submit with --num-executors to specify the number of executors

 - Or set SPARK_EXECUTOR_INSTANCES

 - spark.executor.instances to configure the number of instances in the
spark config.



15/10/28 21:14:48 INFO spark.SecurityManager: Changing view acls to:
ec2-user

15/10/28 21:14:48 INFO spark.SecurityManager: Changing modify acls to:
ec2-user

15/10/28 21:14:48 INFO spark.SecurityManager: SecurityManager:
authentication disabled; ui acls disabled; users with view permissions:
Set(ec2-user); users with modify permissions: Set(ec2-user)

15/10/28 21:14:49 INFO slf4j.Slf4jLogger: Slf4jLogger started

15/10/28 21:14:49 INFO Remoting: Starting remoting

15/10/28 21:14:50 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sparkDriver@172.31.29.60:52405]

15/10/28 21:14:50 INFO util.Utils: Successfully started service
'sparkDriver' on port 52405.

15/10/28 21:14:50 INFO spark.SparkEnv: Registering MapOutputTracker

15/10/28 21:14:50 INFO spark.SparkEnv: Registering BlockManagerMaster

15/10/28 21:14:50 INFO storage.DiskBlockManager: Created local directory at
/mnt/spark/blockmgr-e6197751-e3a2-40b7-8228-3512ffe2b69d

15/10/28 21:14:50 INFO storage.DiskBlockManager: Created local directory at
/mnt2/spark/blockmgr-9547279f-c011-44e2-9c6e-295f6b36b084

15/10/28 21:14:50 INFO storage.MemoryStore: MemoryStore started with
capacity 530.0 MB

15/10/28 21:14:50 INFO spark.HttpFileServer: HTTP File server directory is
/mnt/spark/spark-60c478cd-7adb-4d92-96e4-aad52eaaf8bf/httpd-71c01fdc-0e5f-4a
73-83f0-bac856bc3548

15/10/28 21:14:50 INFO spark.HttpServer: Starting HTTP Server

15/10/28 21:14:50 INFO server.Server: jetty-8.y.z-SNAPSHOT

15/10/28 21:14:50 INFO server.AbstractConnector: Started
SocketConnector@0.0.0.0:48262

15/10/28 21:14:50 INFO util.Utils: Successfully started service 'HTTP file
server' on port 48262.

15/10/28 21:14:50 INFO spark.SparkEnv: Registering OutputCommitCoordinator

15/10/28 21:14:50 INFO server.Server: jetty-8.y.z-SNAPSHOT

15/10/28 21:14:50 INFO server.AbstractConnector: Started
SelectChannelConnector@0.0.0.0:4040

15/10/28 21:14:50 INFO util.Utils: Successfully started service 'SparkUI' on
port 4040.

15/10/28 21:14:50 INFO ui.SparkUI: Started SparkUI at
http://ec2-54-215-207-132.us-west-1.compute.amazonaws.com:4040

15/10/28 21:14:50 INFO spark.SparkContext: Added JAR
file:/home/ec2-user/sparkPi-1.0-SNAPSHOT.jar at
http://172.31.29.60:48262/jars/sparkPi-1.0-SNAPSHOT.jar with timestamp
1446066890783

15/10/28 21:14:50 WARN metrics.MetricsSystem: Using default name
DAGScheduler for source because spark.app.id is 

Re: newbie trouble submitting java app to AWS cluster I created using spark-ec2 script from spark-1.5.1-bin-hadoop2.6 distribution

2015-10-28 Thread Andy Davidson
I forgot to mention. I do not have a preference for the cluster manager. I
chose the spark-1.5.1-bin-hadoop2.6 distribution because I want to use
HDFS. I assumed this distribution would use YARN.

Thanks

Andy

From:  Andrew Davidson 
Date:  Wednesday, October 28, 2015 at 2:37 PM
To:  "user@spark.apache.org" 
Subject:  newbie trouble submitting java app to AWS cluster I created using
spark-ec2  script from spark-1.5.1-bin-hadoop2.6 distribution


streaming.twitter.TwitterUtils what is the best way to save twitter status to HDFS?

2015-10-23 Thread Andy Davidson
I need to save the Twitter statuses I receive so that I can do additional
batch-based processing on them in the future. Is it safe to assume HDFS is
the best way to go?

Any idea what is the best way to save twitter status to HDFS?

JavaStreamingContext ssc = new JavaStreamingContext(jsc, new
Duration(1000));

Authorization twitterAuth = setupTwitterAuthorization();

JavaDStream tweets =
TwitterFilterQueryUtils.createStream(ssc, twitterAuth, query);



http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-
operations-on-dstreams



saveAsHadoopFiles(prefix, [suffix])Save this DStream's contents as Hadoop
files. The file name at each batch interval is generated based on prefix and
suffix: "prefix-TIME_IN_MS[.suffix]".
Python API This is not available in the Python API.


However, JavaDStream does not support any saveAs* functions.



DStream dStream = tweets.dstream();


http://spark.apache.org/docs/latest/api/java/org/apache/spark/streaming/dstr
eam/DStream.html
DStream only supports saveAsObjectFiles() and saveAsTextFiles().


saveAsTextFiles
public void saveAsTextFiles(java.lang.String prefix,
   java.lang.String suffix)
Save each RDD in this DStream as a text file, using the string representation
of elements. The file name at each batch interval is generated based on
prefix and suffix: "prefix-TIME_IN_MS.suffix".


Any idea where I would find these files? I assume they will be spread out
all over my cluster?


Also, I wonder if using the saveAs*() functions is going to cause other
problems. My batch duration is set to 1 sec. Am I going to overwhelm the
system with a bunch of tiny files? Many of them will be empty.
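
One idea I am considering to avoid the pile of tiny, empty files (a sketch
only, not tested; jsonTweets is assumed to be a JavaDStream<String> derived
from tweets, and the output path is a placeholder):

```
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.Time;

// Sketch: write each batch with foreachRDD and skip empty RDDs so the 1 second
// batch interval does not create a pile of empty directories.
jsonTweets.foreachRDD(new Function2<JavaRDD<String>, Time, Void>() {
    @Override
    public Void call(JavaRDD<String> rdd, Time time) throws Exception {
        if (!rdd.isEmpty()) {
            // one directory per non-empty batch; the path is a placeholder
            rdd.saveAsTextFile("hdfs:///twitter/raw-" + time.milliseconds());
        }
        return null;
    }
});
```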



Kind regards



Andy




problems with spark 1.5.1 streaming TwitterUtils.createStream()

2015-10-21 Thread Andy Davidson
Hi

I want to use Twitter's public streaming API to follow a set of IDs. I want
to implement my driver using Java. The current TwitterUtils is a wrapper
around twitter4j and does not expose the full Twitter streaming API.

I started by digging through the source code. Unfortunately I do not know
Scala.

spark-1.5.1/external/twitter/src/main/scala/org/apache/spark/streaming/twitt
er/TwitterUtils.scala
spark-1.5.1/external/twitter/src/main/scala/org/apache/spark/streaming/twitt
er/TwitterInputDStream.scala

   String[] filter = {"topic1", "topic2"};

JavaDStream tweets = TwitterUtils.createStream(ssc,
twitterAuth, filter);


Does anyone know why filters is defined as String[]? Internally Spark
creates a twitter4j FilterQuery object. Ideally I would like to pass a
FilterQuery object myself. It exposes the part of the Twitter streaming API I
need to use to follow a set of users.

Here is a link to the 4.0.4 version of the java doc
http://twitter4j.org/oldjavadocs/4.0.4/index.html

Turns out spark 1.5.1 uses version 3.0.3.
http://twitter4j.org/oldjavadocs/3.0.3/index.html . Both versions implement
java.io.Serializable

I put a comment in where I think the change needs to go. It looks like it
might be trivial. I guess in the short term I can try and rewrite the
TwitterUtils and TwitterReceiver to do what I need to do in Java
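
For reference, outside of Spark the twitter4j calls I am after look roughly
like this (a sketch only; the user IDs are made-up placeholders, and a
StatusListener would be registered before calling filter(), the way
TwitterReceiver.onStart() does):

```
import twitter4j.FilterQuery;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;

// Sketch of the twitter4j usage I would like TwitterUtils to expose.
// twitterAuth is the same Authorization object used above; the IDs are made up.
TwitterStream twitterStream = new TwitterStreamFactory().getInstance(twitterAuth);
// ... addListener(new StatusListener() { ... }) would go here ...

FilterQuery query = new FilterQuery();
query.follow(new long[] {123456789L, 987654321L}); // IDs of the users to follow
twitterStream.filter(query);
```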

Thanks in advance

Andy

object TwitterUtils {

  /**

   * Create a input stream that returns tweets received from Twitter.

   * @param ssc StreamingContext object

   * @param twitterAuth Twitter4J authentication, or None to use Twitter4J's
default OAuth

   *authorization; this uses the system properties
twitter4j.oauth.consumerKey,

   *twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and

   *twitter4j.oauth.accessTokenSecret

   * @param filters Set of filter strings to get only those tweets that
match them

   * @param storageLevel Storage level to use for storing the received
objects

   */

  def createStream(

  ssc: StreamingContext,

  twitterAuth: Option[Authorization],

  filters: Seq[String] = Nil, // ??? Can we pass a FilterQuery object
instead

  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2

): ReceiverInputDStream[Status] = {

new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel)

  }





private[streaming]

class TwitterReceiver(

twitterAuth: Authorization,

filters: Seq[String],

storageLevel: StorageLevel

  ) extends Receiver[Status](storageLevel) with Logging {



  @volatile private var twitterStream: TwitterStream = _

  @volatile private var stopped = false



  def onStart() {

try {

  val newTwitterStream = new
TwitterStreamFactory().getInstance(twitterAuth)

  newTwitterStream.addListener(new StatusListener {

def onStatus(status: Status): Unit = {

  store(status)

}

// Unimplemented

def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {}

def onTrackLimitationNotice(i: Int) {}

def onScrubGeo(l: Long, l1: Long) {}

def onStallWarning(stallWarning: StallWarning) {}

def onException(e: Exception) {

  if (!stopped) {

restart("Error receiving tweets", e)

  }

}

  })



  val query = new FilterQuery // ??? Can we pass a FilterQuery object
instead

  if (filters.size > 0) {

query.track(filters.toArray)

newTwitterStream.filter(query)

  } else {

newTwitterStream.sample()

  }

  setTwitterStream(newTwitterStream)

  logInfo("Twitter receiver started")

  stopped = false

} catch {

  case e: Exception => restart("Error starting Twitter stream", e)

}

  }








thought experiment: Can I use spark streaming to replace all of my rest services?

2015-11-10 Thread Andy Davidson
I just finished watching a great presentation from a recent spark summit on
real time movie recommendations using spark.
https://spark-summit.org/east-2015/talk/real-time-recommendations-using-spar
k . For the purpose of this email I am going to really simplify what they did.
In general, their real time system took in data about what all users are
watching and calculated the most popular/trending shows. The results are
stored in a database. When an individual user goes to the "movie guide" they
read the top 10 recommendations from a database.

My guess is the part of their system that serves up recommendations to
users in real time is not implemented using Spark. It is probably a bunch of
REST servers sitting behind a bunch of proxy servers and load balancers. The
REST servers read the recommendations calculated using Spark streaming.

This got me thinking. So in general we have spark handling batch, ingestion
of real time data but not the part of the system that delivers the real time
user experience. Ideally I would like to have one unified platform.

Using spark streaming with a small window size of say 100 ms would meet my
SLA. Each window is going to contain many unrelated requests. In the
recommender system example map() would look up the user specific
recommendation for each request. The trick is how to return the response to
the correct "client". I could publish the response to some other system
(Kafka? or a custom proxy?) that can truly return the data to the client. Is
this a good idea? What do people do in practice?

Also, I assume I would have to use rdd.foreach() to somehow cause the
response data to be sent to the correct client.
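
Something along these lines is what I am picturing (a very rough sketch;
'responses' is assumed to pair a clientId with the looked-up recommendation,
and ResponsePublisher is a made-up stand-in for whatever system (Kafka topic,
custom proxy, etc.) actually routes the answer back to the client):

```
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

// Very rough sketch: publish each (clientId, response) pair to an external
// system that can route the answer back to the right client.
// ResponsePublisher is a hypothetical helper, not a real library.
responses.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
    @Override
    public Void call(JavaPairRDD<String, String> rdd) throws Exception {
        rdd.foreach(new VoidFunction<Tuple2<String, String>>() {
            @Override
            public void call(Tuple2<String, String> clientIdAndResponse) {
                ResponsePublisher.publish(clientIdAndResponse._1(),
                                          clientIdAndResponse._2());
            }
        });
        return null;
    }
});
```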

Comments and suggestions appreciated.

Kind regards

Andy




thought experiment: use spark ML to real time prediction

2015-11-10 Thread Andy Davidson
Let's say I have used Spark ML to train a linear model. I know I can save and
load the model to disk. I am not sure how I can use the model in a real time
environment. For example, I do not think I can easily return a "prediction" to
the client using Spark streaming. Also, for some applications the extra
latency created by the batch process might not be acceptable.

If I were not using Spark I would re-implement the model I trained in my
batch environment in a language like Java and implement a REST service that
uses the model to create a prediction and return the prediction to the
client. Many models make predictions using linear algebra. Implementing
predictions is relatively easy if you have a good vectorized LA package. Is
there a way to use a model I trained with Spark ML outside of Spark?
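
For a linear model the serving side really only needs the weights and the
intercept, so in principle something this small (plain Java, no Spark at
runtime; the values would be exported from the Spark-trained model) is all the
REST service would have to call:

```
// Minimal sketch of serving a linear model outside Spark. The weights and
// intercept would be exported from the Spark-trained model.
public final class LinearModelServer {
    private final double[] weights;
    private final double intercept;

    public LinearModelServer(double[] weights, double intercept) {
        this.weights = weights;
        this.intercept = intercept;
    }

    // prediction = w . x + intercept
    public double predict(double[] features) {
        double sum = intercept;
        for (int i = 0; i < weights.length; i++) {
            sum += weights[i] * features[i];
        }
        return sum;
    }
}
```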

As a motivating example, even if its possible to return data to the client
using spark streaming. I think the mini batch latency would not be
acceptable for a high frequency stock trading system.

Kind regards

Andy

P.S. The examples I have seen so far use Spark streaming to "preprocess"
predictions. For example, a recommender system might use what current users
are watching to calculate "trending recommendations". These are stored on
disk and served up to users when they use the "movie guide". If a
recommendation were a couple of minutes old it would not affect the end
user's experience.





Re: How to configure logging...

2015-11-11 Thread Andy Davidson
Hi Hitoshi

Looks like you have read
http://spark.apache.org/docs/latest/configuration.html#configuring-logging

On my ec2 cluster I need to also do the following. I think my notes are not
complete. I think you may also need to restart your cluster

Hope this helps

Andy


#
# setting up logger so logging goes to file, makes demo easier to understand
#
ssh -i $KEY_FILE root@$MASTER
cp /home/ec2-user/log4j.properties  /root/spark/conf/
for i in `cat /root/spark/conf/slaves`; do scp
/home/ec2-user/log4j.properties root@$i:/home/ec2-user/log4j.properties;
done


#
# restart spark
#
/root/spark/sbin/stop-all.sh
/root/spark/sbin/start-all.sh


From:  Hitoshi 
Date:  Tuesday, November 10, 2015 at 1:22 PM
To:  "user @spark" 
Subject:  Re: How to configure logging...

> I don't have akka but with just Spark, I just edited log4j.properties to
> "log4j.rootCategory=ERROR, console" and ran the following command and was
> able to get only the Time row as output.
> 
> run-example org.apache.spark.examples.streaming.JavaNetworkWordCount
> localhost 
> 
> 
> 
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-configure-logging-t
> p25346p25348.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 




Re: streaming: missing data. does saveAsTextFile() append or replace?

2015-11-09 Thread Andy Davidson
Thanks Gerard

I'll give that a try. It seems like this approach is going to create a very
large number of files. I guess I could write a cron job to concatenate the
files by hour or maybe by day. I imagine this is a common problem. Do you know
of something that does this already?

I am using the standalone cluster manager. I do not think it directly
supports cron-style job scheduling. It should be easy to use the HDFS API
and Linux crontab, or maybe https://quartz-scheduler.org/
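
Here is the direction I am leaning based on your suggestion (a sketch only;
'data' is the JavaDStream<String> of JSON strings from my original code, and
the 60 second window and output path are assumptions, not tested values):

```
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;

// Sketch: use a non-overlapping window so each save covers 60 seconds of data
// instead of one 5 second batch, producing fewer, larger files in HDFS.
Duration sixtySeconds = new Duration(60 * 1000);
JavaDStream<String> windowed = data.window(sixtySeconds, sixtySeconds);
windowed.dstream().saveAsTextFiles("hdfs:///rawSteamingData/part", "txt");
```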

Kind regards

andy

From:  Gerard Maas <gerard.m...@gmail.com>
Date:  Sunday, November 8, 2015 at 2:13 AM
To:  Andrew Davidson <a...@santacruzintegration.com>
Cc:  "user @spark" <user@spark.apache.org>
Subject:  Re: streaming: missing data. does saveAsTextFile() append or
replace?

> Andy,
> 
> Using the rdd.saveAsTextFile(...)  will overwrite the data if your target is
> the same file.
> 
> If you want to save to HDFS, DStream offers dstream.saveAsTextFiles(prefix,
> suffix)  where a new file will be written at each streaming interval.
> Note that this will result in a saved file for each streaming interval. If you
> want to increase the file size (usually a good idea in HDFS), you can use a
> window function over the dstream and save the 'windowed'  dstream instead.
> 
> kind regards, Gerard.
> 
> On Sat, Nov 7, 2015 at 10:55 PM, Andy Davidson <a...@santacruzintegration.com>
> wrote:




does spark ML have some thing like createDataPartition() in R caret package ?

2015-11-13 Thread Andy Davidson
In R, it's easy to split a data set into training, cross-validation, and test
sets. Is there something like this in spark.ml? I am using Python for now.

My real problem is I want to randomly select a relatively small data set to
do some initial data exploration. It's not clear to me how, using Spark, I
could create a random sample from a large data set. I would prefer to sample
without replacement.

I have not tried to use sparkR yet. I assume I would not be able to use the
caret package with spark ML

Kind regards

Andy

```{R}
   inTrain <- createDataPartition(y=csv$classe, p=0.7, list=FALSE)
trainSetDF <- csv[inTrain,]
testSetDF <- csv[-inTrain,]
```
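
For what it is worth, the closest thing I have found so far in the Spark
DataFrame API is randomSplit() and sample() (shown here in the Java form since
my other code is Java; the ratios, the seed, and the assumption that 'csv' is
already loaded as a DataFrame are mine). I believe the same methods are
exposed in the Python DataFrame API as well.

```
import org.apache.spark.sql.DataFrame;

// Sketch: 'csv' is assumed to already be a DataFrame holding the same data as
// the R example above. The split ratios and seed are arbitrary.
DataFrame[] splits = csv.randomSplit(new double[] {0.7, 0.3}, 42L);
DataFrame trainSetDF = splits[0];
DataFrame testSetDF = splits[1];

// a small random sample without replacement, for initial data exploration
DataFrame exploreDF = csv.sample(false, 0.01, 42L);
```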





Re: bin/pyspark SparkContext is missing?

2015-11-16 Thread Andy Davidson
Thanks

andy

From:  Davies Liu <dav...@databricks.com>
Date:  Friday, November 13, 2015 at 3:42 PM
To:  Andrew Davidson <a...@santacruzintegration.com>
Cc:  "user @spark" <user@spark.apache.org>
Subject:  Re: bin/pyspark SparkContext is missing?

> You forgot to create a SparkContext instance:
> 
> sc = SparkContext()
> 
> On Tue, Nov 3, 2015 at 9:59 AM, Andy Davidson
> <a...@santacruzintegration.com> wrote:

how to run RStudio or RStudio Server on ec2 cluster?

2015-11-04 Thread Andy Davidson
Hi

I just set up a Spark cluster on AWS EC2. In the past I have done a
lot of work using RStudio on my local machine. bin/sparkR looks interesting;
however, it looks like you just get an R command line interpreter. Does anyone
have any experience using something like RStudio or RStudio Server with
SparkR?


In an ideal world I would like to use some sort of IDE for R running on my
local machine connected to my spark cluster.

In spark 1.3.1 It was easy to get a similar environment working for python.
I would use pyspark to start an iPython notebook server on my cluster
master. Then set up an SSH tunnel on my local machine to connect a browser
on my local machine to the notebook server.

Any comments or suggestions would be greatly appreciated

Andy





bin/pyspark SparkContext is missing?

2015-11-03 Thread Andy Davidson
I am having a heck of a time getting Ipython notebooks to work on my 1.5.1
AWS cluster I created using spark-1.5.1-bin-hadoop2.6/ec2/spark-ec2

I have read the instructions for using iPython notebook on
http://spark.apache.org/docs/latest/programming-guide.html#using-the-shell

I want to run the notebook server on my master and use an ssh tunnel to
connect a web browser running on my mac.

I am confident the cluster is set up correctly because the sparkPi example
runs. 

I am able to use IPython notebooks on my local mac and work with spark and
local files with out any problems.

I know the ssh tunnel is working.

On my cluster I am able to use python shell in general

[ec2-user@ip-172-31-29-60 dataScience]$ /root/spark/bin/pyspark --master
local[2]



>>> from pyspark import SparkContext

>>> textFile = sc.textFile("file:///home/ec2-user/dataScience/readme.txt")

>>> textFile.take(1)



When I run the exact same code in iPython notebook I get

---
NameError Traceback (most recent call last)
 in ()
 11 from pyspark import SparkContext, SparkConf
 12 
---> 13 textFile = 
sc.textFile("file:///home/ec2-user/dataScience/readme.txt")
 14 
 15 textFile.take(1)

NameError: name 'sc' is not defined



To try and debug, I wrote a script to launch pyspark and added 'set -x' to
pyspark so I could see what the script was doing.

Any idea how I can debug this?

Thanks in advance

Andy

$ cat notebook.sh

set -x

export PYSPARK_DRIVER_PYTHON=ipython

export PYSPARK_DRIVER_PYTHON_OPTS="notebook --no-browser --port=7000"

/root/spark/bin/pyspark --master local[2]




[ec2-user@ip-172-31-29-60 dataScience]$ ./notebook.sh

++ export PYSPARK_DRIVER_PYTHON=ipython

++ PYSPARK_DRIVER_PYTHON=ipython

++ export 'PYSPARK_DRIVER_PYTHON_OPTS=notebook --no-browser --port=7000'

++ PYSPARK_DRIVER_PYTHON_OPTS='notebook --no-browser --port=7000'

++ /root/spark/bin/pyspark --master 'local[2]'

+++ dirname /root/spark/bin/pyspark

++ cd /root/spark/bin/..

++ pwd

+ export SPARK_HOME=/root/spark

+ SPARK_HOME=/root/spark

+ source /root/spark/bin/load-spark-env.sh

 dirname /root/spark/bin/pyspark

+++ cd /root/spark/bin/..

+++ pwd

++ FWDIR=/root/spark

++ '[' -z '' ']'

++ export SPARK_ENV_LOADED=1

++ SPARK_ENV_LOADED=1

 dirname /root/spark/bin/pyspark

+++ cd /root/spark/bin/..

+++ pwd

++ parent_dir=/root/spark

++ user_conf_dir=/root/spark/conf

++ '[' -f /root/spark/conf/spark-env.sh ']'

++ set -a

++ . /root/spark/conf/spark-env.sh

+++ export JAVA_HOME=/usr/java/latest

+++ JAVA_HOME=/usr/java/latest

+++ export SPARK_LOCAL_DIRS=/mnt/spark,/mnt2/spark

+++ SPARK_LOCAL_DIRS=/mnt/spark,/mnt2/spark

+++ export SPARK_MASTER_OPTS=

+++ SPARK_MASTER_OPTS=

+++ '[' -n 1 ']'

+++ export SPARK_WORKER_INSTANCES=1

+++ SPARK_WORKER_INSTANCES=1

+++ export SPARK_WORKER_CORES=2

+++ SPARK_WORKER_CORES=2

+++ export HADOOP_HOME=/root/ephemeral-hdfs

+++ HADOOP_HOME=/root/ephemeral-hdfs

+++ export 
SPARK_MASTER_IP=ec2-54-215-207-132.us-west-1.compute.amazonaws.com

+++ SPARK_MASTER_IP=ec2-54-215-207-132.us-west-1.compute.amazonaws.com

 cat /root/spark-ec2/cluster-url

+++ export 
MASTER=spark://ec2-54-215-207-132.us-west-1.compute.amazonaws.com:7077

+++ MASTER=spark://ec2-54-215-207-132.us-west-1.compute.amazonaws.com:7077

+++ export SPARK_SUBMIT_LIBRARY_PATH=:/root/ephemeral-hdfs/lib/native/

+++ SPARK_SUBMIT_LIBRARY_PATH=:/root/ephemeral-hdfs/lib/native/

+++ export SPARK_SUBMIT_CLASSPATH=::/root/ephemeral-hdfs/conf

+++ SPARK_SUBMIT_CLASSPATH=::/root/ephemeral-hdfs/conf

 wget -q -O - http://169.254.169.254/latest/meta-data/public-hostname

+++ export 
SPARK_PUBLIC_DNS=ec2-54-215-207-132.us-west-1.compute.amazonaws.com

+++ SPARK_PUBLIC_DNS=ec2-54-215-207-132.us-west-1.compute.amazonaws.com

+++ export YARN_CONF_DIR=/root/ephemeral-hdfs/conf

+++ YARN_CONF_DIR=/root/ephemeral-hdfs/conf

 id -u

+++ '[' 222 == 0 ']'

++ set +a

++ '[' -z '' ']'

++ ASSEMBLY_DIR2=/root/spark/assembly/target/scala-2.11

++ ASSEMBLY_DIR1=/root/spark/assembly/target/scala-2.10

++ [[ -d /root/spark/assembly/target/scala-2.11 ]]

++ '[' -d /root/spark/assembly/target/scala-2.11 ']'

++ export SPARK_SCALA_VERSION=2.10

++ SPARK_SCALA_VERSION=2.10

+ export '_SPARK_CMD_USAGE=Usage: ./bin/pyspark [options]'

+ _SPARK_CMD_USAGE='Usage: ./bin/pyspark [options]'

+ hash python2.7

+ DEFAULT_PYTHON=python2.7

+ [[ -n '' ]]

+ [[ '' == \1 ]]

+ [[ -z ipython ]]

+ [[ -z '' ]]

+ [[ ipython == *ipython* ]]

+ [[ python2.7 != \p\y\t\h\o\n\2\.\7 ]]

+ PYSPARK_PYTHON=python2.7

+ export PYSPARK_PYTHON

+ export PYTHONPATH=/root/spark/python/:

+ PYTHONPATH=/root/spark/python/:

+ export 
PYTHONPATH=/root/spark/python/lib/py4j-0.8.2.1-src.zip:/root/spark/python/:

+ 
PYTHONPATH=/root/spark/python/lib/py4j-0.8.2.1-src.zip:/root/spark/python/:

+ export OLD_PYTHONSTARTUP=

+ OLD_PYTHONSTARTUP=

+ export 

best practices machine learning with python 2 or 3?

2015-11-03 Thread Andy Davidson
I am fairly new to Python and am starting a new project that will want to
make use of Spark and the Python machine learning libraries (matplotlib,
pandas, ...). I noticed that the spark-ec2 script set up my AWS cluster with
Python 2.6 and 2.7.

http://spark.apache.org/docs/latest/programming-guide.html#linking-with-spar
k

"Spark 1.5.1 works with Python 2.6+ or Python 3.4+. It can use the standard
CPython interpreter, so C libraries like NumPy can be used. It also works
with PyPy 2.3+²

" PySpark works with IPython 1.0.0 and later.²


I realize there are a lot of legacy python packages that are probably
vectorized and not easy to port.

What would you recommend?

I assume if I wanted to use Python 3 I would need to install it on all the
workers and the master, and follow the directions in linking-with-spark to
make it use the correct version of Python.

(of course I realize I need to install 3rd party packages on all the workers)

Kind regards

Andy






streaming: missing data. does saveAsTextFile() append or replace?

2015-11-07 Thread Andy Davidson
Hi

I just started a new Spark streaming project. In this phase of the system
all we want to do is save the data we receive to HDFS. After running for
a couple of days it looks like I am missing a lot of data. I wonder if
saveAsTextFile("hdfs:///rawSteamingData") is overwriting the data I captured
in previous windows? I noticed that after running for a couple of days my
HDFS file system has 25 files. The names are something like "part-6". I
used 'hadoop fs -dus' to check the total data captured. While the system was
running I would periodically call 'dus'; I was surprised that sometimes the
number of total bytes actually dropped.


Is there a better way to write my data to disk?

Any suggestions would be appreciated

Andy


   public static void main(String[] args) {

  SparkConf conf = new SparkConf().setAppName(appName);

JavaSparkContext jsc = new JavaSparkContext(conf);

JavaStreamingContext ssc = new JavaStreamingContext(jsc, new
Duration(5 * 1000));



[ deleted code ...]



data.foreachRDD(new Function<JavaRDD, Void>(){

private static final long serialVersionUID =
-7957854392903581284L;



@Override

public Void call(JavaRDD jsonStr) throws Exception {

jsonStr.saveAsTextFile("hdfs:///rawSteamingData"); //
/rawSteamingData is a directory

return null;

}  

});



ssc.checkpoint(checkPointUri);



ssc.start();

ssc.awaitTermination();

}




Re: bug: can not run Ipython notebook on cluster

2015-11-07 Thread Andy Davidson
What a BEAR! The following recipe worked for me. (took a couple of days
hacking).

I hope this improves the out of the box experience for others

Andy

My test program is now

In [1]:
from pyspark import SparkContext
textFile = sc.textFile("file:///home/ec2-user/dataScience/readme.md")
In [2]:
print("hello world²)

hello world
In [3]:
textFile.take(3)

Out[3]:
[' hello world', '']


Installation instructions

1. Ssh to cluster master
2. Sudo su
3. install python3.4 on all machines
```
yum install -y python34
bash-4.2# which python3
/usr/bin/python3

pssh -h /root/spark-ec2/slaves yum install -y python34

```

4. Install pip on all machines


```
yum list available |grep pip
yum install -y python34-pip

find /usr/bin -name "*pip*" -print
/usr/bin/pip-3.4

pssh -h /root/spark-ec2/slaves yum install -y python34-pip
```

5. install python on master

```
/usr/bin/pip-3.4 install ipython

pssh -h /root/spark-ec2/slaves /usr/bin/pip-3.4 install python
```

6. Install the Python development packages and Jupyter on the master
```
yum install -y python34-devel
/usr/bin/pip-3.4 install jupyter
```

7. Update spark-env.sh on all machines so that python3.4 is used by default

```
cd /root/spark/conf
printf "\n# Set Spark Python version\nexport PYSPARK_PYTHON=python3.4\n" >>
/root/spark/conf/spark-env.sh
for i in `cat slaves` ; do scp spark-env.sh
root@$i:/root/spark/conf/spark-env.sh; done
```

8. Restart cluster

```
/root/spark/sbin/stop-all.sh
/root/spark/sbin/start-all.sh
```

Running ipython notebook

1. set up an ssh tunnel on your local machine
ssh -i $KEY_FILE -N -f -L localhost::localhost:7000
ec2-user@$SPARK_MASTER

2. Log on to cluster master and start ipython notebook server

```
export PYSPARK_PYTHON=python3.4

export PYSPARK_DRIVER_PYTHON=python3.4

export IPYTHON_OPTS="notebook --no-browser --port=7000"

$SPARK_ROOT/bin/pyspark --master local[2]

```

3. On your local machine open http://localhost:



From:  Andrew Davidson 
Date:  Friday, November 6, 2015 at 2:18 PM
To:  "user @spark" 
Subject:  bug: can not run Ipython notebook on cluster

> Does anyone use iPython notebooks?
> 
> I am able to use it on my local machine with spark how ever I can not get it
> work on my cluster.
> 
> 
> For unknown reason on my cluster I have to manually create the spark context.
> My test code generated this exception
> 
> Exception: Python in worker has different version 2.7 than that in driver 2.6,
> PySpark cannot run with different minor versions
> 
> On my mac I can solve the exception problem by setting
> 
> export PYSPARK_PYTHON=python3
> 
> export PYSPARK_DRIVER_PYTHON=python3
> 
> IPYTHON_OPTS=notebook $SPARK_ROOT/bin/pyspark
> 
> 
> 
> On my cluster I set the values to python2.7. And PYTHON_OPTS=³notebook
> ‹no-browser ‹port=7000² . I connect using a ssh tunnel from my local machine.
> 
> 
> 
> I also tried installing python 3 , pip, ipython, and jupyter in/on my cluster
> 
> 
> 
> I tried adding export PYSPARK_PYTHON=python2.7 to the
> /root/spark/conf/spark-env.sh on all my machines
> 
> 
> from pyspark import SparkContext
> textFile = sc.textFile("file:///home/ec2-user/dataScience/readme.md")
> textFile.take(3
> 
> 
> In [1]:
> from pyspark import SparkContext
> sc = SparkContext("local", "Simple App")
> textFile = sc.textFile("file:///home/ec2-user/dataScience/readme.md")
> textFile.take(3)
> ---Py4
> JJavaError Traceback (most recent call last)
>  in ()  2 sc = SparkContext("local",
> "Simple App")  3 textFile =
> sc.textFile("file:///home/ec2-user/dataScience/readme.md")> 4
> textFile.take(3)/root/spark/python/pyspark/rdd.py in take(self, num)   1297
> 1298 p = range(partsScanned, min(partsScanned + numPartsToTry,
> totalParts))-> 1299 res = self.context.runJob(self,
> takeUpToNumLeft, p)   13001301 items +=
> res/root/spark/python/pyspark/context.py in runJob(self, rdd, partitionFunc,
> partitions, allowLocal)914 # SparkContext#runJob.915
> mappedRDD = rdd.mapPartitions(partitionFunc)--> 916 port =
> self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)917
> return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))918
> /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in
> __call__(self, *args)536 answer =
> self.gateway_client.send_command(command)537 return_value =
> get_return_value(answer, self.gateway_client,
> --> 538 self.target_id, self.name)
> 539 540 for temp_arg in
> temp_args:/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in
> get_return_value(answer, gateway_client, target_id, name)298
> raise Py4JJavaError(
> 299 'An error occurred while calling {0}{1}{2}.\n'.-->
> 300 format(target_id, '.', name), 

ipython notebook NameError: name 'sc' is not defined

2015-11-02 Thread Andy Davidson
Hi

I recently installed a new cluster using the
spark-1.5.1-bin-hadoop2.6/ec2/spark-ec2. SparkPi sample app works correctly.

I am trying to run iPython notebook on my cluster master and use an ssh
tunnel so that I can work with the notebook in a browser running on my mac.
Below is how I set up the ssh tunnel:

$ ssh -i $KEY_FILE -N -f -L localhost::localhost:7000
ec2-user@$SPARK_MASTER

$ ssh -i $KEY_FILE ec2-user@$SPARK_MASTER
$ cd top level notebook dir
$ IPYTHON_OPTS="notebook --no-browser --port=7000" /root/spark/bin/pyspark

I am able to access my notebooks in the browser by opening
http://localhost:

When I run the following Python code I get the error "NameError: name 'sc' is
not defined". Any idea what the problem might be?

I looked through pyspark and tried various combinations of the following but
still get the same error

$ PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook
--no-browser --port=7000" /root/spark/bin/pyspark --master=local[2]

Kind regards

Andy





In [1]:
import sys
print (sys.version)
 
import os
print(os.getcwd() + "\n")
2.6.9 (unknown, Apr  1 2015, 18:16:00)
[GCC 4.8.2 20140120 (Red Hat 4.8.2-16)]
/home/ec2-user/dataScience

In [2]:
from pyspark import SparkContext
textFile = sc.textFile("readme.txt")
textFile.take(1)
---
NameError Traceback (most recent call last)
 in ()
  1 from pyspark import SparkContext
> 2 textFile = sc.textFile("readme.txt")
  3 textFile.take(1)

NameError: name 'sc' is not defined

In [ ]:
 




bug: can not run Ipython notebook on cluster

2015-11-06 Thread Andy Davidson
Does anyone use iPython notebooks?

I am able to use it on my local machine with Spark; however, I can not get it
to work on my cluster.


For unknown reason on my cluster I have to manually create the spark
context. My test code generated this exception

Exception: Python in worker has different version 2.7 than that in driver
2.6, PySpark cannot run with different minor versions

On my mac I can solve the exception problem by setting

export PYSPARK_PYTHON=python3

export PYSPARK_DRIVER_PYTHON=python3

IPYTHON_OPTS=notebook $SPARK_ROOT/bin/pyspark



On my cluster I set the values to python2.7, and IPYTHON_OPTS="notebook
--no-browser --port=7000". I connect using an ssh tunnel from my local
machine.



I also tried installing python 3 , pip, ipython, and jupyter in/on my
cluster



I tried adding export PYSPARK_PYTHON=python2.7 to the
/root/spark/conf/spark-env.sh on all my machines


from pyspark import SparkContext
textFile = sc.textFile("file:///home/ec2-user/dataScience/readme.md")
textFile.take(3)


In [1]:
from pyspark import SparkContext
sc = SparkContext("local", "Simple App")
textFile = sc.textFile("file:///home/ec2-user/dataScience/readme.md")
textFile.take(3)
---
Py4JJavaError Traceback (most recent call last)
 in ()
  2 sc = SparkContext("local", "Simple App")
  3 textFile = 
sc.textFile("file:///home/ec2-user/dataScience/readme.md")
> 4 textFile.take(3)

/root/spark/python/pyspark/rdd.py in take(self, num)
   1297 
   1298 p = range(partsScanned, min(partsScanned +
numPartsToTry, totalParts))
-> 1299 res = self.context.runJob(self, takeUpToNumLeft, p)
   1300 
   1301 items += res

/root/spark/python/pyspark/context.py in runJob(self, rdd, partitionFunc,
partitions, allowLocal)
914 # SparkContext#runJob.
915 mappedRDD = rdd.mapPartitions(partitionFunc)
--> 916 port = self._jvm.PythonRDD.runJob(self._jsc.sc(),
mappedRDD._jrdd, partitions)
917 return list(_load_from_socket(port,
mappedRDD._jrdd_deserializer))
918 

/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in
__call__(self, *args)
536 answer = self.gateway_client.send_command(command)
537 return_value = get_return_value(answer, self.gateway_client,
--> 538 self.target_id, self.name)
539 
540 for temp_arg in temp_args:

/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in
get_return_value(answer, gateway_client, target_id, name)
298 raise Py4JJavaError(
299 'An error occurred while calling {0}{1}{2}.\n'.
--> 300 format(target_id, '.', name), value)
301 else:
302 raise Py4JError(

Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0
(TID 0, localhost): org.apache.spark.api.python.PythonException: Traceback
(most recent call last):
  File "/root/spark/python/lib/pyspark.zip/pyspark/worker.py", line 64, in
main
("%d.%d" % sys.version_info[:2], version))
Exception: Python in worker has different version 2.7 than that in driver
2.6, PySpark cannot run with different minor versions

at 
org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
at 
org.apache.spark.api.python.PythonRunner$$anon$1.(PythonRDD.scala:207)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:11
42)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:6
17)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGSchedu
ler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGSched
uler.scala:1271)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGSched
uler.scala:1270)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:5
9)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 

java TwitterUtils.createStream() how create "user stream" ???

2015-10-19 Thread Andy Davidson
Hi

I wrote a little prototype that created a "public stream"; now I want to
convert it to read tweets for a large number of explicit users.

I want to create a "user stream" or a "site stream". According to the Twitter
developer doc I should be able to set the "follow" parameter to a list of
users I am interested in:

https://dev.twitter.com/streaming/overview/request-parameters#follow
follow
A comma-separated list of user IDs, indicating the users whose Tweets should
be delivered on the stream.


I am not sure how to do this? I found the doc for createStream. I am
guessing I need to set filters? Can anyone provide an example?

Kind regards

Andy

http://spark.apache.org/docs/latest/api/java/index.html

createStream
public static JavaReceiverInputDStream<twitter4j.Status>
createStream(JavaStreamingContext jssc, java.lang.String[] filters)

Create an input stream that returns tweets received from Twitter using
Twitter4J's default OAuth authentication; this requires the system
properties twitter4j.oauth.consumerKey, twitter4j.oauth.consumerSecret,
twitter4j.oauth.accessToken and twitter4j.oauth.accessTokenSecret. Storage
level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
Parameters:
  jssc - JavaStreamingContext object
  filters - Set of filter strings to get only those tweets that match them
Returns: (undocumented)




Re: WARN LoadSnappy: Snappy native library not loaded

2015-11-17 Thread Andy Davidson

On my master

grep native /root/spark/conf/spark-env.sh

SPARK_SUBMIT_LIBRARY_PATH="$SPARK_SUBMIT_LIBRARY_PATH:/root/ephemeral-hdfs/l
ib/native/"



$ ls /root/ephemeral-hdfs/lib/native/

libhadoop.a   libhadoop.solibhadooputils.a  libsnappy.so
libsnappy.so.1.1.3  Linux-i386-32

libhadooppipes.a  libhadoop.so.1.0.0  libhdfs.a libsnappy.so.1
Linux-amd64-64


From:  Andrew Davidson 
Date:  Tuesday, November 17, 2015 at 2:29 PM
To:  "user @spark" 
Subject:  Re: WARN LoadSnappy: Snappy native library not loaded

> I forgot to mention. I am using spark-1.5.1-bin-hadoop2.6
> 
> From:  Andrew Davidson 
> Date:  Tuesday, November 17, 2015 at 2:26 PM
> To:  "user @spark" 
> Subject:  Re: WARN LoadSnappy: Snappy native library not loaded
> 
>> FYI
>> 
>> After 17 min. only 26112/228155 have succeeded
>> 
>> This seems very slow
>> 
>> Kind regards
>> 
>> Andy
>> 
>> 
>> 
>> From:  Andrew Davidson 
>> Date:  Tuesday, November 17, 2015 at 2:22 PM
>> To:  "user @spark" 
>> Subject:  WARN LoadSnappy: Snappy native library not loaded
>> 
>> 
>>> I started a Spark POC. I created an EC2 cluster on AWS using spark-ec2. I
>>> have 3 slaves. In general I am running into trouble even with small work
>>> loads. I am using IPython notebooks running on my spark cluster.
>>> Everything is painfully slow. I am using the standAlone cluster manager.
>>> I noticed that I am getting the following warning on my driver console.
>>> Any idea what the problem might be?
>>> 
>>> 
>>> 
>>> 15/11/17 22:01:59 WARN MetricsSystem: Using default name DAGScheduler for
>>> source because spark.app.id is not set.
>>> 15/11/17 22:03:05 WARN NativeCodeLoader: Unable to load native-hadoop
>>> library for your platform... using builtin-java classes where applicable
>>> 15/11/17 22:03:05 WARN LoadSnappy: Snappy native library not loaded
>>> 
>>> 
>>> 
>>> Here is an overview of my POS app. I have a file on hdfs containing about
>>> 5000 twitter status strings.
>>> 
>>> tweetStrings = sc.textFile(dataURL)
>>> 
>>> jTweets = (tweetStrings.map(lambda x: json.loads(x)).take(10))
>>> 
>>> 
>>> Generated the following error: "error occurred while calling
>>> o78.partitions.: java.lang.OutOfMemoryError: GC overhead limit exceeded"
>>> 
>>> Any idea what we need to do to improve a new spark user's out of the box
>>> experience?
>>> 
>>> Kind regards
>>> 
>>> Andy
>>> 
>>> export PYSPARK_PYTHON=python3.4
>>> export PYSPARK_DRIVER_PYTHON=python3.4
>>> export IPYTHON_OPTS="notebook --no-browser --port=7000 --log-level=WARN"
>>> 
>>> MASTER_URL=spark://ec2-55-218-207-122.us-west-1.compute.amazonaws.com:7077
>>> 
>>> 
>>> numCores=2
>>> $SPARK_ROOT/bin/pyspark --master $MASTER_URL --total-executor-cores
>>> $numCores $*




newbie simple app, small data set: Py4JJavaError java.lang.OutOfMemoryError: GC overhead limit exceeded

2015-11-18 Thread Andy Davidson
Hi

I am working on a Spark POC. I created an EC2 cluster on AWS using
spark-1.5.1-bin-hadoop2.6/ec2/spark-ec2

Below is a simple Python program. I am running it using an IPython notebook.
The notebook server is running on my Spark master. If I run my program more
than once using my large data set, I get the GC OutOfMemoryError. I run it
each time by "re-running the notebook cell". I can run my smaller data set a
couple of times without problems.

I launch using

pyspark --master $MASTER_URL --total-executor-cores 2


Any idea how I can debug this?

Kind regards

Andy

Using the master console I see
* there is only one app run (this is what I expect)
* There are 2 works each on a different slave (this is what I expect)
* Each worker is using 1 core (this is what I expect)
* Each worker's memory usage is 6154 (seems reasonable)

* Alive Workers: 3
* Cores in use: 6 Total, 2 Used
* Memory in use: 18.8 GB Total, 12.0 GB Used
* Applications: 1 Running, 5 Completed
* Drivers: 0 Running, 0 Completed
* Status: ALIVE

The data file I am working with is small

I collected this data using spark streaming twitter utilities. All I do is
capture tweets, convert them to JSON and store as strings to hdfs

$ hadoop fs -count  hdfs:///largeSample hdfs:///smallSample

   13226   240098 3839228100

   1   228156   39689877


My python code. I am using python 3.4.

import json
import datetime

startTime = datetime.datetime.now()

#dataURL = "hdfs:///largeSample"
dataURL = "hdfs:///smallSample"
tweetStrings = sc.textFile(dataURL)

t2 = tweetStrings.take(2)
print (t2[1])
print("\n\nelapsed time:%s" % (datetime.datetime.now() - startTime))

---
Py4JJavaError Traceback (most recent call last)
 in ()
  8 tweetStrings = sc.textFile(dataURL)
  9 
---> 10 t2 = tweetStrings.take(2)
 11 print (t2[1])
 12 print("\n\nelapsed time:%s" % (datetime.datetime.now() - startTime))

/root/spark/python/pyspark/rdd.py in take(self, num)
   1267 """
   1268 items = []
-> 1269 totalParts = self.getNumPartitions()
   1270 partsScanned = 0
   1271 

/root/spark/python/pyspark/rdd.py in getNumPartitions(self)
354 2
355 """
--> 356 return self._jrdd.partitions().size()
357 
358 def filter(self, f):

/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in
__call__(self, *args)
536 answer = self.gateway_client.send_command(command)
537 return_value = get_return_value(answer, self.gateway_client,
--> 538 self.target_id, self.name)
539 
540 for temp_arg in temp_args:

/root/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
 34 def deco(*a, **kw):
 35 try:
---> 36 return f(*a, **kw)
 37 except py4j.protocol.Py4JJavaError as e:
 38 s = e.java_exception.toString()

/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in
get_return_value(answer, gateway_client, target_id, name)
298 raise Py4JJavaError(
299 'An error occurred while calling {0}{1}{2}.\n'.
--> 300 format(target_id, '.', name), value)
301 else:
302 raise Py4JError(

Py4JJavaError: An error occurred while calling o65.partitions.
: java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.Arrays.copyOfRange(Arrays.java:3664)
at java.lang.String.(String.java:207)
at java.lang.String.substring(String.java:1969)
at java.net.URI$Parser.substring(URI.java:2869)
at java.net.URI$Parser.parseHierarchical(URI.java:3106)
at java.net.URI$Parser.parse(URI.java:3053)
at java.net.URI.(URI.java:746)
at org.apache.hadoop.fs.Path.initialize(Path.java:145)
at org.apache.hadoop.fs.Path.(Path.java:71)
at org.apache.hadoop.fs.Path.(Path.java:50)
at 
org.apache.hadoop.hdfs.protocol.HdfsFileStatus.getFullPath(HdfsFileStatus.ja
va:215)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.makeQualified(DistributedFileSy
stem.java:293)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSyste
m.java:352)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:862)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:887)
at 
org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:185
)
at 
org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at 

Re: epoch date format to normal date format while loading the files to HDFS

2015-12-08 Thread Andy Davidson
Hi Sonia

I believe you are using Java? Take a look at the Java Date and SimpleDateFormat
classes; I am sure you will find lots of examples of how to format dates.

Enjoy share

Andy


/**

 * saves tweets to disk. This is a replacement for

 * @param tweets

 * @param outputURI

 */

private static void saveTweets(JavaDStream jsonTweets, String
outputURI) {



/*

using saveAsTextFiles will cause lots of empty directories to be
created.

DStream data = jsonTweets.dstream();

data.saveAsTextFiles(outputURI, null);

*/



jsonTweets.foreachRDD(new Function2<JavaRDD, Time, Void>() {

private static final long serialVersionUID =
-5482893563183573691L;



@Override

public Void call(JavaRDD rdd, Time time) throws
Exception {

if(!rdd.isEmpty()) {

String dirPath = outputURI + "-" + time.milliseconds();

rdd.saveAsTextFile(dirPath);

}

return null;

}



});
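For the yyyy-MM-dd-HH folder names asked about in the quoted message below, the
only change is to format time.milliseconds() before building dirPath. A minimal
sketch, assuming the same JavaDStream<String> of tweets (the class name and base
path are placeholders):

import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;

public class HourlyTweetWriter {

    /** Writes each non-empty batch under basePath/yyyy-MM-dd-HH/batchMillis. */
    public static void saveTweetsByHour(JavaDStream<String> jsonTweets, final String basePath) {
        jsonTweets.foreachRDD(new Function2<JavaRDD<String>, Time, Void>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Void call(JavaRDD<String> rdd, Time time) throws Exception {
                if (!rdd.isEmpty()) {
                    // SimpleDateFormat is not thread safe; create one per batch.
                    // This runs in the foreachRDD function, i.e. on the driver.
                    SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd-HH");
                    String hourBucket = fmt.format(new Date(time.milliseconds()));
                    // keep the epoch millis as a subdirectory so several batches in the
                    // same hour do not collide; Hive can still partition on the hour dir
                    String dirPath = basePath + "/" + hourBucket + "/" + time.milliseconds();
                    rdd.saveAsTextFile(dirPath);
                }
                return null;
            }
        });
    }
}

This produces directories like basePath/2015-12-08-01/<batch millis>, which line
up with hourly Hive partitions.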




From:  Soni spark 
Date:  Tuesday, December 8, 2015 at 6:26 AM
To:  Andrew Davidson 
Subject:  epoch date format to normal date format while loading the files to
HDFS

> Hi Andy,
> 
> How are you? i need your help again.
> 
> I have written a Spark streaming program in Java to access twitter tweets and
> it is working fine. I am able to copy the twitter feeds to an HDFS location
> batch wise. For each batch, it creates a folder with an epoch time stamp, for
> example,
> 
>  If I give the HDFS location as hdfs://localhost:54310/twitter/, the files are
> created like below
> 
> /spark/twitter/-144958080/
> /spark/twitter/-144957984/
> 
> I want to create folder names in yyyy-MM-dd-HH format instead of the default
> epoch format.
> 
> I want it like below so that i can do hive partitions easily to access the
> data.
> 
> /spark/twitter/2015-12-08-01/
> 
> 
> Can you help me. Thank you so much in advance.
> 
> 
> Thanks
> Soniya




Re: possible bug spark/python/pyspark/rdd.py portable_hash()

2015-12-02 Thread Andy Davidson
Hi Ted an Felix



From:  Ted Yu 
Date:  Sunday, November 29, 2015 at 10:37 AM
To:  Andrew Davidson 
Cc:  Felix Cheung , "user @spark"

Subject:  Re: possible bug spark/python/pyspark/rdd.py portable_hash()

> I think you should file a bug.


Please feel free to update this bug report

https://issues.apache.org/jira/browse/SPARK-12100


Andy

>  




Re: possible bug spark/python/pyspark/rdd.py portable_hash()

2015-11-29 Thread Andy Davidson
Hi Felix and Ted

This is how I am starting spark

Should I file a bug?

Andy


export PYSPARK_PYTHON=python3.4

export PYSPARK_DRIVER_PYTHON=python3.4

export IPYTHON_OPTS="notebook --no-browser --port=7000 --log-level=WARN"


$SPARK_ROOT/bin/pyspark \

--master $MASTER_URL \

--total-executor-cores $numCores \

--driver-memory 2G \

--executor-memory 2G \

$extraPkgs \

$*


From:  Felix Cheung <felixcheun...@hotmail.com>
Date:  Saturday, November 28, 2015 at 12:11 AM
To:  Ted Yu <yuzhih...@gmail.com>
Cc:  Andrew Davidson <a...@santacruzintegration.com>, "user @spark"
<user@spark.apache.org>
Subject:  Re: possible bug spark/python/pyspark/rdd.py portable_hash()

>  
> Ah, it's there in spark-submit and pyspark.
> Seems like it should be added for spark_ec2
> 
> 
>  
> _
> From: Ted Yu <yuzhih...@gmail.com>
> Sent: Friday, November 27, 2015 11:50 AM
> Subject: Re: possible bug spark/python/pyspark/rdd.py portable_hash()
> To: Felix Cheung <felixcheun...@hotmail.com>
> Cc: Andy Davidson <a...@santacruzintegration.com>, user @spark
> <user@spark.apache.org>
> 
> 
> 
>ec2/spark-ec2 calls ./ec2/spark_ec2.py
>
> 
>
>
> I don't see PYTHONHASHSEED defined in any of these scripts.
>
> 
>
>
> Andy reported this for ec2 cluster.
>
> 
>
>
> I think a JIRA should be opened.
>
> 
>
>   
>   
>
>
> On Fri, Nov 27, 2015 at 11:01 AM, Felix Cheung
> <felixcheun...@hotmail.com> wrote:
> 
>>   
>>   
>>May I ask how you are starting Spark?
>> It looks like PYTHONHASHSEED is being set:
>> https://github.com/apache/spark/search?utf8=%E2%9C%93=PYTHONHASHSEED
>>
>> 
>>
>> 
>> 
>> Date: Thu, 26 Nov 2015 11:30:09 -0800
>> Subject: possible bug spark/python/pyspark/rdd.py portable_hash()
>> From: a...@santacruzintegration.com
>> To: user@spark.apache.org
>> 
>> 
>>  I am using spark-1.5.1-bin-hadoop2.6. I used
>> spark-1.5.1-bin-hadoop2.6/ec2/spark-ec2 to create a cluster
>> and configured spark-env to use python3. I get an exception
>> 'Randomness of hash of string should be disabled via PYTHONHASHSEED'.
>> Is there any reason rdd.py should not just set PYTHONHASHSEED?
>> 
>>  
>> 
>> 
>>  Should I file a bug?
>> 
>>  
>> 
>> 
>>  Kind regards
>> 
>>  
>> 
>> 
>>  Andy   
>> 
>>  
>> 
>> 
>>  details
>> 
>>  
>> 
>> 
>>  
>> http://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=subtrac
>> t#pyspark.RDD.subtract
>> 
>>  
>> 
>> 
>>  Example does not work out of the box
>> 
>>  
>> 
>> 
>>  subtract(other, numPartitions=None)
>> <http://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=subtract#pyspark.RDD.subtract>
>> 
>> Return each value in self that is not contained in other.
>> 
>> >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
>> >>> y = sc.parallelize([("a", 3), ("c", None)])
>> >>> sorted(x.subtract(y).collect())
>> [('a', 1), ('b', 4), ('b', 5)]
>> 
>>
>> 
>> It raises   
>>  
>> 
>> 
>>  
>> if sys.version >= '3.3' and 'PYTHONHASHSEED' not in os.environ:
>> raise Exception("Randomness of hash of string should be disabled via
>> PYTHONHASHSEED")
>> 
>> 
>>  
>> 
>> 
>>  
>> 
>> 
>>  The following script fixes the problem
>> 
>>  
>> 
>> 
>>  Sudo printf "\n# set PYTHONHASHSEED so python3 will not generate
>> Exception'Randomness of hash of string should be disabled via
>> PYTHONHASHSEED'\nexport PYTHONHASHSEED=123\n" >>
>> /root/spark/conf/spark-env.sh
>> 
>>  
>> 
>> 
>>  sudo pssh -i -h /root/spark-ec2/slaves cp
>> /root/spark/conf/spark-env.sh /root/spark/conf/spark-env.sh-`date
>> "+%Y-%m-%d:%H:%M"`
>> 
>>  
>> 
>> 
>>  Sudo for i in `cat slaves` ; do scp spark-env.sh
>> root@$i:/root/spark/conf/spark-env.sh; done
>> 
>>  
>> 
>> 
>>  
>> 
>> 
>>  
>> 
>>
>>
>>  
>>  
>
>
>   
>   
> 
>  




Re: Not all workers seem to run in a standalone cluster setup by spark-ec2 script

2015-12-04 Thread Andy Davidson
Hi Kyohey

I think you need to pass the argument --master $MASTER_URL \


master_URL is something like
spark://ec2-54-215-112-121.us-west-1.compute.amazonaws.com:7077

Its the public url to your master


Andy

From:  Kyohey Hamaguchi 
Date:  Friday, December 4, 2015 at 11:28 AM
To:  "user @spark" 
Subject:  Not all workers seem to run in a standalone cluster setup by
spark-ec2 script

> Hi,
> 
> I have setup a Spark standalone-cluster, which involves 5 workers,
> using spark-ec2 script.
> 
> After submitting my Spark application, I had noticed that just one
> worker seemed to run the application and other 4 workers were doing
> nothing. I had confirmed this by checking CPU and memory usage on the
> Spark Web UI (CPU usage indicates zero and memory is almost fully
> availabile.)
> 
> This is the command used to launch:
> 
> $ ~/spark/ec2/spark-ec2 -k awesome-keypair-name -i
> /path/to/.ssh/awesome-private-key.pem --region ap-northeast-1
> --zone=ap-northeast-1a --slaves 5 --instance-type m1.large
> --hadoop-major-version yarn launch awesome-spark-cluster
> 
> And the command to run application:
> 
> $ ssh -i ~/path/to/awesome-private-key.pem root@ec2-master-host-name
> "mkdir ~/awesome"
> $ scp -i ~/path/to/awesome-private-key.pem spark.jar
> root@ec2-master-host-name:~/awesome && ssh -i
> ~/path/to/awesome-private-key.pem root@ec2-master-host-name
> "~/spark-ec2/copy-dir ~/awesome"
> $ ssh -i ~/path/to/awesome-private-key.pem root@ec2-master-host-name
> "~/spark/bin/spark-submit --num-executors 5 --executor-cores 2
> --executor-memory 5G --total-executor-cores 10 --driver-cores 2
> --driver-memory 5G --class com.example.SparkIsAwesome
> awesome/spark.jar"
> 
> How do I let the all of the workers execute the app?
> 
> Or do I have wrong understanding on what workers, slaves and executors are?
> 
> My understanding is: Spark driver(or maybe master?) sends a part of
> jobs to each worker (== executor == slave), so a Spark cluster
> automatically exploits all resources available in the cluster. Is this
> some sort of misconception?
> 
> Thanks,
> 
> --
> Kyohey Hamaguchi
> TEL:  080-6918-1708
> Mail: tnzk.ma...@gmail.com
> Blog: http://blog.tnzk.org/
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 
> 




newbie best practices: is spark-ec2 intended to be used to manage long-lasting infrastructure ?

2015-12-03 Thread Andy Davidson
About 2 months ago I used spark-ec2 to set up a small cluster. The cluster
runs a spark streaming app 7x24 and stores the data to hdfs. I also need to
run some batch analytics on the data.

Now that I have a little more experience I wonder if this was a good way to
set up the cluster, given the following issues:
1. I have not been able to find explicit directions for upgrading the spark
version
   (see http://search-hadoop.com/m/q3RTt7E0f92v0tKh2=Re+Upgrading+Spark+in+EC2+clusters)
2. I am not sure where the data is physically stored. I think I may
accidentally lose all my data
3. spark-ec2 makes it easy to launch a cluster with as many machines as you
like, however it's not clear how I would add slaves to an existing
installation

In our Java streaming app we call rdd.saveAsTextFile("hdfs://path");

ephemeral-hdfs/conf/hdfs-site.xml:

  

dfs.data.dir

/mnt/ephemeral-hdfs/data,/mnt2/ephemeral-hdfs/data

  



persistent-hdfs/conf/hdfs-site.xml



$ mount

/dev/xvdb on /mnt type ext3 (rw,nodiratime)

/dev/xvdf on /mnt2 type ext3 (rw,nodiratime)



http://spark.apache.org/docs/latest/ec2-scripts.html


"The spark-ec2 script also supports pausing a cluster. In this case, the VMs
are stopped but not terminated, so they lose all data on ephemeral disks but
keep the data in their root partitions and their persistent-hdfs."


Initially I thought using HDFS was a good idea. spark-ec2 makes HDFS easy to
use. I incorrectly thought spark somehow knew how HDFS partitioned my data.

I think many people are using amazon s3. I do not have any direct experience
with S3. My concern would be that the data is not physically stored close
to my slaves, i.e. high communication costs.

Any suggestions would be greatly appreciated

Andy




Re: How and where to update release notes for spark rel 1.6?

2015-12-03 Thread Andy Davidson
Hi JB

Do you know where I can find instructions for upgrading an existing
installation? I searched the link you provided for "update" and "upgrade"

Kind regards

Andy

From:  Jean-Baptiste Onofré 
Date:  Thursday, December 3, 2015 at 5:29 AM
To:  "user @spark" 
Subject:  Re: How and where to update release notes for spark rel 1.6?

> Hi Ravi,
> 
> Even if it's not perfect, you can take a look on the current
> ReleaseNotes on Jira:
> 
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315420
> ion=12333083
> 
> Regards
> JB
> 
> On 12/03/2015 12:01 PM, RaviShankar KS wrote:
>>  Hi,
>> 
>>  How and where to update release notes for spark rel 1.6?
>>  pls help.
>> 
>>  There are a few methods with changed params, and a few deprecated ones
>>  that need to be documented.
>> 
>>  Thanks
>>  Ravi
> 
> -- 
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 
> 




Example of a Trivial Custom PySpark Transformer

2015-12-07 Thread Andy Davidson
FYI

Hopefully others will find this example helpful

Andy

Example of a Trivial Custom PySpark Transformer
ref:
* NLTKWordPunctTokenizer example
* pyspark.sql.functions.udf


In [12]:
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
from pyspark.ml.util import keyword_only

from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
from pyspark.ml.pipeline import Transformer

class TrivialTokenizer(Transformer, HasInputCol, HasOutputCol):

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, constant=None):
        super(TrivialTokenizer, self).__init__()
        self.constant = Param(self, "constant", 0)
        self._setDefault(constant=0)
        kwargs = self.__init__._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None, constant=None):
        kwargs = self.setParams._input_kwargs
        return self._set(**kwargs)

    def setConstant(self, value):
        self._paramMap[self.constant] = value
        return self

    def getConstant(self):
        return self.getOrDefault(self.constant)

    def _transform(self, dataset):
        const = self.getConstant()

        def f(v):
            return v + const

        t = FloatType()
        out_col = self.getOutputCol()
        in_col = dataset[self.getInputCol()]
        return dataset.withColumn(out_col, udf(f, t)(in_col))

sentenceDataFrame = sqlContext.createDataFrame([
    (0, 1.1, "Hi I heard who the about Spark"),
    (0, 1.2, "I wish Java could use case classes"),
    (1, 1.3, "Logistic regression models are neat")
], ["label", "x1", "sentence"])

testTokenizer = TrivialTokenizer(inputCol="x1", outputCol="x2", constant=1.0)

testTokenizer.transform(sentenceDataFrame).show()
+-----+---+--------------------+---+
|label| x1|            sentence| x2|
+-----+---+--------------------+---+
|    0|1.1|Hi I heard who th...|2.1|
|    0|1.2|I wish Java could...|2.2|
|    1|1.3|Logistic regressi...|2.3|
+-----+---+--------------------+---+




issue creating pyspark Transformer UDF that creates a LabeledPoint: AttributeError: 'DataFrame' object has no attribute '_get_object_id'

2015-12-07 Thread Andy Davidson
Hi 

I am running into a strange error. I am trying to write a transformer that
takes in two columns and creates a LabeledPoint. I can not figure out why I
am getting

AttributeError: 'DataFrame' object has no attribute '_get_object_id'

I am using spark-1.5.1-bin-hadoop2.6

Any idea what I am doing wrong? Is this a bug with data frames?

Also, I suspect the next problem I will run into is that I do not think UDFs
support LabeledPoint?

Comments and suggestions are greatly appreciated

Andy




In [37]:
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
from pyspark.ml.util import keyword_only

from pyspark.sql.functions import udf
from pyspark.ml.pipeline import Transformer

from pyspark.sql.types import BinaryType, DataType, ByteType, StringType
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint


class LabledPointTransformer(Transformer, HasInputCol, HasOutputCol):
    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, featureCol=None):
        super(LabledPointTransformer, self).__init__()
        self.featureCol = Param(self, "featureCol", "")
        self._setDefault(featureCol="feature")
        kwargs = self.__init__._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None, featureCol=None):
        kwargs = self.setParams._input_kwargs
        return self._set(**kwargs)

    def setFeatureCol(self, value):
        self._paramMap[self.featureCol] = value
        return self

    def getFeatureCol(self):
        return self.getOrDefault(self.featureCol)

    def _transform(self, dataset):  # dataset is a data frame
        out_col = self.getOutputCol()
        labelCol = self.getInputCol()
        featureCol = self.getFeatureCol()

        def f(lf):
            return str(LabeledPoint(lf[labelCol], lf[featureCol]))

        t = StringType()
        # data = dataset[labelCol, featureCol]
        data = dataset.select(labelCol, featureCol)
        return dataset.withColumn(out_col, udf(f, t)(data))

lpData = sqlContext.createDataFrame([
    (0, SparseVector(3, [0, 1], [1.0, 2.0])),
    (1, SparseVector(3, [1, 2], [3.0, 1.0])),
], ["label", "features"])

lpData.show()
lpt = LabledPointTransformer(inputCol="label", outputCol="labeledPoint",
                             featureCol="features")
tmp = lpt.transform(lpData)
tmp.collect()
+-----+-------------------+
|label|           features|
+-----+-------------------+
|    0|(3,[0,1],[1.0,2.0])|
|    1|(3,[1,2],[3.0,1.0])|
+-----+-------------------+

---
AttributeErrorTraceback (most recent call last)
 in ()
 51 lpData.show()
 52 lpt = LabledPointTransformer(inputCol="label",
outputCol="labeledPoint", featureCol="features",)
---> 53 tmp = lpt.transform(lpData)
 54 tmp.collect()

/Users//workSpace/spark/spark-1.5.1-bin-hadoop2.6/python/pyspark/ml/pipeline
.py in transform(self, dataset, params)
105 return self.copy(params,)._transform(dataset)
106 else:
--> 107 return self._transform(dataset)
108 else:
109 raise ValueError("Params must be either a param map but
got %s." % type(params))

 in _transform(self, dataset)
 42 #data = dataset[labelCol, featureCol]
 43 data = dataset.select(labelCol, featureCol)
---> 44 return dataset.withColumn(out_col, udf(f, t)(data))
 45 
 46 lpData = sqlContext.createDataFrame([

/Users//workSpace/spark/spark-1.5.1-bin-hadoop2.6/python/pyspark/sql/functio
ns.py in __call__(self, *cols)
   1436 def __call__(self, *cols):
   1437 sc = SparkContext._active_spark_context
-> 1438 jc = self._judf.apply(_to_seq(sc, cols, _to_java_column))
   1439 return Column(jc)
   1440 

/Users//workSpace/spark/spark-1.5.1-bin-hadoop2.6/python/pyspark/sql/column.
py in _to_seq(sc, cols, converter)
 58 """
 59 if converter:
---> 60 cols = [converter(c) for c in cols]
 61 return sc._jvm.PythonUtils.toSeq(cols)
 62 

/Users//workSpace/spark/spark-1.5.1-bin-hadoop2.6/python/pyspark/sql/column.
py in (.0)
 58 """
 59 if converter:
---> 60 cols = [converter(c) for c in cols]
 61 return sc._jvm.PythonUtils.toSeq(cols)
 62 

/Users//workSpace/spark/spark-1.5.1-bin-hadoop2.6/python/pyspark/sql/column.
py in _to_java_column(col)
 46 jcol = col._jc
 47 else:
---> 48 jcol = _create_column_from_name(col)
 49 return jcol
 50 

/Users//workSpace/spark/spark-1.5.1-bin-hadoop2.6/python/pyspark/sql/column.
py in _create_column_from_name(name)
 39 def _create_column_from_name(name):
 40 sc = 

newbie how to upgrade a spark-ec2 cluster?

2015-12-02 Thread Andy Davidson
I am using spark-1.5.1-bin-hadoop2.6. I used
spark-1.5.1-bin-hadoop2.6/ec2/spark-ec2 to create a cluster. Any idea how I
can upgrade to 1.5.2 prebuilt binary?

Also if I choose to build the binary, how would I upgrade my cluster?

Kind regards

Andy






Re: what are the cons/drawbacks of a Spark DataFrames

2015-12-15 Thread Andy Davidson
My understanding is one of the biggest advantages of DFs is that the schema
information allows a lot of optimization. For example, assume the frame has many
columns but your computation only uses 2 of them. There is no need to load all
the data.
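To make that concrete, a small sketch (the Parquet file and column names are made
up); explain(true) prints a physical plan whose scan lists only the columns that
were actually referenced:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class ColumnPruningDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("pruning").setMaster("local[2]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        // hypothetical file with many columns; only two are used below
        DataFrame df = sqlContext.read().parquet("frequency.parquet");

        DataFrame narrow = df.select(df.col("docid"), df.col("freqCount"));
        narrow.explain(true); // the Parquet scan in the plan only reads docid and freqCount
        narrow.show(5);

        sc.stop();
    }
}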

Andy

From:  "email2...@gmail.com" 
Date:  Monday, December 14, 2015 at 7:35 PM
To:  "user @spark" 
Subject:  what are the cons/drawbacks of a Spark DataFrames

> Hello All - I've started using the Spark DataFrames and looks like it
> provides rich column level operations and functions.
> 
> In the same time, I would like to understand are there any drawbacks / cons
> of using a DataFrames?. If so please share your experience on that.
> 
> Thanks, 
> Gokul
> 
> 
> 
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/what-are-the-cons-drawback
> s-of-a-Spark-DataFrames-tp25703.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 
> 




Re: looking for Spark streaming unit example written in Java

2015-12-17 Thread Andy Davidson
> 
>   public static scala.runtime.Nothing$ fail();
> 
>   public static void expect(java.lang.Object, java.lang.Object);
> 
>   public static void expectResult(java.lang.Object, java.lang.Object);
> 
>   public static void assertResult(java.lang.Object, java.lang.Object);
> 
>   public static void expect(java.lang.Object, java.lang.Object,
> java.lang.Object);
> 
>   public static void expectResult(java.lang.Object, java.lang.Object,
> java.lang.Object);
> 
>   public static void assertResult(java.lang.Object, java.lang.Object,
> java.lang.Object);
> 
>   public static  java.lang.Throwable trap(scala.Function0);
> 
>   public static  T intercept(scala.Function0,
> scala.reflect.Manifest);
> 
>   public static void assume(scala.Option);
> 
>   public static void assume(scala.Option, java.lang.Object);
> 
>   public static void assert(scala.Option);
> 
>   public static void assert(scala.Option, java.lang.Object);
> 
>   public static void
> org$scalatest$Assertions$_setter_$assertionsHelper_$eq(org.scalatest.Assertion
> s$AssertionsHelper);
> 
>   public static org.scalatest.Assertions$AssertionsHelper assertionsHelper();
> 
>   public static org.scalatest.Status run(scala.Option,
> org.scalatest.Reporter, org.scalatest.Stopper, org.scalatest.Filter,
> scala.collection.immutable.Map<java.lang.String, java.lang.Object>,
> scala.Option, org.scalatest.Tracker);
> 
>   public static boolean execute$default$7();
> 
>   public static boolean execute$default$6();
> 
>   public static boolean execute$default$5();
> 
>   public static boolean execute$default$4();
> 
>   public static boolean execute$default$3();
> 
>   public static org.scalatest.ConfigMap execute$default$2();
> 
>   public static java.lang.String execute$default$1();
> 
>   public static scala.Option rerunner();
> 
>   public static int expectedTestCount(org.scalatest.Filter);
> 
>   public static void
> pendingUntilFixed(scala.Function0);
> 
>   public static org.scalatest.PendingNothing pending();
> 
>   public static java.lang.String suiteId();
> 
>   public static java.lang.String suiteName();
> 
>   public static org.scalatest.Status runNestedSuites(org.scalatest.Args);
> 
>   public static org.scalatest.Outcome
> withFixture(org.scalatest.Suite$NoArgTest);
> 
>   public static void execute();
> 
>   public static void execute(java.lang.String, org.scalatest.ConfigMap,
> boolean, boolean, boolean, boolean, boolean);
> 
>   public static scala.collection.immutable.IndexedSeq
> nestedSuites();
> 
>   public static void
> org$scalatest$Suite$_setter_$styleName_$eq(java.lang.String);
> 
>   public static org.scalatest.ConfigMap testDataFor$default$2();
> 
>   public static org.scalatest.TestData testDataFor(java.lang.String,
> org.scalatest.ConfigMap);
> 
>   public static void testsFor(scala.runtime.BoxedUnit);
> 
>   public static org.scalatest.Status runTests(scala.Option,
> org.scalatest.Args);
> 
>   public static scala.collection.immutable.Map<java.lang.String,
> scala.collection.immutable.Set> tags();
> 
>   public static scala.collection.immutable.Set testNames();
> 
>   public static void ignore(java.lang.String,
> scala.collection.Seq,
> scala.Function0);
> 
>   public static void test(java.lang.String,
> scala.collection.Seq,
> scala.Function0);
> 
>   public static void registerIgnoredTest(java.lang.String,
> scala.collection.Seq,
> scala.Function0);
> 
>   public static void registerTest(java.lang.String,
> scala.collection.Seq,
> scala.Function0);
> 
>   public static org.scalatest.Documenter markup();
> 
>   public static org.scalatest.Alerter alert();
> 
>   public static org.scalatest.Notifier note();
> 
>   public static org.scalatest.Informer info();
> 
>   public static void
> org$scalatest$FunSuiteLike$_setter_$styleName_$eq(java.lang.String);
> 
>   public static java.lang.String styleName();
> 
>   public static java.lang.String toString();
> 
> }
> 
> $ 
> 
> 
> 
> From:  Ted Yu <yuzhih...@gmail.com>
> Date:  Tuesday, December 15, 2015 at 7:09 PM
> To:  Andrew Davidson <a...@santacruzintegration.com>
> Cc:  "user @spark" <user@spark.apache.org>
> Subject:  Re: looking for Spark streaming unit example written in Java
> 
>> Have you taken a look at
>> streaming/src/test//java/org/apache/spark/streaming/JavaAPISuite.java ?
>> 
>> JavaDStream stream = ssc.queueStream(rdds);
>> JavaTestUtils.attachTestOutputStream(stream);
>> 
>> FYI
>> 
>> On Tue, Dec 15, 2015 at 6:36 PM, Andy Davidson
>> <a...@santacruz

Re: Warning: Master endpoint spark://ip:7077 was not a REST server. Falling back to legacy submission gateway instead.

2015-12-10 Thread Andy Davidson
Hi Jakob

The cluster was set up using the spark-1.5.1-bin-hadoop2.6/ec2/spark-ec2
script

Given my limited knowledge I think this looks okay?

Thanks

Andy

$ sudo netstat -peant | grep 7077

tcp0  0 :::172-31-30-51:7077:::*
LISTEN  0  311641427355/java

tcp0  0 :::172-31-30-51:7077:::172-31-30-51:57311
ESTABLISHED 0  311591927355/java

tcp0  0 :::172-31-30-51:7077:::172-31-30-51:42333
ESTABLISHED 0  373666427355/java

tcp0  0 :::172-31-30-51:7077:::172-31-30-51:49796
ESTABLISHED 0  311592527355/java

tcp0  0 :::172-31-30-51:7077:::172-31-30-51:42290
ESTABLISHED 0  311592327355/java



$ ps -aux | grep 27355

Warning: bad syntax, perhaps a bogus '-'? See
/usr/share/doc/procps-3.2.8/FAQ

ec2-user 23867  0.0  0.0 110404   872 pts/0S+   02:06   0:00 grep 27355

root 27355  0.5  6.7 3679096 515836 ?  Sl   Nov26 107:04
/usr/java/latest/bin/java -cp
/root/spark/sbin/../conf/:/root/spark/lib/spark-assembly-1.5.1-hadoop1.2.1.j
ar:/root/spark/lib/datanucleus-api-jdo-3.2.6.jar:/root/spark/lib/datanucleus
-rdbms-3.2.9.jar:/root/spark/lib/datanucleus-core-3.2.10.jar:/root/ephemeral
-hdfs/conf/ -Xms1g -Xmx1g org.apache.spark.deploy.master.Master --ip
ec2-54-215-217-122.us-west-1.compute.amazonaws.com --port 7077 --webui-port
8080


From:  Jakob Odersky <joder...@gmail.com>
Date:  Thursday, December 10, 2015 at 5:55 PM
To:  Andrew Davidson <a...@santacruzintegration.com>
Cc:  "user @spark" <user@spark.apache.org>
Subject:  Re: Warning: Master endpoint spark://ip:7077 was not a REST
server. Falling back to legacy submission gateway instead.

> Is there any other process using port 7077?
> 
> On 10 December 2015 at 08:52, Andy Davidson <a...@santacruzintegration.com>
> wrote:
>> Hi
>> 
>> I am using spark-1.5.1-bin-hadoop2.6. Any idea why I get this warning. My job
>> seems to run with out any problem.
>> 
>> Kind regards 
>> 
>> Andy
>> 
>> + /root/spark/bin/spark-submit --class com.pws.spark.streaming.IngestDriver
>> --master spark://ec2-54-205-209-122.us-west-1.compute.amazonaws.com:7077
>> <http://ec2-54-205-209-122.us-west-1.compute.amazonaws.com:7077>
>> --total-executor-cores 2 --deploy-mode cluster
>> hdfs:///home/ec2-user/build/ingest-all.jar --clusterMode --dirPath week_3
>> 
>> Running Spark using the REST application submission protocol.
>> 
>> 15/12/10 16:46:33 WARN RestSubmissionClient: Unable to connect to server
>> spark://ec2-54-205-209-122.us-west-1.compute.amazonaws.com:7077
>> <http://ec2-54-205-209-122.us-west-1.compute.amazonaws.com:7077> .
>> 
>> Warning: Master endpoint
>> ec2-54-205-209-122.us-west-1.compute.amazonaws.com:7077
>> <http://ec2-54-205-209-122.us-west-1.compute.amazonaws.com:7077>  was not a
>> REST server. Falling back to legacy submission gateway instead.
> 




architecture thought experiment: what is the advantage of using kafka with spark streaming?

2015-12-10 Thread Andy Davidson
I noticed that many people are using Kafka and spark streaming. Can someone
provide a couple of use cases?

I imagine some possible use cases might be:

Is the purpose of using Kafka to
1. provide some buffering?
2. implement some sort of load balancing for the overall system?
3. provide filtering/sorting of data?
4. simplify client connections? It is easy for thousands of producers to connect
to Kafka, and probably hard to do with spark streaming.
5. ???
Kind regards

Andy




Warning: Master endpoint spark://ip:7077 was not a REST server. Falling back to legacy submission gateway instead.

2015-12-10 Thread Andy Davidson
Hi

I am using spark-1.5.1-bin-hadoop2.6. Any idea why I get this warning. My
job seems to run with out any problem.

Kind regards 

Andy

+ /root/spark/bin/spark-submit --class com.pws.spark.streaming.IngestDriver
--master spark://ec2-54-205-209-122.us-west-1.compute.amazonaws.com:7077
--total-executor-cores 2 --deploy-mode cluster
hdfs:///home/ec2-user/build/ingest-all.jar --clusterMode --dirPath week_3

Running Spark using the REST application submission protocol.

15/12/10 16:46:33 WARN RestSubmissionClient: Unable to connect to server
spark://ec2-54-205-209-122.us-west-1.compute.amazonaws.com:7077.

Warning: Master endpoint
ec2-54-205-209-122.us-west-1.compute.amazonaws.com:7077 was not a REST
server. Falling back to legacy submission gateway instead.




cluster mode uses port 6066 Re: Warning: Master endpoint spark://ip:7077 was not a REST server. Falling back to legacy submission gateway instead.

2015-12-11 Thread Andy Davidson
Hi Andrew

You are correct I am using cluster mode.

Many thanks

Andy

From:  Andrew Or <and...@databricks.com>
Date:  Thursday, December 10, 2015 at 6:31 PM
To:  Andrew Davidson <a...@santacruzintegration.com>
Cc:  Jakob Odersky <joder...@gmail.com>, "user @spark"
<user@spark.apache.org>
Subject:  Re: Warning: Master endpoint spark://ip:7077 was not a REST
server. Falling back to legacy submission gateway instead.

> Hi Andy,
> 
> You must be running in cluster mode. The Spark Master accepts client mode
> submissions on port 7077 and cluster mode submissions on port 6066. This is
> because standalone cluster mode uses a REST API to submit applications by
> default. If you submit to port 6066 instead the warning should go away.
> 
> -Andrew
> 
> 
> 2015-12-10 18:13 GMT-08:00 Andy Davidson <a...@santacruzintegration.com>:
>> Hi Jakob
>> 
>> The cluster was set up using the spark-1.5.1-bin-hadoop2.6/ec2/spark-ec2
>> script
>> 
>> Given my limited knowledge I think this looks okay?
>> 
>> Thanks
>> 
>> Andy
>> 
>> $ sudo netstat -peant | grep 7077
>> 
>> tcp0  0 :::172-31-30-51:7077:::*
>> LISTEN  0  311641427355/java
>> 
>> tcp0  0 :::172-31-30-51:7077:::172-31-30-51:57311
>> ESTABLISHED 0  311591927355/java
>> 
>> tcp0  0 :::172-31-30-51:7077:::172-31-30-51:42333
>> ESTABLISHED 0  373666427355/java
>> 
>> tcp0  0 :::172-31-30-51:7077:::172-31-30-51:49796
>> ESTABLISHED 0  311592527355/java
>> 
>> tcp0  0 :::172-31-30-51:7077:::172-31-30-51:42290
>> ESTABLISHED 0  311592327355/java
>> 
>> 
>> 
>> $ ps -aux | grep 27355
>> 
>> Warning: bad syntax, perhaps a bogus '-'? See /usr/share/doc/procps-3.2.8/FAQ
>> 
>> ec2-user 23867  0.0  0.0 110404   872 pts/0S+   02:06   0:00 grep 27355
>> 
>> root 27355  0.5  6.7 3679096 515836 ?  Sl   Nov26 107:04
>> /usr/java/latest/bin/java -cp
>> /root/spark/sbin/../conf/:/root/spark/lib/spark-assembly-1.5.1-hadoop1.2.1.ja
>> r:/root/spark/lib/datanucleus-api-jdo-3.2.6.jar:/root/spark/lib/datanucleus-r
>> dbms-3.2.9.jar:/root/spark/lib/datanucleus-core-3.2.10.jar:/root/ephemeral-hd
>> fs/conf/ -Xms1g -Xmx1g org.apache.spark.deploy.master.Master --ip
>> ec2-54-215-217-122.us-west-1.compute.amazonaws.com
>> <http://ec2-54-215-217-122.us-west-1.compute.amazonaws.com>  --port 7077
>> --webui-port 8080
>> 
>> 
>> From:  Jakob Odersky <joder...@gmail.com>
>> Date:  Thursday, December 10, 2015 at 5:55 PM
>> To:  Andrew Davidson <a...@santacruzintegration.com>
>> Cc:  "user @spark" <user@spark.apache.org>
>> Subject:  Re: Warning: Master endpoint spark://ip:7077 was not a REST server.
>> Falling back to legacy submission gateway instead.
>> 
>>> Is there any other process using port 7077?
>>> 
>>> On 10 December 2015 at 08:52, Andy Davidson <a...@santacruzintegration.com>
>>> wrote:
>>>> Hi
>>>> 
>>>> I am using spark-1.5.1-bin-hadoop2.6. Any idea why I get this warning. My
>>>> job seems to run with out any problem.
>>>> 
>>>> Kind regards 
>>>> 
>>>> Andy
>>>> 
>>>> + /root/spark/bin/spark-submit --class com.pws.spark.streaming.IngestDriver
>>>> --master spark://ec2-54-205-209-122.us-west-1.compute.amazonaws.com:7077
>>>> <http://ec2-54-205-209-122.us-west-1.compute.amazonaws.com:7077>
>>>> --total-executor-cores 2 --deploy-mode cluster
>>>> hdfs:///home/ec2-user/build/ingest-all.jar --clusterMode --dirPath week_3
>>>> 
>>>> Running Spark using the REST application submission protocol.
>>>> 
>>>> 15/12/10 16:46:33 WARN RestSubmissionClient: Unable to connect to server
>>>> spark://ec2-54-205-209-122.us-west-1.compute.amazonaws.com:7077
>>>> <http://ec2-54-205-209-122.us-west-1.compute.amazonaws.com:7077> .
>>>> 
>>>> Warning: Master endpoint
>>>> ec2-54-205-209-122.us-west-1.compute.amazonaws.com:7077
>>>> <http://ec2-54-205-209-122.us-west-1.compute.amazonaws.com:7077>  was not a
>>>> REST server. Falling back to legacy submission gateway instead.
>>> 
> 




looking for Spark streaming unit example written in Java

2015-12-15 Thread Andy Davidson
I am having a heck of a time writing a simple JUnit test for my spark
streaming code. The best code example I have been able to find is
http://mkuthan.github.io/blog/2015/03/01/spark-unit-testing/; unfortunately
it is written in Spock and Scala. I am having trouble figuring out how to
get it to work in Java.

Seems like the key to the scala example is the ClockWrapper class
https://github.com/mkuthan/example-spark/blob/master/src/test/scala/org/apac
he/spark/ClockWrapper.scala . I have not figured out how to get something
like that to work in Java.

My first unit test is very simple



public class ParseTest {

public static JavaStreamingContext javaStreamingContext= null;

public static SparkContext sparkContext = null;

public static JavaSparkContext javaSparkContext = null;

public static SQLContext sqlContext = null;

public static volatile boolean runningTestSuite = false;



@BeforeClass

public static void setUpBeforeClass() throws Exception {

if (!runningTestSuite) {

SparkConf conf = new
SparkConf().setMaster(master).setAppName(appName);



sparkContext = new SparkContext(conf);

javaSparkContext = new JavaSparkContext(sparkContext);

sqlContext = new org.apache.spark.sql.SQLContext(sparkContext);



Duration batchInterval = Durations.seconds(batchIntervalInSec);

javaStreamingContext = new
JavaStreamingContext(javaSparkContext, batchInterval);



String checkpointDir =
Files.createTempDirectory(appName).toString();

javaStreamingContext.checkpoint(checkpointDir);



runningTestSuite = true;

}

}



@AfterClass

public static void tearDownAfterClass() throws Exception {

if (runningTestSuite && javaSparkContext != null) {

javaSparkContext.stop();

javaSparkContext = null;

sqlContext = null;

sparkContext = null;

runningTestSuite = false;

}

}



@Test

public void test() {

 // 1) create a list of of pojo objects

...



 // 2) convert list of pojo objects to RDD

JavaRDD pojoRDD = javaSparkContext.parallelize(listOfPojos);



  // 3) create a  QueueInputDStream

Queue rddQueue = new LinkedList();

rddQueue.add(pojoRDD);



// covert to DStream

JavaDStream tweets =
javaStreamingContext.queueStream(rddQueue);



javaStreamingContext.start();

//javaStreamingContext.awaitTermination();



Thread.sleep(3 * 1000); // looks like this would not be needed if
ClockWrapper worked



The sample Scala Spock code has asserts after start(). In Java I am not able to
work with the JavaDStreams after start:

java.lang.IllegalStateException: Adding new inputs, transformations,
and output operations after starting a context is not supported

at 
org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:220)

at org.apache.spark.streaming.dstream.DStream.(DStream.scala:64)

at 
org.apache.spark.streaming.dstream.ForEachDStream.(ForEachDStream.scal
a:26)

at 
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$2.apply(DStre
am.scala:642)

at 
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$2.apply(DStre
am.scala:642)

at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:14
7)

at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:10
8)

at org.apache.spark.SparkContext.withScope(SparkContext.scala:709)

at 
org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala
:266)

at org.apache.spark.streaming.dstream.DStream.foreachRDD(DStream.scala:638)

at 
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply$mcV$s
p(DStream.scala:631)

at 
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStre
am.scala:629)

at 
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStre
am.scala:629)

at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:14
7)

at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:10
8)

at org.apache.spark.SparkContext.withScope(SparkContext.scala:709)

at 
org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala
:266)

at org.apache.spark.streaming.dstream.DStream.foreachRDD(DStream.scala:629)

at 
org.apache.spark.streaming.api.java.JavaDStreamLike$class.foreachRDD(JavaDSt
reamLike.scala:315)

at 
org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.foreachRDD(JavaD
StreamLike.scala:43)

at 
com.pws.fantasySport.spark.streaming.util.JavaDStreamCount.hack(JavaDStreamC
ount.java:20)

at com.pws.fantasySport.ml.ParseTweetsTest.test(ParseTweetsTest.java:105)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at 
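The IllegalStateException above comes from registering a new output operation
after start(). One way around it, without a ClockWrapper, is to register the
assertions' data sink before start() and only read it after the batches have run.
A minimal JUnit 4 sketch (the class and names are made up, assuming
master=local[2]):

import java.util.Arrays;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;

public class QueueStreamTest {

    @Test
    public void countsFirstBatch() throws Exception {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("QueueStreamTest");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        JavaStreamingContext ssc = new JavaStreamingContext(jsc, Durations.seconds(1));

        JavaRDD<String> batch = jsc.parallelize(Arrays.asList("a", "b", "c"));
        Queue<JavaRDD<String>> rddQueue = new LinkedList<>();
        rddQueue.add(batch);
        JavaDStream<String> stream = ssc.queueStream(rddQueue);

        // register the output operation BEFORE start(); results land in a driver-side queue
        final ConcurrentLinkedQueue<Long> counts = new ConcurrentLinkedQueue<>();
        stream.foreachRDD(rdd -> {
            counts.add(rdd.count());
            return null;
        });

        ssc.start();
        Thread.sleep(3 * 1000); // crude, but no DStream operations are added after start()
        ssc.stop(true, true);

        assertFalse(counts.isEmpty());
        assertEquals(Long.valueOf(3L), counts.peek()); // first batch saw the three queued items
    }
}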

looking for an easier way to count the number of items in a JavaDStream

2015-12-15 Thread Andy Davidson
I am writing a JUnit test for some simple streaming code. I want to make
assertions about how many things are in a given JavaDStream. I wonder if
there is an easier way in Java to get the count?

I think there are two points of friction.

1. It is easy to create an accumulator of type double or int; however, Long
is not supported.
2. We need to use javaDStream.foreachRDD. The Function interface must return
void. I was not able to define an accumulator in my driver and use it from a
lambda function. (I am new to lambdas in Java.)
Here is a little lambda example that logs my test objects. I was not able to
figure out how to get it to return a value or access an accumulator:
   data.foreachRDD(rdd -> {

logger.info("Begin data.foreachRDD");

for (MyPojo pojo : rdd.collect()) {

logger.info("\n{}", pojo.toString());

}

return null;

});



Any suggestions would be greatly appreciated

Andy

The following code works in my driver but is a lot of code for such a
trivial computation. Because it needs the JavaSparkContext I do not think
it would work inside a closure. I assume the workers do not have access to the
context as a global and that shipping it in the closure is not a good
idea?

public class JavaDStreamCount implements Serializable {

private static final long serialVersionUID = -3600586183332429887L;

public static Logger logger =
LoggerFactory.getLogger(JavaDStreamCount.class);



public Double hack(JavaSparkContext sc, JavaDStream javaDStream) {

Count c = new Count(sc);

javaDStream.foreachRDD(c);

return c.getTotal().value();

}



class Count implements Function {

private static final long serialVersionUID = -5239727633710162488L;

Accumulator total;



public Count(JavaSparkContext sc) {

total = sc.accumulator(0.0);

}



@Override

public java.lang.Void call(JavaRDD rdd) throws Exception {

List data = rdd.collect();

int dataSize = data.size();

logger.error("data.size:{}", dataSize);

long num = rdd.count();

logger.error("num:{}", num);

total.add(new Double(num));

return null;

}



public Accumulator getTotal() {

return total;

}

}

}
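For what it is worth, the accumulator can also be captured directly by the
lambda, which removes most of the boilerplate above. A shorter sketch,
mirroring the double accumulator used in the class (the class and method names
are made up):

import org.apache.spark.Accumulator;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.api.java.JavaDStream;

public class StreamCounter {

    /**
     * Registers a count on every batch. Read total.value() on the driver after
     * the batches have run; the value is only reliable on the driver.
     */
    public static <T> Accumulator<Double> countItems(JavaSparkContext sc, JavaDStream<T> stream) {
        final Accumulator<Double> total = sc.accumulator(0.0);
        stream.foreachRDD(rdd -> {
            // the foreachRDD function itself runs on the driver; only count() is distributed
            total.add((double) rdd.count());
            return null;
        });
        return total;
    }
}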










problem with DataFrame df.withColumn() org.apache.spark.sql.AnalysisException: resolved attribute(s) missing

2016-01-04 Thread Andy Davidson
I am having a heck of a time writing a simple transformer in Java. I assume
that my Transformer is supposed to append a new column to the dataFrame
argument. Any idea why I get the following exception in Java 8 when I try to
call DataFrame withColumn()? The JavaDoc says withColumn() "Returns a new
DataFrame by adding a column or replacing the existing column that has the
same name."


Also, do transformers always run in the driver? If not, I assume workers do
not have the sqlContext. Any idea how I can convert a JavaRDD<> to a Column
without a sqlContext?

Kind regards

Andy

P.s. I am using spark 1.6.0

org.apache.spark.sql.AnalysisException: resolved attribute(s)
filteredOutput#1 missing from rawInput#0 in operator !Project
[rawInput#0,filteredOutput#1 AS filteredOutput#2];
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(Chec
kAnalysis.scala:38)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:
44)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$
1.apply(CheckAnalysis.scala:183)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$
1.apply(CheckAnalysis.scala:50)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:105)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(Che
ckAnalysis.scala:50)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala
:44)
at 
org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.
scala:34)
at org.apache.spark.sql.DataFrame.(DataFrame.scala:133)
at 
org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$withPlan(Data
Frame.scala:2165)
at org.apache.spark.sql.DataFrame.select(DataFrame.scala:751)
at org.apache.spark.sql.DataFrame.withColumn(DataFrame.scala:1227)
at com.pws.poc.ml.StemmerTransformer.transform(StemmerTransformer.java:110)
at 
com.pws.poc.ml.StemmerTransformerTest.test(StemmerTransformerTest.java:45)



public class StemmerTransformer extends Transformer implements Serializable
{

   String inputCol; // unit test sets to rawInput
   String outputCol; // unit test sets to filteredOutput

  ...


  public StemmerTransformer(SQLContext sqlContext) {

// will only work if transformers execute in the driver

this.sqlContext = sqlContext;

}


 @Override

public DataFrame transform(DataFrame df) {

df.printSchema();

df.show();



JavaRDD inRowRDD = df.select(inputCol).javaRDD();

JavaRDD outRowRDD = inRowRDD.map((Row row) -> {

// TODO add stemming code

// Create a new Row

Row ret = RowFactory.create("TODO");

return ret;

});



//can we create a Col from a JavaRDD?



List fields = new ArrayList();

boolean nullable = true;

fields.add(DataTypes.createStructField(outputCol,
DataTypes.StringType, nullable));



StructType schema =  DataTypes.createStructType(fields);

DataFrame outputDF = sqlContext.createDataFrame(outRowRDD, schema);

outputDF.printSchema();

outputDF.show();

Column newCol = outputDF.col(outputCol);



return df.withColumn(outputCol, newCol);

}



SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.

SLF4J: Actual binding is of type
[ch.qos.logback.classic.util.ContextSelectorStaticBinder]

WARN  03:58:46 main o.a.h.u.NativeCodeLoader  line:62 Unable to load
native-hadoop library for your platform... using builtin-java classes where
applicable

root
 |-- rawInput: array (nullable = false)
 |    |-- element: string (containsNull = true)


+--------------------+
|            rawInput|
+--------------------+
|[I, saw, the, red...|
|[Mary, had, a, li...|
|[greet, greeting,...|
+--------------------+


root
 |-- filteredOutput: string (nullable = true)


+--------------+
|filteredOutput|
+--------------+
|          TODO|
|          TODO|
|          TODO|
+--------------+






Re: problem with DataFrame df.withColumn() org.apache.spark.sql.AnalysisException: resolved attribute(s) missing

2016-01-06 Thread Andy Davidson
Hi Micheal

I really appreciate your help. The following code works. Is there a way
this example can be added to the distribution to make it easier for future
Java programmers? It took me a long time to get to this simple solution.

I'll need to tweak this example a little to work with the new Pipeline save
functionality. We need the current sqlContext to register our UDF. I'll see if
I can pass this in the Param Map. I'll throw an exception if someone uses
transform(df)

public class StemmerTransformer extends Transformer implements Serializable
{

   void registerUDF() {

if (udf == null) {

udf = new UDF();

DataType returnType =
DataTypes.createArrayType(DataTypes.StringType);

sqlContext.udf().register(udfName, udf, returnType);

}

}



   @Override

public DataFrame transform(DataFrame df) {

df.printSchema();

df.show();



registerUDF();



DataFrame ret = df.selectExpr("*", "StemUDF(rawInput) as
filteredOutput");

return ret;

}



   class UDF implements UDF1 {

private static final long serialVersionUID = 1L;



@Override

public List call(WrappedArray wordsArg) throws
Exception {

List words = JavaConversions.asJavaList(wordsArg);

ArrayList ret = new ArrayList(words.size());

for (String word : words) {

// TODO replace test code

ret.add(word + "_stemed");

}

   

return ret;

}

}

}



root
 |-- rawInput: array (nullable = false)
 |    |-- element: string (containsNull = true)


+--------------------+
|            rawInput|
+--------------------+
|[I, saw, the, red...|
|[Mary, had, a, li...|
|[greet, greeting,...|
+--------------------+


root
 |-- rawInput: array (nullable = false)
 |    |-- element: string (containsNull = true)
 |-- filteredOutput: array (nullable = true)
 |    |-- element: string (containsNull = true)


+----------------------------------+---------------------------------------------------------------+
|rawInput                          |filteredOutput                                                 |
+----------------------------------+---------------------------------------------------------------+
|[I, saw, the, red, baloon]        |[I_stemed, saw_stemed, the_stemed, red_stemed, baloon_stemed]  |
|[Mary, had, a, little, lamb]      |[Mary_stemed, had_stemed, a_stemed, little_stemed, lamb_stemed]|
|[greet, greeting, greets, greeted]|[greet_stemed, greeting_stemed, greets_stemed, greeted_stemed] |
+----------------------------------+---------------------------------------------------------------+



From:  Michael Armbrust 
Date:  Tuesday, January 5, 2016 at 12:58 PM
To:  Andrew Davidson 
Cc:  "user @spark" 
Subject:  Re: problem with DataFrame df.withColumn()
org.apache.spark.sql.AnalysisException: resolved attribute(s) missing

>> I am trying to implement the org.apache.spark.ml.Transformer interface in Java 8.
>> My understanding is the pseudocode for transformers is something like
>> @Override
>> 
>> public DataFrame transform(DataFrame df) {
>> 
>> 1. Select the input column
>> 
>> 2. Create a new column
>> 
>> 3. Append the new column to the df argument and return
>> 
>>}
> 
> 
> The following line can be used inside of the transform function to return a
> Dataframe that has been augmented with a new column using the stem lambda
> function (defined as a UDF below).
> return df.withColumn("filteredInput", expr("stem(rawInput)"));
> This is producing a new column called filterInput (that is appended to
> whatever columns are already there) by passing the column rawInput to your
> arbitrary lambda function.
>  
>> Based on my experience the current DataFrame api is very limited. You can not
>> apply a complicated lambda function. As a work around I convert the data
>> frame to a JavaRDD, apply my complicated lambda, and then convert the
>> resulting RDD back to a Data Frame.
> 
> 
> This is exactly what this code is doing.  You are defining an arbitrary lambda
> function as a UDF.  The difference here, when compared to a JavaRDD map, is
> that you can use this UDF to append columns without having to manually append
> the new data to some existing object.
> sqlContext.udf().register("stem", new UDF1() {
>   @Override
>   public String call(String str) {
> return // TODO: stemming code here
>   }
> }, DataTypes.StringType);
>> Now I select the ³new column² from the Data Frame and try to call
>> df.withColumn().
>> 
>> 
>> 
>> I can try an implement this as a UDF. How ever I need to use several 3rd
>> party jars. Any idea how insure the workers will have the required jar files?
>> If I was submitting a normal java app I would create an uber jar will this
>> work with UDFs?
> 
> 
> Yeah, UDFs are run the same way 

Re: problem with DataFrame df.withColumn() org.apache.spark.sql.AnalysisException: resolved attribute(s) missing

2016-01-05 Thread Andy Davidson
Hi Michael

I am not sure you understood my code correctly.

I am trying to implement the org.apache.spark.ml.Transformer interface in Java
8.


My understanding is the pseudocode for transformers is something like
@Override

public DataFrame transform(DataFrame df) {

1. Select the input column

2. Create a new column

3. Append the new column to the df argument and return

   }



Based on my experience the current DataFrame api is very limited. You can
not apply a complicated lambda function. As a work around I convert the data
frame to a JavaRDD, apply my complicated lambda, and then convert the
resulting RDD back to a Data Frame.



Now I select the "new column" from the Data Frame and try to call
df.withColumn().



I can try to implement this as a UDF. However, I need to use several 3rd
party jars. Any idea how to ensure the workers will have the required jar
files? If I was submitting a normal Java app I would create an uber jar; will
this work with UDFs?



Kind regards



Andy



From:  Michael Armbrust <mich...@databricks.com>
Date:  Monday, January 4, 2016 at 11:14 PM
To:  Andrew Davidson <a...@santacruzintegration.com>
Cc:  "user @spark" <user@spark.apache.org>
Subject:  Re: problem with DataFrame df.withColumn()
org.apache.spark.sql.AnalysisException: resolved attribute(s) missing

> Its not really possible to convert an RDD to a Column.  You can think of a
> Column as an expression that produces a single output given some set of input
> columns.  If I understand your code correctly, I think this might be easier to
> express as a UDF:
> sqlContext.udf().register("stem", new UDF1<String, String>() {
>   @Override
>   public String call(String str) {
> return // TODO: stemming code here
>   }
> }, DataTypes.StringType);
> DataFrame transformed = df.withColumn("filteredInput",
> expr("stem(rawInput)"));
> 
> On Mon, Jan 4, 2016 at 8:08 PM, Andy Davidson <a...@santacruzintegration.com>
> wrote:
>> I am having a heck of a time writing a simple transformer in Java. I assume
>> that my Transformer is supposed to append a new column to the dataFrame
>> argument. Any idea why I get the following exception in Java 8 when I try to
>> call DataFrame withColumn()? The JavaDoc says withColumn() "Returns a new
>> DataFrame 
>> <http://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/DataFrame.
>> html>  by adding a column or replacing the existing column that has the same
>> name.²
>> 
>> 
>> Also do transformers always run in the driver? If not I assume workers do not
>> have the sqlContext. Any idea how I can convert an javaRDD<> to a Column with
>> out a sqlContext?
>> 
>> Kind regards
>> 
>> Andy
>> 
>> P.s. I am using spark 1.6.0
>> 
>> org.apache.spark.sql.AnalysisException: resolved attribute(s)
>> filteredOutput#1 missing from rawInput#0 in operator !Project
>> [rawInput#0,filteredOutput#1 AS filteredOutput#2];
>> at 
>> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(Check
>> Analysis.scala:38)
>> at 
>> org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:4
>> 4)
>> at 
>> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1
>> .apply(CheckAnalysis.scala:183)
>> at 
>> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1
>> .apply(CheckAnalysis.scala:50)
>> at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:105)
>> at 
>> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(Chec
>> kAnalysis.scala:50)
>> at 
>> org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:
>> 44)
>> at 
>> org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.s
>> cala:34)
>> at org.apache.spark.sql.DataFrame.(DataFrame.scala:133)
>> at org.apache.spark.sql.DataFrame.org
>> <http://org.apache.spark.sql.DataFrame.org>
>> $apache$spark$sql$DataFrame$$withPlan(DataFrame.scala:2165)
>> at org.apache.spark.sql.DataFrame.select(DataFrame.scala:751)
>> at org.apache.spark.sql.DataFrame.withColumn(DataFrame.scala:1227)
>> at com.pws.poc.ml.StemmerTransformer.transform(StemmerTransformer.java:110)
>> at com.pws.poc.ml.StemmerTransformerTest.test(StemmerTransformerTest.java:45)
>> 
>> 
>> 
>> public class StemmerTransformer extends Transformer implements Serializable {
>> 
>>String inputCol; // unit test sets to rawInput
>>String outputCol; // unit test sets to filteredOutput
>> 
>>   Š
>> 
>> 
>>   public Ste

Re: How to use Java8

2016-01-05 Thread Andy Davidson

Hi Sea

From:  Sea <261810...@qq.com>
Date:  Tuesday, January 5, 2016 at 6:16 PM
To:  "user @spark" 
Subject:  How to use Java8

> Hi, all
> I want to support java8, I use JDK1.8.0_65 in production environment, but
> it doesn't work. Should I build spark using jdk1.8, and set
> 1.8 in pom.xml?
> 
> java.lang.UnsupportedClassVersionError:  Unsupported major.minor version 52.



Here are some notes I wrote about how to configure my data center to use
java 8. You’ll probably need to do something like this

Your mileage may vary

Andy

Setting Java_HOME
ref: configure env vars
 
install java 8 on all nodes (master and slave)
install java 1.8 on master
$ ssh -i $KEY_FILE root@$SPARK_MASTER
# ?? how was this package download from oracle? curl?
yum install jdk-8u65-linux-x64.rpm
copy rpm to slaves and install java 1.8 on slaves
for i in `cat /root/spark-ec2/slaves`;do scp
/home/ec2-user/jdk-8u65-linux-x64.rpm $i:; done
pssh -i -h /root/spark-ec2/slaves ls -l
pssh -i -h /root/spark-ec2/slaves yum install -y jdk-8u65-linux-x64.rpm
remove rpm from slaves. It is 153M
pssh -i -h /root/spark-ec2/slaves rm jdk-8u65-linux-x64.rpm
Configure spark to use java 1.8
ref: configure env vars
 
Make a back up of of config file
cp /root/spark/conf/spark-env.sh /root/spark/conf/spark-env.sh-`date
+%Y-%m-%d:%H:%M:%S`

pssh -i -h /root/spark-ec2/slaves cp /root/spark/conf/spark-env.sh
/root/spark/conf/spark-env.sh-`date +%Y-%m-%d:%H:%M:%S`

pssh -i -h /root/spark-ec2/slaves ls "/root/spark/conf/spark-env.sh*"
Edit /root/spark/conf/spark-env.sh, add
 export JAVA_HOME=/usr/java/latest
Copy spark-env.sh to slaves
pssh -i -h /root/spark-ec2/slaves grep JAVA_HOME
/root/spark/conf/spark-env.sh

for i in `cat /root/spark-ec2/slaves`;do scp /root/spark/conf/spark-env.sh
$i:/root/spark/conf/spark-env.sh; done

pssh -i -h /root/spark-ec2/slaves grep JAVA_HOME
/root/spark/conf/spark-env.sh

> 




Re: problem with DataFrame df.withColumn() org.apache.spark.sql.AnalysisException: resolved attribute(s) missing

2016-01-06 Thread Andy Davidson
Hi Michael

I am happy to add some documentation.

I forked the repo but am having trouble with the markdown. The code examples
are not rendering correctly. I am on a mac and using
https://itunes.apple.com/us/app/marked-2/id890031187?mt=12

I use emacs or some other text editor to change the md.

What tools do you use for editing/viewing spark markdown files?

Andy



From:  Michael Armbrust 
Date:  Wednesday, January 6, 2016 at 11:09 AM
To:  Andrew Davidson 
Cc:  "user @spark" 
Subject:  Re: problem with DataFrame df.withColumn()
org.apache.spark.sql.AnalysisException: resolved attribute(s) missing

>> I really appreciate your help. I The following code works.
> 
> Glad you got it to work!
> 
>> Is there a way this example can be added to the distribution to make it
>> easier for future java programmers? It look me a long time get to this simple
>> solution.
> 
> I'd welcome a pull request that added UDFs to the programming guide section on
> dataframes:
> http://spark.apache.org/docs/latest/sql-programming-guide.html#dataframe-opera
> tions




Re: trouble understanding data frame memory usage ³java.io.IOException: Unable to acquire memory²

2015-12-29 Thread Andy Davidson
Hi Michael

https://github.com/apache/spark/archive/v1.6.0.tar.gz

With both 1.6.0 and 1.5.2 my unit test works when I call repartition(1) before
saving output. Coalesce(1) still fails.

Coalesce(1) spark-1.5.2
   Caused by:

java.io.IOException: Unable to acquire 33554432 bytes of memory


Coalesce(1) spark-1.6.0

   Caused by:

java.lang.OutOfMemoryError: Unable to acquire 28 bytes of memory,
got 0


Hope this helps

Andy

From:  Michael Armbrust <mich...@databricks.com>
Date:  Monday, December 28, 2015 at 2:41 PM
To:  Andrew Davidson <a...@santacruzintegration.com>
Cc:  "user @spark" <user@spark.apache.org>
Subject:  Re: trouble understanding data frame memory usage
"java.io.IOException: Unable to acquire memory"

> Unfortunately in 1.5 we didn't force operators to spill when ran out of memory
> so there is not a lot you can do.  It would be awesome if you could test with
> 1.6 and see if things are any better?
> 
> On Mon, Dec 28, 2015 at 2:25 PM, Andy Davidson <a...@santacruzintegration.com>
> wrote:
>> I am using spark 1.5.1. I am running into some memory problems with a java
>> unit test. Yes I could fix it by setting -Xmx (it's set to 1024M), however I
>> want to better understand what is going on so I can write better code in the
>> future. The test runs on a Mac, master="Local[2]"
>> 
>> I have a java unit test that starts by reading a 672K ascii file. I my output
>> data file is 152K. Its seems strange that such a small amount of data would
>> cause an out of memory exception. I am running a pretty standard machine
>> learning process
>> 
>> 1. Load data
>> 2. create a ML pipeline
>> 3. transform the data
>> 4. Train a model
>> 5. Make predictions
>> 6. Join the predictions back to my original data set
>> 7. Coalesce(1), I only have a small amount of data and want to save it in a
>> single file
>> 8. Save final results back to disk
>> 
>> Step 7: I am unable to call Coalesce() “java.io.IOException: Unable to
>> acquire memory”
>> 
>> To try and figure out what is going I put log messages in to count the number
>> of partitions
>> 
>> Turns out I have 20 input files, each one winds up in a separate partition.
>> Okay so after loading I call coalesce(1) and check to make sure I only have a
>> single partition.
>> 
>> The total number of observations is 1998.
>> 
>> After calling step 7 I count the number of partitions and discovered I have
>> 224 partitions!. Surprising given I called Coalesce(1) before I did anything
>> with the data. My data set should easily fit in memory. When I save them to
>> disk I get 202 files created with 162 of them being empty!
>> 
>> In general I am not explicitly using cache.
>> 
>> Some of the data frames get registered as tables. I find it easier to use
>> sql.
>> 
>> Some of the data frames get converted back to RDDs. I find it easier to
>> create RDD this way
>> 
>> I put calls to unpersist(true). In several places
>> 
>>private void memoryCheck(String name) {
>> 
>> Runtime rt = Runtime.getRuntime();
>> 
>> logger.warn("name: {} \t\ttotalMemory: {} \tfreeMemory: {} df.size:
>> {}", 
>> 
>> name,
>> 
>> String.format("%,d", rt.totalMemory()),
>> 
>> String.format("%,d", rt.freeMemory()));
>> 
>> }
>> 
>> 
>> Any idea how I can get a better understanding of what is going on? My goal is
>> to learn to write better spark code.
>> 
>> Kind regards
>> 
>> Andy
>> 
>> Memory usages at various points in my unit test
>> name: rawInput totalMemory:   447,741,952 freeMemory:   233,203,184
>> 
>> name: naiveBayesModel totalMemory:   509,083,648 freeMemory:   403,504,128
>> 
>> name: lpRDD totalMemory:   509,083,648 freeMemory:   402,288,104
>> 
>> name: results totalMemory:   509,083,648 freeMemory:   368,011,008
>> 
>> 
>> 
>>DataFrame exploreDF = results.select(results.col("id"),
>> 
>> results.col("label"),
>> 
>> results.col("binomialLabel"),
>> 
>> results.col("labelIndex"),
>> 
>> results.col("prediction"),
>> 
>> results.col("words"));
>> 
>> exploreDF.show(10);
>> 
>> 
>> 
>> Yes I realize its strange to switch styles how ever this should not cause
>> memory problems
>> 
>> 
>> 
>> final String exploreTable = "exploreTable";
>> 
>> exploreDF.registerTempTable(exploreTable);
>> 
>> String fmt = "SELECT * FROM %s where binomialLabel = 'signal'";
>> 
>> String stmt = String.format(fmt, exploreTable);
>> 
>> 
>> 
>> DataFrame subsetToSave = sqlContext.sql(stmt);// .show(100);
>> 
>> 
>> 
>> name: subsetToSave totalMemory: 1,747,451,904 freeMemory: 1,049,447,144
>> 
>> 
>> 
>> exploreDF.unpersist(true); does not resolve memory issue
>> 
>> 
>> 
> 




does HashingTF maintain a inverse index?

2015-12-31 Thread Andy Davidson
Hi 

I am working on a proof of concept. I am trying to use spark to classify some
documents. I am using tokenizer and hashingTF to convert the documents into
vectors. Is there any easy way to map a feature back to its words, or do I
need to maintain the reverse index myself? I realize there is a chance some
words map to the same bucket.

Kind regards

Andy
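
In case it helps someone else searching the archive: the approach I am leaning towards is building the reverse index myself by hashing every vocabulary word with the same settings. A sketch, assuming the mllib HashingTF (which the ml pipeline stage wraps in 1.5/1.6), a numFeatures that matches the pipeline stage, and an illustrative vocabulary list:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.spark.mllib.feature.HashingTF;

public class HashingTFReverseIndex {

    public static Map<Integer, List<String>> build(List<String> vocabulary, int numFeatures) {
        HashingTF tf = new HashingTF(numFeatures);
        Map<Integer, List<String>> bucketToWords = new HashMap<>();
        for (String word : vocabulary) {
            int bucket = tf.indexOf(word);   // same hash the transformer applies
            List<String> words = bucketToWords.get(bucket);
            if (words == null) {
                words = new ArrayList<>();
                bucketToWords.put(bucket, words);
            }
            words.add(word);                 // buckets holding more than one word are collisions
        }
        return bucketToWords;
    }
}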





Re: coalesce(1).saveAsTextfile() takes forever?

2016-01-05 Thread Andy Davidson
Hi Unk1102

I also had trouble when I used coalesce(). Repartition() worked much better.
Keep in mind that if you have a large number of partitions you are probably
going to have high communication costs.

Also my code works a lot better on 1.6.0. DataFrame memory was not spilled in
1.5.2. In 1.6.0 unpersist() actually frees up memory.

Another strange thing I noticed in 1.5.1 was that I had thousands of
partitions. Many of them were empty. Having lots of empty partitions really
slowed things down.

Andy
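
A sketch of the repartition(1) write that worked for me, in case it helps; the class, method and path are illustrative, and it assumes spark-csv 1.x on Spark 1.5/1.6:

import org.apache.spark.sql.DataFrame;

public class SingleFileWriter {

    public static void writeOneCsv(DataFrame sourceFrame, String path) {
        // repartition(1) does a full shuffle into one evenly sized partition, so the
        // upstream stages keep their parallelism; coalesce(1) does not shuffle and can
        // end up running the whole preceding lineage as a single task
        sourceFrame.repartition(1)
                   .write()
                   .format("com.databricks.spark.csv")
                   .option("header", "true")
                   .save(path);
    }
}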

From:  unk1102 
Date:  Tuesday, January 5, 2016 at 11:58 AM
To:  "user @spark" 
Subject:  coalesce(1).saveAsTextfile() takes forever?

> hi I am trying to save many partitions of a Dataframe into one CSV file and it
> takes forever for large data sets of around 5-6 GB.
> 
> sourceFrame.coalesce(1).write().format("com.databricks.spark.csv").option("gzi
> p").save("/path/hadoop")
> 
> For small data the above code works well, but for large data it hangs forever
> and does not move on, because only one partition has to shuffle GBs of data.
> please help me
> 
> 
> 
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/coalesce-1-saveAsTextfile-
> takes-forever-tp25886.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 
> 




how to extend java transformer from Scala UnaryTransformer ?

2016-01-01 Thread Andy Davidson
I am trying to write a trivial transformer I can use in my pipeline. I am
using java and spark 1.5.2. It was suggested that I use the Tokenizer.scala
class as an example. This should be very easy; however, I do not understand
Scala and I am having trouble debugging the following exception.

Any help would be greatly appreciated.

Happy New Year

Andy

java.lang.IllegalArgumentException: requirement failed: Param null__inputCol
does not belong to Stemmer_2f3aa96d-7919-4eaa-ad54-f7c620b92d1c.
at scala.Predef$.require(Predef.scala:233)
at org.apache.spark.ml.param.Params$class.shouldOwn(params.scala:557)
at org.apache.spark.ml.param.Params$class.set(params.scala:436)
at org.apache.spark.ml.PipelineStage.set(Pipeline.scala:37)
at org.apache.spark.ml.param.Params$class.set(params.scala:422)
at org.apache.spark.ml.PipelineStage.set(Pipeline.scala:37)
at org.apache.spark.ml.UnaryTransformer.setInputCol(Transformer.scala:83)
at com.pws.xxx.ml.StemmerTest.test(StemmerTest.java:30)



public class StemmerTest extends AbstractSparkTest {

@Test

public void test() {

Stemmer stemmer = new Stemmer()

.setInputCol("raw") //line 30

.setOutputCol("filtered");

}

}


/**

 * @ see 
spark-1.5.1/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala

 * @ see 
https://chimpler.wordpress.com/2014/06/11/classifiying-documents-using-naive
-bayes-on-apache-spark-mllib/

 * @ see 
http://www.tonytruong.net/movie-rating-prediction-with-apache-spark-and-hort
onworks/

 * 

 * @author andrewdavidson

 *

 */

public class Stemmer extends UnaryTransformer<List<String>, List<String>, Stemmer> implements Serializable {

static Logger logger = LoggerFactory.getLogger(Stemmer.class);

private static final long serialVersionUID = 1L;

private static final  ArrayType inputType =
DataTypes.createArrayType(DataTypes.StringType, true);

private final String uid = Stemmer.class.getSimpleName() + "_" +
UUID.randomUUID().toString();



@Override

public String uid() {

return uid;

}



/*

   override protected def validateInputType(inputType: DataType): Unit =
{

require(inputType == StringType, s"Input type must be string type but
got $inputType.")

  }

 */

@Override

public void validateInputType(DataType inputTypeArg) {

String msg = "inputType must be " + inputType.simpleString() + " but
got " + inputTypeArg.simpleString();

assert (inputType.equals(inputTypeArg)) : msg;

}



@Override

    public Function1<List<String>, List<String>> createTransformFunc() {
        // http://stackoverflow.com/questions/6545066/using-scala-from-java-passing-functions-as-parameters
        Function1<List<String>, List<String>> f =
                new AbstractFunction1<List<String>, List<String>>() {
            public List<String> apply(List<String> words) {
                for (String word : words) {
                    logger.error("AEDWIP input word: {}", word);
                }
                return words;
            }
        };

        return f;
    }



@Override

public DataType outputDataType() {

return DataTypes.createArrayType(DataTypes.StringType, true);

}

}
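
A guess at what is going on, for anyone who hits the same exception: UnaryTransformer creates the inputCol/outputCol params while the superclass constructor runs, which is before the Java field initializer for uid has executed, so uid() returns null at that moment and the params end up owned by "null__inputCol". A sketch of a lazily initialized uid that avoids the ordering problem (only the uid portion of the class above is shown; it uses java.util.UUID as in the original field, and the prefix string is arbitrary):

    private String uid;

    @Override
    public String uid() {
        // initialize lazily so the same stable value is returned even while the
        // superclass constructor is still running; do not give the field an
        // initializer, or Java will overwrite the value after super() finishes
        if (uid == null) {
            uid = Stemmer.class.getSimpleName() + "_" + UUID.randomUUID().toString();
        }
        return uid;
    }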




trouble understanding data frame memory usage "java.io.IOException: Unable to acquire memory"

2015-12-28 Thread Andy Davidson
I am using spark 1.5.1. I am running into some memory problems with a java
unit test. Yes I could fix it by setting -Xmx (it's set to 1024M); however I
want to better understand what is going on so I can write better code in the
future. The test runs on a Mac, master="Local[2]"

I have a java unit test that starts by reading a 672K ascii file. My output
data file is 152K. It seems strange that such a small amount of data would
cause an out of memory exception. I am running a pretty standard machine
learning process

1. Load data
2. create a ML pipeline
3. transform the data
4. Train a model
5. Make predictions
6. Join the predictions back to my original data set
7. Coalesce(1), I only have a small amount of data and want to save it in a
single file
8. Save final results back to disk

Step 7: I am unable to call Coalesce() "java.io.IOException: Unable to
acquire memory"

To try and figure out what is going I put log messages in to count the
number of partitions

Turns out I have 20 input files, each one winds up in a separate partition.
Okay so after loading I call coalesce(1) and check to make sure I only have
a single partition.

The total number of observations is 1998.

After calling step 7 I count the number of partitions and discovered I have
224 partitions! Surprising, given I called Coalesce(1) before I did anything
with the data. My data set should easily fit in memory. When I save them to
disk I get 202 files created with 162 of them being empty!
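
For reference, this is roughly how I log the partition count at each step (a small helper in the same style as memoryCheck below; the name is arbitrary):

    private int partitionCount(DataFrame df) {
        // javaRDD() exposes the underlying RDD so we can ask it for its partitions
        return df.javaRDD().partitions().size();
    }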

In general I am not explicitly using cache.

Some of the data frames get registered as tables. I find it easier to use
sql.

Some of the data frames get converted back to RDDs. I find it easier to
create RDD this way

I put calls to unpersist(true). In several places

   private void memoryCheck(String name) {

Runtime rt = Runtime.getRuntime();

logger.warn("name: {} \t\ttotalMemory: {} \tfreeMemory: {} df.size:
{}", 

name,

String.format("%,d", rt.totalMemory()),

String.format("%,d", rt.freeMemory()));

}


Any idea how I can get a better understanding of what is going on? My goal
is to learn to write better spark code.

Kind regards

Andy

Memory usages at various points in my unit test
name: rawInput totalMemory:   447,741,952 freeMemory:   233,203,184

name: naiveBayesModel totalMemory:   509,083,648 freeMemory:   403,504,128

name: lpRDD totalMemory:   509,083,648 freeMemory:   402,288,104

name: results totalMemory:   509,083,648 freeMemory:   368,011,008



   DataFrame exploreDF = results.select(results.col("id"),

results.col("label"),

results.col("binomialLabel"),

results.col("labelIndex"),

results.col("prediction"),

results.col("words"));

exploreDF.show(10);



Yes I realize its strange to switch styles how ever this should not cause
memory problems



final String exploreTable = "exploreTable";

exploreDF.registerTempTable(exploreTable);

String fmt = "SELECT * FROM %s where binomialLabel = 'signal'";

String stmt = String.format(fmt, exploreTable);



DataFrame subsetToSave = sqlContext.sql(stmt);// .show(100);



name: subsetToSave totalMemory: 1,747,451,904 freeMemory: 1,049,447,144



exploreDF.unpersist(true); does not resolve memory issue







how to debug java.lang.IllegalArgumentException: object is not an instance of declaring class

2015-12-24 Thread Andy Davidson
Hi 

Any idea how I can debug this problem? I suspect the problem has to do with
how I am converting a JavaRDD<Tuple2<Long, Double>> to a DataFrame.

Is it a boxing problem? I tried to use long and double instead of Long and
Double whenever possible.

Thanks in advance, Happy Holidays.

Andy

allData.printSchema()
root

 |-- label: string (nullable = true)

 |-- text: string (nullable = true)

 |-- id: long (nullable = true)

 |-- createdAt: long (nullable = true)

 |-- binomialLabel: string (nullable = true)

 |-- words: array (nullable = true)

 ||-- element: string (containsNull = true)

 |-- features: vector (nullable = true)

 |-- labelIndex: double (nullable = true)



//

// make predictions using all the data

// The real out of sample error will be higher

//

JavaRDD<Tuple2<Long, Double>> predictions =
    idLabeledPoingRDD.map((Tuple2<Long, LabeledPoint> t2) -> {

        Long id = t2._1();

        LabeledPoint lp = t2._2();

        double prediction = naiveBayesModel.predict(lp.features());

        return new Tuple2<Long, Double>(id, prediction);

});



  

public class Prediction {

double prediction;

long id;

// public getters and setters ...

}

DataFrame predictionDF = sqlContext.createDataFrame(predictions,
Prediction.class);


predictionDF.printSchema()
root

 |-- id: long (nullable = false)

 |-- prediction: double (nullable = false)



DataFrame results = allData.join(predictionDF, "id");

results.show()

Here is the top of a long stack trace. I do not know how it relates back to my
code. I do not see any of my classes, column names, or function names ...

java.lang.IllegalArgumentException: object is not an instance of declaring
class

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
~[na:1.8.0_66]

at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62
) ~[na:1.8.0_66]

at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl
.java:43) ~[na:1.8.0_66]

at java.lang.reflect.Method.invoke(Method.java:497) ~[na:1.8.0_66]

at 
org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1$$anonfun$apply$2
.apply(SQLContext.scala:500) ~[spark-sql_2.10-1.5.2.jar:1.5.2]

at 
org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1$$anonfun$apply$2
.apply(SQLContext.scala:500) ~[spark-sql_2.10-1.5.2.jar:1.5.2]

at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:
244) ~[scala-library-2.10.5.jar:na]

at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:
244) ~[scala-library-2.10.5.jar:na]

at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala
:33) ~[scala-library-2.10.5.jar:na]

at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
~[scala-library-2.10.5.jar:na]

at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
~[scala-library-2.10.5.jar:na]

at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
~[scala-library-2.10.5.jar:na]

at 
org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1.apply(SQLContext
.scala:500) ~[spark-sql_2.10-1.5.2.jar:1.5.2]

at 
org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1.apply(SQLContext
.scala:498) ~[spark-sql_2.10-1.5.2.jar:1.5.2]

at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
~[scala-library-2.10.5.jar:na]

at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
~[scala-library-2.10.5.jar:na]

at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
~[scala-library-2.10.5.jar:na]

at 
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.insertAll(BypassM
ergeSortShuffleWriter.java:119) ~[spark-core_2.10-1.5.2.jar:1.5.2]

at 
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scal
a:73) ~[spark-core_2.10-1.5.2.jar:1.5.2]

at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
~[spark-core_2.10-1.5.2.jar:1.5.2]

at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
~[spark-core_2.10-1.5.2.jar:1.5.2]

at org.apache.spark.scheduler.Task.run(Task.scala:88)
~[spark-core_2.10-1.5.2.jar:1.5.2]

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
~[spark-core_2.10-1.5.2.jar:1.5.2]

at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:11
42) [na:1.8.0_66]

at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:6
17) [na:1.8.0_66]

at java.lang.Thread.run(Thread.java:745) [na:1.8.0_66]




Re: how to debug java.lang.IllegalArgumentException: object is not an instance of declaring class

2015-12-24 Thread Andy Davidson
The problem must be with how I am converting a JavaRDD<Tuple2<Long, Double>> to a
DataFrame.

Any suggestions? Most of my work has been done using pySpark. Tuples are a
lot harder to work with in Java.

  JavaRDD<Tuple2<Long, Double>> predictions =
      idLabeledPoingRDD.map((Tuple2<Long, LabeledPoint> t2) -> {

        Long id = t2._1();

        LabeledPoint lp = t2._2();

        double prediction = naiveBayesModel.predict(lp.features());

        return new Tuple2<Long, Double>(id, prediction);

});



List<Tuple2<Long, Double>> debug = predictions.take(3);

for (Tuple2<Long, Double> t : debug) {

    logger.warn("prediction: {}", t.toString());

}



//

// evaluate

//

DataFrame predictionDF = sqlContext.createDataFrame(predictions,
Prediction.class);

predictionDF.printSchema();

predictionDF.show();

   



java.lang.IllegalArgumentException: object is not an instance of declaring
class

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
~[na:1.8.0_66]

at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62
) ~[na:1.8.0_66]

at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl
.java:43) ~[na:1.8.0_66]

at java.lang.reflect.Method.invoke(Method.java:497) ~[na:1.8.0_66]

at 
org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1$$anonfun$apply$2
.apply(SQLContext.scala:500) ~[spark-sql_2.10-1.5.2.jar:1.5.2]

at 
org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1$$anonfun$apply$2
.apply(SQLContext.scala:500) ~[spark-sql_2.10-1.5.2.jar:1.5.2]

at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:
244) ~[scala-library-2.10.5.jar:na]

at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:
244) ~[scala-library-2.10.5.jar:na]

at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala
:33) ~[scala-library-2.10.5.jar:na]

at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
~[scala-library-2.10.5.jar:na]

at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
~[scala-library-2.10.5.jar:na]

at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
~[scala-library-2.10.5.jar:na]
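
My best guess so far, sketched below: createDataFrame(rdd, Prediction.class) uses bean reflection on Prediction's getters, so the RDD probably has to contain Prediction objects rather than Tuple2s. Mapping to the bean first (the setter names are assumed; Prediction is the class from my first message):

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

import scala.Tuple2;

public class PredictionFrame {

    public static DataFrame fromTuples(SQLContext sqlContext,
                                       JavaRDD<Tuple2<Long, Double>> predictions) {
        // convert each tuple into the bean that the reflection-based conversion expects
        JavaRDD<Prediction> beans = predictions.map(t2 -> {
            Prediction p = new Prediction();
            p.setId(t2._1());
            p.setPrediction(t2._2());
            return p;
        });
        return sqlContext.createDataFrame(beans, Prediction.class);
    }
}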


From:  Andrew Davidson 
Date:  Thursday, December 24, 2015 at 9:55 AM
To:  "user @spark" 
Subject:  how to debug java.lang.IllegalArgumentException: object is not an
instance of declaring class

> Hi 
> 
> Any idea how I can debug this problem? I suspect the problem has to do with
> how I am converting a JavaRDD<Tuple2<Long, Double>> to a DataFrame.
> 
> Is it a boxing problem? I tried to use long and double instead of Long and
> Double whenever possible.
> 
> Thanks in advance, Happy Holidays.
> 
> Andy
> 
> allData.printSchema()
> root
> 
>  |-- label: string (nullable = true)
> 
>  |-- text: string (nullable = true)
> 
>  |-- id: long (nullable = true)
> 
>  |-- createdAt: long (nullable = true)
> 
>  |-- binomialLabel: string (nullable = true)
> 
>  |-- words: array (nullable = true)
> 
>  ||-- element: string (containsNull = true)
> 
>  |-- features: vector (nullable = true)
> 
>  |-- labelIndex: double (nullable = true)
> 
> 
> 
> //
> 
> // make predictions using all the data
> 
> // The real out of sample error will be higher
> 
> //
> 
> JavaRDD> predictions =
> idLabeledPoingRDD.map((Tuple2 t2) -> {
> 
> Long id = t2._1();
> 
> LabeledPoint lp = t2._2();
> 
> double prediction = naiveBayesModel.predict(lp.features());
> 
> return new Tuple2(id, prediction);
> 
> });
> 
> 
> 
>   
> 
> public class Prediction {
> 
> double prediction;
> 
> long id;
> 
> // public getters and setters ...
> 
> }
> 
> DataFrame predictionDF = sqlContext.createDataFrame(predictions,
> Prediction.class);
> 
> 
> predictionDF.printSchema()
> root
> 
>  |-- id: long (nullable = false)
> 
>  |-- prediction: double (nullable = false)
> 
> 
> 
> DataFrame results = allData.join(predictionDF, "id");
> 
> results.show()
> 
> Here is the top of a long stack trace. I do not know how it relates back to my
> code. I do not see any of my classes, column names, or function names ...
> 
> java.lang.IllegalArgumentException: object is not an instance of declaring
> class
> 
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_66]
> 
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> ~[na:1.8.0_66]
> 
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.j
> ava:43) ~[na:1.8.0_66]
> 
> at java.lang.reflect.Method.invoke(Method.java:497) ~[na:1.8.0_66]
> 
> at 
> org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1$$anonfun$apply$2.a
> 

Re: how to use sparkR or spark MLlib load csv file on hdfs then calculate covariance

2015-12-28 Thread Andy Davidson
Hi Yanbo

I use spark.csv to load my data set. I work with both Java and Python. I
would recommend you print the first couple of rows and also print the schema
to make sure your data is loaded as you expect. You might find the following
code example helpful. You may need to programmatically set the schema
depending on what your data looks like.


public class LoadTidyDataFrame {

static  DataFrame fromCSV(SQLContext sqlContext, String file) {

DataFrame df = sqlContext.read()

.format("com.databricks.spark.csv")

.option("inferSchema", "true")

.option("header", "true")

.load(file);



return df;

}

}
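
And a sketch of setting the schema programmatically instead of relying on inferSchema; the column names and types below are placeholders you would replace with whatever your file actually contains:

import java.util.Arrays;

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class LoadWithExplicitSchema {

    static DataFrame fromCSV(SQLContext sqlContext, String file) {
        // declare the columns up front rather than paying for an inference pass
        StructType schema = DataTypes.createStructType(Arrays.asList(
                DataTypes.createStructField("col1", DataTypes.StringType, true),
                DataTypes.createStructField("col2", DataTypes.DoubleType, true)));

        return sqlContext.read()
                .format("com.databricks.spark.csv")
                .option("header", "true")
                .schema(schema)
                .load(file);
    }
}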




From:  Yanbo Liang 
Date:  Monday, December 28, 2015 at 2:30 AM
To:  zhangjp <592426...@qq.com>
Cc:  "user @spark" 
Subject:  Re: how to use sparkR or spark MLlib load csv file on hdfs then
calculate covariance

> Load csv file:
> df <- read.df(sqlContext, "file-path", source = "com.databricks.spark.csv",
> header = "true")
> Calculate covariance:
> cov <- cov(df, "col1", "col2")
> 
> Cheers
> Yanbo
> 
> 
> 2015-12-28 17:21 GMT+08:00 zhangjp <592426...@qq.com>:
>> hi  all,
>> I want  to use sparkR or spark MLlib  load csv file on hdfs then
>> calculate  covariance, how to do it .
>> thks.
> 




what is the difference between coalesce() and repartition()? Re: trouble understanding data frame memory usage "java.io.IOException: Unable to acquire memory"

2015-12-28 Thread Andy Davidson
Hi Michael

I’ll try 1.6 and report back.

The java doc does not say much about coalesce() or repartition(). When I use
repartition() just before I save my output everything runs as expected.

I thought coalesce() is an optimized version of repartition() and should be
used whenever we know we are reducing the number of partitions.

Kind regards

Andy
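
For later readers, my current understanding sketched out (worth double checking against the API docs): repartition(n) always shuffles into n roughly even partitions, while coalesce(n) avoids the shuffle by merging existing partitions, which can also shrink the parallelism of the stages that feed it.

import org.apache.spark.sql.DataFrame;

public class ReduceToOnePartition {

    // full shuffle into one evenly sized partition; upstream stages keep their parallelism
    public static DataFrame forSaving(DataFrame df) {
        return df.repartition(1);
    }

    // no shuffle; partitions are merged, so the whole preceding lineage may run as one task
    public static DataFrame cheapMerge(DataFrame df) {
        return df.coalesce(1);
    }
}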

From:  Michael Armbrust <mich...@databricks.com>
Date:  Monday, December 28, 2015 at 2:41 PM
To:  Andrew Davidson <a...@santacruzintegration.com>
Cc:  "user @spark" <user@spark.apache.org>
Subject:  Re: trouble understanding data frame memory usage
"java.io.IOException: Unable to acquire memory"

> Unfortunately in 1.5 we didn't force operators to spill when they ran out of memory
> so there is not a lot you can do.  It would be awesome if you could test with
> 1.6 and see if things are any better?
> 
> On Mon, Dec 28, 2015 at 2:25 PM, Andy Davidson <a...@santacruzintegration.com>
> wrote:
>> I am using spark 1.5.1. I am running into some memory problems with a java
>> unit test. Yes I could fix it by setting -Xmx (it's set to 1024M); however I
>> want to better understand what is going on so I can write better code in the
>> future. The test runs on a Mac, master="Local[2]"
>> 
>> I have a java unit test that starts by reading a 672K ascii file. My output
>> data file is 152K. It seems strange that such a small amount of data would
>> cause an out of memory exception. I am running a pretty standard machine
>> learning process
>> 
>> 1. Load data
>> 2. create a ML pipeline
>> 3. transform the data
>> 4. Train a model
>> 5. Make predictions
>> 6. Join the predictions back to my original data set
>> 7. Coalesce(1), I only have a small amount of data and want to save it in a
>> single file
>> 8. Save final results back to disk
>> 
>> Step 7: I am unable to call Coalesce() “java.io.IOException: Unable to
>> acquire memory”
>> 
>> To try and figure out what is going I put log messages in to count the number
>> of partitions
>> 
>> Turns out I have 20 input files, each one winds up in a separate partition.
>> Okay so after loading I call coalesce(1) and check to make sure I only have a
>> single partition.
>> 
>> The total number of observations is 1998.
>> 
>> After calling step 7 I count the number of partitions and discovered I have
>> 224 partitions!. Surprising given I called Coalesce(1) before I did anything
>> with the data. My data set should easily fit in memory. When I save them to
>> disk I get 202 files created with 162 of them being empty!
>> 
>> In general I am not explicitly using cache.
>> 
>> Some of the data frames get registered as tables. I find it easier to use
>> sql.
>> 
>> Some of the data frames get converted back to RDDs. I find it easier to
>> create RDD this way
>> 
>> I put calls to unpersist(true). In several places
>> 
>>private void memoryCheck(String name) {
>> 
>> Runtime rt = Runtime.getRuntime();
>> 
>> logger.warn("name: {} \t\ttotalMemory: {} \tfreeMemory: {} df.size:
>> {}", 
>> 
>> name,
>> 
>> String.format("%,d", rt.totalMemory()),
>> 
>> String.format("%,d", rt.freeMemory()));
>> 
>> }
>> 
>> 
>> Any idea how I can get a better understanding of what is going on? My goal is
>> to learn to write better spark code.
>> 
>> Kind regards
>> 
>> Andy
>> 
>> Memory usages at various points in my unit test
>> name: rawInput totalMemory:   447,741,952 freeMemory:   233,203,184
>> 
>> name: naiveBayesModel totalMemory:   509,083,648 freeMemory:   403,504,128
>> 
>> name: lpRDD totalMemory:   509,083,648 freeMemory:   402,288,104
>> 
>> name: results totalMemory:   509,083,648 freeMemory:   368,011,008
>> 
>> 
>> 
>>DataFrame exploreDF = results.select(results.col("id"),
>> 
>> results.col("label"),
>> 
>> results.col("binomialLabel"),
>> 
>> results.col("labelIndex"),
>> 
>> results.col("prediction"),
>> 
>> results.col("words"));
>> 
>> exploreDF.show(10);
>> 
>> 
>> 
>> Yes I realize its strange to switch styles how ever this should not cause
>> memory problems
>> 
>> 
>> 
>> final String exploreTable = "exploreTable";
>> 
>> exploreDF.registerTempTable(exploreTable);
>> 
>> String fmt = "SELECT * FROM %s where binomialLabel = 'signal'";
>> 
>> String stmt = String.format(fmt, exploreTable);
>> 
>> 
>> 
>> DataFrame subsetToSave = sqlContext.sql(stmt);// .show(100);
>> 
>> 
>> 
>> name: subsetToSave totalMemory: 1,747,451,904 freeMemory: 1,049,447,144
>> 
>> 
>> 
>> exploreDF.unpersist(true); does not resolve memory issue
>> 
>> 
>> 
> 




trouble implementing Transformer and calling DataFrame.withColumn()

2015-12-21 Thread Andy Davidson

I am trying to port the following python function to Java 8. I would like my
java implementation to implement Transformer so I can use it in a pipeline.

I am having a heck of a time trying to figure out how to create a Column
variable I can pass to DataFrame.withColumn(). As far as I know withColumn() is
the only way to append a column to a data frame.

Any comments or suggestions would be greatly appreciated

Andy


def convertMultinomialLabelToBinary(dataFrame):
newColName = "binomialLabel"
binomial = udf(lambda labelStr: labelStr if (labelStr == "noise") else
"signal", StringType())
ret = dataFrame.withColumn(newColName, binomial(dataFrame["label"]))
return ret

trainingDF2 = convertMultinomialLabelToBinary(trainingDF1)


public class LabelToBinaryTransformer extends Transformer {

private static final long serialVersionUID = 4202800448830968904L;

private  final UUID uid = UUID.randomUUID();

public String inputCol;

public String outputCol;



@Override

public String uid() {

return uid.toString();

}



@Override

public Transformer copy(ParamMap pm) {

Params xx = defaultCopy(pm);

return ???;

}



@Override

public DataFrame transform(DataFrame df) {

MyUDF myUDF = new MyUDF(myUDF, null, null);

Column c = df.col(inputCol);

??? UDF apply does not take a col

Column col = myUDF.apply(df.col(inputCol));

DataFrame ret = df.withColumn(outputCol, col);

return ret;

}



@Override

public StructType transformSchema(StructType arg0) {

   ??? What is this function supposed to do???

  ???Is this the type of the new output column

}



class MyUDF extends UserDefinedFunction {

public MyUDF(Object f, DataType dataType, Seq inputTypes)
{

super(f, dataType, inputTypes);

??? Why do I have to implement this constructor ???

??? What are the arguments ???

}



@Override

public

Column apply(scala.collection.Seq exprs) {

What do you do with a scala seq?

return ???;

}

}

}
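
For later readers, a hedged sketch of one answer to the transformSchema() question above: copy the input schema and append the output field (the output type here matches the python function being ported, which adds a plain string column). For the Column question, registering the function through sqlContext.udf().register(...) and calling it with org.apache.spark.sql.functions.callUDF(...) is one workable route.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class TransformSchemaSketch {

    public static StructType appendOutputCol(StructType inputSchema, String outputCol) {
        // keep every existing field and add the new output column at the end
        List<StructField> fields = new ArrayList<>(Arrays.asList(inputSchema.fields()));
        fields.add(DataTypes.createStructField(outputCol, DataTypes.StringType, true));
        return DataTypes.createStructType(fields);
    }
}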







is Kafka Hard to configure? Does it have a high cost of ownership?

2015-12-21 Thread Andy Davidson
Hi 

I realize this is a little off topic. My project needs to install something
like Kafka. The engineer working on that part of the system has been having
a lot of trouble configuring a single node implementation. He has lost a lot
of time and wants to switch to something else. Our team does not have any
Kafka experience.

What has your experience been? Does Kafka have a high cost of ownership?
I.E. Is it hard to configure, maintain and operate?

Kind regards

Andy




Re: newbie : why are thousands of empty files being created on HDFS?

2015-11-23 Thread Andy Davidson
Hi Sabarish

I am but a simple padawan :-) I do not understand your answer. Why would
Spark be creating so many empty partitions? My real problem is my
application is very slow. I happened to notice thousands of empty files
being created. I thought this is a hint to why my app is slow.

My program calls sample(0.01).filter(not null).saveAsTextFile(). This takes
about 35 min to scan 500,000 JSON strings and write 5000 to disk. The total
data written is 38M.

The data is read from HDFS. My understanding is Spark can not know in
advance how HDFS partitioned the data. Spark knows I have a master and 3
slave machines. It knows how many workers/executors are assigned to my job. I
would expect spark to be smart enough not to create more partitions than I
have worker machines?

Also given I am not using any key/value operations like Join() or doing
multiple scans I would assume my app would not benefit from partitioning.


Kind regards

Andy


From:  Sabarish Sasidharan <sabarish.sasidha...@manthan.com>
Date:  Saturday, November 21, 2015 at 7:20 PM
To:  Andrew Davidson <a...@santacruzintegration.com>
Cc:  "user @spark" <user@spark.apache.org>
Subject:  Re: newbie : why are thousands of empty files being created on
HDFS?

> 
> Those are empty partitions. I don't see the number of partitions specified in
> code. That then implies the default parallelism config is being used and is
> set to a very high number, the sum of empty + non empty files.
> 
> Regards
> Sab
> 
> On 21-Nov-2015 11:59 pm, "Andy Davidson" <a...@santacruzintegration.com>
> wrote:
>> I started working on a very simple ETL pipeline for a POC. It reads in a data
>> set of tweets stored as JSON strings in HDFS and randomly selects 1% of
>> the observations and writes them to HDFS. It seems to run very slowly. E.g.
>> to write 4720 observations takes 1:06:46.577795. I
>> also noticed that RDD saveAsTextFile is creating thousands of empty files.
>> 
>> I assume creating all these empty files must be slowing down the system. Any
>> idea why this is happening? Do I have write a script to periodical remove
>> empty files?
>> 
>> 
>> Kind regards
>> 
>> Andy
>> 
>> tweetStrings = sc.textFile(inputDataURL)
>> 
>> 
>> def removeEmptyLines(line) :
>> if line:
>> return True
>> else :
>> emptyLineCount.add(1);
>> return False
>> 
>> emptyLineCount = sc.accumulator(0)
>> sample = (tweetStrings.filter(removeEmptyLines)
>>   .sample(withReplacement=False, fraction=0.01, seed=345678))
>> 
>> startTime = datetime.datetime.now()
>> sample.saveAsTextFile(saveDataURL)
>> 
>> endTime = datetime.datetime.now()
>> print("elapsed time:%s" % (datetime.datetime.now() - startTime))
>> 
>> elapsed time:1:06:46.577795
>> 
>> 
>> Total number of empty files
>> $ hadoop fs -du {saveDataURL} | grep '^0' | wc -l
>> 223515
>> Total number of files with data
>> $ hadoop fs -du {saveDataURL} | grep -v '^0' | wc -l
>> 4642
>> 
>> 
>> 
>> I randomly picked a part file. Its size is 9251




Re: Adding more slaves to a running cluster

2015-11-25 Thread Andy Davidson
Hi Dillian and Nicholas

If you figure out how to do this please post your recipe. It would be very
useful 

andy

From:  Nicholas Chammas 
Date:  Wednesday, November 25, 2015 at 11:36 AM
To:  Dillian Murphey , "user @spark"

Subject:  Re: Adding more slaves to a running cluster

> spark-ec2 does not directly support adding instances to an existing cluster,
> apart from the special case of adding slaves to a cluster with a master but no
> slaves. There is an open issue to track adding this support, SPARK-2008
>  , but it doesn't have any
> momentum at the moment.
> 
> Your best bet currently is to do what you did and hack your way through using
> spark-ec2's various scripts.
> 
> You probably already know this, but to be clear, note that Spark itself
> supports adding slaves to a running cluster. It's just that spark-ec2 hasn't
> implemented a feature to do this work for you.
> 
> Nick
> 
> On Wed, Nov 25, 2015 at 2:27 PM Dillian Murphey 
> wrote:
>> It appears start-slave.sh works on a running cluster.  I'm surprised I can't
>> find more info on this. Maybe I'm not looking hard enough?
>> 
>> Using AWS and spot instances is incredibly more efficient, which begs for the
>> need of dynamically adding more nodes while the cluster is up, yet everything
>> I've found so far seems to indicate it isn't supported yet.
>> 
>> But yet here I am with 1.5 and it at least appears to be working. Am I
>> missing something?
>> 
>> On Tue, Nov 24, 2015 at 4:40 PM, Dillian Murphey 
>> wrote:
>>> What's the current status on adding slaves to a running cluster?  I want to
>>> leverage spark-ec2 and autoscaling groups.  I want to launch slaves as spot
>>> instances when I need to do some heavy lifting, but I don't want to bring
>>> down my cluster in order to add nodes.
>>> 
>>> Can this be done by just running start-slave.sh??
>>> 
>>> What about using Mesos?
>>> 
>>> I just want to create an AMI for a slave and on some trigger launch it and
>>> have it automatically add itself to the cluster.
>>> 
>>> thanks
>> 




Re: newbie : why are thousands of empty files being created on HDFS?

2015-11-24 Thread Andy Davidson
Hi Sabarish


Thanks for the suggestion. I did not know about wholeTextFiles()

By the way, your suggestion about repartitioning was spot on! My run
time for count() went from elapsed time 0:56:42.902407 to elapsed
time 0:00:03.215143 on a data set of about 34M with 4720 records.

Andy

From:  Sabarish Sasidharan <sabarish.sasidha...@manthan.com>
Date:  Monday, November 23, 2015 at 7:57 PM
To:  Andrew Davidson <a...@santacruzintegration.com>
Cc:  Xiao Li <gatorsm...@gmail.com>, "user @spark" <user@spark.apache.org>
Subject:  Re: newbie : why are thousands of empty files being created on
HDFS?

> 
> Hi Andy
> 
> You can try sc.wholeTextFiles() instead of sc.textFile()
> 
> Regards
> Sab
> 
> On 24-Nov-2015 4:01 am, "Andy Davidson" <a...@santacruzintegration.com> wrote:
>> Hi Xiao and Sabarish
>> 
>> Using the Stage tab on the UI, it turns out you can see how many
>> partitions there are. If I did nothing I would have 228155 partitions.
>> (This confirms what Sabarish said.) I tried coalesce(3). RDD.count()
>> fails. I thought given I have 3 workers and 1/3 of the data would easily
>> fit into memory this would be a good choice.
>> 
>> If I use coalesce(30) count works. How ever it still seems slow. It took
>> 2.42 min to read 4720 records. My total data set size is 34M.
>> 
>> Any suggestions how to choose the number of partitions.?
>> 
>>  ('spark.executor.memory', '2G') ('spark.driver.memory', '2G')
>> 
>> 
>> The data was originally collected using spark streaming. I noticed that the
>> number of default partitions == the number of files created on hdfs. I bet
>> each file is one spark streaming mini-batch. I suspect if I concatenate
>> these into a small number of files things will run much faster. I suspect
>> I would not need to call coalesce() and that coalesce() is taking a lot of
>> time. Any suggestions on how to choose the number of files?
>> 
>> Kind regards
>> 
>> Andy
>> 
>> 
>> From:  Xiao Li <gatorsm...@gmail.com>
>> Date:  Monday, November 23, 2015 at 12:21 PM
>> To:  Andrew Davidson <a...@santacruzintegration.com>
>> Cc:  Sabarish Sasidharan <sabarish.sasidha...@manthan.com>, "user @spark"
>> <user@spark.apache.org>
>> Subject:  Re: newbie : why are thousands of empty files being created on
>> HDFS?
>> 
>> 
>>> >In your case, maybe you can try to call the function coalesce?
>>> >Good luck,
>>> >
>>> >Xiao Li
>>> >
>>> >2015-11-23 12:15 GMT-08:00 Andy Davidson <a...@santacruzintegration.com>:
>>> >
>>> >Hi Sabarish
>>> >
>>> >I am but a simple padawan :-) I do not understand your answer. Why would
>>> >Spark be creating so many empty partitions? My real problem is my
>>> >application is very slow. I happened to notice thousands of empty files
>>> >being created. I thought this is a hint to why my app is slow.
>>> >
>>> >My program calls sample( 0.01).filter(not null).saveAsTextFile(). This
>>> >takes about 35 min, to scan 500,000 JSON strings and write 5000 to disk.
>>> >The total data writing in 38M.
>>> >
>>> >The data is read from HDFS. My understanding is Spark can not know in
>>> >advance how HDFS partitioned the data. Spark knows I have a master and 3
>>> >slaves machines. It knows how many works/executors are assigned to my
>>> >Job. I would expect spark would be smart enough not create more
>>> >partitions than I have worker machines?
>>> >
>>> >Also given I am not using any key/value operations like Join() or doing
>>> >multiple scans I would assume my app would not benefit from partitioning.
>>> >
>>> >
>>> >Kind regards
>>> >
>>> >Andy
>>> >
>>> >
>>> >From:  Sabarish Sasidharan <sabarish.sasidha...@manthan.com>
>>> >Date:  Saturday, November 21, 2015 at 7:20 PM
>>> >To:  Andrew Davidson <a...@santacruzintegration.com>
>>> >Cc:  "user @spark" <user@spark.apache.org>
>>> >Subject:  Re: newbie : why are thousands of empty files being created on
>>> >HDFS?
>>> >
>>> >
>>> >
>>> >Those are empty partitions. I don't see the number of partitions
>>> >specified in code. That then implies the default parallelism config is
>>> >being used and is set to a very high number, the sum of empty + non

Re: newbie : why are thousands of empty files being created on HDFS?

2015-11-24 Thread Andy Davidson
Hi Don

I went to a presentation given by Professor Ion Stoica. He mentioned that
Python was a little slower in general because of the type system. I do not
remember all of his comments. I think the context had to do with spark SQL
and data frames.

I wonder if the python issue is similar to the boxing/unboxing issue in
Java?

Andy


From:  Don Drake <dondr...@gmail.com>
Date:  Monday, November 23, 2015 at 7:10 PM
To:  Andrew Davidson <a...@santacruzintegration.com>
Cc:  Xiao Li <gatorsm...@gmail.com>, Sabarish Sasidharan
<sabarish.sasidha...@manthan.com>, "user @spark" <user@spark.apache.org>
Subject:  Re: newbie : why are thousands of empty files being created on
HDFS?

> I'm seeing similar slowness in saveAsTextFile(), but only in Python.
> 
> I'm sorting data in a dataframe, then transform it and get a RDD, and then
> coalesce(1).saveAsTextFile().
> 
> I converted the Python to Scala and the run-times were similar, except for the
> saveAsTextFile() stage.  The scala version was much faster.
> 
> When looking at the executor logs during that stage, I see the following when
> running the Scala code:
> 15/11/23 20:51:26 INFO storage.ShuffleBlockFetcherIterator: Getting 600
> non-empty blocks out of 600 blocks
> 
> 15/11/23 20:51:26 INFO storage.ShuffleBlockFetcherIterator: Started 184 remote
> fetches in 64 ms
> 
> 15/11/23 20:51:30 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort data
> of 146.0 MB to disk (0  time so far)
> 
> 15/11/23 20:51:35 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort data
> of 146.0 MB to disk (1  time so far)
> 
> 15/11/23 20:51:40 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort data
> of 146.0 MB to disk (2  times so far)
> 
> 15/11/23 20:51:45 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort data
> of 146.0 MB to disk (3  times so far)
> 
> 15/11/23 20:51:50 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort data
> of 146.0 MB to disk (4  times so far)
> 
> 15/11/23 20:51:54 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort data
> of 146.0 MB to disk (5  times so far)
> 
> 15/11/23 20:51:59 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort data
> of 146.0 MB to disk (6  times so far)
> 
> 15/11/23 20:52:04 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort data
> of 146.0 MB to disk (7  times so far)
> 
> 15/11/23 20:52:09 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort data
> of 146.0 MB to disk (8  times so far)
> 
>  
> 
> When running the Python version during the saveAsTextFile() stage, I see:
> 
> 15/11/23 21:04:03 INFO python.PythonRunner: Times: total = 16190, boot = 5,
> init = 144, finish = 16041
> 
> 15/11/23 21:04:03 INFO storage.ShuffleBlockFetcherIterator: Getting 300
> non-empty blocks out of 300 blocks
> 
> 15/11/23 21:04:03 INFO storage.ShuffleBlockFetcherIterator: Started 231 remote
> fetches in 82 ms
> 
> 15/11/23 21:04:15 INFO python.PythonRunner: Times: total = 12180, boot = -415,
> init = 447, finish = 12148
> 
> 15/11/23 21:04:15 INFO storage.ShuffleBlockFetcherIterator: Getting 300
> non-empty blocks out of 300 blocks
> 
> 15/11/23 21:04:15 INFO storage.ShuffleBlockFetcherIterator: Started 231 remote
> fetches in 129 ms
> 
> 15/11/23 21:04:27 INFO python.PythonRunner: Times: total = 11450, boot = -372,
> init = 398, finish = 11424
> 
> 15/11/23 21:04:27 INFO storage.ShuffleBlockFetcherIterator: Getting 300
> non-empty blocks out of 300 blocks
> 
> 15/11/23 21:04:27 INFO storage.ShuffleBlockFetcherIterator: Started 231 remote
> fetches in 70 ms
> 
> 15/11/23 21:04:42 INFO python.PythonRunner: Times: total = 14480, boot = -378,
> init = 403, finish = 14455
> 
> 15/11/23 21:04:42 INFO storage.ShuffleBlockFetcherIterator: Getting 300
> non-empty blocks out of 300 blocks
> 
> 15/11/23 21:04:42 INFO storage.ShuffleBlockFetcherIterator: Started 231 remote
> fetches in 62 ms
> 
> 15/11/23 21:04:54 INFO python.PythonRunner: Times: total = 11868, boot = -366,
> init = 381, finish = 11853
> 
> 15/11/23 21:04:54 INFO storage.ShuffleBlockFetcherIterator: Getting 300
> non-empty blocks out of 300 blocks
> 
> 15/11/23 21:04:54 INFO storage.ShuffleBlockFetcherIterator: Started 231 remote
> fetches in 59 ms
> 
> 15/11/23 21:05:10 INFO python.PythonRunner: Times: total = 15375, boot = -392,
> init = 403, finish = 15364
> 
> 15/11/23 21:05:10 INFO storage.ShuffleBlockFetcherIterator: Getting 300
> non-empty blocks out of 300 blocks
> 
> 15/11/23 21:05:10 INFO storage.ShuffleBlockFetcherIterator: Started 231 remote
> fetches in 48 ms
> 
> 
> 
> The python version is approximately 10 times slower than the Scala version.
> Any ideas why?
> 
> 
> 
> -

possible bug spark/python/pyspark/rdd.py portable_hash()

2015-11-26 Thread Andy Davidson
I am using spark-1.5.1-bin-hadoop2.6. I used
spark-1.5.1-bin-hadoop2.6/ec2/spark-ec2 to create a cluster and configured
spark-env to use python3. I get an exception 'Randomness of hash of string
should be disabled via PYTHONHASHSEED'. Is there any reason rdd.py should
not just set PYTHONHASHSEED?

Should I file a bug?

Kind regards

Andy

details

http://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=subtra
ct#pyspark.RDD.subtract

Example does not work out of the box

Subtract(other, numPartitions=None)


Return each value in self that is not contained in other.

>>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
>>> y = sc.parallelize([("a", 3), ("c", None)])
>>> sorted(x.subtract(y).collect())
[('a', 1), ('b', 4), ('b', 5)]
It raises 

if sys.version >= '3.3' and 'PYTHONHASHSEED' not in os.environ:
raise Exception("Randomness of hash of string should be disabled via
PYTHONHASHSEED")


The following script fixes the problem

sudo printf "\n# set PYTHONHASHSEED so python3 will not generate
Exception 'Randomness of hash of string should be disabled via
PYTHONHASHSEED'\nexport PYTHONHASHSEED=123\n" >>
/root/spark/conf/spark-env.sh

sudo pssh -i -h /root/spark-ec2/slaves cp /root/spark/conf/spark-env.sh
/root/spark/conf/spark-env.sh-`date "+%Y-%m-%d:%H:%M"`

sudo for i in `cat slaves` ; do scp spark-env.sh
root@$i:/root/spark/conf/spark-env.sh; done







Re: WARN LoadSnappy: Snappy native library not loaded

2015-11-17 Thread Andy Davidson
I forgot to mention. I am using spark-1.5.1-bin-hadoop2.6

From:  Andrew Davidson 
Date:  Tuesday, November 17, 2015 at 2:26 PM
To:  "user @spark" 
Subject:  Re: WARN LoadSnappy: Snappy native library not loaded

> FYI
> 
> After 17 min. only 26112/228155 have succeeded
> 
> This seems very slow
> 
> Kind regards
> 
> Andy
> 
> 
> 
> From:  Andrew Davidson 
> Date:  Tuesday, November 17, 2015 at 2:22 PM
> To:  "user @spark" 
> Subject:  WARN LoadSnappy: Snappy native library not loaded
> 
> 
>> I started a spark POC. I created a ec2 cluster on AWS using spark-c2. I
>> have 3 slaves. In general I am running into trouble even with small work
>> loads. I am using IPython notebooks running on my spark cluster.
>> Everything is painfully slow. I am using the standAlone cluster manager.
>> I noticed that I am getting the following warning on my driver console.
>> Any idea what the problem might be?
>> 
>> 
>> 
>> 15/11/17 22:01:59 WARN MetricsSystem: Using default name DAGScheduler for
>> source because spark.app.id is not set.
>> 15/11/17 22:03:05 WARN NativeCodeLoader: Unable to load native-hadoop
>> library for your platform... using builtin-java classes where applicable
>> 15/11/17 22:03:05 WARN LoadSnappy: Snappy native library not loaded
>> 
>> 
>> 
>> Here is an overview of my POC app. I have a file on hdfs containing about
>> 5000 twitter status strings.
>> 
>> tweetStrings = sc.textFile(dataURL)
>> 
>> jTweets = (tweetStrings.map(lambda x: json.loads(x)).take(10))
>> 
>> 
>> Generated the following error "error occurred while calling
>> o78.partitions.: java.lang.OutOfMemoryError: GC overhead limit exceeded"
>> 
>> Any idea what we need to do to improve new spark users' out of the box
>> experience?
>> 
>> Kind regards
>> 
>> Andy
>> 
>> export PYSPARK_PYTHON=python3.4
>> export PYSPARK_DRIVER_PYTHON=python3.4
>> export IPYTHON_OPTS="notebook --no-browser --port=7000 --log-level=WARN"
>> 
>> MASTER_URL=spark://ec2-55-218-207-122.us-west-1.compute.amazonaws.com:7077
>> 
>> 
>> numCores=2
>> $SPARK_ROOT/bin/pyspark --master $MASTER_URL --total-executor-cores
>> $numCores $*




Re: WARN LoadSnappy: Snappy native library not loaded

2015-11-17 Thread Andy Davidson
FYI

After 17 min. only 26112/228155 have succeeded

This seems very slow

Kind regards

Andy



From:  Andrew Davidson 
Date:  Tuesday, November 17, 2015 at 2:22 PM
To:  "user @spark" 
Subject:  WARN LoadSnappy: Snappy native library not loaded


>I started a spark POC. I created a ec2 cluster on AWS using spark-c2. I
>have 3 slaves. In general I am running into trouble even with small work
>loads. I am using IPython notebooks running on my spark cluster.
>Everything is painfully slow. I am using the standAlone cluster manager.
>I noticed that I am getting the following warning on my driver console.
>Any idea what the problem might be?
>
>
>
>15/11/17 22:01:59 WARN MetricsSystem: Using default name DAGScheduler for
>source because spark.app.id is not set.
>15/11/17 22:03:05 WARN NativeCodeLoader: Unable to load native-hadoop
>library for your platform... using builtin-java classes where applicable
>15/11/17 22:03:05 WARN LoadSnappy: Snappy native library not loaded
>
>
>
>Here is an overview of my POC app. I have a file on hdfs containing about
>5000 twitter status strings.
>
>tweetStrings = sc.textFile(dataURL)
>
>jTweets = (tweetStrings.map(lambda x: json.loads(x)).take(10))
>
>
>Generated the following error "error occurred while calling
>o78.partitions.: java.lang.OutOfMemoryError: GC overhead limit exceeded"
>
>Any idea what we need to do to improve new spark users' out of the box
>experience?
>
>Kind regards
>
>Andy
>
>export PYSPARK_PYTHON=python3.4
>export PYSPARK_DRIVER_PYTHON=python3.4
>export IPYTHON_OPTS="notebook --no-browser --port=7000 --log-level=WARN"
>
>MASTER_URL=spark://ec2-55-218-207-122.us-west-1.compute.amazonaws.com:7077
>
>
>numCores=2
>$SPARK_ROOT/bin/pyspark --master $MASTER_URL --total-executor-cores
>$numCores $*



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



WARN LoadSnappy: Snappy native library not loaded

2015-11-17 Thread Andy Davidson
I started a spark POC. I created a ec2 cluster on AWS using spark-c2. I have
3 slaves. In general I am running into trouble even with small work loads. I
am using IPython notebooks running on my spark cluster. Everything is
painfully slow. I am using the standAlone cluster manager. I noticed that I
am getting the following warning on my driver console. Any idea what the
problem might be?



15/11/17 22:01:59 WARN MetricsSystem: Using default name DAGScheduler for
source because spark.app.id is not set.

15/11/17 22:03:05 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable

15/11/17 22:03:05 WARN LoadSnappy: Snappy native library not loaded



Here is an overview of my POC app. I have a file on hdfs containing about
5000 twitter status strings.

tweetStrings = sc.textFile(dataURL)
jTweets = (tweetStrings.map(lambda x: json.loads(x)).take(10))

Generated the following error "error occurred while calling o78.partitions.:
java.lang.OutOfMemoryError: GC overhead limit exceeded"

Any idea what we need to do to improve new spark users' out of the box
experience?

Kind regards

Andy

export PYSPARK_PYTHON=python3.4

export PYSPARK_DRIVER_PYTHON=python3.4

export IPYTHON_OPTS="notebook --no-browser --port=7000 --log-level=WARN"

MASTER_URL=spark://ec2-55-218-207-122.us-west-1.compute.amazonaws.com:7077


numCores=2

$SPARK_ROOT/bin/pyspark --master $MASTER_URL --total-executor-cores
$numCores $*






newbie: unable to use all my cores and memory

2015-11-19 Thread Andy Davidson
I am having a heck of a time figuring out how to utilize my cluster
effectively. I am using the stand alone cluster manager. I have a master
and 3 slaves. Each machine has 2 cores.

I am trying to run a streaming app in cluster mode and pyspark at the same
time.

t1) On my console I see

* Alive Workers: 3
* Cores in use: 6 Total, 0 Used
* Memory in use: 18.8 GB Total, 0.0 B Used
* Applications: 0 Running, 15 Completed
* Drivers: 0 Running, 2 Completed
* Status: ALIVE

t2) I start my streaming app

$SPARK_ROOT/bin/spark-submit \
--class "com.pws.spark.streaming.IngestDriver" \
--master $MASTER_URL \
--total-executor-cores 2 \
--deploy-mode cluster \
$jarPath --clusterMode  $*

t3) on my console I see

* Alive Workers: 3
* Cores in use: 6 Total, 3 Used
* Memory in use: 18.8 GB Total, 13.0 GB Used
* Applications: 1 Running, 15 Completed
* Drivers: 1 Running, 2 Completed
* Status: ALIVE

Looks like pyspark should be able to use the 3 remaining cores and 5.8 GB
of memory

t4) I start pyspark

export PYSPARK_PYTHON=python3.4
export PYSPARK_DRIVER_PYTHON=python3.4
export IPYTHON_OPTS="notebook --no-browser --port=7000 --log-level=WARN"

$SPARK_ROOT/bin/pyspark --master $MASTER_URL --total-executor-cores 3
--executor-memory 2g

t5) on my console I see

* Alive Workers: 3
* Cores in use: 6 Total, 4 Used
* Memory in use: 18.8 GB Total, 15.0 GB Used
* Applications: 2 Running, 18 Completed
* Drivers: 1 Running, 2 Completed
* Status: ALIVE


I have 2 unused cores and a lot of memory left over. My pyspark
application is only getting 1 core. If the streaming app is not running,
pyspark is assigned 2 cores, each on a different worker. I have tried
using various combinations of --executor-cores and --total-executor-cores.
Any idea how to get pyspark to use more cores and memory?


Kind regards

Andy

P.s. Using different values I have wound up with pyspark status ==
"waiting". I think this is because there are not enough cores available?




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



spark streaming problem saveAsTextFiles() does not write valid JSON to HDFS

2015-11-19 Thread Andy Davidson
I am working on a simple POC. I am running into a really strange problem. I
wrote a java streaming app that collects tweets using the spark twitter
package and stores them to disk in JSON format. I noticed that when I run the
code on my mac, the files are written to the local file system as I expect,
i.e. in valid JSON format. The key names are double quoted. Boolean values
are the words true or false in lower case.



When I run in my cluster the only difference is that I call
data.saveAsTextFiles() using an hdfs: URI instead of file:///. When
the files are written to HDFS the JSON is not valid. E.g. key names are
single quoted, not double quoted. Boolean values are the strings False or
True; notice they start with an upper case letter. I suspect there will be
other problems. Any idea what I am doing wrong?



I am using spark-1.5.1-bin-hadoop2.6



import twitter4j.Status;

import com.fasterxml.jackson.databind.ObjectMapper;




   private static ObjectMapper mapper = new ObjectMapper();

static {

mapper.setVisibility(PropertyAccessor.FIELD,
JsonAutoDetect.Visibility.ANY);

mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);

}


   JavaDStream<String> jsonTweets = tweets.map(mapStatusToJson);

DStream<String> data = jsonTweets.dstream();

data.saveAsTextFiles(outputUri, null);



class MapStatusToJson implements Function<Status, String> {

private static final long serialVersionUID = -2882921625604816575L;



@Override

public String call(Status status) throws Exception {

return mapper.writeValueAsString(status);



}



I have been using pyspark to explore the data.



dataURL = "hdfs:///smallSample"

tweetStrings = sc.textFile(dataURL) # I looked at source it decoded as UTF-8

def str2JSON(statusStr):

"""If string is not valid JSON return 'none' and creates a log.warn()"""

try:

ret = json.loads(statusStr)

return ret

except Exception as exc :

logging.warning("bad JSON")

logging.warning(exc)

logging.warning(statusStr)

numJSONExceptions.add(1)

return None #null value





#jTweets = tweetStrings.map(lambda x: json.loads(x)).take(10)

jTweets = tweetStrings.map(str2JSON).take(10)



If I call print tweetStrings.take(1)

I would get back the following string (it's really long; only part of it is
provided).


{'favorited': False, 'inReplyToStatusId': -1, 'inReplyToScreenName': None,
'urlentities': [{'end': 126, 'expandedURL': 'http://panth.rs/pZ9Cvv',


If I copy one of the hdfs part files locally I would see something similar.
So I think the problem has something to do with DStream.saveAsTextFiles().



I do not know if this is the problem or not. However, it looks like the
system might depend on several versions of jackson and fasterxml.jackson.



Has anyone else run into this problem?



Kind regards



Andy



provided

+--- org.apache.spark:spark-streaming_2.10:1.5.1

|+--- org.apache.spark:spark-core_2.10:1.5.1

||+--- org.apache.avro:avro-mapred:1.7.7

|||+--- org.apache.avro:avro-ipc:1.7.7

||||+--- org.apache.avro:avro:1.7.7

|||||+--- org.codehaus.jackson:jackson-core-asl:1.9.13

|||||+--- org.codehaus.jackson:jackson-mapper-asl:1.9.13

||||||\---
org.codehaus.jackson:jackson-core-asp:1.9.13





||+--- org.apache.hadoop:hadoop-client:2.2.0

|||+--- org.apache.hadoop:hadoop-common:2.2.0

||||+--- org.slf4j:slf4j-api:1.7.5 -> 1.7.10

||||+--- org.codehaus.jackson:jackson-core-asl:1.8.8 ->
1.9.13





|||\--- com.fasterxml.jackson.core:jackson-databind:2.3.1 ->
2.4.4

||| +---
com.fasterxml.jackson.core:jackson-annotations:2.4.0 -> 2.4.4

||| \--- com.fasterxml.jackson.core:jackson-core:2.4.4

||+--- com.sun.jersey:jersey-server:1.9 (*)

||+--- com.sun.jersey:jersey-core:1.9

||+--- org.apache.mesos:mesos:0.21.1

||+--- io.netty:netty-all:4.0.29.Final

||+--- com.clearspring.analytics:stream:2.7.0

||+--- io.dropwizard.metrics:metrics-core:3.1.2

|||\--- org.slf4j:slf4j-api:1.7.7 -> 1.7.10

||+--- io.dropwizard.metrics:metrics-jvm:3.1.2

|||+--- io.dropwizard.metrics:metrics-core:3.1.2 (*)

|||\--- org.slf4j:slf4j-api:1.7.7 -> 1.7.10

||+--- io.dropwizard.metrics:metrics-json:3.1.2

|||+--- io.dropwizard.metrics:metrics-core:3.1.2 (*)

|||+--- com.fasterxml.jackson.core:jackson-databind:2.4.2 ->
2.4.4 (*)

|||\--- org.slf4j:slf4j-api:1.7.7 -> 1.7.10

||+--- io.dropwizard.metrics:metrics-graphite:3.1.2

|||+--- io.dropwizard.metrics:metrics-core:3.1.2 (*)

|||\--- org.slf4j:slf4j-api:1.7.7 -> 1.7.10

||+--- com.fasterxml.jackson.core:jackson-databind:2.4.4 (*)

||+--- 

Re: thought experiment: use spark ML to real time prediction

2015-11-22 Thread Andy Davidson
rms and versioning. i.e. you're re-creating a new
>>> standard like PMML
>>> 
>>> Option (a) is do-able, but I'm a bit concerned that it may be too "Spark
>>> specific", or even too "Scala / Java" specific. But it is still potentially
>>> very useful to Spark users to build this out and have a somewhat standard
>>> production serving framework and/or library (there are obviously existing
>>> options like PredictionIO etc).
>>> 
>>> Option (b) is really building out the existing PMML support within Spark, so
>>> a lot of the initial work has already been done. I know some folks had (or
>>> have) licensing issues with some components of JPMML (e.g. the evaluator and
>>> REST server). But perhaps the solution here is to build an Apache2-licensed
>>> evaluator framework.
>>> 
>>> Option (c) is obviously interesting - "let's build a better PMML (that uses
>>> JSON or whatever instead of XML!)". But it also seems like a huge amount of
>>> reinventing the wheel, and like any new standard would take time to garner
>>> wide support (if at all).
>>> 
>>> It would be really useful to start to understand what the main missing
>>> pieces are in PMML - perhaps the lowest-hanging fruit is simply to
>>> contribute improvements or additions to PMML.
>>> 
>>> 
>>> 
>>> On Fri, Nov 13, 2015 at 11:46 AM, Sabarish Sasidharan
>>> <sabarish.sasidha...@manthan.com> wrote:
>>>> That may not be an issue if the app using the models runs by itself (not
>>>> bundled into an existing app), which may actually be the right way to
>>>> design it considering separation of concerns.
>>>> 
>>>> Regards
>>>> Sab 
>>>> 
>>>> On Fri, Nov 13, 2015 at 9:59 AM, DB Tsai <dbt...@dbtsai.com> wrote:
>>>>> This will bring in the whole dependency set of Spark, which may break the
>>>>> web app.
>>>>> 
>>>>> 
>>>>> Sincerely,
>>>>> 
>>>>> DB Tsai
>>>>> --
>>>>> Web: https://www.dbtsai.com
>>>>> PGP Key ID: 0xAF08DF8D
>>>>> 
>>>>> On Thu, Nov 12, 2015 at 8:15 PM, Nirmal Fernando <nir...@wso2.com> wrote:
>>>>>> 
>>>>>> 
>>>>>> On Fri, Nov 13, 2015 at 2:04 AM, darren <dar...@ontrenet.com> wrote:
>>>>>>>  
>>>>>>> I agree 100%. Making the model requires large data and many cpus.
>>>>>>> 
>>>>>>> Using it does not.
>>>>>>> 
>>>>>>> This is a very useful side effect of ML models.
>>>>>>> 
>>>>>>> If mlib can't use models outside spark that's a real shame.
>>>>>> 
>>>>>> Well you can as mentioned earlier. You don't need Spark runtime for
>>>>>> predictions, save the serialized model and deserialize to use. (you need
>>>>>> the Spark Jars in the classpath though)
>>>>>>> 
>>>>>>> 
>>>>>>> Sent from my Verizon Wireless 4G LTE smartphone
>>>>>>> 
>>>>>>> 
>>>>>>>  Original message 
>>>>>>> From: "Kothuvatiparambil, Viju"
>>>>>>> <viju.kothuvatiparam...@bankofamerica.com>
>>>>>>> Date: 11/12/2015  3:09 PM  (GMT-05:00)
>>>>>>> To: DB Tsai <dbt...@dbtsai.com>, Sean Owen <so...@cloudera.com>
>>>>>>> Cc: Felix Cheung <felixcheun...@hotmail.com>, Nirmal Fernando
>>>>>>> <nir...@wso2.com>, Andy Davidson <a...@santacruzintegration.com>, Adrian
>>>>>>> Tanase <atan...@adobe.com>, "user @spark" <user@spark.apache.org>,
>>>>>>> Xiangrui Meng <men...@gmail.com>, hol...@pigscanfly.ca
>>>>>>> Subject: RE: thought experiment: use spark ML to real time prediction
>>>>>>> 
>>>>>>> I am glad to see DB's comments; they make me feel I am not the only one
>>>>>>> facing these issues. If we are able to use MLlib to load the model in
>>>>>>> web applications (outside the spark cluster), that would have solved the
>>>>>>> issue.  I understand Spark is mainly for processing big data in a
>>&

Re: newbie : why are thousands of empty files being created on HDFS?

2015-11-23 Thread Andy Davidson
Hi Xiao and Sabarish

Using the Stage tab on the UI, it turns out you can see how many
partitions there are. If I did nothing I would have 228155 partitions.
(This confirms what Sabarish said.) I tried coalesce(3); RDD.count()
fails. I thought that given I have 3 workers, and 1/3 of the data would
easily fit into memory, this would be a good choice.

If I use coalesce(30), count works. However, it still seems slow. It took
2.42 min to read 4720 records. My total data set size is 34M.

Any suggestions on how to choose the number of partitions?

 ('spark.executor.memory', '2G') ('spark.driver.memory', '2G')


The data was originally collected using Spark Streaming. I noticed that the
number of default partitions == the number of files created on HDFS. I bet
each file is one Spark Streaming mini-batch. I suspect that if I concatenate
these into a small number of files things will run much faster, and that I
would not need to call coalesce() at all (coalesce() seems to be taking a lot
of time). Any suggestions on how to choose the number of files?
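
A minimal sketch of what I have in mind (the paths and the partition count of
30 are just placeholders from my experiments above, not recommendations; sc is
the pyspark shell context):

inputURL = "hdfs:///smallSample"              # placeholder path
outputURL = "hdfs:///smallSampleCoalesced"    # placeholder path

tweetStrings = sc.textFile(inputURL)
print(tweetStrings.getNumPartitions())        # with the streaming output this was ~228155

# coalesce() shrinks the partition count without a shuffle; repartition() would
# shuffle but spread the records evenly across the workers.
smaller = tweetStrings.coalesce(30)
print(smaller.getNumPartitions())
smaller.saveAsTextFile(outputURL)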

Kind regards

Andy


From:  Xiao Li <gatorsm...@gmail.com>
Date:  Monday, November 23, 2015 at 12:21 PM
To:  Andrew Davidson <a...@santacruzintegration.com>
Cc:  Sabarish Sasidharan <sabarish.sasidha...@manthan.com>, "user @spark"
<user@spark.apache.org>
Subject:  Re: newbie : why are thousands of empty files being created on
HDFS?


>In your case, maybe you can try to call the function coalesce?
>Good luck, 
>
>Xiao Li
>
>2015-11-23 12:15 GMT-08:00 Andy Davidson <a...@santacruzintegration.com>:
>
>Hi Sabarish
>
>I am but a simple padawan :-) I do not understand your answer. Why would
>Spark be creating so many empty partitions? My real problem is my
>application is very slow. I happened to notice thousands of empty files
>being created. I thought this is a hint to why my app is slow.
>
>My program calls sample(0.01).filter(not null).saveAsTextFile(). This
>takes about 35 min to scan 500,000 JSON strings and write 5000 to disk.
>The total data written is 38M.
>
>The data is read from HDFS. My understanding is Spark cannot know in
>advance how HDFS partitioned the data. Spark knows I have a master and 3
>slave machines. It knows how many workers/executors are assigned to my
>job. I would expect Spark to be smart enough not to create more
>partitions than I have worker machines?
>
>Also given I am not using any key/value operations like Join() or doing
>multiple scans I would assume my app would not benefit from partitioning.
>
>
>Kind regards
>
>Andy
>
>
>From:  Sabarish Sasidharan <sabarish.sasidha...@manthan.com>
>Date:  Saturday, November 21, 2015 at 7:20 PM
>To:  Andrew Davidson <a...@santacruzintegration.com>
>Cc:  "user @spark" <user@spark.apache.org>
>Subject:  Re: newbie : why are thousands of empty files being created on
>HDFS?
>
>
>
>Those are empty partitions. I don't see the number of partitions
>specified in code. That then implies the default parallelism config is
>being used and is set to a very high number, the sum of empty + non empty
>files.
>Regards
>Sab
>On 21-Nov-2015 11:59 pm, "Andy Davidson" <a...@santacruzintegration.com>
>wrote:
>
>I started working on a very simple ETL pipeline for a POC. It reads in a
>data set of tweets stored as JSON strings in HDFS and randomly selects
>1% of the observations and writes them to HDFS. It seems to run very
>slowly, e.g. writing 4720 observations takes 1:06:46.577795. I
>also noticed that RDD saveAsTextFile is creating thousands of empty
>files.
>
>I assume creating all these empty files must be slowing down the system.
>Any idea why this is happening? Do I have to write a script to
>periodically remove the empty files?
>
>
>Kind regards
>
>Andy
>
>import datetime
>
>tweetStrings = sc.textFile(inputDataURL)
>
>emptyLineCount = sc.accumulator(0)
>
>def removeEmptyLines(line):
>    if line:
>        return True
>    else:
>        emptyLineCount.add(1)
>        return False
>
>sample = (tweetStrings.filter(removeEmptyLines)
>          .sample(withReplacement=False, fraction=0.01, seed=345678))
>
>startTime = datetime.datetime.now()
>sample.saveAsTextFile(saveDataURL)
>endTime = datetime.datetime.now()
>print("elapsed time:%s" % (endTime - startTime))
>
>
>elapsed time:1:06:46.577795
>
>Total number of empty files:
>$ hadoop fs -du {saveDataURL} | grep '^0' | wc -l
>223515
>
>Total number of files with data:
>$ hadoop fs -du {saveDataURL} | grep -v '^0' | wc -l
>4642
>
>I randomly picked a part file. Its size is 9251.
>
>



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



ml.classification.NaiveBayesModel how to reshape theta

2016-01-12 Thread Andy Davidson
I am trying to debug my trained model by exploring theta.
Theta is a Matrix. The Java doc for Matrix says that it is in column-major
format.

I have trained a NaiveBayesModel. Is the number of classes equal to the number
of rows?

int numRows = nbModel.numClasses();

int numColumns = nbModel.numFeatures();



Kind regards



Andy




Re: pre-install 3-party Python package on spark cluster

2016-01-11 Thread Andy Davidson
I use https://code.google.com/p/parallel-ssh/ to upgrade all my slaves
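
Once the package is installed on every node, a quick sanity check (a sketch of
my own, not from the original thread; numpy stands in for whatever 3rd-party
package was installed) is to run a tiny throwaway job that attempts the import
on the executors:

def checkImport(_):
    try:
        import numpy  # replace with the package you installed
        return [("ok", numpy.__version__)]
    except ImportError as exc:
        return [("missing", str(exc))]

# Use enough partitions that tasks are likely to land on every worker
# (not guaranteed, but good enough as a spot check).
print(sc.parallelize(range(100), 100).mapPartitions(checkImport).collect())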



From:  "taotao.li" 
Date:  Sunday, January 10, 2016 at 9:50 PM
To:  "user @spark" 
Subject:  pre-install 3-party Python package on spark cluster

> I have a spark cluster, from machine-1 to machine 100, and machine-1 acts as
> the master.
> 
> Then one day my program need use a 3-party python package which is not
> installed on every machine of the cluster.
> 
> so here comes my problem: to make that 3-party python package usable on
> master and slaves, should I manually ssh to every machine and use pip to
> install that package?
> 
> I believe there should be some deploy scripts or other things to make this
> graceful, but I can't find anything after googling.
> 
> 
> 
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/pre-install-3-party-Python
> -package-on-spark-cluster-tp25930.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 
> 




has any one implemented TF_IDF using ML transformers?

2016-01-15 Thread Andy Davidson
I wonder if I am missing something. TF-IDF is very popular. Spark ML has a
lot of transformers; however, TF-IDF is not supported directly.

Spark provides HashingTF and IDF transformers. The doc
http://spark.apache.org/docs/latest/mllib-feature-extraction.html#tf-idf

mentions you can implement TF-IDF as follows:

TFIDF(t, d, D) = TF(t, d) * IDF(t, D)

The problem I am running into is both HashingTF and IDF return a sparse
vector.

Ideally the Spark code to implement TF-IDF would be one line:

 DataFrame ret = tmp.withColumn("features",
tmp.col("tf").multiply(tmp.col("idf")));

org.apache.spark.sql.AnalysisException: cannot resolve '(tf * idf)' due to
data type mismatch: '(tf * idf)' requires numeric type, not vector;

I could implement my own UDF to do member-wise multiplication; however, given
how common TF-IDF is, I wonder if this code already exists somewhere.

I found org.apache.spark.util.Vector.Multiplier. There is no documentation;
however, given that the argument is a double, my guess is it just does scalar
multiplication.

I guess I could do something like

Double[] v = mySparkVector.toArray();

and then use JBlas to do member-wise multiplication.

I assume sparse vectors are not distributed, so there would not be any
additional communication cost.


If this code is truly missing, I would be happy to write it and donate it.

Andy


From:  Andrew Davidson 
Date:  Wednesday, January 13, 2016 at 2:52 PM
To:  "user @spark" 
Subject:  trouble calculating TF-IDF data type mismatch: '(tf * idf)'
requires numeric type, not vector;

> Bellow is a little snippet of my Java Test Code. Any idea how I implement
> member wise vector multiplication?
> 
> Kind regards
> 
> Andy
> 
> transformed df printSchema()
> 
> root
> 
>  |-- id: integer (nullable = false)
> 
>  |-- label: double (nullable = false)
> 
>  |-- words: array (nullable = false)
> 
>  ||-- element: string (containsNull = true)
> 
>  |-- tf: vector (nullable = true)
> 
>  |-- idf: vector (nullable = true)
> 
> 
> 
> +---+-----+----------------------------+-------------------------+--------------------------------------------------------+
> |id |label|words                       |tf                       |idf                                                     |
> +---+-----+----------------------------+-------------------------+--------------------------------------------------------+
> |0  |0.0  |[Chinese, Beijing, Chinese] |(7,[1,2],[2.0,1.0])      |(7,[1,2],[0.0,0.9162907318741551])                      |
> |1  |0.0  |[Chinese, Chinese, Shanghai]|(7,[1,4],[2.0,1.0])      |(7,[1,4],[0.0,0.9162907318741551])                      |
> |2  |0.0  |[Chinese, Macao]            |(7,[1,6],[1.0,1.0])      |(7,[1,6],[0.0,0.9162907318741551])                      |
> |3  |1.0  |[Tokyo, Japan, Chinese]     |(7,[1,3,5],[1.0,1.0,1.0])|(7,[1,3,5],[0.0,0.9162907318741551,0.9162907318741551]) |
> +---+-----+----------------------------+-------------------------+--------------------------------------------------------+
> 
> 
> @Test
> 
> public void test() {
> 
> DataFrame rawTrainingDF = createTrainingData();
> 
> DataFrame trainingDF = runPipleLineTF_IDF(rawTrainingDF);
> 
> . . .
> 
> }
> 
>private DataFrame runPipleLineTF_IDF(DataFrame rawDF) {
> 
> HashingTF hashingTF = new HashingTF()
> 
> .setInputCol("words")
> 
> .setOutputCol("tf")
> 
> .setNumFeatures(dictionarySize);
> 
> 
> 
> DataFrame termFrequenceDF = hashingTF.transform(rawDF);
> 
> 
> 
> termFrequenceDF.cache(); // idf needs to make 2 passes over data set
> 
> IDFModel idf = new IDF()
> 
> //.setMinDocFreq(1) // our vocabulary has 6 words we
> hash into 7
> 
> .setInputCol(hashingTF.getOutputCol())
> 
> .setOutputCol("idf")
> 
> .fit(termFrequenceDF);
> 
> 
> 
> DataFrame tmp = idf.transform(termFrequenceDF);
> 
> 
> 
> DataFrame ret = tmp.withColumn("features",
> tmp.col("tf").multiply(tmp.col("idf")));
> 
> logger.warn("\ntransformed df printSchema()");
> 
> ret.printSchema();
> 
> ret.show(false);
> 
> 
> 
> return ret;
> 
> }
> 
> 
> 
> org.apache.spark.sql.AnalysisException: cannot resolve '(tf * idf)' due to
> data type mismatch: '(tf * idf)' requires numeric type, not vector;
> 
> 
> 
> 
> 
> private DataFrame createTrainingData() {
> 
> // make sure we only use dictionarySize words
> 
> JavaRDD rdd = javaSparkContext.parallelize(Arrays.asList(
> 
> // 0 is Chinese
> 
> // 1 in notChinese
> 
> RowFactory.create(0, 0.0, Arrays.asList("Chinese", "Beijing",
> "Chinese")),
> 
> RowFactory.create(1, 0.0, Arrays.asList("Chinese", "Chinese",
> "Shanghai")),
> 
>   

Re: trouble calculating TF-IDF data type mismatch: '(tf * idf)' requires numeric type, not vector;

2016-01-13 Thread Andy Davidson
You'll need the following function if you want to run the test code.

Kind regards 

Andy

private DataFrame createData(JavaRDD<Row> rdd) {

StructField id = null;

id = new StructField("id", DataTypes.IntegerType, false,
Metadata.empty());



StructField label = null;

label = new StructField("label", DataTypes.DoubleType, false,
Metadata.empty());

   

StructField words = null;

words = new StructField("words",
DataTypes.createArrayType(DataTypes.StringType), false, Metadata.empty());



StructType schema = new StructType(new StructField[] { id, label,
words });

DataFrame ret = sqlContext.createDataFrame(rdd, schema);



return ret;

}


From:  Andrew Davidson 
Date:  Wednesday, January 13, 2016 at 2:52 PM
To:  "user @spark" 
Subject:  trouble calculating TF-IDF data type mismatch: '(tf * idf)'
requires numeric type, not vector;

> Bellow is a little snippet of my Java Test Code. Any idea how I implement
> member wise vector multiplication?
> 
> Also notice the idf value for 'Chinese' is 0.0? The calculation is ln((4+1) /
> (6/4 + 1)) = ln(2) = 0.6931 ??
> 
> Also any idea if this code would work in a pipe line? I.E. Is the pipeline
> smart about using cache() ?
> 
> Kind regards
> 
> Andy
> 
> transformed df printSchema()
> 
> root
> 
>  |-- id: integer (nullable = false)
> 
>  |-- label: double (nullable = false)
> 
>  |-- words: array (nullable = false)
> 
>  ||-- element: string (containsNull = true)
> 
>  |-- tf: vector (nullable = true)
> 
>  |-- idf: vector (nullable = true)
> 
> 
> 
> +---+-----+----------------------------+-------------------------+--------------------------------------------------------+
> |id |label|words                       |tf                       |idf                                                     |
> +---+-----+----------------------------+-------------------------+--------------------------------------------------------+
> |0  |0.0  |[Chinese, Beijing, Chinese] |(7,[1,2],[2.0,1.0])      |(7,[1,2],[0.0,0.9162907318741551])                      |
> |1  |0.0  |[Chinese, Chinese, Shanghai]|(7,[1,4],[2.0,1.0])      |(7,[1,4],[0.0,0.9162907318741551])                      |
> |2  |0.0  |[Chinese, Macao]            |(7,[1,6],[1.0,1.0])      |(7,[1,6],[0.0,0.9162907318741551])                      |
> |3  |1.0  |[Tokyo, Japan, Chinese]     |(7,[1,3,5],[1.0,1.0,1.0])|(7,[1,3,5],[0.0,0.9162907318741551,0.9162907318741551]) |
> +---+-----+----------------------------+-------------------------+--------------------------------------------------------+
> 
> 
> @Test
> 
> public void test() {
> 
> DataFrame rawTrainingDF = createTrainingData();
> 
> DataFrame trainingDF = runPipleLineTF_IDF(rawTrainingDF);
> 
> . . .
> 
> }
> 
>private DataFrame runPipleLineTF_IDF(DataFrame rawDF) {
> 
> HashingTF hashingTF = new HashingTF()
> 
> .setInputCol("words")
> 
> .setOutputCol("tf")
> 
> .setNumFeatures(dictionarySize);
> 
> 
> 
> DataFrame termFrequenceDF = hashingTF.transform(rawDF);
> 
> 
> 
> termFrequenceDF.cache(); // idf needs to make 2 passes over data set
> 
> IDFModel idf = new IDF()
> 
> //.setMinDocFreq(1) // our vocabulary has 6 words we
> hash into 7
> 
> .setInputCol(hashingTF.getOutputCol())
> 
> .setOutputCol("idf")
> 
> .fit(termFrequenceDF);
> 
> 
> 
> DataFrame tmp = idf.transform(termFrequenceDF);
> 
> 
> 
> DataFrame ret = tmp.withColumn("features",
> tmp.col("tf").multiply(tmp.col("idf")));
> 
> logger.warn("\ntransformed df printSchema()");
> 
> ret.printSchema();
> 
> ret.show(false);
> 
> 
> 
> return ret;
> 
> }
> 
> 
> 
> org.apache.spark.sql.AnalysisException: cannot resolve '(tf * idf)' due to
> data type mismatch: '(tf * idf)' requires numeric type, not vector;
> 
> 
> 
> 
> 
> private DataFrame createTrainingData() {
> 
> // make sure we only use dictionarySize words
> 
> JavaRDD rdd = javaSparkContext.parallelize(Arrays.asList(
> 
> // 0 is Chinese
> 
> // 1 in notChinese
> 
> RowFactory.create(0, 0.0, Arrays.asList("Chinese", "Beijing",
> "Chinese")),
> 
> RowFactory.create(1, 0.0, Arrays.asList("Chinese", "Chinese",
> "Shanghai")),
> 
> RowFactory.create(2, 0.0, Arrays.asList("Chinese", "Macao")),
> 
> RowFactory.create(3, 1.0, Arrays.asList("Tokyo", "Japan",
> "Chinese";
> 
>
> 
> return createData(rdd);
> 
> }
> 
> 
> 
> private DataFrame createTestData() {
> 
> JavaRDD rdd = javaSparkContext.parallelize(Arrays.asList(
> 
>   

trouble calculating TF-IDF data type mismatch: '(tf * idf)' requires numeric type, not vector;

2016-01-13 Thread Andy Davidson
Below is a little snippet of my Java test code. Any idea how I implement
member-wise vector multiplication?

Also notice the idf value for 'Chinese' is 0.0? The calculation is ln((4+1)
/ (6/4 + 1)) = ln(2) = 0.6931 ??

Also, any idea if this code would work in a pipeline? I.e., is the pipeline
smart about using cache()?

Kind regards

Andy

transformed df printSchema()

root

 |-- id: integer (nullable = false)

 |-- label: double (nullable = false)

 |-- words: array (nullable = false)

 ||-- element: string (containsNull = true)

 |-- tf: vector (nullable = true)

 |-- idf: vector (nullable = true)



+---+-----+----------------------------+-------------------------+--------------------------------------------------------+
|id |label|words                       |tf                       |idf                                                     |
+---+-----+----------------------------+-------------------------+--------------------------------------------------------+
|0  |0.0  |[Chinese, Beijing, Chinese] |(7,[1,2],[2.0,1.0])      |(7,[1,2],[0.0,0.9162907318741551])                      |
|1  |0.0  |[Chinese, Chinese, Shanghai]|(7,[1,4],[2.0,1.0])      |(7,[1,4],[0.0,0.9162907318741551])                      |
|2  |0.0  |[Chinese, Macao]            |(7,[1,6],[1.0,1.0])      |(7,[1,6],[0.0,0.9162907318741551])                      |
|3  |1.0  |[Tokyo, Japan, Chinese]     |(7,[1,3,5],[1.0,1.0,1.0])|(7,[1,3,5],[0.0,0.9162907318741551,0.9162907318741551]) |
+---+-----+----------------------------+-------------------------+--------------------------------------------------------+


@Test

public void test() {

DataFrame rawTrainingDF = createTrainingData();

DataFrame trainingDF = runPipleLineTF_IDF(rawTrainingDF);

. . .

}

   private DataFrame runPipleLineTF_IDF(DataFrame rawDF) {

HashingTF hashingTF = new HashingTF()

.setInputCol("words")

.setOutputCol("tf")

.setNumFeatures(dictionarySize);



DataFrame termFrequenceDF = hashingTF.transform(rawDF);



termFrequenceDF.cache(); // idf needs to make 2 passes over data set

IDFModel idf = new IDF()

//.setMinDocFreq(1) // our vocabulary has 6 words we
hash into 7

.setInputCol(hashingTF.getOutputCol())

.setOutputCol("idf")

.fit(termFrequenceDF);



DataFrame tmp = idf.transform(termFrequenceDF);



DataFrame ret = tmp.withColumn("features",
tmp.col("tf").multiply(tmp.col("idf")));

logger.warn("\ntransformed df printSchema()");

ret.printSchema();

ret.show(false);



return ret;

}



org.apache.spark.sql.AnalysisException: cannot resolve '(tf * idf)' due to
data type mismatch: '(tf * idf)' requires numeric type, not vector;





private DataFrame createTrainingData() {

// make sure we only use dictionarySize words

JavaRDD<Row> rdd = javaSparkContext.parallelize(Arrays.asList(

// 0 is Chinese

// 1 in notChinese

RowFactory.create(0, 0.0, Arrays.asList("Chinese",
"Beijing", "Chinese")),

RowFactory.create(1, 0.0, Arrays.asList("Chinese",
"Chinese", "Shanghai")),

RowFactory.create(2, 0.0, Arrays.asList("Chinese",
"Macao")),

RowFactory.create(3, 1.0, Arrays.asList("Tokyo", "Japan",
"Chinese"))));

   

return createData(rdd);

}



private DataFrame createTestData() {

JavaRDD<Row> rdd = javaSparkContext.parallelize(Arrays.asList(

// 0 is Chinese

// 1 in notChinese

// "bernoulli" requires label to be IntegerType

RowFactory.create(4, 1.0, Arrays.asList("Chinese",
"Chinese", "Chinese", "Tokyo", "Japan"))));

return createData(rdd);

}




Re: ml.classification.NaiveBayesModel how to reshape theta

2016-01-13 Thread Andy Davidson
Thanks

Andy



From:  Yanbo Liang <yblia...@gmail.com>
Date:  Wednesday, January 13, 2016 at 6:29 AM
To:  Andrew Davidson <a...@santacruzintegration.com>
Cc:  "user @spark" <user@spark.apache.org>
Subject:  Re: ml.classification.NaiveBayesModel how to reshape theta

> Yep, row of Matrix theta is the number of classes and column of theta is the
> number of features.
> 
> 2016-01-13 10:47 GMT+08:00 Andy Davidson <a...@santacruzintegration.com>:
>> I am trying to debug my trained model by exploring theta
>> Theta is a Matrix. The java Doc for Matrix says that it is column major
>> formate
>> 
>> I have trained a NaiveBayesModel. Is the number of classes == to the number
>> of rows? 
>> 
>> int numRows = nbModel.numClasses();
>> 
>> int numColumns = nbModel.numFeatures();
>> 
>> 
>> 
>> Kind regards
>> 
>> 
>> 
>> Andy
> 




Re: How To Save TF-IDF Model In PySpark

2016-01-15 Thread Andy Davidson
Are you using 1.6.0 or an older version?

I think I remember something in 1.5.1 saying save was not implemented in
python.


The current doc does not say anything about save()
http://spark.apache.org/docs/latest/mllib-feature-extraction.html#tf-idf

http://spark.apache.org/docs/latest/ml-guide.html#saving-and-loading-pipelines
"Often times it is worth it to save a model or a pipeline to disk for later
use. In Spark 1.6, a model import/export functionality was added to the
Pipeline API. Most basic transformers are supported as well as some of the
more basic ML models. Please refer to the algorithm's API documentation to
see if saving and loading is supported."
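
In the meantime, one workaround I can think of (a sketch only, which I have not
verified end to end): the mllib IDFModel is essentially just the learned IDF
vector, so you could persist model.idf() yourself and rebuild the transform by
hand. This assumes the `model` and `data` objects from your repro below and
that numpy is available.

import numpy as np

# Save the learned per-term IDF weights instead of the model object.
idfWeights = model.idf().toArray()
np.save("idfWeights.npy", idfWeights)

# Later: reload the weights and rebuild the TF-IDF transform by hand.
idfWeights = np.load("idfWeights.npy")
tfidfAgain = data.map(lambda tf: tf.toArray() * idfWeights)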

andy




From:  Asim Jalis 
Date:  Friday, January 15, 2016 at 4:02 PM
To:  "user @spark" 
Subject:  How To Save TF-IDF Model In PySpark

> Hi,
> 
> I am trying to save a TF-IDF model in PySpark. Looks like this is not
> supported. 
> 
> Using `model.save()` causes:
> 
> AttributeError: 'IDFModel' object has no attribute 'save'
> 
> Using `pickle` causes:
> 
> TypeError: can't pickle lock objects
> 
> Does anyone have suggestions
> 
> Thanks!
> 
> Asim
> 
> Here is the full repro. Start pyspark shell and then run this code in
> it.
> 
> ```
> # Imports
> from pyspark import SparkContext
> from pyspark.mllib.feature import HashingTF
> 
> from pyspark.mllib.regression import LabeledPoint
> from pyspark.mllib.linalg import Vectors
> from pyspark.mllib.feature import IDF
> 
> # Create some data
> n = 4
> freqs = [
> Vectors.sparse(n, (1, 3), (1.0, 2.0)),
> Vectors.dense([0.0, 1.0, 2.0, 3.0]),
> Vectors.sparse(n, [1], [1.0])]
> data = sc.parallelize(freqs)
> idf = IDF()
> model = idf.fit(data)
> tfidf = model.transform(data)
> 
> # View
> for r in tfidf.collect(): print(r)
> 
> # Try to save it
> model.save("foo.model")
> 
> # Try to save it with Pickle
> import pickle
> pickle.dump(model, open("model.p", "wb"))
> pickle.dumps(model)
> ```




Re: trouble using eclipse to view spark source code

2016-01-18 Thread Andy Davidson
Many thanks. I was using a different Scala plug-in; this one seems to work
better. I no longer get compile errors; however, I get the following stack
trace when I try to run my unit tests with mllib open.

I am still using Eclipse Luna.

Andy

java.lang.NoSuchMethodError:
scala.collection.mutable.ArrayOps.$colon$plus(Ljava/lang/Object;Lscala/refle
ct/ClassTag;)Ljava/lang/Object;
at org.apache.spark.ml.util.SchemaUtils$.appendColumn(SchemaUtils.scala:73)
at org.apache.spark.ml.feature.HashingTF.transformSchema(HashingTF.scala:76)
at org.apache.spark.ml.feature.HashingTF.transform(HashingTF.scala:64)
at com.pws.fantasySport.ml.TDIDFTest.runPipleLineTF_IDF(TDIDFTest.java:52)
at com.pws.fantasySport.ml.TDIDFTest.test(TDIDFTest.java:36)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62
)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl
.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.
java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.j
ava:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.ja
va:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.jav
a:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.jav
a:78)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.jav
a:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26
)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at 
org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestRef
erence.java:50)
at 
org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:3
8)
at 
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRu
nner.java:459)
at 
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRu
nner.java:675)
at 
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.
java:382)
at 
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner
.java:192)


From:  Jakob Odersky <joder...@gmail.com>
Date:  Monday, January 18, 2016 at 3:20 PM
To:  Andrew Davidson <a...@santacruzintegration.com>
Cc:  "user @spark" <user@spark.apache.org>
Subject:  Re: trouble using eclipse to view spark source code

> Have you followed the guide on how to import spark into eclipse
> https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools#Usefu
> lDeveloperTools-Eclipse ?
> 
> On 18 January 2016 at 13:04, Andy Davidson <a...@santacruzintegration.com>
> wrote:
>> Hi 
>> 
>> My project is implemented using Java 8 and Python. Some times its handy to
>> look at the spark source code. For unknown reason if I open a spark project
>> my java projects show tons of compiler errors. I think it may have something
>> to do with Scala. If I close the projects my java code is fine.
>> 
>> I typically I only want to import the machine learning and streaming
>> projects.
>> 
>> I am not sure if this is an issue or not but my java projects are built using
>> gradel
>> 
>> In eclipse preferences -> scala -> installations I selected Scala: 2.10.6
>> (built in)
>> 
>> Any suggestions would be greatly appreciate
>> 
>> Andy
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
> 



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

trouble using eclipse to view spark source code

2016-01-18 Thread Andy Davidson
Hi 

My project is implemented using Java 8 and Python. Sometimes it is handy to
look at the Spark source code. For an unknown reason, if I open a Spark project
my Java projects show tons of compiler errors. I think it may have something
to do with Scala. If I close the Spark projects my Java code is fine.

I typically only want to import the machine learning and streaming
projects.

I am not sure if this is an issue or not, but my Java projects are built
using Gradle.

In Eclipse preferences -> Scala -> Installations I selected Scala: 2.10.6
(built in).

Any suggestions would be greatly appreciated.

Andy








-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Re: has any one implemented TF_IDF using ML transformers?

2016-01-18 Thread Andy Davidson
hecked")

JavaRDD<List<String>> documents = sc.parallelize(Arrays.asList(

  Arrays.asList("this is a sentence".split(" ")),

  Arrays.asList("this is another sentence".split(" ")),

  Arrays.asList("this is still a sentence".split(" "))), 2);

JavaRDD<Vector> termFreqs = tf.transform(documents);

termFreqs.collect();

IDF idf = new IDF();

JavaRDD<Vector> tfIdfs = idf.fit(termFreqs).transform(termFreqs);

List<Vector> localTfIdfs = tfIdfs.collect();

int indexOfThis = tf.indexOf("this");

System.err.println("AEDWIP: indexOfThis: " + indexOfThis);



int indexOfSentence = tf.indexOf("sentence");

System.err.println("AEDWIP: indexOfSentence: " + indexOfSentence);



int indexOfAnother = tf.indexOf("another");

System.err.println("AEDWIP: indexOfAnother: " + indexOfAnother);



for (Vector v: localTfIdfs) {

System.err.println("AEDWIP: V.toString() " + v.toString());

  Assert.assertEquals(0.0, v.apply(indexOfThis), 1e-15);

}

  }



$ mvn test -DwildcardSuites=none
-Dtest=org.apache.spark.mllib.feature.JavaTfIdfSuite


AEDWIP: indexOfThis: 413342

AEDWIP: indexOfSentence: 251491

AEDWIP: indexOfAnother: 263939

AEDWIP: V.toString()
(1048576,[97,3370,251491,413342],[0.28768207245178085,0.0,0.0,0.0])

AEDWIP: V.toString()
(1048576,[3370,251491,263939,413342],[0.0,0.0,0.6931471805599453,0.0])

AEDWIP: V.toString()
(1048576,[97,3370,251491,413342,713128],[0.28768207245178085,0.0,0.0,0.0,0.6
931471805599453])

Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 2.908 sec -
in org.apache.spark.mllib.feature.JavaTfIdfSuite


From:  Yanbo Liang <yblia...@gmail.com>
Date:  Sunday, January 17, 2016 at 12:34 AM
To:  Andrew Davidson <a...@santacruzintegration.com>
Cc:  "user @spark" <user@spark.apache.org>
Subject:  Re: has any one implemented TF_IDF using ML transformers?

> Hi Andy,
> 
> Actually, the output of ML IDF model is the TF-IDF vector of each instance
> rather than IDF vector.
> So it's unnecessary to do member wise multiplication to calculate TF-IDF
> value. You can refer the code at here:
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/sp
> ark/ml/feature/IDF.scala#L121
> I found the document of IDF is not very clear, we need to update it.
> 
> Thanks
> Yanbo
> 
> 2016-01-16 6:10 GMT+08:00 Andy Davidson <a...@santacruzintegration.com>:
>> I wonder if I am missing something? TF-IDF is very popular. Spark ML has a
>> lot of transformers how ever it TF_IDF is not supported directly.
>> 
>> Spark provide a HashingTF and IDF transformer. The java doc
>> http://spark.apache.org/docs/latest/mllib-feature-extraction.html#tf-idf
>> 
>> Mentions you can implement TFIDF as follows
>> 
>> TFIDF(t,d,D)=TF(t,d)・IDF(t,D).
>> 
>> The problem I am running into is both HashingTF and IDF return a sparse
>> vector.
>> 
>> Ideally the spark code  to implement TFIDF would be one line.
>> 
>>  DataFrame ret = tmp.withColumn("features",
>> tmp.col("tf").multiply(tmp.col("idf")));
>> 
>> org.apache.spark.sql.AnalysisException: cannot resolve '(tf * idf)' due to
>> data type mismatch: '(tf * idf)' requires numeric type, not vector;
>> 
>> I could implement my own UDF to do member wise multiplication how ever given
>> how common TF-IDF is I wonder if this code already exists some where
>> 
>> I found  org.apache.spark.util.Vector.Multiplier. There is no documentation
>> how ever give the argument is double, my guess is it just does scalar
>> multiplication. 
>> 
>> I guess I could do something like
>> 
>> Double[] v = mySparkVector.toArray();
>>  Then use JBlas to do member wise multiplication
>> 
>> I assume sparceVectors are not distributed so there  would not be any
>> additional communication cost
>> 
>> 
>> If this code is truly missing. I would be happy to write it and donate it
>> 
>> Andy
>> 
>> 
>> From:  Andrew Davidson <a...@santacruzintegration.com>
>> Date:  Wednesday, January 13, 2016 at 2:52 PM
>> To:  "user @spark" <user@spark.apache.org>
>> Subject:  trouble calculating TF-IDF data type mismatch: '(tf * idf)'
>> requires numeric type, not vector;
>> 
>>> Bellow is a little snippet of my Java Test Code. Any idea how I implement
>>> member wise vector multiplication?
>>> 
>>> Kind regards
>>> 
>>> Andy
>>> 
>>> transformed df printSchema()
>>> 
>>> root
>>> 
>>>  |-- id: integer (nullable = false)
