Re: Correct way to use spark streaming with apache zeppelin

2016-03-13 Thread Skanda
Hi

How you store state/intermediate data in realtime processing depends on the
throughput/latency your application requires. There are a lot of technologies
that help you build this kind of realtime datastore; some examples include
HBase, MemSQL, etc., or in some cases an RDBMS like MySQL itself. This is a
judgement call that you will have to make.

Regards,
Skanda

On Sun, Mar 13, 2016 at 11:23 PM, trung kien  wrote:

> Thanks all for actively sharing your experience.
>
> @Chris: using something like Redis is something I am trying to figure out.
> I have a lot of transactions, so I couldn't trigger an update event for
> every single transaction.
> I'm looking at Spark Streaming because it provides batch processing (e.g. I
> can update the cache every 5 seconds). In addition, Spark can scale pretty
> well and I don't have to worry about losing data.
>
> Now I have a cache with the following information:
>  * Date
>  * BranchID
>  * ProductID
>  TotalQty
>  TotalDollar
>
> Fields marked with * form the key; note that I keep history data as well (by day).
>
> Now I want to use Zeppelin for querying against the cache (while the cache
> is updating).
> I don't need Zeppelin to update automatically (I can hit the run button
> myself :) ).
> Just curious whether Parquet is the right solution for us?
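
For illustration, a minimal Java sketch of the kind of per-batch aggregation described
above, keyed by (Date, BranchID, ProductID). The socket source, port and comma-separated
record layout are assumptions made for the example, not details from this thread; the
result of each 5-second batch would then be merged into whatever cache or store is chosen:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

public class BranchProductTotals {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("BranchProductTotals");
        // 5-second batches, matching the "update the cache every 5 seconds" idea above.
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(5));

        // Assumed input: one transaction per line as "date,branchId,productId,qty,price".
        JavaDStream<String> lines = ssc.socketTextStream("localhost", 9999);

        // Key = date|branchId|productId, value = (qty, qty * price) per transaction.
        JavaPairDStream<String, Tuple2<Long, Double>> perBatchTotals = lines
            .mapToPair(new PairFunction<String, String, Tuple2<Long, Double>>() {
                @Override
                public Tuple2<String, Tuple2<Long, Double>> call(String line) {
                    String[] f = line.split(",");
                    long qty = Long.parseLong(f[3]);
                    double price = Double.parseDouble(f[4]);
                    return new Tuple2<String, Tuple2<Long, Double>>(
                            f[0] + "|" + f[1] + "|" + f[2],
                            new Tuple2<Long, Double>(qty, qty * price));
                }
            })
            // Sum qty and dollars within the batch for each key.
            .reduceByKey(new Function2<Tuple2<Long, Double>, Tuple2<Long, Double>,
                    Tuple2<Long, Double>>() {
                @Override
                public Tuple2<Long, Double> call(Tuple2<Long, Double> a,
                                                 Tuple2<Long, Double> b) {
                    return new Tuple2<Long, Double>(a._1() + b._1(), a._2() + b._2());
                }
            });

        // In a real job, each 5-second increment would be merged here into the
        // external cache (HBase, Redis, MySQL, ...); print() is just a stand-in.
        perBatchTotals.print();

        ssc.start();
        ssc.awaitTermination();
    }
}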
>
>
>
> On Sun, Mar 13, 2016 at 3:25 PM, Chris Miller 
> wrote:
>
>> Cool! Thanks for sharing.
>>
>>
>> --
>> Chris Miller
>>
>> On Sun, Mar 13, 2016 at 12:53 AM, Todd Nist  wrote:
>>
>>> Below is a link to an example which Silvio Fiorito put together
>>> demonstrating how to link Zeppelin with Spark Streaming for real-time charts.
>>> I think the original thread was back in early November 2015, subject: "Real
>>> time chart in Zeppelin", if you care to try to find it.
>>>
>>> https://gist.github.com/granturing/a09aed4a302a7367be92
>>>
>>> HTH.
>>>
>>> -Todd
>>>
>>> On Sat, Mar 12, 2016 at 6:21 AM, Chris Miller 
>>> wrote:
>>>
>>>> I'm pretty new to all of this stuff, so bear with me.
>>>>
>>>> Zeppelin isn't really intended for realtime dashboards as far as I
>>>> know. Its reporting features (tables, graphs, etc.) are more for displaying
>>>> the output of a computation. As far as I know, there isn't really anything
>>>> that will "watch" a dataset and push updates to the Zeppelin UI.
>>>>
>>>> As for Spark, unless you're doing a lot of processing that you didn't
>>>> mention here, I don't think it's a good fit just for this.
>>>>
>>>> If it were me (just off the top of my head), I'd just build a simple
>>>> web service that uses websockets to push updates to the client which could
>>>> then be used to update graphs, tables, etc. The data itself -- that is, the
>>>> accumulated totals -- you could store in something like Redis. When an
>>>> order comes in, just add that quantity and price to the existing value and
>>>> trigger your code to push out an updated value to any clients via the
>>>> websocket. You could use something like a Redis pub/sub channel to trigger
>>>> the web app to notify clients of an update.
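
One way to sketch the Redis side of that idea in Java, assuming the Jedis client; the
key layout, hash field names and pub/sub channel name are made up for the example, and
the websocket-serving web app is left out:

import redis.clients.jedis.Jedis;

public class OrderAccumulator {
    // Accumulate running totals for one order and notify listeners.
    public static void recordOrder(Jedis jedis, String branchId, String productId,
                                   long qty, double dollars) {
        String key = "totals:" + branchId + ":" + productId;  // hypothetical key layout
        // Keep the accumulated totals in a Redis hash, updated atomically per field.
        jedis.hincrBy(key, "totalQty", qty);
        jedis.hincrByFloat(key, "totalDollar", dollars);
        // Tell the web app something changed; it can then push the new value
        // to connected clients over its websocket.
        jedis.publish("totals-updated", key);
    }

    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            recordOrder(jedis, "branch-42", "product-7", 3, 29.97);
        }
    }
}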
>>>>
>>>> There are about 5 million other ways you could design this, but I would
>>>> just keep it as simple as possible. I just threw one idea out...
>>>>
>>>> Good luck.
>>>>
>>>>
>>>> --
>>>> Chris Miller
>>>>
>>>> On Sat, Mar 12, 2016 at 6:58 PM, trung kien  wrote:
>>>>
>>>>> Thanks Chris and Mich for replying.
>>>>>
>>>>> Sorry for not explaining my problem clearly. Yes, I am talking about a
>>>>> flexible dashboard when I mention Zeppelin.
>>>>>
>>>>> Here is the problem i am having:
>>>>>
>>>>> I am running a commercial website where we sell many products and we
>>>>> have many branches in many places. We have a lot of realtime transactions
>>>>> and want to analyze them in realtime.
>>>>>
>>>>> We don't want to aggregate every single transaction each time we do
>>>>> analytics (each transaction has BranchID, ProductID, Qty, Price). So we
>>>>> maintain intermediate data which contains: BranchID, ProductID, totalQty,
>>>>> totalDollar
>>>>>

Re: Issues with constants in Spark HiveQL queries

2015-05-22 Thread Skanda
Hi

I was using the wrong version of the spark-hive jar. I downloaded the
right version of the jar from the Cloudera repo and it works now.

Thanks,
Skanda

On Fri, May 22, 2015 at 2:36 PM, Skanda  wrote:

> Hi All,
>
> I'm facing the same problem with Spark 1.3.0 from Cloudera CDH 5.4.x. Any
> luck solving the issue?
>
> Exception:
>
> Exception in thread "main" org.apache.spark.sql.AnalysisException:
> Unsupported language features in query: select * from
> everest_marts_test.hive_ql_test where daily_partition=20150101
> TOK_QUERY 1, 0,18, 14
>   TOK_FROM 1, 4,8, 14
> TOK_TABREF 1, 6,8, 14
>   TOK_TABNAME 1, 6,8, 14
> everest_marts_test 1, 6,6, 14
> hive_ql_test 1, 8,8, 33
>   TOK_INSERT 0, -1,18, 0
> TOK_DESTINATION 0, -1,-1, 0
>   TOK_DIR 0, -1,-1, 0
> TOK_TMP_FILE 0, -1,-1, 0
> TOK_SELECT 0, 0,2, 0
>   TOK_SELEXPR 0, 2,2, 0
> TOK_ALLCOLREF 0, 2,2, 0
> TOK_WHERE 1, 10,18, 68
>   TOK_FUNCTION 1, 12,18, 68
> in 1, 14,14, 68
> TOK_TABLE_OR_COL 1, 12,12, 52
>   daily_partition 1, 12,12, 52
> 20150101 1, 16,18, 72
>
> scala.NotImplementedError: No parse rules for ASTNode type: 291, text:
> 20150101 :
> 20150101 1, 16,18, 72
> " +
>
> org.apache.spark.sql.hive.HiveQl$.nodeToExpr(HiveQl.scala:1261)
>   ;
> at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:261)
> at
> org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:41)
> at
> org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:40)
> at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
> at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at
> scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
> at
> scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at
> scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
> at
> scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at
> scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
> at
> scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
> at
> org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(AbstractSparkSQLParser.scala:38)
> at org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:138)
> at org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:138)
> at
> org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:96)
> at
> org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:95)
> at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
> at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at
> scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
> at
> scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at
> scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.par

Re: Issues with constants in Spark HiveQL queries

2015-05-22 Thread Skanda
ntext$$anonfun$sql$1.apply(HiveContext.scala:92)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:92)
at sparkwork.HiveQl.main(HiveQl.java:25)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
at
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


Regards,
Skanda

On Wed, Jan 28, 2015 at 4:59 PM, Pala M Muthaia  wrote:

> By typo I meant that the column name had a spelling error:
> conversion_aciton_id. It should have been conversion_action_id.
>
> No, we tried it a few times, and we didn't have + signs or anything like
> that. We tried it with columns of different types too (string, double, etc.)
> and saw the same error.
>
>
>
>
> On Tue, Jan 20, 2015 at 8:59 PM, yana  wrote:
>
>> I run Spark 1.2 and do not have this issue. I don't believe the Hive
>> version would matter (I run Spark 1.2 with the Hive 0.12 profile), but that
>> would be a good test. The last version I tried for you was a CDH4.2 Spark 1.2
>> prebuilt without pointing to an external Hive install (in fact I tried it on
>> a machine with no other Hadoop/Hive jars). So download, unzip and run the
>> Spark shell. I don't believe it's a bug personally. When you say typo, do you
>> mean there was indeed a PLUS token in your string? If you remove that token,
>> what stacktrace do you get?
>>
>>
>>
>>
>>  Original message 
>> From: Pala M Muthaia
>> Date: 01/19/2015 8:26 PM (GMT-05:00)
>> To: Yana Kadiyska
>> Cc: "Cheng, Hao", user@spark.apache.org
>> Subject: Re: Issues with constants in Spark HiveQL queries
>>
>> Yes, we tried the master branch (sometime last week) and there was no
>> issue, but the above repro is for branch 1.2 and Hive 0.13. Isn't that the
>> final release branch for Spark 1.2?
>>
>> If so, does a patch need to be created or back-ported from master?
>>
>> (Yes, the obvious typo in the column name was introduced in this email
>> only, so it is irrelevant to the error.)
>>
>> On Wed, Jan 14, 2015 at 5:52 PM, Yana Kadiyska 
>> wrote:
>>
>>> Yeah, that makes sense. Pala, are you on a prebuilt version of Spark?
>>> I just tried the CDH4 prebuilt... Here is what I get for the = token:
>>>
>>> [image: Inline image 1]
>>>
>>> The literal type shows as 290, not 291, and 290 is numeric. According to
>>> this
>>> http://grepcode.com/file/repo1.maven.org/maven2/org.apache.hive/hive-exec/0.13.1/org/apache/hadoop/hive/ql/parse/HiveParser.java#HiveParser
>>> 291 is the PLUS token, which is really weird...
>>>
>>>
>>> On Wed, Jan 14, 2015 at 7:47 PM, Cheng, Hao  wrote:
>>>
>>>>  The log showed it failed in parsing, so the typo stuff shouldn’t be
>>>> the root cause. BUT I couldn’t reproduce that with master branch.
>>>>
>>>>
>>>>
>>>> I did the test as follow:
>>>>
>>>>
>>>>
>>>> sbt/sbt -Phadoop-2.3.0 -Phadoop-2.3 -Phive -Phive-0.13.1 hive/console
>>>>
>>>> scala> sql("SELECT user_id FROM actions where
>>>> conversion_aciton_id=20141210")
>>>>
>>>>
>>>>
>>>> sbt/sbt -Phadoop-2.3.0 -Phadoop-2.3 -Phive -Phive-0.12.0 hive/console
>>>>
>>>> scala> sql("SELECT user_id FROM actions where
>>>> conversion_aciton_id=20141210")
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> *From:* Yana Kadiyska [mailto:yana.kadiy...@gmail.com]
>>>> *Sent:* Wednesday, January 14, 2015 11:12 PM
>>>> *To:* Pala M Muthaia
>>>> *Cc:* user@spark.apache.org
>>>> *Subject:* Re: Issues with constants in Spark HiveQL queries
>>>>
>>>>
>>>>
>>>> Just a guess, but what is the type of conversion_aciton_id? I do
>>>> queries over an epoch all the time with no issues (where epoch's type is
>>>> bigint). 

Re: RE: Can't access remote Hive table from spark

2015-02-05 Thread Skanda
Hi,

My spark-env.sh has the following entries with respect to classpath:

export SPARK_CLASSPATH=$SPARK_CLASSPATH:/usr/lib/hive/lib/*:/etc/hive/conf/

-Skanda

On Sun, Feb 1, 2015 at 11:45 AM, guxiaobo1982  wrote:

> Hi Skanda,
>
> How do you set up your SPARK_CLASSPATH?
>
> I added the following line to my SPARK_HOME/conf/spark-env.sh, and still
> got the same error.
>
> export SPARK_CLASSPATH=${SPARK_CLASSPATH}:/etc/hive/conf
>
>
> -- Original --
> *From:* "Skanda Prasad"
> *Send time:* Monday, Jan 26, 2015 7:41 AM
> *To:* "user@spark.apache.org" <user@spark.apache.org>
> *Subject:* RE: Can't access remote Hive table from spark
>
> This happened to me as well, putting hive-site.xml inside conf doesn't
> seem to work. Instead I added /etc/hive/conf to SPARK_CLASSPATH and it
> worked. You can try this approach.
>
> -Skanda
> --
> From: guxiaobo1982 
> Sent: 25-01-2015 13:50
> To: user@spark.apache.org
> Subject: Can't access remote Hive table from spark
>
> Hi,
> I built and started a single node standalone Spark 1.2.0 cluster along
> with a single node Hive 0.14.0 instance installed by Ambari 1.17.0. On the
> Spark and Hive node I can create and query tables inside Hive, and on
> remote machines I can submit the SparkPi example to the Spark master. But
> I failed to run the following example code:
>
> public class SparkTest {
>
> public static void main(String[] args)
>
> {
>
>  String appName= "This is a test application";
>
>  String master="spark://lix1.bh.com:7077";
>
>   SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
>
>  JavaSparkContext sc = new JavaSparkContext(conf);
>
>   JavaHiveContext sqlCtx = new
> org.apache.spark.sql.hive.api.java.JavaHiveContext(sc);
>
>  //sqlCtx.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)");
>
>  //sqlCtx.sql("LOAD DATA LOCAL INPATH '/opt/spark/examples/src
> /main/resources/kv1.txt' INTO TABLE src");
>
>  // Queries are expressed in HiveQL.
>
> List<Row> rows = sqlCtx.sql("FROM src SELECT key, value").collect();
>
> System.out.print("I got " + rows.size() + " rows \r\n");
>
>  sc.close();}
>
> }
>
>
> Exception in thread "main"
> org.apache.hadoop.hive.ql.metadata.InvalidTableException: Table not found
> src
>
> at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:980)
>
> at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:950)
>
> at org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(
> HiveMetastoreCatalog.scala:70)
>
> at org.apache.spark.sql.hive.HiveContext$$anon$2.org
> $apache$spark$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(
> HiveContext.scala:253)
>
> at
> org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(
> Catalog.scala:141)
>
> at
> org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(
> Catalog.scala:141)
>
> at scala.Option.getOrElse(Option.scala:120)
>
> at
> org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(
> Catalog.scala:141)
>
> at org.apache.spark.sql.hive.HiveContext$$anon$2.lookupRelation(
> HiveContext.scala:253)
>
> at
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(
> Analyzer.scala:143)
>
> at
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(
> Analyzer.scala:138)
>
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(
> TreeNode.scala:144)
>
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(
> TreeNode.scala:162)
>
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>
> at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48
> )
>
> at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(
> ArrayBuffer.scala:103)
>
> at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47
> )
>
> at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>
> at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>
> at scala.collection.TraversableOnce$class.toBuffer(
> TraversableOnce.scala:265)
>
> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>
> at scala.collection.TraversableOnce$class.toArray(
> Traversab

RE: Can't access remote Hive table from spark

2015-01-25 Thread Skanda Prasad
This happened to me as well, putting hive-site.xml inside conf doesn't seem to 
work. Instead I added /etc/hive/conf to SPARK_CLASSPATH and it worked. You can 
try this approach.

-Skanda

-Original Message-
From: "guxiaobo1982" 
Sent: 25-01-2015 13:50
To: "user@spark.apache.org" 
Subject: Can't access remote Hive table from spark

Hi,
I built and started a single node standalone Spark 1.2.0 cluster along with a 
single node Hive 0.14.0 instance installed by Ambari 1.17.0. On the Spark and 
Hive node I can create and query tables inside Hive, and on remote machines I 
can submit the SparkPi example to the Spark master. But I failed to run the 
following example code:


import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.api.java.Row;
import org.apache.spark.sql.hive.api.java.JavaHiveContext;

public class SparkTest {
    public static void main(String[] args) {
        String appName = "This is a test application";
        String master = "spark://lix1.bh.com:7077";
        SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaHiveContext sqlCtx = new JavaHiveContext(sc);
        //sqlCtx.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)");
        //sqlCtx.sql("LOAD DATA LOCAL INPATH '/opt/spark/examples/src/main/resources/kv1.txt' INTO TABLE src");
        // Queries are expressed in HiveQL.
        List<Row> rows = sqlCtx.sql("FROM src SELECT key, value").collect();
        System.out.print("I got " + rows.size() + " rows \r\n");
        sc.close();
    }
}


Exception in thread "main" 
org.apache.hadoop.hive.ql.metadata.InvalidTableException: Table not found src
at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:980)
at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:950)
at 
org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:70)
at 
org.apache.spark.sql.hive.HiveContext$$anon$2.org$apache$spark$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(HiveContext.scala:253)
at 
org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141)
at 
org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141)
at scala.Option.getOrElse(Option.scala:120)
at 
org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:141)
at 
org.apache.spark.sql.hive.HiveContext$$anon$2.lookupRelation(HiveContext.scala:253)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:143)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:138)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:191)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:147)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:138)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:137)
at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61)
at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59)
at 
scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59)
at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:51)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51)
at 
org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:411)
at org.apache.spark.sql.SQLContext$Qu

Re: unable to write SequenceFile using saveAsNewAPIHadoopFile

2015-01-22 Thread Skanda
Yeah, it worked like a charm!! Thank you!

On Thu, Jan 22, 2015 at 2:28 PM, Sean Owen  wrote:

> First, as an aside, I am pretty sure you cannot reuse one Text and one
> IntWritable object here. Spark does not necessarily finish with one call()'s
> value before the next call(). Although it should not be directly related to
> the serialization problem, I suspect it is: your function is not serializable
> since it contains references to these cached writables. I think removing
> them fixes both problems.
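
For reference, a minimal sketch of the change Sean describes, with the writables created
inside call() instead of being cached on the function object. The wrapping class and
method are added only to make the snippet self-contained, and joiningKeyPlusPredictedPoint
is assumed to be the JavaPairRDD<String, Integer> from the quoted code below:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public class SequenceFileWriter {
    // Only the mapToPair function changes relative to the original snippet.
    static void save(JavaPairRDD<String, Integer> joiningKeyPlusPredictedPoint) {
        joiningKeyPlusPredictedPoint
            .mapToPair(new PairFunction<Tuple2<String, Integer>, Text, IntWritable>() {
                @Override
                public Tuple2<Text, IntWritable> call(Tuple2<String, Integer> tuple) {
                    // Fresh writables per record: the closure no longer holds
                    // non-serializable fields, and no object is reused across calls.
                    return new Tuple2<Text, IntWritable>(
                            new Text(tuple._1()), new IntWritable(tuple._2()));
                }
            })
            .saveAsNewAPIHadoopFile("/mllib/data/clusteroutput_seq",
                    Text.class, IntWritable.class, SequenceFileOutputFormat.class);
    }
}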
> On Jan 22, 2015 9:42 AM, "Skanda"  wrote:
>
>> Hi All,
>>
>> I'm using the saveAsNewAPIHadoopFile API to write SequenceFiles but I'm
>> getting the following runtime exception:
>>
>> *Exception in thread "main" org.apache.spark.SparkException: Task not
>> serializable*
>> at
>> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>> at
>> org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>> at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
>> at org.apache.spark.rdd.RDD.map(RDD.scala:271)
>> at
>> org.apache.spark.api.java.JavaRDDLike$class.mapToPair(JavaRDDLike.scala:102)
>> at
>> org.apache.spark.api.java.JavaPairRDD.mapToPair(JavaPairRDD.scala:45)
>> at XoanonKMeansV2.main(XoanonKMeansV2.java:125)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>> *Caused by: java.io.NotSerializableException:
>> org.apache.hadoop.io.IntWritable*
>> at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>> at
>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>> at
>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>> at
>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>> at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>> at
>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>> at
>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>> at
>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>> at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>> at
>> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
>> at
>> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
>> at
>> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
>> ... 13 more
>>
>> Pls find below the code snippet:
>>
>> joiningKeyPlusPredictedPoint.mapToPair(
>>         new PairFunction<Tuple2<String, Integer>, Text, IntWritable>() {
>>             Text text = new Text();
>>             IntWritable intwritable = new IntWritable();
>>
>>             @Override
>>             public Tuple2<Text, IntWritable> call(
>>                     Tuple2<String, Integer> tuple) throws Exception {
>>                 text.set(tuple._1);
>>                 intwritable.set(tuple._2);
>>                 return new Tuple2<Text, IntWritable>(text, intwritable);
>>             }
>>         })
>>
>> *.saveAsNewAPIHadoopFile("/mllib/data/clusteroutput_seq",
>> Text.class, IntWritable.class, SequenceFileOutputFormat.class);*
>>
>> Regards,
>> Skanda
>>
>


unable to write SequenceFile using saveAsNewAPIHadoopFile

2015-01-22 Thread Skanda
Hi All,

I'm using the saveAsNewAPIHadoopFile API to write SequenceFiles but I'm
getting the following runtime exception:

*Exception in thread "main" org.apache.spark.SparkException: Task not
serializable*
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
at org.apache.spark.rdd.RDD.map(RDD.scala:271)
at
org.apache.spark.api.java.JavaRDDLike$class.mapToPair(JavaRDDLike.scala:102)
at org.apache.spark.api.java.JavaPairRDD.mapToPair(JavaPairRDD.scala:45)
at XoanonKMeansV2.main(XoanonKMeansV2.java:125)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
*Caused by: java.io.NotSerializableException:
org.apache.hadoop.io.IntWritable*
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 13 more

Pls find below the code snippet:

joiningKeyPlusPredictedPoint.mapToPair(
        new PairFunction<Tuple2<String, Integer>, Text, IntWritable>() {
            Text text = new Text();
            IntWritable intwritable = new IntWritable();

            @Override
            public Tuple2<Text, IntWritable> call(
                    Tuple2<String, Integer> tuple) throws Exception {
                text.set(tuple._1);
                intwritable.set(tuple._2);
                return new Tuple2<Text, IntWritable>(text, intwritable);
            }
        })

        *.saveAsNewAPIHadoopFile("/mllib/data/clusteroutput_seq",
        Text.class, IntWritable.class, SequenceFileOutputFormat.class);*

Regards,
Skanda