Re: No way to supply hive-site.xml in yarn client mode?

2015-10-29 Thread Zoltan Fedor
Hi Deenar,
As suggested, I have moved the hive-site.xml from HADOOP_CONF_DIR
($SPARK_HOME/hadoop-conf) to YARN_CONF_DIR ($SPARK_HOME/conf/yarn-conf) and
use the below to start pyspark, but the error is the exact same as before.

$ HADOOP_CONF_DIR=$SPARK_HOME/hadoop-conf
YARN_CONF_DIR=$SPARK_HOME/conf/yarn-conf HADOOP_USER_NAME=biapp MASTER=yarn
$SPARK_HOME/bin/pyspark --deploy-mode client

Python 2.6.6 (r266:84292, Jul 23 2015, 05:13:40)
[GCC 4.4.7 20120313 (Red Hat 4.4.7-16)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/usr/lib/spark-1.5.1-bin-without-hadoop/lib/spark-assembly-1.5.1-hadoop2.5.0-cdh5.3.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
15/10/29 09:06:36 WARN MetricsSystem: Using default name DAGScheduler for
source because spark.app.id is not set.
15/10/29 09:06:38 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
15/10/29 09:07:03 WARN HiveConf: HiveConf of name hive.metastore.local does
not exist
Welcome to
    __
 / __/__  ___ _/ /__
_\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.5.1
  /_/

Using Python version 2.6.6 (r266:84292, Jul 23 2015 05:13:40)
SparkContext available as sc, HiveContext available as sqlContext.
>>> sqlContext2 = HiveContext(sc)
>>> sqlContext2 = HiveContext(sc)
>>> sqlContext2.sql("show databases").first()
15/10/29 09:07:34 WARN HiveConf: HiveConf of name hive.metastore.local does
not exist
15/10/29 09:07:35 WARN ShellBasedUnixGroupsMapping: got exception trying to
get groups for user biapp: id: biapp: No such user

15/10/29 09:07:35 WARN UserGroupInformation: No groups available for user
biapp
Traceback (most recent call last):
  File "", line 1, in 
  File
"/usr/lib/spark-1.5.1-bin-without-hadoop/python/pyspark/sql/context.py",
line 552, in sql
return DataFrame(self._ssql_ctx.sql(sqlQuery), self)
  File
"/usr/lib/spark-1.5.1-bin-without-hadoop/python/pyspark/sql/context.py",
line 660, in _ssql_ctx
"build/sbt assembly", e)
Exception: ("You must build Spark with Hive. Export 'SPARK_HIVE=true' and
run build/sbt assembly", Py4JJavaError(u'An error occurred while calling
None.org.apache.spark.sql.hive.HiveContext.\n', JavaObject id=o20))
>>>


On Thu, Oct 29, 2015 at 7:20 AM, Deenar Toraskar 
wrote:

> *Hi Zoltan*
>
> Add hive-site.xml to your YARN_CONF_DIR. i.e. $SPARK_HOME/conf/yarn-conf
>
> Deenar
>
> *Think Reactive Ltd*
> deenar.toras...@thinkreactive.co.uk
> 07714140812
>
> On 28 October 2015 at 14:28, Zoltan Fedor 
> wrote:
>
>> Hi,
>> We have a shared CDH 5.3.3 cluster and trying to use Spark 1.5.1 on it in
>> yarn client mode with Hive.
>>
>> I have compiled Spark 1.5.1 with SPARK_HIVE=true, but it seems I am not
>> able to make SparkSQL pick up the hive-site.xml when running pyspark.
>>
>> hive-site.xml is located in $SPARK_HOME/hadoop-conf/hive-site.xml and
>> also in $SPARK_HOME/conf/hive-site.xml
>>
>> When I start pyspark with the below command and then run some simple
>> SparkSQL it fails; it seems it didn't pick up the settings in hive-site.xml
>>
>> $ HADOOP_CONF_DIR=$SPARK_HOME/hadoop-conf
>> YARN_CONF_DIR=$SPARK_HOME/yarn-conf HADOOP_USER_NAME=biapp MASTER=yarn
>> $SPARK_HOME/bin/pyspark --deploy-mode client
>>
>> Python 2.6.6 (r266:84292, Jul 23 2015, 05:13:40)
>> [GCC 4.4.7 20120313 (Red Hat 4.4.7-16)] on linux2
>> Type "help", "copyright", "credits" or "license" for more information.
>> SLF4J: Class path contains multiple SLF4J bindings.
>> SLF4J: Found binding in
>> [jar:file:/usr/lib/spark-1.5.1-bin-without-hadoop/lib/spark-assembly-1.5.1-hadoop2.5.0-cdh5.3.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: Found binding in
>> [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>> explanation.
>> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>> 15/10/28 10:22:33 WARN MetricsSystem: Using default name DAGScheduler for
>> source because spark.app.id is not set.
>> 15/10/28 10:22:35 WARN NativeCodeLoader: Unable to load native-hadoop
>> library for your platform... using builtin-java classes where applicable
>> 15/10/28 10:22:59 WARN HiveConf: HiveConf of name hive.metastore.local
>> does not exist
>> Welcome to
>>     __
>>  / __/__  ___ _/ /__
>> _\ \/ _ \/ _ `/ __/  '_/
>>/__ / .__/\_,_/_/ /_/\_\   version 1.5.1
>>   /_/
>>
>> Using Python version 2.6.6 (r266:84292, Jul 23 2015 

Re: Mock Cassandra DB Connection in Unit Testing

2015-10-29 Thread Priya Ch
One more question: if I have a function which takes an RDD as a parameter, how
do we mock an RDD?

On Thu, Oct 29, 2015 at 5:20 PM, Priya Ch 
wrote:

> How do we do it for Cassandra... can we use the same mocking? An
> EmbeddedCassandra server is available with CassandraUnit. Can this be used
> in Spark code as well, i.e. with Scala code?
>
> On Thu, Oct 29, 2015 at 5:03 PM, Василец Дмитрий  > wrote:
>
>> here is an example of how I mock MySQL:
>> import java.sql.PreparedStatement
>> import org.scalamock.scalatest.MockFactory
>>  val connectionMock = mock[java.sql.Connection]
>>  val statementMock = mock[PreparedStatement]
>> (connectionMock.prepareStatement(_:
>> String)).expects(sql.toString).returning(statementMock)
>> (statementMock.executeUpdate _).expects()
>>
>>
>> On Thu, Oct 29, 2015 at 12:27 PM, Priya Ch 
>> wrote:
>>
>>> Hi All,
>>>
>>>   For my  Spark Streaming code, which writes the results to Cassandra
>>> DB, I need to write Unit test cases. what are the available test frameworks
>>> to mock the connection to Cassandra DB ?
>>>
>>
>>
>


Re: Packaging a jar for a jdbc connection using sbt assembly and scala.

2015-10-29 Thread Dean Wood
Hi

Actually, I'm using spark 1.5.1. I have it in standalone mode on my laptop
for testing purposes at the moment. I have no doubt that if I were on a
cluster, I'd have to put the jar on all the workers although I will point
out that is really irritating to have to do that having built a fat jar
containing all dependencies.

Anyway, I have solved my issue and it turned out to be something utterly
trivial. The spark-submit script requires options such as --jars and
--driver-class-path to come before the application JAR. So my submit command
had to be:

 ./bin/spark-submit --driver-class-path ~/path/to/mysql-connector-
java-5.1.37-bin.jar ~/path/to/scala/project/target/scala-2.10/complete.jar

Thanks

Dean

On 29 October 2015 at 11:14, Deenar Toraskar 
wrote:

> Hi Dean
>
> I guess you are using Spark 1.3.
>
>
>- The JDBC driver class must be visible to the primordial class loader
>on the client session and on all executors. This is because Java’s
>DriverManager class does a security check that results in it ignoring all
>drivers not visible to the primordial class loader when one goes to open a
>connection. One convenient way to do this is to modify compute_classpath.sh
>on all worker nodes to include your driver JARs.
>
> Take a look at this https://issues.apache.org/jira/browse/SPARK-6913 and
> see
> http://stackoverflow.com/questions/30221677/spark-sql-postgresql-jdbc-classpath-issues
> .
>
>
> Regards
> Deenar
>
> *Think Reactive Ltd*
> deenar.toras...@thinkreactive.co.uk
> 07714140812
>
>
>
> On 29 October 2015 at 10:34, dean.wood  wrote:
>
>> I'm having a problem building a spark jar with scala. It's a really simple
>> thing, I want to programmatically access a MySQL server via JDBC and load
>> it
>> into a Spark data frame. I can get this to work in the spark shell but I
>> cannot package a jar that works with spark submit. It will package but
>> when
>> running, fails with
>>
>> Exception in thread "main" java.sql.SQLException: No suitable driver found
>> for jdbc:mysql://localhost:3310/100million
>> My spark-submit command is
>>
>> ./bin/spark-submit ~/path/to/scala/project/target/scala-2.10/complete.jar
>> --driver-class-path ~/path/to/mysql-connector-java-5.1.37-bin.jar
>>
>> My build.sbt looks like
>>
>> name := "sql_querier"
>>
>> version := "1.0"
>>
>> scalaVersion := "2.10.4"
>>
>> sbtVersion := "0.13.7"
>>
>> libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.1" %
>> "provided"
>>
>> libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.5.1" %
>> "provided"
>>
>> libraryDependencies += "mysql" % "mysql-connector-java" % "5.1.37"
>>
>> assemblyJarName in assembly := "complete.jar"
>>
>> mainClass in assembly := Some("sql_querier")
>>
>> offline := true
>> and my very simple code is
>>
>> import org.apache.spark.SparkContext
>> import org.apache.spark.SparkConf
>> import org.apache.spark.sql.SQLContext
>>
>> object sql_querier{
>>
>> def main(args: Array[String]) {
>>
>> val sc = new org.apache.spark.SparkContext()
>> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>> val url="jdbc:mysql://databaseurl:portno/database"
>>
>> val prop = new java.util.Properties
>> prop.setProperty("user","myuser")
>> prop.setProperty("password","mydatabase")
>> val cats=sqlContext.read.jdbc(url, "categories", prop)
>> cats.show
>>  }
>>  }
>> Where I've hidden the real values for user, password and database URL. I've
>> also got a file in projects that adds the sbt assembly plugin and there is
>> nothing wrong with this. I've successfully used sbt assembly before with
>> this configuration. When starting a spark shell with the
>> --driver-class-path
>> option pointing to the mysql jar, I can run the commands and extract data
>> from the mysql database.
>>
>> I've tried version 5.1.34 and 5.0.8 and neither have worked. I've also
>> tried
>> changing --driver-class-path for --jar in the spark submit command and
>> adding the lines
>>
>>
>>
>> sc.addJar("/Users/dean.wood/data_science/scala/sqlconn/mysql-connector-java-5.0.8-bin.jar")
>> Class.forName("com.mysql.jdbc.Driver")
>>
>> to the scala code.
>>
>> Any clue what I am doing wrong with the build would be greatly
>> appreciated.
>>
>> Dean
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Packaging-a-jar-for-a-jdbc-connection-using-sbt-assembly-and-scala-tp25225.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> 

RE: Spark/Kafka Streaming Job Gets Stuck

2015-10-29 Thread Sabarish Sasidharan
If you are writing to S3, also make sure that you are using the direct
output committer. I don't have streaming jobs but it helps in my machine
learning jobs. Also, though more partitions help in processing faster, they
do slow down writes to S3. So you might want to coalesce before writing to
S3.

Regards
Sab
On 29-Oct-2015 6:21 pm, "Afshartous, Nick"  wrote:

> < Does it work as expected with smaller batch or smaller load? Could it be
> that it's accumulating too many events over 3 minutes?
>
> Thanks for your input.  The 3 minute window was chosen because we write the
> output of each batch into S3.  And with smaller batch time intervals there
> were many small files being written to S3, something to avoid.  That was
> the explanation of the developer who made this decision (who's no longer on
> the team).   We're in the process of re-evaluating.
> --
>  Nick
>
> -Original Message-
> From: Adrian Tanase [mailto:atan...@adobe.com]
> Sent: Wednesday, October 28, 2015 4:53 PM
> To: Afshartous, Nick 
> Cc: user@spark.apache.org
> Subject: Re: Spark/Kafka Streaming Job Gets Stuck
>
> Does it work as expected with smaller batch or smaller load? Could it be
> that it's accumulating too many events over 3 minutes?
>
> You could also try increasing the parallelism via repartition to ensure
> smaller tasks that can safely fit in working memory.
>
> Sent from my iPhone
>
> > On 28 Oct 2015, at 17:45, Afshartous, Nick 
> wrote:
> >
> >
> > Hi, we are load testing our Spark 1.3 streaming (reading from Kafka)
> job and seeing a problem.  This is running in AWS/Yarn and the streaming
> batch interval is set to 3 minutes and this is a ten node cluster.
> >
> > Testing at 30,000 events per second we are seeing the streaming job get
> stuck (stack trace below) for over an hour.
> >
> > Thanks on any insights or suggestions.
> > --
> >  Nick
> >
> > org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.mapPartiti
> > onsToPair(JavaDStreamLike.scala:43)
> > com.wb.analytics.spark.services.streaming.drivers.StreamingKafkaConsum
> > erDriver.runStream(StreamingKafkaConsumerDriver.java:125)
> > com.wb.analytics.spark.services.streaming.drivers.StreamingKafkaConsum
> > erDriver.main(StreamingKafkaConsumerDriver.java:71)
> > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.j
> > ava:57)
> > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccess
> > orImpl.java:43)
> > java.lang.reflect.Method.invoke(Method.java:606)
> > org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(Application
> > Master.scala:480)
> >
> > Notice: This communication is for the intended recipient(s) only and may
> contain confidential, proprietary, legally protected or privileged
> information of Turbine, Inc. If you are not the intended recipient(s),
> please notify the sender at once and delete this communication.
> Unauthorized use of the information in this communication is strictly
> prohibited and may be unlawful. For those recipients under contract with
> Turbine, Inc., the information in this communication is subject to the
> terms and conditions of any applicable contracts or agreements.
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For
> > additional commands, e-mail: user-h...@spark.apache.org
> >
>
> Notice: This communication is for the intended recipient(s) only and may
> contain confidential, proprietary, legally protected or privileged
> information of Turbine, Inc. If you are not the intended recipient(s),
> please notify the sender at once and delete this communication.
> Unauthorized use of the information in this communication is strictly
> prohibited and may be unlawful. For those recipients under contract with
> Turbine, Inc., the information in this communication is subject to the
> terms and conditions of any applicable contracts or agreements.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Packaging a jar for a jdbc connection using sbt assembly and scala.

2015-10-29 Thread Kai Wei
Submitting your app in client mode may help with your problem.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Packaging-a-jar-for-a-jdbc-connection-using-sbt-assembly-and-scala-tp25225p25228.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: No way to supply hive-site.xml in yarn client mode?

2015-10-29 Thread Deenar Toraskar
Are you using Spark built with Hive?

# Apache Hadoop 2.6.X with Hive 13 support
mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -Phive
-Phive-thriftserver -DskipTests clean package


On 29 October 2015 at 13:08, Zoltan Fedor  wrote:

> Hi Deenar,
> As suggested, I have moved the hive-site.xml from HADOOP_CONF_DIR
> ($SPARK_HOME/hadoop-conf) to YARN_CONF_DIR ($SPARK_HOME/conf/yarn-conf) and
> use the below to start pyspark, but the error is the exact same as before.
>
> $ HADOOP_CONF_DIR=$SPARK_HOME/hadoop-conf
> YARN_CONF_DIR=$SPARK_HOME/conf/yarn-conf HADOOP_USER_NAME=biapp MASTER=yarn
> $SPARK_HOME/bin/pyspark --deploy-mode client
>
> Python 2.6.6 (r266:84292, Jul 23 2015, 05:13:40)
> [GCC 4.4.7 20120313 (Red Hat 4.4.7-16)] on linux2
> Type "help", "copyright", "credits" or "license" for more information.
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
> [jar:file:/usr/lib/spark-1.5.1-bin-without-hadoop/lib/spark-assembly-1.5.1-hadoop2.5.0-cdh5.3.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 15/10/29 09:06:36 WARN MetricsSystem: Using default name DAGScheduler for
> source because spark.app.id is not set.
> 15/10/29 09:06:38 WARN NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> 15/10/29 09:07:03 WARN HiveConf: HiveConf of name hive.metastore.local
> does not exist
> Welcome to
>     __
>  / __/__  ___ _/ /__
> _\ \/ _ \/ _ `/ __/  '_/
>/__ / .__/\_,_/_/ /_/\_\   version 1.5.1
>   /_/
>
> Using Python version 2.6.6 (r266:84292, Jul 23 2015 05:13:40)
> SparkContext available as sc, HiveContext available as sqlContext.
> >>> sqlContext2 = HiveContext(sc)
> >>> sqlContext2 = HiveContext(sc)
> >>> sqlContext2.sql("show databases").first()
> 15/10/29 09:07:34 WARN HiveConf: HiveConf of name hive.metastore.local
> does not exist
> 15/10/29 09:07:35 WARN ShellBasedUnixGroupsMapping: got exception trying
> to get groups for user biapp: id: biapp: No such user
>
> 15/10/29 09:07:35 WARN UserGroupInformation: No groups available for user
> biapp
> Traceback (most recent call last):
>   File "", line 1, in 
>   File
> "/usr/lib/spark-1.5.1-bin-without-hadoop/python/pyspark/sql/context.py",
> line 552, in sql
> return DataFrame(self._ssql_ctx.sql(sqlQuery), self)
>   File
> "/usr/lib/spark-1.5.1-bin-without-hadoop/python/pyspark/sql/context.py",
> line 660, in _ssql_ctx
> "build/sbt assembly", e)
> Exception: ("You must build Spark with Hive. Export 'SPARK_HIVE=true' and
> run build/sbt assembly", Py4JJavaError(u'An error occurred while calling
> None.org.apache.spark.sql.hive.HiveContext.\n', JavaObject id=o20))
> >>>
>
>
> On Thu, Oct 29, 2015 at 7:20 AM, Deenar Toraskar <
> deenar.toras...@gmail.com> wrote:
>
>> *Hi Zoltan*
>>
>> Add hive-site.xml to your YARN_CONF_DIR. i.e. $SPARK_HOME/conf/yarn-conf
>>
>> Deenar
>>
>> *Think Reactive Ltd*
>> deenar.toras...@thinkreactive.co.uk
>> 07714140812
>>
>> On 28 October 2015 at 14:28, Zoltan Fedor 
>> wrote:
>>
>>> Hi,
>>> We have a shared CDH 5.3.3 cluster and trying to use Spark 1.5.1 on it
>>> in yarn client mode with Hive.
>>>
>>> I have compiled Spark 1.5.1 with SPARK_HIVE=true, but it seems I am not
>>> able to make SparkSQL pick up the hive-site.xml when running pyspark.
>>>
>>> hive-site.xml is located in $SPARK_HOME/hadoop-conf/hive-site.xml and
>>> also in $SPARK_HOME/conf/hive-site.xml
>>>
>>> When I start pyspark with the below command and then run some simple
>>> SparkSQL it fails; it seems it didn't pick up the settings in hive-site.xml
>>>
>>> $ HADOOP_CONF_DIR=$SPARK_HOME/hadoop-conf
>>> YARN_CONF_DIR=$SPARK_HOME/yarn-conf HADOOP_USER_NAME=biapp MASTER=yarn
>>> $SPARK_HOME/bin/pyspark --deploy-mode client
>>>
>>> Python 2.6.6 (r266:84292, Jul 23 2015, 05:13:40)
>>> [GCC 4.4.7 20120313 (Red Hat 4.4.7-16)] on linux2
>>> Type "help", "copyright", "credits" or "license" for more information.
>>> SLF4J: Class path contains multiple SLF4J bindings.
>>> SLF4J: Found binding in
>>> [jar:file:/usr/lib/spark-1.5.1-bin-without-hadoop/lib/spark-assembly-1.5.1-hadoop2.5.0-cdh5.3.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>> SLF4J: Found binding in
>>> [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>>> explanation.
>>> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>>> 15/10/28 10:22:33 WARN MetricsSystem: Using default name DAGScheduler
>>> for source because spark.app.id is not set.
>>> 15/10/28 10:22:35 WARN NativeCodeLoader: 

Re: spark-1.5.1 application detail ui url

2015-10-29 Thread Jean-Baptiste Onofré

Hi,

The running application UI should be available on the worker IP (on the default
port 4040), right?


So, basically, the problem is with the link in the master UI, correct?

Regards
JB

On 10/29/2015 01:45 PM, carlilek wrote:

I administer an HPC cluster that runs Spark clusters as jobs. We run Spark
over the backend network (typically used for MPI), which is not accessible
outside the cluster. Until we upgraded to 1.5.1 (from 1.3.1), this did not
present a problem. Now the Application Detail UI link is returning the IP
address of the backend network of the driver machine rather than that
machine's hostname. Consequentially, users cannot access that page.  I am
unsure what might have changed or how I might change the behavior back.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-1-5-1-application-detail-ui-url-tp25226.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Mock Cassandra DB Connection in Unit Testing

2015-10-29 Thread Priya Ch
How do we do it for Cassandra... can we use the same mocking? An
EmbeddedCassandra server is available with CassandraUnit. Can this be used
in Spark code as well, i.e. with Scala code?

On Thu, Oct 29, 2015 at 5:03 PM, Василец Дмитрий 
wrote:

> here is an example of how I mock MySQL:
> import java.sql.PreparedStatement
> import org.scalamock.scalatest.MockFactory
>  val connectionMock = mock[java.sql.Connection]
>  val statementMock = mock[PreparedStatement]
> (connectionMock.prepareStatement(_:
> String)).expects(sql.toString).returning(statementMock)
> (statementMock.executeUpdate _).expects()
>
>
> On Thu, Oct 29, 2015 at 12:27 PM, Priya Ch 
> wrote:
>
>> Hi All,
>>
>>   For my  Spark Streaming code, which writes the results to Cassandra DB,
>> I need to write Unit test cases. what are the available test frameworks to
>> mock the connection to Cassandra DB ?
>>
>
>


Re: Need more tasks in KafkaDirectStream

2015-10-29 Thread Cody Koeninger
Consuming from kafka is inherently limited to using a number of consumer
nodes less than or equal to the number of kafka partitions.  If you think
about it, you're going to be paying some network cost to repartition that
data from a consumer to different processing nodes, regardless of what
Spark consumer library you use.

If you really need finer grained parallelism, and want to do it in a more
efficient manner, you need to move that partitioning to the producer (i.e.
add more partitions to kafka).
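
For illustration, a minimal Scala sketch of the .repartition approach Adrian
describes below, written against the Spark 1.3+ direct Kafka API; the broker
address, topic name and the 140-partition target are placeholder assumptions,
not values taken from the original posts:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(new SparkConf().setAppName("direct-repartition"), Seconds(10))
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")  // assumed broker list
val topics = Set("events")                                       // assumed topic name

// createDirectStream yields exactly one Spark partition per Kafka partition...
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

// ...and an explicit repartition pays a single shuffle to spread the work
// over more cores than there are Kafka partitions.
val widened = stream.repartition(140)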

On Thu, Oct 29, 2015 at 6:11 AM, Adrian Tanase  wrote:

> You can call .repartition on the Dstream created by the Kafka direct
> consumer. You take the one-time hit of a shuffle but gain the ability to
> scale out processing beyond your number of partitions.
>
> We’re doing this to scale up from 36 partitions / topic to 140 partitions
> (20 cores * 7 nodes) and it works great.
>
> -adrian
>
> From: varun sharma
> Date: Thursday, October 29, 2015 at 8:27 AM
> To: user
> Subject: Need more tasks in KafkaDirectStream
>
> Right now, there is a one-to-one correspondence between Kafka partitions and
> Spark partitions.
> I don't have a requirement of one-to-one semantics.
> I need more tasks to be generated in the job so that it can be
> parallelised and the batch can be completed fast. In the previous Receiver-
> based approach the number of tasks created was independent of Kafka
> partitions; I need something like that.
> Is there any config available if I don't need one-to-one semantics?
> Is there any way I can repartition without incurring any additional cost?
>
> Thanks
> *VARUN SHARMA*
>
>


Re: nested select is not working in spark sql

2015-10-29 Thread Deenar Toraskar
You can try the following syntax

https://cwiki.apache.org/confluence/display/Hive/LanguageManual+SubQueries

SELECT *
FROM A
WHERE A.a IN (SELECT foo FROM B);

Regards
Deenar
*Think Reactive Ltd*
deenar.toras...@thinkreactive.co.uk
07714140812
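
If the IN form is also rejected by the parser, another workaround that Spark
SQL 1.5 does handle is rewriting the subquery as a join. A hedged sketch using
the NATION and REGION tables from the original question (sqlContext is assumed
to be the usual HiveContext/SQLContext):

// Join rewrite of the WHERE-clause subquery; equivalent as long as the
// subquery matches a single region.
val result = sqlContext.sql("""
  SELECT n.n_name
  FROM NATION n
  JOIN REGION r ON n.n_regionkey = r.r_regionkey
  WHERE r.r_name = 'ASIA'
""")
result.show()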



On 28 October 2015 at 14:37, Richard Hillegas  wrote:

> Hi Kishor,
>
> Spark doesn't currently support subqueries in the WHERE clause. However,
> it looks as though someone is working on this right now:
> https://issues.apache.org/jira/browse/SPARK-4226
>
> Hope this helps,
> Rick Hillegas
>
>
>
> Kishor Bachhav  wrote on 10/28/2015 05:52:50 AM:
>
> > From: Kishor Bachhav 
> > To: user@spark.apache.org
> > Date: 10/28/2015 05:53 AM
> > Subject: nested select is not working in spark sql
>
> >
> > Hi,
>
> > I am trying to execute below query in spark sql but throws exception
> >
> > select n_name from NATION where n_regionkey = (select r_regionkey
> > from REGION where r_name='ASIA')
>
> > Exception:
> > Exception in thread "main" java.lang.RuntimeException: [1.55]
> > failure: ``)'' expected but identifier r_regionkey found
> >
> > select n_name from NATION where n_regionkey = (select r_regionkey
> > from REGION where r_name='ASIA')
> >   ^
> > at scala.sys.package$.error(package.scala:27)
> > at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse
> > (AbstractSparkSQLParser.scala:36)
> > at
> org.apache.spark.sql.SnappyParserDialect.parse(snappyParsers.scala:65)
> > at
> org.apache.spark.sql.SQLContext$$anonfun$3.apply(SQLContext.scala:169)
> > at
> org.apache.spark.sql.SQLContext$$anonfun$3.apply(SQLContext.scala:169)
> > at org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark
> > $sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:115)
> > at org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark
> > $sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:114)
> > at
> scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
> > at
> scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
> > at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map
> > $1.apply(Parsers.scala:242)
> > at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map
> > $1.apply(Parsers.scala:242)
> > at
> scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> > at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append
> > $1$$anonfun$apply$2.apply(Parsers.scala:254)
> > at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append
> > $1$$anonfun$apply$2.apply(Parsers.scala:254)
> > at
> scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
> > at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append
> > $1.apply(Parsers.scala:254)
> > at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append
> > $1.apply(Parsers.scala:254)
> > at
> scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> > at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply
> > $14.apply(Parsers.scala:891)
> > at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply
> > $14.apply(Parsers.scala:891)
> > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> > at
> scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
> > at scala.util.parsing.combinator.PackratParsers$$anon$1.apply
> > (PackratParsers.scala:110)
> > at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse
> > (AbstractSparkSQLParser.scala:34)
> >
>
> > Same is working in mysql as well as memsql.
>
> > Expected Result is
> >
> > memsql> select n_name from NATION where n_regionkey = (select
> > r_regionkey from REGION where r_name='ASIA');
> > +---+
> > | n_name|
> > +---+
> > | INDIA |
> > | INDONESIA |
> > | JAPAN |
> > | CHINA |
> > | VIETNAM   |
> > +---+
> > 5 rows in set (0.71 sec)
>
> > How can I make this work in spark sql?
>
> > Actually above query is one simplified version of Minimum cost
> > supplier query (Q2) of TPCH which has this nested select nature. I
> > am working on these TPCH queries. If anybody has the modified set of
> > TPCH queries for spark sql, kindly let me know. It will be very useful
> for me.
> >
> > select
> > s_acctbal,
> > s_name,
> > n_name,
> > p_partkey,
> > p_mfgr,
> > s_address,
> > s_phone,
> > s_comment
> > from
> > part,
> > supplier,
> > partsupp,
> > nation,
> > region
> > where
> > p_partkey = ps_partkey
> > and s_suppkey = ps_suppkey
> > and p_size = [SIZE]
> > and p_type like '%[TYPE]'
> > and s_nationkey = n_nationkey
> > and n_regionkey = r_regionkey
> > and r_name = '[REGION]'
> > and ps_supplycost = (
> >   select
> > min(ps_supplycost)
> > from
> > partsupp, supplier,
> > nation, region
> > 

Mock Cassandra DB Connection in Unit Testing

2015-10-29 Thread Priya Ch
Hi All,

  For my Spark Streaming code, which writes the results to Cassandra DB, I
need to write unit test cases. What are the available test frameworks to
mock the connection to Cassandra DB?


[Spark] java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE

2015-10-29 Thread Yifan LI
Hey,

I was just trying to scan a large RDD sortedRdd, ~1 billion elements, using the
toLocalIterator API, but an exception was returned as it was almost finished:

java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds 
Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:821)
at 
org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
at 
org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
at 
org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:511)
at 
org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:302)
at 
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
at 
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at 
org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
at 
org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
at 
org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at 
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at 
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at 
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
at 
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at 
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
at java.lang.Thread.run(Thread.java:745)

at 
org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:162)
at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:103)
at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at 

Pivot Data in Spark and Scala

2015-10-29 Thread Ascot Moss
Hi,

I have data as follows:

A, 2015, 4
A, 2014, 12
A, 2013, 1
B, 2015, 24
B, 2013, 4


I need to convert the data to a new format:
A ,4,12,1
B,   24,,4

Any idea how to make it in Spark Scala?

Thanks
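
For reference, a minimal RDD-based sketch of one way to do this in Spark 1.5,
which has no built-in pivot (DataFrame.pivot only arrives in 1.6). The
hard-coded sample rows and the fixed year order are assumptions taken from the
example above:

val rows = sc.parallelize(Seq(
  ("A", 2015, 4), ("A", 2014, 12), ("A", 2013, 1),
  ("B", 2015, 24), ("B", 2013, 4)))

val years = Seq(2015, 2014, 2013)          // output column order

val pivoted = rows
  .map { case (key, year, value) => (key, (year, value)) }
  .groupByKey()
  .map { case (key, pairs) =>
    val byYear = pairs.toMap               // year -> value for this key
    // Missing years become empty fields, matching "B,24,,4" above.
    key + "," + years.map(y => byYear.get(y).map(_.toString).getOrElse("")).mkString(",")
  }

pivoted.collect().foreach(println)         // prints A,4,12,1 and B,24,,4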


Re: Mock Cassandra DB Connection in Unit Testing

2015-10-29 Thread Василец Дмитрий
here is an example of how I mock MySQL:
import java.sql.PreparedStatement
import org.scalamock.scalatest.MockFactory
 val connectionMock = mock[java.sql.Connection]
 val statementMock = mock[PreparedStatement]
(connectionMock.prepareStatement(_:
String)).expects(sql.toString).returning(statementMock)
(statementMock.executeUpdate _).expects()
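
For context, a hedged sketch of how that fragment sits inside a ScalaTest suite
with ScalaMock's MockFactory mixed in; the SQL string and the writeRecord call
are placeholders standing in for the code under test:

import java.sql.{Connection, PreparedStatement}
import org.scalamock.scalatest.MockFactory
import org.scalatest.FlatSpec

class WriterSpec extends FlatSpec with MockFactory {
  "the writer" should "prepare and execute one statement" in {
    val sql = "INSERT INTO t VALUES (?)"          // placeholder statement
    val connectionMock = mock[Connection]
    val statementMock = mock[PreparedStatement]

    // Expectations along the lines of the fragment above go here, e.g.:
    (connectionMock.prepareStatement(_: String)).expects(sql).returning(statementMock)

    // writeRecord(connectionMock, sql)            // placeholder for the code under test
  }
}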


On Thu, Oct 29, 2015 at 12:27 PM, Priya Ch 
wrote:

> Hi All,
>
>   For my  Spark Streaming code, which writes the results to Cassandra DB,
> I need to write Unit test cases. what are the available test frameworks to
> mock the connection to Cassandra DB ?
>


Re: [Spark] java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE

2015-10-29 Thread Yifan LI
I have a guess that before scanning that RDD, I sorted it and set partitioning, 
so the result is not balanced:

sortBy[S](f: Function[T, S], ascending: Boolean, numPartitions: Int)

I will try to repartition it to see if it helps.
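
A hedged illustration of that idea: raising numPartitions when the RDD is
sorted, so that no single partition's block grows past the 2 GB limit behind
this exception. bigRdd, keyFunc and the count of 4000 are placeholders:

// Larger numPartitions => smaller blocks per partition; the exact count is a guess.
val sortedRdd = bigRdd.sortBy(keyFunc, ascending = true, numPartitions = 4000)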

Best,
Yifan LI





> On 29 Oct 2015, at 12:52, Yifan LI  wrote:
> 
> Hey,
> 
> I was just trying to scan a large RDD sortedRdd, ~1billion elements, using 
> toLocalIterator api, but an exception returned as it was almost finished:
> 
> java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds 
> Integer.MAX_VALUE
>   at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:821)
>   at 
> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
>   at 
> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
>   at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)
>   at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
>   at 
> org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:511)
>   at 
> org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:302)
>   at 
> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>   at 
> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>   at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>   at 
> org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
>   at 
> org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
>   at 
> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
>   at 
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
>   at 
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
>   at 
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>   at 
> io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>   at 
> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>   at 
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>   at 
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
>   at 
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
>   at 
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>   at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>   at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>   at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>   at 
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
>   at java.lang.Thread.run(Thread.java:745)
> 
>   at 
> org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:162)
>   at 
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:103)
>   at 
> 

Re: Spark -- Writing to Partitioned Persistent Table

2015-10-29 Thread Deenar Toraskar
Hi Bryan

For your use case you don't need to have multiple metastores. The default
metastore uses embedded Derby. This cannot be shared amongst multiple
processes. Just switch to a metastore that supports multiple connections,
viz. networked Derby or MySQL; see
https://cwiki.apache.org/confluence/display/Hive/HiveDerbyServerMode

Deenar


*Think Reactive Ltd*
deenar.toras...@thinkreactive.co.uk
07714140812


On 29 October 2015 at 00:56, Bryan  wrote:

> Yana,
>
> My basic use-case is that I want to process streaming data, and publish it
> to a persistent spark table. After that I want to make the published data
> (results) available via JDBC and spark SQL to drive a web API. That would
> seem to require two drivers starting separate HiveContexts (one for
> sparksql/jdbc, one for streaming)
>
> Is there a way to share a hive context between the driver for the thrift
> spark SQL instance and the streaming spark driver? A better method to do
> this?
>
> An alternate option might be to create the table in two separate
> metastores and simply use the same storage location for the data. That
> seems very hacky though, and likely to result in maintenance issues.
>
> Regards,
>
> Bryan Jeffrey
> --
> From: Yana Kadiyska 
> Sent: ‎10/‎28/‎2015 8:32 PM
> To: Bryan Jeffrey 
> Cc: Susan Zhang ; user 
> Subject: Re: Spark -- Writing to Partitioned Persistent Table
>
> For this issue in particular ( ERROR XSDB6: Another instance of Derby may
> have already booted the database /spark/spark-1.4.1/metastore_db) -- I
> think it depends on where you start your application and HiveThriftserver
> from. I've run into a similar issue running a driver app first, which would
> create a directory called metastore_db. If I then try to start SparkShell
> from the same directory, I will see this exception. So it is like
> SPARK-9776. It's not so much that the two are in the same process (as the
> bug resolution states) I think you can't run 2 drivers which start a
> HiveContext from the same directory.
>
>
> On Wed, Oct 28, 2015 at 4:10 PM, Bryan Jeffrey 
> wrote:
>
>> All,
>>
>> One issue I'm seeing is that I start the thrift server (for jdbc access)
>> via the following: /spark/spark-1.4.1/sbin/start-thriftserver.sh --master
>> spark://master:7077 --hiveconf "spark.cores.max=2"
>>
>> After about 40 seconds the Thrift server is started and available on
>> default port 10000.
>>
>> I then submit my application - and the application throws the following
>> error:
>>
>> Caused by: java.sql.SQLException: Failed to start database 'metastore_db'
>> with class loader
>> org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1@6a552721,
>> see the next exception for details.
>> at
>> org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown
>> Source)
>> at
>> org.apache.derby.impl.jdbc.SQLExceptionFactory40.wrapArgsForTransportAcrossDRDA(Unknown
>> Source)
>> ... 86 more
>> Caused by: java.sql.SQLException: Another instance of Derby may have
>> already booted the database /spark/spark-1.4.1/metastore_db.
>> at
>> org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown
>> Source)
>> at
>> org.apache.derby.impl.jdbc.SQLExceptionFactory40.wrapArgsForTransportAcrossDRDA(Unknown
>> Source)
>> at
>> org.apache.derby.impl.jdbc.SQLExceptionFactory40.getSQLException(Unknown
>> Source)
>> at org.apache.derby.impl.jdbc.Util.generateCsSQLException(Unknown
>> Source)
>> ... 83 more
>> Caused by: ERROR XSDB6: Another instance of Derby may have already booted
>> the database /spark/spark-1.4.1/metastore_db.
>>
>> This also happens if I do the opposite (submit the application first, and
>> then start the thrift server).
>>
>> It looks similar to the following issue -- but not quite the same:
>> https://issues.apache.org/jira/browse/SPARK-9776
>>
>> It seems like this set of steps works fine if the metadata database is
>> not yet created - but once it's created this happens every time.  Is this a
>> known issue? Is there a workaround?
>>
>> Regards,
>>
>> Bryan Jeffrey
>>
>> On Wed, Oct 28, 2015 at 3:13 PM, Bryan Jeffrey 
>> wrote:
>>
>>> Susan,
>>>
>>> I did give that a shot -- I'm seeing a number of oddities:
>>>
>>> (1) 'Partition By' appears to accept only alphanumeric lower-case fields.
>>> It will work for 'machinename', but not 'machineName' or 'machine_name'.
>>> (2) When partitioning with maps included in the data I get odd string
>>> conversion issues
>>> (3) When partitioning without maps I see frequent out of memory issues
>>>
>>> I'll update this email when I've got a more concrete example of problems.
>>>
>>> 

Re: Mock Cassandra DB Connection in Unit Testing

2015-10-29 Thread Adrian Tanase
Does it need to be a mock? Can you use sc.parallelize(data)?
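
A minimal sketch of that suggestion, assuming a local SparkContext in the test
and a placeholder countValid function standing in for the real code that takes
an RDD:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Placeholder for the production function under test.
def countValid(records: RDD[String]): Long = records.filter(_.nonEmpty).count()

val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("rdd-unit-test"))
try {
  // No mock needed: a tiny in-memory RDD built with parallelize stands in for the real input.
  val input = sc.parallelize(Seq("a", "", "c"))
  assert(countValid(input) == 2)
} finally {
  sc.stop()
}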

From: Priya Ch
Date: Thursday, October 29, 2015 at 2:00 PM
To: Василец Дмитрий
Cc: "user@spark.apache.org", 
"spark-connector-u...@lists.datastax.com"
Subject: Re: Mock Cassandra DB Connection in Unit Testing

One more question: if I have a function which takes an RDD as a parameter, how do
we mock an RDD?

On Thu, Oct 29, 2015 at 5:20 PM, Priya Ch 
> wrote:
How do we do it for Cassandra..can we use the same Mocking ? EmbeddedCassandra 
Server is available with CassandraUnit. Can this be used in Spark Code as well 
? I mean with Scala code ?

On Thu, Oct 29, 2015 at 5:03 PM, Василец Дмитрий 
> wrote:
here is an example of how I mock MySQL:
import java.sql.PreparedStatement
import org.scalamock.scalatest.MockFactory
 val connectionMock = mock[java.sql.Connection]
 val statementMock = mock[PreparedStatement]
(connectionMock.prepareStatement(_:
String)).expects(sql.toString).returning(statementMock)
(statementMock.executeUpdate _).expects()


On Thu, Oct 29, 2015 at 12:27 PM, Priya Ch 
> wrote:
Hi All,

  For my  Spark Streaming code, which writes the results to Cassandra DB, I 
need to write Unit test cases. what are the available test frameworks to mock 
the connection to Cassandra DB ?





Re: Spark/Kafka Streaming Job Gets Stuck

2015-10-29 Thread Adrian Tanase
You can decouple the batch interval and the window sizes. If during processing
you're aggregating data and your operations benefit from an inverse function,
then you can process windows of data optimally.

E.g. You could set a global batch interval of 10 seconds. You can process the 
incoming data from Kafka, aggregating the input.
Then you can create a window of 3 minutes (both length and slide) over the 
partial results. In this case the inverse function is not helpful as all the 
data is new in every window.

You can even coalesce the final DStream to avoid writing many small files. For
example, you could write FEWER files MORE OFTEN and achieve a similar effect.

All of this is of course hypothetical since I don’t know what processing you 
are applying to the data coming from Kafka. More like food for thought.

-adrian
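
A hedged sketch of that layout, assuming the Kafka stage already produces a
DStream[(String, Long)] named counts on a 10-second batch interval; the
coalesce factor and S3 path are placeholders:

import org.apache.spark.streaming.Minutes

// Aggregate the 10-second partial counts into non-overlapping 3-minute windows.
val threeMinuteTotals = counts
  .reduceByKeyAndWindow((a: Long, b: Long) => a + b, Minutes(3), Minutes(3))

// Fewer, larger files per window: shrink the partition count before writing.
threeMinuteTotals.foreachRDD { rdd =>
  rdd.coalesce(8).saveAsTextFile("s3n://bucket/prefix/" + System.currentTimeMillis)  // placeholder path
}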





On 10/29/15, 2:50 PM, "Afshartous, Nick"  wrote:

>< Does it work as expected with smaller batch or smaller load? Could it be 
>that it's accumulating too many events over 3 minutes?
>
>Thanks for your input.  The 3 minute window was chosen because we write the 
>output of each batch into S3.  And with smaller batch time intervals there 
>were many small files being written to S3, something to avoid.  That was the 
>explanation of the developer who made this decision (who's no longer on the 
>team).   We're in the process of re-evaluating.
>--
> Nick
>
>-Original Message-
>From: Adrian Tanase [mailto:atan...@adobe.com]
>Sent: Wednesday, October 28, 2015 4:53 PM
>To: Afshartous, Nick 
>Cc: user@spark.apache.org
>Subject: Re: Spark/Kafka Streaming Job Gets Stuck
>
>Does it work as expected with smaller batch or smaller load? Could it be that 
>it's accumulating too many events over 3 minutes?
>
>You could also try increasing the parallelism via repartition to ensure 
>smaller tasks that can safely fit in working memory.
>
>Sent from my iPhone
>
>> On 28 Oct 2015, at 17:45, Afshartous, Nick  wrote:
>>
>>
>> Hi, we are load testing our Spark 1.3 streaming (reading from Kafka)  job 
>> and seeing a problem.  This is running in AWS/Yarn and the streaming batch 
>> interval is set to 3 minutes and this is a ten node cluster.
>>
>> Testing at 30,000 events per second we are seeing the streaming job get 
>> stuck (stack trace below) for over an hour.
>>
>> Thanks on any insights or suggestions.
>> --
>>  Nick
>>
>> org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.mapPartiti
>> onsToPair(JavaDStreamLike.scala:43)
>> com.wb.analytics.spark.services.streaming.drivers.StreamingKafkaConsum
>> erDriver.runStream(StreamingKafkaConsumerDriver.java:125)
>> com.wb.analytics.spark.services.streaming.drivers.StreamingKafkaConsum
>> erDriver.main(StreamingKafkaConsumerDriver.java:71)
>> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.j
>> ava:57)
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccess
>> orImpl.java:43)
>> java.lang.reflect.Method.invoke(Method.java:606)
>> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(Application
>> Master.scala:480)
>>
>> Notice: This communication is for the intended recipient(s) only and may 
>> contain confidential, proprietary, legally protected or privileged 
>> information of Turbine, Inc. If you are not the intended recipient(s), 
>> please notify the sender at once and delete this communication. Unauthorized 
>> use of the information in this communication is strictly prohibited and may 
>> be unlawful. For those recipients under contract with Turbine, Inc., the 
>> information in this communication is subject to the terms and conditions of 
>> any applicable contracts or agreements.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For
>> additional commands, e-mail: user-h...@spark.apache.org
>>
>
>Notice: This communication is for the intended recipient(s) only and may 
>contain confidential, proprietary, legally protected or privileged information 
>of Turbine, Inc. If you are not the intended recipient(s), please notify the 
>sender at once and delete this communication. Unauthorized use of the 
>information in this communication is strictly prohibited and may be unlawful. 
>For those recipients under contract with Turbine, Inc., the information in 
>this communication is subject to the terms and conditions of any applicable 
>contracts or agreements.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark/Kafka Streaming Job Gets Stuck

2015-10-29 Thread Cody Koeninger
If you're writing to s3, want to avoid small files, and don't actually need
3 minute latency... you may want to consider just running a regular spark
job (using KafkaUtils.createRDD) at scheduled intervals rather than a
streaming job.
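
A hedged sketch of that batch alternative; the broker address, topic, partition
and offsets are placeholders, and in practice the offset range would come from
wherever consumer progress is tracked:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")   // assumed broker list
val offsetRanges = Array(OffsetRange("events", 0, 0L, 100000L))   // topic/partition/from/until placeholders

// One-shot RDD over an explicit offset range, run from a plain scheduled batch job.
val batch = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
  sc, kafkaParams, offsetRanges)

// Values only, coalesced so each run writes a handful of larger S3 objects.
batch.map(_._2).coalesce(8).saveAsTextFile("s3n://bucket/prefix/run-1")   // placeholder path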

On Thu, Oct 29, 2015 at 8:16 AM, Sabarish Sasidharan <
sabarish.sasidha...@manthan.com> wrote:

> If you are writing to S3, also make sure that you are using the direct
> output committer. I don't have streaming jobs but it helps in my machine
> learning jobs. Also, though more partitions help in processing faster, they
> do slow down writes to S3. So you might want to coalesce before writing to
> S3.
>
> Regards
> Sab
> On 29-Oct-2015 6:21 pm, "Afshartous, Nick" 
> wrote:
>
>> < Does it work as expected with smaller batch or smaller load? Could it
>> be that it's accumulating too many events over 3 minutes?
>>
>> Thanks for your input.  The 3 minute window was chosen because we write
>> the output of each batch into S3.  And with smaller batch time intervals
>> there were many small files being written to S3, something to avoid.  That
>> was the explanation of the developer who made this decision (who's no
>> longer on the team).   We're in the process of re-evaluating.
>> --
>>  Nick
>>
>> -Original Message-
>> From: Adrian Tanase [mailto:atan...@adobe.com]
>> Sent: Wednesday, October 28, 2015 4:53 PM
>> To: Afshartous, Nick 
>> Cc: user@spark.apache.org
>> Subject: Re: Spark/Kafka Streaming Job Gets Stuck
>>
>> Does it work as expected with smaller batch or smaller load? Could it be
>> that it's accumulating too many events over 3 minutes?
>>
>> You could also try increasing the parallelism via repartition to ensure
>> smaller tasks that can safely fit in working memory.
>>
>> Sent from my iPhone
>>
>> > On 28 Oct 2015, at 17:45, Afshartous, Nick 
>> wrote:
>> >
>> >
>> > Hi, we are load testing our Spark 1.3 streaming (reading from Kafka)
>> job and seeing a problem.  This is running in AWS/Yarn and the streaming
>> batch interval is set to 3 minutes and this is a ten node cluster.
>> >
>> > Testing at 30,000 events per second we are seeing the streaming job get
>> stuck (stack trace below) for over an hour.
>> >
>> > Thanks on any insights or suggestions.
>> > --
>> >  Nick
>> >
>> > org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.mapPartiti
>> > onsToPair(JavaDStreamLike.scala:43)
>> > com.wb.analytics.spark.services.streaming.drivers.StreamingKafkaConsum
>> > erDriver.runStream(StreamingKafkaConsumerDriver.java:125)
>> > com.wb.analytics.spark.services.streaming.drivers.StreamingKafkaConsum
>> > erDriver.main(StreamingKafkaConsumerDriver.java:71)
>> > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.j
>> > ava:57)
>> > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccess
>> > orImpl.java:43)
>> > java.lang.reflect.Method.invoke(Method.java:606)
>> > org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(Application
>> > Master.scala:480)
>> >
>> > Notice: This communication is for the intended recipient(s) only and
>> may contain confidential, proprietary, legally protected or privileged
>> information of Turbine, Inc. If you are not the intended recipient(s),
>> please notify the sender at once and delete this communication.
>> Unauthorized use of the information in this communication is strictly
>> prohibited and may be unlawful. For those recipients under contract with
>> Turbine, Inc., the information in this communication is subject to the
>> terms and conditions of any applicable contracts or agreements.
>> >
>> > -
>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For
>> > additional commands, e-mail: user-h...@spark.apache.org
>> >
>>
>> Notice: This communication is for the intended recipient(s) only and may
>> contain confidential, proprietary, legally protected or privileged
>> information of Turbine, Inc. If you are not the intended recipient(s),
>> please notify the sender at once and delete this communication.
>> Unauthorized use of the information in this communication is strictly
>> prohibited and may be unlawful. For those recipients under contract with
>> Turbine, Inc., the information in this communication is subject to the
>> terms and conditions of any applicable contracts or agreements.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>


spark-1.5.1 application detail ui url

2015-10-29 Thread carlilek
I administer an HPC cluster that runs Spark clusters as jobs. We run Spark
over the backend network (typically used for MPI), which is not accessible
outside the cluster. Until we upgraded to 1.5.1 (from 1.3.1), this did not
present a problem. Now the Application Detail UI link is returning the IP
address of the backend network of the driver machine rather than that
machine's hostname. Consequentially, users cannot access that page.  I am
unsure what might have changed or how I might change the behavior back.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-1-5-1-application-detail-ui-url-tp25226.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Exception while reading from kafka stream

2015-10-29 Thread Ramkumar V
Hi,

I'm trying to read from a Kafka stream and print it to a text file. I'm using
Java over Spark. I don't know why I'm getting the following exception.
Also, the exception message is very abstract. Can anyone please help me?

Log Trace :

15/10/29 12:15:09 ERROR scheduler.JobScheduler: Error in job generator
java.lang.NullPointerException
at
org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
at
org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
at
scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
at
scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
at
scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
at
scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
at
scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
at scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
at
org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
at
org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
15/10/29 12:15:09 ERROR yarn.ApplicationMaster: User class threw exception:
java.lang.NullPointerException
java.lang.NullPointerException
at
org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
at
org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
at
scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
at
scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
at
scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
at
scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
at
scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
at scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
at
org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
at
org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)



*Thanks*,
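For comparison, here is a minimal sketch of the same kind of pipeline (Scala
API with the spark-streaming-kafka direct stream; the broker address, topic
name, batch interval and output path are all made up). It is not your code,
just the general shape of create context -> create stream -> output operation
-> start, which may help narrow down where the DStream graph ends up in a bad
state:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("KafkaToTextFiles")
val ssc = new StreamingContext(conf, Seconds(10))
// hypothetical broker list and topic set
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
val topics = Set("mytopic")
// direct stream of (key, value) pairs; keep only the message payload
val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics).map(_._2)
// each batch is written out as text files under the (hypothetical) prefix
lines.saveAsTextFiles("hdfs:///tmp/kafka-out/batch", "txt")
ssc.start()
ssc.awaitTermination()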



Re: No way to supply hive-site.xml in yarn client mode?

2015-10-29 Thread Zoltan Fedor
Yes, I am. It was compiled with the following:

export SPARK_HADOOP_VERSION=2.5.0-cdh5.3.3
export SPARK_YARN=true
export SPARK_HIVE=true
export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M
-XX:ReservedCodeCacheSize=512m"
mvn -Pyarn -Phadoop-2.5 -Dhadoop.version=2.5.0-cdh5.3.3 -Phive
-Phive-thriftserver -DskipTests clean package

On Thu, Oct 29, 2015 at 10:16 AM, Deenar Toraskar  wrote:

> Are you using Spark built with hive ?
>
> # Apache Hadoop 2.4.X with Hive 13 support
> mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver 
> -DskipTests clean package
>
>
> On 29 October 2015 at 13:08, Zoltan Fedor 
> wrote:
>
>> Hi Deenar,
>> As suggested, I have moved the hive-site.xml from HADOOP_CONF_DIR
>> ($SPARK_HOME/hadoop-conf) to YARN_CONF_DIR ($SPARK_HOME/conf/yarn-conf) and
>> use the below to start pyspark, but the error is the exact same as before.
>>
>> $ HADOOP_CONF_DIR=$SPARK_HOME/hadoop-conf
>> YARN_CONF_DIR=$SPARK_HOME/conf/yarn-conf HADOOP_USER_NAME=biapp MASTER=yarn
>> $SPARK_HOME/bin/pyspark --deploy-mode client
>>
>> Python 2.6.6 (r266:84292, Jul 23 2015, 05:13:40)
>> [GCC 4.4.7 20120313 (Red Hat 4.4.7-16)] on linux2
>> Type "help", "copyright", "credits" or "license" for more information.
>> SLF4J: Class path contains multiple SLF4J bindings.
>> SLF4J: Found binding in
>> [jar:file:/usr/lib/spark-1.5.1-bin-without-hadoop/lib/spark-assembly-1.5.1-hadoop2.5.0-cdh5.3.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: Found binding in
>> [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>> explanation.
>> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>> 15/10/29 09:06:36 WARN MetricsSystem: Using default name DAGScheduler for
>> source because spark.app.id is not set.
>> 15/10/29 09:06:38 WARN NativeCodeLoader: Unable to load native-hadoop
>> library for your platform... using builtin-java classes where applicable
>> 15/10/29 09:07:03 WARN HiveConf: HiveConf of name hive.metastore.local
>> does not exist
>> Welcome to
>>     __
>>  / __/__  ___ _/ /__
>> _\ \/ _ \/ _ `/ __/  '_/
>>/__ / .__/\_,_/_/ /_/\_\   version 1.5.1
>>   /_/
>>
>> Using Python version 2.6.6 (r266:84292, Jul 23 2015 05:13:40)
>> SparkContext available as sc, HiveContext available as sqlContext.
>> >>> sqlContext2 = HiveContext(sc)
>> >>> sqlContext2 = HiveContext(sc)
>> >>> sqlContext2.sql("show databases").first()
>> 15/10/29 09:07:34 WARN HiveConf: HiveConf of name hive.metastore.local
>> does not exist
>> 15/10/29 09:07:35 WARN ShellBasedUnixGroupsMapping: got exception trying
>> to get groups for user biapp: id: biapp: No such user
>>
>> 15/10/29 09:07:35 WARN UserGroupInformation: No groups available for user
>> biapp
>> Traceback (most recent call last):
>>   File "", line 1, in 
>>   File
>> "/usr/lib/spark-1.5.1-bin-without-hadoop/python/pyspark/sql/context.py",
>> line 552, in sql
>> return DataFrame(self._ssql_ctx.sql(sqlQuery), self)
>>   File
>> "/usr/lib/spark-1.5.1-bin-without-hadoop/python/pyspark/sql/context.py",
>> line 660, in _ssql_ctx
>> "build/sbt assembly", e)
>> Exception: ("You must build Spark with Hive. Export 'SPARK_HIVE=true' and
>> run build/sbt assembly", Py4JJavaError(u'An error occurred while calling
>> None.org.apache.spark.sql.hive.HiveContext.\n', JavaObject id=o20))
>> >>>
>>
>>
>> On Thu, Oct 29, 2015 at 7:20 AM, Deenar Toraskar <
>> deenar.toras...@gmail.com> wrote:
>>
>>> *Hi Zoltan*
>>>
>>> Add hive-site.xml to your YARN_CONF_DIR. i.e. $SPARK_HOME/conf/yarn-conf
>>>
>>> Deenar
>>>
>>> *Think Reactive Ltd*
>>> deenar.toras...@thinkreactive.co.uk
>>> 07714140812
>>>
>>> On 28 October 2015 at 14:28, Zoltan Fedor 
>>> wrote:
>>>
 Hi,
 We have a shared CDH 5.3.3 cluster and trying to use Spark 1.5.1 on it
 in yarn client mode with Hive.

 I have compiled Spark 1.5.1 with SPARK_HIVE=true, but it seems I am not
 able to make SparkSQL pick up the hive-site.xml when running pyspark.

 hive-site.xml is located in $SPARK_HOME/hadoop-conf/hive-site.xml and
 also in $SPARK_HOME/conf/hive-site.xml

 When I start pyspark with the below command and then run some simple
 SparkSQL, it fails; it seems it didn't pick up the settings in hive-site.xml

 $ HADOOP_CONF_DIR=$SPARK_HOME/hadoop-conf
 YARN_CONF_DIR=$SPARK_HOME/yarn-conf HADOOP_USER_NAME=biapp MASTER=yarn
 $SPARK_HOME/bin/pyspark --deploy-mode client

 Python 2.6.6 (r266:84292, Jul 23 2015, 05:13:40)
 [GCC 4.4.7 20120313 (Red Hat 4.4.7-16)] on linux2
 Type "help", "copyright", "credits" or "license" for more information.
 SLF4J: Class path contains multiple SLF4J bindings.
 SLF4J: Found binding in
 

Re: No way to supply hive-site.xml in yarn client mode?

2015-10-29 Thread Zoltan Fedor
Yes, I have the hive-site.xml in $SPARK_HOME/conf, also in yarn-conf, in
/etc/hive/conf, etc

On Thu, Oct 29, 2015 at 10:46 AM, Kai Wei  wrote:

> Did you try copy it to spark/conf dir?
> On 30 Oct 2015 1:42 am, "Zoltan Fedor"  wrote:
>
>> There is /user/biapp in hdfs. The problem is that the hive-site.xml is
>> being ignored, so it is looking for it locally.
>>
>> On Thu, Oct 29, 2015 at 10:40 AM, Kai Wei  wrote:
>>
>>> Create /user/biapp in hdfs manually first.
>>> On 30 Oct 2015 1:36 am, "Zoltan Fedor"  wrote:
>>>
 Sure, I did it with spark-shell, which seems to be showing the same
 error - not using the hive-site.xml


 $ HADOOP_CONF_DIR=$SPARK_HOME/hadoop-conf
 YARN_CONF_DIR=$SPARK_HOME/yarn-conf HADOOP_USER_NAME=biapp MASTER=yarn
 $SPARK_HOME/bin/pyspark --deploy-mode client --driver-class-path
 $HIVE_CLASSPATH
 Python 2.6.6 (r266:84292, Jul 23 2015, 05:13:40)
 [GCC 4.4.7 20120313 (Red Hat 4.4.7-16)] on linux2
 Type "help", "copyright", "credits" or "license" for more information.
 SLF4J: Class path contains multiple SLF4J bindings.
 SLF4J: Found binding in
 [jar:file:/usr/lib/spark-1.5.1-bin-without-hadoop/lib/spark-assembly-1.5.1-hadoop2.5.0-cdh5.3.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
 SLF4J: Found binding in
 [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
 SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
 explanation.
 SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
 15/10/29 10:33:20 WARN MetricsSystem: Using default name DAGScheduler
 for source because spark.app.id is not set.
 15/10/29 10:33:22 WARN NativeCodeLoader: Unable to load native-hadoop
 library for your platform... using builtin-java classes where applicable
 15/10/29 10:33:50 WARN HiveConf: HiveConf of name hive.metastore.local
 does not exist
 Welcome to
     __
  / __/__  ___ _/ /__
 _\ \/ _ \/ _ `/ __/  '_/
/__ / .__/\_,_/_/ /_/\_\   version 1.5.1
   /_/

 Using Python version 2.6.6 (r266:84292, Jul 23 2015 05:13:40)
 SparkContext available as sc, HiveContext available as sqlContext.
 >>>
 biapps@biapps-qa01:~> HADOOP_CONF_DIR=$SPARK_HOME/hadoop-conf
 YARN_CONF_DIR=$SPARK_HOME/yarn-conf HADOOP_USER_NAME=biapp MASTER=yarn
 $SPARK_HOME/bin/spark-shell --deploy-mode client
 SLF4J: Class path contains multiple SLF4J bindings.
 SLF4J: Found binding in
 [jar:file:/usr/lib/spark-1.5.1-bin-without-hadoop/lib/spark-assembly-1.5.1-hadoop2.5.0-cdh5.3.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
 SLF4J: Found binding in
 [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
 SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
 explanation.
 SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
 Welcome to
     __
  / __/__  ___ _/ /__
 _\ \/ _ \/ _ `/ __/  '_/
/___/ .__/\_,_/_/ /_/\_\   version 1.5.1
   /_/

 Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.7.0_91)
 Type in expressions to have them evaluated.
 Type :help for more information.
 15/10/29 10:34:15 WARN MetricsSystem: Using default name DAGScheduler
 for source because spark.app.id is not set.
 15/10/29 10:34:16 WARN NativeCodeLoader: Unable to load native-hadoop
 library for your platform... using builtin-java classes where applicable
 Spark context available as sc.
 15/10/29 10:34:46 WARN HiveConf: HiveConf of name hive.metastore.local
 does not exist
 15/10/29 10:34:46 WARN ShellBasedUnixGroupsMapping: got exception
 trying to get groups for user biapp: id: biapp: No such user

 15/10/29 10:34:46 WARN UserGroupInformation: No groups available for
 user biapp
 java.lang.RuntimeException:
 org.apache.hadoop.security.AccessControlException: Permission denied:
 user=biapp, access=WRITE, inode="/user":hdfs:supergroup:drwxr-xr-x
 at
 org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:257)
 at
 org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:238)
 at
 org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:216)
 at
 org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkPermission(DefaultAuthorizationProvider.java:145)
 at
 org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:138)
 at
 

Re: No way to supply hive-site.xml in yarn client mode?

2015-10-29 Thread Kai Wei
I don't see a hadoop-2.5 profile in the pom; maybe that's the problem.
On 30 Oct 2015 1:51 am, "Zoltan Fedor"  wrote:

> The funny thing is, that with Spark 1.2.0 on the same machine (Spark 1.2.0
> is the default shipped with CDH 5.3.3) the same hive-site.xml is being
> picked up and I have no problem whatsoever.
>
> On Thu, Oct 29, 2015 at 10:48 AM, Zoltan Fedor 
> wrote:
>
>> Yes, I have the hive-site.xml in $SPARK_HOME/conf, also in yarn-conf, in
>> /etc/hive/conf, etc
>>
>> On Thu, Oct 29, 2015 at 10:46 AM, Kai Wei  wrote:
>>
>>> Did you try copy it to spark/conf dir?
>>> On 30 Oct 2015 1:42 am, "Zoltan Fedor"  wrote:
>>>
 There is /user/biapp in hdfs. The problem is that the hive-site.xml is
 being ignored, so it is looking for it locally.

 On Thu, Oct 29, 2015 at 10:40 AM, Kai Wei  wrote:

> Create /user/biapp in hdfs manually first.
> On 30 Oct 2015 1:36 am, "Zoltan Fedor" 
> wrote:
>
>> Sure, I did it with spark-shell, which seems to be showing the same
>> error - not using the hive-site.xml
>>
>>
>> $ HADOOP_CONF_DIR=$SPARK_HOME/hadoop-conf
>> YARN_CONF_DIR=$SPARK_HOME/yarn-conf HADOOP_USER_NAME=biapp MASTER=yarn
>> $SPARK_HOME/bin/pyspark --deploy-mode client --driver-class-path
>> $HIVE_CLASSPATH
>> Python 2.6.6 (r266:84292, Jul 23 2015, 05:13:40)
>> [GCC 4.4.7 20120313 (Red Hat 4.4.7-16)] on linux2
>> Type "help", "copyright", "credits" or "license" for more information.
>> SLF4J: Class path contains multiple SLF4J bindings.
>> SLF4J: Found binding in
>> [jar:file:/usr/lib/spark-1.5.1-bin-without-hadoop/lib/spark-assembly-1.5.1-hadoop2.5.0-cdh5.3.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: Found binding in
>> [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>> explanation.
>> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>> 15/10/29 10:33:20 WARN MetricsSystem: Using default name DAGScheduler
>> for source because spark.app.id is not set.
>> 15/10/29 10:33:22 WARN NativeCodeLoader: Unable to load native-hadoop
>> library for your platform... using builtin-java classes where applicable
>> 15/10/29 10:33:50 WARN HiveConf: HiveConf of name
>> hive.metastore.local does not exist
>> Welcome to
>>     __
>>  / __/__  ___ _/ /__
>> _\ \/ _ \/ _ `/ __/  '_/
>>/__ / .__/\_,_/_/ /_/\_\   version 1.5.1
>>   /_/
>>
>> Using Python version 2.6.6 (r266:84292, Jul 23 2015 05:13:40)
>> SparkContext available as sc, HiveContext available as sqlContext.
>> >>>
>> biapps@biapps-qa01:~> HADOOP_CONF_DIR=$SPARK_HOME/hadoop-conf
>> YARN_CONF_DIR=$SPARK_HOME/yarn-conf HADOOP_USER_NAME=biapp MASTER=yarn
>> $SPARK_HOME/bin/spark-shell --deploy-mode client
>> SLF4J: Class path contains multiple SLF4J bindings.
>> SLF4J: Found binding in
>> [jar:file:/usr/lib/spark-1.5.1-bin-without-hadoop/lib/spark-assembly-1.5.1-hadoop2.5.0-cdh5.3.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: Found binding in
>> [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>> explanation.
>> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>> Welcome to
>>     __
>>  / __/__  ___ _/ /__
>> _\ \/ _ \/ _ `/ __/  '_/
>>/___/ .__/\_,_/_/ /_/\_\   version 1.5.1
>>   /_/
>>
>> Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.7.0_91)
>> Type in expressions to have them evaluated.
>> Type :help for more information.
>> 15/10/29 10:34:15 WARN MetricsSystem: Using default name DAGScheduler
>> for source because spark.app.id is not set.
>> 15/10/29 10:34:16 WARN NativeCodeLoader: Unable to load native-hadoop
>> library for your platform... using builtin-java classes where applicable
>> Spark context available as sc.
>> 15/10/29 10:34:46 WARN HiveConf: HiveConf of name
>> hive.metastore.local does not exist
>> 15/10/29 10:34:46 WARN ShellBasedUnixGroupsMapping: got exception
>> trying to get groups for user biapp: id: biapp: No such user
>>
>> 15/10/29 10:34:46 WARN UserGroupInformation: No groups available for
>> user biapp
>> java.lang.RuntimeException:
>> org.apache.hadoop.security.AccessControlException: Permission denied:
>> user=biapp, access=WRITE, inode="/user":hdfs:supergroup:drwxr-xr-x
>> at
>> 

Re: No way to supply hive-site.xml in yarn client mode?

2015-10-29 Thread Zoltan Fedor
Possible. Let me try to recompile with

export SPARK_HADOOP_VERSION=2.5.0-cdh5.3.3
export SPARK_YARN=true
export SPARK_HIVE=true
export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M
-XX:ReservedCodeCacheSize=512m"
mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.5.0-cdh5.3.3 -Phive
-Phive-thriftserver -DskipTests clean package

On Thu, Oct 29, 2015 at 11:05 AM, Kai Wei  wrote:

> Failed to see a hadoop-2.5 profile in pom. Maybe that's the problem.
> On 30 Oct 2015 1:51 am, "Zoltan Fedor"  wrote:
>
>> The funny thing is, that with Spark 1.2.0 on the same machine (Spark
>> 1.2.0 is the default shipped with CDH 5.3.3) the same hive-site.xml is
>> being picked up and I have no problem whatsoever.
>>
>> On Thu, Oct 29, 2015 at 10:48 AM, Zoltan Fedor 
>> wrote:
>>
>>> Yes, I have the hive-site.xml in $SPARK_HOME/conf, also in yarn-conf, in
>>> /etc/hive/conf, etc
>>>
>>> On Thu, Oct 29, 2015 at 10:46 AM, Kai Wei  wrote:
>>>
 Did you try copy it to spark/conf dir?
 On 30 Oct 2015 1:42 am, "Zoltan Fedor" 
 wrote:

> There is /user/biapp in hdfs. The problem is that the hive-site.xml is
> being ignored, so it is looking for it locally.
>
> On Thu, Oct 29, 2015 at 10:40 AM, Kai Wei 
> wrote:
>
>> Create /user/biapp in hdfs manually first.
>> On 30 Oct 2015 1:36 am, "Zoltan Fedor" 
>> wrote:
>>
>>> Sure, I did it with spark-shell, which seems to be showing the same
>>> error - not using the hive-site.xml
>>>
>>>
>>> $ HADOOP_CONF_DIR=$SPARK_HOME/hadoop-conf
>>> YARN_CONF_DIR=$SPARK_HOME/yarn-conf HADOOP_USER_NAME=biapp MASTER=yarn
>>> $SPARK_HOME/bin/pyspark --deploy-mode client --driver-class-path
>>> $HIVE_CLASSPATH
>>> Python 2.6.6 (r266:84292, Jul 23 2015, 05:13:40)
>>> [GCC 4.4.7 20120313 (Red Hat 4.4.7-16)] on linux2
>>> Type "help", "copyright", "credits" or "license" for more
>>> information.
>>> SLF4J: Class path contains multiple SLF4J bindings.
>>> SLF4J: Found binding in
>>> [jar:file:/usr/lib/spark-1.5.1-bin-without-hadoop/lib/spark-assembly-1.5.1-hadoop2.5.0-cdh5.3.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>> SLF4J: Found binding in
>>> [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>>> explanation.
>>> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>>> 15/10/29 10:33:20 WARN MetricsSystem: Using default name
>>> DAGScheduler for source because spark.app.id is not set.
>>> 15/10/29 10:33:22 WARN NativeCodeLoader: Unable to load
>>> native-hadoop library for your platform... using builtin-java classes 
>>> where
>>> applicable
>>> 15/10/29 10:33:50 WARN HiveConf: HiveConf of name
>>> hive.metastore.local does not exist
>>> Welcome to
>>>     __
>>>  / __/__  ___ _/ /__
>>> _\ \/ _ \/ _ `/ __/  '_/
>>>/__ / .__/\_,_/_/ /_/\_\   version 1.5.1
>>>   /_/
>>>
>>> Using Python version 2.6.6 (r266:84292, Jul 23 2015 05:13:40)
>>> SparkContext available as sc, HiveContext available as sqlContext.
>>> >>>
>>> biapps@biapps-qa01:~> HADOOP_CONF_DIR=$SPARK_HOME/hadoop-conf
>>> YARN_CONF_DIR=$SPARK_HOME/yarn-conf HADOOP_USER_NAME=biapp MASTER=yarn
>>> $SPARK_HOME/bin/spark-shell --deploy-mode client
>>> SLF4J: Class path contains multiple SLF4J bindings.
>>> SLF4J: Found binding in
>>> [jar:file:/usr/lib/spark-1.5.1-bin-without-hadoop/lib/spark-assembly-1.5.1-hadoop2.5.0-cdh5.3.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>> SLF4J: Found binding in
>>> [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>>> explanation.
>>> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>>> Welcome to
>>>     __
>>>  / __/__  ___ _/ /__
>>> _\ \/ _ \/ _ `/ __/  '_/
>>>/___/ .__/\_,_/_/ /_/\_\   version 1.5.1
>>>   /_/
>>>
>>> Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.7.0_91)
>>> Type in expressions to have them evaluated.
>>> Type :help for more information.
>>> 15/10/29 10:34:15 WARN MetricsSystem: Using default name
>>> DAGScheduler for source because spark.app.id is not set.
>>> 15/10/29 10:34:16 WARN NativeCodeLoader: Unable to load
>>> native-hadoop library for your platform... using builtin-java classes 
>>> where
>>> applicable
>>> Spark context available as sc.
>>> 15/10/29 10:34:46 WARN HiveConf: HiveConf of name
>>> hive.metastore.local does not exist

Re: SparkSQL: What is the cost of DataFrame.registerTempTable(String)? Can I have multiple tables referencing to the same DataFrame?

2015-10-29 Thread Michael Armbrust
It's super cheap; it's just an entry in a hashtable stored on the driver. Yes,
you can have more than one name for the same DF.
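A quick illustration (Scala shell sketch; the parquet path and the id column
are made up):

val df = sqlContext.read.parquet("/tmp/events.parquet")   // hypothetical input
df.registerTempTable("events")
df.registerTempTable("events_alias")   // a second name for the same DataFrame
// registration copies no data; both names resolve to the same logical plan
sqlContext.sql("SELECT count(*) FROM events e JOIN events_alias a ON e.id = a.id").show()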

On Wed, Oct 28, 2015 at 6:17 PM, Anfernee Xu  wrote:

> Hi,
>
> I just want to understand the cost of DataFrame.registerTempTable(String),
> is it just a trivial operation(like creating a object reference) in
> master(Driver) JVM? And Can I have multiple tables with different name
> referencing to the same DataFrame?
>
> Thanks
>
> --
> --Anfernee
>


[SPARK STREAMING ] Sending data to ElasticSearch

2015-10-29 Thread Nipun Arora
Hi,

I am sending data to an Elasticsearch deployment. The printing to file seems
to work fine, but I keep getting a "no node found" error from ES when I send
data to it. Is there some special way the connection object needs to be
handled? Can anyone explain what should be changed here?

Thanks
Nipun

The following is the code block where I send the data




addMTSUnmatched.foreach(
        new Function<JavaRDD<String>, Void>() {
            @Override
            public Void call(JavaRDD<String> stringJavaRDD) throws Exception {
                // collect() pulls the whole batch back to the driver,
                // so printToFile (and the ES client) runs on the driver
                List<String> list = stringJavaRDD.collect();
                for (String str : list) {
                    if (OnlineUtils.ESFlag) {
                        OnlineUtils.printToFile(str, 1, type1_outputFile, OnlineUtils.client);
                    } else {
                        OnlineUtils.printToFile(str, 1, type1_outputFile);
                    }
                }
                return null;
            }
        }
);


public static void printToFile(String str, int type, File fileID,
Client client) throws IOException {
JSONObject obj = new JSONObject(str);
if (obj.has("message")) {
String message = obj.get("message").toString();
obj.put("message", message.replace("'", ""));
}
obj.put("anomaly_type", type);

ESUtils.putMessage(client,"necla","demo",obj.toString());

Files.append(str + "\n", fileID, Charset.defaultCharset());
}
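As an aside, if the elasticsearch-hadoop (elasticsearch-spark) connector is an
option for you, it avoids having to share a client object between the driver
and the code that writes. A rough Scala sketch, assuming the connector jar is
on the classpath and es.nodes points at a reachable node (the socket source
stands in for your real DStream; the index/type "necla/demo" is taken from
your snippet):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.elasticsearch.spark._   // adds saveJsonToEs / saveToEs to RDDs

val conf = new SparkConf().setAppName("StreamToES").set("es.nodes", "es-host:9200")
val ssc = new StreamingContext(conf, Seconds(10))
val jsonStream = ssc.socketTextStream("localhost", 9999)   // stand-in for your JSON DStream
jsonStream.foreachRDD { rdd =>
  // the connector writes from the executors; no client object is shipped from the driver
  rdd.saveJsonToEs("necla/demo")
}
ssc.start()
ssc.awaitTermination()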


Re: No way to supply hive-site.xml in yarn client mode?

2015-10-29 Thread Kai Wei
Create /user/biapp in hdfs manually first.
On 30 Oct 2015 1:36 am, "Zoltan Fedor"  wrote:

> Sure, I did it with spark-shell, which seems to be showing the same error
> - not using the hive-site.xml
>
>
> $ HADOOP_CONF_DIR=$SPARK_HOME/hadoop-conf
> YARN_CONF_DIR=$SPARK_HOME/yarn-conf HADOOP_USER_NAME=biapp MASTER=yarn
> $SPARK_HOME/bin/pyspark --deploy-mode client --driver-class-path
> $HIVE_CLASSPATH
> Python 2.6.6 (r266:84292, Jul 23 2015, 05:13:40)
> [GCC 4.4.7 20120313 (Red Hat 4.4.7-16)] on linux2
> Type "help", "copyright", "credits" or "license" for more information.
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
> [jar:file:/usr/lib/spark-1.5.1-bin-without-hadoop/lib/spark-assembly-1.5.1-hadoop2.5.0-cdh5.3.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 15/10/29 10:33:20 WARN MetricsSystem: Using default name DAGScheduler for
> source because spark.app.id is not set.
> 15/10/29 10:33:22 WARN NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> 15/10/29 10:33:50 WARN HiveConf: HiveConf of name hive.metastore.local
> does not exist
> Welcome to
>     __
>  / __/__  ___ _/ /__
> _\ \/ _ \/ _ `/ __/  '_/
>/__ / .__/\_,_/_/ /_/\_\   version 1.5.1
>   /_/
>
> Using Python version 2.6.6 (r266:84292, Jul 23 2015 05:13:40)
> SparkContext available as sc, HiveContext available as sqlContext.
> >>>
> biapps@biapps-qa01:~> HADOOP_CONF_DIR=$SPARK_HOME/hadoop-conf
> YARN_CONF_DIR=$SPARK_HOME/yarn-conf HADOOP_USER_NAME=biapp MASTER=yarn
> $SPARK_HOME/bin/spark-shell --deploy-mode client
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
> [jar:file:/usr/lib/spark-1.5.1-bin-without-hadoop/lib/spark-assembly-1.5.1-hadoop2.5.0-cdh5.3.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> Welcome to
>     __
>  / __/__  ___ _/ /__
> _\ \/ _ \/ _ `/ __/  '_/
>/___/ .__/\_,_/_/ /_/\_\   version 1.5.1
>   /_/
>
> Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.7.0_91)
> Type in expressions to have them evaluated.
> Type :help for more information.
> 15/10/29 10:34:15 WARN MetricsSystem: Using default name DAGScheduler for
> source because spark.app.id is not set.
> 15/10/29 10:34:16 WARN NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> Spark context available as sc.
> 15/10/29 10:34:46 WARN HiveConf: HiveConf of name hive.metastore.local
> does not exist
> 15/10/29 10:34:46 WARN ShellBasedUnixGroupsMapping: got exception trying
> to get groups for user biapp: id: biapp: No such user
>
> 15/10/29 10:34:46 WARN UserGroupInformation: No groups available for user
> biapp
> java.lang.RuntimeException:
> org.apache.hadoop.security.AccessControlException: Permission denied:
> user=biapp, access=WRITE, inode="/user":hdfs:supergroup:drwxr-xr-x
> at
> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:257)
> at
> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:238)
> at
> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:216)
> at
> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkPermission(DefaultAuthorizationProvider.java:145)
> at
> org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:138)
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6287)
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6269)
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAncestorAccess(FSNamesystem.java:6221)
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInternal(FSNamesystem.java:4088)
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInt(FSNamesystem.java:4058)
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:4031)
> at
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:788)
> at
> org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.mkdirs(AuthorizationProviderProxyClientProtocol.java:297)

Re: No way to supply hive-site.xml in yarn client mode?

2015-10-29 Thread Kai Wei
Did you try copy it to spark/conf dir?
On 30 Oct 2015 1:42 am, "Zoltan Fedor"  wrote:

> There is /user/biapp in hdfs. The problem is that the hive-site.xml is
> being ignored, so it is looking for it locally.
>
> On Thu, Oct 29, 2015 at 10:40 AM, Kai Wei  wrote:
>
>> Create /user/biapp in hdfs manually first.
>> On 30 Oct 2015 1:36 am, "Zoltan Fedor"  wrote:
>>
>>> Sure, I did it with spark-shell, which seems to be showing the same
>>> error - not using the hive-site.xml
>>>
>>>
>>> $ HADOOP_CONF_DIR=$SPARK_HOME/hadoop-conf
>>> YARN_CONF_DIR=$SPARK_HOME/yarn-conf HADOOP_USER_NAME=biapp MASTER=yarn
>>> $SPARK_HOME/bin/pyspark --deploy-mode client --driver-class-path
>>> $HIVE_CLASSPATH
>>> Python 2.6.6 (r266:84292, Jul 23 2015, 05:13:40)
>>> [GCC 4.4.7 20120313 (Red Hat 4.4.7-16)] on linux2
>>> Type "help", "copyright", "credits" or "license" for more information.
>>> SLF4J: Class path contains multiple SLF4J bindings.
>>> SLF4J: Found binding in
>>> [jar:file:/usr/lib/spark-1.5.1-bin-without-hadoop/lib/spark-assembly-1.5.1-hadoop2.5.0-cdh5.3.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>> SLF4J: Found binding in
>>> [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>>> explanation.
>>> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>>> 15/10/29 10:33:20 WARN MetricsSystem: Using default name DAGScheduler
>>> for source because spark.app.id is not set.
>>> 15/10/29 10:33:22 WARN NativeCodeLoader: Unable to load native-hadoop
>>> library for your platform... using builtin-java classes where applicable
>>> 15/10/29 10:33:50 WARN HiveConf: HiveConf of name hive.metastore.local
>>> does not exist
>>> Welcome to
>>>     __
>>>  / __/__  ___ _/ /__
>>> _\ \/ _ \/ _ `/ __/  '_/
>>>/__ / .__/\_,_/_/ /_/\_\   version 1.5.1
>>>   /_/
>>>
>>> Using Python version 2.6.6 (r266:84292, Jul 23 2015 05:13:40)
>>> SparkContext available as sc, HiveContext available as sqlContext.
>>> >>>
>>> biapps@biapps-qa01:~> HADOOP_CONF_DIR=$SPARK_HOME/hadoop-conf
>>> YARN_CONF_DIR=$SPARK_HOME/yarn-conf HADOOP_USER_NAME=biapp MASTER=yarn
>>> $SPARK_HOME/bin/spark-shell --deploy-mode client
>>> SLF4J: Class path contains multiple SLF4J bindings.
>>> SLF4J: Found binding in
>>> [jar:file:/usr/lib/spark-1.5.1-bin-without-hadoop/lib/spark-assembly-1.5.1-hadoop2.5.0-cdh5.3.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>> SLF4J: Found binding in
>>> [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>>> explanation.
>>> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>>> Welcome to
>>>     __
>>>  / __/__  ___ _/ /__
>>> _\ \/ _ \/ _ `/ __/  '_/
>>>/___/ .__/\_,_/_/ /_/\_\   version 1.5.1
>>>   /_/
>>>
>>> Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.7.0_91)
>>> Type in expressions to have them evaluated.
>>> Type :help for more information.
>>> 15/10/29 10:34:15 WARN MetricsSystem: Using default name DAGScheduler
>>> for source because spark.app.id is not set.
>>> 15/10/29 10:34:16 WARN NativeCodeLoader: Unable to load native-hadoop
>>> library for your platform... using builtin-java classes where applicable
>>> Spark context available as sc.
>>> 15/10/29 10:34:46 WARN HiveConf: HiveConf of name hive.metastore.local
>>> does not exist
>>> 15/10/29 10:34:46 WARN ShellBasedUnixGroupsMapping: got exception trying
>>> to get groups for user biapp: id: biapp: No such user
>>>
>>> 15/10/29 10:34:46 WARN UserGroupInformation: No groups available for
>>> user biapp
>>> java.lang.RuntimeException:
>>> org.apache.hadoop.security.AccessControlException: Permission denied:
>>> user=biapp, access=WRITE, inode="/user":hdfs:supergroup:drwxr-xr-x
>>> at
>>> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:257)
>>> at
>>> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:238)
>>> at
>>> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:216)
>>> at
>>> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkPermission(DefaultAuthorizationProvider.java:145)
>>> at
>>> org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:138)
>>> at
>>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6287)
>>> at
>>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6269)
>>> at
>>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAncestorAccess(FSNamesystem.java:6221)
>>> at
>>> 

Re: No way to supply hive-site.xml in yarn client mode?

2015-10-29 Thread Deenar Toraskar
I don't know a lot about how pyspark works. Could you possibly try running
spark-shell and doing the same?

sqlContext.sql("show databases").collect

Deenar

On 29 October 2015 at 14:18, Zoltan Fedor  wrote:

> Yes, I am. It was compiled with the following:
>
> export SPARK_HADOOP_VERSION=2.5.0-cdh5.3.3
> export SPARK_YARN=true
> export SPARK_HIVE=true
> export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M
> -XX:ReservedCodeCacheSize=512m"
> mvn -Pyarn -Phadoop-2.5 -Dhadoop.version=2.5.0-cdh5.3.3 -Phive
> -Phive-thriftserver -DskipTests clean package
>
> On Thu, Oct 29, 2015 at 10:16 AM, Deenar Toraskar <
> deenar.toras...@gmail.com> wrote:
>
>> Are you using Spark built with hive ?
>>
>> # Apache Hadoop 2.4.X with Hive 13 support
>> mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver 
>> -DskipTests clean package
>>
>>
>> On 29 October 2015 at 13:08, Zoltan Fedor 
>> wrote:
>>
>>> Hi Deenar,
>>> As suggested, I have moved the hive-site.xml from HADOOP_CONF_DIR
>>> ($SPARK_HOME/hadoop-conf) to YARN_CONF_DIR ($SPARK_HOME/conf/yarn-conf) and
>>> use the below to start pyspark, but the error is the exact same as before.
>>>
>>> $ HADOOP_CONF_DIR=$SPARK_HOME/hadoop-conf
>>> YARN_CONF_DIR=$SPARK_HOME/conf/yarn-conf HADOOP_USER_NAME=biapp MASTER=yarn
>>> $SPARK_HOME/bin/pyspark --deploy-mode client
>>>
>>> Python 2.6.6 (r266:84292, Jul 23 2015, 05:13:40)
>>> [GCC 4.4.7 20120313 (Red Hat 4.4.7-16)] on linux2
>>> Type "help", "copyright", "credits" or "license" for more information.
>>> SLF4J: Class path contains multiple SLF4J bindings.
>>> SLF4J: Found binding in
>>> [jar:file:/usr/lib/spark-1.5.1-bin-without-hadoop/lib/spark-assembly-1.5.1-hadoop2.5.0-cdh5.3.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>> SLF4J: Found binding in
>>> [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>>> explanation.
>>> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>>> 15/10/29 09:06:36 WARN MetricsSystem: Using default name DAGScheduler
>>> for source because spark.app.id is not set.
>>> 15/10/29 09:06:38 WARN NativeCodeLoader: Unable to load native-hadoop
>>> library for your platform... using builtin-java classes where applicable
>>> 15/10/29 09:07:03 WARN HiveConf: HiveConf of name hive.metastore.local
>>> does not exist
>>> Welcome to
>>>     __
>>>  / __/__  ___ _/ /__
>>> _\ \/ _ \/ _ `/ __/  '_/
>>>/__ / .__/\_,_/_/ /_/\_\   version 1.5.1
>>>   /_/
>>>
>>> Using Python version 2.6.6 (r266:84292, Jul 23 2015 05:13:40)
>>> SparkContext available as sc, HiveContext available as sqlContext.
>>> >>> sqlContext2 = HiveContext(sc)
>>> >>> sqlContext2 = HiveContext(sc)
>>> >>> sqlContext2.sql("show databases").first()
>>> 15/10/29 09:07:34 WARN HiveConf: HiveConf of name hive.metastore.local
>>> does not exist
>>> 15/10/29 09:07:35 WARN ShellBasedUnixGroupsMapping: got exception trying
>>> to get groups for user biapp: id: biapp: No such user
>>>
>>> 15/10/29 09:07:35 WARN UserGroupInformation: No groups available for
>>> user biapp
>>> Traceback (most recent call last):
>>>   File "", line 1, in 
>>>   File
>>> "/usr/lib/spark-1.5.1-bin-without-hadoop/python/pyspark/sql/context.py",
>>> line 552, in sql
>>> return DataFrame(self._ssql_ctx.sql(sqlQuery), self)
>>>   File
>>> "/usr/lib/spark-1.5.1-bin-without-hadoop/python/pyspark/sql/context.py",
>>> line 660, in _ssql_ctx
>>> "build/sbt assembly", e)
>>> Exception: ("You must build Spark with Hive. Export 'SPARK_HIVE=true'
>>> and run build/sbt assembly", Py4JJavaError(u'An error occurred while
>>> calling None.org.apache.spark.sql.hive.HiveContext.\n', JavaObject id=o20))
>>> >>>
>>>
>>>
>>> On Thu, Oct 29, 2015 at 7:20 AM, Deenar Toraskar <
>>> deenar.toras...@gmail.com> wrote:
>>>
 *Hi Zoltan*

 Add hive-site.xml to your YARN_CONF_DIR. i.e.
 $SPARK_HOME/conf/yarn-conf

 Deenar

 *Think Reactive Ltd*
 deenar.toras...@thinkreactive.co.uk
 07714140812

 On 28 October 2015 at 14:28, Zoltan Fedor 
 wrote:

> Hi,
> We have a shared CDH 5.3.3 cluster and trying to use Spark 1.5.1 on it
> in yarn client mode with Hive.
>
> I have compiled Spark 1.5.1 with SPARK_HIVE=true, but it seems I am
> not able to make SparkSQL pick up the hive-site.xml when running
> pyspark.
>
> hive-site.xml is located in $SPARK_HOME/hadoop-conf/hive-site.xml and
> also in $SPARK_HOME/conf/hive-site.xml
>
> When I start pyspark with the below command and then run some simple
> SparkSQL, it fails; it seems it didn't pick up the settings in hive-site.xml
>
> $ HADOOP_CONF_DIR=$SPARK_HOME/hadoop-conf
> YARN_CONF_DIR=$SPARK_HOME/yarn-conf HADOOP_USER_NAME=biapp MASTER=yarn
> $SPARK_HOME/bin/pyspark 

Re: No way to supply hive-site.xml in yarn client mode?

2015-10-29 Thread Zoltan Fedor
Sure, I did it with spark-shell, which seems to be showing the same error -
not using the hive-site.xml


$ HADOOP_CONF_DIR=$SPARK_HOME/hadoop-conf
YARN_CONF_DIR=$SPARK_HOME/yarn-conf HADOOP_USER_NAME=biapp MASTER=yarn
$SPARK_HOME/bin/pyspark --deploy-mode client --driver-class-path
$HIVE_CLASSPATH
Python 2.6.6 (r266:84292, Jul 23 2015, 05:13:40)
[GCC 4.4.7 20120313 (Red Hat 4.4.7-16)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/usr/lib/spark-1.5.1-bin-without-hadoop/lib/spark-assembly-1.5.1-hadoop2.5.0-cdh5.3.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
15/10/29 10:33:20 WARN MetricsSystem: Using default name DAGScheduler for
source because spark.app.id is not set.
15/10/29 10:33:22 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
15/10/29 10:33:50 WARN HiveConf: HiveConf of name hive.metastore.local does
not exist
Welcome to
    __
 / __/__  ___ _/ /__
_\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.5.1
  /_/

Using Python version 2.6.6 (r266:84292, Jul 23 2015 05:13:40)
SparkContext available as sc, HiveContext available as sqlContext.
>>>
biapps@biapps-qa01:~> HADOOP_CONF_DIR=$SPARK_HOME/hadoop-conf
YARN_CONF_DIR=$SPARK_HOME/yarn-conf HADOOP_USER_NAME=biapp MASTER=yarn
$SPARK_HOME/bin/spark-shell --deploy-mode client
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/usr/lib/spark-1.5.1-bin-without-hadoop/lib/spark-assembly-1.5.1-hadoop2.5.0-cdh5.3.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Welcome to
    __
 / __/__  ___ _/ /__
_\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.5.1
  /_/

Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.7.0_91)
Type in expressions to have them evaluated.
Type :help for more information.
15/10/29 10:34:15 WARN MetricsSystem: Using default name DAGScheduler for
source because spark.app.id is not set.
15/10/29 10:34:16 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
Spark context available as sc.
15/10/29 10:34:46 WARN HiveConf: HiveConf of name hive.metastore.local does
not exist
15/10/29 10:34:46 WARN ShellBasedUnixGroupsMapping: got exception trying to
get groups for user biapp: id: biapp: No such user

15/10/29 10:34:46 WARN UserGroupInformation: No groups available for user
biapp
java.lang.RuntimeException:
org.apache.hadoop.security.AccessControlException: Permission denied:
user=biapp, access=WRITE, inode="/user":hdfs:supergroup:drwxr-xr-x
at
org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:257)
at
org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:238)
at
org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:216)
at
org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkPermission(DefaultAuthorizationProvider.java:145)
at
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:138)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6287)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6269)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAncestorAccess(FSNamesystem.java:6221)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInternal(FSNamesystem.java:4088)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInt(FSNamesystem.java:4058)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:4031)
at
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:788)
at
org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.mkdirs(AuthorizationProviderProxyClientProtocol.java:297)
at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:594)
at
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at

Re: No way to supply hive-site.xml in yarn client mode?

2015-10-29 Thread Zoltan Fedor
The funny thing is, that with Spark 1.2.0 on the same machine (Spark 1.2.0
is the default shipped with CDH 5.3.3) the same hive-site.xml is being
picked up and I have no problem whatsoever.

On Thu, Oct 29, 2015 at 10:48 AM, Zoltan Fedor 
wrote:

> Yes, I have the hive-site.xml in $SPARK_HOME/conf, also in yarn-conf, in
> /etc/hive/conf, etc
>
> On Thu, Oct 29, 2015 at 10:46 AM, Kai Wei  wrote:
>
>> Did you try copy it to spark/conf dir?
>> On 30 Oct 2015 1:42 am, "Zoltan Fedor"  wrote:
>>
>>> There is /user/biapp in hdfs. The problem is that the hive-site.xml is
>>> being ignored, so it is looking for it locally.
>>>
>>> On Thu, Oct 29, 2015 at 10:40 AM, Kai Wei  wrote:
>>>
 Create /user/biapp in hdfs manually first.
 On 30 Oct 2015 1:36 am, "Zoltan Fedor" 
 wrote:

> Sure, I did it with spark-shell, which seems to be showing the same
> error - not using the hive-site.xml
>
>
> $ HADOOP_CONF_DIR=$SPARK_HOME/hadoop-conf
> YARN_CONF_DIR=$SPARK_HOME/yarn-conf HADOOP_USER_NAME=biapp MASTER=yarn
> $SPARK_HOME/bin/pyspark --deploy-mode client --driver-class-path
> $HIVE_CLASSPATH
> Python 2.6.6 (r266:84292, Jul 23 2015, 05:13:40)
> [GCC 4.4.7 20120313 (Red Hat 4.4.7-16)] on linux2
> Type "help", "copyright", "credits" or "license" for more information.
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
> [jar:file:/usr/lib/spark-1.5.1-bin-without-hadoop/lib/spark-assembly-1.5.1-hadoop2.5.0-cdh5.3.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 15/10/29 10:33:20 WARN MetricsSystem: Using default name DAGScheduler
> for source because spark.app.id is not set.
> 15/10/29 10:33:22 WARN NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> 15/10/29 10:33:50 WARN HiveConf: HiveConf of name hive.metastore.local
> does not exist
> Welcome to
>     __
>  / __/__  ___ _/ /__
> _\ \/ _ \/ _ `/ __/  '_/
>/__ / .__/\_,_/_/ /_/\_\   version 1.5.1
>   /_/
>
> Using Python version 2.6.6 (r266:84292, Jul 23 2015 05:13:40)
> SparkContext available as sc, HiveContext available as sqlContext.
> >>>
> biapps@biapps-qa01:~> HADOOP_CONF_DIR=$SPARK_HOME/hadoop-conf
> YARN_CONF_DIR=$SPARK_HOME/yarn-conf HADOOP_USER_NAME=biapp MASTER=yarn
> $SPARK_HOME/bin/spark-shell --deploy-mode client
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
> [jar:file:/usr/lib/spark-1.5.1-bin-without-hadoop/lib/spark-assembly-1.5.1-hadoop2.5.0-cdh5.3.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> Welcome to
>     __
>  / __/__  ___ _/ /__
> _\ \/ _ \/ _ `/ __/  '_/
>/___/ .__/\_,_/_/ /_/\_\   version 1.5.1
>   /_/
>
> Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.7.0_91)
> Type in expressions to have them evaluated.
> Type :help for more information.
> 15/10/29 10:34:15 WARN MetricsSystem: Using default name DAGScheduler
> for source because spark.app.id is not set.
> 15/10/29 10:34:16 WARN NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> Spark context available as sc.
> 15/10/29 10:34:46 WARN HiveConf: HiveConf of name hive.metastore.local
> does not exist
> 15/10/29 10:34:46 WARN ShellBasedUnixGroupsMapping: got exception
> trying to get groups for user biapp: id: biapp: No such user
>
> 15/10/29 10:34:46 WARN UserGroupInformation: No groups available for
> user biapp
> java.lang.RuntimeException:
> org.apache.hadoop.security.AccessControlException: Permission denied:
> user=biapp, access=WRITE, inode="/user":hdfs:supergroup:drwxr-xr-x
> at
> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:257)
> at
> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:238)
> at
> 

Re: Collect Column as Array in Grouped DataFrame

2015-10-29 Thread Michael Armbrust
You can use a Hive UDAF.

import org.apache.spark.sql.functions._
callUDF("collect_set", $"columnName")

or just SELECT collect_set(columnName) FROM ...

Note that in 1.5 I think this actually does not use tungsten.  In 1.6 it
should though.

I'll add that the experimental Dataset API (preview in 1.6) might have a
better implementation of what you are looking for in the form of mapGroups.
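For example, something along these lines (Scala shell sketch; assumes
sqlContext is a HiveContext so the Hive UDAF resolves, and builds a toy
DataFrame matching the question below):

import org.apache.spark.sql.functions.callUDF
import sqlContext.implicits._

val df = sqlContext.createDataFrame(Seq(
  ("user1", 1), ("user2", 2), ("user1", 3), ("user2", 4), ("user1", 5)
)).toDF("user", "score")

// group by user and collect each group's scores into an array column
df.groupBy($"user").agg(callUDF("collect_set", $"score").as("scores")).show()

// or the SQL route on a registered temp table
df.registerTempTable("scores_table")
sqlContext.sql("SELECT user, collect_set(score) AS scores FROM scores_table GROUP BY user").show()

Note that collect_set drops duplicates; if duplicates matter, the Hive UDAF
collect_list keeps them (where available).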


On Thu, Oct 29, 2015 at 2:19 AM, saurfang  wrote:

> Sorry if this functionality already exists or has been asked before, but
> I'm
> looking for an aggregate function in SparkSQL that allows me to collect a
> column into array per group in a grouped dataframe.
>
> For example, if I have the following table
> user, score
> 
> user1, 1
> user2, 2
> user1, 3
> user2, 4
> user1, 5
>
> I want to produce a dataframe that is like
>
> user1, [1, 3, 5]
> user2, [2, 4]
>
> (possibly via select collect(score) from table group by user)
>
>
> I realize I can probably implement this as a UDAF but just want to double
> check if such thing already exists. If not, would there be interests to
> have
> this in SparkSQL?
>
> To give some context, I am trying to do a cogroup on datasets and then
> persist as parquet but want to take advantage of Tungsten. So the plan is
> to, collapse row into key, value of struct => group by key and collect
> value
> as array[struct] => outer join dataframes.
>
> p.s. Here are some resources I have found so far but all of them concerns
> top K per key instead:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Efficient-way-to-get-top-K-values-per-key-in-key-value-RDD-td20370.html
> and
> https://issues.apache.org/jira/browse/SPARK-5954 (where Reynold mentioned
> this could be an API in dataframe)
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Collect-Column-as-Array-in-Grouped-DataFrame-tp25223.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


Re: Inconsistent Persistence of DataFrames in Spark 1.5

2015-10-29 Thread Michael Armbrust
There were several bugs in Spark 1.5.0 and we strongly recommend you upgrade
to 1.5.1. If the issue persists, it would be helpful to see the result of
calling explain.
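For reference, a minimal way to do that on the join DataFrame from the message
below (Scala; the PySpark DataFrame has the same method):

join_df.explain(true)   // prints the parsed, analyzed, optimized and physical plans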

On Wed, Oct 28, 2015 at 4:46 PM,  wrote:

> Hi, just a couple cents.
>
>
>
> Are your joining columns StringTypes (the id field)? I have recently reported
> a bug that causes inconsistent results when filtering String fields in
> group operations.
>
>
>
> Saif
>
>
>
> *From:* Colin Alstad [mailto:colin.als...@pokitdok.com]
> *Sent:* Wednesday, October 28, 2015 12:39 PM
> *To:* user@spark.apache.org
> *Subject:* Inconsistent Persistence of DataFrames in Spark 1.5
>
>
>
> We recently switched to Spark 1.5.0 from 1.4.1 and have noticed some
> inconsistent behavior in persisting DataFrames.
>
>
>
> df1 = sqlContext.read.parquet(“df1.parquet”)
>
> df1.count()
>
> > 161,100,982
>
>
>
> df2 = sqlContext.read.parquet(“df2.parquet”)
>
> df2.count()
>
> > 67,498,706
>
>
>
> join_df = df1.join(df2, ‘id’)
>
> join_df.count()
>
> > 160,608,147
>
>
>
> join_df.write.parquet(“join.parquet”)
>
> join_parquet = sqlContext.read.parquet(“join.parquet”)
>
> join_parquet.count()
>
> > 67,698,892
>
>
>
> join_df.write.json(“join.json”)
>
> join_json = sqlContext.read.json(“join.json”)
>
> join_json.count()
>
> > 67,695,663
>
>
>
> The first major issue is that there is a large discrepancy between the count
> of the join DataFrame and the count of the persisted join DataFrame.
> Secondly, persisting the same DataFrame into 2 different formats yields
> different results.
>
>
>
> Does anyone have any idea on what could be going on here?
>
>
>
> --
>
> Colin Alstad
>
> Data Scientist
>
> colin.als...@pokitdok.com
>
>
>
> 
>


Re: No way to supply hive-site.xml in yarn client mode?

2015-10-29 Thread Zoltan Fedor
There is /user/biapp in hdfs. The problem is that the hive-site.xml is
being ignored, so it is looking for it locally.

On Thu, Oct 29, 2015 at 10:40 AM, Kai Wei  wrote:

> Create /user/biapp in hdfs manually first.
> On 30 Oct 2015 1:36 am, "Zoltan Fedor"  wrote:
>
>> Sure, I did it with spark-shell, which seems to be showing the same error
>> - not using the hive-site.xml
>>
>>
>> $ HADOOP_CONF_DIR=$SPARK_HOME/hadoop-conf
>> YARN_CONF_DIR=$SPARK_HOME/yarn-conf HADOOP_USER_NAME=biapp MASTER=yarn
>> $SPARK_HOME/bin/pyspark --deploy-mode client --driver-class-path
>> $HIVE_CLASSPATH
>> Python 2.6.6 (r266:84292, Jul 23 2015, 05:13:40)
>> [GCC 4.4.7 20120313 (Red Hat 4.4.7-16)] on linux2
>> Type "help", "copyright", "credits" or "license" for more information.
>> SLF4J: Class path contains multiple SLF4J bindings.
>> SLF4J: Found binding in
>> [jar:file:/usr/lib/spark-1.5.1-bin-without-hadoop/lib/spark-assembly-1.5.1-hadoop2.5.0-cdh5.3.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: Found binding in
>> [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>> explanation.
>> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>> 15/10/29 10:33:20 WARN MetricsSystem: Using default name DAGScheduler for
>> source because spark.app.id is not set.
>> 15/10/29 10:33:22 WARN NativeCodeLoader: Unable to load native-hadoop
>> library for your platform... using builtin-java classes where applicable
>> 15/10/29 10:33:50 WARN HiveConf: HiveConf of name hive.metastore.local
>> does not exist
>> Welcome to
>>     __
>>  / __/__  ___ _/ /__
>> _\ \/ _ \/ _ `/ __/  '_/
>>/__ / .__/\_,_/_/ /_/\_\   version 1.5.1
>>   /_/
>>
>> Using Python version 2.6.6 (r266:84292, Jul 23 2015 05:13:40)
>> SparkContext available as sc, HiveContext available as sqlContext.
>> >>>
>> biapps@biapps-qa01:~> HADOOP_CONF_DIR=$SPARK_HOME/hadoop-conf
>> YARN_CONF_DIR=$SPARK_HOME/yarn-conf HADOOP_USER_NAME=biapp MASTER=yarn
>> $SPARK_HOME/bin/spark-shell --deploy-mode client
>> SLF4J: Class path contains multiple SLF4J bindings.
>> SLF4J: Found binding in
>> [jar:file:/usr/lib/spark-1.5.1-bin-without-hadoop/lib/spark-assembly-1.5.1-hadoop2.5.0-cdh5.3.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: Found binding in
>> [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>> explanation.
>> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>> Welcome to
>>     __
>>  / __/__  ___ _/ /__
>> _\ \/ _ \/ _ `/ __/  '_/
>>/___/ .__/\_,_/_/ /_/\_\   version 1.5.1
>>   /_/
>>
>> Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.7.0_91)
>> Type in expressions to have them evaluated.
>> Type :help for more information.
>> 15/10/29 10:34:15 WARN MetricsSystem: Using default name DAGScheduler for
>> source because spark.app.id is not set.
>> 15/10/29 10:34:16 WARN NativeCodeLoader: Unable to load native-hadoop
>> library for your platform... using builtin-java classes where applicable
>> Spark context available as sc.
>> 15/10/29 10:34:46 WARN HiveConf: HiveConf of name hive.metastore.local
>> does not exist
>> 15/10/29 10:34:46 WARN ShellBasedUnixGroupsMapping: got exception trying
>> to get groups for user biapp: id: biapp: No such user
>>
>> 15/10/29 10:34:46 WARN UserGroupInformation: No groups available for user
>> biapp
>> java.lang.RuntimeException:
>> org.apache.hadoop.security.AccessControlException: Permission denied:
>> user=biapp, access=WRITE, inode="/user":hdfs:supergroup:drwxr-xr-x
>> at
>> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:257)
>> at
>> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:238)
>> at
>> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:216)
>> at
>> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkPermission(DefaultAuthorizationProvider.java:145)
>> at
>> org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:138)
>> at
>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6287)
>> at
>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6269)
>> at
>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAncestorAccess(FSNamesystem.java:6221)
>> at
>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInternal(FSNamesystem.java:4088)
>> at
>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInt(FSNamesystem.java:4058)
>> at
>> 

Re: No way to supply hive-site.xml in yarn client mode?

2015-10-29 Thread Zoltan Fedor
I don't have a spark-defaults.conf or spark-env.sh, so if you have a working
Spark 1.5.1 with Hive metastore access on CDH 5.3, could you please
send over the settings you have in your spark-defaults.conf and
spark-env.sh?
Thanks

On Thu, Oct 29, 2015 at 11:14 AM, Deenar Toraskar  wrote:

> Here is what I did, maybe that will help you.
>
> 1) Downloaded spark-1.5.1 (With HAdoop 2.6.0) spark-1.5.1-bin-hadoop2.6
> and extracted it on the edge node, set SPARK_HOME to this location
> 2) Copied the existing configuration (spark-defaults.conf and
> spark-env.sh) from your spark install
> (/opt/cloudera/parcels/CDH/lib/spark/conf/yarn-conf on our environment) to
> $SPARK_HOME/conf
> 3) updated spark.yarn.jar in spark-defaults.conf
> 4) copied over all the configuration files from
> /opt/cloudera/parcels/CDH/lib/spark/conf/yarn-conf to
> $SPARK_HOME/conf/yarn-conf
>
> and it worked. You may be better off with a custom build for CDH 5.3.3
> hadoop, which you already have done.
>
> Deenar
>
> On 29 October 2015 at 14:35, Zoltan Fedor 
> wrote:
>
>> Sure, I did it with spark-shell, which seems to be showing the same error
>> - not using the hive-site.xml
>>
>>
>> $ HADOOP_CONF_DIR=$SPARK_HOME/hadoop-conf
>> YARN_CONF_DIR=$SPARK_HOME/yarn-conf HADOOP_USER_NAME=biapp MASTER=yarn
>> $SPARK_HOME/bin/pyspark --deploy-mode client --driver-class-path
>> $HIVE_CLASSPATH
>> Python 2.6.6 (r266:84292, Jul 23 2015, 05:13:40)
>> [GCC 4.4.7 20120313 (Red Hat 4.4.7-16)] on linux2
>> Type "help", "copyright", "credits" or "license" for more information.
>> SLF4J: Class path contains multiple SLF4J bindings.
>> SLF4J: Found binding in
>> [jar:file:/usr/lib/spark-1.5.1-bin-without-hadoop/lib/spark-assembly-1.5.1-hadoop2.5.0-cdh5.3.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: Found binding in
>> [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>> explanation.
>> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>> 15/10/29 10:33:20 WARN MetricsSystem: Using default name DAGScheduler for
>> source because spark.app.id is not set.
>> 15/10/29 10:33:22 WARN NativeCodeLoader: Unable to load native-hadoop
>> library for your platform... using builtin-java classes where applicable
>> 15/10/29 10:33:50 WARN HiveConf: HiveConf of name hive.metastore.local
>> does not exist
>> Welcome to
>>     __
>>  / __/__  ___ _/ /__
>> _\ \/ _ \/ _ `/ __/  '_/
>>/__ / .__/\_,_/_/ /_/\_\   version 1.5.1
>>   /_/
>>
>> Using Python version 2.6.6 (r266:84292, Jul 23 2015 05:13:40)
>> SparkContext available as sc, HiveContext available as sqlContext.
>> >>>
>> biapps@biapps-qa01:~> HADOOP_CONF_DIR=$SPARK_HOME/hadoop-conf
>> YARN_CONF_DIR=$SPARK_HOME/yarn-conf HADOOP_USER_NAME=biapp MASTER=yarn
>> $SPARK_HOME/bin/spark-shell --deploy-mode client
>> SLF4J: Class path contains multiple SLF4J bindings.
>> SLF4J: Found binding in
>> [jar:file:/usr/lib/spark-1.5.1-bin-without-hadoop/lib/spark-assembly-1.5.1-hadoop2.5.0-cdh5.3.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: Found binding in
>> [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>> explanation.
>> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>> Welcome to
>>     __
>>  / __/__  ___ _/ /__
>> _\ \/ _ \/ _ `/ __/  '_/
>>/___/ .__/\_,_/_/ /_/\_\   version 1.5.1
>>   /_/
>>
>> Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.7.0_91)
>> Type in expressions to have them evaluated.
>> Type :help for more information.
>> 15/10/29 10:34:15 WARN MetricsSystem: Using default name DAGScheduler for
>> source because spark.app.id is not set.
>> 15/10/29 10:34:16 WARN NativeCodeLoader: Unable to load native-hadoop
>> library for your platform... using builtin-java classes where applicable
>> Spark context available as sc.
>> 15/10/29 10:34:46 WARN HiveConf: HiveConf of name hive.metastore.local
>> does not exist
>> 15/10/29 10:34:46 WARN ShellBasedUnixGroupsMapping: got exception trying
>> to get groups for user biapp: id: biapp: No such user
>>
>> 15/10/29 10:34:46 WARN UserGroupInformation: No groups available for user
>> biapp
>> java.lang.RuntimeException:
>> org.apache.hadoop.security.AccessControlException: Permission denied:
>> user=biapp, access=WRITE, inode="/user":hdfs:supergroup:drwxr-xr-x
>> at
>> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:257)
>> at
>> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:238)
>> at
>> 

submitting custom metrics.properties file

2015-10-29 Thread Radu Brumariu
Hi,
I am trying to submit a custom metrics.properties file to enable the
collection of spark metrics, but I am having a hard time even starting it
in local mode.

spark-submit \
...
--files "./metrics.properties"
--conf "spark.metrics.conf=metrics.properties"
...

However I am getting the following error :

java.io.FileNotFoundException: metrics.properties (No such file or
directory)
at java.io.FileInputStream.open0(Native Method)
at java.io.FileInputStream.open(FileInputStream.java:195)

I tried dropping the spark.metrics.conf just to see if the file gets moved
over OK, but that results in an error as well:

...
15/10/29 11:58:13 INFO Utils: Successfully started service 'SparkUI' on
port 4040.
15/10/29 11:58:13 INFO SparkUI: Started SparkUI at http://192.168.0.2:4040
...
Exception in thread "main" java.io.FileNotFoundException: Added file
file:/Users/radu/dev/sparkTest/metrics.properties does not exist.

Is there a special handshake that I need to do to get this working? I've
found some anecdotal references online where people claim that they've got
this running using only the 2 flags mentioned above.

Thanks,
Radu
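
[A minimal Scala sketch of one way to sanity-check the metrics config in local
mode: point spark.metrics.conf at an absolute path readable on the driver. This
sidesteps the --files distribution step rather than replacing it; the app name
is a placeholder and the path is taken from the error message above.]

import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: in local mode the driver opens spark.metrics.conf directly,
// so an absolute local path avoids relying on file distribution.
val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("metrics-smoke-test")   // placeholder name
  .set("spark.metrics.conf", "/Users/radu/dev/sparkTest/metrics.properties")

val sc = new SparkContext(conf)
sc.parallelize(1 to 10).count()       // run something so sources/sinks get exercised
sc.stop()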


Re: No way to supply hive-site.xml in yarn client mode?

2015-10-29 Thread Deenar Toraskar
Here is what I did, maybe that will help you.

1) Downloaded spark-1.5.1 (With HAdoop 2.6.0) spark-1.5.1-bin-hadoop2.6 and
extracted it on the edge node, set SPARK_HOME to this location
2) Copied the existing configuration (spark-defaults.conf and spark-env.sh)
from your spark install (/opt/cloudera/parcels/CDH/lib/spark/conf/yarn-conf
on our environment) to
$SPARK_HOME/conf
3) updated spark.yarn.jar in spark-defaults.conf
4) copied over all the configuration files from
/opt/cloudera/parcels/CDH/lib/spark/conf/yarn-conf to
$SPARK_HOME/conf/yarn-conf

and it worked. You may be better off with a custom build for CDH 5.3.3
hadoop, which you already have done.

Deenar

On 29 October 2015 at 14:35, Zoltan Fedor  wrote:

> Sure, I did it with spark-shell, which seems to be showing the same error
> - not using the hive-site.xml
>
>
> $ HADOOP_CONF_DIR=$SPARK_HOME/hadoop-conf
> YARN_CONF_DIR=$SPARK_HOME/yarn-conf HADOOP_USER_NAME=biapp MASTER=yarn
> $SPARK_HOME/bin/pyspark --deploy-mode client --driver-class-path
> $HIVE_CLASSPATH
> Python 2.6.6 (r266:84292, Jul 23 2015, 05:13:40)
> [GCC 4.4.7 20120313 (Red Hat 4.4.7-16)] on linux2
> Type "help", "copyright", "credits" or "license" for more information.
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
> [jar:file:/usr/lib/spark-1.5.1-bin-without-hadoop/lib/spark-assembly-1.5.1-hadoop2.5.0-cdh5.3.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 15/10/29 10:33:20 WARN MetricsSystem: Using default name DAGScheduler for
> source because spark.app.id is not set.
> 15/10/29 10:33:22 WARN NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> 15/10/29 10:33:50 WARN HiveConf: HiveConf of name hive.metastore.local
> does not exist
> Welcome to
>     __
>  / __/__  ___ _/ /__
> _\ \/ _ \/ _ `/ __/  '_/
>/__ / .__/\_,_/_/ /_/\_\   version 1.5.1
>   /_/
>
> Using Python version 2.6.6 (r266:84292, Jul 23 2015 05:13:40)
> SparkContext available as sc, HiveContext available as sqlContext.
> >>>
> biapps@biapps-qa01:~> HADOOP_CONF_DIR=$SPARK_HOME/hadoop-conf
> YARN_CONF_DIR=$SPARK_HOME/yarn-conf HADOOP_USER_NAME=biapp MASTER=yarn
> $SPARK_HOME/bin/spark-shell --deploy-mode client
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
> [jar:file:/usr/lib/spark-1.5.1-bin-without-hadoop/lib/spark-assembly-1.5.1-hadoop2.5.0-cdh5.3.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> Welcome to
>     __
>  / __/__  ___ _/ /__
> _\ \/ _ \/ _ `/ __/  '_/
>/___/ .__/\_,_/_/ /_/\_\   version 1.5.1
>   /_/
>
> Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.7.0_91)
> Type in expressions to have them evaluated.
> Type :help for more information.
> 15/10/29 10:34:15 WARN MetricsSystem: Using default name DAGScheduler for
> source because spark.app.id is not set.
> 15/10/29 10:34:16 WARN NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> Spark context available as sc.
> 15/10/29 10:34:46 WARN HiveConf: HiveConf of name hive.metastore.local
> does not exist
> 15/10/29 10:34:46 WARN ShellBasedUnixGroupsMapping: got exception trying
> to get groups for user biapp: id: biapp: No such user
>
> 15/10/29 10:34:46 WARN UserGroupInformation: No groups available for user
> biapp
> java.lang.RuntimeException:
> org.apache.hadoop.security.AccessControlException: Permission denied:
> user=biapp, access=WRITE, inode="/user":hdfs:supergroup:drwxr-xr-x
> at
> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:257)
> at
> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:238)
> at
> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:216)
> at
> org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkPermission(DefaultAuthorizationProvider.java:145)
> at
> org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:138)
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6287)
> at
> 

Re: No way to supply hive-site.xml in yarn client mode?

2015-10-29 Thread Deenar Toraskar
Zoltan

You should have these in your existing CDH 5.3; that's the best place to
get them. Find where Spark is running from and it should have them.

My versions are here

https://gist.github.com/deenar/08fc4ac0da3bdaff10fb

Deenar

On 29 October 2015 at 15:29, Zoltan Fedor  wrote:

> i don't have spark-defaults.conf and spark-env.sh, so if you have a
> working Spark 1.5.1 with Hive metastore access on CDH 5.3 then could you
> please send over the settings you are having in your spark-defaults.conf
> and spark-env.sh?
> Thanks
>
> On Thu, Oct 29, 2015 at 11:14 AM, Deenar Toraskar <
> deenar.toras...@gmail.com> wrote:
>
>> Here is what I did, maybe that will help you.
>>
>> 1) Downloaded spark-1.5.1 (With HAdoop 2.6.0) spark-1.5.1-bin-hadoop2.6
>> and extracted it on the edge node, set SPARK_HOME to this location
>> 2) Copied the existing configuration (spark-defaults.conf and
>> spark-env.sh) from your spark install
>> (/opt/cloudera/parcels/CDH/lib/spark/conf/yarn-conf on our environment) to
>> $SPARK_HOME/conf
>> 3) updated spark.yarn.jar in spark-defaults.conf
>> 4) copied over all the configuration files from
>> /opt/cloudera/parcels/CDH/lib/spark/conf/yarn-conf to
>> $SPARK_HOME/conf/yarn-conf
>>
>> and it worked. You may be better off with a custom build for CDH 5.3.3
>> hadoop, which you already have done.
>>
>> Deenar
>>
>> On 29 October 2015 at 14:35, Zoltan Fedor 
>> wrote:
>>
>>> Sure, I did it with spark-shell, which seems to be showing the same
>>> error - not using the hive-site.xml
>>>
>>>
>>> $ HADOOP_CONF_DIR=$SPARK_HOME/hadoop-conf
>>> YARN_CONF_DIR=$SPARK_HOME/yarn-conf HADOOP_USER_NAME=biapp MASTER=yarn
>>> $SPARK_HOME/bin/pyspark --deploy-mode client --driver-class-path
>>> $HIVE_CLASSPATH
>>> Python 2.6.6 (r266:84292, Jul 23 2015, 05:13:40)
>>> [GCC 4.4.7 20120313 (Red Hat 4.4.7-16)] on linux2
>>> Type "help", "copyright", "credits" or "license" for more information.
>>> SLF4J: Class path contains multiple SLF4J bindings.
>>> SLF4J: Found binding in
>>> [jar:file:/usr/lib/spark-1.5.1-bin-without-hadoop/lib/spark-assembly-1.5.1-hadoop2.5.0-cdh5.3.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>> SLF4J: Found binding in
>>> [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>>> explanation.
>>> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>>> 15/10/29 10:33:20 WARN MetricsSystem: Using default name DAGScheduler
>>> for source because spark.app.id is not set.
>>> 15/10/29 10:33:22 WARN NativeCodeLoader: Unable to load native-hadoop
>>> library for your platform... using builtin-java classes where applicable
>>> 15/10/29 10:33:50 WARN HiveConf: HiveConf of name hive.metastore.local
>>> does not exist
>>> Welcome to
>>>     __
>>>  / __/__  ___ _/ /__
>>> _\ \/ _ \/ _ `/ __/  '_/
>>>/__ / .__/\_,_/_/ /_/\_\   version 1.5.1
>>>   /_/
>>>
>>> Using Python version 2.6.6 (r266:84292, Jul 23 2015 05:13:40)
>>> SparkContext available as sc, HiveContext available as sqlContext.
>>> >>>
>>> biapps@biapps-qa01:~> HADOOP_CONF_DIR=$SPARK_HOME/hadoop-conf
>>> YARN_CONF_DIR=$SPARK_HOME/yarn-conf HADOOP_USER_NAME=biapp MASTER=yarn
>>> $SPARK_HOME/bin/spark-shell --deploy-mode client
>>> SLF4J: Class path contains multiple SLF4J bindings.
>>> SLF4J: Found binding in
>>> [jar:file:/usr/lib/spark-1.5.1-bin-without-hadoop/lib/spark-assembly-1.5.1-hadoop2.5.0-cdh5.3.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>> SLF4J: Found binding in
>>> [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>>> explanation.
>>> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>>> Welcome to
>>>     __
>>>  / __/__  ___ _/ /__
>>> _\ \/ _ \/ _ `/ __/  '_/
>>>/___/ .__/\_,_/_/ /_/\_\   version 1.5.1
>>>   /_/
>>>
>>> Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.7.0_91)
>>> Type in expressions to have them evaluated.
>>> Type :help for more information.
>>> 15/10/29 10:34:15 WARN MetricsSystem: Using default name DAGScheduler
>>> for source because spark.app.id is not set.
>>> 15/10/29 10:34:16 WARN NativeCodeLoader: Unable to load native-hadoop
>>> library for your platform... using builtin-java classes where applicable
>>> Spark context available as sc.
>>> 15/10/29 10:34:46 WARN HiveConf: HiveConf of name hive.metastore.local
>>> does not exist
>>> 15/10/29 10:34:46 WARN ShellBasedUnixGroupsMapping: got exception trying
>>> to get groups for user biapp: id: biapp: No such user
>>>
>>> 15/10/29 10:34:46 WARN UserGroupInformation: No groups available for
>>> user biapp
>>> java.lang.RuntimeException:
>>> org.apache.hadoop.security.AccessControlException: Permission denied:
>>> user=biapp, 

Re: No way to supply hive-site.xml in yarn client mode?

2015-10-29 Thread Zoltan Fedor
Thanks. I didn't have a spark-defaults.conf or a spark-env.sh, so I
copied yours and modified the references; now I am back to where I
started, with the exact same error as before.


$ HADOOP_USER_NAME=biapp MASTER=yarn $SPARK_HOME/bin/pyspark --deploy-mode
client
Error: JAVA_HOME is not set and could not be found.
Python 2.6.6 (r266:84292, Jul 23 2015, 05:13:40)
[GCC 4.4.7 20120313 (Red Hat 4.4.7-16)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/usr/lib/spark-1.5.1-bin-hadoop2.6/lib/spark-assembly-1.5.1-hadoop2.6.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/usr/lib/parquet/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/usr/lib/avro/avro-tools-1.7.6-cdh5.3.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
15/10/29 11:56:55 WARN MetricsSystem: Using default name DAGScheduler for
source because spark.app.id is not set.
15/10/29 11:57:26 WARN HiveConf: HiveConf of name hive.metastore.local does
not exist
Welcome to
    __
 / __/__  ___ _/ /__
_\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.5.1
  /_/

Using Python version 2.6.6 (r266:84292, Jul 23 2015 05:13:40)
SparkContext available as sc, HiveContext available as sqlContext.
>>> sqlContext2 = HiveContext(sc)
>>> sqlContext2.sql("show databases").first()
15/10/29 11:57:43 WARN HiveConf: HiveConf of name hive.metastore.local does
not exist
15/10/29 11:57:43 WARN ShellBasedUnixGroupsMapping: got exception trying to
get groups for user biapp: id: biapp: No such user

15/10/29 11:57:43 WARN UserGroupInformation: No groups available for user
biapp
Traceback (most recent call last):
  File "", line 1, in 
  File "/usr/lib/spark-1.5.1-bin-hadoop2.6/python/pyspark/sql/context.py",
line 552, in sql
return DataFrame(self._ssql_ctx.sql(sqlQuery), self)
  File "/usr/lib/spark-1.5.1-bin-hadoop2.6/python/pyspark/sql/context.py",
line 660, in _ssql_ctx
"build/sbt assembly", e)
Exception: ("You must build Spark with Hive. Export 'SPARK_HIVE=true' and
run build/sbt assembly", Py4JJavaError(u'An error occurred while calling
None.org.apache.spark.sql.hive.HiveContext.\n', JavaObject id=o24))
>>>


On Thu, Oct 29, 2015 at 11:44 AM, Deenar Toraskar  wrote:

>
> Zoltan
>
> You should have these in your existing CDH 5.3; that's the best place to
> get them. Find where Spark is running from and it should have them.
>
> My versions are here
>
> https://gist.github.com/deenar/08fc4ac0da3bdaff10fb
>
> Deenar
>
> On 29 October 2015 at 15:29, Zoltan Fedor 
> wrote:
>
>> i don't have spark-defaults.conf and spark-env.sh, so if you have a
>> working Spark 1.5.1 with Hive metastore access on CDH 5.3 then could you
>> please send over the settings you are having in your spark-defaults.conf
>> and spark-env.sh?
>> Thanks
>>
>> On Thu, Oct 29, 2015 at 11:14 AM, Deenar Toraskar <
>> deenar.toras...@gmail.com> wrote:
>>
>>> Here is what I did, maybe that will help you.
>>>
>>> 1) Downloaded spark-1.5.1 (With HAdoop 2.6.0) spark-1.5.1-bin-hadoop2.6
>>> and extracted it on the edge node, set SPARK_HOME to this location
>>> 2) Copied the existing configuration (spark-defaults.conf and
>>> spark-env.sh) from your spark install
>>> (/opt/cloudera/parcels/CDH/lib/spark/conf/yarn-conf on our environment) to
>>> $SPARK_HOME/conf
>>> 3) updated spark.yarn.jar in spark-defaults.conf
>>> 4) copied over all the configuration files from
>>> /opt/cloudera/parcels/CDH/lib/spark/conf/yarn-conf to
>>> $SPARK_HOME/conf/yarn-conf
>>>
>>> and it worked. You may be better off with a custom build for CDH 5.3.3
>>> hadoop, which you already have done.
>>>
>>> Deenar
>>>
>>> On 29 October 2015 at 14:35, Zoltan Fedor 
>>> wrote:
>>>
 Sure, I did it with spark-shell, which seems to be showing the same
 error - not using the hive-site.xml


 $ HADOOP_CONF_DIR=$SPARK_HOME/hadoop-conf
 YARN_CONF_DIR=$SPARK_HOME/yarn-conf HADOOP_USER_NAME=biapp MASTER=yarn
 $SPARK_HOME/bin/pyspark --deploy-mode client --driver-class-path
 $HIVE_CLASSPATH
 Python 2.6.6 (r266:84292, Jul 23 2015, 05:13:40)
 [GCC 4.4.7 20120313 (Red Hat 4.4.7-16)] on linux2
 Type "help", "copyright", "credits" or "license" for more information.
 SLF4J: Class path contains multiple SLF4J bindings.
 SLF4J: Found binding in
 [jar:file:/usr/lib/spark-1.5.1-bin-without-hadoop/lib/spark-assembly-1.5.1-hadoop2.5.0-cdh5.3.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
 SLF4J: Found binding 

SPARK SQL- Parquet projection pushdown for nested data

2015-10-29 Thread Sadhan Sood
I noticed when querying struct data in spark sql, we are requesting the
whole column from parquet files. Is this intended or is there some kind of
config to control this behaviour? Wouldn't it be better to request just the
struct field?


RE: RDD's filter() or using 'where' condition in SparkSQL

2015-10-29 Thread java8964
Couldn't you use a CASE statement to generate a virtual column (like
partition_num), then use analytic SQL partitioned by this virtual column?
In this case, the full dataset will be scanned just once.

Yong
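
[To make the virtual-column idea above concrete, here is a minimal Scala
sketch. It assumes sqlContext is a HiveContext (needed for window functions in
Spark 1.4/1.5) and that the data is registered as a temp table named events
with key and value columns; all of those names are assumptions.]

// Sketch only: derive bucket_id once, then run analytic functions partitioned
// by it, so the full dataset is scanned a single time.
val bucketed = sqlContext.sql("""
  SELECT *,
         CASE WHEN key BETWEEN  1 AND 10 THEN 1
              WHEN key BETWEEN 11 AND 20 THEN 2
              ELSE 10
         END AS bucket_id
  FROM events""")
bucketed.registerTempTable("events_bucketed")

// Example analytic query over every bucket in one pass.
sqlContext.sql(
  "SELECT bucket_id, key, avg(value) OVER (PARTITION BY bucket_id) AS bucket_avg " +
  "FROM events_bucketed").show()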

Date: Thu, 29 Oct 2015 10:51:53 -0700
Subject: RDD's filter() or using 'where' condition in SparkSQL
From: anfernee...@gmail.com
To: user@spark.apache.org

Hi,
I have a pretty large data set(2M entities) in my RDD, the data has already 
been partitioned by a specific key, the key has a range(type in long), now I 
want to create a bunch of key buckets, for example, the key has range 
1 -> 100,
I will break the whole range into below buckets
1 -> 10
11 -> 20
...
90 -> 100
 I want to run some analytic SQL functions over the data that owned by each key 
range, so I come up with 2 approaches,
1) run RDD's filter() on the full data set RDD, the filter will create the RDD 
corresponding to each key bucket, and with each RDD, I can create DataFrame and 
run the sql.

2) create a DataFrame for the whole RDD, and using a buch of SQL's to do my job.
SELECT * from  where key>=key1 AND key 

How to properly read the first number lines of file into a RDD

2015-10-29 Thread Zhiliang Zhu
Hi All,
There is a file with N + M lines, and I need to read the first N lines into one RDD.
1. Read all the N + M lines as one RDD, then select the RDD's top N rows; that may be one solution.
2. Introduce a broadcast variable set to N, and use it to decide, while mapping the file RDD, to map only its first N rows; this may not work, however.
Is there some better solution?
Thank you,
Zhiliang
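
[A minimal Scala sketch of option 1 using zipWithIndex; sc is an existing
SparkContext, and the path and N below are placeholders.]

// Sketch only: index every line, then keep the first N. For a text file the
// generated index follows the file's line order.
val path = "hdfs:///data/input.txt"   // placeholder location
val n = 1000L                         // placeholder N

val firstN = sc.textFile(path)
  .zipWithIndex()                          // (line, 0-based index)
  .filter { case (_, idx) => idx < n }     // keep only the first N lines
  .keys                                    // drop the index again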


Loading dataframes to vertica database

2015-10-29 Thread spakle
http://www.sparkexpert.com/2015/04/17/save-apache-spark-dataframe-to-database/

Hi, I tried to load DataFrames (Parquet files) into MySQL using the above link
and it worked. But when I tried to load them into a Vertica database, this is
the error I am facing:

Exception in thread “main” java.sql.SQLSyntaxErrorException:
[Vertica][VJDBC](5108) ERROR: Type “TEXT” does not exist
at com.vertica.util.ServerErrorData.buildException(Unknown Source)
at com.vertica.io.ProtocolStream.readExpectedMessage(Unknown Source)
at com.vertica.dataengine.VDataEngine.prepareImpl(Unknown Source)
at com.vertica.dataengine.VDataEngine.prepare(Unknown Source)
at com.vertica.dataengine.VDataEngine.prepare(Unknown Source)
at com.vertica.jdbc.common.SPreparedStatement.(Unknown Source)
at com.vertica.jdbc.jdbc4.S4PreparedStatement.(Unknown Source)
at com.vertica.jdbc.VerticaJdbc4PreparedStatementImpl.(Unknown Source)
at com.vertica.jdbc.VJDBCObjectFactory.createPreparedStatement(Unknown
Source)
at com.vertica.jdbc.common.SConnection.prepareStatement(Unknown Source)
at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:275)
at org.apache.spark.sql.DataFrame.createJDBCTable(DataFrame.scala:1611)
at com.sparkread.SparkVertica.JdbctoVertica.main(JdbctoVertica.java:51)
Caused by: com.vertica.support.exceptions.SyntaxErrorException:
[Vertica][VJDBC](5108) ERROR: Type “TEXT” does not exist
… 13 more

This error occurs because the Vertica DB doesn't support the data type (String)
used in the DataFrame (Parquet file). I do not want to type-cast the columns
since that is going to be a performance issue; we are looking to load around 280
million rows. Could you please suggest the best way to load the data into the
Vertica DB?






Re: SPARK SQL- Parquet projection pushdown for nested data

2015-10-29 Thread Michael Armbrust
Yeah, this is unfortunate. It would be good to fix this, but it's a
non-trivial change.

Tracked here if you'd like to vote on the issue:
https://issues.apache.org/jira/browse/SPARK-4502

On Thu, Oct 29, 2015 at 6:00 PM, Sadhan Sood  wrote:

> I noticed when querying struct data in spark sql, we are requesting the
> whole column from parquet files. Is this intended or is there some kind of
> config to control this behaviour? Wouldn't it be better to request just the
> struct field?
>


RDD's filter() or using 'where' condition in SparkSQL

2015-10-29 Thread Anfernee Xu
Hi,

I have a pretty large data set(2M entities) in my RDD, the data has already
been partitioned by a specific key, the key has a range(type in long), now
I want to create a bunch of key buckets, for example, the key has range

1 -> 100,

I will break the whole range into below buckets

1 ->  10
11 -> 20
...
90 -> 100

 I want to run some analytic SQL functions over the data owned by each
key range, so I came up with 2 approaches,

1) run RDD's filter() on the full data set RDD, the filter will create the
RDD corresponding to each key bucket, and with each RDD, I can create
DataFrame and run the sql.


2) create a DataFrame for the whole RDD, and use a bunch of SQL queries to do my
job.

SELECT * from  where key>=key1 AND key 

Re: newbie trouble submitting java app to AWS cluster I created using spark-ec2 script from spark-1.5.1-bin-hadoop2.6 distribution

2015-10-29 Thread Andy Davidson
Hi Robin and Sabarish

I figured out what the problem was.

To submit my java app so that it runs in cluster mode (ie. I can close my
laptop and go home) I need to do the following


1. Make sure my jar file is available on all the slaves. spark-submit will
cause my driver to run on a slave; it will not automatically copy my jar
file to the slaves. I found placing the jar in HDFS the easiest way to handle
this.
2. I needed to pass the command argument --deploy-mode cluster
3. Specify the path to the jar file as an hdfs:// URL

Here are two tricks to figure out the correct URL for master
1. If you know the name of your cluster you can find the public DNS name for
your master. By default use port 7077
cd spark-1.5.1-bin-hadoop2.6/ec2

$ spark-ec2 get-master --region=us-west-1 streamingDC
Searching for existing cluster streamingDC in region us-west-1...
Found 1 master, 3 slaves.
ec2-54-251-207-123.us-west-1.compute.amazonaws.com
$ 
2. If you know the public DNS name of the master, go to
http://mastername..compute.amazonaws.com:8080 . The title should be the
correct URL (i.e. port 7077)


On master
/root/ephemeral-hdfs/bin/hadoop fs -mkdir /home/ec2-user/sparkExamples
/root/ephemeral-hdfs/bin/hadoop fs -put sparkPi-1.0-SNAPSHOT.jar
/home/ec2-user/sparkExamples
/root/ephemeral-hdfs/bin/hadoop fs -ls /home/ec2-user/sparkExamples
$SPARK_ROOT/bin/spark-submit --class org.apache.spark.examples.JavaSparkPi
--master spark://ec2-54-215-207-132.us-west-1.compute.amazonaws.com:7077
--deploy-mode cluster
hdfs:///home/ec2-user/sparkExamples/sparkPi-1.0-SNAPSHOT.jar 100

Running Spark using the REST application submission protocol.
15/10/29 16:39:08 INFO rest.RestSubmissionClient: Submitting a request to
launch an application in
spark://ec2-54-215-207-132.us-west-1.compute.amazonaws.com:7077.
15/10/29 16:39:09 WARN rest.RestSubmissionClient: Unable to connect to
server spark://ec2-54-215-207-132.us-west-1.compute.amazonaws.com:7077.
Warning: Master endpoint
spark://ec2-54-215-207-132.us-west-1.compute.amazonaws.com:7077 was not a
REST server. Falling back to legacy submission gateway instead.
[ec2-user@ip-172-31-29-60 ~]$
I really appreciate everyone's help

Andy




sparkR 1.5.1 batch yarn-client mode failing on daemon.R not found

2015-10-29 Thread tstewart
I have the following script in a file named test.R:

library(SparkR)
sc <- sparkR.init(master="yarn-client")
sqlContext <- sparkRSQL.init(sc)
df <- createDataFrame(sqlContext, faithful)
showDF(df)
sparkR.stop()
q(save="no")

If I submit this with "sparkR test.R" or "R  CMD BATCH test.R" or "Rscript
test.R" it fails with this error:
15/10/29 08:08:49 INFO r.BufferedStreamThread: Fatal error: cannot open file
'/mnt/hdfs9/yarn/nm-local-dir/usercache/hadoop/appcache/application_1446058618330_0171/container_e805_1446058618330_0171_01_05/sparkr/SparkR/worker/daemon.R':
No such file or directory
15/10/29 08:08:59 ERROR executor.Executor: Exception in task 0.0 in stage
1.0 (TID 1)
java.net.SocketTimeoutException: Accept timed out


However, if I launch just an interactive sparkR shell and cut/paste those
commands - it runs fine.
It also runs fine on the same Hadoop cluster with Spark 1.4.1.
And, it runs fine from batch mode if I just use sparkR.init() and not
sparkR.init(master="yarn-client")






Aster Functions equivalent in spark : cfilter, npath and sessionize

2015-10-29 Thread didier vila
Good morning all,
I am interested to know whether there are Aster-equivalent functions in Spark.
- In particular, I would like to sessionize, i.e. create sessions.
- I would like to create some npath or sequence analysis based on a specific
pattern: https://aster-community.teradata.com/docs/DOC-1544
- I would like to derive some affinities: https://aster-community.teradata.com/docs/DOC-1552
I am a Python user of Spark and I would like to demonstrate that Spark is a very
good candidate instead of Aster (Aster will run in Hadoop soon).
Do you have any Spark Python code to demonstrate to my colleagues that Spark is
the best option?
Thanks for this.
sparkthewolf



  

Re: RDD's filter() or using 'where' condition in SparkSQL

2015-10-29 Thread Anfernee Xu
Thanks Yong for your response.

Let me see if I can understand what you're suggesting, so the whole data
set, when I load them into Spark(I'm using custom Hadoop InputFormat), I
will add an extra field to each element in RDD, like bucket_id.

For example

Key:

1 - 10, bucket_id=1
11-20, bucket_id=2
...
90-100, bucket_id=10

then I can re-partition the RDD with a partitioner that will put all
records with the same bucket_id in the same partition, after I get
DataFrame from the RDD, the partition is still preserved(is it correct?)

then the rest of the work is only issuing SQL queries like

SELECT * from XXX where bucket_id=1
SELECT * from XXX where bucket_id=2

..

Am I right?

Thanks

Anfernee

On Thu, Oct 29, 2015 at 11:07 AM, java8964  wrote:

> Won't you be able to use case statement to generate a virtual column (like
> partition_num), then use analytic SQL partition by this virtual column?
>
> In this case, the full dataset will be just scanned once.
>
> Yong
>
> --
> Date: Thu, 29 Oct 2015 10:51:53 -0700
> Subject: RDD's filter() or using 'where' condition in SparkSQL
> From: anfernee...@gmail.com
> To: user@spark.apache.org
>
>
> Hi,
>
> I have a pretty large data set(2M entities) in my RDD, the data has
> already been partitioned by a specific key, the key has a range(type in
> long), now I want to create a bunch of key buckets, for example, the key
> has range
>
> 1 -> 100,
>
> I will break the whole range into below buckets
>
> 1 ->  10
> 11 -> 20
> ...
> 90 -> 100
>
>  I want to run some analytic SQL functions over the data that owned by
> each key range, so I come up with 2 approaches,
>
> 1) run RDD's filter() on the full data set RDD, the filter will create the
> RDD corresponding to each key bucket, and with each RDD, I can create
> DataFrame and run the sql.
>
>
> 2) create a DataFrame for the whole RDD, and using a buch of SQL's to do
> my job.
>
> SELECT * from  where key>=key1 AND key 
> So my question is which one is better from performance perspective?
>
> Thanks
>
> --
> --Anfernee
>



-- 
--Anfernee


Re: Aster Functions equivalent in spark : cfilter, npath and sessionize

2015-10-29 Thread Peyman Mohajerian
Some of the Aster functions you are referring to can be done using Window
functions in SparkSQL:
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

On Thu, Oct 29, 2015 at 12:16 PM, didier vila 
wrote:

> Good morning all,
>
> I am interesting to know if there is some Aster equivalent functions in
> Spark .
>
>
>- In particular, I would like to sesionize or create some sessions
>based.
>
>
>
>- I would to create some npath or sequence based based on a specific
>pattern.
>
> https://aster-community.teradata.com/docs/DOC-1544
>
>
>- I would like to derive some affinitie
>
> https://aster-community.teradata.com/docs/DOC-1552
>
> I am python user of Spark and I would to demonstrate that Spark is  a very
> good candidate instead of Aster ( Aster will run in Hadoop soon).
>
> do you have any Spark python code  to demonstrate my colleague that Spark
> is the best options ?
>
> thanks for this.
>
> sparkthewolf
>
>
>
>
>
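
[To make Peyman's pointer above concrete, a minimal Scala sketch of
sessionization with Spark SQL window functions (HiveContext on 1.4/1.5). The
events DataFrame, its user_id/ts columns, and the 30-minute gap are all
assumptions.]

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// events: DataFrame with user_id (string) and ts (event time in epoch seconds) -- assumed schema.
val w = Window.partitionBy("user_id").orderBy("ts")
val sessionGap = 30 * 60   // assumed inactivity threshold: 30 minutes

// Flag a new session whenever the gap to the previous event exceeds the
// threshold; a running sum of the flags then yields a per-user session id.
// npath-style sequence analysis can be built on the same ordering.
val sessions = events
  .withColumn("prev_ts", lag("ts", 1).over(w))
  .withColumn("new_session",
    when(col("prev_ts").isNull || col("ts") - col("prev_ts") > sessionGap, 1).otherwise(0))
  .withColumn("session_id", sum("new_session").over(w))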


Re: Need more tasks in KafkaDirectStream

2015-10-29 Thread varun sharma
Cody, adding partitions to Kafka is there as a last resort; I was wondering
if I can decrease the processing time without touching my Kafka cluster.
Adrian, repartition looks like a good option and let me check if I can gain
performance.
Dibyendu, will surely try out this consumer.

Thanks all, will share my findings..

On Thu, Oct 29, 2015 at 7:16 PM, Cody Koeninger  wrote:

> Consuming from kafka is inherently limited to using a number of consumer
> nodes less than or equal to the number of kafka partitions.  If you think
> about it, you're going to be paying some network cost to repartition that
> data from a consumer to different processing nodes, regardless of what
> Spark consumer library you use.
>
> If you really need finer grained parallelism, and want to do it in a more
> efficient manner, you need to move that partitioning to the producer (i.e.
> add more partitions to kafka).
>
> On Thu, Oct 29, 2015 at 6:11 AM, Adrian Tanase  wrote:
>
>> You can call .repartition on the Dstream created by the Kafka direct
>> consumer. You take the one-time hit of a shuffle but gain the ability to
>> scale out processing beyond your number of partitions.
>>
>> We’re doing this to scale up from 36 partitions / topic to 140 partitions
>> (20 cores * 7 nodes) and it works great.
>>
>> -adrian
>>
>> From: varun sharma
>> Date: Thursday, October 29, 2015 at 8:27 AM
>> To: user
>> Subject: Need more tasks in KafkaDirectStream
>>
>> Right now, there is one to one correspondence between kafka partitions
>> and spark partitions.
>> I dont have a requirement of one to one semantics.
>> I need more tasks to be generated in the job so that it can be
>> parallelised and batch can be completed fast. In the previous Receiver
>> based approach number of tasks created were independent of kafka
>> partitions, I need something like that only.
>> Any config available if I dont need one to one semantics?
>> Is there any way I can repartition without incurring any additional cost.
>>
>> Thanks
>> *VARUN SHARMA*
>>
>>
>


-- 
*VARUN SHARMA*
*Flipkart*
*Bangalore*
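
[A minimal Scala sketch of the repartition approach Adrian describes above,
using the direct Kafka consumer. The broker list, topic, target partition
count, and an existing StreamingContext ssc are all assumptions.]

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Sketch only: one direct stream, then a repartition so processing can use
// more cores than there are Kafka partitions (at the cost of one shuffle per batch).
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("mytopic"))

val widened = stream.map(_._2).repartition(140)   // 140 = cores available, an assumption
widened.foreachRDD { rdd =>
  val n = rdd.count()                             // placeholder for the real processing
  println(s"processed $n records")
}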


Exception in thread "main" java.lang.IllegalArgumentException: Positive number of slices required

2015-10-29 Thread Jerry Wong
I used Spark 1.3.1 to populate the event logs to Cassandra, but there
is an exception and I could not figure out the cause. Can anybody give me
any help?

Exception in thread "main" java.lang.IllegalArgumentException: Positive
number of slices required
 at
org.apache.spark.rdd.ParallelCollectionRDD$.slice(ParallelCollectionRDD.scala:119)
 at
org.apache.spark.rdd.ParallelCollectionRDD.getPartitions(ParallelCollectionRDD.scala:97)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:220)
 at org.apache.spark.SparkContext.runJob(SparkContext.scala:1367)
 at org.apache.spark.rdd.RDD.collect(RDD.scala:797)
 at
com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1$$anonfun$apply$7$$anonfun$apply$1$$anonfun$apply$mcV$sp$2$$anonfun$apply$2$$anonfun$apply$mcV$sp$4$$anonfun$apply$3.apply$mcV$sp(EventLogClusterIngestor.scala:155)
 at scala.util.control.Breaks.breakable(Breaks.scala:37)
 at
com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1$$anonfun$apply$7$$anonfun$apply$1$$anonfun$apply$mcV$sp$2$$anonfun$apply$2$$anonfun$apply$mcV$sp$4.apply(EventLogClusterIngestor.scala:145)
 at
com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1$$anonfun$apply$7$$anonfun$apply$1$$anonfun$apply$mcV$sp$2$$anonfun$apply$2$$anonfun$apply$mcV$sp$4.apply(EventLogClusterIngestor.scala:144)
 at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at
com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1$$anonfun$apply$7$$anonfun$apply$1$$anonfun$apply$mcV$sp$2$$anonfun$apply$2.apply$mcV$sp(EventLogClusterIngestor.scala:144)
 at scala.util.control.Breaks.breakable(Breaks.scala:37)
 at
com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1$$anonfun$apply$7$$anonfun$apply$1$$anonfun$apply$mcV$sp$2.apply(EventLogClusterIngestor.scala:139)
 at
com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1$$anonfun$apply$7$$anonfun$apply$1$$anonfun$apply$mcV$sp$2.apply(EventLogClusterIngestor.scala:132)
 at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at
com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1$$anonfun$apply$7$$anonfun$apply$1.apply$mcV$sp(EventLogClusterIngestor.scala:132)
 at scala.util.control.Breaks.breakable(Breaks.scala:37)
 at
com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1$$anonfun$apply$7.apply(EventLogClusterIngestor.scala:125)
 at
com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1$$anonfun$apply$7.apply(EventLogClusterIngestor.scala:115)
 at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at
com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1.apply(EventLogClusterIngestor.scala:115)
 at
com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1.apply(EventLogClusterIngestor.scala:107)
 at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at
com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$.processEventLogMapStreamDiff2(EventLogClusterIngestor.scala:107)
 at
com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$.main(EventLogClusterIngestor.scala:573)
 at
com.siriusxm.sequencer.ingestor.EventLogClusterIngestor.main(EventLogClusterIngestor.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

This happens in an RDD foreach. I pasted the lines in question below:

132 difFileRDD.filter(a=>a.size>0).collect().foreach(file => {
. //...
135val lines = sc.textFile("file:///" + file)
136val elogs = lines.flatMap(_.split("\n"))
137val numOfel = elogs.count()
138 //...
139breakable {
140if(numOfel <= 0) {
141 //..
142  break
143}else{
144 

RE: RDD's filter() or using 'where' condition in SparkSQL

2015-10-29 Thread java8964
You can do the SQL like following:
select *,
       case when key >= 1  and key <= 10 then 1
            when key >= 11 and key <= 20 then 2
            ...
            else 10
       end as bucket_id
from your table

See the conditional function "case" in Hive.
After you have the "bucket_id" column, you can do whatever analytic function
you want.
Yong

Date: Thu, 29 Oct 2015 12:53:35 -0700
Subject: Re: RDD's filter() or using 'where' condition in SparkSQL
From: anfernee...@gmail.com
To: java8...@hotmail.com
CC: user@spark.apache.org

Thanks Yong for your response.
Let me see if I can understand what you're suggesting, so the whole data set, 
when I load them into Spark(I'm using custom Hadoop InputFormat), I will add an 
extra field to each element in RDD, like bucket_id.
For example
Key:
1 - 10, bucket_id=1
11 - 20, bucket_id=2
...
90 - 100, bucket_id=10
then I can re-partition the RDD with a partitioner that will put all records 
with the same bucket_id in the same partition, after I get DataFrame from the 
RDD, the partition is still preserved(is it correct?)
then the rest of the work is only issuing SQL queries like
SELECT * from XXX where bucket_id=1
SELECT * from XXX where bucket_id=2

..
Am I right?
Thanks
Anfernee
On Thu, Oct 29, 2015 at 11:07 AM, java8964  wrote:



Won't you be able to use case statement to generate a virtual column (like 
partition_num), then use analytic SQL partition by this virtual column?
In this case, the full dataset will be just scanned once.

Yong

Date: Thu, 29 Oct 2015 10:51:53 -0700
Subject: RDD's filter() or using 'where' condition in SparkSQL
From: anfernee...@gmail.com
To: user@spark.apache.org

Hi,
I have a pretty large data set(2M entities) in my RDD, the data has already 
been partitioned by a specific key, the key has a range(type in long), now I 
want to create a bunch of key buckets, for example, the key has range 
1 -> 100,
I will break the whole range into below buckets
1 -> 10
11 -> 20
...
90 -> 100
 I want to run some analytic SQL functions over the data that owned by each key 
range, so I come up with 2 approaches,
1) run RDD's filter() on the full data set RDD, the filter will create the RDD 
corresponding to each key bucket, and with each RDD, I can create DataFrame and 
run the sql.

2) create a DataFrame for the whole RDD, and using a buch of SQL's to do my job.
SELECT * from  where key>=key1 AND key 

Running FPGrowth over a JavaPairRDD?

2015-10-29 Thread Fernando Paladini
Hello guys!

First of all, if you want a more readable version of this question, take
a look at my StackOverflow question

(I've asked the same question there).

I want to test Spark machine learning algorithms and I have some questions
on how to run these algorithms with non-native data types. I'm going to run
FPGrowth algorithm over the input because I want to get the most frequent
itemsets for this input.

*My data is disposed as the following:*

[timestamp, sensor1value, sensor2value] # id: 0
[timestamp, sensor1value, sensor2value] # id: 1
[timestamp, sensor1value, sensor2value] # id: 2
[timestamp, sensor1value, sensor2value] # id: 3
...

As I need to use Java (because Python doesn't have a lot of machine
learning algorithms from Spark), this data structure isn't very easy to
handle / create.

*To achieve this data structure in Java I can visualize two approaches:*

   1. Use existing Java classes and data types to structure the input (I
   think some problems can occur in Spark depending on how complex is my data).
   2. Create my own class (don't know if it works with Spark algorithms)

1. Existing Java classes and data types

In order to do that I've created a *List>*, so I can keep my data structured
and also can create a RDD:

List> algorithm_data = new ArrayList>();
populate(algorithm_data);
JavaPairRDD transactions = sc.parallelizePairs(algorithm_data);

I don't feel okay with JavaPairRDD because FPGrowth algorithm seems to
be not available for this data structure, as I will show you later in
this post.

2. Create my own class

I could also create a new class to store the input properly:

public class PointValue {

private long timestamp;
private double sensorMeasure1;
private double sensorMeasure2;

// Constructor, getters and setters omitted...
}

However, I don't know if I can do that and still use it with Spark
algorithms without any problems (in other words, running Spark algorithms
without headaches). I'll focus in the first approach, but if you see that
the second one is easier to achieve, please tell me.
The solution (based on approach #1):

// Initializing Spark
SparkConf conf = new SparkConf().setAppName("FP-growth Example");
JavaSparkContext sc = new JavaSparkContext(conf);

// Getting data for ML algorithm
List> algorithm_data = new ArrayList>();
populate(algorithm_data);
JavaPairRDD transactions = sc.parallelizePairs(algorithm_data);

// Running FPGrowth
FPGrowth fpg = new FPGrowth().setMinSupport(0.2).setNumPartitions(10);
FPGrowthModel> model = fpg.run(transactions);

// Printing everything
for (FPGrowth.FreqItemset> itemset: model.freqItemsets().toJavaRDD().collect()) {
    System.out.println("[" + itemset.javaItems() + "], " + itemset.freq());
}

But then I got:

*The method run(JavaRDD) in the type FPGrowth is not
applicable for the arguments (JavaPairRDD)*

*What can I do in order to solve my problem (run FPGrowth over
JavaPairRDD)?*

I'm available to give you more information, just tell me exactly what you
need.
Thank you!
Fernando Paladini
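
[For reference, a minimal Scala sketch of the input shape MLlib's FPGrowth.run
expects: an RDD whose elements are collections of items, not key/value pairs.
The item encoding below (discretized sensor readings as strings) is purely an
assumption about how the rows could be turned into transactions.]

import org.apache.spark.mllib.fpm.FPGrowth

// Each element is one transaction: an Array of items.
val transactions = sc.parallelize(Seq(
  Array("s1=low",  "s2=high"),
  Array("s1=low",  "s2=low"),
  Array("s1=high", "s2=high"),
  Array("s1=low",  "s2=high")))

val model = new FPGrowth()
  .setMinSupport(0.2)
  .setNumPartitions(10)
  .run(transactions)

model.freqItemsets.collect().foreach { itemset =>
  println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq)
}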


Maintaining overall cumulative data in Spark Streaming

2015-10-29 Thread Sandeep Giri
Dear All,

If a continuous stream of text is coming in and you have to keep publishing
the overall word count so far since 0:00 today, what would you do?

Publishing the results for a window is easy but if we have to keep
aggregating the results, how to go about it?

I have tried to keep a StreamRDD with the aggregated count and keep doing a
fullouterjoin, but it didn't work. It seems like the StreamRDD gets reset.

Kindly help.

Regards,
Sandeep Giri


Issue on spark.driver.maxResultSize

2015-10-29 Thread karthik kadiyam
Hi,

In a Spark Streaming job I had the following setting:

this.jsc.getConf().set("spark.driver.maxResultSize", "0");
and i got the error in the job as below

User class threw exception: Job aborted due to stage failure: Total size of 
serialized results of 120 tasks (1082.2 MB) is bigger than 
spark.driver.maxResultSize (1024.0 MB) 

Basically I realized that the default value is 1 GB, so I changed the
configuration as below.

this.jsc.getConf().set("spark.driver.maxResultSize", "2g");

and when I ran the job it gave the error:

User class threw exception: Job aborted due to stage failure: Total size of 
serialized results of 120 tasks (1082.2 MB) is bigger than 
spark.driver.maxResultSize (1024.0 MB) 

So basically the change I made is not being considered in the job, so my
questions are:

- Is setting "spark.driver.maxResultSize" to "2g" this way the right way to
change it, or is there another way to do it?
- Is this a bug in Spark 1.3, or has anyone else had this issue before?
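
[One possible explanation, stated as an assumption rather than a confirmed
diagnosis: spark.driver.maxResultSize is read when the context is created, so
mutating the conf of an already-running context has no effect. A minimal Scala
sketch of setting it up front, with a placeholder app name and batch interval;
the same value can also be passed as --conf spark.driver.maxResultSize=2g to
spark-submit.]

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch only: set the limit on the SparkConf *before* the context exists.
val conf = new SparkConf()
  .setAppName("my-streaming-job")                  // placeholder
  .set("spark.driver.maxResultSize", "2g")

val ssc = new StreamingContext(conf, Seconds(60))  // placeholder batch interval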

RE: Maintaining overall cumulative data in Spark Streaming

2015-10-29 Thread Silvio Fiorito
You could use updateStateByKey. There's a stateful word count example on Github.

https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
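
[A minimal Scala sketch of that approach, assuming lines is a DStream[String],
ssc is the StreamingContext, and the checkpoint path is a placeholder.]

// Sketch only: keep a running total per word since the stream started.
ssc.checkpoint("hdfs:///tmp/streaming-checkpoint")   // required for stateful ops

val words = lines.flatMap(_.split(" ")).map(w => (w, 1))

val runningCounts = words.updateStateByKey[Int] { (newValues: Seq[Int], state: Option[Int]) =>
  Some(state.getOrElse(0) + newValues.sum)
}
runningCounts.print()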

From: Sandeep Giri
Sent: ‎10/‎29/‎2015 6:08 PM
To: user; dev
Subject: Maintaining overall cumulative data in Spark Streaming

Dear All,

If a continuous stream of text is coming in and you have to keep publishing the 
overall word count so far since 0:00 today, what would you do?

Publishing the results for a window is easy but if we have to keep aggregating 
the results, how to go about it?

I have tried to keep an StreamRDD with aggregated count and keep doing a 
fullouterjoin but didn't work. Seems like the StreamRDD gets reset.

Kindly help.

Regards,
Sandeep Giri



Re: SPARK SQL- Parquet projection pushdown for nested data

2015-10-29 Thread Sadhan Sood
Thanks Michael, I will upvote this.

On Thu, Oct 29, 2015 at 10:29 AM, Michael Armbrust 
wrote:

> Yeah, this is unfortunate.  It would be good to fix this, but its a
> non-trivial change.
>
> Tracked here if you'd like to vote on the issue:
> https://issues.apache.org/jira/browse/SPARK-4502
>
> On Thu, Oct 29, 2015 at 6:00 PM, Sadhan Sood 
> wrote:
>
>> I noticed when querying struct data in spark sql, we are requesting the
>> whole column from parquet files. Is this intended or is there some kind of
>> config to control this behaviour? Wouldn't it be better to request just the
>> struct field?
>>
>
>


Save data to different S3

2015-10-29 Thread William Li
Hi - I have a simple app running fine with Spark; it reads data from S3 and
performs calculations.

When reading data from S3, I use hadoopConfiguration.set for both
fs.s3n.awsAccessKeyId and fs.s3n.awsSecretAccessKey so it has permissions
to load the data from customer sources.

However, after I complete the analysis, how do I save the results (it's an
org.apache.spark.rdd.RDD[String]) into my own S3 bucket, which requires a
different access key and secret? It seems one option is that I could save the
results as a local file on the Spark cluster, then create a new SQLContext with
the different access, and then load the data from the local file.

Are there any other options that don't require saving and re-loading files?


Thanks,

William.


Re: Save data to different S3

2015-10-29 Thread Zhang, Jingyu
Try s3://aws_key:aws_secret@bucketName/folderName with your access key and
secret to save the data.
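
[A minimal Scala sketch of the URI form suggested above; the key, secret, and
bucket are placeholders, and results is assumed to be the RDD[String] from the
question. Note that secrets containing '/' tend to break this style of URI.]

// Sketch only: embed the *output* bucket's credentials in the save path.
val outKey = "OUTPUT_ACCESS_KEY"      // placeholder
val outSecret = "OUTPUT_SECRET_KEY"   // placeholder
results.saveAsTextFile(s"s3n://$outKey:$outSecret@my-output-bucket/analysis-results")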

On 30 October 2015 at 10:55, William Li  wrote:

> Hi – I have a simple app running fine with Spark, it reads data from S3
> and performs calculation.
>
> When reading data from S3, I use hadoopConfiguration.set for both
> fs.s3n.awsAccessKeyId, and the fs.s3n.awsSecretAccessKey to it has
> permissions to load the data from customer sources.
>
> However, after I complete the analysis, how do I save the results (it’s a
> org.apache.spark.rdd.RDD[String]) into my own s3 bucket which requires
> different access key and secret? It seems one option is that I could save
> the results as local file to the spark cluster, then create a new
> SQLContext with the different access, then load the data from the local
> file.
>
> Is there any other options without requiring save and re-load files?
>
>
> Thanks,
>
> William.
>



Need more tasks in KafkaDirectStream

2015-10-29 Thread varun sharma
Right now, there is a one-to-one correspondence between Kafka partitions and
Spark partitions.
I don't have a requirement of one-to-one semantics.
I need more tasks to be generated in the job so that it can be parallelised
and the batch can be completed fast. In the previous Receiver-based approach
the number of tasks created was independent of Kafka partitions; I need
something like that.
Is any config available if I don't need one-to-one semantics?
Is there any way I can repartition without incurring any additional cost?

Thanks
*VARUN SHARMA*


Re: Need more tasks in KafkaDirectStream

2015-10-29 Thread Dibyendu Bhattacharya
If you do not need one to one semantics and does not want strict ordering
guarantee , you can very well use the Receiver based approach, and this
consumer from Spark-Packages (
https://github.com/dibbhatt/kafka-spark-consumer) can give much better
alternatives in terms of performance and reliability  for Receiver based
approach.

Regards,
Dibyendu

On Thu, Oct 29, 2015 at 11:57 AM, varun sharma 
wrote:

> Right now, there is one to one correspondence between kafka partitions and
> spark partitions.
> I dont have a requirement of one to one semantics.
> I need more tasks to be generated in the job so that it can be
> parallelised and batch can be completed fast. In the previous Receiver
> based approach number of tasks created were independent of kafka
> partitions, I need something like that only.
> Any config available if I dont need one to one semantics?
> Is there any way I can repartition without incurring any additional cost.
>
> Thanks
> *VARUN SHARMA*
>
>


Re: NullPointerException when cache DataFrame in Java (Spark1.5.1)

2015-10-29 Thread Romi Kuntsman
Did you try to cache a DataFrame with just a single row?
Do your rows have any columns with null values?
Can you post a code snippet here on how you load/generate the dataframe?
Does dataframe.rdd.cache work?

*Romi Kuntsman*, *Big Data Engineer*
http://www.totango.com

On Thu, Oct 29, 2015 at 4:33 AM, Zhang, Jingyu 
wrote:

> It is not a problem to use JavaRDD.cache() for 200M of data (all objects read
> from JSON format). But when I try to use DataFrame.cache(), it shows the
> exception below.
>
> My machine can cache 1 G data in Avro format without any problem.
>
> 15/10/29 13:26:23 INFO GeneratePredicate: Code generated in 154.531827 ms
>
> 15/10/29 13:26:23 INFO GenerateUnsafeProjection: Code generated in
> 27.832369 ms
>
> 15/10/29 13:26:23 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID
> 1)
>
> java.lang.NullPointerException
>
> at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
>
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:497)
>
> at
> org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1$$anonfun$apply$2.apply(
> SQLContext.scala:500)
>
> at
> org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1$$anonfun$apply$2.apply(
> SQLContext.scala:500)
>
> at scala.collection.TraversableLike$$anonfun$map$1.apply(
> TraversableLike.scala:244)
>
> at scala.collection.TraversableLike$$anonfun$map$1.apply(
> TraversableLike.scala:244)
>
> at scala.collection.IndexedSeqOptimized$class.foreach(
> IndexedSeqOptimized.scala:33)
>
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>
> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>
> at org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1.apply(
> SQLContext.scala:500)
>
> at org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1.apply(
> SQLContext.scala:498)
>
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>
> at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
>
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>
> at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(
> InMemoryColumnarTableScan.scala:127)
>
> at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(
> InMemoryColumnarTableScan.scala:120)
>
> at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:278
> )
>
> at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
>
> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
>
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38
> )
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38
> )
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>
> at org.apache.spark.scheduler.Task.run(Task.scala:88)
>
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
>
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
>
> at java.lang.Thread.run(Thread.java:745)
>
> 15/10/29 13:26:23 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1,
> localhost): java.lang.NullPointerException
>
> at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
>
>
> Thanks,
>
>
> Jingyu
>


Re: Spark/Kafka Streaming Job Gets Stuck

2015-10-29 Thread srungarapu vamsi
Other than @Adrian's suggestions, check whether the batch processing time is
more than the batch interval.

On Thu, Oct 29, 2015 at 2:23 AM, Adrian Tanase  wrote:

> Does it work as expected with smaller batch or smaller load? Could it be
> that it's accumulating too many events over 3 minutes?
>
> You could also try increasing the parallelism via repartition to ensure
> smaller tasks that can safely fit in working memory.
>
> Sent from my iPhone
>
> > On 28 Oct 2015, at 17:45, Afshartous, Nick 
> wrote:
> >
> >
> > Hi, we are load testing our Spark 1.3 streaming (reading from Kafka)
> job and seeing a problem.  This is running in AWS/Yarn and the streaming
> batch interval is set to 3 minutes and this is a ten node cluster.
> >
> > Testing at 30,000 events per second we are seeing the streaming job get
> stuck (stack trace below) for over an hour.
> >
> > Thanks on any insights or suggestions.
> > --
> >  Nick
> >
> >
> org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.mapPartitionsToPair(JavaDStreamLike.scala:43)
> >
> com.wb.analytics.spark.services.streaming.drivers.StreamingKafkaConsumerDriver.runStream(StreamingKafkaConsumerDriver.java:125)
> >
> com.wb.analytics.spark.services.streaming.drivers.StreamingKafkaConsumerDriver.main(StreamingKafkaConsumerDriver.java:71)
> > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > java.lang.reflect.Method.invoke(Method.java:606)
> >
> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:480)
> >
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
/Vamsi


Re: NullPointerException when cache DataFrame in Java (Spark1.5.1)

2015-10-29 Thread Zhang, Jingyu
Thanks Romi,

I resized the dataset to 7 MB; however, the code throws a NullPointerException
as well.

Did you try to cache a DataFrame with just a single row?

Yes, I tried, but same problem.
Do your rows have any columns with null values?

No, I had filtered out null values before caching the dataframe.

Can you post a code snippet here on how you load/generate the dataframe?

Sure, Here is the working code 1:

JavaRDD<PixelObject> pixels = pixelsStr.map(new PixelGenerator()).cache();

System.out.println(pixels.count()); // 3000-4000 rows

Working code 2:

JavaRDD<PixelObject> pixels = pixelsStr.map(new PixelGenerator());

DataFrame schemaPixel = sqlContext.createDataFrame(pixels, PixelObject.class);

DataFrame totalDF1 = schemaPixel.select(schemaPixel.col("domain"))
    .filter("'domain' is not null").limit(500);

System.out.println(totalDF1.count());


But after changing limit(500) to limit(1000), the code reports a
NullPointerException.


JavaRDD<PixelObject> pixels = pixelsStr.map(new PixelGenerator());

DataFrame schemaPixel = sqlContext.createDataFrame(pixels, PixelObject.class);

DataFrame totalDF = schemaPixel.select(schemaPixel.col("domain"))
    .filter("'domain' is not null").limit(1000);

System.out.println(totalDF.count()); // problem at this line

15/10/29 18:56:28 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks
have all completed, from pool

15/10/29 18:56:28 INFO TaskSchedulerImpl: Cancelling stage 0

15/10/29 18:56:28 INFO DAGScheduler: ShuffleMapStage 0 (count at
X.java:113) failed in 3.764 s

15/10/29 18:56:28 INFO DAGScheduler: Job 0 failed: count at XXX.java:113,
took 3.862207 s

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage
0.0 (TID 0, localhost): java.lang.NullPointerException

at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
Does dataframe.rdd.cache work?

No, I tried but same exception.

Thanks,

Jingyu

On 29 October 2015 at 17:38, Romi Kuntsman  wrote:

> Did you try to cache a DataFrame with just a single row?
> Do you rows have any columns with null values?
> Can you post a code snippet here on how you load/generate the dataframe?
> Does dataframe.rdd.cache work?
>
> *Romi Kuntsman*, *Big Data Engineer*
> http://www.totango.com
>
> On Thu, Oct 29, 2015 at 4:33 AM, Zhang, Jingyu 
> wrote:
>
>> It is not a problem to use JavaRDD.cache() for 200M data (all Objects
>> read form Json Format). But when I try to use DataFrame.cache(), It shown
>> exception in below.
>>
>> My machine can cache 1 G data in Avro format without any problem.
>>
>> 15/10/29 13:26:23 INFO GeneratePredicate: Code generated in 154.531827 ms
>>
>> 15/10/29 13:26:23 INFO GenerateUnsafeProjection: Code generated in
>> 27.832369 ms
>>
>> 15/10/29 13:26:23 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID
>> 1)
>>
>> java.lang.NullPointerException
>>
>> at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
>>
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
>> DelegatingMethodAccessorImpl.java:43)
>>
>> at java.lang.reflect.Method.invoke(Method.java:497)
>>
>> at
>> org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1$$anonfun$apply$2.apply(
>> SQLContext.scala:500)
>>
>> at
>> org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1$$anonfun$apply$2.apply(
>> SQLContext.scala:500)
>>
>> at scala.collection.TraversableLike$$anonfun$map$1.apply(
>> TraversableLike.scala:244)
>>
>> at scala.collection.TraversableLike$$anonfun$map$1.apply(
>> TraversableLike.scala:244)
>>
>> at scala.collection.IndexedSeqOptimized$class.foreach(
>> IndexedSeqOptimized.scala:33)
>>
>> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>
>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>
>> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>>
>> at org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1.apply(
>> SQLContext.scala:500)
>>
>> at org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1.apply(
>> SQLContext.scala:498)
>>
>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>
>> at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
>>
>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>
>> at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(
>> InMemoryColumnarTableScan.scala:127)
>>
>> at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(
>> InMemoryColumnarTableScan.scala:120)
>>
>> at org.apache.spark.storage.MemoryStore.unrollSafely(
>> MemoryStore.scala:278)
>>
>> at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171
>> )
>>
>> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
>>
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
>>
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(
>> MapPartitionsRDD.scala:38)
>>
>> at 

Re: NullPointerException when cache DataFrame in Java (Spark1.5.1)

2015-10-29 Thread Romi Kuntsman
>
> BUT, after change limit(500) to limit(1000). The code report
> NullPointerException.
>
I had a similar situation, and the problem was with a certain record.
Try to find which records are returned when you limit to 1000 but not
returned when you limit to 500.
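
One way to do that, as a rough sketch (Scala for brevity; schemaPixel stands for
the DataFrame from your snippet, it assumes the column really is named domain,
and limit() gives no ordering guarantee, so the two slices are not guaranteed to
nest):

val first500  = schemaPixel.select("domain").filter("domain is not null").limit(500)
val first1000 = schemaPixel.select("domain").filter("domain is not null").limit(1000)

first1000.except(first500).show(600)  // rows in the 1000-row slice but not in the 500-row one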

Could it be an NPE thrown from PixelObject?
Are you running Spark with master=local, so it runs inside your IDE
and you can see the errors from both the driver and the worker?


*Romi Kuntsman*, *Big Data Engineer*
http://www.totango.com

On Thu, Oct 29, 2015 at 10:04 AM, Zhang, Jingyu 
wrote:

> Thanks Romi,
>
> I resize the dataset to 7MB, however, the code show NullPointerException
>  exception as well.
>
> Did you try to cache a DataFrame with just a single row?
>
> Yes, I tried. But, Same problem.
> .
> Do you rows have any columns with null values?
>
> No, I had filter out null values before cache the dataframe.
>
> Can you post a code snippet here on how you load/generate the dataframe?
>
> Sure, Here is the working code 1:
>
> JavaRDD pixels = pixelsStr.map(new PixelGenerator()).cache();
>
> System.out.println(pixels.count()); // 3000-4000 rows
>
> Working code 2:
>
> JavaRDD pixels = pixelsStr.map(new PixelGenerator());
>
> DataFrame schemaPixel = sqlContext.createDataFrame(pixels, PixelObject.
> class);
>
> DataFrame totalDF1 = 
> schemaPixel.select(schemaPixel.col("domain")).filter("'domain'
> is not null").limit(500);
>
> System.out.println(totalDF1.count());
>
>
> BUT, after change limit(500) to limit(1000). The code report
> NullPointerException.
>
>
> JavaRDD pixels = pixelsStr.map(new PixelGenerator());
>
> DataFrame schemaPixel = sqlContext.createDataFrame(pixels, PixelObject.
> class);
>
> DataFrame totalDF = 
> schemaPixel.select(schemaPixel.col("domain")).filter("'domain'
> is not null").limit(*1000*);
>
> System.out.println(totalDF.count()); // problem at this line
>
> 15/10/29 18:56:28 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks
> have all completed, from pool
>
> 15/10/29 18:56:28 INFO TaskSchedulerImpl: Cancelling stage 0
>
> 15/10/29 18:56:28 INFO DAGScheduler: ShuffleMapStage 0 (count at
> X.java:113) failed in 3.764 s
>
> 15/10/29 18:56:28 INFO DAGScheduler: Job 0 failed: count at XXX.java:113,
> took 3.862207 s
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
> in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage
> 0.0 (TID 0, localhost): java.lang.NullPointerException
>
> at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
> Does dataframe.rdd.cache work?
>
> No, I tried but same exception.
>
> Thanks,
>
> Jingyu
>
> On 29 October 2015 at 17:38, Romi Kuntsman  wrote:
>
>> Did you try to cache a DataFrame with just a single row?
>> Do you rows have any columns with null values?
>> Can you post a code snippet here on how you load/generate the dataframe?
>> Does dataframe.rdd.cache work?
>>
>> *Romi Kuntsman*, *Big Data Engineer*
>> http://www.totango.com
>>
>> On Thu, Oct 29, 2015 at 4:33 AM, Zhang, Jingyu 
>> wrote:
>>
>>> It is not a problem to use JavaRDD.cache() for 200M data (all Objects
>>> read form Json Format). But when I try to use DataFrame.cache(), It shown
>>> exception in below.
>>>
>>> My machine can cache 1 G data in Avro format without any problem.
>>>
>>> 15/10/29 13:26:23 INFO GeneratePredicate: Code generated in 154.531827 ms
>>>
>>> 15/10/29 13:26:23 INFO GenerateUnsafeProjection: Code generated in
>>> 27.832369 ms
>>>
>>> 15/10/29 13:26:23 ERROR Executor: Exception in task 0.0 in stage 1.0
>>> (TID 1)
>>>
>>> java.lang.NullPointerException
>>>
>>> at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
>>>
>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
>>> DelegatingMethodAccessorImpl.java:43)
>>>
>>> at java.lang.reflect.Method.invoke(Method.java:497)
>>>
>>> at
>>> org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1$$anonfun$apply$2.apply(
>>> SQLContext.scala:500)
>>>
>>> at
>>> org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1$$anonfun$apply$2.apply(
>>> SQLContext.scala:500)
>>>
>>> at scala.collection.TraversableLike$$anonfun$map$1.apply(
>>> TraversableLike.scala:244)
>>>
>>> at scala.collection.TraversableLike$$anonfun$map$1.apply(
>>> TraversableLike.scala:244)
>>>
>>> at scala.collection.IndexedSeqOptimized$class.foreach(
>>> IndexedSeqOptimized.scala:33)
>>>
>>> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>>
>>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>>
>>> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>>>
>>> at org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1.apply(
>>> SQLContext.scala:500)
>>>
>>> at org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1.apply(
>>> SQLContext.scala:498)
>>>
>>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>>
>>> at 

Packaging a jar for a jdbc connection using sbt assembly and scala.

2015-10-29 Thread dean.wood
I'm having a problem building a Spark jar with Scala. It's a really simple
thing: I want to programmatically access a MySQL server via JDBC and load it
into a Spark data frame. I can get this to work in the spark shell but I
cannot package a jar that works with spark submit. It will package but when
running, fails with

Exception in thread "main" java.sql.SQLException: No suitable driver found
for jdbc:mysql://localhost:3310/100million
My spark-submit command is

./bin/spark-submit ~/path/to/scala/project/target/scala-2.10/complete.jar
--driver-class-path ~/path/to/mysql-connector-java-5.1.37-bin.jar

My build.sbt looks like

name := "sql_querier"

version := "1.0"

scalaVersion := "2.10.4"

sbtVersion := "0.13.7"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.1" %
"provided"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.5.1" %
"provided"

libraryDependencies += "mysql" % "mysql-connector-java" % "5.1.37"

assemblyJarName in assembly := "complete.jar"

mainClass in assembly := Some("sql_querier")

offline := true
and my very simple code is

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext

object sql_querier{

def main(args: Array[String]) {

val sc = new org.apache.spark.SparkContext()
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val url="jdbc:mysql://databaseurl:portno/database"

val prop = new java.util.Properties
prop.setProperty("user","myuser")
prop.setProperty("password","mydatabase")
val cats=sqlContext.read.jdbc(url, "categories", prop)
cats.show
 }
 }
Where I've hidden the real values for user password and database url. I've
also got a file in projects that adds the sbt assembly plugin and there is
nothing wrong with this. I've successfully used sbt assembly before with
this configuration. When starting a spark shell with the --driver-class-path
option pointing to the mysql jar, I can run the commands and extract data
from the mysql database.

I've tried version 5.1.34 and 5.0.8 and neither have worked. I've also tried
changing --driver-class-path for --jar in the spark submit command and
adding the lines

   
sc.addJar("/Users/dean.wood/data_science/scala/sqlconn/mysql-connector-java-5.0.8-bin.jar")
Class.forName("com.mysql.jdbc.Driver")

to the scala code.

Any clue what I am doing wrong with the build would be greatly appreciated.

Dean



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Packaging-a-jar-for-a-jdbc-connection-using-sbt-assembly-and-scala-tp25225.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Spark standalone: zookeeper timeout configuration

2015-10-29 Thread zedar
Hi, in my standalone installation I use zookeeper for high availability (2
master nodes). 
Could you tell me if it is possible to configure zookeeper timeout (for
checking if active master node is alive) and retry interval?

Thanks,
Robert



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-standalone-zookeeper-timeout-configuration-tp25224.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Need more tasks in KafkaDirectStream

2015-10-29 Thread Adrian Tanase
You can call .repartition on the Dstream created by the Kafka direct consumer. 
You take the one-time hit of a shuffle but gain the ability to scale out 
processing beyond your number of partitions.

We’re doing this to scale up from 36 partitions / topic to 140 partitions (20 
cores * 7 nodes) and it works great.
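
A minimal sketch of the pattern (Scala; the broker list, topic name, batch interval and
target partition count below are illustrative assumptions, not our actual values):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("direct-kafka-repartition")
val ssc = new StreamingContext(conf, Seconds(10))

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")  // assumption
val topics = Set("events")                                                    // assumption

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

// one shuffle per batch, but everything downstream of this runs with 140 tasks
val widened = stream.repartition(140)
widened.foreachRDD(rdd => println(s"records in batch: ${rdd.count()}"))

ssc.start()
ssc.awaitTermination()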

-adrian

From: varun sharma
Date: Thursday, October 29, 2015 at 8:27 AM
To: user
Subject: Need more tasks in KafkaDirectStream

Right now, there is one to one correspondence between kafka partitions and 
spark partitions.
I dont have a requirement of one to one semantics.
I need more tasks to be generated in the job so that it can be parallelised and 
batch can be completed fast. In the previous Receiver based approach number of 
tasks created were independent of kafka partitions, I need something like that 
only.
Any config available if I dont need one to one semantics?
Is there any way I can repartition without incurring any additional cost.

Thanks
VARUN SHARMA



Re: Packaging a jar for a jdbc connection using sbt assembly and scala.

2015-10-29 Thread Deenar Toraskar
Hi Dean

I guess you are using Spark 1.3.


   - The JDBC driver class must be visible to the primordial class loader
   on the client session and on all executors. This is because Java’s
   DriverManager class does a security check that results in it ignoring all
   drivers not visible to the primordial class loader when one goes to open a
   connection. One convenient way to do this is to modify compute_classpath.sh
   on all worker nodes to include your driver JARs.

Take a look at this https://issues.apache.org/jira/browse/SPARK-6913 and
see
http://stackoverflow.com/questions/30221677/spark-sql-postgresql-jdbc-classpath-issues
.
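
One thing worth double-checking in the original command: spark-submit treats everything
placed after the application jar as arguments to the application itself, so a
--driver-class-path flag given after complete.jar is never seen by spark-submit. A
minimal sketch with the options first and the driver jar also shipped to the executors
(the paths are assumptions):

./bin/spark-submit \
  --driver-class-path ~/path/to/mysql-connector-java-5.1.37-bin.jar \
  --jars ~/path/to/mysql-connector-java-5.1.37-bin.jar \
  ~/path/to/scala/project/target/scala-2.10/complete.jar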


Regards
Deenar

*Think Reactive Ltd*
deenar.toras...@thinkreactive.co.uk
07714140812



On 29 October 2015 at 10:34, dean.wood  wrote:

> I'm having a problem building a spark jar with scala. It's a really simple
> thing, I want to programatically access a mysql server via JDBC and load it
> in to a spark data frame. I can get this to work in the spark shell but I
> cannot package a jar that works with spark submit. It will package but when
> running, fails with
>
> Exception in thread "main" java.sql.SQLException: No suitable driver found
> for jdbc:mysql://localhost:3310/100million
> My spark-submit command is
>
> ./bin/spark-submit ~/path/to/scala/project/target/scala-2.10/complete.jar
> --driver-class-path ~/path/to/mysql-connector-java-5.1.37-bin.jar
>
> My build.sbt looks like
>
> name := "sql_querier"
>
> version := "1.0"
>
> scalaVersion := "2.10.4"
>
> sbtVersion := "0.13.7"
>
> libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.1" %
> "provided"
>
> libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.5.1" %
> "provided"
>
> libraryDependencies += "mysql" % "mysql-connector-java" % "5.1.37"
>
> assemblyJarName in assembly := "complete.jar"
>
> mainClass in assembly := Some("sql_querier")
>
> offline := true
> and my very simple code is
>
> import org.apache.spark.SparkContext
> import org.apache.spark.SparkConf
> import org.apache.spark.sql.SQLContext
>
> object sql_querier{
>
> def main(args: Array[String]) {
>
> val sc = new org.apache.spark.SparkContext()
> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
> val url="jdbc:mysql://databaseurl:portno/database"
>
> val prop = new java.util.Properties
> prop.setProperty("user","myuser")
> prop.setProperty("password","mydatabase")
> val cats=sqlContext.read.jdbc(url, "categories", prop)
> cats.show
>  }
>  }
> Where I've hidden the real values for user password and database url. I've
> also got a file in projects that adds the sbt assembly plugin and there is
> nothing wrong with this. I've successfully used sbt assembly before with
> this configuration. When starting a spark shell with the
> --driver-class-path
> option pointing to the mysql jar, I can run the commands and extract data
> from the mysql database.
>
> I've tried version 5.1.34 and 5.0.8 and neither have worked. I've also
> tried
> changing --driver-class-path for --jar in the spark submit command and
> adding the lines
>
>
>
> sc.addJar("/Users/dean.wood/data_science/scala/sqlconn/mysql-connector-java-5.0.8-bin.jar")
> Class.forName("com.mysql.jdbc.Driver")
>
> to the scala code.
>
> Any clue what I am doing wrong with the build would be greatly appreciated.
>
> Dean
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Packaging-a-jar-for-a-jdbc-connection-using-sbt-assembly-and-scala-tp25225.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: No way to supply hive-site.xml in yarn client mode?

2015-10-29 Thread Deenar Toraskar
*Hi Zoltan*

Add hive-site.xml to your YARN_CONF_DIR. i.e. $SPARK_HOME/conf/yarn-conf

Deenar

*Think Reactive Ltd*
deenar.toras...@thinkreactive.co.uk
07714140812

On 28 October 2015 at 14:28, Zoltan Fedor  wrote:

> Hi,
> We have a shared CDH 5.3.3 cluster and trying to use Spark 1.5.1 on it in
> yarn client mode with Hive.
>
> I have compiled Spark 1.5.1 with SPARK_HIVE=true, but it seems I am not
> able to make SparkSQL to pick up the hive-site.xml when runnig pyspark.
>
> hive-site.xml is located in $SPARK_HOME/hadoop-conf/hive-site.xml and also
> in $SPARK_HOME/conf/hive-site.xml
>
> When I start pyspark with the below command and then run some simple
> SparkSQL it fails, it seems it didn't pic up the settings in hive-site.xml
>
> $ HADOOP_CONF_DIR=$SPARK_HOME/hadoop-conf
> YARN_CONF_DIR=$SPARK_HOME/yarn-conf HADOOP_USER_NAME=biapp MASTER=yarn
> $SPARK_HOME/bin/pyspark --deploy-mode client
>
> Python 2.6.6 (r266:84292, Jul 23 2015, 05:13:40)
> [GCC 4.4.7 20120313 (Red Hat 4.4.7-16)] on linux2
> Type "help", "copyright", "credits" or "license" for more information.
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
> [jar:file:/usr/lib/spark-1.5.1-bin-without-hadoop/lib/spark-assembly-1.5.1-hadoop2.5.0-cdh5.3.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 15/10/28 10:22:33 WARN MetricsSystem: Using default name DAGScheduler for
> source because spark.app.id is not set.
> 15/10/28 10:22:35 WARN NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> 15/10/28 10:22:59 WARN HiveConf: HiveConf of name hive.metastore.local
> does not exist
> Welcome to
>     __
>  / __/__  ___ _/ /__
> _\ \/ _ \/ _ `/ __/  '_/
>/__ / .__/\_,_/_/ /_/\_\   version 1.5.1
>   /_/
>
> Using Python version 2.6.6 (r266:84292, Jul 23 2015 05:13:40)
> SparkContext available as sc, HiveContext available as sqlContext.
> >>> sqlContext2 = HiveContext(sc)
> >>> sqlContext2.sql("show databases").first()
> 15/10/28 10:23:12 WARN HiveConf: HiveConf of name hive.metastore.local
> does not exist
> 15/10/28 10:23:13 WARN ShellBasedUnixGroupsMapping: got exception trying
> to get groups for user biapp: id: biapp: No such user
>
> 15/10/28 10:23:13 WARN UserGroupInformation: No groups available for user
> biapp
> Traceback (most recent call last):
>   File "", line 1, in 
>   File
> "/usr/lib/spark-1.5.1-bin-without-hadoop/python/pyspark/sql/context.py",
> line 552, in sql
> return DataFrame(self._ssql_ctx.sql(sqlQuery), self)
>   File
> "/usr/lib/spark-1.5.1-bin-without-hadoop/python/pyspark/sql/context.py",
> line 660, in _ssql_ctx
> "build/sbt assembly", e)
> Exception: ("You must build Spark with Hive. Export 'SPARK_HIVE=true' and
> run build/sbt assembly", Py4JJavaError(u'An error occurred while calling
> None.org.apache.spark.sql.hive.HiveContext.\n', JavaObject id=o20))
> >>>
>
>
> See in the above the warning about "WARN HiveConf: HiveConf of name
> hive.metastore.local does not exist" while actually there is a
> hive.metastore.local attribute in the hive-site.xml
>
> Any idea how to submit hive-site.xml in yarn client mode?
>
> Thanks
>


Re: Running FPGrowth over a JavaPairRDD?

2015-10-29 Thread Sabarish Sasidharan
Hi

You cannot use PairRDD but you can use JavaRDD. So in your case, to
make it work with least change, you would call run(transactions.values()).
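
For reference, a minimal Scala sketch of the input shape MLlib's FPGrowth expects (an RDD
whose elements are arrays of items); sc here is the shell's SparkContext, and the bucketed
sensor-value strings and the thresholds are just assumptions for illustration:

import org.apache.spark.mllib.fpm.FPGrowth

val transactions = sc.parallelize(Seq(
  Array("sensor1=low", "sensor2=high"),
  Array("sensor1=low", "sensor2=low"),
  Array("sensor1=low", "sensor2=high")))

val model = new FPGrowth().setMinSupport(0.2).setNumPartitions(10).run(transactions)

model.freqItemsets.collect().foreach { itemset =>
  println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq)
}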

Each MLLib implementation has its own data structure typically and you
would have to convert from your data structure before you invoke it. For example, if
you were doing regression on transactions you would instead convert them to
an RDD of LabeledPoint using a transactions.map(). If you wanted clustering
you would convert that to an RDD of Vector.

And taking a step back, without knowing what you want to accomplish: what
your FP-growth snippet will tell you is which sensor values occur
together most frequently. That may or may not be what you are looking for.

Regards
Sab
On 30-Oct-2015 3:00 am, "Fernando Paladini"  wrote:

> Hello guys!
>
> First of all, if you want to take a look in a more readable question, take
> a look in my StackOverflow question
> 
> (I've made the same question there).
>
> I want to test Spark machine learning algorithms and I have some questions
> on how to run these algorithms with non-native data types. I'm going to run
> FPGrowth algorithm over the input because I want to get the most frequent
> itemsets for this input.
>
> *My data is disposed as the following:*
>
> [timestamp, sensor1value, sensor2value] # id: 0
> [timestamp, sensor1value, sensor2value] # id: 1
> [timestamp, sensor1value, sensor2value] # id: 2
> [timestamp, sensor1value, sensor2value] # id: 3
> ...
>
> As I need to use Java (because Python doesn't have a lot of machine
> learning algorithms from Spark), this data structure isn't very easy to
> handle / create.
>
> *To achieve this data structure in Java I can visualize two approaches:*
>
>1. Use existing Java classes and data types to structure the input (I
>think some problems can occur in Spark depending on how complex is my 
> data).
>2. Create my own class (don't know if it works with Spark algorithms)
>
> 1. Existing Java classes and data types
>
> In order to do that I've created a* List>*, so
> I can keep my data structured and also can create a RDD:
>
> List> algorithm_data = new ArrayList List>>();
> populate(algorithm_data);JavaPairRDD transactions = 
> sc.parallelizePairs(algorithm_data);
>
> I don't feel okay with JavaPairRDD because FPGrowth algorithm seems to be not 
> available for this data structure, as I will show you later in this post.
>
> 2. Create my own class
>
> I could also create a new class to store the input properly:
>
> public class PointValue {
>
> private long timestamp;
> private double sensorMeasure1;
> private double sensorMeasure2;
>
> // Constructor, getters and setters omitted...
> }
>
> However, I don't know if I can do that and still use it with Spark
> algorithms without any problems (in other words, running Spark algorithms
> without headaches). I'll focus in the first approach, but if you see that
> the second one is easier to achieve, please tell me.
> The solution (based on approach #1):
>
> // Initializing SparkSparkConf conf = new SparkConf().setAppName("FP-growth 
> Example");JavaSparkContext sc = new JavaSparkContext(conf);
> // Getting data for ML algorithmList> 
> algorithm_data = new ArrayList>();
> populate(algorithm_data);JavaPairRDD transactions = 
> sc.parallelizePairs(algorithm_data);
> // Running FPGrowthFPGrowth fpg = new 
> FPGrowth().setMinSupport(0.2).setNumPartitions(10);FPGrowthModel List>> model = fpg.run(transactions);
> // Printing everythingfor (FPGrowth.FreqItemset> 
> itemset: model.freqItemsets().toJavaRDD().collect()) {
> System.out.println("[" + itemset.javaItems() + "], " + itemset.freq());}
>
> But then I got:
>
> *The method run(JavaRDD) in the type FPGrowth is not applicable for 
> the arguments (JavaPairRDD)*
>
> *What can I do in order to solve my problem (run FPGrowth over
> JavaPairRDD)?*
>
> I'm available to give you more information, just tell me exactly what you
> need.
> Thank you!
> Fernando Paladini
>


Re: SparkLauncher is blocked until main process is killed.

2015-10-29 Thread 陈宇航
Some additional information: the main process shares jar files with the Spark
job's driver and executors as their classpaths. It couldn't be a read/write lock
on those files, could it?

-- Original message --
From: "Ted Yu"
Date: 2015-10-30 11:46:21
To: "陈宇航"
Cc: "jey"; "user"
Subject: Re: SparkLauncher is blocked until main process is killed.


Not much clue from the snippet on screen.

Is it possible to pastebin the whole jstack output ?


On Thu, Oct 29, 2015 at 7:58 PM, 陈宇航  wrote:



Here's a part of the jstack output.

The release is 1.5.1.

-- Original message --
From: "Ted Yu"
Date: 2015-10-30 10:11:34
To: "jey"
Cc: "陈宇航"; "user"
Subject: Re: SparkLauncher is blocked until main process is killed.


Which Spark release are you using ?

Please note the typo in email subject (corrected as of this reply)

On Thu, Oct 29, 2015 at 7:00 PM, Jey Kottalam  wrote:
Could you please provide the jstack output? That would help the devs identify 
the blocking operation more easily.


On Thu, Oct 29, 2015 at 6:54 PM, 陈宇航  wrote:
I tried to use SparkLauncher (org.apache.spark.launcher.SparkLauncher) to
submit a Spark Streaming job; however, in my test, the SparkSubmit process got
stuck in the "addJar" procedure. Only when the main process (the caller of
SparkLauncher) is killed does the submit procedure continue to run. I ran jstack
for the process, and it seems Jetty was blocking it; I'm pretty sure there were
no port conflicts.


The environment is RHEL (Red Hat Enterprise Linux) 6u3 x64, and Spark runs in
standalone mode.


Did this happen to any of you?

Spark 1.5.1 Dynamic Resource Allocation

2015-10-29 Thread tstewart
I am running the following command on a Hadoop cluster to launch Spark shell
with DRA:
spark-shell  --conf spark.dynamicAllocation.enabled=true --conf
spark.shuffle.service.enabled=true --conf
spark.dynamicAllocation.minExecutors=4 --conf
spark.dynamicAllocation.maxExecutors=12 --conf
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout=120 --conf
spark.dynamicAllocation.schedulerBacklogTimeout=300 --conf
spark.dynamicAllocation.executorIdleTimeout=60 --executor-memory 512m
--master yarn-client --queue default

This is the code I'm running within the Spark shell - just demo stuff from
the web site.

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

// Load and parse the data
val data = sc.textFile("hdfs://ns/public/sample/kmeans_data.txt")

val parsedData = data.map(s => Vectors.dense(s.split('
').map(_.toDouble))).cache()

// Cluster the data into two classes using KMeans
val numClusters = 2
val numIterations = 20
val clusters = KMeans.train(parsedData, numClusters, numIterations)

This works fine on Spark 1.4.1 but is failing on Spark 1.5.1. Did something
change that I need to do differently for DRA on 1.5.1?

This is the error I am getting:
15/10/29 21:44:19 WARN YarnScheduler: Initial job has not accepted any
resources; check your cluster UI to ensure that workers are registered and
have sufficient resources
15/10/29 21:44:34 WARN YarnScheduler: Initial job has not accepted any
resources; check your cluster UI to ensure that workers are registered and
have sufficient resources
15/10/29 21:44:49 WARN YarnScheduler: Initial job has not accepted any
resources; check your cluster UI to ensure that workers are registered and
have sufficient resources

That happens to be the same error you get if you haven't followed the steps
to enable DRA; however, I have done those and, as I said, if I just flip to
Spark 1.4.1 on the same cluster it works with my YARN config.





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-5-1-Dynamic-Resource-Allocation-tp25231.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




RE: Maintaining overall cumulative data in Spark Streaming

2015-10-29 Thread Sandeep Giri
Yes, updateStateByKey worked.

Though there are some more complications.
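
A minimal sketch of the running word count with updateStateByKey (Scala; the checkpoint
directory, socket input and batch interval below are just assumptions for illustration,
and resetting the state at 0:00 would still need its own logic):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("cumulative-wordcount")
val ssc = new StreamingContext(conf, Seconds(10))
ssc.checkpoint("hdfs:///tmp/wordcount-checkpoint")   // required for stateful operations

val lines = ssc.socketTextStream("localhost", 9999)  // any input DStream works the same way
val pairs = lines.flatMap(_.split("\\s+")).map(word => (word, 1L))

// newCounts: counts from the current batch; total: running total kept by Spark
val runningCounts = pairs.updateStateByKey[Long] { (newCounts: Seq[Long], total: Option[Long]) =>
  Some(total.getOrElse(0L) + newCounts.sum)
}
runningCounts.print()

ssc.start()
ssc.awaitTermination()
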
On Oct 30, 2015 8:27 AM, "skaarthik oss"  wrote:

> Did you consider UpdateStateByKey operation?
>
>
>
> *From:* Sandeep Giri [mailto:sand...@knowbigdata.com]
> *Sent:* Thursday, October 29, 2015 3:09 PM
> *To:* user ; dev 
> *Subject:* Maintaining overall cumulative data in Spark Streaming
>
>
>
> Dear All,
>
>
>
> If a continuous stream of text is coming in and you have to keep
> publishing the overall word count so far since 0:00 today, what would you
> do?
>
>
>
> Publishing the results for a window is easy but if we have to keep
> aggregating the results, how to go about it?
>
>
>
> I have tried to keep an StreamRDD with aggregated count and keep doing a
> fullouterjoin but didn't work. Seems like the StreamRDD gets reset.
>
>
>
> Kindly help.
>
>
>
> Regards,
>
> Sandeep Giri
>
>
>


Re: issue with spark.driver.maxResultSize parameter in spark 1.3

2015-10-29 Thread shahid ashraf
Hi,
I guess you need to increase the Spark driver memory as well, but that should
be set in the conf files.
Let me know if that resolves it.
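
A minimal sketch of what that looks like (Scala; the values are assumptions). The key
point is that jsc.getConf() returns a copy of the configuration, so setting
spark.driver.maxResultSize on it after the context exists has no effect; it has to be in
place before the driver starts, via spark-defaults.conf, --conf on spark-submit, or the
SparkConf used to build the context:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("streaming-job")                 // assumption: app name
  .set("spark.driver.maxResultSize", "2g")     // read when the driver collects task results
// spark.driver.memory cannot be set here at all: the driver JVM is already running;
// pass it with spark-submit --driver-memory or put it in spark-defaults.conf instead
val ssc = new StreamingContext(conf, Seconds(10))   // assumption: batch interval
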
On Oct 30, 2015 7:33 AM, "karthik kadiyam" 
wrote:

> Hi,
>
> In spark streaming job i had the following setting
>
> this.jsc.getConf().set("spark.driver.maxResultSize", “0”);
> and i got the error in the job as below
>
> User class threw exception: Job aborted due to stage failure: Total size
> of serialized results of 120 tasks (1082.2 MB) is bigger than
> spark.driver.maxResultSize (1024.0 MB)
>
> Basically i realized that as default value is 1 GB. I changed
> the configuration as below.
>
> this.jsc.getConf().set("spark.driver.maxResultSize", “2g”);
>
> and when i ran the job it gave the error
>
> User class threw exception: Job aborted due to stage failure: Total size
> of serialized results of 120 tasks (1082.2 MB) is bigger than
> spark.driver.maxResultSize (1024.0 MB)
>
> So, basically the change i made is not been considered in the job. so my
> question is
>
> - "spark.driver.maxResultSize", “2g” is this the right way to change or
> any other way to do it.
> - Is this a bug in spark 1.3 or something or any one had this issue
> before?
>
>


Re: How do I parallize Spark Jobs at Executor Level.

2015-10-29 Thread Vinoth Sankar
Hi Adrian,

Yes. I need to load all the files and process them in parallel. The following code
doesn't seem to work (here I used map, and even tried foreach): I am just
downloading the files from HDFS to the local system and printing the log count
in each file. It's not throwing any exceptions, but it's not working. Files are
not getting downloaded, and I didn't even get that LOGGER print. The same code
works if I iterate over the files, but then it's not parallelized. How do I get my
code parallelized and working?

JavaRDD files = sparkContext.parallelize(fileList);

files.map(new Function()
{
public static final long serialVersionUID = 1L;

@Override
public Void call(String hdfsPath) throws Exception
{
JavaPairRDD hdfsContent =
sc.sequenceFile(hdfsPath, IntWritable.class, BytesWritable.class);
JavaRDD logs = hdfsContent.map(new Function, Message>()
{
public Message call(Tuple2 tuple2) throws
Exception
{
BytesWritable value = tuple2._2();
BytesWritable tmp = new BytesWritable();
tmp.setCapacity(value.getLength());
tmp.set(value);
return (Message) getProtos(1, tmp.getBytes());
}
});

String path = "/home/sas/logsfilterapp/temp/" + hdfsPath.substring(26);

Thread.sleep(2000);
logs.saveAsObjectFile(path);

LOGGER.log(Level.INFO, "HDFS Path : {0} Count : {1}", new Object[] {
hdfsPath, logs.count() });
return null;
}
});



Note: In another scenario as well, I didn't get the logs which are present
inside map/filter closures, but logs outside these closures are getting
printed as usual. If I can't get the logger prints inside these closures,
how do I debug them?

Thanks
Vinoth Sankar
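
A minimal driver-side sketch of the "simple .map" route from Adrian's reply quoted below
(Scala; the paths and the decode step are assumptions). The SparkContext can only be used
on the driver (not inside map/foreach closures), nothing runs until an action is called,
and prints or log statements inside those closures show up in the executor logs rather
than on the driver console:

import org.apache.hadoop.io.{BytesWritable, IntWritable}

val paths = Seq("/logs/part-0001.seq", "/logs/part-0002.seq")   // assumption: real HDFS paths
val all = paths
  .map(p => sc.sequenceFile(p, classOf[IntWritable], classOf[BytesWritable]))
  .reduce(_ union _)                 // one logical RDD spanning every file

// the decode runs on the executors, in parallel across all partitions
val logs = all.map { case (_, value) => value.copyBytes() }

println(logs.count())                // an action is what actually triggers the work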

On Wed, Oct 28, 2015 at 8:29 PM Adrian Tanase  wrote:

> The first line is distributing your fileList variable in the cluster as a
> RDD, partitioned using the default partitioner settings (e.g. Number of
> cores in your cluster).
>
> Each of your workers would one or more slices of data (depending on how
> many cores each executor has) and the abstraction is called partition.
>
> What is your use case? If you want to load the files and continue
> processing in parallel, then a simple .map should work.
> If you want to execute arbitrary code based on the list of files that each
> executor received, then you need to use .foreach that will get executed for
> each of the entries, on the worker.
>
> -adrian
>
> From: Vinoth Sankar
> Date: Wednesday, October 28, 2015 at 2:49 PM
> To: "user@spark.apache.org"
> Subject: How do I parallize Spark Jobs at Executor Level.
>
> Hi,
>
> I'm reading and filtering large no of files using Spark. It's getting
> parallized at Spark Driver level only. How do i make it parallelize to
> Executor(Worker) Level. Refer the following sample. Is there any way to
> paralleling iterate the localIterator ?
>
> Note : I use Java 1.7 version
>
> JavaRDD files = javaSparkContext.parallelize(fileList)
> Iterator localIterator = files.toLocalIterator();
>
> Regards
> Vinoth Sankar
>


SparkLauncher is blocked until mail process is killed.

2015-10-29 Thread 陈宇航
I tried to use SparkLauncher (org.apache.spark.launcher.SparkLauncher) to
submit a Spark Streaming job; however, in my test, the SparkSubmit process got
stuck in the "addJar" procedure. Only when the main process (the caller of
SparkLauncher) is killed does the submit procedure continue to run. I ran jstack
for the process, and it seems Jetty was blocking it; I'm pretty sure there were
no port conflicts.


The environment is RHEL (Red Hat Enterprise Linux) 6u3 x64, and Spark runs in
standalone mode.


Did this happen to any of you?

Re: SparkLauncher is blocked until mail process is killed.

2015-10-29 Thread Jey Kottalam
Could you please provide the jstack output? That would help the devs
identify the blocking operation more easily.

On Thu, Oct 29, 2015 at 6:54 PM, 陈宇航  wrote:

> I tried to use SparkLauncher (org.apache.spark.launcher.SparkLauncher) to
> submit a Spark Streaming job, however, in my test, the SparkSubmit process
> got stuck in the "addJar" procedure. Only when the main process (the caller
> of SparkLauncher) is killed, the submit procedeure continues to run. I ran
> jstack for the process, it seems jetty was blocking it, and I'm pretty sure
> there was no port conflicts.
>
> The environment is RHEL(RedHot Enterprise Linux) 6u3 x64, Spark runs in
> standalone mode.
>
> Did this happen to any of you?
>
>


issue with spark.driver.maxResultSize parameter in spark 1.3

2015-10-29 Thread karthik kadiyam
Hi,

In spark streaming job i had the following setting

this.jsc.getConf().set("spark.driver.maxResultSize", “0”);
and i got the error in the job as below

User class threw exception: Job aborted due to stage failure: Total size of
serialized results of 120 tasks (1082.2 MB) is bigger than
spark.driver.maxResultSize (1024.0 MB)

Basically I realized that the default value is 1 GB. I changed
the configuration as below.

this.jsc.getConf().set("spark.driver.maxResultSize", “2g”);

and when i ran the job it gave the error

User class threw exception: Job aborted due to stage failure: Total size of
serialized results of 120 tasks (1082.2 MB) is bigger than
spark.driver.maxResultSize (1024.0 MB)

So basically the change I made is not being picked up by the job, so my
questions are:

- Is setting "spark.driver.maxResultSize" to “2g” this way the right way to change
it, or is there another way to do it?
- Is this a bug in Spark 1.3, or has anyone else had this issue before?


Re: [Spark] java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE

2015-10-29 Thread Deng Ching-Mallete
Hi Yifan,

This is a known issue, please refer to
https://issues.apache.org/jira/browse/SPARK-6235 for more details. In your
case, it looks like you are caching to disk a partition > 2G. A workaround
would be to increase the number of your RDD partitions in order to make
them smaller in size.
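
A minimal sketch of that workaround (Scala; sc is the shell's SparkContext, and the small
sample plus the 4x multiplier are only stand-ins for the real billion-element RDD):

// stand-in for the real RDD; the point is just to raise the partition count
val bigRdd = sc.parallelize(1 to 1000000)

// either ask sortBy for more output partitions up front...
val sorted = bigRdd.sortBy(x => x, ascending = true,
  numPartitions = bigRdd.partitions.length * 4)

// ...or repartition before caching / calling toLocalIterator
val smaller = sorted.repartition(sorted.partitions.length * 4)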

HTH,
Deng

On Thu, Oct 29, 2015 at 8:40 PM, Yifan LI  wrote:

> I have a guess that before scanning that RDD, I sorted it and set
> partitioning, so the result is not balanced:
>
> sortBy[S](f: Function
> 
> [T, S], ascending: Boolean, *numPartitions*: Int)
>
> I will try to repartition it to see if it helps.
>
> Best,
> Yifan LI
>
>
>
>
>
> On 29 Oct 2015, at 12:52, Yifan LI  wrote:
>
> Hey,
>
> I was just trying to scan a large RDD sortedRdd, ~1billion elements, using
> toLocalIterator api, but an exception returned as it was almost finished:
>
> java.lang.RuntimeException: java.lang.IllegalArgumentException: Size
> exceeds Integer.MAX_VALUE
> at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:821)
> at
> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
> at
> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
> at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)
> at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
> at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:511)
> at
> org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:302)
> at
> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
> at
> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
> at
> org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
> at
> org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
> at
> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
> at
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
> at
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
> at
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
> at
> io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
> at
> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
> at
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
> at
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
> at
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> at
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
> at java.lang.Thread.run(Thread.java:745)
>
> at
> org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:162)
> at
> 

Re: SparkLauncher is blocked until main process is killed.

2015-10-29 Thread Ted Yu
Which Spark release are you using ?

Please note the typo in email subject (corrected as of this reply)

On Thu, Oct 29, 2015 at 7:00 PM, Jey Kottalam  wrote:

> Could you please provide the jstack output? That would help the devs
> identify the blocking operation more easily.
>
> On Thu, Oct 29, 2015 at 6:54 PM, 陈宇航  wrote:
>
>> I tried to use SparkLauncher (org.apache.spark.launcher.SparkLauncher) to
>> submit a Spark Streaming job, however, in my test, the SparkSubmit process
>> got stuck in the "addJar" procedure. Only when the main process (the caller
>> of SparkLauncher) is killed, the submit procedeure continues to run. I ran
>> jstack for the process, it seems jetty was blocking it, and I'm pretty sure
>> there was no port conflicts.
>>
>> The environment is RHEL(RedHot Enterprise Linux) 6u3 x64, Spark runs in
>> standalone mode.
>>
>> Did this happen to any of you?
>>
>>
>


Re: Pivot Data in Spark and Scala

2015-10-29 Thread Deng Ching-Mallete
Hi,

You could transform it into a pair RDD then use the combineByKey function.
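
A minimal sketch of that approach in the spark-shell (the fixed year list and the
empty-string padding are assumptions about the output you want):

val rows = sc.parallelize(Seq(
  ("A", 2015, 4), ("A", 2014, 12), ("A", 2013, 1),
  ("B", 2015, 24), ("B", 2013, 4)))

val years = Seq(2015, 2014, 2013)

val pivoted = rows
  .map { case (key, year, value) => (key, Map(year -> value)) }
  .combineByKey(
    (m: Map[Int, Int]) => m,                              // createCombiner
    (acc: Map[Int, Int], m: Map[Int, Int]) => acc ++ m,   // mergeValue
    (a: Map[Int, Int], b: Map[Int, Int]) => a ++ b)       // mergeCombiners
  .map { case (key, byYear) =>
    (key +: years.map(y => byYear.get(y).map(_.toString).getOrElse(""))).mkString(",")
  }

pivoted.collect().foreach(println)   // A,4,12,1  and  B,24,,4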

HTH,
Deng

On Thu, Oct 29, 2015 at 7:29 PM, Ascot Moss  wrote:

> Hi,
>
> I have data as follows:
>
> A, 2015, 4
> A, 2014, 12
> A, 2013, 1
> B, 2015, 24
> B, 2013, 4
>
>
> I need to convert the data to a new format:
> A ,4,12,1
> B,   24,,4
>
> Any idea how to make it in Spark Scala?
>
> Thanks
>
>