How do I parallelize Spark Jobs at Executor Level.

2015-10-28 Thread Vinoth Sankar
Hi,

I'm reading and filtering a large number of files using Spark. It's getting
parallelized at the Spark Driver level only. How do I make it parallelize to
the Executor (Worker) level? Refer to the following sample. Is there any way to
iterate the localIterator in parallel?

Note : I use Java 1.7 version

JavaRDD files = javaSparkContext.parallelize(fileList)
Iterator localIterator = files.toLocalIterator();

Regards
Vinoth Sankar


Re: Spark Core Transitive Dependencies

2015-10-28 Thread Furkan KAMACI
Hi Deng,

Could you give an example of which libraries you include for your purpose?

Kind Regards,
Furkan KAMACI

On Wed, Oct 28, 2015 at 4:07 AM, Deng Ching-Mallete 
wrote:

> Hi,
>
> The spark assembly jar already includes the spark core libraries plus
> their transitive dependencies, so you don't need to include them in your
> jar. I found it easier to use inclusions instead of exclusions when
> creating an assembly jar of my spark job so I would recommend going with
> that.
>
> HTH,
> Deng
>
>
> On Wed, Oct 28, 2015 at 6:20 AM, Furkan KAMACI 
> wrote:
>
>> Hi,
>>
>> I use Spark for its newAPIHadoopRDD method and map/reduce etc. tasks.
>> When I include it I see that it has many transitive dependencies.
>>
>> Which of them should I exclude? I've included the dependency tree of
>> spark-core. Is there any documentation that explains why they are needed
>> (maybe all of them are necessary?)
>>
>> Kind Regards,
>> Furkan KAMACI
>>
>> PS: Dependency Tree:
>>
>> [INFO] +- org.apache.spark:spark-core_2.10:jar:1.4.1:compile
>> [INFO] |  +- com.twitter:chill_2.10:jar:0.5.0:compile
>> [INFO] |  |  \- com.esotericsoftware.kryo:kryo:jar:2.21:compile
>> [INFO] |  | +-
>> com.esotericsoftware.reflectasm:reflectasm:jar:shaded:1.07:compile
>> [INFO] |  | +- com.esotericsoftware.minlog:minlog:jar:1.2:compile
>> [INFO] |  | \- org.objenesis:objenesis:jar:1.2:compile
>> [INFO] |  +- com.twitter:chill-java:jar:0.5.0:compile
>> [INFO] |  +- org.apache.spark:spark-launcher_2.10:jar:1.4.1:compile
>> [INFO] |  +- org.apache.spark:spark-network-common_2.10:jar:1.4.1:compile
>> [INFO] |  +- org.apache.spark:spark-network-shuffle_2.10:jar:1.4.1:compile
>> [INFO] |  +- org.apache.spark:spark-unsafe_2.10:jar:1.4.1:compile
>> [INFO] |  +- net.java.dev.jets3t:jets3t:jar:0.7.1:compile
>> [INFO] |  +- org.apache.curator:curator-recipes:jar:2.4.0:compile
>> [INFO] |  |  \- org.apache.curator:curator-framework:jar:2.4.0:compile
>> [INFO] |  | \- org.apache.curator:curator-client:jar:2.4.0:compile
>> [INFO] |  +- org.apache.commons:commons-lang3:jar:3.3.2:compile
>> [INFO] |  +- org.apache.commons:commons-math3:jar:3.4.1:compile
>> [INFO] |  +- com.google.code.findbugs:jsr305:jar:1.3.9:compile
>> [INFO] |  +- org.slf4j:jul-to-slf4j:jar:1.6.6:compile
>> [INFO] |  +- org.slf4j:jcl-over-slf4j:jar:1.6.6:compile
>> [INFO] |  +- com.ning:compress-lzf:jar:1.0.3:compile
>> [INFO] |  +- net.jpountz.lz4:lz4:jar:1.2.0:compile
>> [INFO] |  +- org.roaringbitmap:RoaringBitmap:jar:0.4.5:compile
>> [INFO] |  +- commons-net:commons-net:jar:2.2:compile
>> [INFO] |  +-
>> org.spark-project.akka:akka-remote_2.10:jar:2.3.4-spark:compile
>> [INFO] |  |  +-
>> org.spark-project.akka:akka-actor_2.10:jar:2.3.4-spark:compile
>> [INFO] |  |  |  \- com.typesafe:config:jar:1.2.1:compile
>> [INFO] |  |  +-
>> org.spark-project.protobuf:protobuf-java:jar:2.5.0-spark:compile
>> [INFO] |  |  \- org.uncommons.maths:uncommons-maths:jar:1.2.2a:compile
>> [INFO] |  +-
>> org.spark-project.akka:akka-slf4j_2.10:jar:2.3.4-spark:compile
>> [INFO] |  +- org.scala-lang:scala-library:jar:2.10.4:compile
>> [INFO] |  +- org.json4s:json4s-jackson_2.10:jar:3.2.10:compile
>> [INFO] |  |  \- org.json4s:json4s-core_2.10:jar:3.2.10:compile
>> [INFO] |  | +- org.json4s:json4s-ast_2.10:jar:3.2.10:compile
>> [INFO] |  | \- org.scala-lang:scalap:jar:2.10.0:compile
>> [INFO] |  |\- org.scala-lang:scala-compiler:jar:2.10.0:compile
>> [INFO] |  +- com.sun.jersey:jersey-server:jar:1.9:compile
>> [INFO] |  |  \- asm:asm:jar:3.1:compile
>> [INFO] |  +- com.sun.jersey:jersey-core:jar:1.9:compile
>> [INFO] |  +- org.apache.mesos:mesos:jar:shaded-protobuf:0.21.1:compile
>> [INFO] |  +- io.netty:netty-all:jar:4.0.23.Final:compile
>> [INFO] |  +- com.clearspring.analytics:stream:jar:2.7.0:compile
>> [INFO] |  +- io.dropwizard.metrics:metrics-core:jar:3.1.0:compile
>> [INFO] |  +- io.dropwizard.metrics:metrics-jvm:jar:3.1.0:compile
>> [INFO] |  +- io.dropwizard.metrics:metrics-json:jar:3.1.0:compile
>> [INFO] |  +- io.dropwizard.metrics:metrics-graphite:jar:3.1.0:compile
>> [INFO] |  +- com.fasterxml.jackson.core:jackson-databind:jar:2.4.4:compile
>> [INFO] |  |  +-
>> com.fasterxml.jackson.core:jackson-annotations:jar:2.4.0:compile
>> [INFO] |  |  \- com.fasterxml.jackson.core:jackson-core:jar:2.4.4:compile
>> [INFO] |  +-
>> com.fasterxml.jackson.module:jackson-module-scala_2.10:jar:2.4.4:compile
>> [INFO] |  |  \- org.scala-lang:scala-reflect:jar:2.10.4:compile
>> [INFO] |  +- org.apache.ivy:ivy:jar:2.4.0:compile
>> [INFO] |  +- oro:oro:jar:2.0.8:compile
>> [INFO] |  +- org.tachyonproject:tachyon-client:jar:0.6.4:compile
>> [INFO] |  |  \- org.tachyonproject:tachyon:jar:0.6.4:compile
>> [INFO] |  +- net.razorvine:pyrolite:jar:4.4:compile
>> [INFO] |  +- net.sf.py4j:py4j:jar:0.8.2.1:compile
>> [INFO] |  \- org.spark-project.spark:unused:jar:1.0.0:compile
>>
>
>


Why is no predicate pushdown performed, when using Hive (HiveThriftServer2) ?

2015-10-28 Thread Martin Senne
Hi all,

# Program Sketch


   1. I create a HiveContext `hiveContext`
   2. With that context, I create a DataFrame `df` from a JDBC relational
   table.
   3. I register the DataFrame `df` via

   df.registerTempTable("TESTTABLE")

   4. I start a HiveThriftServer2 via

   HiveThriftServer2.startWithContext(hiveContext)
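
In code, this sketch looks roughly like the following (a minimal sketch only;
the JDBC URL and connection options are placeholders, not taken from the
original post):

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

val hiveContext = new HiveContext(sc)

// DataFrame over the relational table via JDBC (connection options are placeholders)
val df = hiveContext.read.format("jdbc").options(Map(
  "url" -> "jdbc:postgresql://dbhost/testdb",   // placeholder JDBC URL
  "dbtable" -> "test"
)).load()

df.registerTempTable("TESTTABLE")

HiveThriftServer2.startWithContext(hiveContext)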



The TESTTABLE contains 1,000,000 entries, columns are ID (INT) and NAME
(VARCHAR)

+-----+--------+
| ID  |  NAME  |
+-----+--------+
| 1   | Hello  |
| 2   | Hello  |
| 3   | Hello  |
| ... | ...    |

With Beeline I access the SQL Endpoint (at port 1) of the
HiveThriftServer and perform a query. E.g.

SELECT * FROM TESTTABLE WHERE ID='3'

When I inspect the query log of the DB for the SQL statements executed, I see

/*SQL #:100 t:657*/SELECT \"ID\",\"NAME\" FROM test;

So no predicate pushdown happens, as the WHERE clause is missing.


# Questions

This gives rise to the following questions:

   1. Why is no predicate pushdown performed?
   2. Can this be changed by not using registerTempTable? If so, how?
   3. Or is this a known restriction of the HiveThriftServer?


# Counterexample

If I create a DataFrame `df` in Spark SQLContext and call

df.filter( df("ID") === 3).show()

I observe

/*SQL #:1*/SELECT \"ID\",\"NAME\" FROM test WHERE ID = 3;

as expected.


SparkR 1.5.1 ClassCastException when working with CSV files

2015-10-28 Thread rporcio
Hi,

When I'm working with CSV files in R using SparkR, I get a ClassCastException
during the execution of SparkR methods. The process below works fine in
1.4.1, but it is broken from 1.5.0 onwards.

(I will use the flights csv file from the examples as a reference, but I can
reproduce this with any csv file.)

Steps to reproduce:
1. Init spark and sql contexts. Use spark package
"com.databricks:spark-csv_2.11:1.0.3" for spark context initialization.
2. Init DataFrame as df <- read.df(sqlContext, "path_to_flights.csv_file",
source = "com.databricks.spark.csv", header="true")
3. Run command head(df)
4. The following exception occurs:
ERROR CsvRelation$: Exception while parsing line: 2011-01-24
12:00:00,14,48,1448,1546,3,-1,"CO",1079,"SAT","N14214",0,37,191.
java.lang.ClassCastException: java.lang.String cannot be cast to
org.apache.spark.unsafe.types.UTF8String
at
org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getUTF8String(rows.scala:45)
at
org.apache.spark.sql.catalyst.expressions.GenericMutableRow.getUTF8String(rows.scala:247)
...

I'm using CentOS.
On Windows, the exception does not occur, but the DataFrame contains 0 rows.

Do I miss something?

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-1-5-1-ClassCastException-when-working-with-CSV-files-tp25217.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Prevent partitions from moving

2015-10-28 Thread t3l
I have a cluster with 2 nodes (32 CPU cores each). My data is distributed
evenly, but the processing times for each partition can vary greatly. Now,
sometimes Spark seems to conclude from the current workload on both nodes
that it might be better to shift one partition from node1 to node2 (because
that node has cores waiting for work). Am I hallucinating or is that really
happening? Is there any way I can prevent this from happening?

Greetings,

T3L



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Prevent-partitions-from-moving-tp25216.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Getting info from DecisionTreeClassificationModel

2015-10-28 Thread Yanbo Liang
AFAIK, you cannot traverse the tree from the rootNode of
DecisionTreeClassificationModel, because type Node does not have
information about its children. Type InternalNode has children information, but
it is private, so users cannot access it.
I think the best way to get the probability of each prediction is to select
the rawPredictionCol and probabilityCol of the transformed DataFrame, which
will contain the raw prediction and the probability for each prediction.
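
For example, a minimal sketch (assuming `model` is a fitted
DecisionTreeClassificationModel and `testData` is a DataFrame with a features
column; the column names below are the ML pipeline defaults):

val predictions = model.transform(testData)
// "rawPrediction" and "probability" are the default names of rawPredictionCol
// and probabilityCol; "prediction" is the predicted label.
predictions.select("prediction", "rawPrediction", "probability").show(5)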

2015-10-22 12:43 GMT+08:00 sethah :

> I believe this question will give you the answer you're looking for:
> Decision
> Tree Accuracy
> <
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-MLlib-Decision-Tree-Node-Accuracy-td24561.html#a24629
> >
>
> Basically, you can traverse the tree from the root node.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Getting-info-from-DecisionTreeClassificationModel-tp25152p25159.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


nested select is not working in spark sql

2015-10-28 Thread Kishor Bachhav
Hi,

I am trying to execute the below query in Spark SQL, but it throws an exception:

select n_name from NATION where n_regionkey = (select r_regionkey from
REGION where r_name='ASIA')

Exception:
Exception in thread "main" java.lang.RuntimeException: [1.55] failure:
``)'' expected but identifier r_regionkey found

select n_name from NATION where n_regionkey = (select r_regionkey from
REGION where r_name='ASIA')
  ^
at scala.sys.package$.error(package.scala:27)
at
org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:36)
at
org.apache.spark.sql.SnappyParserDialect.parse(snappyParsers.scala:65)
at
org.apache.spark.sql.SQLContext$$anonfun$3.apply(SQLContext.scala:169)
at
org.apache.spark.sql.SQLContext$$anonfun$3.apply(SQLContext.scala:169)
at
org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:115)
at
org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:114)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at
scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at
scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at
scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at
scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at
scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at
scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
at
scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
at
org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:34)


The same query works in MySQL as well as MemSQL.

The expected result is:

memsql> select n_name from NATION where n_regionkey = (select r_regionkey
from REGION where r_name='ASIA');
+-----------+
| n_name    |
+-----------+
| INDIA     |
| INDONESIA |
| JAPAN     |
| CHINA     |
| VIETNAM   |
+-----------+
5 rows in set (0.71 sec)

How can I make this work in Spark SQL?
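
This is not from the thread, but one common workaround sketch while scalar
subqueries are unsupported: evaluate the inner query first and substitute its
result (assumes NATION and REGION are registered as tables and r_regionkey is
an integer column):

// Evaluate the inner query separately...
val regionKey = sqlContext
  .sql("select r_regionkey from REGION where r_name = 'ASIA'")
  .first()
  .getInt(0)   // assumes r_regionkey is an INT column

// ...and substitute its value into the outer query.
val names = sqlContext.sql(
  s"select n_name from NATION where n_regionkey = $regionKey")
names.show()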

Actually, the above query is a simplified version of the minimum cost supplier
query (Q2) of TPC-H, which has this nested-select nature. I am working on
these TPC-H queries. If anybody has a modified set of TPC-H queries for
Spark SQL, kindly let me know. It would be very useful for me.

select
s_acctbal,
s_name,
n_name,
p_partkey,
p_mfgr,
s_address,
s_phone,
s_comment
from
part,
supplier,
partsupp,
nation,
region
where
p_partkey = ps_partkey
and s_suppkey = ps_suppkey
and p_size = [SIZE]
and p_type like '%[TYPE]'
and s_nationkey = n_nationkey
and n_regionkey = r_regionkey
and r_name = '[REGION]'
and ps_supplycost = (
  select
min(ps_supplycost)
from
partsupp, supplier,
nation, region
where
p_partkey = ps_partkey
and s_suppkey = ps_suppkey
and s_nationkey = n_nationkey
and n_regionkey = r_regionkey
and r_name = '[REGION]'
)
order by
s_acctbal desc,
n_name,
s_name,
p_partkey;


Regards
  Kishor


Hive Version

2015-10-28 Thread Bryan Jeffrey
All,

I am using a HiveContext to create persistent tables from Spark. I am using
the Spark 1.4.1 (Scala 2.11) built-in Hive support.  What version of Hive
does Spark's built-in Hive support correspond to? I ask because the Avro
format and timestamps in Parquet do not appear to be supported.

I have searched a lot of the Spark documentation, but do not see the version
specified anywhere - it would be a good addition.

Thank you,

Bryan Jeffrey


Re: Hive Version

2015-10-28 Thread Michael Armbrust
Documented here:
http://spark.apache.org/docs/1.4.1/sql-programming-guide.html#interacting-with-different-versions-of-hive-metastore

In 1.4.1 we compile against 0.13.1
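
For reference, a hedged illustration of the knobs that guide describes (the
values shown are just the 1.4.1 defaults):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.sql.hive.metastore.version", "0.13.1")  // version of the Hive metastore client
  .set("spark.sql.hive.metastore.jars", "builtin")    // use the Hive classes bundled with Spark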

On Wed, Oct 28, 2015 at 2:26 PM, Bryan Jeffrey 
wrote:

> All,
>
> I am using a HiveContext to create persistent tables from Spark. I am
> using the Spark 1.4.1 (Scala 2.11) built-in Hive support.  What version of
> Hive does the Spark Hive correspond to? I ask because AVRO format and
> Timestamps in Parquet do not appear to be supported.
>
> I have searched a lot of the Spark documentation, but do not see version
> specified anywhere - it would be a good addition.
>
> Thank you,
>
> Bryan Jeffrey
>


Apache Spark on Raspberry Pi Cluster with Docker

2015-10-28 Thread Mark Bonnekessel
Hi,

we are trying to set up Apache Spark on a Raspberry Pi cluster for educational
use.
Spark is installed in a Docker container and all necessary ports are exposed.

After we start the master and workers, all workers are listed as alive in the
master web UI (http://master:8080).

I want to run the SimpleApp example from the Spark homepage
(http://spark.apache.org/docs/latest/quick-start.html#self-contained-applications)
on the cluster to verify that everything is working, but I cannot get it to run.

I built a jar file and submitted the application with spark-submit and get the
following output:
spark-submit --master spark://master:6066 --deploy-mode
cluster --class SimpleApp target/simple-project-1.0.jar
Running Spark using the REST application submission protocol.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/10/28 12:54:43 INFO RestSubmissionClient: Submitting a request to launch an
application in spark://localhost:6066.
15/10/28 12:54:43 INFO RestSubmissionClient: Submission successfully created as 
driver-20151028115443-0002. Polling submission state...
15/10/28 12:54:43 INFO RestSubmissionClient: Submitting a request for the
status of submission driver-20151028115443-0002 in spark://localhost:6066.
15/10/28 12:54:43 INFO RestSubmissionClient: State of driver 
driver-20151028115443-0002 is now SUBMITTED.
15/10/28 12:54:43 INFO RestSubmissionClient: Server responded with 
CreateSubmissionResponse:
{
  "action" : "CreateSubmissionResponse",
  "message" : "Driver successfully submitted as driver-20151028115443-0002",
  "serverSparkVersion" : "1.5.1",
  "submissionId" : "driver-20151028115443-0002",
  "success" : true
}

The driver is created correctly, but it never starts the application.
What am I missing?

Regards,
Mark

Building spark-1.5.x and MQTT

2015-10-28 Thread Bob Corsaro
Has anyone successfully built this? I'm trying to determine if there is a
defect in the source package or something strange about my environment. I
get a FileNotFound exception on MQTTUtils.class during the build of the
MQTT module. The only workaround I've found is to remove the MQTT modules
from the pom.xml.


No way to supply hive-site.xml in yarn client mode?

2015-10-28 Thread Zoltan Fedor
Hi,
We have a shared CDH 5.3.3 cluster and trying to use Spark 1.5.1 on it in
yarn client mode with Hive.

I have compiled Spark 1.5.1 with SPARK_HIVE=true, but it seems I am not
able to make Spark SQL pick up the hive-site.xml when running pyspark.

hive-site.xml is located in $SPARK_HOME/hadoop-conf/hive-site.xml and also
in $SPARK_HOME/conf/hive-site.xml

When I start pyspark with the below command and then run some simple
Spark SQL, it fails; it seems it didn't pick up the settings in hive-site.xml.

$ HADOOP_CONF_DIR=$SPARK_HOME/hadoop-conf
YARN_CONF_DIR=$SPARK_HOME/yarn-conf HADOOP_USER_NAME=biapp MASTER=yarn
$SPARK_HOME/bin/pyspark --deploy-mode client

Python 2.6.6 (r266:84292, Jul 23 2015, 05:13:40)
[GCC 4.4.7 20120313 (Red Hat 4.4.7-16)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/usr/lib/spark-1.5.1-bin-without-hadoop/lib/spark-assembly-1.5.1-hadoop2.5.0-cdh5.3.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
15/10/28 10:22:33 WARN MetricsSystem: Using default name DAGScheduler for
source because spark.app.id is not set.
15/10/28 10:22:35 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
15/10/28 10:22:59 WARN HiveConf: HiveConf of name hive.metastore.local does
not exist
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.5.1
      /_/

Using Python version 2.6.6 (r266:84292, Jul 23 2015 05:13:40)
SparkContext available as sc, HiveContext available as sqlContext.
>>> sqlContext2 = HiveContext(sc)
>>> sqlContext2.sql("show databases").first()
15/10/28 10:23:12 WARN HiveConf: HiveConf of name hive.metastore.local does
not exist
15/10/28 10:23:13 WARN ShellBasedUnixGroupsMapping: got exception trying to
get groups for user biapp: id: biapp: No such user

15/10/28 10:23:13 WARN UserGroupInformation: No groups available for user
biapp
Traceback (most recent call last):
  File "", line 1, in 
  File
"/usr/lib/spark-1.5.1-bin-without-hadoop/python/pyspark/sql/context.py",
line 552, in sql
return DataFrame(self._ssql_ctx.sql(sqlQuery), self)
  File
"/usr/lib/spark-1.5.1-bin-without-hadoop/python/pyspark/sql/context.py",
line 660, in _ssql_ctx
"build/sbt assembly", e)
Exception: ("You must build Spark with Hive. Export 'SPARK_HIVE=true' and
run build/sbt assembly", Py4JJavaError(u'An error occurred while calling
None.org.apache.spark.sql.hive.HiveContext.\n', JavaObject id=o20))
>>>


Note the warning above, "WARN HiveConf: HiveConf of name
hive.metastore.local does not exist", even though there actually is a
hive.metastore.local property in the hive-site.xml.

Any idea how to submit hive-site.xml in yarn client mode?

Thanks


Re: SparkR 1.5.1 ClassCastException when working with CSV files

2015-10-28 Thread rporcio
It seems that the cause of this exception was the wrong version of the
spark-csv package.
After I upgraded it to the latest (1.2.0) version, the exception is gone and
it works fine. 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-1-5-1-ClassCastException-when-working-with-CSV-files-tp25217p25219.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Building spark-1.5.x and MQTT

2015-10-28 Thread Bob Corsaro
Built from
http://mirror.olnevhost.net/pub/apache/spark/spark-1.5.1/spark-1.5.1.tgz using
the following command:

build/mvn -DskipTests=true -Dhadoop.version=2.4.1
-P"hadoop-2.4,kinesis-asl,netlib-lgpl" package install

build/mvn is from the packaged source.

Tried on a couple of ubuntu boxen and a gentoo box.

On Wed, Oct 28, 2015 at 9:59 AM Ted Yu  wrote:

> MQTTUtils.class is generated from
> external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
>
> What command did you use to build ?
> Which release / branch were you building ?
>
> Thanks
>
> On Wed, Oct 28, 2015 at 6:19 AM, Bob Corsaro  wrote:
>
>> Has anyone successful built this? I'm trying to determine if there is a
>> defect in the source package or something strange about my environment. I
>> get a FileNotFound exception on MQTTUtils.class during the build of the
>> MQTT module. The only work around I've found is to remove the MQTT modules
>> from the pom.xml.
>>
>
>


Re: How do I parallelize Spark Jobs at Executor Level.

2015-10-28 Thread Adrian Tanase
The first line is distributing your fileList variable in the cluster as an RDD,
partitioned using the default partitioner settings (e.g. the number of cores in
your cluster).

Each of your workers would get one or more slices of data (depending on how many
cores each executor has), and the abstraction is called a partition.

What is your use case? If you want to load the files and continue processing in
parallel, then a simple .map should work.
If you want to execute arbitrary code based on the list of files that each
executor received, then you need to use .foreach, which will get executed for
each of the entries on the workers.
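
A rough sketch of the two options (shown in Scala; the JavaRDD API has
equivalent map/foreach methods - the file-reading and filter logic below are
placeholders, and the files must be reachable from the workers):

val files = sc.parallelize(fileList)   // fileList: a Seq of file paths

// Option 1: transform each file on the executors, e.g. load and filter in parallel.
val matches = files.map { path =>
  val contents = scala.io.Source.fromFile(path).mkString
  (path, contents.contains("ERROR"))   // placeholder filter
}
matches.filter(_._2).count()           // an action triggers the distributed work

// Option 2: run arbitrary side-effecting code per entry on the workers.
files.foreach { path =>
  println(s"processing $path on an executor")
}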

-adrian

From: Vinoth Sankar
Date: Wednesday, October 28, 2015 at 2:49 PM
To: "user@spark.apache.org"
Subject: How do I parallelize Spark Jobs at Executor Level.

Hi,

I'm reading and filtering a large number of files using Spark. It's getting
parallelized at the Spark Driver level only. How do I make it parallelize to
the Executor (Worker) level? Refer to the following sample. Is there any way to
iterate the localIterator in parallel?

Note : I use Java 1.7 version

JavaRDD files = javaSparkContext.parallelize(fileList)
Iterator localIterator = files.toLocalIterator();

Regards
Vinoth Sankar


Re: Building spark-1.5.x and MQTT

2015-10-28 Thread Ted Yu
MQTTUtils.class is generated from
external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala

What command did you use to build ?
Which release / branch were you building ?

Thanks

On Wed, Oct 28, 2015 at 6:19 AM, Bob Corsaro  wrote:

> Has anyone successful built this? I'm trying to determine if there is a
> defect in the source package or something strange about my environment. I
> get a FileNotFound exception on MQTTUtils.class during the build of the
> MQTT module. The only work around I've found is to remove the MQTT modules
> from the pom.xml.
>


Spark -- Writing to Partitioned Persistent Table

2015-10-28 Thread Bryan Jeffrey
Hello.

I am working to get a simple solution working using Spark SQL.  I am
writing streaming data to persistent tables using a HiveContext.  Writing
to a persistent non-partitioned table works well - I update the table using
Spark streaming, and the output is available via Hive Thrift/JDBC.

I create a table that looks like the following:

0: jdbc:hive2://localhost:1> describe windows_event;
describe windows_event;
+--------------------------+-------------+----------+
| col_name                 | data_type   | comment  |
+--------------------------+-------------+----------+
| target_entity            | string      | NULL     |
| target_entity_type       | string      | NULL     |
| date_time_utc            | timestamp   | NULL     |
| machine_ip               | string      | NULL     |
| event_id                 | string      | NULL     |
| event_data               | map         | NULL     |
| description              | string      | NULL     |
| event_record_id          | string      | NULL     |
| level                    | string      | NULL     |
| machine_name             | string      | NULL     |
| sequence_number          | string      | NULL     |
| source                   | string      | NULL     |
| source_machine_name      | string      | NULL     |
| task_category            | string      | NULL     |
| user                     | string      | NULL     |
| additional_data          | map         | NULL     |
| windows_event_time_bin   | timestamp   | NULL     |
| # Partition Information  |             |          |
| # col_name               | data_type   | comment  |
| windows_event_time_bin   | timestamp   | NULL     |
+--------------------------+-------------+----------+


However, when I create a partitioned table and write data using the
following:

hiveWindowsEvents.foreachRDD( rdd => {
  val eventsDataFrame = rdd.toDF()
  eventsDataFrame.write.mode(SaveMode.Append).saveAsTable("windows_event")
})

The data is written as though the table is not partitioned (so everything
is written to /user/hive/warehouse/windows_event/file.gz.parquet).  Because
the data is not following the partition schema, it is not accessible (and
not partitioned).

Is there a straightforward way to write to partitioned tables using Spark
SQL?  I understand that the read performance for partitioned data is far
better - are there other performance improvements that might be better to
use instead of partitioning?

Regards,

Bryan Jeffrey


Inconsistent Persistence of DataFrames in Spark 1.5

2015-10-28 Thread Colin Alstad
We recently switched to Spark 1.5.0 from 1.4.1 and have noticed some
inconsistent behavior in persisting DataFrames.

df1 = sqlContext.read.parquet("df1.parquet")
df1.count()
> 161,100,982

df2 = sqlContext.read.parquet("df2.parquet")
df2.count()
> 67,498,706

join_df = df1.join(df2, 'id')
join_df.count()
> 160,608,147

join_df.write.parquet("join.parquet")
join_parquet = sqlContext.read.parquet("join.parquet")
join_parquet.count()
> 67,698,892

join_df.write.json("join.json")
join_json = sqlContext.read.parquet("join.json")
join_json.count()
> 67,695,663

The first major issue is that there is an order of magnitude difference
between the count of the join DataFrame and the persisted join DataFrame.
Secondly, persisting the same DataFrame into 2 different formats yields
different results.

Does anyone have any idea on what could be going on here?

-- 
Colin Alstad
Data Scientist
colin.als...@pokitdok.com




Re: Filter applied on merged Parquet schemas with new column fails.

2015-10-28 Thread Cheng Lian

Hey Hyukjin,

Sorry that I missed the JIRA ticket. Thanks for bringing this issue up
here, and for your detailed investigation.


From my side, I think this is a bug in Parquet. Parquet was designed to
support schema evolution. When scanning a Parquet file, if a column exists in
the requested schema but is missing from the file schema, that column is
filled with null. This should also hold for pushed-down predicate
filters. For example, if the filter "a = 1" is pushed down but column "a"
doesn't exist in the Parquet file being scanned, it's safe to assume "a"
is null in all records and drop all of them. On the contrary, if "a IS
NULL" is pushed down, all records should be preserved.


Apparently, before this issue is properly fixed on the Parquet side, we need
to work around this issue on the Spark side. Please see my comments on all
3 of your solutions inlined below. In short, I'd like to have approach 1
for branch-1.5 and approach 2 for master.


Cheng

On 10/28/15 10:11 AM, Hyukjin Kwon wrote:
When enabling mergedSchema and predicate filtering, this fails
since Parquet filters are pushed down regardless of the schema of each
split (or rather each file).


Dominic Ricard reported this
issue (https://issues.apache.org/jira/browse/SPARK-11103)


Even though this would work okay by setting
spark.sql.parquet.filterPushdown to false, the default value of this
is true. So this looks like an issue.


My questions are,
is this clearly an issue?
and if so, which way would this be handled?


I think this is an issue; I made three rough patches for this and
tested them, and they look fine.


The first approach looks simpler and appropriate, as I presume from
previous approaches such as
https://issues.apache.org/jira/browse/SPARK-11153.
However, in terms of safety and performance, I also want to ensure
which one would be a proper approach before trying to open a PR.


1. Simply set spark.sql.parquet.filterPushdown to false when using
mergeSchema.
This one is pretty simple and safe; I'd like to have this for 1.5.2, or
1.5.3 if we can't make it for 1.5.2.


2. If spark.sql.parquet.filterPushdown is true, retrieve the schema
of every part-file (and also the merged one) and check if each can
accept the given schema, and then apply the filter only when they all
can accept it, which I think is a bit over-implemented.
Actually, we only need to calculate the intersection of all file
schemata. We can make ParquetRelation.mergeSchemaInParallel return two
StructTypes: the first one is the original merged schema, the other is
the intersection of all file schemata, which only contains fields that
exist in all file schemata. Then we decide which filters to push down
according to the second StructType.


3. If spark.sql.parquet.filterPushdown is true, retrieve the schema
of every part-file (and also the merged one) and apply the filter
to each split (rather, file) that can accept the filter, which (I think
it's hacky) ends up with different configurations for each task in a job.
The idea I came up with at first was similar to this one. Instead of
pulling all file schemata to the driver side, we can push filter push-down
to the executor side. Namely, pass candidate filters to the executor side
and compute the Parquet predicate filter according to each file schema.
I haven't looked into this direction in depth, but we can probably put
this part into CatalystReadSupport, which is now initialized on the executor
side.


However, the correctness of this approach can only be guaranteed by the
defensive filtering we do in Spark SQL (i.e. apply all the filters no
matter whether they are pushed down or not), but we are considering removing it
because it imposes an unnecessary performance cost. This makes me hesitant
to go along this way.
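
For what it's worth, a user-side sketch of the workaround behind approach 1
(just the configuration discussed above, with a placeholder path; not a fix):

sqlContext.setConf("spark.sql.parquet.filterPushdown", "false")
val df = sqlContext.read.option("mergeSchema", "true").parquet("/path/to/table")
df.filter("a = 1").show()   // no Parquet-level pushdown, so files missing column "a" are still scanned safely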


org.apache.spark.shuffle.FetchFailedException: Failed to connect to ..... on worker failure

2015-10-28 Thread kundan kumar
Hi,

I am running a Spark Streaming job. I was testing fault tolerance by
killing one of the workers using the kill -9 command.

What I understand is that when I kill a worker, the process should not die and
should resume the execution.

But I am getting the following error and my process is halted.

org.apache.spark.shuffle.FetchFailedException: Failed to connect to .



Now, when I restart the same worker (2 workers were running on the
machine and I killed just one of them), the execution resumes and the
process is completed.

Please help me understand why my process is not fault tolerant on a
worker failure. Am I missing something? Basically I need my process to
resume even if a worker is lost.



Regards,
Kundan


Re: [Spark Streaming] Connect to Database only once at the start of Streaming job

2015-10-28 Thread Tathagata Das
Yeah, of course. Just create an RDD from jdbc, call cache()/persist(), then
force it to be evaluated using something like count(). Once it is cached,
you can use it in a StreamingContext. Because of the cache it should not
access JDBC any more.
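
A hedged sketch of that pattern (the JDBC URL, query, bounds and row mapping
are placeholders; `sc` is the SparkContext):

import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD

val lookup = new JdbcRDD(
  sc,
  () => DriverManager.getConnection("jdbc:mysql://dbhost/mydb"),  // placeholder URL
  "SELECT id, name FROM lookup WHERE id >= ? AND id <= ?",        // JdbcRDD needs the two bound markers
  1L, 1000000L, 4,                                                // lowerBound, upperBound, numPartitions
  rs => (rs.getInt(1), rs.getString(2)))

lookup.cache()
lookup.count()   // force evaluation once, so later uses read from the cache, not the DB

// Later, inside the streaming job, e.g. against a DStream of (Int, value) pairs:
// stream.transform(rdd => rdd.join(lookup))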

On Tue, Oct 27, 2015 at 12:04 PM, diplomatic Guru 
wrote:

> I know it uses lazy model, which is why I was wondering.
>
> On 27 October 2015 at 19:02, Uthayan Suthakar 
> wrote:
>
>> Hello all,
>>
>> What I wanted to do is configure the spark streaming job to read the
>> database using JdbcRDD and cache the results. This should occur only once
>> at the start of the job. It should not make any further connection to DB
>>  afterwards. Is it possible to do that?
>>
>
>


Re: Spark-Testing-Base Q/A

2015-10-28 Thread Holden Karau
And now (before 1am California time :p) there is a new version of
spark-testing-base which adds a Java base class for streaming tests. I
noticed you were using 1.3, so I put in the effort to make this release for
Spark 1.3 to 1.5 (inclusive).

On Wed, Oct 21, 2015 at 4:16 PM, Holden Karau  wrote:

>
>
> On Wednesday, October 21, 2015, Mark Vervuurt 
> wrote:
>
>> Hi Holden,
>>
>> Thanks for the information, I think that a Java Base Class in order to
>> test SparkStreaming using Java would be useful for the community.
>> Unfortunately not all of our customers are willing to use Scala or Python.
>>
> Sounds reasonable, I'll add it this week.
>
>>
>> If i am not wrong it’s 4:00 AM for you in California ;)
>>
>> Yup, I'm not great a regular schedules but I make up for it by doing
> stuff when I've had too much coffee to sleep :p
>
>> Regards,
>> Mark
>>
>> On 21 Oct 2015, at 12:42, Holden Karau  wrote:
>>
>>
>>
>> On Wednesday, October 21, 2015, Mark Vervuurt 
>> wrote:
>>
>>> Hi Everyone,
>>>
>>> I am busy trying out ‘Spark-Testing-Base
>>> ’. I have the following
>>> questions?
>>>
>>>
>>>- Can you test Spark Streaming Jobs using Java?
>>>
>>> The current base class for testing streaming jobs is implemented using a
>> Scala test library (and one in Python too), I can add one using a junit
>> base for streaming if it would be useful for you.
>>
>>>
>>>- Can I use Spark-Testing-Base 1.3.0_0.1.1 together with Spark 1.3.1?
>>>
>>>  You should be able to, the API changes were small enough I didn't
>> publish a seperate package, but if you run into any issues let me know.
>>
>>>
>>>
>>>
>>> Thanks.
>>>
>>> Greetings,
>>> Mark
>>>
>>
>>
>> --
>> Cell : 425-233-8271
>> Twitter: https://twitter.com/holdenkarau
>>
>>
>>


-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau


Re: [Spark Streaming] Connect to Database only once at the start of Streaming job

2015-10-28 Thread Tathagata Das
However, if your executor dies, then it may reconnect to JDBC to
reconstruct the RDD partitions that were lost. To prevent that, you can
checkpoint the RDD to an HDFS-like filesystem (using rdd.checkpoint()). Then
you are safe; it won't reconnect to JDBC.
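
A tiny sketch of that extra safety net (the directory is a placeholder, and
`lookup` is the cached JDBC-backed RDD from the previous message):

sc.setCheckpointDir("hdfs:///tmp/rdd-checkpoints")   // placeholder HDFS path
lookup.cache()
lookup.checkpoint()   // mark for checkpointing before the first job runs on this RDD
lookup.count()        // materializes both the cache and the checkpoint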


On Tue, Oct 27, 2015 at 11:17 PM, Tathagata Das  wrote:

> Yeah, of course. Just create an RDD from jdbc, call cache()/persist(),
> then force it to be evaluated using something like count(). Once it is
> cached, you can use it in a StreamingContext. Because of the cache it
> should not access JDBC any more.
>
> On Tue, Oct 27, 2015 at 12:04 PM, diplomatic Guru <
> diplomaticg...@gmail.com> wrote:
>
>> I know it uses lazy model, which is why I was wondering.
>>
>> On 27 October 2015 at 19:02, Uthayan Suthakar > > wrote:
>>
>>> Hello all,
>>>
>>> What I wanted to do is configure the spark streaming job to read the
>>> database using JdbcRDD and cache the results. This should occur only once
>>> at the start of the job. It should not make any further connection to DB
>>>  afterwards. Is it possible to do that?
>>>
>>
>>
>


RE: SPARKONHBase checkpointing issue

2015-10-28 Thread Amit Hora
Thanks for sharing the link. Yes, I understand that accumulator and broadcast
variable state is not recovered from a checkpoint, but is there any way by which
I can say that the HBaseContext in this context should not be recovered from
the checkpoint but rather must be reinitialized?

-Original Message-
From: "Adrian Tanase" 
Sent: ‎27-‎10-‎2015 18:08
To: "Amit Singh Hora" ; "user@spark.apache.org" 

Subject: Re: SPARKONHBase checkpointing issue

Does this help?

https://issues.apache.org/jira/browse/SPARK-5206



On 10/27/15, 1:53 PM, "Amit Singh Hora"  wrote:

>Hi all ,
>
>I am using Cloudera's SparkObHbase to bulk insert in hbase ,Please find
>below code
>object test {
>  
>def main(args: Array[String]): Unit = {
>
>
>
>   val conf = ConfigFactory.load("connection.conf").getConfig("connection")
>val checkpointDirectory=conf.getString("spark.checkpointDir")
>val ssc = StreamingContext.getOrCreate(checkpointDirectory, ()=>{
>  functionToCreateContext(checkpointDirectory)
>})
> 
>
>ssc.start()
>ssc.awaitTermination()
>
> }
>
>def getHbaseContext(sc: SparkContext,conf: Config): HBaseContext={
>  println("always gets created")
>   val hconf = HBaseConfiguration.create();
>val timeout= conf.getString("hbase.zookeepertimeout")
>val master=conf.getString("hbase.hbase_master")
>val zk=conf.getString("hbase.hbase_zkquorum")
>val zkport=conf.getString("hbase.hbase_zk_port")
>
>  hconf.set("zookeeper.session.timeout",timeout);
>hconf.set("hbase.client.retries.number", Integer.toString(1));
>hconf.set("zookeeper.recovery.retry", Integer.toString(1));
>hconf.set("hbase.master", master);
>hconf.set("hbase.zookeeper.quorum",zk);
>hconf.set("zookeeper.znode.parent", "/hbase-unsecure");
>hconf.set("hbase.zookeeper.property.clientPort",zkport );
>
>   
>val hbaseContext = new HBaseContext(sc, hconf);
>return hbaseContext
>}
>  def functionToCreateContext(checkpointDirectory: String): StreamingContext
>= {
>println("creating for frst time")
>val conf = ConfigFactory.load("connection.conf").getConfig("connection")
>val brokerlist = conf.getString("kafka.broker")
>val topic = conf.getString("kafka.topic")
>
>val Array(brokers, topics) = Array(brokerlist, topic)
>
>
>val sparkConf = new SparkConf().setAppName("HBaseBulkPutTimestampExample
>" )
>sparkConf.set("spark.cleaner.ttl", "2");
>sparkConf.setMaster("local[2]")
>
>
> val topicsSet = topic.split(",").toSet
>val batchduration = conf.getString("spark.batchduration").toInt
>val ssc: StreamingContext = new StreamingContext(sparkConf,
>Seconds(batchduration))
>  ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
> val kafkaParams = Map[String, String]("metadata.broker.list" ->
>brokerlist, "auto.offset.reset" -> "smallest")
>val messages = KafkaUtils.createDirectStream[String, String,
>StringDecoder, StringDecoder](
>  ssc, kafkaParams, topicsSet)
>val lines=messages.map(_._2)
>   
>
>
>getHbaseContext(ssc.sparkContext,conf).streamBulkPut[String](lines,
>  "ecs_test",
>  (putRecord) => {
>if (putRecord.length() > 0) {
>  var maprecord = new HashMap[String, String];
>  val mapper = new ObjectMapper();
>
>  //convert JSON string to Map
>  maprecord = mapper.readValue(putRecord,
>new TypeReference[HashMap[String, String]]() {});
>  
>  var ts: Long = maprecord.get("ts").toLong
>  
>   var tweetID:Long= maprecord.get("id").toLong
>  val key=ts+"_"+tweetID;
>  
>  val put = new Put(Bytes.toBytes(key))
>  maprecord.foreach(kv => {
> 
> 
>put.add(Bytes.toBytes("test"),Bytes.toBytes(kv._1),Bytes.toBytes(kv._2))
>  
>
>  })
>
>
>  put
>} else {
>  null
>}
>  },
>  false);
>
>ssc
>  
>  }
>}
>
>i am not able to retrieve from checkpoint after restart ,always get 
>Unable to getConfig from broadcast
>
>after debugging more i can see that the method for creating the HbaseContext
>actually broadcasts the configuration ,context object passed
>
>as a solution i just want to recreate the hbase context in every condition
>weather the checkpoint exists or not
>
>
>
>--
>View this message in context: 
>http://apache-spark-user-list.1001560.n3.nabble.com/SPARKONHBase-checkpointing-issue-tp25211.html
>Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>-
>To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>For additional commands, e-mail: user-h...@spark.apache.org
>


RE: [Spark-SQL]: Unable to propagate hadoop configuration after SparkContext is initialized

2015-10-28 Thread Cheng, Hao
Hi Jerry, I’ve filed a bug in JIRA, and also the fix:

https://issues.apache.org/jira/browse/SPARK-11364

It would be greatly appreciated if you can verify the PR with your case.

Thanks,
Hao

From: Cheng, Hao [mailto:hao.ch...@intel.com]
Sent: Wednesday, October 28, 2015 8:51 AM
To: Jerry Lam; Marcelo Vanzin
Cc: user@spark.apache.org
Subject: RE: [Spark-SQL]: Unable to propagate hadoop configuration after 
SparkContext is initialized

After a quick glance, this seems to be a bug in Spark SQL; do you mind creating a
JIRA for this? Then I can start to fix it.

Thanks,
Hao

From: Jerry Lam [mailto:chiling...@gmail.com]
Sent: Wednesday, October 28, 2015 3:13 AM
To: Marcelo Vanzin
Cc: user@spark.apache.org
Subject: Re: [Spark-SQL]: Unable to propagate hadoop configuration after 
SparkContext is initialized

Hi Marcelo,

I tried setting the properties before instantiating the Spark context via
SparkConf. It works fine.
Originally, the code read Hadoop configurations from hdfs-site.xml, which
works perfectly fine as well.
Therefore, can I conclude that sparkContext.hadoopConfiguration.set("key",
"value") does not propagate through all SQL jobs within the same SparkContext?
I haven't tried with Spark Core, so I cannot tell.

Is there a workaround, given it seems to be broken? I need to do this
programmatically after the SparkContext is instantiated, not before...

Best Regards,

Jerry

On Tue, Oct 27, 2015 at 2:30 PM, Marcelo Vanzin 
> wrote:
If setting the values in SparkConf works, there's probably some bug in
the SQL code; e.g. creating a new Configuration object instead of
using the one in SparkContext. But I'm not really familiar with that
code.

On Tue, Oct 27, 2015 at 11:22 AM, Jerry Lam 
> wrote:
> Hi Marcelo,
>
> Thanks for the advice. I understand that we could set the configurations
> before creating SparkContext. My question is
> SparkContext.hadoopConfiguration.set("key","value") doesn't seem to
> propagate to all subsequent SQLContext jobs. Note that I mentioned I can
> load the parquet file but I cannot perform a count on the parquet file
> because of the AmazonClientException. It means that the credential is used
> during the loading of the parquet but not when we are processing the parquet
> file. How this can happen?
>
> Best Regards,
>
> Jerry
>
>
> On Tue, Oct 27, 2015 at 2:05 PM, Marcelo Vanzin 
> > wrote:
>>
>> On Tue, Oct 27, 2015 at 10:43 AM, Jerry Lam 
>> > wrote:
>> > Anyone experiences issues in setting hadoop configurations after
>> > SparkContext is initialized? I'm using Spark 1.5.1.
>> >
>> > I'm trying to use s3a which requires access and secret key set into
>> > hadoop
>> > configuration. I tried to set the properties in the hadoop configuration
>> > from sparktcontext.
>> >
>> > sc.hadoopConfiguration.set("fs.s3a.access.key", AWSAccessKeyId)
>> > sc.hadoopConfiguration.set("fs.s3a.secret.key", AWSSecretKey)
>>
>> Try setting "spark.hadoop.fs.s3a.access.key" and
>> "spark.hadoop.fs.s3a.secret.key" in your SparkConf before creating the
>> SparkContext.
>>
>> --
>> Marcelo
>
>

--
Marcelo



Re: SPARKONHBase checkpointing issue

2015-10-28 Thread Tathagata Das
Yes, the workaround is the same as the one suggested in the JIRA for
accumulator and broadcast variables. Basically, make a singleton object
which lazily initializes the HBaseContext. Because it is a singleton, it won't
get serialized through the checkpoint. After recovering, it will be
reinitialized lazily. This is the exact same approach I used for
`SQLContext.getOrCreate()`.
Take a look at the code.
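
A hedged sketch of that lazily initialized singleton pattern applied to
HBaseContext (the import path is assumed from the Cloudera SparkOnHBase
project, and the HBase configuration details are placeholders):

import org.apache.spark.SparkContext
import org.apache.hadoop.hbase.HBaseConfiguration
import com.cloudera.spark.hbase.HBaseContext

object HBaseContextSingleton {
  @transient private var instance: HBaseContext = _

  def getOrCreate(sc: SparkContext): HBaseContext = synchronized {
    if (instance == null) {
      val hconf = HBaseConfiguration.create()   // placeholder: set ZK quorum etc. here
      instance = new HBaseContext(sc, hconf)
    }
    instance
  }
}

// In the streaming code, always go through the singleton instead of a
// checkpointed reference, e.g.:
// HBaseContextSingleton.getOrCreate(ssc.sparkContext).streamBulkPut(...)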

On Tue, Oct 27, 2015 at 11:19 PM, Amit Hora  wrote:

> Thanks for sharing the link.Yes I understand that accumulators and
> broadcast variables state are not recovered from checkpoint but is there
> any way by which I can say that the HBaseContext in this context should nt
> be recovered from checkpoint rather must be reinitialized
> --
> From: Adrian Tanase 
> Sent: ‎27-‎10-‎2015 18:08
> To: Amit Singh Hora ; user@spark.apache.org
> Subject: Re: SPARKONHBase checkpointing issue
>
> Does this help?
>
> https://issues.apache.org/jira/browse/SPARK-5206
>
>
>
> On 10/27/15, 1:53 PM, "Amit Singh Hora"  wrote:
>
> >Hi all ,
> >
> >I am using Cloudera's SparkObHbase to bulk insert in hbase ,Please find
> >below code
> >object test {
> >
> >def main(args: Array[String]): Unit = {
> >
> >
> >
> >   val conf =
> ConfigFactory.load("connection.conf").getConfig("connection")
> >val checkpointDirectory=conf.getString("spark.checkpointDir")
> >val ssc = StreamingContext.getOrCreate(checkpointDirectory, ()=>{
> >  functionToCreateContext(checkpointDirectory)
> >})
> >
> >
> >ssc.start()
> >ssc.awaitTermination()
> >
> > }
> >
> >def getHbaseContext(sc: SparkContext,conf: Config): HBaseContext={
> >  println("always gets created")
> >   val hconf = HBaseConfiguration.create();
> >val timeout= conf.getString("hbase.zookeepertimeout")
> >val master=conf.getString("hbase.hbase_master")
> >val zk=conf.getString("hbase.hbase_zkquorum")
> >val zkport=conf.getString("hbase.hbase_zk_port")
> >
> >  hconf.set("zookeeper.session.timeout",timeout);
> >hconf.set("hbase.client.retries.number", Integer.toString(1));
> >hconf.set("zookeeper.recovery.retry", Integer.toString(1));
> >hconf.set("hbase.master", master);
> >hconf.set("hbase.zookeeper.quorum",zk);
> >hconf.set("zookeeper.znode.parent", "/hbase-unsecure");
> >hconf.set("hbase.zookeeper.property.clientPort",zkport );
> >
> >
> >val hbaseContext = new HBaseContext(sc, hconf);
> >return hbaseContext
> >}
> >  def functionToCreateContext(checkpointDirectory: String):
> StreamingContext
> >= {
> >println("creating for frst time")
> >val conf =
> ConfigFactory.load("connection.conf").getConfig("connection")
> >val brokerlist = conf.getString("kafka.broker")
> >val topic = conf.getString("kafka.topic")
> >
> >val Array(brokers, topics) = Array(brokerlist, topic)
> >
> >
> >val sparkConf = new
> SparkConf().setAppName("HBaseBulkPutTimestampExample
> >" )
> >sparkConf.set("spark.cleaner.ttl", "2");
> >sparkConf.setMaster("local[2]")
> >
> >
> > val topicsSet = topic.split(",").toSet
> >val batchduration = conf.getString("spark.batchduration").toInt
> >val ssc: StreamingContext = new StreamingContext(sparkConf,
> >Seconds(batchduration))
> >  ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
> > val kafkaParams = Map[String, String]("metadata.broker.list" ->
> >brokerlist, "auto.offset.reset" -> "smallest")
> >val messages = KafkaUtils.createDirectStream[String, String,
> >StringDecoder, StringDecoder](
> >  ssc, kafkaParams, topicsSet)
> >val lines=messages.map(_._2)
> >
> >
> >
> >getHbaseContext(ssc.sparkContext,conf).streamBulkPut[String](lines,
> >  "ecs_test",
> >  (putRecord) => {
> >if (putRecord.length() > 0) {
> >  var maprecord = new HashMap[String, String];
> >  val mapper = new ObjectMapper();
> >
> >  //convert JSON string to Map
> >  maprecord = mapper.readValue(putRecord,
> >new TypeReference[HashMap[String, String]]() {});
> >
> >  var ts: Long = maprecord.get("ts").toLong
> >
> >   var tweetID:Long= maprecord.get("id").toLong
> >  val key=ts+"_"+tweetID;
> >
> >  val put = new Put(Bytes.toBytes(key))
> >  maprecord.foreach(kv => {
> >
> >
> >put.add(Bytes.toBytes("test"),Bytes.toBytes(kv._1),Bytes.toBytes(kv._2))
> >
> >
> >  })
> >
> >
> >  put
> >} else {
> >  null
> >}
> >  },
> >  false);
> >
> >ssc
> >
> >  }
> >}
> >
> >i am not able to retrieve from checkpoint after restart ,always get
> >Unable to getConfig from broadcast
> >
> >after 

Re: Building spark-1.5.x and MQTT

2015-10-28 Thread Ted Yu
Using your command, I did get:

[ERROR] Failed to execute goal
org.apache.maven.plugins:maven-assembly-plugin:2.5.5:single
(test-jar-with-dependencies) on project spark-streaming-mqtt_2.10: Failed
to create assembly: Error creating assembly archive
test-jar-with-dependencies: Problem creating jar:
jar:file:/home/hbase/spark-1.5.2/external/mqtt/target/spark-streaming-mqtt_2.10-1.5.2.jar!/org/apache/spark/streaming/mqtt/MQTTReceiver$$anon$1.class:
JAR entry org/apache/spark/streaming/mqtt/MQTTReceiver$$anon$1.class not
found in
/home/hbase/spark-1.5.2/external/mqtt/target/spark-streaming-mqtt_2.10-1.5.2.jar
-> [Help 1]

But the following command passed:

build/mvn -DskipTests=true -Dhadoop.version=2.4.1
-P"hadoop-2.4,kinesis-asl,netlib-lgpl" package

FYI

On Wed, Oct 28, 2015 at 7:38 AM, Bob Corsaro  wrote:

> Built from
> http://mirror.olnevhost.net/pub/apache/spark/spark-1.5.1/spark-1.5.1.tgz using
> the following command:
>
> build/mvn -DskipTests=true -Dhadoop.version=2.4.1
> -P"hadoop-2.4,kinesis-asl,netlib-lgpl" package install
>
> build/mvn is from the packaged source.
>
> Tried on a couple of ubuntu boxen and a gentoo box.
>
> On Wed, Oct 28, 2015 at 9:59 AM Ted Yu  wrote:
>
>> MQTTUtils.class is generated from
>> external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
>>
>> What command did you use to build ?
>> Which release / branch were you building ?
>>
>> Thanks
>>
>> On Wed, Oct 28, 2015 at 6:19 AM, Bob Corsaro  wrote:
>>
>>> Has anyone successful built this? I'm trying to determine if there is a
>>> defect in the source package or something strange about my environment. I
>>> get a FileNotFound exception on MQTTUtils.class during the build of the
>>> MQTT module. The only work around I've found is to remove the MQTT modules
>>> from the pom.xml.
>>>
>>
>>


Spark/Kafka Streaming Job Gets Stuck

2015-10-28 Thread Afshartous, Nick

Hi, we are load testing our Spark 1.3 streaming (reading from Kafka)  job and 
seeing a problem.  This is running in AWS/Yarn and the streaming batch interval 
is set to 3 minutes and this is a ten node cluster.

Testing at 30,000 events per second we are seeing the streaming job get stuck 
(stack trace below) for over an hour.

Thanks for any insights or suggestions.
--
  Nick

org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.mapPartitionsToPair(JavaDStreamLike.scala:43)
com.wb.analytics.spark.services.streaming.drivers.StreamingKafkaConsumerDriver.runStream(StreamingKafkaConsumerDriver.java:125)
com.wb.analytics.spark.services.streaming.drivers.StreamingKafkaConsumerDriver.main(StreamingKafkaConsumerDriver.java:71)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:606)
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:480)

Notice: This communication is for the intended recipient(s) only and may 
contain confidential, proprietary, legally protected or privileged information 
of Turbine, Inc. If you are not the intended recipient(s), please notify the 
sender at once and delete this communication. Unauthorized use of the 
information in this communication is strictly prohibited and may be unlawful. 
For those recipients under contract with Turbine, Inc., the information in this 
communication is subject to the terms and conditions of any applicable 
contracts or agreements.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Building spark-1.5.x and MQTT

2015-10-28 Thread Steve Loughran

> On 28 Oct 2015, at 13:19, Bob Corsaro  wrote:
> 
> Has anyone successful built this? I'm trying to determine if there is a 
> defect in the source package or something strange about my environment. I get 
> a FileNotFound exception on MQTTUtils.class during the build of the MQTT 
> module. The only work around I've found is to remove the MQTT modules from 
> the pom.xml.

I saw this last week, and believe that the problem is a race condition between the 
compiler and the maven-assembly-plugin zip file for Python tests; they can 
apparently start at the same time, and one fails because the files it has listed 
aren't quite there yet. SPARK-5155 would be the cause of this.

Fix: move the assembly code to its own profile and only invoke it if you want 
that feature

Com

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Inconsistent Persistence of DataFrames in Spark 1.5

2015-10-28 Thread Saif.A.Ellafi
Hi, just a couple of cents.

Are your joining columns StringTypes (the id field)? I have recently reported a bug
where I get inconsistent results when filtering String fields in group
operations.

Saif

From: Colin Alstad [mailto:colin.als...@pokitdok.com]
Sent: Wednesday, October 28, 2015 12:39 PM
To: user@spark.apache.org
Subject: Inconsistent Persistence of DataFrames in Spark 1.5

We recently switched to Spark 1.5.0 from 1.4.1 and have noticed some 
inconsistent behavior in persisting DataFrames.

df1 = sqlContext.read.parquet(“df1.parquet”)
df1.count()
> 161,100,982

df2 = sqlContext.read.parquet(“df2.parquet”)
df2.count()
> 67,498,706

join_df = df1.join(df2, ‘id’)
join_df.count()
> 160,608,147

join_df.write.parquet(“join.parquet”)
join_parquet = sqlContext.read.parquet(“join.parquet”)
join_parquet.count()
> 67,698,892

join_df.write.json(“join.json”)
join_json = sqlContext.read.parquet(“join.json”)
join_son.count()
> 67,695,663

The first major issue is that there is an order of magnitude difference between 
the count of the join DataFrame and the persisted join DataFrame.  Secondly, 
persisting the same DataFrame into 2 different formats yields different results.

Does anyone have any idea on what could be going on here?

--
Colin Alstad
Data Scientist
colin.als...@pokitdok.com



SparkSQL: What is the cost of DataFrame.registerTempTable(String)? Can I have multiple tables referencing to the same DataFrame?

2015-10-28 Thread Anfernee Xu
Hi,

I just want to understand the cost of DataFrame.registerTempTable(String):
is it just a trivial operation (like creating an object reference) in the
master (Driver) JVM? And can I have multiple tables with different names
referencing the same DataFrame?
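
For what it's worth, registerTempTable only records a name-to-plan mapping in
the driver-side catalog (no data is computed or copied by the call itself), and
the same DataFrame can be registered under several names - a small sketch, with
`df` assumed to be an existing DataFrame:

df.registerTempTable("events_a")
df.registerTempTable("events_b")   // same DataFrame, second name

// Both names resolve to the same underlying logical plan.
sqlContext.sql("SELECT COUNT(*) FROM events_a").show()
sqlContext.sql("SELECT COUNT(*) FROM events_b").show()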

Thanks

-- 
--Anfernee


Re: Spark -- Writing to Partitioned Persistent Table

2015-10-28 Thread Susan Zhang
Have you tried partitionBy?

Something like

hiveWindowsEvents.foreachRDD( rdd => {
  val eventsDataFrame = rdd.toDF()
  eventsDataFrame.write.mode(SaveMode.Append)
    .partitionBy("windows_event_time_bin").saveAsTable("windows_event")
})



On Wed, Oct 28, 2015 at 7:41 AM, Bryan Jeffrey 
wrote:

> Hello.
>
> I am working to get a simple solution working using Spark SQL.  I am
> writing streaming data to persistent tables using a HiveContext.  Writing
> to a persistent non-partitioned table works well - I update the table using
> Spark streaming, and the output is available via Hive Thrift/JDBC.
>
> I create a table that looks like the following:
>
> 0: jdbc:hive2://localhost:1> describe windows_event;
> describe windows_event;
> +--+-+--+
> | col_name |  data_type  | comment  |
> +--+-+--+
> | target_entity| string  | NULL |
> | target_entity_type   | string  | NULL |
> | date_time_utc| timestamp   | NULL |
> | machine_ip   | string  | NULL |
> | event_id | string  | NULL |
> | event_data   | map  | NULL |
> | description  | string  | NULL |
> | event_record_id  | string  | NULL |
> | level| string  | NULL |
> | machine_name | string  | NULL |
> | sequence_number  | string  | NULL |
> | source   | string  | NULL |
> | source_machine_name  | string  | NULL |
> | task_category| string  | NULL |
> | user | string  | NULL |
> | additional_data  | map  | NULL |
> | windows_event_time_bin   | timestamp   | NULL |
> | # Partition Information  | |  |
> | # col_name   | data_type   | comment  |
> | windows_event_time_bin   | timestamp   | NULL |
> +--+-+--+
>
>
> However, when I create a partitioned table and write data using the
> following:
>
> hiveWindowsEvents.foreachRDD( rdd => {
>   val eventsDataFrame = rdd.toDF()
>
> eventsDataFrame.write.mode(SaveMode.Append).saveAsTable("windows_event")
> })
>
> The data is written as though the table is not partitioned (so everything
> is written to /user/hive/warehouse/windows_event/file.gz.paquet.  Because
> the data is not following the partition schema, it is not accessible (and
> not partitioned).
>
> Is there a straightforward way to write to partitioned tables using Spark
> SQL?  I understand that the read performance for partitioned data is far
> better - are there other performance improvements that might be better to
> use instead of partitioning?
>
> Regards,
>
> Bryan Jeffrey
>


Re: Spark -- Writing to Partitioned Persistent Table

2015-10-28 Thread Bryan Jeffrey
Susan,

I did give that a shot -- I'm seeing a number of oddities:

(1) 'partitionBy' appears to only accept alphanumeric, lower-case field names. It
will work for 'machinename', but not 'machineName' or 'machine_name' (see the
sketch below).
(2) When partitioning with maps included in the data I get odd string
conversion issues.
(3) When partitioning without maps I see frequent out-of-memory issues.
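
A rough, untested sketch of one way around (1), assuming it really is the column
names being rejected: lower-case the DataFrame's columns before writing.

val lowered = eventsDataFrame.columns.foldLeft(eventsDataFrame) { (df, c) =>
  df.withColumnRenamed(c, c.toLowerCase)
}
lowered.write
  .mode(SaveMode.Append)
  .partitionBy("windowseventtimebin")
  .saveAsTable("windows_event")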

I'll update this email when I've got a more concrete example of problems.

Regards,

Bryan Jeffrey



On Wed, Oct 28, 2015 at 1:33 PM, Susan Zhang  wrote:

> Have you tried partitionBy?
>
> Something like
>
> hiveWindowsEvents.foreachRDD( rdd => {
>   val eventsDataFrame = rdd.toDF()
>   eventsDataFrame.write.mode(SaveMode.Append).partitionBy("
> windows_event_time_bin").saveAsTable("windows_event")
> })
>
>
>
> On Wed, Oct 28, 2015 at 7:41 AM, Bryan Jeffrey 
> wrote:
>
>> Hello.
>>
>> I am working to get a simple solution working using Spark SQL.  I am
>> writing streaming data to persistent tables using a HiveContext.  Writing
>> to a persistent non-partitioned table works well - I update the table using
>> Spark streaming, and the output is available via Hive Thrift/JDBC.
>>
>> I create a table that looks like the following:
>>
>> 0: jdbc:hive2://localhost:1> describe windows_event;
>> describe windows_event;
>> +--+-+--+
>> | col_name |  data_type  | comment  |
>> +--+-+--+
>> | target_entity| string  | NULL |
>> | target_entity_type   | string  | NULL |
>> | date_time_utc| timestamp   | NULL |
>> | machine_ip   | string  | NULL |
>> | event_id | string  | NULL |
>> | event_data   | map  | NULL |
>> | description  | string  | NULL |
>> | event_record_id  | string  | NULL |
>> | level| string  | NULL |
>> | machine_name | string  | NULL |
>> | sequence_number  | string  | NULL |
>> | source   | string  | NULL |
>> | source_machine_name  | string  | NULL |
>> | task_category| string  | NULL |
>> | user | string  | NULL |
>> | additional_data  | map  | NULL |
>> | windows_event_time_bin   | timestamp   | NULL |
>> | # Partition Information  | |  |
>> | # col_name   | data_type   | comment  |
>> | windows_event_time_bin   | timestamp   | NULL |
>> +--+-+--+
>>
>>
>> However, when I create a partitioned table and write data using the
>> following:
>>
>> hiveWindowsEvents.foreachRDD( rdd => {
>>   val eventsDataFrame = rdd.toDF()
>>
>> eventsDataFrame.write.mode(SaveMode.Append).saveAsTable("windows_event")
>> })
>>
>> The data is written as though the table is not partitioned (so everything
>> is written to /user/hive/warehouse/windows_event/file.gz.paquet.  Because
>> the data is not following the partition schema, it is not accessible (and
>> not partitioned).
>>
>> Is there a straightforward way to write to partitioned tables using Spark
>> SQL?  I understand that the read performance for partitioned data is far
>> better - are there other performance improvements that might be better to
>> use instead of partitioning?
>>
>> Regards,
>>
>> Bryan Jeffrey
>>
>
>


Re: Spark -- Writing to Partitioned Persistent Table

2015-10-28 Thread Bryan Jeffrey
All,

One issue I'm seeing is that I start the thrift server (for jdbc access)
via the following: /spark/spark-1.4.1/sbin/start-thriftserver.sh --master
spark://master:7077 --hiveconf "spark.cores.max=2"

After about 40 seconds the Thrift server is started and available on the
default port (10000).

I then submit my application - and the application throws the following
error:

Caused by: java.sql.SQLException: Failed to start database 'metastore_db'
with class loader
org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1@6a552721, see
the next exception for details.
at
org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown
Source)
at
org.apache.derby.impl.jdbc.SQLExceptionFactory40.wrapArgsForTransportAcrossDRDA(Unknown
Source)
... 86 more
Caused by: java.sql.SQLException: Another instance of Derby may have
already booted the database /spark/spark-1.4.1/metastore_db.
at
org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown
Source)
at
org.apache.derby.impl.jdbc.SQLExceptionFactory40.wrapArgsForTransportAcrossDRDA(Unknown
Source)
at
org.apache.derby.impl.jdbc.SQLExceptionFactory40.getSQLException(Unknown
Source)
at org.apache.derby.impl.jdbc.Util.generateCsSQLException(Unknown
Source)
... 83 more
Caused by: ERROR XSDB6: Another instance of Derby may have already booted
the database /spark/spark-1.4.1/metastore_db.

This also happens if I do the opposite (submit the application first, and
then start the thrift server).

It looks similar to the following issue -- but not quite the same:
https://issues.apache.org/jira/browse/SPARK-9776

It seems like this set of steps works fine if the metadata database is not
yet created - but once it's created this happens every time.  Is this a
known issue? Is there a workaround?

Regards,

Bryan Jeffrey

On Wed, Oct 28, 2015 at 3:13 PM, Bryan Jeffrey 
wrote:

> Susan,
>
> I did give that a shot -- I'm seeing a number of oddities:
>
> (1) 'Partition By' appears only accepts alphanumeric lower case fields.
> It will work for 'machinename', but not 'machineName' or 'machine_name'.
> (2) When partitioning with maps included in the data I get odd string
> conversion issues
> (3) When partitioning without maps I see frequent out of memory issues
>
> I'll update this email when I've got a more concrete example of problems.
>
> Regards,
>
> Bryan Jeffrey
>
>
>
> On Wed, Oct 28, 2015 at 1:33 PM, Susan Zhang  wrote:
>
>> Have you tried partitionBy?
>>
>> Something like
>>
>> hiveWindowsEvents.foreachRDD( rdd => {
>>   val eventsDataFrame = rdd.toDF()
>>   eventsDataFrame.write.mode(SaveMode.Append).partitionBy("
>> windows_event_time_bin").saveAsTable("windows_event")
>> })
>>
>>
>>
>> On Wed, Oct 28, 2015 at 7:41 AM, Bryan Jeffrey 
>> wrote:
>>
>>> Hello.
>>>
>>> I am working to get a simple solution working using Spark SQL.  I am
>>> writing streaming data to persistent tables using a HiveContext.  Writing
>>> to a persistent non-partitioned table works well - I update the table using
>>> Spark streaming, and the output is available via Hive Thrift/JDBC.
>>>
>>> I create a table that looks like the following:
>>>
>>> 0: jdbc:hive2://localhost:1> describe windows_event;
>>> describe windows_event;
>>> +--+-+--+
>>> | col_name |  data_type  | comment  |
>>> +--+-+--+
>>> | target_entity| string  | NULL |
>>> | target_entity_type   | string  | NULL |
>>> | date_time_utc| timestamp   | NULL |
>>> | machine_ip   | string  | NULL |
>>> | event_id | string  | NULL |
>>> | event_data   | map  | NULL |
>>> | description  | string  | NULL |
>>> | event_record_id  | string  | NULL |
>>> | level| string  | NULL |
>>> | machine_name | string  | NULL |
>>> | sequence_number  | string  | NULL |
>>> | source   | string  | NULL |
>>> | source_machine_name  | string  | NULL |
>>> | task_category| string  | NULL |
>>> | user | string  | NULL |
>>> | additional_data  | map  | NULL |
>>> | windows_event_time_bin   | timestamp   | NULL |
>>> | # Partition Information  | |  |
>>> | # col_name   | data_type   | comment  |
>>> | windows_event_time_bin   | timestamp   | NULL |
>>> +--+-+--+
>>>
>>>
>>> However, when I create a partitioned 

Re: Spark Core Transitive Dependencies

2015-10-28 Thread Deng Ching-Mallete
Hi Furkan,

A few examples of libraries that we include are joda time, hbase libraries
and spark-kafka (for streaming). We use the maven-assembly-plugin to build
our assembly jar, btw.

Thanks,
Deng

On Wed, Oct 28, 2015 at 9:10 PM, Furkan KAMACI 
wrote:

> Hi Deng,
>
> Could you give an example of which libraries you include for your purpose?
>
> Kind Regards,
> Furkan KAMACI
>
> On Wed, Oct 28, 2015 at 4:07 AM, Deng Ching-Mallete 
> wrote:
>
>> Hi,
>>
>> The spark assembly jar already includes the spark core libraries plus
>> their transitive dependencies, so you don't need to include them in your
>> jar. I found it easier to use inclusions instead of exclusions when
>> creating an assembly jar of my spark job so I would recommend going with
>> that.
>>
>> HTH,
>> Deng
>>
>>
>> On Wed, Oct 28, 2015 at 6:20 AM, Furkan KAMACI 
>> wrote:
>>
>>> Hi,
>>>
>>> I use Spark for for its newAPIHadoopRDD method and map/reduce etc.
>>> tasks. When I include it I see that it has many transitive dependencies.
>>>
>>> Which of them I should exclude? I've included the dependency tree of
>>> spark-core. Is there any documentation that explains why they are needed
>>> (maybe all of them are necessary?)
>>>
>>> Kind Regards,
>>> Furkan KAMACI
>>>
>>> PS: Dependency Tree:
>>>
>>> [INFO] +- org.apache.spark:spark-core_2.10:jar:1.4.1:compile
>>> [INFO] |  +- com.twitter:chill_2.10:jar:0.5.0:compile
>>> [INFO] |  |  \- com.esotericsoftware.kryo:kryo:jar:2.21:compile
>>> [INFO] |  | +-
>>> com.esotericsoftware.reflectasm:reflectasm:jar:shaded:1.07:compile
>>> [INFO] |  | +- com.esotericsoftware.minlog:minlog:jar:1.2:compile
>>> [INFO] |  | \- org.objenesis:objenesis:jar:1.2:compile
>>> [INFO] |  +- com.twitter:chill-java:jar:0.5.0:compile
>>> [INFO] |  +- org.apache.spark:spark-launcher_2.10:jar:1.4.1:compile
>>> [INFO] |  +- org.apache.spark:spark-network-common_2.10:jar:1.4.1:compile
>>> [INFO] |  +-
>>> org.apache.spark:spark-network-shuffle_2.10:jar:1.4.1:compile
>>> [INFO] |  +- org.apache.spark:spark-unsafe_2.10:jar:1.4.1:compile
>>> [INFO] |  +- net.java.dev.jets3t:jets3t:jar:0.7.1:compile
>>> [INFO] |  +- org.apache.curator:curator-recipes:jar:2.4.0:compile
>>> [INFO] |  |  \- org.apache.curator:curator-framework:jar:2.4.0:compile
>>> [INFO] |  | \- org.apache.curator:curator-client:jar:2.4.0:compile
>>> [INFO] |  +- org.apache.commons:commons-lang3:jar:3.3.2:compile
>>> [INFO] |  +- org.apache.commons:commons-math3:jar:3.4.1:compile
>>> [INFO] |  +- com.google.code.findbugs:jsr305:jar:1.3.9:compile
>>> [INFO] |  +- org.slf4j:jul-to-slf4j:jar:1.6.6:compile
>>> [INFO] |  +- org.slf4j:jcl-over-slf4j:jar:1.6.6:compile
>>> [INFO] |  +- com.ning:compress-lzf:jar:1.0.3:compile
>>> [INFO] |  +- net.jpountz.lz4:lz4:jar:1.2.0:compile
>>> [INFO] |  +- org.roaringbitmap:RoaringBitmap:jar:0.4.5:compile
>>> [INFO] |  +- commons-net:commons-net:jar:2.2:compile
>>> [INFO] |  +-
>>> org.spark-project.akka:akka-remote_2.10:jar:2.3.4-spark:compile
>>> [INFO] |  |  +-
>>> org.spark-project.akka:akka-actor_2.10:jar:2.3.4-spark:compile
>>> [INFO] |  |  |  \- com.typesafe:config:jar:1.2.1:compile
>>> [INFO] |  |  +-
>>> org.spark-project.protobuf:protobuf-java:jar:2.5.0-spark:compile
>>> [INFO] |  |  \- org.uncommons.maths:uncommons-maths:jar:1.2.2a:compile
>>> [INFO] |  +-
>>> org.spark-project.akka:akka-slf4j_2.10:jar:2.3.4-spark:compile
>>> [INFO] |  +- org.scala-lang:scala-library:jar:2.10.4:compile
>>> [INFO] |  +- org.json4s:json4s-jackson_2.10:jar:3.2.10:compile
>>> [INFO] |  |  \- org.json4s:json4s-core_2.10:jar:3.2.10:compile
>>> [INFO] |  | +- org.json4s:json4s-ast_2.10:jar:3.2.10:compile
>>> [INFO] |  | \- org.scala-lang:scalap:jar:2.10.0:compile
>>> [INFO] |  |\- org.scala-lang:scala-compiler:jar:2.10.0:compile
>>> [INFO] |  +- com.sun.jersey:jersey-server:jar:1.9:compile
>>> [INFO] |  |  \- asm:asm:jar:3.1:compile
>>> [INFO] |  +- com.sun.jersey:jersey-core:jar:1.9:compile
>>> [INFO] |  +- org.apache.mesos:mesos:jar:shaded-protobuf:0.21.1:compile
>>> [INFO] |  +- io.netty:netty-all:jar:4.0.23.Final:compile
>>> [INFO] |  +- com.clearspring.analytics:stream:jar:2.7.0:compile
>>> [INFO] |  +- io.dropwizard.metrics:metrics-core:jar:3.1.0:compile
>>> [INFO] |  +- io.dropwizard.metrics:metrics-jvm:jar:3.1.0:compile
>>> [INFO] |  +- io.dropwizard.metrics:metrics-json:jar:3.1.0:compile
>>> [INFO] |  +- io.dropwizard.metrics:metrics-graphite:jar:3.1.0:compile
>>> [INFO] |  +-
>>> com.fasterxml.jackson.core:jackson-databind:jar:2.4.4:compile
>>> [INFO] |  |  +-
>>> com.fasterxml.jackson.core:jackson-annotations:jar:2.4.0:compile
>>> [INFO] |  |  \- com.fasterxml.jackson.core:jackson-core:jar:2.4.4:compile
>>> [INFO] |  +-
>>> com.fasterxml.jackson.module:jackson-module-scala_2.10:jar:2.4.4:compile
>>> [INFO] |  |  \- org.scala-lang:scala-reflect:jar:2.10.4:compile
>>> [INFO] |  +- org.apache.ivy:ivy:jar:2.4.0:compile
>>> [INFO] |  +- 

NullPointerException when cache DataFrame in Java (Spark1.5.1)

2015-10-28 Thread Zhang, Jingyu
It is not a problem to use JavaRDD.cache() for 200 MB of data (all objects read
from JSON format). But when I try to use DataFrame.cache(), it throws the
exception below.

My machine can cache 1 GB of data in Avro format without any problem.

15/10/29 13:26:23 INFO GeneratePredicate: Code generated in 154.531827 ms

15/10/29 13:26:23 INFO GenerateUnsafeProjection: Code generated in
27.832369 ms

15/10/29 13:26:23 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)

java.lang.NullPointerException

at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)

at sun.reflect.DelegatingMethodAccessorImpl.invoke(
DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:497)

at
org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1$$anonfun$apply$2.apply(
SQLContext.scala:500)

at
org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1$$anonfun$apply$2.apply(
SQLContext.scala:500)

at scala.collection.TraversableLike$$anonfun$map$1.apply(
TraversableLike.scala:244)

at scala.collection.TraversableLike$$anonfun$map$1.apply(
TraversableLike.scala:244)

at scala.collection.IndexedSeqOptimized$class.foreach(
IndexedSeqOptimized.scala:33)

at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)

at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)

at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)

at org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1.apply(
SQLContext.scala:500)

at org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1.apply(
SQLContext.scala:498)

at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)

at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)

at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)

at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(
InMemoryColumnarTableScan.scala:127)

at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(
InMemoryColumnarTableScan.scala:120)

at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:278)

at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)

at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)

at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)

at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)

at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)

at org.apache.spark.scheduler.Task.run(Task.scala:88)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)

at java.util.concurrent.ThreadPoolExecutor.runWorker(
ThreadPoolExecutor.java:1142)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(
ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)

15/10/29 13:26:23 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1,
localhost): java.lang.NullPointerException

at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)


Thanks,


Jingyu



Re: How to implement zipWithIndex as a UDF?

2015-10-28 Thread Benyi Wang
Thanks Michael.

I should make my question clearer. This is the data type:

StructType(Seq(
  StructField("uid", LongType),
  StructField("infos", ArrayType(
    StructType(Seq(
      StructField("cid", LongType),
      StructField("cnt", LongType)
    ))
  ))
))

I want to explode “infos” to get three columns “uid”, “index” and “info”.
The only way I figured out is to explode the whole nested data type into a
tuple of primitive data types like this:

df.explode("infos") { (r: Row) =>
  val arr = r.getSeq[Row](0)
  arr.zipWithIndex.map {
    case (info, idx) =>
      (idx, info.getLong(0), info.getLong(1))
  }
}

What I really want is to keep info as a struct type:

df.explode("infos") { (r: Row) =>
  val arr = r.getSeq[Row](0)
  arr.zipWithIndex.map {
    case (info, idx) =>
      (idx, info)
  }
}

Unfortunately the current DataFrame API doesn’t support it: the explode
methods try to figure out the schema for the exploded data, but cannot
handle Any or Row types via reflection, and the caller has no way to pass
through a schema for the exploded data.
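
One workaround (a rough sketch, not something from this thread): do the
zipWithIndex at the RDD level and hand an explicit schema back to
createDataFrame, which lets “info” stay a struct. The schema below mirrors the
one given above; df and sqlContext are assumed from the snippet.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val infoType = StructType(Seq(
  StructField("cid", LongType),
  StructField("cnt", LongType)))

val explodedSchema = StructType(Seq(
  StructField("uid", LongType),
  StructField("index", IntegerType),
  StructField("info", infoType)))

// Explode at the RDD level so the schema can be supplied explicitly.
val explodedRows = df.select("uid", "infos").rdd.flatMap { row =>
  val uid = row.getLong(0)
  row.getSeq[Row](1).zipWithIndex.map { case (info, idx) => Row(uid, idx, info) }
}

val exploded = sqlContext.createDataFrame(explodedRows, explodedSchema)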
​

On Fri, Oct 23, 2015 at 12:44 PM, Michael Armbrust 
wrote:

> The user facing type mapping is documented here:
> http://spark.apache.org/docs/latest/sql-programming-guide.html#data-types
>
> On Fri, Oct 23, 2015 at 12:10 PM, Benyi Wang 
> wrote:
>
>> If I have two columns
>>
>> StructType(Seq(
>>   StructField("id", LongType),
>>   StructField("phones", ArrayType(StringType
>>
>> I want to add index for “phones” before I explode it.
>>
>> Can this be implemented as GenericUDF?
>>
>> I tried DataFrame.explode. It worked for simple types like string, but I
>> could not figure out how to handle a nested type like StructType.
>>
>> Can somebody shed a light?
>>
>> I’m using spark 1.5.1.
>> ​
>>
>
>


newbie trouble submitting java app to AWS cluster I created using spark-ec2 script from spark-1.5.1-bin-hadoop2.6 distribution

2015-10-28 Thread Andy Davidson
Hi



I just created a new cluster using the spark-ec2 script from the
spark-1.5.1-bin-hadoop2.6 distribution. The master and slaves seem to be up
and running. I am having a heck of a time figuring out how to submit apps. As
a test I compiled the sample JavaSparkPi example. I have copied my jar file
to the master and want to run the application in cluster mode. My real app
will take a long time to complete. I do not want to wait around.



Any idea what the issue is?



Kind regards



Andy





http://spark.apache.org/docs/latest/submitting-applications.html


The following command works fine on my Mac; however, when I run it on my
master I get the following warning. The app works correctly.

[ec2-user@ip-172-31-29-60 ~]$ $SPARK_ROOT/bin/spark-submit --class
org.apache.spark.examples.JavaSparkPi --master local[4]
sparkPi-1.0-SNAPSHOT.jar 2>&1 | tee pi.out

15/10/28 21:07:10 INFO spark.SparkContext: Running Spark version 1.5.1

15/10/28 21:07:11 WARN spark.SparkConf:

SPARK_WORKER_INSTANCES was detected (set to '1').

This is deprecated in Spark 1.0+.



Please instead use:

 - ./spark-submit with --num-executors to specify the number of executors

 - Or set SPARK_EXECUTOR_INSTANCES

 - spark.executor.instances to configure the number of instances in the
spark config.



Adding --num-executors I still get the same warning. The app works correctly.



 $SPARK_ROOT/bin/spark-submit --class org.apache.spark.examples.JavaSparkPi
--master local[4] --num-executors 4 sparkPi-1.0-SNAPSHOT.jar 2>&1 | tee
pi.numExecutor4.out

15/10/28 21:09:41 INFO spark.SparkContext: Running Spark version 1.5.1

15/10/28 21:09:41 WARN spark.SparkConf:

SPARK_WORKER_INSTANCES was detected (set to '1').

This is deprecated in Spark 1.0+.



Please instead use:

 - ./spark-submit with --num-executors to specify the number of executors

 - Or set SPARK_EXECUTOR_INSTANCES

 - spark.executor.instances to configure the number of instances in the
spark config.



I also tried variations on [ec2-user@ip-172-31-29-60 ~]$
$SPARK_ROOT/bin/spark-submit --class org.apache.spark.examples.JavaSparkPi
--master spark://172.31.29.60:7077 --num-executors 4
sparkPi-1.0-SNAPSHOT.jar

15/10/28 21:14:48 INFO spark.SparkContext: Running Spark version 1.5.1

15/10/28 21:14:48 WARN spark.SparkConf:

SPARK_WORKER_INSTANCES was detected (set to '1').

This is deprecated in Spark 1.0+.



Please instead use:

 - ./spark-submit with --num-executors to specify the number of executors

 - Or set SPARK_EXECUTOR_INSTANCES

 - spark.executor.instances to configure the number of instances in the
spark config.



15/10/28 21:14:48 INFO spark.SecurityManager: Changing view acls to:
ec2-user

15/10/28 21:14:48 INFO spark.SecurityManager: Changing modify acls to:
ec2-user

15/10/28 21:14:48 INFO spark.SecurityManager: SecurityManager:
authentication disabled; ui acls disabled; users with view permissions:
Set(ec2-user); users with modify permissions: Set(ec2-user)

15/10/28 21:14:49 INFO slf4j.Slf4jLogger: Slf4jLogger started

15/10/28 21:14:49 INFO Remoting: Starting remoting

15/10/28 21:14:50 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sparkDriver@172.31.29.60:52405]

15/10/28 21:14:50 INFO util.Utils: Successfully started service
'sparkDriver' on port 52405.

15/10/28 21:14:50 INFO spark.SparkEnv: Registering MapOutputTracker

15/10/28 21:14:50 INFO spark.SparkEnv: Registering BlockManagerMaster

15/10/28 21:14:50 INFO storage.DiskBlockManager: Created local directory at
/mnt/spark/blockmgr-e6197751-e3a2-40b7-8228-3512ffe2b69d

15/10/28 21:14:50 INFO storage.DiskBlockManager: Created local directory at
/mnt2/spark/blockmgr-9547279f-c011-44e2-9c6e-295f6b36b084

15/10/28 21:14:50 INFO storage.MemoryStore: MemoryStore started with
capacity 530.0 MB

15/10/28 21:14:50 INFO spark.HttpFileServer: HTTP File server directory is
/mnt/spark/spark-60c478cd-7adb-4d92-96e4-aad52eaaf8bf/httpd-71c01fdc-0e5f-4a
73-83f0-bac856bc3548

15/10/28 21:14:50 INFO spark.HttpServer: Starting HTTP Server

15/10/28 21:14:50 INFO server.Server: jetty-8.y.z-SNAPSHOT

15/10/28 21:14:50 INFO server.AbstractConnector: Started
SocketConnector@0.0.0.0:48262

15/10/28 21:14:50 INFO util.Utils: Successfully started service 'HTTP file
server' on port 48262.

15/10/28 21:14:50 INFO spark.SparkEnv: Registering OutputCommitCoordinator

15/10/28 21:14:50 INFO server.Server: jetty-8.y.z-SNAPSHOT

15/10/28 21:14:50 INFO server.AbstractConnector: Started
SelectChannelConnector@0.0.0.0:4040

15/10/28 21:14:50 INFO util.Utils: Successfully started service 'SparkUI' on
port 4040.

15/10/28 21:14:50 INFO ui.SparkUI: Started SparkUI at
http://ec2-54-215-207-132.us-west-1.compute.amazonaws.com:4040

15/10/28 21:14:50 INFO spark.SparkContext: Added JAR
file:/home/ec2-user/sparkPi-1.0-SNAPSHOT.jar at
http://172.31.29.60:48262/jars/sparkPi-1.0-SNAPSHOT.jar with timestamp
1446066890783

15/10/28 21:14:50 WARN metrics.MetricsSystem: Using default name
DAGScheduler for source because spark.app.id is 

Re: newbie trouble submitting java app to AWS cluster I created using spark-ec2 script from spark-1.5.1-bin-hadoop2.6 distribution

2015-10-28 Thread Andy Davidson
I forgot to mention. I do not have a preference for the cluster manager. I
chose the spark-1.5.1-bin-hadoop2.6 distribution because I want to use
HDFS. I assumed this distribution would use YARN.

Thanks

Andy

From:  Andrew Davidson 
Date:  Wednesday, October 28, 2015 at 2:37 PM
To:  "user@spark.apache.org" 
Subject:  newbie trouble submitting java app to AWS cluster I created using
spark-ec2  script from spark-1.5.1-bin-hadoop2.6 distribution

> Hi
> 
> 
> 
> I just created new cluster using the spark-c2 script from the
> spark-1.5.1-bin-hadoop2.6 distribution. The master and slaves seem to be up
> and running. I am having a heck of time figuring out how to submit apps. As a
> test I compile the sample JavaSparkPi example. I have copied my jar file to
> the master and want to run the application in cluster mode. My real app will
> take a long time to complete. I do not want to wait around.
> 
> 
> 
> Any idea what the issue is?
> 
> 
> 
> Kind regards
> 
> 
> 
> Andy
> 
> 
> 
> 
> 
> http://spark.apache.org/docs/latest/submitting-applications.html
> 
> 
> The following command works fine on my Mac, how ever when I run it on my
> master I get the following warning. The app works correctly
> 
> [ec2-user@ip-172-31-29-60 ~]$ $SPARK_ROOT/bin/spark-submit --class
> org.apache.spark.examples.JavaSparkPi --master local[4]
> sparkPi-1.0-SNAPSHOT.jar 2>&1 | tee pi.out
> 
> 15/10/28 21:07:10 INFO spark.SparkContext: Running Spark version 1.5.1
> 
> 15/10/28 21:07:11 WARN spark.SparkConf:
> 
> SPARK_WORKER_INSTANCES was detected (set to '1').
> 
> This is deprecated in Spark 1.0+.
> 
> 
> 
> Please instead use:
> 
>  - ./spark-submit with --num-executors to specify the number of executors
> 
>  - Or set SPARK_EXECUTOR_INSTANCES
> 
>  - spark.executor.instances to configure the number of instances in the spark
> config.
> 
> 
> 
> Adding --num-executors I still get the same warning. The app works correctly
> 
> 
> 
>  $SPARK_ROOT/bin/spark-submit --class org.apache.spark.examples.JavaSparkPi
> --master local[4] --num-executors 4 sparkPi-1.0-SNAPSHOT.jar 2>&1 | tee
> pi.numExecutor4.out
> 
> 15/10/28 21:09:41 INFO spark.SparkContext: Running Spark version 1.5.1
> 
> 15/10/28 21:09:41 WARN spark.SparkConf:
> 
> SPARK_WORKER_INSTANCES was detected (set to '1').
> 
> This is deprecated in Spark 1.0+.
> 
> 
> 
> Please instead use:
> 
>  - ./spark-submit with --num-executors to specify the number of executors
> 
>  - Or set SPARK_EXECUTOR_INSTANCES
> 
>  - spark.executor.instances to configure the number of instances in the spark
> config.
> 
> 
> 
> I also tried variations on [ec2-user@ip-172-31-29-60 ~]$
> $SPARK_ROOT/bin/spark-submit --class org.apache.spark.examples.JavaSparkPi
> --master spark://172.31.29.60:7077 --num-executors 4 sparkPi-1.0-SNAPSHOT.jar
> 
> 15/10/28 21:14:48 INFO spark.SparkContext: Running Spark version 1.5.1
> 
> 15/10/28 21:14:48 WARN spark.SparkConf:
> 
> SPARK_WORKER_INSTANCES was detected (set to '1').
> 
> This is deprecated in Spark 1.0+.
> 
> 
> 
> Please instead use:
> 
>  - ./spark-submit with --num-executors to specify the number of executors
> 
>  - Or set SPARK_EXECUTOR_INSTANCES
> 
>  - spark.executor.instances to configure the number of instances in the spark
> config.
> 
> 
> 
> 15/10/28 21:14:48 INFO spark.SecurityManager: Changing view acls to: ec2-user
> 
> 15/10/28 21:14:48 INFO spark.SecurityManager: Changing modify acls to:
> ec2-user
> 
> 15/10/28 21:14:48 INFO spark.SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users with view permissions: Set(ec2-user); users
> with modify permissions: Set(ec2-user)
> 
> 15/10/28 21:14:49 INFO slf4j.Slf4jLogger: Slf4jLogger started
> 
> 15/10/28 21:14:49 INFO Remoting: Starting remoting
> 
> 15/10/28 21:14:50 INFO Remoting: Remoting started; listening on addresses
> :[akka.tcp://sparkDriver@172.31.29.60:52405]
> 
> 15/10/28 21:14:50 INFO util.Utils: Successfully started service 'sparkDriver'
> on port 52405.
> 
> 15/10/28 21:14:50 INFO spark.SparkEnv: Registering MapOutputTracker
> 
> 15/10/28 21:14:50 INFO spark.SparkEnv: Registering BlockManagerMaster
> 
> 15/10/28 21:14:50 INFO storage.DiskBlockManager: Created local directory at
> /mnt/spark/blockmgr-e6197751-e3a2-40b7-8228-3512ffe2b69d
> 
> 15/10/28 21:14:50 INFO storage.DiskBlockManager: Created local directory at
> /mnt2/spark/blockmgr-9547279f-c011-44e2-9c6e-295f6b36b084
> 
> 15/10/28 21:14:50 INFO storage.MemoryStore: MemoryStore started with capacity
> 530.0 MB
> 
> 15/10/28 21:14:50 INFO spark.HttpFileServer: HTTP File server directory is
> /mnt/spark/spark-60c478cd-7adb-4d92-96e4-aad52eaaf8bf/httpd-71c01fdc-0e5f-4a73
> -83f0-bac856bc3548
> 
> 15/10/28 21:14:50 INFO spark.HttpServer: Starting HTTP Server
> 
> 15/10/28 21:14:50 INFO server.Server: jetty-8.y.z-SNAPSHOT
> 
> 15/10/28 21:14:50 INFO server.AbstractConnector: Started
> SocketConnector@0.0.0.0:48262
> 
> 15/10/28 21:14:50 

Re: Spark -- Writing to Partitioned Persistent Table

2015-10-28 Thread Bryan Jeffrey
The second issue I'm seeing is an OOM issue when writing partitioned data.
I am running Spark 1.4.1, Scala 2.11, Hadoop 2.6.1 & using the Hive
libraries packaged with Spark.  Spark was compiled using the following:
mvn -Dhadoop.version=2.6.1 -Dscala-2.11 -DskipTests -Pyarn -Phive
-Phive-thriftserver package

Given a case class like the following:

case class HiveWindowsEvent(
 targetEntity: String,
 targetEntityType: String,
 dateTimeUtc: Timestamp,
 eventid: String,
 eventData: Map[String, String],
 description: String,
 eventRecordId: String,
 level: String,
 machineName: String,
 sequenceNumber: String,
 source: String,
 sourceMachineName: String,
 taskCategory: String,
 user: String,
 machineIp: String,
 additionalData: Map[String, String],
 windowseventtimebin: Long
 )

The command to write data works fine (and when queried via Beeline data is
correct):

val hc = new HiveContext(sc)
import hc.implicits._

val partitioner = new HashPartitioner(5)
hiveWindowsEvents.foreachRDD(rdd => {
  val eventsDF = rdd.toDF()
  eventsDF
.write
.mode(SaveMode.Append).saveAsTable("windows_event9")
})

Once I add the partitioning (few partitions - three or less):

val hc = new HiveContext(sc)
import hc.implicits._

val partitioner = new HashPartitioner(5)
hiveWindowsEvents.foreachRDD(rdd => {
  val eventsDF = rdd.toDF()
  eventsDF
.write
.partitionBy("windowseventtimebin")
.mode(SaveMode.Append).saveAsTable("windows_event9")
})

I see the following error when writing to (3) partitions:

15/10/28 20:23:01 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1,
10.0.0.6): org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org
$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$2(commands.scala:270)
at
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
at
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.OutOfMemoryError: Java heap space
at
parquet.bytes.CapacityByteArrayOutputStream.initSlabs(CapacityByteArrayOutputStream.java:65)
at
parquet.bytes.CapacityByteArrayOutputStream.(CapacityByteArrayOutputStream.java:57)
at
parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.(ColumnChunkPageWriteStore.java:68)
at
parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.(ColumnChunkPageWriteStore.java:48)
at
parquet.hadoop.ColumnChunkPageWriteStore.getPageWriter(ColumnChunkPageWriteStore.java:215)
at
parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:67)
at
parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56)
at
parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.(MessageColumnIO.java:178)
at
parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369)
at
parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108)
at
parquet.hadoop.InternalParquetRecordWriter.(InternalParquetRecordWriter.java:94)
at
parquet.hadoop.ParquetRecordWriter.(ParquetRecordWriter.java:64)
at
parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282)
at
parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
at
org.apache.spark.sql.parquet.ParquetOutputWriter.(newParquet.scala:83)
at
org.apache.spark.sql.parquet.ParquetRelation2$$anon$4.newInstance(newParquet.scala:229)
at
org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$outputWriterForRow$1.apply(commands.scala:530)
at
org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$outputWriterForRow$1.apply(commands.scala:525)
at

Re: Spark/Kafka Streaming Job Gets Stuck

2015-10-28 Thread Adrian Tanase
Does it work as expected with a smaller batch or a smaller load? Could it be that 
it's accumulating too many events over the 3-minute interval?

You could also try increasing the parallelism via repartition to ensure smaller 
tasks that can safely fit in working memory.
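
Something along these lines (a sketch only; "messages" and the partition count
are placeholders, not taken from the original job):

// Spread each batch over more, smaller tasks before the heavy work.
val repartitioned = messages.repartition(40)  // e.g. roughly 2-4x the total executor cores
repartitioned.foreachRDD { rdd =>
  // existing per-batch processing goes here
}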

Sent from my iPhone

> On 28 Oct 2015, at 17:45, Afshartous, Nick  wrote:
> 
> 
> Hi, we are load testing our Spark 1.3 streaming (reading from Kafka)  job and 
> seeing a problem.  This is running in AWS/Yarn and the streaming batch 
> interval is set to 3 minutes and this is a ten node cluster.
> 
> Testing at 30,000 events per second we are seeing the streaming job get stuck 
> (stack trace below) for over an hour.
> 
> Thanks on any insights or suggestions.
> --
>  Nick
> 
> org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.mapPartitionsToPair(JavaDStreamLike.scala:43)
> com.wb.analytics.spark.services.streaming.drivers.StreamingKafkaConsumerDriver.runStream(StreamingKafkaConsumerDriver.java:125)
> com.wb.analytics.spark.services.streaming.drivers.StreamingKafkaConsumerDriver.main(StreamingKafkaConsumerDriver.java:71)
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> java.lang.reflect.Method.invoke(Method.java:606)
> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:480)
> 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark -- Writing to Partitioned Persistent Table

2015-10-28 Thread Jerry Lam
Hi Bryan,

Did you read the email I sent a few days ago? There are more issues with 
partitionBy down the road: 
https://www.mail-archive.com/user@spark.apache.org/msg39512.html 


Best Regards,

Jerry

> On Oct 28, 2015, at 4:52 PM, Bryan Jeffrey  wrote:
> 
> The second issue I'm seeing is an OOM issue when writing partitioned data.  I 
> am running Spark 1.4.1, Scala 2.11, Hadoop 2.6.1 & using the Hive libraries 
> packaged with Spark.  Spark was compiled using the following:  mvn 
> -Dhadoop.version=2.6.1 -Dscala-2.11 -DskipTests -Pyarn -Phive 
> -Phive-thriftserver package
> 
> Given a case class like the following:
> 
> case class HiveWindowsEvent(
>  targetEntity: String,
>  targetEntityType: String,
>  dateTimeUtc: Timestamp,
>  eventid: String,
>  eventData: Map[String, String],
>  description: String,
>  eventRecordId: String,
>  level: String,
>  machineName: String,
>  sequenceNumber: String,
>  source: String,
>  sourceMachineName: String,
>  taskCategory: String,
>  user: String,
>  machineIp: String,
>  additionalData: Map[String, String],
>  windowseventtimebin: Long
>  )
> 
> The command to write data works fine (and when queried via Beeline data is 
> correct):
> 
> val hc = new HiveContext(sc)
> import hc.implicits._
> 
> val partitioner = new HashPartitioner(5)
> hiveWindowsEvents.foreachRDD(rdd => {
>   val eventsDF = rdd.toDF()
>   eventsDF
> .write
> .mode(SaveMode.Append).saveAsTable("windows_event9")
> })
> 
> Once I add the partitioning (few partitions - three or less):
> 
> val hc = new HiveContext(sc)
> import hc.implicits._
> 
> val partitioner = new HashPartitioner(5)
> hiveWindowsEvents.foreachRDD(rdd => {
>   val eventsDF = rdd.toDF()
>   eventsDF
> .write
> .partitionBy("windowseventtimebin")
> .mode(SaveMode.Append).saveAsTable("windows_event9")
> })
> 
> I see the following error when writing to (3) partitions:
> 
> 15/10/28 20:23:01 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, 
> 10.0.0.6): org.apache.spark.SparkException: Task failed while writing rows.
> at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org 
> $apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$2(commands.scala:270)
> at 
> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
> at 
> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
> at org.apache.spark.scheduler.Task.run(Task.scala:70)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.OutOfMemoryError: Java heap space
> at 
> parquet.bytes.CapacityByteArrayOutputStream.initSlabs(CapacityByteArrayOutputStream.java:65)
> at 
> parquet.bytes.CapacityByteArrayOutputStream.(CapacityByteArrayOutputStream.java:57)
> at 
> parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.(ColumnChunkPageWriteStore.java:68)
> at 
> parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.(ColumnChunkPageWriteStore.java:48)
> at 
> parquet.hadoop.ColumnChunkPageWriteStore.getPageWriter(ColumnChunkPageWriteStore.java:215)
> at 
> parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:67)
> at 
> parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56)
> at 
> parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.(MessageColumnIO.java:178)
> at 
> parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369)
> at 
> parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108)
> at 
> parquet.hadoop.InternalParquetRecordWriter.(InternalParquetRecordWriter.java:94)
> at 
> parquet.hadoop.ParquetRecordWriter.(ParquetRecordWriter.java:64)
> at 
> 

RE: Spark -- Writing to Partitioned Persistent Table

2015-10-28 Thread Bryan
Jerry,

Thank you for the note. It sounds like you were able to get further than I have 
been - any insight? Is it just a Spark 1.4.1 vs. Spark 1.5 difference?

Regards,

Bryan Jeffrey

-Original Message-
From: "Jerry Lam" 
Sent: ‎10/‎28/‎2015 6:29 PM
To: "Bryan Jeffrey" 
Cc: "Susan Zhang" ; "user" 
Subject: Re: Spark -- Writing to Partitioned Persistent Table

Hi Bryan,


Did you read the email I sent few days ago. There are more issues with 
partitionBy down the road: 
https://www.mail-archive.com/user@spark.apache.org/msg39512.html


Best Regards,


Jerry


On Oct 28, 2015, at 4:52 PM, Bryan Jeffrey  wrote:


The second issue I'm seeing is an OOM issue when writing partitioned data.  I 
am running Spark 1.4.1, Scala 2.11, Hadoop 2.6.1 & using the Hive libraries 
packaged with Spark.  Spark was compiled using the following:  mvn 
-Dhadoop.version=2.6.1 -Dscala-2.11 -DskipTests -Pyarn -Phive 
-Phive-thriftserver package


Given a case class like the following:


case class HiveWindowsEvent(
 targetEntity: String,
 targetEntityType: String,
 dateTimeUtc: Timestamp,
 eventid: String,
 eventData: Map[String, String],
 description: String,
 eventRecordId: String,
 level: String,
 machineName: String,
 sequenceNumber: String,
 source: String,
 sourceMachineName: String,
 taskCategory: String,
 user: String,
 machineIp: String,
 additionalData: Map[String, String],
 windowseventtimebin: Long
 )


The command to write data works fine (and when queried via Beeline data is 
correct):


val hc = new HiveContext(sc)
import hc.implicits._


val partitioner = new HashPartitioner(5)
hiveWindowsEvents.foreachRDD(rdd => {
  val eventsDF = rdd.toDF()
  eventsDF
.write
.mode(SaveMode.Append).saveAsTable("windows_event9")
})


Once I add the partitioning (few partitions - three or less):


val hc = new HiveContext(sc)
import hc.implicits._


val partitioner = new HashPartitioner(5)
hiveWindowsEvents.foreachRDD(rdd => {
  val eventsDF = rdd.toDF()
  eventsDF
.write
.partitionBy("windowseventtimebin")
.mode(SaveMode.Append).saveAsTable("windows_event9")
})


I see the following error when writing to (3) partitions:


15/10/28 20:23:01 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, 
10.0.0.6): org.apache.spark.SparkException: Task failed while writing rows.
at 
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$2(commands.scala:270)
at 
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
at 
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.OutOfMemoryError: Java heap space
at 
parquet.bytes.CapacityByteArrayOutputStream.initSlabs(CapacityByteArrayOutputStream.java:65)
at 
parquet.bytes.CapacityByteArrayOutputStream.(CapacityByteArrayOutputStream.java:57)
at 
parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.(ColumnChunkPageWriteStore.java:68)
at 
parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.(ColumnChunkPageWriteStore.java:48)
at 
parquet.hadoop.ColumnChunkPageWriteStore.getPageWriter(ColumnChunkPageWriteStore.java:215)
at 
parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:67)
at 
parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56)
at 
parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.(MessageColumnIO.java:178)
at parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369)
at 
parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108)
at 

How to check whether my Spark Jobs are parallelized or not

2015-10-28 Thread Vinoth Sankar
Hi,

I'm reading a large number of files (mostly in the thousands) and filtering them
through Spark based on some criteria, running the Spark application with two
workers (4 cores each). I enforced parallelism by calling
*sparkContext.parallelize(fileList)* in my Java code, but didn't see any
performance improvement, and I always see "Active Jobs" as 1 in the Spark UI.
Am I missing anything? How do I check whether my Spark jobs are parallelized
or not?
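
A quick way to check is to give parallelize an explicit slice count and then
confirm the partition count, plus the number of tasks per stage in the UI. A
sketch in Scala (the Java API is analogous; matchesCriteria is a placeholder
for the filtering logic):

val files = sc.parallelize(fileList, 16)  // 16 partitions -> up to 16 concurrent tasks
println(files.partitions.length)          // confirm how many partitions were created

val matched = files.filter(path => matchesCriteria(path))
println(matched.count())                  // the action is what actually runs as parallel tasks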

Regards
Vinoth Sankar


Re: [Spark Streaming] Why are some uncached RDDs are growing?

2015-10-28 Thread Tathagata Das
UpdateStateByKey automatically caches its RDDs.
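
For reference, a minimal sketch of what that looks like (the checkpoint path and
key/value types are placeholders); the state DStream produced by updateStateByKey
is persisted and checkpointed for you:

ssc.checkpoint("hdfs:///tmp/streaming-checkpoints")  // required for stateful operations

val counts = pairs.updateStateByKey[Long] { (values: Seq[Long], state: Option[Long]) =>
  Some(state.getOrElse(0L) + values.sum)
}
// "counts" will show up in the Storage tab even though persist() was never called on it.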

On Tue, Oct 27, 2015 at 8:05 AM, diplomatic Guru 
wrote:

>
> Hello All,
>
> When I checked my running Stream job on WebUI, I can see that some RDDs
> are being listed that were not requested to be cached. What more is that
> they are growing! I've not asked them to be cached. What are they? Are they
> the state (UpdateStateByKey)?
>
> Only the rows in white are being requested to be cached. But where are the
> RDDs  that are highlighted in yellow are from?
>
>
>
> ​
>


Mllib explain feature for tree ensembles

2015-10-28 Thread Eugen Cepoi
Hey,

Is there some kind of "explain" feature implemented in mllib for the
algorithms based on tree ensembles?
Some method to which you would feed in a single feature vector and it would
return/print what features contributed to the decision or how much each
feature contributed "negatively" and "positively" to the decision.

This can be very useful to debug a model on some specific samples and for
feature engineering.

Thanks,
Eugen


Re: Mllib explain feature for tree ensembles

2015-10-28 Thread Yanbo Liang
Spark ML/MLlib has provided featureImportances to estimate the importance of
each feature.
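
A minimal sketch against the 1.5 ML pipeline API (column names are placeholders).
Note that this gives global importances across the whole model, not a per-sample
explanation:

import org.apache.spark.ml.classification.RandomForestClassifier

val rf = new RandomForestClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")
  .setNumTrees(100)

val model = rf.fit(trainingDF)     // trainingDF: DataFrame with "label" and "features"
println(model.featureImportances)  // Vector of per-feature importance scores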

2015-10-28 18:29 GMT+08:00 Eugen Cepoi :

> Hey,
>
> Is there some kind of "explain" feature implemented in mllib for the
> algorithms based on tree ensembles?
> Some method to which you would feed in a single feature vector and it
> would return/print what features contributed to the decision or how much
> each feature contributed "negatively" and "positively" to the decision.
>
> This can be very useful to debug a model on some specific samples and for
> feature engineering.
>
> Thanks,
> Eugen
>


Re: Mllib explain feature for tree ensembles

2015-10-28 Thread Eugen Cepoi
I guess I will have to upgrade to spark 1.5, thanks!

2015-10-28 11:50 GMT+01:00 Yanbo Liang :

> Spark ML/MLlib has provided featureImportances to estimate the importance of
> each feature.
>
> 2015-10-28 18:29 GMT+08:00 Eugen Cepoi :
>
>> Hey,
>>
>> Is there some kind of "explain" feature implemented in mllib for the
>> algorithms based on tree ensembles?
>> Some method to which you would feed in a single feature vector and it
>> would return/print what features contributed to the decision or how much
>> each feature contributed "negatively" and "positively" to the decision.
>>
>> This can be very useful to debug a model on some specific samples and for
>> feature engineering.
>>
>> Thanks,
>> Eugen
>>
>
>


Re: Spark -- Writing to Partitioned Persistent Table

2015-10-28 Thread Jerry Lam
Hi Bryan,

I think they fixed some memory issues in 1.4 for the partitioned-table 
implementation; 1.5 does much better in terms of executor memory usage when 
generating partitioned tables. However, if your table has more than a few thousand 
partitions, reading them can be challenging: it takes a while to initialize the 
partitioned table and it requires a lot of memory from the driver. I would not 
use it if the number of partitions goes over a few hundred. 
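
On 1.4, one mitigation that sometimes helps with the writer-side OOM (a hedged
suggestion, not a guaranteed fix): dynamic-partition writes keep one open Parquet
writer per partition value per task, and each writer pre-allocates buffers, so
shrinking the Parquet buffer sizes reduces the per-writer footprint.

// Defaults are roughly 128 MB (block) and 1 MB (page); smaller values trade
// file layout efficiency for lower memory per open writer.
sc.hadoopConfiguration.setInt("parquet.block.size", 32 * 1024 * 1024)
sc.hadoopConfiguration.setInt("parquet.page.size", 512 * 1024)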

Hope this helps,

Jerry

Sent from my iPhone

> On 28 Oct, 2015, at 6:33 pm, Bryan  wrote:
> 
> Jerry,
> 
> Thank you for the note. It sounds like you were able to get further than I 
> have been - any insight? Just a Spark 1.4.1 vs Spark 1.5?
> 
> Regards,
> 
> Bryan Jeffrey
> From: Jerry Lam
> Sent: ‎10/‎28/‎2015 6:29 PM
> To: Bryan Jeffrey
> Cc: Susan Zhang; user
> Subject: Re: Spark -- Writing to Partitioned Persistent Table
> 
> Hi Bryan,
> 
> Did you read the email I sent few days ago. There are more issues with 
> partitionBy down the road: 
> https://www.mail-archive.com/user@spark.apache.org/msg39512.html
> 
> Best Regards,
> 
> Jerry
> 
>> On Oct 28, 2015, at 4:52 PM, Bryan Jeffrey  wrote:
>> 
>> The second issue I'm seeing is an OOM issue when writing partitioned data.  
>> I am running Spark 1.4.1, Scala 2.11, Hadoop 2.6.1 & using the Hive 
>> libraries packaged with Spark.  Spark was compiled using the following:  mvn 
>> -Dhadoop.version=2.6.1 -Dscala-2.11 -DskipTests -Pyarn -Phive 
>> -Phive-thriftserver package
>> 
>> Given a case class like the following:
>> 
>> case class HiveWindowsEvent(
>>  targetEntity: String,
>>  targetEntityType: String,
>>  dateTimeUtc: Timestamp,
>>  eventid: String,
>>  eventData: Map[String, String],
>>  description: String,
>>  eventRecordId: String,
>>  level: String,
>>  machineName: String,
>>  sequenceNumber: String,
>>  source: String,
>>  sourceMachineName: String,
>>  taskCategory: String,
>>  user: String,
>>  machineIp: String,
>>  additionalData: Map[String, String],
>>  windowseventtimebin: Long
>>  )
>> 
>> The command to write data works fine (and when queried via Beeline data is 
>> correct):
>> 
>> val hc = new HiveContext(sc)
>> import hc.implicits._
>> 
>> val partitioner = new HashPartitioner(5)
>> hiveWindowsEvents.foreachRDD(rdd => {
>>   val eventsDF = rdd.toDF()
>>   eventsDF
>> .write
>> .mode(SaveMode.Append).saveAsTable("windows_event9")
>> })
>> 
>> Once I add the partitioning (few partitions - three or less):
>> 
>> val hc = new HiveContext(sc)
>> import hc.implicits._
>> 
>> val partitioner = new HashPartitioner(5)
>> hiveWindowsEvents.foreachRDD(rdd => {
>>   val eventsDF = rdd.toDF()
>>   eventsDF
>> .write
>> .partitionBy("windowseventtimebin")
>> .mode(SaveMode.Append).saveAsTable("windows_event9")
>> })
>> 
>> I see the following error when writing to (3) partitions:
>> 
>> 15/10/28 20:23:01 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, 
>> 10.0.0.6): org.apache.spark.SparkException: Task failed while writing rows.
>> at 
>> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$2(commands.scala:270)
>> at 
>> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
>> at 
>> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>> at 
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>> at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.OutOfMemoryError: Java heap space
>> at 
>> parquet.bytes.CapacityByteArrayOutputStream.initSlabs(CapacityByteArrayOutputStream.java:65)
>> at 
>> parquet.bytes.CapacityByteArrayOutputStream.(CapacityByteArrayOutputStream.java:57)
>> at 
>> parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.(ColumnChunkPageWriteStore.java:68)
>> at 
>> 

Re: Spark -- Writing to Partitioned Persistent Table

2015-10-28 Thread Yana Kadiyska
For this issue in particular (ERROR XSDB6: Another instance of Derby may
have already booted the database /spark/spark-1.4.1/metastore_db) -- I
think it depends on where you start your application and HiveThriftServer
from. I've run into a similar issue running a driver app first, which would
create a directory called metastore_db. If I then try to start SparkShell
from the same directory, I will see this exception. So it is like
SPARK-9776. It's not so much that the two are in the same process (as the
bug resolution states); I think you can't run two drivers which start a
HiveContext from the same directory.


On Wed, Oct 28, 2015 at 4:10 PM, Bryan Jeffrey 
wrote:

> All,
>
> One issue I'm seeing is that I start the thrift server (for jdbc access)
> via the following: /spark/spark-1.4.1/sbin/start-thriftserver.sh --master
> spark://master:7077 --hiveconf "spark.cores.max=2"
>
> After about 40 seconds the Thrift server is started and available on
> default port 1.
>
> I then submit my application - and the application throws the following
> error:
>
> Caused by: java.sql.SQLException: Failed to start database 'metastore_db'
> with class loader
> org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1@6a552721,
> see the next exception for details.
> at
> org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown
> Source)
> at
> org.apache.derby.impl.jdbc.SQLExceptionFactory40.wrapArgsForTransportAcrossDRDA(Unknown
> Source)
> ... 86 more
> Caused by: java.sql.SQLException: Another instance of Derby may have
> already booted the database /spark/spark-1.4.1/metastore_db.
> at
> org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown
> Source)
> at
> org.apache.derby.impl.jdbc.SQLExceptionFactory40.wrapArgsForTransportAcrossDRDA(Unknown
> Source)
> at
> org.apache.derby.impl.jdbc.SQLExceptionFactory40.getSQLException(Unknown
> Source)
> at org.apache.derby.impl.jdbc.Util.generateCsSQLException(Unknown
> Source)
> ... 83 more
> Caused by: ERROR XSDB6: Another instance of Derby may have already booted
> the database /spark/spark-1.4.1/metastore_db.
>
> This also happens if I do the opposite (submit the application first, and
> then start the thrift server).
>
> It looks similar to the following issue -- but not quite the same:
> https://issues.apache.org/jira/browse/SPARK-9776
>
> It seems like this set of steps works fine if the metadata database is not
> yet created - but once it's created this happens every time.  Is this a
> known issue? Is there a workaround?
>
> Regards,
>
> Bryan Jeffrey
>
> On Wed, Oct 28, 2015 at 3:13 PM, Bryan Jeffrey 
> wrote:
>
>> Susan,
>>
>> I did give that a shot -- I'm seeing a number of oddities:
>>
>> (1) 'Partition By' appears only accepts alphanumeric lower case fields.
>> It will work for 'machinename', but not 'machineName' or 'machine_name'.
>> (2) When partitioning with maps included in the data I get odd string
>> conversion issues
>> (3) When partitioning without maps I see frequent out of memory issues
>>
>> I'll update this email when I've got a more concrete example of problems.
>>
>> Regards,
>>
>> Bryan Jeffrey
>>
>>
>>
>> On Wed, Oct 28, 2015 at 1:33 PM, Susan Zhang 
>> wrote:
>>
>>> Have you tried partitionBy?
>>>
>>> Something like
>>>
>>> hiveWindowsEvents.foreachRDD( rdd => {
>>>   val eventsDataFrame = rdd.toDF()
>>>   eventsDataFrame.write.mode(SaveMode.Append).partitionBy("
>>> windows_event_time_bin").saveAsTable("windows_event")
>>> })
>>>
>>>
>>>
>>> On Wed, Oct 28, 2015 at 7:41 AM, Bryan Jeffrey 
>>> wrote:
>>>
 Hello.

 I am working to get a simple solution working using Spark SQL.  I am
 writing streaming data to persistent tables using a HiveContext.  Writing
 to a persistent non-partitioned table works well - I update the table using
 Spark streaming, and the output is available via Hive Thrift/JDBC.

 I create a table that looks like the following:

 0: jdbc:hive2://localhost:10000> describe windows_event;
 describe windows_event;
 +---------------------+-------------+----------+
 | col_name            | data_type   | comment  |
 +---------------------+-------------+----------+
 | target_entity       | string      | NULL     |
 | target_entity_type  | string      | NULL     |
 | date_time_utc       | timestamp   | NULL     |
 | machine_ip          | string      | NULL     |
 | event_id            | string      | NULL     |
 | event_data          | map         | NULL     |
 | description         | string      | NULL     |
 | event_record_id     | string      | NULL     |
 | level               | string

RE: Spark -- Writing to Partitioned Persistent Table

2015-10-28 Thread Bryan
Yana,

My basic use-case is that I want to process streaming data and publish it to a 
persistent Spark table. After that I want to make the published data (results) 
available via JDBC and Spark SQL to drive a web API. That would seem to require 
two drivers starting separate HiveContexts (one for Spark SQL/JDBC, one for 
streaming).

Is there a way to share a HiveContext between the driver for the Thrift Spark 
SQL instance and the streaming Spark driver? A better method to do this?
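
One approach that avoids the second driver entirely, assuming the Spark 1.4
thrift-server module is on the classpath: start HiveThriftServer2 from inside
the streaming driver, against the same HiveContext the stream writes with,
instead of running sbin/start-thriftserver.sh separately. A sketch (app name
and batch interval are illustrative):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

val sc = new SparkContext(new SparkConf().setAppName("streaming-with-jdbc"))
val ssc = new StreamingContext(sc, Seconds(30))
val hiveContext = new HiveContext(sc)

// Serve JDBC from this driver, sharing the one HiveContext (and its metastore).
HiveThriftServer2.startWithContext(hiveContext)

// ... build the DStream and write to the partitioned table via hiveContext ...

ssc.start()
ssc.awaitTermination()

Tables written through that hiveContext should then be visible to JDBC clients
on the Thrift port, and only one driver ever touches the embedded metastore_db.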

An alternate option might be to create the table in two separate metastores and 
simply use the same storage location for the data. That seems very hacky 
though, and likely to result in maintenance issues.

Regards,

Bryan Jeffrey

-Original Message-
From: "Yana Kadiyska" 
Sent: ‎10/‎28/‎2015 8:32 PM
To: "Bryan Jeffrey" 
Cc: "Susan Zhang" ; "user" 
Subject: Re: Spark -- Writing to Partitioned Persistent Table

For this issue in particular ( ERROR XSDB6: Another instance of Derby may have 
already booted the database /spark/spark-1.4.1/metastore_db) -- I think it 
depends on where you start your application and HiveThriftserver from. I've run 
into a similar issue running a driver app first, which would create a directory 
called metastore_db. If I then try to start SparkShell from the same directory, 
I will see this exception. So it is like SPARK-9776. It's not so much that the 
two are in the same process (as the bug resolution states); I think you can't 
run two drivers that start a HiveContext from the same directory.

Collect Column as Array in Grouped DataFrame

2015-10-28 Thread saurfang
Sorry if this functionality already exists or has been asked before, but I'm
looking for an aggregate function in SparkSQL that allows me to collect a
column into an array per group in a grouped dataframe.

For example, if I have the following table
user, score

user1, 1
user2, 2
user1, 3
user2, 4
user1, 5

I want to produce a dataframe that is like

user1, [1, 3, 5]
user2, [2, 4]

(possibly via select collect(score) from table group by user)


I realize I can probably implement this as a UDAF, but I just want to double-check
whether such a thing already exists. If not, would there be interest in having
this in SparkSQL?
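
For what it's worth, something close already exists when a HiveContext is in
play: Hive's collect_list UDAF can be called from SQL. A sketch against the toy
data above (assuming a HiveContext, since a plain SQLContext will not resolve
Hive UDAFs; the app name is illustrative):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val sc = new SparkContext(new SparkConf().setAppName("collect-as-array"))
val sqlContext = new HiveContext(sc)
import sqlContext.implicits._

val scores = Seq(("user1", 1), ("user2", 2), ("user1", 3), ("user2", 4), ("user1", 5))
  .toDF("user", "score")
scores.registerTempTable("scores")

// collect_list is resolved through the Hive function registry.
sqlContext.sql(
  "SELECT user, collect_list(score) AS scores FROM scores GROUP BY user").show()
// Expected rows (order not guaranteed): user1 -> [1, 3, 5], user2 -> [2, 4]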

To give some context, I am trying to do a cogroup on datasets and then
persist as Parquet, but I want to take advantage of Tungsten. So the plan is
to collapse each row into (key, struct of values) => group by key and collect the
values as array[struct] => outer join the dataframes.

p.s. Here are some resources I have found so far, but all of them concern
top-K per key instead:
http://apache-spark-user-list.1001560.n3.nabble.com/Efficient-way-to-get-top-K-values-per-key-in-key-value-RDD-td20370.html
and
https://issues.apache.org/jira/browse/SPARK-5954 (where Reynold mentioned
this could be an API in dataframe)



