Re: Shutdown with streaming driver running in cluster broke master web UI permanently
Sorry, I was on vacation for a few days. Yes, it is on. This is what I have in the logs:

15/06/22 10:44:00 INFO ClientCnxn: Unable to read additional data from server sessionid 0x14dd82e22f70ef1, likely server has closed socket, closing socket connection and attempting reconnect
15/06/22 10:44:00 INFO ClientCnxn: Unable to read additional data from server sessionid 0x24dc5a319b40090, likely server has closed socket, closing socket connection and attempting reconnect
15/06/22 10:44:01 INFO ConnectionStateManager: State change: SUSPENDED
15/06/22 10:44:01 INFO ConnectionStateManager: State change: SUSPENDED
15/06/22 10:44:01 WARN ConnectionStateManager: There are no ConnectionStateListeners registered.
15/06/22 10:44:01 INFO ZooKeeperLeaderElectionAgent: We have lost leadership
15/06/22 10:44:01 ERROR Master: Leadership has been revoked -- master shutting down.

On Thu, Jun 11, 2015 at 8:59 PM, Tathagata Das t...@databricks.com wrote: Do you have the event logging enabled? TD

On Thu, Jun 11, 2015 at 11:24 AM, scar0909 scar0...@gmail.com wrote: I have the same problem. I realized that the Spark master becomes unresponsive when we kill the leader ZooKeeper (of course, I assigned the leader election task to ZooKeeper). Please let me know if you have any developments. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Shutdown-with-streaming-driver-running-in-cluster-broke-master-web-UI-permanently-tp4149p23284.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
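For reference, here is a sketch of the standalone-HA and event-logging settings under discussion. The property names come from the Spark docs; the hosts and paths below are placeholders:

    # spark-env.sh on each master: ZooKeeper-based leader election
    export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER \
      -Dspark.deploy.zookeeper.url=zk1:2181,zk2:2181,zk3:2181 \
      -Dspark.deploy.zookeeper.dir=/spark"

    # spark-defaults.conf: event logging (what TD is asking about)
    spark.eventLog.enabled   true
    spark.eventLog.dir       hdfs:///spark-events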
Re: Code review - Spark SQL command-line client for Cassandra
Yes, just put the Cassandra connector on the Spark classpath and set the connector config properties in the interpreter settings. From: Mohammed Guller Date: Monday, June 22, 2015 at 11:56 AM To: Matthew Johnson, shahid ashraf Cc: user@spark.apache.orgmailto:user@spark.apache.org Subject: RE: Code review - Spark SQL command-line client for Cassandra I haven’t tried using Zeppelin with Spark on Cassandra, so can’t say for sure, but it should not be difficult. Mohammed From: Matthew Johnson [mailto:matt.john...@algomi.com] Sent: Monday, June 22, 2015 2:15 AM To: Mohammed Guller; shahid ashraf Cc: user@spark.apache.orgmailto:user@spark.apache.org Subject: RE: Code review - Spark SQL command-line client for Cassandra Thanks Mohammed, it’s good to know I’m not alone! How easy is it to integrate Zeppelin with Spark on Cassandra? It looks like it would only support Hadoop out of the box. Is it just a case of dropping the Cassandra Connector onto the Spark classpath? Cheers, Matthew From: Mohammed Guller [mailto:moham...@glassbeam.commailto:moham...@glassbeam.com] Sent: 20 June 2015 17:27 To: shahid ashraf Cc: Matthew Johnson; user@spark.apache.orgmailto:user@spark.apache.org Subject: RE: Code review - Spark SQL command-line client for Cassandra It is a simple Play-based web application. It exposes an URI for submitting a SQL query. It then executes that query using CassandraSQLContext provided by Spark Cassandra Connector. Since it is web-based, I added an authentication and authorization layer to make sure that only users with the right authorization can use it. I am happy to open-source that code if there is interest. Just need to carve out some time to clean it up and remove all the other services that this web application provides. Mohammed From: shahid ashraf [mailto:sha...@trialx.com] Sent: Saturday, June 20, 2015 6:52 AM To: Mohammed Guller Cc: Matthew Johnson; user@spark.apache.orgmailto:user@spark.apache.org Subject: RE: Code review - Spark SQL command-line client for Cassandra Hi Mohammad Can you provide more info about the Service u developed On Jun 20, 2015 7:59 AM, Mohammed Guller moham...@glassbeam.commailto:moham...@glassbeam.com wrote: Hi Matthew, It looks fine to me. I have built a similar service that allows a user to submit a query from a browser and returns the result in JSON format. Another alternative is to leave a Spark shell or one of the notebooks (Spark Notebook, Zeppelin, etc.) session open and run queries from there. This model works only if people give you the queries to execute. Mohammed From: Matthew Johnson [mailto:matt.john...@algomi.commailto:matt.john...@algomi.com] Sent: Friday, June 19, 2015 2:20 AM To: user@spark.apache.orgmailto:user@spark.apache.org Subject: Code review - Spark SQL command-line client for Cassandra Hi all, I have been struggling with Cassandra’s lack of adhoc query support (I know this is an anti-pattern of Cassandra, but sometimes management come over and ask me to run stuff and it’s impossible to explain that it will take me a while when it would take about 10 seconds in MySQL) so I have put together the following code snippet that bundles DataStax’s Cassandra Spark connector and allows you to submit Spark SQL to it, outputting the results in a text file. Does anyone spot any obvious flaws in this plan?? 
(I have a lot more error handling etc in my code, but removed it here for brevity)

    private void run(String sqlQuery) {
        SparkContext scc = new SparkContext(conf);
        CassandraSQLContext csql = new CassandraSQLContext(scc);
        DataFrame sql = csql.sql(sqlQuery);
        String folderName = "/tmp/output_" + System.currentTimeMillis();
        LOG.info("Attempting to save SQL results in folder: " + folderName);
        sql.rdd().saveAsTextFile(folderName);
        LOG.info("SQL results saved");
    }

    public static void main(String[] args) {
        String sparkMasterUrl = args[0];
        String sparkHost = args[1];
        String sqlQuery = args[2];

        SparkConf conf = new SparkConf();
        conf.setAppName("Java Spark SQL");
        conf.setMaster(sparkMasterUrl);
        conf.set("spark.cassandra.connection.host", sparkHost);

        JavaSparkSQL app = new JavaSparkSQL(conf);
        app.run(sqlQuery, printToConsole);
    }

I can then submit this to Spark with 'spark-submit':

    ./spark-submit --class com.algomi.spark.JavaSparkSQL --master spark://sales3:7077 spark-on-cassandra-0.0.1-SNAPSHOT-jar-with-dependencies.jar spark://sales3:7077 sales3 "select * from mykeyspace.operationlog"

It seems to work pretty well, so I’m pretty happy, but wondering why this isn’t common practice (at least I haven’t been able to find much about it on Google) – is there something terrible that I’m missing? Thanks! Matthew
RE: Code review - Spark SQL command-line client for Cassandra
I haven’t tried using Zeppelin with Spark on Cassandra, so can’t say for sure, but it should not be difficult. Mohammed From: Matthew Johnson [mailto:matt.john...@algomi.com] Sent: Monday, June 22, 2015 2:15 AM To: Mohammed Guller; shahid ashraf Cc: user@spark.apache.org Subject: RE: Code review - Spark SQL command-line client for Cassandra Thanks Mohammed, it’s good to know I’m not alone! How easy is it to integrate Zeppelin with Spark on Cassandra? It looks like it would only support Hadoop out of the box. Is it just a case of dropping the Cassandra Connector onto the Spark classpath? Cheers, Matthew From: Mohammed Guller [mailto:moham...@glassbeam.commailto:moham...@glassbeam.com] Sent: 20 June 2015 17:27 To: shahid ashraf Cc: Matthew Johnson; user@spark.apache.orgmailto:user@spark.apache.org Subject: RE: Code review - Spark SQL command-line client for Cassandra It is a simple Play-based web application. It exposes an URI for submitting a SQL query. It then executes that query using CassandraSQLContext provided by Spark Cassandra Connector. Since it is web-based, I added an authentication and authorization layer to make sure that only users with the right authorization can use it. I am happy to open-source that code if there is interest. Just need to carve out some time to clean it up and remove all the other services that this web application provides. Mohammed From: shahid ashraf [mailto:sha...@trialx.com] Sent: Saturday, June 20, 2015 6:52 AM To: Mohammed Guller Cc: Matthew Johnson; user@spark.apache.orgmailto:user@spark.apache.org Subject: RE: Code review - Spark SQL command-line client for Cassandra Hi Mohammad Can you provide more info about the Service u developed On Jun 20, 2015 7:59 AM, Mohammed Guller moham...@glassbeam.commailto:moham...@glassbeam.com wrote: Hi Matthew, It looks fine to me. I have built a similar service that allows a user to submit a query from a browser and returns the result in JSON format. Another alternative is to leave a Spark shell or one of the notebooks (Spark Notebook, Zeppelin, etc.) session open and run queries from there. This model works only if people give you the queries to execute. Mohammed From: Matthew Johnson [mailto:matt.john...@algomi.commailto:matt.john...@algomi.com] Sent: Friday, June 19, 2015 2:20 AM To: user@spark.apache.orgmailto:user@spark.apache.org Subject: Code review - Spark SQL command-line client for Cassandra Hi all, I have been struggling with Cassandra’s lack of adhoc query support (I know this is an anti-pattern of Cassandra, but sometimes management come over and ask me to run stuff and it’s impossible to explain that it will take me a while when it would take about 10 seconds in MySQL) so I have put together the following code snippet that bundles DataStax’s Cassandra Spark connector and allows you to submit Spark SQL to it, outputting the results in a text file. Does anyone spot any obvious flaws in this plan?? 
(I have a lot more error handling etc in my code, but removed it here for brevity)

    private void run(String sqlQuery) {
        SparkContext scc = new SparkContext(conf);
        CassandraSQLContext csql = new CassandraSQLContext(scc);
        DataFrame sql = csql.sql(sqlQuery);
        String folderName = "/tmp/output_" + System.currentTimeMillis();
        LOG.info("Attempting to save SQL results in folder: " + folderName);
        sql.rdd().saveAsTextFile(folderName);
        LOG.info("SQL results saved");
    }

    public static void main(String[] args) {
        String sparkMasterUrl = args[0];
        String sparkHost = args[1];
        String sqlQuery = args[2];

        SparkConf conf = new SparkConf();
        conf.setAppName("Java Spark SQL");
        conf.setMaster(sparkMasterUrl);
        conf.set("spark.cassandra.connection.host", sparkHost);

        JavaSparkSQL app = new JavaSparkSQL(conf);
        app.run(sqlQuery, printToConsole);
    }

I can then submit this to Spark with 'spark-submit':

    ./spark-submit --class com.algomi.spark.JavaSparkSQL --master spark://sales3:7077 spark-on-cassandra-0.0.1-SNAPSHOT-jar-with-dependencies.jar spark://sales3:7077 sales3 "select * from mykeyspace.operationlog"

It seems to work pretty well, so I’m pretty happy, but wondering why this isn’t common practice (at least I haven’t been able to find much about it on Google) – is there something terrible that I’m missing? Thanks! Matthew
RE: Code review - Spark SQL command-line client for Cassandra
Hi Pawan, Looking at the changes for that git pull request, it looks like it just pulls in the dependency (and transitives) for “spark-cassandra-connector”. Since I am having to build Zeppelin myself anyway, would it be ok to just add this myself for the connector for 1.4.0 (as found here http://search.maven.org/#artifactdetails%7Ccom.datastax.spark%7Cspark-cassandra-connector_2.11%7C1.4.0-M1%7Cjar)? What exactly is it that does not currently exist for Spark 1.4? Thanks, Matthew *From:* pawan kumar [mailto:pkv...@gmail.com] *Sent:* 22 June 2015 17:19 *To:* Silvio Fiorito *Cc:* Mohammed Guller; Matthew Johnson; shahid ashraf; user@spark.apache.org *Subject:* Re: Code review - Spark SQL command-line client for Cassandra Hi, Zeppelin has a cassandra-spark-connector built into the build. I have not tried it yet may be you could let us know. https://github.com/apache/incubator-zeppelin/pull/79 To build a Zeppelin version with the *Datastax Spark/Cassandra connector https://github.com/datastax/spark-cassandra-connector* mvn clean package *-Pcassandra-spark-1.x* -Dhadoop.version=xxx -Phadoop-x.x -DskipTests Right now the Spark/Cassandra connector is available for *Spark 1.1* and *Spark 1.2*. Support for *Spark 1.3* is not released yet (*but you can build you own Spark/Cassandra connector version **1.3.0-SNAPSHOT*). Support for *Spark 1.4* does not exist yet Please do not forget to add -Dspark.cassandra.connection.host=xxx to the *ZEPPELIN_JAVA_OPTS*parameter in *conf/zeppelin-env.sh* file. Alternatively you can add this parameter in the parameter list of the *Spark interpreter* on the GUI -Pawan On Mon, Jun 22, 2015 at 9:04 AM, Silvio Fiorito silvio.fior...@granturing.com wrote: Yes, just put the Cassandra connector on the Spark classpath and set the connector config properties in the interpreter settings. *From: *Mohammed Guller *Date: *Monday, June 22, 2015 at 11:56 AM *To: *Matthew Johnson, shahid ashraf *Cc: *user@spark.apache.org *Subject: *RE: Code review - Spark SQL command-line client for Cassandra I haven’t tried using Zeppelin with Spark on Cassandra, so can’t say for sure, but it should not be difficult. Mohammed *From:* Matthew Johnson [mailto:matt.john...@algomi.com matt.john...@algomi.com] *Sent:* Monday, June 22, 2015 2:15 AM *To:* Mohammed Guller; shahid ashraf *Cc:* user@spark.apache.org *Subject:* RE: Code review - Spark SQL command-line client for Cassandra Thanks Mohammed, it’s good to know I’m not alone! How easy is it to integrate Zeppelin with Spark on Cassandra? It looks like it would only support Hadoop out of the box. Is it just a case of dropping the Cassandra Connector onto the Spark classpath? Cheers, Matthew *From:* Mohammed Guller [mailto:moham...@glassbeam.com] *Sent:* 20 June 2015 17:27 *To:* shahid ashraf *Cc:* Matthew Johnson; user@spark.apache.org *Subject:* RE: Code review - Spark SQL command-line client for Cassandra It is a simple Play-based web application. It exposes an URI for submitting a SQL query. It then executes that query using CassandraSQLContext provided by Spark Cassandra Connector. Since it is web-based, I added an authentication and authorization layer to make sure that only users with the right authorization can use it. I am happy to open-source that code if there is interest. Just need to carve out some time to clean it up and remove all the other services that this web application provides. 
Mohammed *From:* shahid ashraf [mailto:sha...@trialx.com sha...@trialx.com] *Sent:* Saturday, June 20, 2015 6:52 AM *To:* Mohammed Guller *Cc:* Matthew Johnson; user@spark.apache.org *Subject:* RE: Code review - Spark SQL command-line client for Cassandra Hi Mohammad Can you provide more info about the Service u developed On Jun 20, 2015 7:59 AM, Mohammed Guller moham...@glassbeam.com wrote: Hi Matthew, It looks fine to me. I have built a similar service that allows a user to submit a query from a browser and returns the result in JSON format. Another alternative is to leave a Spark shell or one of the notebooks (Spark Notebook, Zeppelin, etc.) session open and run queries from there. This model works only if people give you the queries to execute. Mohammed *From:* Matthew Johnson [mailto:matt.john...@algomi.com] *Sent:* Friday, June 19, 2015 2:20 AM *To:* user@spark.apache.org *Subject:* Code review - Spark SQL command-line client for Cassandra Hi all, I have been struggling with Cassandra’s lack of adhoc query support (I know this is an anti-pattern of Cassandra, but sometimes management come over and ask me to run stuff and it’s impossible to explain that it will take me a while when it would take about 10 seconds in MySQL) so I have put together the following code snippet that bundles DataStax’s Cassandra Spark connector and allows you to submit Spark SQL to it, outputting the results in a text file. Does anyone spot any obvious flaws in this plan?? (I have a lot
Help optimising Spark SQL query
Hello, A colleague of mine ran the following Spark SQL query: select count(*) as uses, count (distinct cast(id as string)) as users from usage_events where from_unixtime(cast(timestamp_millis/1000 as bigint)) between '2015-06-09' and '2015-06-16' The table contains billions of rows, but totals only 64GB of data across ~30 separate files, which are stored as Parquet with LZO compression in S3. From the referenced columns: * id is Binary, which we cast to a String so that we can DISTINCT by it. (I was already told this will improve in a later release, in a separate thread.) * timestamp_millis is a long, containing a unix timestamp with millisecond resolution This took nearly 2 hours to run on a 5 node cluster of r3.xlarge EC2 instances, using 20 executors, each with 4GB memory. I can see from monitoring tools that the CPU usage is at 100% on all nodes, but incoming network seems a bit low at 2.5MB/s, suggesting to me that this is CPU-bound. Does that seem slow? Can anyone offer any ideas by glancing at the query as to why this might be slow? We'll profile it meanwhile and post back if we find anything ourselves. A side issue - I've found that this query, and others, sometimes completes but doesn't return any results. There appears to be no error that I can see in the logs, and Spark reports the job as successful, but the connected JDBC client (SQLWorkbenchJ in this case), just sits there forever waiting. I did a quick Google and couldn't find anyone else having similar issues. Many thanks, James.
RE: Help optimising Spark SQL query
Hi James, What version of Spark are you using? In Spark 1.2.2 I had an issue where Spark would report a job as complete but I couldn’t find my results anywhere – I just assumed it was me doing something wrong as I am still quite new to Spark. However, since upgrading to 1.4.0 I have not seen this issue, so might be worth upgrading if you are not already on 1.4. Cheers, Matthew *From:* Lior Chaga [mailto:lio...@taboola.com] *Sent:* 22 June 2015 17:24 *To:* James Aley *Cc:* user *Subject:* Re: Help optimising Spark SQL query Hi James, There are a few configurations that you can try: https://spark.apache.org/docs/latest/sql-programming-guide.html#other-configuration-options From my experience, the codegen really boost things up. Just run sqlContext.sql(spark.sql.codegen=true) before you execute your query. But keep in mind that sometimes this is buggy (depending on your query), so compare to results without codegen to be sure. Also you can try changing the default partitions. You can also use dataframes (since 1.3). Not sure they are better than specifying the query in 1.3, but with spark 1.4 there should be an enormous performance improvement in dataframes. Lior On Mon, Jun 22, 2015 at 6:28 PM, James Aley james.a...@swiftkey.com wrote: Hello, A colleague of mine ran the following Spark SQL query: select count(*) as uses, count (distinct cast(id as string)) as users from usage_events where from_unixtime(cast(timestamp_millis/1000 as bigint)) between '2015-06-09' and '2015-06-16' The table contains billions of rows, but totals only 64GB of data across ~30 separate files, which are stored as Parquet with LZO compression in S3. From the referenced columns: * id is Binary, which we cast to a String so that we can DISTINCT by it. (I was already told this will improve in a later release, in a separate thread.) * timestamp_millis is a long, containing a unix timestamp with millisecond resolution This took nearly 2 hours to run on a 5 node cluster of r3.xlarge EC2 instances, using 20 executors, each with 4GB memory. I can see from monitoring tools that the CPU usage is at 100% on all nodes, but incoming network seems a bit low at 2.5MB/s, suggesting to me that this is CPU-bound. Does that seem slow? Can anyone offer any ideas by glancing at the query as to why this might be slow? We'll profile it meanwhile and post back if we find anything ourselves. A side issue - I've found that this query, and others, sometimes completes but doesn't return any results. There appears to be no error that I can see in the logs, and Spark reports the job as successful, but the connected JDBC client (SQLWorkbenchJ in this case), just sits there forever waiting. I did a quick Google and couldn't find anyone else having similar issues. Many thanks, James.
Support for Windowing and Analytics functions in Spark SQL
Hi, Though the documentation does not explicitly mention support for windowing and analytics functions in Spark SQL, it looks like they are not supported. I tried running a query like:

    Select Lead(column name, 1) over (Partition By column name order by column name) from table name

and I got an error saying that this feature is unsupported. I tried it in Databricks Cloud, which supports Spark 1.4. Can anyone please confirm this? Regards, Sourav
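For what it's worth, window functions were added in Spark 1.4, but as far as I know only through a HiveContext (a plain SQLContext in 1.4 still rejects them, which may be what produced the "unsupported" error). A minimal DataFrame-API sketch of the LEAD example, with hypothetical column and table names:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, lead}

    // Requires a HiveContext in Spark 1.4
    val w = Window.partitionBy("partition_col").orderBy("order_col")
    val withNext = hiveContext.table("table_name")
      .select(col("partition_col"), col("order_col"),
              lead("value_col", 1).over(w).as("next_value"))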
Re: Code review - Spark SQL command-line client for Cassandra
Hi, Zeppelin has a cassandra-spark-connector built into the build. I have not tried it yet may be you could let us know. https://github.com/apache/incubator-zeppelin/pull/79 To build a Zeppelin version with the *Datastax Spark/Cassandra connector https://github.com/datastax/spark-cassandra-connector* mvn clean package *-Pcassandra-spark-1.x* -Dhadoop.version=xxx -Phadoop-x.x -DskipTests Right now the Spark/Cassandra connector is available for *Spark 1.1* and *Spark 1.2*. Support for *Spark 1.3* is not released yet (*but you can build you own Spark/Cassandra connector version 1.3.0-SNAPSHOT*). Support for *Spark 1.4* does not exist yet Please do not forget to add -Dspark.cassandra.connection.host=xxx to the *ZEPPELIN_JAVA_OPTS*parameter in *conf/zeppelin-env.sh* file. Alternatively you can add this parameter in the parameter list of the *Spark interpreter* on the GUI -Pawan On Mon, Jun 22, 2015 at 9:04 AM, Silvio Fiorito silvio.fior...@granturing.com wrote: Yes, just put the Cassandra connector on the Spark classpath and set the connector config properties in the interpreter settings. From: Mohammed Guller Date: Monday, June 22, 2015 at 11:56 AM To: Matthew Johnson, shahid ashraf Cc: user@spark.apache.org Subject: RE: Code review - Spark SQL command-line client for Cassandra I haven’t tried using Zeppelin with Spark on Cassandra, so can’t say for sure, but it should not be difficult. Mohammed *From:* Matthew Johnson [mailto:matt.john...@algomi.com matt.john...@algomi.com] *Sent:* Monday, June 22, 2015 2:15 AM *To:* Mohammed Guller; shahid ashraf *Cc:* user@spark.apache.org *Subject:* RE: Code review - Spark SQL command-line client for Cassandra Thanks Mohammed, it’s good to know I’m not alone! How easy is it to integrate Zeppelin with Spark on Cassandra? It looks like it would only support Hadoop out of the box. Is it just a case of dropping the Cassandra Connector onto the Spark classpath? Cheers, Matthew *From:* Mohammed Guller [mailto:moham...@glassbeam.com] *Sent:* 20 June 2015 17:27 *To:* shahid ashraf *Cc:* Matthew Johnson; user@spark.apache.org *Subject:* RE: Code review - Spark SQL command-line client for Cassandra It is a simple Play-based web application. It exposes an URI for submitting a SQL query. It then executes that query using CassandraSQLContext provided by Spark Cassandra Connector. Since it is web-based, I added an authentication and authorization layer to make sure that only users with the right authorization can use it. I am happy to open-source that code if there is interest. Just need to carve out some time to clean it up and remove all the other services that this web application provides. Mohammed *From:* shahid ashraf [mailto:sha...@trialx.com sha...@trialx.com] *Sent:* Saturday, June 20, 2015 6:52 AM *To:* Mohammed Guller *Cc:* Matthew Johnson; user@spark.apache.org *Subject:* RE: Code review - Spark SQL command-line client for Cassandra Hi Mohammad Can you provide more info about the Service u developed On Jun 20, 2015 7:59 AM, Mohammed Guller moham...@glassbeam.com wrote: Hi Matthew, It looks fine to me. I have built a similar service that allows a user to submit a query from a browser and returns the result in JSON format. Another alternative is to leave a Spark shell or one of the notebooks (Spark Notebook, Zeppelin, etc.) session open and run queries from there. This model works only if people give you the queries to execute. 
Mohammed From: Matthew Johnson [mailto:matt.john...@algomi.com] Sent: Friday, June 19, 2015 2:20 AM To: user@spark.apache.org Subject: Code review - Spark SQL command-line client for Cassandra Hi all, I have been struggling with Cassandra’s lack of ad hoc query support (I know this is an anti-pattern of Cassandra, but sometimes management come over and ask me to run stuff and it’s impossible to explain that it will take me a while when it would take about 10 seconds in MySQL) so I have put together the following code snippet that bundles DataStax’s Cassandra Spark connector and allows you to submit Spark SQL to it, outputting the results in a text file. Does anyone spot any obvious flaws in this plan?? (I have a lot more error handling etc in my code, but removed it here for brevity)

    private void run(String sqlQuery) {
        SparkContext scc = new SparkContext(conf);
        CassandraSQLContext csql = new CassandraSQLContext(scc);
        DataFrame sql = csql.sql(sqlQuery);
        String folderName = "/tmp/output_" + System.currentTimeMillis();
        LOG.info("Attempting to save SQL results in folder: " + folderName);
        sql.rdd().saveAsTextFile(folderName);
        LOG.info("SQL results saved");
    }

    public static void main(String[] args) {
        String sparkMasterUrl = args[0];
        String sparkHost = args[1];
        String sqlQuery =
Re: Multiple executors writing file using java filewriter
Running perfectly on the local system, but not writing to the file in cluster mode. Any suggestions please?

    // msgid is a long counter
    JavaDStream<String> newinputStream = inputStream.map(new Function<String, String>() {
        @Override
        public String call(String v1) throws Exception {
            String s1 = msgId + "@" + v1;
            System.out.println(s1);
            msgId++;
            try {
                // filewriter logic
                spoutlog.batchLogwriter(System.currentTimeMillis(), "spout-MSGID," + msgeditor.getMessageId(s1));
            } catch (Exception e) {
                System.out.println("exception is here");
                e.printStackTrace();
                throw e;
            }
            System.out.println("msgid," + msgId);
            return msgeditor.addMessageId(v1, msgId);
        }
    });

-- Thanks & Regards, Anshu Shukla

On Mon, Jun 22, 2015 at 10:50 PM, anshu shukla anshushuk...@gmail.com wrote: Can we not write some data to a txt file in parallel, with multiple executors running in parallel? -- Thanks & Regards, Anshu Shukla
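A minimal sketch (Scala here, but the idea is the same in Java) of the usual alternative: rather than appending to a local file from inside map(), which in cluster mode writes on each worker's local disk rather than the driver's, carry the log line along as data and let Spark write it to a shared filesystem. The stream and output path names are assumptions:

    val stamped = inputStream.map(v1 => s"${System.currentTimeMillis()},spout-MSGID,$v1")
    stamped.foreachRDD { (rdd, time) =>
      // One part-file per partition, written in parallel by the executors.
      rdd.saveAsTextFile(s"hdfs:///tmp/spout-log-${time.milliseconds}")
    }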
Re: Web UI vs History Server Bugs
No, what I'm seeing is that while the cluster is running, I can't see the app info after the app is completed. That is to say, when I click on the application name on master:8080, no info is shown. However, when I examine the same file on the History Server, the application information opens fine. On Sat, Jun 20, 2015 at 6:47 AM, Steve Loughran ste...@hortonworks.com wrote: On 17 Jun 2015, at 19:10, jcai jonathon@yale.edu wrote: Hi, I am running this on Spark stand-alone mode. I find that when I examine the web UI, a couple bugs arise: 1. There is a discrepancy between the number denoting the duration of the application when I run the history server and the number given by the web UI (default address is master:8080). I checked more specific details, including task and stage durations (when clicking on the application), and these appear to be the same for both avenues. 2. Sometimes the web UI on master:8080 is unable to display more specific information for an application that has finished (when clicking on the application), even when there is a log file in the appropriate directory. But when the history server is opened, it is able to read this file and output information. There's a JIRA open on the history server caching incomplete work...if you click on the link to a job while it's in progress, you don't get any updates later. does this sound like what you are seeing?
Re: Registering custom metrics
Great, thank you, Silvio. In your experience, is there any way to instrument a callback into Coda Hale or the Spark consumers from the metrics sink? If the sink performs some steps once it has received the metrics, I'd like to be able to make the consumers aware of that via some sort of a callback. On Mon, Jun 22, 2015 at 10:14 AM, Silvio Fiorito silvio.fior...@granturing.com wrote: Sorry, replied to Gerard’s question vs yours. See here: Yes, you have to implement your own custom Metrics Source using the Coda Hale library. See here for some examples: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/metrics/source/JvmSource.scala https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala The source gets registered, then you have to configure a sink for it just as the JSON servlet you mentioned. I had done it in the past but don’t have access to the source for that project anymore, unfortunately. Thanks, Silvio On 6/22/15, 9:57 AM, dgoldenberg dgoldenberg...@gmail.com wrote: Hi Gerard, Have there been any responses? Any insights as to what you ended up doing to enable custom metrics? I'm thinking of implementing a custom metrics sink, not sure how doable that is yet... Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Registering-custom-metrics-tp17765p23426.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
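For anyone following along, a rough sketch of the kind of custom Source Silvio describes, modelled on the JvmSource/ApplicationSource files linked above. One caveat: in some Spark versions the Source trait (and the MetricsSystem used to register it) are private[spark], so the class may need to be declared under an org.apache.spark.* package, or wired up through the metrics.properties configuration, to compile. Treat this as an illustration rather than a drop-in, and the metric name is hypothetical:

    package org.apache.spark.metrics.source

    import com.codahale.metrics.{Gauge, MetricRegistry}

    class MyAppSource extends Source {
      override val sourceName = "myApp"
      override val metricRegistry = new MetricRegistry()

      // Hypothetical metric: expose whatever the application needs.
      @volatile var lastBatchProcessedMs: Long = 0L
      metricRegistry.register(MetricRegistry.name("lastBatchProcessedMs"),
        new Gauge[Long] { override def getValue: Long = lastBatchProcessedMs })
    }

Once registered (for example via SparkEnv.get.metricsSystem.registerSource(new MyAppSource) from code that can see those internals), the values show up in whichever sinks are configured, including the JSON servlet.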
Re: Hive query execution from Spark(through HiveContext) failing with Apache Sentry
Any response to this guys? On Fri, Jun 19, 2015 at 2:34 PM, Nitin kak nitinkak...@gmail.com wrote: Any other suggestions guys? On Wed, Jun 17, 2015 at 7:54 PM, Nitin kak nitinkak...@gmail.com wrote: With Sentry, only the hive user has permission for read/write/execute on the subdirectories of the warehouse. All users get translated to hive when interacting with HiveServer2. But I think HiveContext is bypassing HiveServer2. On Wednesday, June 17, 2015, ayan guha guha.a...@gmail.com wrote: Try to grant read/execute access through Sentry. On 18 Jun 2015 05:47, Nitin kak nitinkak...@gmail.com wrote: I am trying to run a Hive query from Spark code using a HiveContext object. It was running fine earlier, but since Apache Sentry was installed the process is failing with this exception:

org.apache.hadoop.security.AccessControlException: Permission denied: user=kakn, access=READ_EXECUTE, inode=/user/hive/warehouse/rt_freewheel_mastering.db/digital_profile_cluster_in:hive:hive:drwxrwx--t

I have pasted the full stack trace at the end of this post. My username kakn is a registered user with Sentry. I know that Spark takes all the configurations from hive-site.xml to execute the hql, so I added a few Sentry-specific properties, but they seem to have no effect. I have attached the hive-site.xml:

    <property>
        <name>hive.security.authorization.task.factory</name>
        <value>org.apache.sentry.binding.hive.SentryHiveAuthorizationTaskFactoryImpl</value>
    </property>
    <property>
        <name>hive.metastore.pre.event.listeners</name>
        <value>org.apache.sentry.binding.metastore.MetastoreAuthzBinding</value>
        <description>list of comma separated listeners for metastore events.</description>
    </property>
    <property>
        <name>hive.warehouse.subdir.inherit.perms</name>
        <value>true</value>
    </property>

org.apache.hadoop.security.AccessControlException: Permission denied: user=kakn, access=READ_EXECUTE, inode=/user/hive/warehouse/rt_freewheel_mastering.db/digital_profile_cluster_in:hive:hive:drwxrwx--t
    at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:257)
    at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:238)
    at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkPermission(DefaultAuthorizationProvider.java:151)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:138)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6287)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6269)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPathAccess(FSNamesystem.java:6194)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getListingInt(FSNamesystem.java:4793)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getListing(FSNamesystem.java:4755)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getListing(NameNodeRpcServer.java:800)
    at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getListing(AuthorizationProviderProxyClientProtocol.java:310)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getListing(ClientNamenodeProtocolServerSideTranslatorPB.java:606)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:587)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1026)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2013)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2009)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1642)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2007)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
    at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
    at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1895)
    at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1876)
    at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:654)
Re: Does HiveContext connect to HiveServer2?
Hey, I have exactly this question. Did you get an answer to it? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Does-HiveContext-connect-to-HiveServer2-tp22200p23431.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Multiple executors writing file using java filewriter
Can we not write some data to a txt file in parallel, with multiple executors running in parallel? -- Thanks & Regards, Anshu Shukla
Re: Task Serialization Error on DataFrame.foreachPartition
private HTable table; You should declare the table variable within the apply() method. BTW, which HBase release are you using? I see you implement caching yourself. You can make use of the following HTable method: public void setWriteBufferSize(long writeBufferSize) throws IOException. Cheers

On Sun, Jun 21, 2015 at 11:16 PM, Nishant Patel nishant.k.pa...@gmail.com wrote: Hi, Please find the code below. (Cluster-specific values were elided in the original post.)

    dataFrame.foreachPartition(new AbstractFunction1<scala.collection.Iterator<Row>, BoxedUnit>() {

        private HTable table;
        private char ROWKEY_SEPERATOR = '\u';

        public BoxedUnit apply(scala.collection.Iterator<Row> rows) {
            Configuration config = HBaseConfiguration.create();
            config.set("hbase.zookeeper.quorum", ...);
            config.set("hbase.zookeeper.property.clientPort", ???);
            config.set("zookeeper.znode.parent", ...);
            try {
                table = new HTable(config, "table_name");
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
            List<Put> puts = new ArrayList<Put>();
            try {
                while (rows.hasNext()) {
                    Row row = rows.next();
                    Map<String, Object> map = new HashMap<String, Object>();
                    String[] fieldNames = row.schema().fieldNames();
                    for (int i = 0; i < fieldNames.length; i++) {
                        map.put(fieldNames[i].toUpperCase(), row.get(i));
                    }
                    puts.add(mapToPut(map));
                    if (puts.size() >= 500) {
                        table.put(puts);
                        puts.clear();
                    }
                }
                table.put(puts);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
            return BoxedUnit.UNIT;
        }

        private Put mapToPut(Map<String, Object> map) throws IOException {
            try {
                Put put = new Put(getRowKey(map));
                String value = null;
                for (String key : map.keySet()) {
                    value = (map.get(key) == null ? "" : map.get(key).toString());
                    put.add(Bytes.toBytes(0), Bytes.toBytes(key), Bytes.toBytes(value));
                }
                return put;
            } catch (Exception e) {
                e.printStackTrace();
                throw e;
            }
        }

        private byte[] getRowKey(Map<String, Object> map) {
            StringBuilder builder = new StringBuilder();
            return Bytes.toBytes(builder.toString());
        }
    });

Regards, Nishant

On Mon, Jun 22, 2015 at 11:08 AM, Ted Yu yuzhih...@gmail.com wrote: Can you show us the code for loading Hive into hbase? There shouldn't be a 'return' statement in that code. Cheers

On Jun 20, 2015, at 10:10 PM, Nishant Patel nishant.k.pa...@gmail.com wrote: Hi, I am loading data from a Hive table into Hbase after doing some manipulation. I am getting the error 'Task not serializable'. My code is as below.

    public class HiveToHbaseLoader implements Serializable {

        public static void main(String[] args) throws Exception {
            String hbaseTableName = args[0];
            String hiveQuery = args[1];

            SparkConf conf = new SparkConf().setAppName("Hive to Hbase Loader")
                    .setMaster(...);
            JavaSparkContext sc = new JavaSparkContext(conf);
            HiveContext hiveContext = new HiveContext(sc.sc());
            hiveContext.setConf("hive.metastore.uris", ...);

            DataFrame dataFrame = hiveContext.sql(hiveQuery);

            dataFrame.foreachPartition(new AbstractFunction1<scala.collection.Iterator<Row>, BoxedUnit>() {
                // Logic to load row from hive to Hbase.
            });
        }
    }

Getting error as below.

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at
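To illustrate Ted's two suggestions, here is a minimal sketch (in Scala, with hypothetical table, quorum and column-family names): create the HTable inside the partition function so nothing non-serializable is captured by the closure, and let the client-side write buffer do the batching instead of the manual 500-row flush. The exact auto-flush call depends on the HBase release, so treat this as an outline rather than a drop-in:

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.{HTable, Put}
    import org.apache.hadoop.hbase.util.Bytes

    dataFrame.foreachPartition { rows =>
      val conf = HBaseConfiguration.create()
      conf.set("hbase.zookeeper.quorum", "zk-host")        // assumption: your quorum
      val table = new HTable(conf, "table_name")           // created per partition, never serialized
      table.setAutoFlush(false)                            // deprecated in newer releases; see setAutoFlushTo
      table.setWriteBufferSize(4 * 1024 * 1024)            // flush roughly every 4 MB
      rows.foreach { row =>
        val put = new Put(Bytes.toBytes(String.valueOf(row.get(0))))   // assumption: first column is the row key
        put.add(Bytes.toBytes("cf"), Bytes.toBytes("col"), Bytes.toBytes(row.mkString("|")))
        table.put(put)
      }
      table.close()                                        // flushes whatever is left in the buffer
    }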
Re: jars are not loading from 1.3. those set via setJars to the SparkContext
I am using Spark Streaming. What I am trying to do is send a few messages to a Kafka topic, and that is where it is failing:

java.lang.ClassNotFoundException: com.abc.mq.msg.ObjectEncoder
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:264)
    at kafka.utils.Utils$.createObject(Utils.scala:438)
    at kafka.producer.Producer.<init>(Producer.scala:61)

On Mon, Jun 22, 2015 at 8:24 PM, Murthy Chelankuri kmurt...@gmail.com wrote: I have been using Spark for the last 6 months with version 1.2.0. I am trying to migrate to 1.3.0, but the same code I have written is not working. It gives a class-not-found error when I try to load some dependent jars from the main program. This used to work in 1.2.0 when I set all the dependent jars as an array on the SparkContext, but it is not working in 1.3.0. Please help me resolve this. Thanks, Murthy Chelankuri
Yarn application ID for Spark job on Yarn
Hi, Is there a way to get the YARN application ID inside the Spark application, when running a Spark job on YARN? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Yarn-application-ID-for-Spark-job-on-Yarn-tp23429.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
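If it helps, recent Spark releases (the 1.3/1.4 line, if I remember correctly) expose the application ID directly on the SparkContext, and on YARN that is the YARN application ID:

    // Inside the driver
    val appId = sc.applicationId   // e.g. "application_1435000000000_0001" on YARN
    println(s"Running as $appId")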
Re: Help optimising Spark SQL query
Hi James, There are a few configurations that you can try: https://spark.apache.org/docs/latest/sql-programming-guide.html#other-configuration-options From my experience, the codegen really boost things up. Just run sqlContext.sql(spark.sql.codegen=true) before you execute your query. But keep in mind that sometimes this is buggy (depending on your query), so compare to results without codegen to be sure. Also you can try changing the default partitions. You can also use dataframes (since 1.3). Not sure they are better than specifying the query in 1.3, but with spark 1.4 there should be an enormous performance improvement in dataframes. Lior On Mon, Jun 22, 2015 at 6:28 PM, James Aley james.a...@swiftkey.com wrote: Hello, A colleague of mine ran the following Spark SQL query: select count(*) as uses, count (distinct cast(id as string)) as users from usage_events where from_unixtime(cast(timestamp_millis/1000 as bigint)) between '2015-06-09' and '2015-06-16' The table contains billions of rows, but totals only 64GB of data across ~30 separate files, which are stored as Parquet with LZO compression in S3. From the referenced columns: * id is Binary, which we cast to a String so that we can DISTINCT by it. (I was already told this will improve in a later release, in a separate thread.) * timestamp_millis is a long, containing a unix timestamp with millisecond resolution This took nearly 2 hours to run on a 5 node cluster of r3.xlarge EC2 instances, using 20 executors, each with 4GB memory. I can see from monitoring tools that the CPU usage is at 100% on all nodes, but incoming network seems a bit low at 2.5MB/s, suggesting to me that this is CPU-bound. Does that seem slow? Can anyone offer any ideas by glancing at the query as to why this might be slow? We'll profile it meanwhile and post back if we find anything ourselves. A side issue - I've found that this query, and others, sometimes completes but doesn't return any results. There appears to be no error that I can see in the logs, and Spark reports the job as successful, but the connected JDBC client (SQLWorkbenchJ in this case), just sits there forever waiting. I did a quick Google and couldn't find anyone else having similar issues. Many thanks, James.
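A quick sketch of how those two settings can be applied from code, assuming a Spark 1.3/1.4 SQLContext; the keys are the documented spark.sql.* options:

    // Enable runtime code generation for expression evaluation (marked experimental in 1.3)
    sqlContext.setConf("spark.sql.codegen", "true")
    // ...or equivalently: sqlContext.sql("SET spark.sql.codegen=true")

    // Tune the number of post-shuffle partitions used by joins and aggregations
    sqlContext.setConf("spark.sql.shuffle.partitions", "200")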
Re: GSSException when submitting Spark job in yarn-cluster mode with HiveContext APIs on Kerberos cluster
Hi, I can't get this to work using CDH 5.4, Spark 1.4.0 in yarn-cluster mode. @andrew, did you manage to get it to work with the latest version? On Tue, Apr 21, 2015 at 00:02, Andrew Lee alee...@hotmail.com wrote: Hi Marcelo, Exactly what I need to track, thanks for the JIRA pointer. Date: Mon, 20 Apr 2015 14:03:55 -0700 Subject: Re: GSSException when submitting Spark job in yarn-cluster mode with HiveContext APIs on Kerberos cluster From: van...@cloudera.com To: alee...@hotmail.com CC: user@spark.apache.org I think you want to take a look at: https://issues.apache.org/jira/browse/SPARK-6207 On Mon, Apr 20, 2015 at 1:58 PM, Andrew Lee alee...@hotmail.com wrote: Hi All, Affected versions: Spark 1.2.1 / 1.2.2 / 1.3-rc1. Posting this problem to the user group first to see if someone is encountering the same problem. When submitting Spark jobs that invoke HiveContext APIs on a Kerberos Hadoop + YARN (2.4.1) cluster, I'm getting this error:

javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]

Apparently, the Kerberos ticket is not on the remote data node nor the computing node, since we don't deploy Kerberos tickets, and that is not a good practice either. On the other hand, we can't just SSH to every machine and run kinit for those users; this is not practical and it is insecure. The point here is: shouldn't there be a delegation token during the doAs, so that the token is used instead of the ticket? I'm trying to understand what is missing in Spark's HiveContext API, given that a normal MapReduce job that invokes Hive APIs works, but Spark SQL does not. Any insights or feedback are appreciated. Has anyone got this running without pre-deploying (pre-initializing) all tickets node by node? Is this worth filing a JIRA?
15/03/25 18:59:08 INFO hive.metastore: Trying to connect to metastore with URI thrift://alee-cluster.test.testserver.com:9083 15/03/25 18:59:08 ERROR transport.TSaslTransport: SASL negotiation failure javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)] at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212) at org.apache.thrift.transport.TSaslClientTransport.handleSaslStartMessage(TSaslClientTransport.java:94) at org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:253) at org.apache.thrift.transport.TSaslClientTransport.open(TSaslClientTransport.java:37) at org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport$1.run(TUGIAssumingTransport.java:52) at org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport$1.run(TUGIAssumingTransport.java:49) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556) at org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport.open(TUGIAssumingTransport.java:49) at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.open(HiveMetaStoreClient.java:336) at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.init(HiveMetaStoreClient.java:214) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1410) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.init(RetryingMetaStoreClient.java:62) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:72) at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:2453) at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:2465) at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:340) at org.apache.spark.sql.hive.HiveContext$$anonfun$4.apply(HiveContext.scala:235) at org.apache.spark.sql.hive.HiveContext$$anonfun$4.apply(HiveContext.scala:231) at scala.Option.orElse(Option.scala:257) at org.apache.spark.sql.hive.HiveContext.x$3$lzycompute(HiveContext.scala:231) at org.apache.spark.sql.hive.HiveContext.x$3(HiveContext.scala:229) at org.apache.spark.sql.hive.HiveContext.hiveconf$lzycompute(HiveContext.scala:229) at org.apache.spark.sql.hive.HiveContext.hiveconf(HiveContext.scala:229) at
Re: Help optimising Spark SQL query
Thanks for the responses, guys! Sorry, I forgot to mention that I'm using Spark 1.3.0, but I'll test with 1.4.0 and try the codegen suggestion then report back. On 22 June 2015 at 12:37, Matthew Johnson matt.john...@algomi.com wrote: Hi James, What version of Spark are you using? In Spark 1.2.2 I had an issue where Spark would report a job as complete but I couldn’t find my results anywhere – I just assumed it was me doing something wrong as I am still quite new to Spark. However, since upgrading to 1.4.0 I have not seen this issue, so might be worth upgrading if you are not already on 1.4. Cheers, Matthew *From:* Lior Chaga [mailto:lio...@taboola.com] *Sent:* 22 June 2015 17:24 *To:* James Aley *Cc:* user *Subject:* Re: Help optimising Spark SQL query Hi James, There are a few configurations that you can try: https://spark.apache.org/docs/latest/sql-programming-guide.html#other-configuration-options From my experience, the codegen really boost things up. Just run sqlContext.sql(spark.sql.codegen=true) before you execute your query. But keep in mind that sometimes this is buggy (depending on your query), so compare to results without codegen to be sure. Also you can try changing the default partitions. You can also use dataframes (since 1.3). Not sure they are better than specifying the query in 1.3, but with spark 1.4 there should be an enormous performance improvement in dataframes. Lior On Mon, Jun 22, 2015 at 6:28 PM, James Aley james.a...@swiftkey.com wrote: Hello, A colleague of mine ran the following Spark SQL query: select count(*) as uses, count (distinct cast(id as string)) as users from usage_events where from_unixtime(cast(timestamp_millis/1000 as bigint)) between '2015-06-09' and '2015-06-16' The table contains billions of rows, but totals only 64GB of data across ~30 separate files, which are stored as Parquet with LZO compression in S3. From the referenced columns: * id is Binary, which we cast to a String so that we can DISTINCT by it. (I was already told this will improve in a later release, in a separate thread.) * timestamp_millis is a long, containing a unix timestamp with millisecond resolution This took nearly 2 hours to run on a 5 node cluster of r3.xlarge EC2 instances, using 20 executors, each with 4GB memory. I can see from monitoring tools that the CPU usage is at 100% on all nodes, but incoming network seems a bit low at 2.5MB/s, suggesting to me that this is CPU-bound. Does that seem slow? Can anyone offer any ideas by glancing at the query as to why this might be slow? We'll profile it meanwhile and post back if we find anything ourselves. A side issue - I've found that this query, and others, sometimes completes but doesn't return any results. There appears to be no error that I can see in the logs, and Spark reports the job as successful, but the connected JDBC client (SQLWorkbenchJ in this case), just sits there forever waiting. I did a quick Google and couldn't find anyone else having similar issues. Many thanks, James.
Re: [Spark Streaming 1.4.0] SPARK-5063, Checkpointing and queuestream
Thanks, I've updated my code to use updateStateByKey but am still getting these errors when I resume from a checkpoint. One thought of mine was that I used sc.parallelize to generate the RDDs for the queue, but perhaps on resume, it doesn't recreate the context needed? -- Shaanan Cohney PhD Student University of Pennsylvania shaan...@gmail.com On Mon, Jun 22, 2015 at 9:27 PM, Benjamin Fradet benjamin.fra...@gmail.com wrote: I would suggest you have a look at the updateStateByKey transformation in the Spark Streaming programming guide which should fit your needs better than your update_state function. On 22 Jun 2015 1:03 pm, Shaanan Cohney shaan...@gmail.com wrote: Counts is a list (counts = []) in the driver, used to collect the results. It seems like it's also not the best way to be doing things, but I'm new to spark and editing someone else's code so still learning. Thanks! def update_state(out_files, counts, curr_rdd): try: for c in curr_rdd.collect(): fnames, count = c counts.append(count) out_files |= fnames except Py4JJavaError as e: print(EXCEPTION: %s % str(e)) -- Shaanan Cohney PhD Student University of Pennsylvania shaan...@gmail.com On Mon, Jun 22, 2015 at 8:56 PM, Benjamin Fradet benjamin.fra...@gmail.com wrote: What does counts refer to? Could you also paste the code of your update_state function? On 22 Jun 2015 12:48 pm, Shaanan Cohney shaan...@gmail.com wrote: I'm receiving the SPARK-5063 error (RDD transformations and actions can only be invoked by the driver, not inside of other transformations) whenever I try and restore from a checkpoint in spark streaming on my app. I'm using python3 and my RDDs are inside a queuestream DStream. This is the little chunk of code causing issues: - p_batches = [sc.parallelize(batch) for batch in task_batches] sieving_tasks = ssc.queueStream(p_batches) sieving_tasks.checkpoint(20) relations = sieving_tasks.map(lambda s: run_sieving_command(s, poly, poly_path, fb_paths)) relations.reduce(lambda a, b: (a[0] | b[0], a[1] + b[1]) ).foreachRDD(lambda s: update_state(out_files, counts, s)) ssc.checkpoint(s3n_path) - Thanks again! -- Shaanan Cohney PhD Student University of Pennsylvania shaan...@gmail.com
Re: [Spark Streaming 1.4.0] SPARK-5063, Checkpointing and queuestream
Where does task_batches come from? On 22 Jun 2015 4:48 pm, Shaanan Cohney shaan...@gmail.com wrote: Thanks, I've updated my code to use updateStateByKey but am still getting these errors when I resume from a checkpoint. One thought of mine was that I used sc.parallelize to generate the RDDs for the queue, but perhaps on resume, it doesn't recreate the context needed? -- Shaanan Cohney PhD Student University of Pennsylvania shaan...@gmail.com On Mon, Jun 22, 2015 at 9:27 PM, Benjamin Fradet benjamin.fra...@gmail.com wrote: I would suggest you have a look at the updateStateByKey transformation in the Spark Streaming programming guide which should fit your needs better than your update_state function. On 22 Jun 2015 1:03 pm, Shaanan Cohney shaan...@gmail.com wrote: Counts is a list (counts = []) in the driver, used to collect the results. It seems like it's also not the best way to be doing things, but I'm new to spark and editing someone else's code so still learning. Thanks! def update_state(out_files, counts, curr_rdd): try: for c in curr_rdd.collect(): fnames, count = c counts.append(count) out_files |= fnames except Py4JJavaError as e: print(EXCEPTION: %s % str(e)) -- Shaanan Cohney PhD Student University of Pennsylvania shaan...@gmail.com On Mon, Jun 22, 2015 at 8:56 PM, Benjamin Fradet benjamin.fra...@gmail.com wrote: What does counts refer to? Could you also paste the code of your update_state function? On 22 Jun 2015 12:48 pm, Shaanan Cohney shaan...@gmail.com wrote: I'm receiving the SPARK-5063 error (RDD transformations and actions can only be invoked by the driver, not inside of other transformations) whenever I try and restore from a checkpoint in spark streaming on my app. I'm using python3 and my RDDs are inside a queuestream DStream. This is the little chunk of code causing issues: - p_batches = [sc.parallelize(batch) for batch in task_batches] sieving_tasks = ssc.queueStream(p_batches) sieving_tasks.checkpoint(20) relations = sieving_tasks.map(lambda s: run_sieving_command(s, poly, poly_path, fb_paths)) relations.reduce(lambda a, b: (a[0] | b[0], a[1] + b[1]) ).foreachRDD(lambda s: update_state(out_files, counts, s)) ssc.checkpoint(s3n_path) - Thanks again! -- Shaanan Cohney PhD Student University of Pennsylvania shaan...@gmail.com
Re: [Spark Streaming 1.4.0] SPARK-5063, Checkpointing and queuestream
It's a generated set of shell commands to run (written in C, highly optimized numerical computer), which is create from a set of user provided parameters. The snippet above is: task_outfiles_to_cmds = OrderedDict(run_sieving.leftover_tasks) task_outfiles_to_cmds.update(generate_sieving_task_commands(parameters, poly_path, fb_paths)) task_commands = list(task_outfiles_to_cmds.values()) task_path = os.path.join(utils.WORK_DIR, sieving_tasks) if not os.path.exists(task_path): os.makedirs(task_path) batch_size = utils.TOTAL_CORES task_batches = [task_commands[c:c+batch_size] for c in range(0, len(task_commands),batch_size)] Which does not reference the SparkContext or StreamingContext at all. -- Shaanan Cohney PhD Student University of Pennsylvania shaan...@gmail.com On Tue, Jun 23, 2015 at 1:05 AM, Benjamin Fradet benjamin.fra...@gmail.com wrote: Where does task_batches come from? On 22 Jun 2015 4:48 pm, Shaanan Cohney shaan...@gmail.com wrote: Thanks, I've updated my code to use updateStateByKey but am still getting these errors when I resume from a checkpoint. One thought of mine was that I used sc.parallelize to generate the RDDs for the queue, but perhaps on resume, it doesn't recreate the context needed? -- Shaanan Cohney PhD Student University of Pennsylvania shaan...@gmail.com On Mon, Jun 22, 2015 at 9:27 PM, Benjamin Fradet benjamin.fra...@gmail.com wrote: I would suggest you have a look at the updateStateByKey transformation in the Spark Streaming programming guide which should fit your needs better than your update_state function. On 22 Jun 2015 1:03 pm, Shaanan Cohney shaan...@gmail.com wrote: Counts is a list (counts = []) in the driver, used to collect the results. It seems like it's also not the best way to be doing things, but I'm new to spark and editing someone else's code so still learning. Thanks! def update_state(out_files, counts, curr_rdd): try: for c in curr_rdd.collect(): fnames, count = c counts.append(count) out_files |= fnames except Py4JJavaError as e: print(EXCEPTION: %s % str(e)) -- Shaanan Cohney PhD Student University of Pennsylvania shaan...@gmail.com On Mon, Jun 22, 2015 at 8:56 PM, Benjamin Fradet benjamin.fra...@gmail.com wrote: What does counts refer to? Could you also paste the code of your update_state function? On 22 Jun 2015 12:48 pm, Shaanan Cohney shaan...@gmail.com wrote: I'm receiving the SPARK-5063 error (RDD transformations and actions can only be invoked by the driver, not inside of other transformations) whenever I try and restore from a checkpoint in spark streaming on my app. I'm using python3 and my RDDs are inside a queuestream DStream. This is the little chunk of code causing issues: - p_batches = [sc.parallelize(batch) for batch in task_batches] sieving_tasks = ssc.queueStream(p_batches) sieving_tasks.checkpoint(20) relations = sieving_tasks.map(lambda s: run_sieving_command(s, poly, poly_path, fb_paths)) relations.reduce(lambda a, b: (a[0] | b[0], a[1] + b[1]) ).foreachRDD(lambda s: update_state(out_files, counts, s)) ssc.checkpoint(s3n_path) - Thanks again! -- Shaanan Cohney PhD Student University of Pennsylvania shaan...@gmail.com
Re: Help optimising Spark SQL query
Have you test this on a smaller set to verify that the query is correct? On Mon, Jun 22, 2015 at 2:59 PM, ayan guha guha.a...@gmail.com wrote: You may also want to change count(*) to specific column. On 23 Jun 2015 01:29, James Aley james.a...@swiftkey.com wrote: Hello, A colleague of mine ran the following Spark SQL query: select count(*) as uses, count (distinct cast(id as string)) as users from usage_events where from_unixtime(cast(timestamp_millis/1000 as bigint)) between '2015-06-09' and '2015-06-16' The table contains billions of rows, but totals only 64GB of data across ~30 separate files, which are stored as Parquet with LZO compression in S3. From the referenced columns: * id is Binary, which we cast to a String so that we can DISTINCT by it. (I was already told this will improve in a later release, in a separate thread.) * timestamp_millis is a long, containing a unix timestamp with millisecond resolution This took nearly 2 hours to run on a 5 node cluster of r3.xlarge EC2 instances, using 20 executors, each with 4GB memory. I can see from monitoring tools that the CPU usage is at 100% on all nodes, but incoming network seems a bit low at 2.5MB/s, suggesting to me that this is CPU-bound. Does that seem slow? Can anyone offer any ideas by glancing at the query as to why this might be slow? We'll profile it meanwhile and post back if we find anything ourselves. A side issue - I've found that this query, and others, sometimes completes but doesn't return any results. There appears to be no error that I can see in the logs, and Spark reports the job as successful, but the connected JDBC client (SQLWorkbenchJ in this case), just sits there forever waiting. I did a quick Google and couldn't find anyone else having similar issues. Many thanks, James.
jars are not loading from 1.3. those set via setJars to the SparkContext
I have been using Spark for the last 6 months with version 1.2.0. I am trying to migrate to 1.3.0, but the same program I have written is not working. It gives a class-not-found error when I try to load some dependent jars from the main program. This used to work in 1.2.0 when I set the array of all dependent jars on the Spark context, but it is not working in 1.3.0. Please help me figure out how to resolve this. Thanks, Murthy Chelankuri
Re: jars are not loading from 1.3. those set via setJars to the SparkContext
Yes. Thanks Best Regards On Mon, Jun 22, 2015 at 8:33 PM, Murthy Chelankuri kmurt...@gmail.com wrote: I have more than one jar. can we set sc.addJar multiple times with each dependent jar ? On Mon, Jun 22, 2015 at 8:30 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Try sc.addJar instead of setJars Thanks Best Regards On Mon, Jun 22, 2015 at 8:24 PM, Murthy Chelankuri kmurt...@gmail.com wrote: I have been using the spark from the last 6 months with the version 1.2.0. I am trying to migrate to the 1.3.0 but the same problem i have written is not wokring. Its giving class not found error when i try to load some dependent jars from the main program. This use to work in 1.2.0 when set all the dependent jars array to the spark context but not working in 1.3.0 Please help me how to resolve this. Thanks, Murthy Chelankuri
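To make the suggestion concrete, a minimal sketch of adding several dependent jars one at a time (the paths are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("AddJarsSketch"))

// addJar can be called once per dependency; each jar is distributed to the executors
Seq("/path/to/dep-a.jar", "/path/to/dep-b.jar", "/path/to/dep-c.jar").foreach(sc.addJar)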
Calling rdd() on a DataFrame causes stage boundary
When I call rdd() on a DataFrame, it ends the current stage and starts a new one that just maps the DataFrame to an RDD and nothing else. It doesn't seem to do a shuffle (which is good and expected), but then why is there a separate stage? I also thought that stages only end when there's a shuffle or when the job ends with the action that triggered it. Thanks.
Re: Code review - Spark SQL command-line client for Cassandra
Hi Matthew, you could add the dependencies yourself by using the %dep command in zeppelin ( https://zeppelin.incubator.apache.org/docs/interpreter/spark.html). I have not tried with zeppelin but have used spark-notebook https://github.com/andypetrella/spark-notebook and got Cassandra connector working. Below have provided samples. *In Zeppelin: (Not Tested)* %dep z.load(com.datastax.com:spark-cassandra-connector_2.11:1.4.0-M1) Note: In order for Spark and Cassandra to work the Spark , Spark-Cassandra-Connector, Spark-notebook spark version should match. In the above case it was 1.2.0 *If using spark-notebook: (Tested works)* Installed : 1. Apache Spark 1.2.0 2. Cassandra DSE - 1 node (just Cassandra and no analytics) 3. Notebook: wget https://s3.eu-central-1.amazonaws.com/spark-notebook/tgz/spark-notebook-0.4.3-scala-2.10.4-spark-1.2.0-hadoop-2.4.0.tgz Once notebook have been started : http://ec2-xx-x-xx-xxx.us-west-x.compute.amazonaws.com:9000/#clusters Select Standalone: In SparkConf : update the spark master ip to EC2 : internal DNS name. In Spark Notebook: :dp com.datastax.spark % spark-cassandra-connector_2.10 % 1.2.0-rc3 import com.datastax.spark.connector._ import com.datastax.spark.connector.rdd.CassandraRDD val cassandraHost:String = localhost reset(lastChanges = _.set(spark.cassandra.connection.host, cassandraHost)) val rdd = sparkContext.cassandraTable(excelsior,test) rdd.toArray.foreach(println) Note: In order for Spark and Cassandra to work the Spark , Spark-Cassandra-Connector, Spark-notebook spark version should match. In the above case it was 1.2.0 On Mon, Jun 22, 2015 at 9:52 AM, Matthew Johnson matt.john...@algomi.com wrote: Hi Pawan, Looking at the changes for that git pull request, it looks like it just pulls in the dependency (and transitives) for “spark-cassandra-connector”. Since I am having to build Zeppelin myself anyway, would it be ok to just add this myself for the connector for 1.4.0 (as found here http://search.maven.org/#artifactdetails%7Ccom.datastax.spark%7Cspark-cassandra-connector_2.11%7C1.4.0-M1%7Cjar)? What exactly is it that does not currently exist for Spark 1.4? Thanks, Matthew *From:* pawan kumar [mailto:pkv...@gmail.com] *Sent:* 22 June 2015 17:19 *To:* Silvio Fiorito *Cc:* Mohammed Guller; Matthew Johnson; shahid ashraf; user@spark.apache.org *Subject:* Re: Code review - Spark SQL command-line client for Cassandra Hi, Zeppelin has a cassandra-spark-connector built into the build. I have not tried it yet may be you could let us know. https://github.com/apache/incubator-zeppelin/pull/79 To build a Zeppelin version with the *Datastax Spark/Cassandra connector https://github.com/datastax/spark-cassandra-connector* mvn clean package *-Pcassandra-spark-1.x* -Dhadoop.version=xxx -Phadoop-x.x -DskipTests Right now the Spark/Cassandra connector is available for *Spark 1.1* and *Spark 1.2*. Support for *Spark 1.3* is not released yet (*but you can build you own Spark/Cassandra connector version **1.3.0-SNAPSHOT*). Support for *Spark 1.4* does not exist yet Please do not forget to add -Dspark.cassandra.connection.host=xxx to the *ZEPPELIN_JAVA_OPTS*parameter in *conf/zeppelin-env.sh* file. Alternatively you can add this parameter in the parameter list of the *Spark interpreter* on the GUI -Pawan On Mon, Jun 22, 2015 at 9:04 AM, Silvio Fiorito silvio.fior...@granturing.com wrote: Yes, just put the Cassandra connector on the Spark classpath and set the connector config properties in the interpreter settings. 
*From: *Mohammed Guller *Date: *Monday, June 22, 2015 at 11:56 AM *To: *Matthew Johnson, shahid ashraf *Cc: *user@spark.apache.org *Subject: *RE: Code review - Spark SQL command-line client for Cassandra I haven’t tried using Zeppelin with Spark on Cassandra, so can’t say for sure, but it should not be difficult. Mohammed *From:* Matthew Johnson [mailto:matt.john...@algomi.com matt.john...@algomi.com] *Sent:* Monday, June 22, 2015 2:15 AM *To:* Mohammed Guller; shahid ashraf *Cc:* user@spark.apache.org *Subject:* RE: Code review - Spark SQL command-line client for Cassandra Thanks Mohammed, it’s good to know I’m not alone! How easy is it to integrate Zeppelin with Spark on Cassandra? It looks like it would only support Hadoop out of the box. Is it just a case of dropping the Cassandra Connector onto the Spark classpath? Cheers, Matthew *From:* Mohammed Guller [mailto:moham...@glassbeam.com] *Sent:* 20 June 2015 17:27 *To:* shahid ashraf *Cc:* Matthew Johnson; user@spark.apache.org *Subject:* RE: Code review - Spark SQL command-line client for Cassandra It is a simple Play-based web application. It exposes an URI for submitting a SQL query. It then executes that query using CassandraSQLContext provided by Spark Cassandra Connector. Since it is web-based, I added an authentication and
Re: SQL vs. DataFrame API
Sorry thought it was scala/spark El 22/6/2015 9:49 p. m., Bob Corsaro rcors...@gmail.com escribió: That's invalid syntax. I'm pretty sure pyspark is using a DSL to create a query here and not actually doing an equality operation. On Mon, Jun 22, 2015 at 3:43 PM Ignacio Blasco elnopin...@gmail.com wrote: Probably you should use === instead of == and !== instead of != Can anyone explain why the dataframe API doesn't work as I expect it to here? It seems like the column identifiers are getting confused. https://gist.github.com/dokipen/4b324a7365ae87b7b0e5
Spark job fails silently
Hi, Our spark job on yarn suddenly started failing silently without showing any error following is the trace. Using properties file: /usr/lib/spark/conf/spark-defaults.conf Adding default property: spark.serializer=org.apache.spark.serializer.KryoSerializer Adding default property: spark.executor.extraJavaOptions=-Dlog4j.configuration=file:///etc/spark/log4j.properties Adding default property: spark.eventLog.enabled=true Adding default property: spark.shuffle.service.enabled=true Adding default property: spark.driver.extraLibraryPath=/usr/lib/hadoop/lib/native Adding default property: spark.yarn.historyServer.address=http://ds-hnn002.dev.abc.com:18088 Adding default property: spark.yarn.am.extraLibraryPath=/usr/lib/hadoop/lib/native Adding default property: spark.ui.showConsoleProgress=true Adding default property: spark.shuffle.service.port=7337 Adding default property: spark.master=yarn-client Adding default property: spark.executor.extraLibraryPath=/usr/lib/hadoop/lib/native Adding default property: spark.eventLog.dir=hdfs://magnetic-hadoop-dev/user/spark/applicationHistory Adding default property: spark.yarn.jar=local:/usr/lib/spark/assembly/lib/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar Parsed arguments: master yarn deployMode null executorMemory 3G executorCores null totalExecutorCores null propertiesFile /usr/lib/spark/conf/spark-defaults.conf driverMemory4G driverCores null driverExtraClassPathnull driverExtraLibraryPath /usr/lib/hadoop/lib/native driverExtraJavaOptions null supervise false queue null numExecutors30 files null pyFiles null archivesnull mainClass null primaryResource file:/home/jonathanarfa/code/updb/spark/updb2vw_testing.py nameupdb2vw_testing.py childArgs [--date 2015-05-20] jarsnull packagesnull repositoriesnull verbose true Spark properties used, including those specified through --conf and those from the properties file /usr/lib/spark/conf/spark-defaults.conf: spark.executor.extraLibraryPath - /usr/lib/hadoop/lib/native spark.yarn.jar - local:/usr/lib/spark/assembly/lib/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar spark.driver.extraLibraryPath - /usr/lib/hadoop/lib/native spark.yarn.historyServer.address - http://ds-hnn002.dev.abc.com:18088 spark.yarn.am.extraLibraryPath - /usr/lib/hadoop/lib/native spark.eventLog.enabled - true spark.ui.showConsoleProgress - true spark.serializer - org.apache.spark.serializer.KryoSerializer spark.executor.extraJavaOptions - -Dlog4j.configuration=file:///etc/spark/log4j.properties spark.shuffle.service.enabled - true spark.shuffle.service.port - 7337 spark.eventLog.dir - hdfs://magnetic-hadoop-dev/user/spark/applicationHistory spark.master - yarn-client Main class: org.apache.spark.deploy.PythonRunner Arguments: file:/home/jonathanarfa/code/updb/spark/updb2vw_testing.py null --date 2015-05-20 System properties: spark.executor.extraLibraryPath - /usr/lib/hadoop/lib/native spark.driver.memory - 4G spark.executor.memory - 3G spark.yarn.jar - local:/usr/lib/spark/assembly/lib/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar spark.driver.extraLibraryPath - /usr/lib/hadoop/lib/native spark.executor.instances - 30 spark.yarn.historyServer.address - http://ds-hnn002.dev.abc.com:18088 spark.yarn.am.extraLibraryPath - /usr/lib/hadoop/lib/native spark.ui.showConsoleProgress - true spark.eventLog.enabled - true spark.yarn.dist.files - file:/home/jonathanarfa/code/updb/spark/updb2vw_testing.py SPARK_SUBMIT - true spark.serializer - org.apache.spark.serializer.KryoSerializer spark.executor.extraJavaOptions - 
-Dlog4j.configuration=file:///etc/spark/log4j.properties spark.shuffle.service.enabled - true spark.app.name - updb2vw_testing.py spark.shuffle.service.port - 7337 spark.eventLog.dir - hdfs://magnetic-hadoop-dev/user/spark/applicationHistory spark.master - yarn-client Classpath elements: spark.akka.frameSize=60 spark.app.name=updb2vw_2015-05-20 spark.driver.extraLibraryPath=/usr/lib/hadoop/lib/native spark.driver.maxResultSize=2G spark.driver.memory=4G spark.eventLog.dir=hdfs://magnetic-hadoop-dev/user/spark/applicationHistory spark.eventLog.enabled=true spark.executor.extraJavaOptions=-Dlog4j.configuration=file:///etc/spark/log4j.properties spark.executor.extraLibraryPath=/usr/lib/hadoop/lib/native spark.executor.instances=30 spark.executor.memory=3G spark.master=yarn-client spark.serializer=org.apache.spark.serializer.KryoSerializer spark.shuffle.manager=hash spark.shuffle.service.enabled=true spark.shuffle.service.port=7337 spark.task.maxFailures=6 spark.ui.showConsoleProgress=true
workaround for groupByKey
Hi, I am processing an RDD of key-value pairs. The key is a user_id, and the value is a website url the user has visited. Since I need to know all the urls each user has visited, I am tempted to call groupByKey on this RDD. However, since there could be millions of users and urls, the shuffling caused by groupByKey proves to be a major bottleneck in getting the job done. Is there any workaround? I want to end up with an RDD of key-value pairs, where the key is a user_id and the value is a list of all the urls visited by that user. Thanks, Jianguo
Re: workaround for groupByKey
There is reduceByKey that works on K,V. You need to accumulate partial results and proceed. does your computation allow that ? On Mon, Jun 22, 2015 at 2:12 PM, Jianguo Li flyingfromch...@gmail.com wrote: Hi, I am processing an RDD of key-value pairs. The key is an user_id, and the value is an website url the user has ever visited. Since I need to know all the urls each user has visited, I am tempted to call the groupByKey on this RDD. However, since there could be millions of users and urls, the shuffling caused by groupByKey proves to be a major bottleneck to get the job done. Is there any workaround? I want to end up with an RDD of key-value pairs, where the key is an user_id, the value is a list of all the urls visited by the user. Thanks, Jianguo -- Deepak
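A minimal sketch of that reduceByKey approach, assuming each user's set of urls fits comfortably in memory (the data and names are illustrative; sc is the usual shell SparkContext):

// (user_id, url) pairs
val visits = sc.parallelize(Seq(("u1", "a.com"), ("u1", "b.com"), ("u2", "a.com")))

// wrap each url in a Set so reduceByKey can merge partial results map-side
val urlsByUser = visits.mapValues(url => Set(url)).reduceByKey(_ ++ _)

urlsByUser.collect().foreach(println)   // e.g. (u1,Set(a.com, b.com))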
Re: SQL vs. DataFrame API
Right now, we cannot figure out which column you referenced in `select` if there are multiple columns with the same name in the joined DataFrame (for example, two `value`). A workaround could be: numbers2 = numbers.select(df.name, df.value.alias('other')) rows = numbers.join(numbers2, (numbers.name == numbers2.name) & (numbers.value != numbers2.other), how="inner") \ .select(numbers.name, numbers.value, numbers2.other) \ .collect() On Mon, Jun 22, 2015 at 12:53 PM, Ignacio Blasco elnopin...@gmail.com wrote: Sorry thought it was scala/spark On 22/6/2015 9:49 p.m., Bob Corsaro rcors...@gmail.com wrote: That's invalid syntax. I'm pretty sure pyspark is using a DSL to create a query here and not actually doing an equality operation. On Mon, Jun 22, 2015 at 3:43 PM Ignacio Blasco elnopin...@gmail.com wrote: Probably you should use === instead of == and !== instead of != Can anyone explain why the dataframe API doesn't work as I expect it to here? It seems like the column identifiers are getting confused. https://gist.github.com/dokipen/4b324a7365ae87b7b0e5 - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
spark on yarn failing silently
Hi, suddenly our spark job on yarn started failing silently without showing any error, following is the trace in verbose mode Using properties file: /usr/lib/spark/conf/spark-defaults.conf Adding default property: spark.serializer=org.apache.spark.serializer.KryoSerializer Adding default property: spark.executor.extraJavaOptions=-Dlog4j.configuration=file:///etc/spark/log4j.properties Adding default property: spark.eventLog.enabled=true Adding default property: spark.shuffle.service.enabled=true Adding default property: spark.driver.extraLibraryPath=/usr/lib/hadoop/lib/native Adding default property: spark.yarn.historyServer.address=http://ds-hnn002.dev.abc.com:18088 Adding default property: spark.yarn.am.extraLibraryPath=/usr/lib/hadoop/lib/native Adding default property: spark.ui.showConsoleProgress=true Adding default property: spark.shuffle.service.port=7337 Adding default property: spark.master=yarn-client Adding default property: spark.executor.extraLibraryPath=/usr/lib/hadoop/lib/native Adding default property: spark.eventLog.dir=hdfs://my-hadoop-dev/user/spark/applicationHistory Adding default property: spark.yarn.jar=local:/usr/lib/spark/assembly/lib/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar Parsed arguments: master yarn deployMode null executorMemory 3G executorCores null totalExecutorCores null propertiesFile /usr/lib/spark/conf/spark-defaults.conf driverMemory4G driverCores null driverExtraClassPathnull driverExtraLibraryPath /usr/lib/hadoop/lib/native driverExtraJavaOptions null supervise false queue null numExecutors30 files null pyFiles null archivesnull mainClass null primaryResource file:/home/xyz/code/updb/spark/updb2vw_testing.py nameupdb2vw_testing.py childArgs [--date 2015-05-20] jarsnull packagesnull repositoriesnull verbose true Spark properties used, including those specified through --conf and those from the properties file /usr/lib/spark/conf/spark-defaults.conf: spark.executor.extraLibraryPath - /usr/lib/hadoop/lib/native spark.yarn.jar - local:/usr/lib/spark/assembly/lib/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar spark.driver.extraLibraryPath - /usr/lib/hadoop/lib/native spark.yarn.historyServer.address - http://ds-hnn002.dev.abc.com:18088 spark.yarn.am.extraLibraryPath - /usr/lib/hadoop/lib/native spark.eventLog.enabled - true spark.ui.showConsoleProgress - true spark.serializer - org.apache.spark.serializer.KryoSerializer spark.executor.extraJavaOptions - -Dlog4j.configuration=file:///etc/spark/log4j.properties spark.shuffle.service.enabled - true spark.shuffle.service.port - 7337 spark.eventLog.dir - hdfs://my-hadoop-dev/user/spark/applicationHistory spark.master - yarn-client Main class: org.apache.spark.deploy.PythonRunner Arguments: file:/home/xyz/code/updb/spark/updb2vw_testing.py null --date 2015-05-20 System properties: spark.executor.extraLibraryPath - /usr/lib/hadoop/lib/native spark.driver.memory - 4G spark.executor.memory - 3G spark.yarn.jar - local:/usr/lib/spark/assembly/lib/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar spark.driver.extraLibraryPath - /usr/lib/hadoop/lib/native spark.executor.instances - 30 spark.yarn.historyServer.address - http://ds-hnn002.dev.abc.com:18088 spark.yarn.am.extraLibraryPath - /usr/lib/hadoop/lib/native spark.ui.showConsoleProgress - true spark.eventLog.enabled - true spark.yarn.dist.files - file:/home/xyz/code/updb/spark/updb2vw_testing.py SPARK_SUBMIT - true spark.serializer - org.apache.spark.serializer.KryoSerializer spark.executor.extraJavaOptions - 
-Dlog4j.configuration=file:///etc/spark/log4j.properties spark.shuffle.service.enabled - true spark.app.name - updb2vw_testing.py spark.shuffle.service.port - 7337 spark.eventLog.dir - hdfs://my-hadoop-dev/user/spark/applicationHistory spark.master - yarn-client Classpath elements: spark.akka.frameSize=60 spark.app.name=updb2vw_2015-05-20 spark.driver.extraLibraryPath=/usr/lib/hadoop/lib/native spark.driver.maxResultSize=2G spark.driver.memory=4G spark.eventLog.dir=hdfs://my-hadoop-dev/user/spark/applicationHistory spark.eventLog.enabled=true spark.executor.extraJavaOptions=-Dlog4j.configuration=file:///etc/spark/log4j.properties spark.executor.extraLibraryPath=/usr/lib/hadoop/lib/native spark.executor.instances=30 spark.executor.memory=3G spark.master=yarn-client spark.serializer=org.apache.spark.serializer.KryoSerializer spark.shuffle.manager=hash spark.shuffle.service.enabled=true spark.shuffle.service.port=7337 spark.task.maxFailures=6 spark.ui.showConsoleProgress=true spark.yarn.am.extraLibraryPath=/usr/lib/hadoop/lib/native
Re: Velox Model Server
Models that I am looking for are mostly factorization based models (which includes both recommendation and topic modeling use-cases). For recommendation models, I need a combination of Spark SQL and ml model prediction api...I think spark job server is what I am looking for and it has fast http rest backend through spray which will scale fine through akka. Out of curiosity why netty? What model are you serving? Velox doesn't look like it is optimized for cases like ALS recs, if that's what you mean. I think scoring ALS at scale in real time takes a fairly different approach. The servlet engine probably doesn't matter at all in comparison. On Sat, Jun 20, 2015, 9:40 PM Debasish Das debasish.da...@gmail.com wrote: After getting used to Scala, writing Java is too much work :-) I am looking for scala based project that's using netty at its core (spray is one example). prediction.io is an option but that also looks quite complicated and not using all the ML features that got added in 1.3/1.4 Velox built on top of ML / Keystone ML pipeline API and that's useful but it is still using javax servlets which is not netty based. On Sat, Jun 20, 2015 at 10:25 AM, Sandy Ryza sandy.r...@cloudera.com wrote: Oops, that link was for Oryx 1. Here's the repo for Oryx 2: https://github.com/OryxProject/oryx On Sat, Jun 20, 2015 at 10:20 AM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Debasish, The Oryx project (https://github.com/cloudera/oryx), which is Apache 2 licensed, contains a model server that can serve models built with MLlib. -Sandy On Sat, Jun 20, 2015 at 8:00 AM, Charles Earl charles.ce...@gmail.com wrote: Is velox NOT open source? On Saturday, June 20, 2015, Debasish Das debasish.da...@gmail.com wrote: Hi, The demo of end-to-end ML pipeline including the model server component at Spark Summit was really cool. I was wondering if the Model Server component is based upon Velox or it uses a completely different architecture. https://github.com/amplab/velox-modelserver We are looking for an open source version of model server to build upon. Thanks. Deb -- - Charles
Re: Submitting Spark Applications using Spark Submit
Did you restart your master / workers? On the master node, run `sbin/stop-all.sh` followed by `sbin/start-all.sh` 2015-06-20 17:59 GMT-07:00 Raghav Shankar raghav0110...@gmail.com: Hey Andrew, I tried the following approach: I modified my Spark build on my local machine. I did downloaded the Spark 1.4.0 src code and then made a change to ResultTask.scala( I made a simple change to see if it work. I added a print statement). Now, I built spark using mvn -Dhadoop.version=1.0.4 -Phadoop-1 -DskipTests -Dscala-2.10 clean package Now, the new assembly jar was built. I started my EC2 Cluster using this command: ./ec2/spark-ec2 -k key -i ../aggr/key.pem --instance-type=m3.medium --zone=us-east-1b -s 9 launch spark-cluster I initially launched my application jar and it worked fine. After that I scp’d the new assembly jar to the spark lib directory of all my ec2 nodes. When I ran the jar again I got the following error: 5/06/21 00:42:51 INFO AppClient$ClientActor: Connecting to master akka.tcp://sparkmas...@ec2-xxx.compute-1.amazonaws.com:7077/user/Master... 15/06/21 00:42:52 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://sparkmas...@ec2-xxx.compute-1.amazonaws.com:7077]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: ec2-XXX.compute-1.amazonaws.com/10.165.103.16:7077 15/06/21 00:42:52 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkmas...@ec2-xxx.compute-1.amazonaws.com:7077: akka.remote.InvalidAssociation: Invalid address: akka.tcp://sparkmas...@ec2-xxx.compute-1.amazonaws.com:7077 15/06/21 00:43:11 INFO AppClient$ClientActor: Connecting to master akka.tcp://sparkmas...@ec2-xxx.compute-1.amazonaws.com:7077/user/Master... 15/06/21 00:43:11 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkmas...@ec2-xxx.compute-1.amazonaws.com:7077: akka.remote.InvalidAssociation: Invalid address: akka.tcp://sparkmas...@ec2-xxx.compute-1.amazonaws.com:7077 15/06/21 00:43:11 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://sparkmas...@ec2-xxx.compute-1.amazonaws.com:7077]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: ec2-XXX.compute-1.amazonaws.com/10.165.103.16:7077 15/06/21 00:43:31 INFO AppClient$ClientActor: Connecting to master akka.tcp://sparkmas...@ec2-xxx.compute-1.amazonaws.com:7077/user/Master... 15/06/21 00:43:31 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkmas...@ec2-xxx.compute-1.amazonaws.com:7077: akka.remote.InvalidAssociation: Invalid address: akka.tcp://sparkmas...@xxx.compute-1.amazonaws.com:7077 15/06/21 00:43:31 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://sparkmas...@ec2-xxx.compute-1.amazonaws.com:7077]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: XXX.compute-1.amazonaws.com/10.165.103.16:7077 15/06/21 00:43:51 ERROR SparkDeploySchedulerBackend: Application has been killed. Reason: All masters are unresponsive! Giving up. 15/06/21 00:43:51 WARN SparkDeploySchedulerBackend: Application ID is not initialized yet. 
15/06/21 00:43:51 INFO SparkUI: Stopped Spark web UI at http://XXX.compute-1.amazonaws.com:4040 15/06/21 00:43:51 INFO DAGScheduler: Stopping DAGScheduler 15/06/21 00:43:51 INFO SparkDeploySchedulerBackend: Shutting down all executors 15/06/21 00:43:51 INFO SparkDeploySchedulerBackend: Asking each executor to shut down 15/06/21 00:43:51 ERROR OneForOneStrategy: java.lang.NullPointerException at org.apache.spark.deploy.client.AppClient$ClientActor$$anonfun$receiveWithLogging$1.applyOrElse(AppClient.scala:160) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59) at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at org.apache.spark.deploy.client.AppClient$ClientActor.aroundReceive(AppClient.scala:61) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at akka.dispatch.Mailbox.run(Mailbox.scala:220) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at
Re: Why can't I allocate more than 4 executors with 2 machines on YARN?
1) Can you try with yarn-cluster 2) Does your queue have enough capacity On Mon, Jun 22, 2015 at 11:10 AM, Saiph Kappa saiph.ka...@gmail.com wrote: Hi, I am running a simple spark streaming application on hadoop 2.7.0/YARN (master: yarn-client) cluster with 2 different machines (12GB RAM with 8 CPU cores each). I am launching my application like this: ~/myapp$ ~/my-spark/bin/spark-submit --class App --master yarn-client --driver-memory 4g --executor-memory 2g --executor-cores 1 --num-executors 6 target/scala-2.10/my-app_2.10-0.1-SNAPSHOT.jar 1 mymachine3 1000 8 10 4 stdev 3 Despite I required 6 executors for my application, it seems that I am unable to get more than 4 executors (2 per machine). If I request any number of executors below 5 it works fine, but otherwise it seems that it is not able to allocate more than 4. Why does this happen? Thanks. -- Deepak
Re: s3 - Can't make directory for path
hi, have you tested s3://ww-sandbox/name_of_path/ instead of s3://ww-sandbox/name_of_path, or have you tried adding your file extension with a placeholder (*), like s3://ww-sandbox/name_of_path/*.gz or s3://ww-sandbox/name_of_path/*.csv, depending on your files? If that does not work, please test with the new s3a protocol of Spark/Hadoop: https://issues.apache.org/jira/browse/HADOOP-10400 -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/s3-Can-t-make-directory-for-path-tp23419p23438.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
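If it helps, those wildcard variants plug straight into the usual textFile call; a small sketch reusing the placeholder bucket and path from above:

// the Hadoop FileSystem layer expands the glob, so every matching .gz object is read
val logs = sc.textFile("s3n://ww-sandbox/name_of_path/*.gz")

// same call with the newer s3a filesystem (requires the hadoop-aws jar and credentials)
val logsA = sc.textFile("s3a://ww-sandbox/name_of_path/*.gz")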
Re: Compute Median in Spark Dataframe
Many thanks, will look into this. I dont want to particularly reuse the custom Hive UDAF I have, would prefer writing a new one it that is cleaner. I am just using the JVM. On 5 June 2015 at 00:03, Holden Karau hol...@pigscanfly.ca wrote: My current example doesn't use a Hive UDAF, but you would do something pretty similar (it calls a new user defined UDAF, and there are wrappers to make Spark SQL UDAFs from Hive UDAFs but they are private). So this is doable, but since it pokes at internals it will likely break between versions of Spark. If you want to see the WIP PR I have with Sparkling Pandas its at https://github.com/sparklingpandas/sparklingpandas/pull/90/files . If your doing this in JVM and just want to know how to wrap the Hive UDAF, you can grep/look in sql/hive/ in Spark, but I'd encourage you to see if there is another way to accomplish what you want (since poking at the internals is kind of dangerous). On Thu, Jun 4, 2015 at 6:28 AM, Deenar Toraskar deenar.toras...@gmail.com wrote: Hi Holden, Olivier So for column you need to pass in a Java function, I have some sample code which does this but it does terrible things to access Spark internals. I also need to call a Hive UDAF in a dataframe agg function. Are there any examples of what Column expects? Deenar On 2 June 2015 at 21:13, Holden Karau hol...@pigscanfly.ca wrote: So for column you need to pass in a Java function, I have some sample code which does this but it does terrible things to access Spark internals. On Tuesday, June 2, 2015, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Nice to hear from you Holden ! I ended up trying exactly that (Column) - but I may have done it wrong : In [*5*]: g.agg(Column(percentile(value, 0.5))) Py4JError: An error occurred while calling o97.agg. Trace: py4j.Py4JException: Method agg([class java.lang.String, class scala.collection.immutable.Nil$]) does not exist at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333) Any idea ? Olivier. Le mar. 2 juin 2015 à 18:02, Holden Karau hol...@pigscanfly.ca a écrit : Not super easily, the GroupedData class uses a strToExpr function which has a pretty limited set of functions so we cant pass in the name of an arbitrary hive UDAF (unless I'm missing something). We can instead construct an column with the expression you want and then pass it in to agg() that way (although then you need to call the hive UDAF there). There are some private classes in hiveUdfs.scala which expose hiveUdaf's as Spark SQL AggregateExpressions, but they are private. On Tue, Jun 2, 2015 at 8:28 AM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: I've finally come to the same conclusion, but isn't there any way to call this Hive UDAFs from the agg(percentile(key,0.5)) ?? Le mar. 2 juin 2015 à 15:37, Yana Kadiyska yana.kadiy...@gmail.com a écrit : Like this...sqlContext should be a HiveContext instance case class KeyValue(key: Int, value: String) val df=sc.parallelize(1 to 50).map(i=KeyValue(i, i.toString)).toDF df.registerTempTable(table) sqlContext.sql(select percentile(key,0.5) from table).show() On Tue, Jun 2, 2015 at 8:07 AM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Hi everyone, Is there any way to compute a median on a column using Spark's Dataframe. I know you can use stats in a RDD but I'd rather stay within a dataframe. Hive seems to imply that using ntile one can compute percentiles, quartiles and therefore a median. Does anyone have experience with this ? Regards, Olivier. 
-- Cell : 425-233-8271 Twitter: https://twitter.com/holdenkarau Linked In: https://www.linkedin.com/in/holdenkarau -- Cell : 425-233-8271 Twitter: https://twitter.com/holdenkarau Linked In: https://www.linkedin.com/in/holdenkarau -- Cell : 425-233-8271 Twitter: https://twitter.com/holdenkarau Linked In: https://www.linkedin.com/in/holdenkarau
which mllib algorithm for large multi-class classification?
hi, I am unfortunately not very familiar with the whole MLlib stack, so I would appreciate a little help: which multi-class classification algorithm should I use if I want to train texts (100-1000 words each) into categories? The number of categories is between 100-500, and the number of training documents, which I have transformed into tf-idf vectors, is at most ~300,000. It looks like most algorithms run into OOM exceptions or "array larger than MaxInt" exceptions with a large number of classes/categories because there are collect steps involved? thanks a lot -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/which-mllib-algorithm-for-large-multi-class-classification-tp23439.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Question about SPARK_WORKER_CORES and spark.task.cpus
Hi, I was running a WordCount application on Spark, and the machine I used has 4 physical cores. However, in spark-env.sh file, I set SPARK_WORKER_CORES = 32. The web UI says it launched one executor with 32 cores and the executor could execute 32 tasks simultaneously. Does spark create 32 vCores out of 4 physical cores? How much physical CPU resource can each task get then? Also, I found a parameter “spark.task.cpus”, but I don’t quite understand this parameter. If I set it to 2, does Spark allocate 2 CPU cores for one task? I think “task” is a “thread” within executor (“process”), so how can a thread utilize two CPU cores simultaneously? I am looking forward to your reply, thanks! Best, Rui
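For reference, both SPARK_WORKER_CORES and spark.task.cpus are scheduling bookkeeping rather than bindings to physical CPUs; a minimal sketch of where the latter is set (the value is illustrative):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("WordCount")
  .set("spark.task.cpus", "2")   // each task reserves 2 of the executor's advertised cores,
                                 // so an executor advertising 32 cores runs at most 16 tasks at once
val sc = new SparkContext(conf)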
Re: workaround for groupByKey
Silvio, Suppose my RDD is (K-1, v1,v2,v3,v4). If i want to do simple addition i can use reduceByKey or aggregateByKey. What if my processing needs to check all the items in the value list each time, Above two operations do not get all the values, they just get two pairs (v1, v2) , you do some processing and store it back into v1. How do i use the combiner facility present with reduceByKey aggregateByKey. -deepak On Mon, Jun 22, 2015 at 2:43 PM, Silvio Fiorito silvio.fior...@granturing.com wrote: You can use aggregateByKey as one option: val input: RDD[Int, String] = ... val test = input.aggregateByKey(ListBuffer.empty[String])((a, b) = a += b, (a, b) = a ++ b) From: Jianguo Li Date: Monday, June 22, 2015 at 5:12 PM To: user@spark.apache.org Subject: workaround for groupByKey Hi, I am processing an RDD of key-value pairs. The key is an user_id, and the value is an website url the user has ever visited. Since I need to know all the urls each user has visited, I am tempted to call the groupByKey on this RDD. However, since there could be millions of users and urls, the shuffling caused by groupByKey proves to be a major bottleneck to get the job done. Is there any workaround? I want to end up with an RDD of key-value pairs, where the key is an user_id, the value is a list of all the urls visited by the user. Thanks, Jianguo -- Deepak
Re: Why can't I allocate more than 4 executors with 2 machines on YARN?
OK, I figured out this. The maximum number of containers YARN can create per node is based on the total available RAM and the maximum allocation per container ( yarn.scheduler.maximum-allocation-mb ). The default is 8192; setting to a lower value allowed me to create more containers per node. On Mon, Jun 22, 2015 at 10:42 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: 1) Can you try with yarn-cluster 2) Does your queue have enough capacity On Mon, Jun 22, 2015 at 11:10 AM, Saiph Kappa saiph.ka...@gmail.com wrote: Hi, I am running a simple spark streaming application on hadoop 2.7.0/YARN (master: yarn-client) cluster with 2 different machines (12GB RAM with 8 CPU cores each). I am launching my application like this: ~/myapp$ ~/my-spark/bin/spark-submit --class App --master yarn-client --driver-memory 4g --executor-memory 2g --executor-cores 1 --num-executors 6 target/scala-2.10/my-app_2.10-0.1-SNAPSHOT.jar 1 mymachine3 1000 8 10 4 stdev 3 Despite I required 6 executors for my application, it seems that I am unable to get more than 4 executors (2 per machine). If I request any number of executors below 5 it works fine, but otherwise it seems that it is not able to allocate more than 4. Why does this happen? Thanks. -- Deepak
Re: jars are not loading from 1.3. those set via setJars to the SparkContext
Yes I have the producer in the class path. And I am using in standalone mode. Sent from my iPhone On 23-Jun-2015, at 3:31 am, Tathagata Das t...@databricks.com wrote: Do you have Kafka producer in your classpath? If so how are adding that library? Are you running on YARN, or Mesos or Standalone or local. These details will be very useful. On Mon, Jun 22, 2015 at 8:34 AM, Murthy Chelankuri kmurt...@gmail.com wrote: I am using spark streaming. what i am trying to do is sending few messages to some kafka topic. where its failing. java.lang.ClassNotFoundException: com.abc.mq.msg.ObjectEncoder at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:264) at kafka.utils.Utils$.createObject(Utils.scala:438) at kafka.producer.Producer.init(Producer.scala:61) On Mon, Jun 22, 2015 at 8:24 PM, Murthy Chelankuri kmurt...@gmail.com wrote: I have been using the spark from the last 6 months with the version 1.2.0. I am trying to migrate to the 1.3.0 but the same problem i have written is not wokring. Its giving class not found error when i try to load some dependent jars from the main program. This use to work in 1.2.0 when set all the dependent jars array to the spark context but not working in 1.3.0 Please help me how to resolve this. Thanks, Murthy Chelankuri
Fwd: Storing an action result in HDFS
Hello All, I am new to Spark. I have a very basic question: how do I write the output of an action on an RDD to HDFS? Thanks in advance for the help. Cheers, Ravi
Re: Storing an action result in HDFS
Hi Chris, Thanks for the quick reply and the welcome. I am trying to read a file from hdfs and then write back just the first line to hdfs. I am calling first() on the RDD to get the first line. Sent from my iPhone On Jun 22, 2015, at 7:42 PM, Chris Gore cdg...@cdgore.com wrote: Hi Ravi, Welcome, you probably want RDD.saveAsTextFile(“hdfs:///my_file”) Chris On Jun 22, 2015, at 5:28 PM, ravi tella ddpis...@gmail.com wrote: Hello All, I am new to Spark. I have a very basic question.How do I write the output of an action on a RDD to HDFS? Thanks in advance for the help. Cheers, Ravi - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
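A minimal sketch of that flow with placeholder paths: first() brings one line back to the driver, and parallelize/saveAsTextFile writes it out again.

val lines = sc.textFile("hdfs:///wcinput/data.txt")
val firstLine = lines.first()               // action: the result lands on the driver

// wrap the single value in an RDD so it can be saved to HDFS
sc.parallelize(Seq(firstLine)).saveAsTextFile("hdfs:///wcoutput/first_line")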
Re: jars are not loading from 1.3. those set via setJars to the SparkContext
Do you have Kafka producer in your classpath? If so how are adding that library? Are you running on YARN, or Mesos or Standalone or local. These details will be very useful. On Mon, Jun 22, 2015 at 8:34 AM, Murthy Chelankuri kmurt...@gmail.com wrote: I am using spark streaming. what i am trying to do is sending few messages to some kafka topic. where its failing. java.lang.ClassNotFoundException: com.abc.mq.msg.ObjectEncoder at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:264) at kafka.utils.Utils$.createObject(Utils.scala:438) at kafka.producer.Producer.init(Producer.scala:61) On Mon, Jun 22, 2015 at 8:24 PM, Murthy Chelankuri kmurt...@gmail.com wrote: I have been using the spark from the last 6 months with the version 1.2.0. I am trying to migrate to the 1.3.0 but the same problem i have written is not wokring. Its giving class not found error when i try to load some dependent jars from the main program. This use to work in 1.2.0 when set all the dependent jars array to the spark context but not working in 1.3.0 Please help me how to resolve this. Thanks, Murthy Chelankuri
New Spark Meetup group in Munich
Hi everyone, I want to announce that we have created a Spark Meetup group in Munich (Germany). We are currently planning our first event, which will take place in July. There we will cover Spark basics to reach people who are new to the framework. In the following events we will go deeper into special topics about Spark. It would be nice if someone could add our meetup group to the Spark website (http://spark.apache.org/community.html) :) You can find us here: http://www.meetup.com/de/Spark-Munich/ Thanks, Danny Linden
Re: Storing an action result in HDFS
Hi Ravi, Welcome, you probably want RDD.saveAsTextFile(“hdfs:///my_file”) Chris On Jun 22, 2015, at 5:28 PM, ravi tella ddpis...@gmail.com wrote: Hello All, I am new to Spark. I have a very basic question.How do I write the output of an action on a RDD to HDFS? Thanks in advance for the help. Cheers, Ravi - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: workaround for groupByKey
You’re right of course, I’m sorry. I was typing before thinking about what you actually asked! On a second thought, what is the ultimate outcome for what you want the sequence of pages for? Do they need to actually all be grouped? Could you instead partition by user id then use a mapPartitions perhaps? From: Jianguo Li Date: Monday, June 22, 2015 at 6:21 PM To: Silvio Fiorito Cc: user@spark.apache.orgmailto:user@spark.apache.org Subject: Re: workaround for groupByKey Thanks for your suggestion. I guess aggregateByKey is similar to combineByKey. I read in the Learning Sparking We can disable map-side aggregation in combineByKey() if we know that our data won’t benefit from it. For example, groupByKey() disables map-side aggregation as the aggregation function (appending to a list) does not save any space. If we want to disable map-side combines, we need to specify the partitioner; for now you can just use the partitioner on the source RDD by passingrdd.partitioner It seems that when the map-side aggregation function is to append something to a list (as opposed to summing over all the numbers), then this map-side aggregation does not offer any benefit since appending to a list does not save any space. Is my understanding correct? Thanks, Jianguo On Mon, Jun 22, 2015 at 4:43 PM, Silvio Fiorito silvio.fior...@granturing.commailto:silvio.fior...@granturing.com wrote: You can use aggregateByKey as one option: val input: RDD[Int, String] = ... val test = input.aggregateByKey(ListBuffer.empty[String])((a, b) = a += b, (a, b) = a ++ b) From: Jianguo Li Date: Monday, June 22, 2015 at 5:12 PM To: user@spark.apache.orgmailto:user@spark.apache.org Subject: workaround for groupByKey Hi, I am processing an RDD of key-value pairs. The key is an user_id, and the value is an website url the user has ever visited. Since I need to know all the urls each user has visited, I am tempted to call the groupByKey on this RDD. However, since there could be millions of users and urls, the shuffling caused by groupByKey proves to be a major bottleneck to get the job done. Is there any workaround? I want to end up with an RDD of key-value pairs, where the key is an user_id, the value is a list of all the urls visited by the user. Thanks, Jianguo
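A sketch of that partition-then-mapPartitions idea, assuming the users landing in any one partition fit in memory (the data and partition count are illustrative):

import org.apache.spark.HashPartitioner
import scala.collection.mutable

val visits = sc.parallelize(Seq(("u1", "a.com"), ("u1", "b.com"), ("u2", "a.com")))

// co-locate each user's records, then build the per-user url lists one partition at a time
val urlsByUser = visits
  .partitionBy(new HashPartitioner(8))
  .mapPartitions { records =>
    val perUser = mutable.Map.empty[String, mutable.ListBuffer[String]]
    records.foreach { case (user, url) =>
      perUser.getOrElseUpdate(user, mutable.ListBuffer.empty[String]) += url
    }
    perUser.iterator.map { case (user, urls) => (user, urls.toList) }
  }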
Re: Confusion matrix for binary classification
Hi, In Spark 1.4, you may use DataFrame.stat.crosstab to generate the confusion matrix. This would be very simple if you are using the ML Pipelines Api, and are working with DataFrames. Best, Burak On Mon, Jun 22, 2015 at 4:21 AM, CD Athuraliya cdathural...@gmail.com wrote: Hi, I am looking for a way to get confusion matrix for binary classification. I was able to get confusion matrix for multiclass classification using this [1]. But I could not find a proper way to get confusion matrix in similar class available for binary classification [2]. Later I found this class [3] which corresponds to my requirement but I am not sure about the way I should use that class to get evaluation metrics for binary classification. e.g. Given the constructor BinaryConfusionMatrixImpl(BinaryLabelCounter count, BinaryLabelCounter totalCount), from where I can get this count and totalCount? Appreciate any help on this. [1] http://spark.apache.org/docs/1.3.0/api/java/org/apache/spark/mllib/evaluation/MulticlassMetrics.html#confusionMatrix() [2] http://spark.apache.org/docs/1.3.0/api/java/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.html [3] http://spark.apache.org/docs/1.3.0/api/java/org/apache/spark/mllib/evaluation/binary/BinaryConfusionMatrixImpl.html Thanks -- *CD Athuraliya* Software Engineer WSO2, Inc. Mobile: +94 716288847 94716288847 LinkedIn http://lk.linkedin.com/in/cdathuraliya | Twitter https://twitter.com/cdathuraliya | Blog http://cdathuraliya.tumblr.com/
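A minimal sketch of the crosstab suggestion; the column names and data are assumptions for illustration, not from the original question:

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// (label, prediction) pairs from a binary classifier
val predictions = sc.parallelize(Seq((1.0, 1.0), (0.0, 1.0), (0.0, 0.0), (1.0, 0.0)))
  .toDF("label", "prediction")

// 2x2 confusion matrix: one row per predicted class, one column per actual label
predictions.stat.crosstab("prediction", "label").show()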
Programming with java on spark
Hello, everyone! I'm new to Spark. I have already written programs on Hadoop 2.5.2, where I defined my own InputFormat and OutputFormat. Now I want to move my code to Spark using the Java language. The first problem I encountered is how to turn a big txt file in local storage into an RDD that is compatible with my program written for Hadoop. I found that there are functions in SparkContext which may be helpful, but I don't know how to use them. E.g. public <K,V,F extends org.apache.hadoop.mapreduce.InputFormat<K,V>> RDD<scala.Tuple2<K,V>> newAPIHadoopFile(String path, Class<F> fClass, Class<K> kClass, Class<V> vClass, org.apache.hadoop.conf.Configuration conf) (see http://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/rdd/RDD.html) Get an RDD for a given Hadoop file with an arbitrary new API InputFormat and extra configuration options to pass to the input format. '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each record, directly caching the returned RDD or directly passing it to an aggregation or shuffle operation will create many references to the same object. If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first copy them using a map function. In Java, the following is wrong. // option one Configuration confHadoop = new Configuration(); JavaPairRDD<LongWritable,Text> distFile = sc.newAPIHadoopFile( "hdfs://cMaster:9000/wcinput/data.txt", DataInputFormat, LongWritable, Text, confHadoop); // option two Configuration confHadoop = new Configuration(); DataInputFormat input = new DataInputFormat(); LongWritable longType = new LongWritable(); Text text = new Text(); JavaPairRDD<LongWritable,Text> distFile = sc.newAPIHadoopFile( "hdfs://cMaster:9000/wcinput/data.txt", input, longType, text, confHadoop); Can anyone help me? Thank you so much.
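For what it's worth, the new-API overload quoted above expects Class objects rather than instances; a minimal Scala sketch against the built-in TextInputFormat (a custom InputFormat would be passed the same way, and in Java the equivalent arguments are TextInputFormat.class, LongWritable.class and Text.class):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

val confHadoop = new Configuration()

val distFile = sc.newAPIHadoopFile(
  "hdfs://cMaster:9000/wcinput/data.txt",
  classOf[TextInputFormat],   // the InputFormat class, not an instance
  classOf[LongWritable],
  classOf[Text],
  confHadoop)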
JavaDStream<String> read and write rdbms
Hi Team, How do I split the JavaDStream<String> that was read and put it into MySQL in Java? Is there any existing API in Spark 1.3/1.4? Can you please share a code snippet if anybody has one? Thanks, Manohar -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/JavaDStream-String-read-and-write-rdbms-tp23423.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: JavaDStream<String> read and write rdbms
The Spark docs address this pretty well. Look for the usage patterns of foreachRDD. On 22 Jun 2015 17:09, Manohar753 manohar.re...@happiestminds.com wrote: Hi Team, How to split and put the red JavaDStreamString in to mysql in java. any existing api in sark 1.3/1.4. team can you please share the code snippet if any body have it. Thanks, Manohar -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/JavaDStream-String-read-and-write-rdbms-tp23423.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
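The pattern in the docs comes down to foreachRDD plus foreachPartition, opening one connection per partition on the executors. A Scala sketch of that pattern (the Java version is structurally the same); the JDBC URL, table and columns are placeholders, and lines is assumed to be the DStream of comma-separated records:

import java.sql.DriverManager

lines.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    // connection created inside the partition, on the executor, never on the driver
    val conn = DriverManager.getConnection("jdbc:mysql://dbhost:3306/mydb", "user", "password")
    val stmt = conn.prepareStatement("INSERT INTO events (col_a, col_b) VALUES (?, ?)")
    records.foreach { line =>
      val fields = line.split(",")   // split each record into its columns
      stmt.setString(1, fields(0))
      stmt.setString(2, fields(1))
      stmt.executeUpdate()
    }
    stmt.close()
    conn.close()
  }
}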
Re: [Spark Streaming 1.4.0] SPARK-5063, Checkpointing and queuestream
Counts is a list (counts = []) in the driver, used to collect the results. It seems like it's also not the best way to be doing things, but I'm new to spark and editing someone else's code so still learning. Thanks! def update_state(out_files, counts, curr_rdd): try: for c in curr_rdd.collect(): fnames, count = c counts.append(count) out_files |= fnames except Py4JJavaError as e: print(EXCEPTION: %s % str(e)) -- Shaanan Cohney PhD Student University of Pennsylvania shaan...@gmail.com On Mon, Jun 22, 2015 at 8:56 PM, Benjamin Fradet benjamin.fra...@gmail.com wrote: What does counts refer to? Could you also paste the code of your update_state function? On 22 Jun 2015 12:48 pm, Shaanan Cohney shaan...@gmail.com wrote: I'm receiving the SPARK-5063 error (RDD transformations and actions can only be invoked by the driver, not inside of other transformations) whenever I try and restore from a checkpoint in spark streaming on my app. I'm using python3 and my RDDs are inside a queuestream DStream. This is the little chunk of code causing issues: - p_batches = [sc.parallelize(batch) for batch in task_batches] sieving_tasks = ssc.queueStream(p_batches) sieving_tasks.checkpoint(20) relations = sieving_tasks.map(lambda s: run_sieving_command(s, poly, poly_path, fb_paths)) relations.reduce(lambda a, b: (a[0] | b[0], a[1] + b[1]) ).foreachRDD(lambda s: update_state(out_files, counts, s)) ssc.checkpoint(s3n_path) - Thanks again! -- Shaanan Cohney PhD Student University of Pennsylvania shaan...@gmail.com
Serializer not switching
I am trying to run a function on every line of a Parquet file. The function is in an object. When I run the program, I get an exception that the object is not serializable. I read around on the internet and found that I should use the Kryo serializer. I changed the setting in the Spark conf and registered the object with the Kryo serializer. When I run the program I still get the same exception (from the stack trace: at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)). For some reason, the program is still trying to serialize using the default Java serializer. I am working with Spark 1.4.
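As an aside rather than a diagnosis of the case above: spark.serializer and Kryo registration cover data serialization such as shuffles and caching, while task closures are still serialized with the plain Java serializer, so whatever the mapped function captures must itself be java.io.Serializable. For reference, a minimal sketch of the Kryo settings, with an illustrative class name:

import org.apache.spark.{SparkConf, SparkContext}

case class ParsedLine(id: Long, text: String)   // illustrative record type

val conf = new SparkConf()
  .setAppName("KryoSketch")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[ParsedLine]))   // register data classes, not closures

val sc = new SparkContext(conf)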
[Spark Streaming 1.4.0] SPARK-5063, Checkpointing and queuestream
I'm receiving the SPARK-5063 error (RDD transformations and actions can only be invoked by the driver, not inside of other transformations) whenever I try and restore from a checkpoint in spark streaming on my app. I'm using python3 and my RDDs are inside a queuestream DStream. This is the little chunk of code causing issues: - p_batches = [sc.parallelize(batch) for batch in task_batches] sieving_tasks = ssc.queueStream(p_batches) sieving_tasks.checkpoint(20) relations = sieving_tasks.map(lambda s: run_sieving_command(s, poly, poly_path, fb_paths)) relations.reduce(lambda a, b: (a[0] | b[0], a[1] + b[1]) ).foreachRDD(lambda s: update_state(out_files, counts, s)) ssc.checkpoint(s3n_path) - Thanks again! -- Shaanan Cohney PhD Student University of Pennsylvania shaan...@gmail.com
Re: Using Accumulators in Streaming
But I just want to update the RDD by appending a unique message ID to each element, which should be incremented automatically (m++ ...) every time a new element arrives in the RDD. On Mon, Jun 22, 2015 at 7:05 AM, Michal Čizmazia mici...@gmail.com wrote: StreamingContext.sparkContext() On 21 June 2015 at 21:32, Will Briggs wrbri...@gmail.com wrote: It sounds like accumulators are not necessary in Spark Streaming - see this post ( http://apache-spark-user-list.1001560.n3.nabble.com/Shared-variable-in-Spark-Streaming-td11762.html) for more details. On June 21, 2015, at 7:31 PM, anshu shukla anshushuk...@gmail.com wrote: In spark Streaming ,Since we are already having Streaming context , which does not allows us to have accumulators .We have to get sparkContext for initializing accumulator value . But having 2 spark context will not serve the problem . Please Help !! -- Thanks Regards, Anshu Shukla -- Thanks Regards, Anshu Shukla
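A minimal sketch of the suggestion above: take the SparkContext that already lives inside the StreamingContext and create the accumulator there. Note that an accumulator is an add-only counter read back on the driver, not a way to stamp each element with its own ID; the names below are illustrative:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(new SparkConf().setAppName("AccSketch"), Seconds(1))

// no second SparkContext is needed
val processed = ssc.sparkContext.accumulator(0L, "processed records")

val lines = ssc.socketTextStream("localhost", 9999)
lines.foreachRDD { rdd =>
  rdd.foreach(_ => processed += 1L)   // executors only add; the driver reads processed.value
}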
Spark 1.3 - Connect to to Cassandra - cassandraTable is not recognised by sc
Hello, I'm writing an application in Scala to connect to Cassandra to read the data. My setup is IntelliJ with Maven. When I try to compile the application I get the following *error: object datastax is not a member of package com* *error: value cassandraTable is not a member of org.apache.spark.SparkContext* libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "1.4.0-M1" import com.datastax.spark.connector._ import org.apache.spark.{SparkConf, SparkContext} object ReadCassandra { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setAppName("Streaming Test") .set("spark.executor.memory", "1g") .set("spark.cassandra.connection.host", "ns6512097.ip-37-187-69.eu") .set("spark.cassandra.auth.username", "cassandra") .set("spark.cassandra.auth.password", "cassandra") val sc = new SparkContext(conf) val rdd_cassandra = sc.cassandraTable("tutorial", "user") // read the data rdd_cassandra.collect()
Re: How to get and parse whole xml file in HDFS by Spark Streaming
You can use fileStream for that, look at the XMLInputFormat https://github.com/apache/mahout/blob/ad84344e4055b1e6adff5779339a33fa29e1265d/examples/src/main/java/org/apache/mahout/classifier/bayes/XmlInputFormat.java of mahout. It should give you full XML object as on record, (as opposed to an XML record spread across multiple line records in textFileStream). Also this thread http://apache-spark-user-list.1001560.n3.nabble.com/Parsing-a-large-XML-file-using-Spark-td19239.html has some discussion around it. Thanks Best Regards On Mon, Jun 22, 2015 at 12:23 AM, Yong Feng fengyong...@gmail.com wrote: Hi Spark Experts I have a customer who wants to monitor coming data files (with xml format), and then analysize them after that put analysized data into DB. The size of each file is about 30MB (or even less in future). Spark streaming seems promising. After learning Spark Streaming and also google-ing how Spark Streaming handle xml files, I found there seems no existing Spark Stream utility to recognize whole xml file and parse it. The fileStream seems line-oriented. There is suggestion of putting whole xml file into one line, however it requires pre-processing files which will bring unexpected I/O. Can anyone throw some light on it? If will be great if there are some sample codes for me to start with. Thanks Yong
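A rough sketch of that fileStream approach, under a couple of assumptions: Mahout's XmlInputFormat is on the classpath, and its start/end tag configuration keys are xmlinput.start and xmlinput.end (worth verifying against the exact version pulled in). The directory and tag names are placeholders:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.mahout.classifier.bayes.XmlInputFormat
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(30))

// tell the input format which element delimits one record (assumed key names)
sc.hadoopConfiguration.set("xmlinput.start", "<record>")
sc.hadoopConfiguration.set("xmlinput.end", "</record>")

// each value should be a whole <record>...</record> block rather than one text line
val xmlRecords = ssc
  .fileStream[LongWritable, Text, XmlInputFormat]("hdfs:///incoming/xml")
  .map { case (_, xml) => xml.toString }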
Re: [Spark Streaming 1.4.0] SPARK-5063, Checkpointing and queuestream
What does counts refer to? Could you also paste the code of your update_state function? On 22 Jun 2015 12:48 pm, Shaanan Cohney shaan...@gmail.com wrote: I'm receiving the SPARK-5063 error (RDD transformations and actions can only be invoked by the driver, not inside of other transformations) whenever I try and restore from a checkpoint in spark streaming on my app. I'm using python3 and my RDDs are inside a queuestream DStream. This is the little chunk of code causing issues: - p_batches = [sc.parallelize(batch) for batch in task_batches] sieving_tasks = ssc.queueStream(p_batches) sieving_tasks.checkpoint(20) relations = sieving_tasks.map(lambda s: run_sieving_command(s, poly, poly_path, fb_paths)) relations.reduce(lambda a, b: (a[0] | b[0], a[1] + b[1]) ).foreachRDD(lambda s: update_state(out_files, counts, s)) ssc.checkpoint(s3n_path) - Thanks again! -- Shaanan Cohney PhD Student University of Pennsylvania shaan...@gmail.com
Re: Spark 1.3 - Connect to to Cassandra - cassandraTable is not recognised by sc
It's fixed now, adding the dependency in pom.xml fixed it:

<dependency>
  <groupId>com.datastax.spark</groupId>
  <artifactId>spark-cassandra-connector-embedded_2.10</artifactId>
  <version>1.4.0-M1</version>
</dependency>

On Mon, Jun 22, 2015 at 10:46 AM, Koen Vantomme koen.vanto...@gmail.com wrote: Hello, I'm writing an application in Scala to connect to Cassandra to read the data. My setup is IntelliJ with Maven. When I try to compile the application I get the following errors:

error: object datastax is not a member of package com
error: value cassandraTable is not a member of org.apache.spark.SparkContext

libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "1.4.0-M1"

import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}

object ReadCassandra {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("Streaming Test")
      .set("spark.executor.memory", "1g")
      .set("spark.cassandra.connection.host", "ns6512097.ip-37-187-69.eu")
      .set("spark.cassandra.auth.username", "cassandra")
      .set("spark.cassandra.auth.password", "cassandra")
    val sc = new SparkContext(conf)
    val rdd_cassandra = sc.cassandraTable("tutorial", "user") // read the data
    rdd_cassandra.collect()
Re: s3 - Can't make directory for path
Could you elaborate a bit more? What do you meant by set up a standalone server? and what is leading you to that exceptions? Thanks Best Regards On Mon, Jun 22, 2015 at 2:22 AM, nizang ni...@windward.eu wrote: hi, I'm trying to setup a standalone server, and in one of my tests, I got the following exception: java.io.IOException: Can't make directory for path 's3n://ww-sandbox/name_of_path' since it is a file. at org.apache.hadoop.fs.s3native.NativeS3FileSystem.mkdir(NativeS3FileSystem.java:541) at org.apache.hadoop.fs.s3native.NativeS3FileSystem.mkdirs(NativeS3FileSystem.java:532) at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1867) at org.apache.hadoop.mapred.FileOutputCommitter.setupJob(FileOutputCommitter.java:52) at org.apache.spark.SparkHadoopWriter.preSetup(SparkHadoopWriter.scala:64) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1093) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1065) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1065) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109) at org.apache.spark.rdd.RDD.withScope(RDD.scala:286) at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1065) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:989) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:965) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:965) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109) at org.apache.spark.rdd.RDD.withScope(RDD.scala:286) at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:965) at org.apache.spark.api.java.JavaPairRDD.saveAsHadoopFile(JavaPairRDD.scala:789) at com.windward.spark.io.SparkWriter.write(SparkWriter.java:93) at com.windward.spark.io.MultipleDataFileWriter.write(MultipleDataFileWriter.java:48) at com.windward.spark.io.SparkWriterContainer.write(SparkWriterContainer.java:85) at com.windward.spark.io.SparkWriterContainer.write(SparkWriterContainer.java:72) at com.windward.spark.io.SparkWriterContainer.write(SparkWriterContainer.java:56) at com.windward.spark.apps.VesselStoriesRunner.doWork(VesselStoriesRunner.java:91) at com.windward.spark.AbstractSparkRunner.calcAll(AbstractSparkRunner.java:60) at com.windward.spark.apps.VesselStoriesApp.main(VesselStoriesApp.java:8) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) the part in s3 for the relevant part looks like: root@ip-172-31-7-77 startup_scripts]$ s3cmd ls 
s3://ww-sandbox/name_of_path

                   DIR   s3://ww-sandbox/name_of_path/
2014-11-13 10:27     0   s3://ww-sandbox/name_of_path
2015-06-21 20:39     0   s3://ww-sandbox/name_of_path_$folder$

I tried to give it as a parameter with or without the '/' at the end. The exact same call works for me with a YARN cluster (which I'm trying to remove). Does anyone have any idea? thanks, nizan -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/s3-Can-t-make-directory-for-path-tp23419.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Confusion matrix for binary classification
Hi, I am looking for a way to get confusion matrix for binary classification. I was able to get confusion matrix for multiclass classification using this [1]. But I could not find a proper way to get confusion matrix in similar class available for binary classification [2]. Later I found this class [3] which corresponds to my requirement but I am not sure about the way I should use that class to get evaluation metrics for binary classification. e.g. Given the constructor BinaryConfusionMatrixImpl(BinaryLabelCounter count, BinaryLabelCounter totalCount), from where I can get this count and totalCount? Appreciate any help on this. [1] http://spark.apache.org/docs/1.3.0/api/java/org/apache/spark/mllib/evaluation/MulticlassMetrics.html#confusionMatrix() [2] http://spark.apache.org/docs/1.3.0/api/java/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.html [3] http://spark.apache.org/docs/1.3.0/api/java/org/apache/spark/mllib/evaluation/binary/BinaryConfusionMatrixImpl.html Thanks -- *CD Athuraliya* Software Engineer WSO2, Inc. Mobile: +94 716288847 94716288847 LinkedIn http://lk.linkedin.com/in/cdathuraliya | Twitter https://twitter.com/cdathuraliya | Blog http://cdathuraliya.tumblr.com/
Re: [Spark Streaming 1.4.0] SPARK-5063, Checkpointing and queuestream
I would suggest you have a look at the updateStateByKey transformation in the Spark Streaming programming guide, which should fit your needs better than your update_state function. On 22 Jun 2015 1:03 pm, Shaanan Cohney shaan...@gmail.com wrote: Counts is a list (counts = []) in the driver, used to collect the results. It seems like it's also not the best way to be doing things, but I'm new to spark and editing someone else's code so still learning. Thanks!

def update_state(out_files, counts, curr_rdd):
    try:
        for c in curr_rdd.collect():
            fnames, count = c
            counts.append(count)
            out_files |= fnames
    except Py4JJavaError as e:
        print("EXCEPTION: %s" % str(e))

-- Shaanan Cohney PhD Student University of Pennsylvania shaan...@gmail.com On Mon, Jun 22, 2015 at 8:56 PM, Benjamin Fradet benjamin.fra...@gmail.com wrote: What does counts refer to? Could you also paste the code of your update_state function? On 22 Jun 2015 12:48 pm, Shaanan Cohney shaan...@gmail.com wrote: I'm receiving the SPARK-5063 error (RDD transformations and actions can only be invoked by the driver, not inside of other transformations) whenever I try and restore from a checkpoint in spark streaming on my app. I'm using python3 and my RDDs are inside a queuestream DStream. This is the little chunk of code causing issues:

p_batches = [sc.parallelize(batch) for batch in task_batches]

sieving_tasks = ssc.queueStream(p_batches)
sieving_tasks.checkpoint(20)
relations = sieving_tasks.map(lambda s: run_sieving_command(s, poly, poly_path, fb_paths))
relations.reduce(lambda a, b: (a[0] | b[0], a[1] + b[1])).foreachRDD(lambda s: update_state(out_files, counts, s))
ssc.checkpoint(s3n_path)

Thanks again! -- Shaanan Cohney PhD Student University of Pennsylvania shaan...@gmail.com
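For reference, a minimal Scala sketch of the updateStateByKey suggestion (the thread itself is in Python, where the API is analogous); keyedBatches is a hypothetical DStream of (key, count) pairs and checkpointing is assumed to already be enabled:

// keep a running count per key across batches; the state survives batch boundaries
val runningCounts = keyedBatches.updateStateByKey[Long] {
  (newValues: Seq[Long], previous: Option[Long]) =>
    Some(previous.getOrElse(0L) + newValues.sum)
}
runningCounts.print()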
Re: Problem attaching to YARN
On 22 Jun 2015, at 04:08, Shawn Garbett shawn.garb...@gmail.com wrote:

2015-06-21 11:03:22,029 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Container [pid=39288,containerID=container_1434751301309_0015_02_01] is running beyond virtual memory limits. Current usage: 240.2 MB of 1 GB physical memory used; 2.1 GB of 2.1 GB virtual memory used. Killing container.
Dump of the process-tree for container_1434751301309_0015_02_01 :
|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
|- 39288 39286 39288 39288 (bash) 0 0 13590528 709 /bin/bash -c /usr/lib/jvm/java-8-oracle/bin/java -server -Xmx512m -

You're using more virtual memory than was allocated, and there's some limit checking going on. Edit your yarn-site.xml file to turn off the vmem check:

<property>
  <name>yarn.nodemanager.vmem-check-enabled</name>
  <value>false</value>
</property>
RE: Code review - Spark SQL command-line client for Cassandra
Thanks Mohammed, it’s good to know I’m not alone! How easy is it to integrate Zeppelin with Spark on Cassandra? It looks like it would only support Hadoop out of the box. Is it just a case of dropping the Cassandra Connector onto the Spark classpath? Cheers, Matthew *From:* Mohammed Guller [mailto:moham...@glassbeam.com] *Sent:* 20 June 2015 17:27 *To:* shahid ashraf *Cc:* Matthew Johnson; user@spark.apache.org *Subject:* RE: Code review - Spark SQL command-line client for Cassandra It is a simple Play-based web application. It exposes an URI for submitting a SQL query. It then executes that query using CassandraSQLContext provided by Spark Cassandra Connector. Since it is web-based, I added an authentication and authorization layer to make sure that only users with the right authorization can use it. I am happy to open-source that code if there is interest. Just need to carve out some time to clean it up and remove all the other services that this web application provides. Mohammed *From:* shahid ashraf [mailto:sha...@trialx.com sha...@trialx.com] *Sent:* Saturday, June 20, 2015 6:52 AM *To:* Mohammed Guller *Cc:* Matthew Johnson; user@spark.apache.org *Subject:* RE: Code review - Spark SQL command-line client for Cassandra Hi Mohammad Can you provide more info about the Service u developed On Jun 20, 2015 7:59 AM, Mohammed Guller moham...@glassbeam.com wrote: Hi Matthew, It looks fine to me. I have built a similar service that allows a user to submit a query from a browser and returns the result in JSON format. Another alternative is to leave a Spark shell or one of the notebooks (Spark Notebook, Zeppelin, etc.) session open and run queries from there. This model works only if people give you the queries to execute. Mohammed *From:* Matthew Johnson [mailto:matt.john...@algomi.com] *Sent:* Friday, June 19, 2015 2:20 AM *To:* user@spark.apache.org *Subject:* Code review - Spark SQL command-line client for Cassandra Hi all, I have been struggling with Cassandra’s lack of adhoc query support (I know this is an anti-pattern of Cassandra, but sometimes management come over and ask me to run stuff and it’s impossible to explain that it will take me a while when it would take about 10 seconds in MySQL) so I have put together the following code snippet that bundles DataStax’s Cassandra Spark connector and allows you to submit Spark SQL to it, outputting the results in a text file. Does anyone spot any obvious flaws in this plan?? 
(I have a lot more error handling etc in my code, but removed it here for brevity)

private void run(String sqlQuery) {
    SparkContext scc = new SparkContext(conf);
    CassandraSQLContext csql = new CassandraSQLContext(scc);
    DataFrame sql = csql.sql(sqlQuery);
    String folderName = "/tmp/output_" + System.currentTimeMillis();
    LOG.info("Attempting to save SQL results in folder: " + folderName);
    sql.rdd().saveAsTextFile(folderName);
    LOG.info("SQL results saved");
}

public static void main(String[] args) {
    String sparkMasterUrl = args[0];
    String sparkHost = args[1];
    String sqlQuery = args[2];
    SparkConf conf = new SparkConf();
    conf.setAppName("Java Spark SQL");
    conf.setMaster(sparkMasterUrl);
    conf.set("spark.cassandra.connection.host", sparkHost);
    JavaSparkSQL app = new JavaSparkSQL(conf);
    app.run(sqlQuery, printToConsole);
}

I can then submit this to Spark with ‘spark-submit’:

./spark-submit --class com.algomi.spark.JavaSparkSQL --master spark://sales3:7077 spark-on-cassandra-0.0.1-SNAPSHOT-jar-with-dependencies.jar spark://sales3:7077 sales3 "select * from mykeyspace.operationlog"

It seems to work pretty well, so I’m pretty happy, but wondering why this isn’t common practice (at least I haven’t been able to find much about it on Google) – is there something terrible that I’m missing? Thanks! Matthew
Re: Using Accumulators in Streaming
If I am not mistaken, one way to see the accumulators is that they are just write-only for the workers and their value can be read by the driver. Therefore they cannot be used for ID generation as you wish. On 22 June 2015 at 04:30, anshu shukla anshushuk...@gmail.com wrote: But i just want to update rdd , by appending unique message ID with each element of RDD , which should be automatically(m++ ..) updated every time a new element comes to rdd . On Mon, Jun 22, 2015 at 7:05 AM, Michal Čizmazia mici...@gmail.com wrote: StreamingContext.sparkContext() On 21 June 2015 at 21:32, Will Briggs wrbri...@gmail.com wrote: It sounds like accumulators are not necessary in Spark Streaming - see this post ( http://apache-spark-user-list.1001560.n3.nabble.com/Shared-variable-in-Spark-Streaming-td11762.html) for more details. On June 21, 2015, at 7:31 PM, anshu shukla anshushuk...@gmail.com wrote: In spark Streaming ,Since we are already having Streaming context , which does not allows us to have accumulators .We have to get sparkContext for initializing accumulator value . But having 2 spark context will not serve the problem . Please Help !! -- Thanks Regards, Anshu Shukla -- Thanks Regards, Anshu Shukla
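A tiny Scala illustration of that point, assuming a hypothetical DStream called inputStream; executors can only add to the accumulator and only the driver can read its value, which is why it cannot hand out per-element IDs:

val processed = ssc.sparkContext.accumulator(0L)      // driver-owned counter

inputStream.foreachRDD { rdd =>
  rdd.foreach(_ => processed += 1L)                   // workers may only add to it
  println("records seen so far: " + processed.value)  // only the driver may read the value
}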
Re: memory needed for each executor
Totally depends on the use-case that you are solving with Spark, for instance there was some discussion around the same which you could read over here http://apache-spark-user-list.1001560.n3.nabble.com/How-does-one-decide-no-of-executors-cores-memory-allocation-td23326.html Thanks Best Regards On Mon, Jun 22, 2015 at 10:57 AM, pth001 patcharee.thong...@uni.no wrote: Hi, How can I know the size of memory needed for each executor (one core) to execute each job? If there are many cores per executors, will the memory be the multiplication (memory needed for each executor (one core) * no. of cores)? Any suggestions/guidelines? BR, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: JavaDStream<String> read and write rdbms
It's pretty straightforward, this would get you started: http://stackoverflow.com/questions/24896233/how-to-save-apache-spark-schema-output-in-mysql-database Thanks Best Regards On Mon, Jun 22, 2015 at 12:39 PM, Manohar753 manohar.re...@happiestminds.com wrote: Hi Team, How to split and put the read JavaDStream<String> into mysql in java. any existing api in Spark 1.3/1.4. team can you please share the code snippet if any body have it. Thanks, Manohar -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/JavaDStream-String-read-and-write-rdbms-tp23423.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
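The linked answer boils down to writing each partition of each batch through a plain JDBC connection; a minimal Scala sketch of that pattern (the question is about Java, but the structure is the same), where the stream variable, connection string, table and columns are all assumptions and the MySQL JDBC driver is assumed to be on the executor classpath:

import java.sql.DriverManager

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    // one connection per partition, opened on the executor that owns the partition
    val conn = DriverManager.getConnection("jdbc:mysql://dbhost:3306/mydb", "user", "password")
    val stmt = conn.prepareStatement("INSERT INTO messages (col1, col2) VALUES (?, ?)")
    records.foreach { line =>
      val fields = line.split(",", 2)   // split the record however the format requires
      stmt.setString(1, fields(0))
      stmt.setString(2, fields(1))
      stmt.executeUpdate()
    }
    conn.close()
  }
}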
Re: Support for Windowing and Analytics functions in Spark SQL
1.4 supports it On 23 Jun 2015 02:59, Sourav Mazumder sourav.mazumde...@gmail.com wrote: Hi, Though the documentation does not explicitly mention support for Windowing and Analytics function in Spark SQL, looks like it is not supported. I tried running a query like Select Lead(column name, 1) over (Partition By column name order by column name) from table name and I got error saying that this feature is unsupported. I tried it in Databricks cloud and that supports Spark 1.4. Can anyone please confirm this ? Regards, Sourav
Re: Help optimising Spark SQL query
Hi James, Maybe it's the DISTINCT causing the issue. I rewrote the query as follows. Maybe this one can finish faster.

select sum(cnt) as uses, count(id) as users
from (
  select count(*) cnt, cast(id as string) as id
  from usage_events
  where from_unixtime(cast(timestamp_millis/1000 as bigint)) between '2015-06-09' and '2015-06-16'
  group by cast(id as string)
) tmp

Thanks, Yin On Mon, Jun 22, 2015 at 12:55 PM, Jörn Franke jornfra...@gmail.com wrote: Generally (not only spark sql specific) you should not cast in the where part of a sql query. It is also not necessary in your case. Getting rid of casts in the whole query will be also beneficial. Le lun. 22 juin 2015 à 17:29, James Aley james.a...@swiftkey.com a écrit : Hello, A colleague of mine ran the following Spark SQL query:

select count(*) as uses, count (distinct cast(id as string)) as users
from usage_events
where from_unixtime(cast(timestamp_millis/1000 as bigint)) between '2015-06-09' and '2015-06-16'

The table contains billions of rows, but totals only 64GB of data across ~30 separate files, which are stored as Parquet with LZO compression in S3. From the referenced columns: * id is Binary, which we cast to a String so that we can DISTINCT by it. (I was already told this will improve in a later release, in a separate thread.) * timestamp_millis is a long, containing a unix timestamp with millisecond resolution This took nearly 2 hours to run on a 5 node cluster of r3.xlarge EC2 instances, using 20 executors, each with 4GB memory. I can see from monitoring tools that the CPU usage is at 100% on all nodes, but incoming network seems a bit low at 2.5MB/s, suggesting to me that this is CPU-bound. Does that seem slow? Can anyone offer any ideas by glancing at the query as to why this might be slow? We'll profile it meanwhile and post back if we find anything ourselves. A side issue - I've found that this query, and others, sometimes completes but doesn't return any results. There appears to be no error that I can see in the logs, and Spark reports the job as successful, but the connected JDBC client (SQLWorkbenchJ in this case), just sits there forever waiting. I did a quick Google and couldn't find anyone else having similar issues. Many thanks, James.
External Jar file with SparkR
I have been unsuccessful with incorporating an external Jar into a SparkR program. Does anyone know how to do this successfully? JarTest.java = package com.myco; public class JarTest { public static double myStaticMethod() { return 5.515; } } = JarTest.R = Sys.setenv(SPARK_HOME=/usr/local/spark-1.4.0-bin-hadoop2.6/) .libPaths(c(file.path(Sys.getenv(SPARK_HOME), R, lib), .libPaths())) library(SparkR) #sparkR.stop() # Stop if you want to rerun the code sc = sparkR.init(master=local, sparkJars=c(/locOfMyJar/JarTest.jar)) SparkR:::callJStatic(java.lang.Math, max, 5, 2) # OK: 5 SparkR:::callJStatic(java.lang.Math, min, 5, 2) # OK: 2 SparkR:::callJStatic(com.myco.JarTest, myStaticMethod) # Fails, see below # 5/06/22 13:38:07 ERROR RBackendHandler: myStaticMethod on com.myco.JarTest failed # java.lang.ClassNotFoundException: com.myco.JarTest # at java.net.URLClassLoader$1.run(URLClassLoader.java:366) # at java.net.URLClassLoader$1.run(URLClassLoader.java:355) # at java.security.AccessController.doPrivileged(Native Method) # at java.net.URLClassLoader.findClass(URLClassLoader.java:354) # at java.lang.ClassLoader.loadClass(ClassLoader.java:425) # at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) # at java.lang.ClassLoader.loadClass(ClassLoader.java:358) # at java.lang.Class.forName0(Native Method) # at java.lang.Class.forName(Class.java:190) # at org.apache.spark.api.r.RBackendHandler.handleMethodCall(RBackendHandler.scala:101) # at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:74) # at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:36) # at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) # at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) # at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) # at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) # at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) # at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) # at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163) # at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) # at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) # at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787) # at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130) # at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) # at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) # at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) # at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) # at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) # at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137) # at java.lang.Thread.run(Thread.java:744) # Error: returnStatus == 0 is not TRUE = So basically something like: javac jarTest.java jar -cf JarTest.jar JarTest.class jar -tf JarTest.jar Then I run RStudio or R with the commands you see in JarTest.R (make sure 
you point to your jar file). As you can see in the comments, it does not appear to find the Java class. Does anyone know a way to make this work correctly? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/External-Jar-file-with-SparkR-tp23433.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: PySpark on YARN port out of range
Unfortunately there is not a great way to do it without modifying Spark to print more things it reads from the stream. 2015-06-20 23:10 GMT-07:00 John Meehan meeh...@dls.net: Yes it seems to be consistently port out of range:1315905645”. Is there any way to see what the python process is actually outputting (in hopes that yields a clue)? On Jun 19, 2015, at 6:47 PM, Andrew Or and...@databricks.com wrote: Hm, one thing to see is whether the same port appears many times (1315905645). The way pyspark works today is that the JVM reads the port from the stdout of the python process. If there is some interference in output from the python side (e.g. any print statements, exception messages), then the Java side will think that it's actually a port even when it's not. I'm not sure why it fails sometimes but not others, but 2/3 of the time is a lot... 2015-06-19 14:57 GMT-07:00 John Meehan meeh...@dls.net: Has anyone encountered this “port out of range” error when launching PySpark jobs on YARN? It is sporadic (e.g. 2/3 jobs get this error). LOG: 15/06/19 11:49:44 INFO scheduler.TaskSetManager: Lost task 0.3 in stage 39.0 (TID 211) on executor xxx.xxx.xxx.com: java.lang.IllegalArgumentException (port out of range:1315905645) [duplicate 7] Traceback (most recent call last): File stdin, line 1, in module 15/06/19 11:49:44 INFO cluster.YarnScheduler: Removed TaskSet 39.0, whose tasks have all completed, from pool File /home/john/spark-1.4.0/python/pyspark/rdd.py, line 745, in collect port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd()) File /home/john/spark-1.4.0/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py, line 538, in __call__ File /home/john/spark-1.4.0/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py, line 300, in get_return_value py4j.protocol.Py4JJavaError15/06/19 11:49:44 INFO storage.BlockManagerInfo: Removed broadcast_38_piece0 on 17.134.160.35:47455 in memory (size: 2.2 KB, free: 265.4 MB) : An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 39.0 failed 4 times, most recent failure: Lost task 1.3 in stage 39.0 (TID 210, xxx.xxx.xxx.com): java.lang.IllegalArgumentException: port out of range:1315905645 at java.net.InetSocketAddress.checkPort(InetSocketAddress.java:143) at java.net.InetSocketAddress.init(InetSocketAddress.java:185) at java.net.Socket.init(Socket.java:241) at org.apache.spark.api.python.PythonWorkerFactory.createSocket$1(PythonWorkerFactory.scala:75) at org.apache.spark.api.python.PythonWorkerFactory.liftedTree1$1(PythonWorkerFactory.scala:90) at org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:89) at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:62) at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:130) at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:73) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) at org.apache.spark.scheduler.Task.run(Task.scala:70) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org http://org.apache.spark.scheduler.dagscheduler.org/ $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) * Spark 1.4.0 build: build/mvn -Pyarn -Phive -Phadoop-2.3 -Dhadoop.version=2.3.0-cdh5.1.4 -DskipTests clean package LAUNCH CMD: export HADOOP_CONF_DIR=/path/to/conf
Re: Help optimising Spark SQL query
You may also want to change count(*) to specific column. On 23 Jun 2015 01:29, James Aley james.a...@swiftkey.com wrote: Hello, A colleague of mine ran the following Spark SQL query: select count(*) as uses, count (distinct cast(id as string)) as users from usage_events where from_unixtime(cast(timestamp_millis/1000 as bigint)) between '2015-06-09' and '2015-06-16' The table contains billions of rows, but totals only 64GB of data across ~30 separate files, which are stored as Parquet with LZO compression in S3. From the referenced columns: * id is Binary, which we cast to a String so that we can DISTINCT by it. (I was already told this will improve in a later release, in a separate thread.) * timestamp_millis is a long, containing a unix timestamp with millisecond resolution This took nearly 2 hours to run on a 5 node cluster of r3.xlarge EC2 instances, using 20 executors, each with 4GB memory. I can see from monitoring tools that the CPU usage is at 100% on all nodes, but incoming network seems a bit low at 2.5MB/s, suggesting to me that this is CPU-bound. Does that seem slow? Can anyone offer any ideas by glancing at the query as to why this might be slow? We'll profile it meanwhile and post back if we find anything ourselves. A side issue - I've found that this query, and others, sometimes completes but doesn't return any results. There appears to be no error that I can see in the logs, and Spark reports the job as successful, but the connected JDBC client (SQLWorkbenchJ in this case), just sits there forever waiting. I did a quick Google and couldn't find anyone else having similar issues. Many thanks, James.
Re: SQL vs. DataFrame API
Probably you should use === instead of == and !== instead of !=. Can anyone explain why the dataframe API doesn't work as I expect it to here? It seems like the column identifiers are getting confused. https://gist.github.com/dokipen/4b324a7365ae87b7b0e5
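For the Scala DataFrame API the distinction looks like the sketch below; df and the column names are assumptions (the linked gist is PySpark, where == on columns behaves differently):

// === builds a Column equality expression; plain == / != would compare the Column objects themselves
val matches    = df.filter(df("a") === df("b"))
val mismatches = df.filter(df("a") !== df("b"))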
Why can't I allocate more than 4 executors with 2 machines on YARN?
Hi, I am running a simple Spark Streaming application on a Hadoop 2.7.0/YARN (master: yarn-client) cluster with 2 different machines (12GB RAM with 8 CPU cores each). I am launching my application like this:

~/myapp$ ~/my-spark/bin/spark-submit --class App --master yarn-client --driver-memory 4g --executor-memory 2g --executor-cores 1 --num-executors 6 target/scala-2.10/my-app_2.10-0.1-SNAPSHOT.jar 1 mymachine3 1000 8 10 4 stdev 3

Even though I requested 6 executors for my application, it seems that I am unable to get more than 4 executors (2 per machine). If I request any number of executors below 5 it works fine, but otherwise it seems that it is not able to allocate more than 4. Why does this happen? Thanks.
Re: Help optimising Spark SQL query
Generally (not only spark sql specific) you should not cast in the where part of a sql query. It is also not necessary in your case. Getting rid of casts in the whole query will be also beneficial. Le lun. 22 juin 2015 à 17:29, James Aley james.a...@swiftkey.com a écrit : Hello, A colleague of mine ran the following Spark SQL query: select count(*) as uses, count (distinct cast(id as string)) as users from usage_events where from_unixtime(cast(timestamp_millis/1000 as bigint)) between '2015-06-09' and '2015-06-16' The table contains billions of rows, but totals only 64GB of data across ~30 separate files, which are stored as Parquet with LZO compression in S3. From the referenced columns: * id is Binary, which we cast to a String so that we can DISTINCT by it. (I was already told this will improve in a later release, in a separate thread.) * timestamp_millis is a long, containing a unix timestamp with millisecond resolution This took nearly 2 hours to run on a 5 node cluster of r3.xlarge EC2 instances, using 20 executors, each with 4GB memory. I can see from monitoring tools that the CPU usage is at 100% on all nodes, but incoming network seems a bit low at 2.5MB/s, suggesting to me that this is CPU-bound. Does that seem slow? Can anyone offer any ideas by glancing at the query as to why this might be slow? We'll profile it meanwhile and post back if we find anything ourselves. A side issue - I've found that this query, and others, sometimes completes but doesn't return any results. There appears to be no error that I can see in the logs, and Spark reports the job as successful, but the connected JDBC client (SQLWorkbenchJ in this case), just sits there forever waiting. I did a quick Google and couldn't find anyone else having similar issues. Many thanks, James.
Re: Multiple executors writing file using java filewriter
Is spoutLog just a non-spark file writer? If you run that in the map call on a cluster its going to be writing in the filesystem of the executor its being run on. I'm not sure if that's what you intended. On Mon, Jun 22, 2015 at 1:35 PM, anshu shukla anshushuk...@gmail.com wrote: Running perfectly in local system but not writing to file in cluster mode .ANY suggestions please .. //msgid is long counter JavaDStreamString newinputStream=inputStream.map(new FunctionString, String() { @Override public String call(String v1) throws Exception { String s1=msgId+@+v1; System.out.println(s1); msgId++; try { *//filewriter logic spoutlog.batchLogwriter(System.currentTimeMillis(), spout-MSGID, + msgeditor.getMessageId(s1));* } catch (Exception e) { System.out.println(exeception is here); e.printStackTrace(); throw e; } System.out.println(msgid,+msgId); return msgeditor.addMessageId(v1,msgId); } }); -- Thanks Regards, Anshu Shukla On Mon, Jun 22, 2015 at 10:50 PM, anshu shukla anshushuk...@gmail.com wrote: Can not we write some data to a txt file in parallel with multiple executors running in parallel ?? -- Thanks Regards, Anshu Shukla -- Thanks Regards, Anshu Shukla
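If the intent is that every executor writes its share of each batch in parallel, the usual route is a shared filesystem plus saveAsTextFile rather than a local FileWriter; a minimal Scala sketch of that idea, where the stream variable mirrors the one in the snippet above and the HDFS path is an assumption:

newinputStream.foreachRDD { (rdd, time) =>
  if (!rdd.isEmpty())
    // each partition is written by its own executor as a separate part-file
    rdd.saveAsTextFile("hdfs:///streaming/msgids-" + time.milliseconds)
}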
Re: Multiple executors writing file using java filewriter
Thanx for reply !! YES , Either it should write on any machine of cluster or Can you please help me ... that how to do this . Previously i was using writing using collect () , so some of my tuples are missing while writing. //previous logic that was just creating the file on master - newinputStream.foreachRDD(new Function2JavaRDDString, Time, Void() { @Override public Void call(JavaRDDString v1, Time v2) throws Exception { for(String s:v1.collect()) { //System.out.println(v1 here is + v1 + --- + s); spoutlog.batchLogwriter(System.currentTimeMillis(), spout-MSGID, + msgeditor.getMessageId(s)); //System.out.println(msgeditor.getMessageId(s)); } return null; } }); On Mon, Jun 22, 2015 at 11:31 PM, Richard Marscher rmarsc...@localytics.com wrote: Is spoutLog just a non-spark file writer? If you run that in the map call on a cluster its going to be writing in the filesystem of the executor its being run on. I'm not sure if that's what you intended. On Mon, Jun 22, 2015 at 1:35 PM, anshu shukla anshushuk...@gmail.com wrote: Running perfectly in local system but not writing to file in cluster mode .ANY suggestions please .. //msgid is long counter JavaDStreamString newinputStream=inputStream.map(new FunctionString, String() { @Override public String call(String v1) throws Exception { String s1=msgId+@+v1; System.out.println(s1); msgId++; try { *//filewriter logic spoutlog.batchLogwriter(System.currentTimeMillis(), spout-MSGID, + msgeditor.getMessageId(s1));* } catch (Exception e) { System.out.println(exeception is here); e.printStackTrace(); throw e; } System.out.println(msgid,+msgId); return msgeditor.addMessageId(v1,msgId); } }); -- Thanks Regards, Anshu Shukla On Mon, Jun 22, 2015 at 10:50 PM, anshu shukla anshushuk...@gmail.com wrote: Can not we write some data to a txt file in parallel with multiple executors running in parallel ?? -- Thanks Regards, Anshu Shukla -- Thanks Regards, Anshu Shukla -- Thanks Regards, Anshu Shukla
SQL vs. DataFrame API
Can anyone explain why the dataframe API doesn't work as I expect it to here? It seems like the column identifiers are getting confused. https://gist.github.com/dokipen/4b324a7365ae87b7b0e5
Re: SQL vs. DataFrame API
That's invalid syntax. I'm pretty sure pyspark is using a DSL to create a query here and not actually doing an equality operation. On Mon, Jun 22, 2015 at 3:43 PM Ignacio Blasco elnopin...@gmail.com wrote: Probably you should use === instead of == and !== instead of != Can anyone explain why the dataframe API doesn't work as I expect it to here? It seems like the column identifiers are getting confused. https://gist.github.com/dokipen/4b324a7365ae87b7b0e5
Re: Storing an action result in HDFS
Hi Ravi, For this case, you could simply do sc.parallelize([rdd.first()]).saveAsTextFile(“hdfs:///my_file”) using pyspark or sc.parallelize(Array(rdd.first())).saveAsTextFile(“hdfs:///my_file”) using Scala Chris On Jun 22, 2015, at 5:53 PM, ddpis...@gmail.com wrote: Hi Chris, Thanks for the quick reply and the welcome. I am trying to read a file from hdfs and then writing back just the first line to hdfs. I calling first() on the RDD to get the first line. Sent from my iPhone On Jun 22, 2015, at 7:42 PM, Chris Gore cdg...@cdgore.com wrote: Hi Ravi, Welcome, you probably want RDD.saveAsTextFile(“hdfs:///my_file”) Chris On Jun 22, 2015, at 5:28 PM, ravi tella ddpis...@gmail.com wrote: Hello All, I am new to Spark. I have a very basic question.How do I write the output of an action on a RDD to HDFS? Thanks in advance for the help. Cheers, Ravi - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Serializer not switching
My hunch is that you changed spark.serializer to Kryo but left spark.closureSerializer unmodified, so it's still using Java for closure serialization. Kryo doesn't really work as a closure serializer but there's an open pull request to fix this: https://github.com/apache/spark/pull/6361 On Mon, Jun 22, 2015 at 5:42 AM, Sean Barzilay sesnbarzi...@gmail.com wrote: My program is written in Scala. I am creating a jar and submitting it using spark-submit. My code is on a computer in an internal network withe no internet so I can't send it. On Mon, Jun 22, 2015, 3:19 PM Akhil Das ak...@sigmoidanalytics.com wrote: How are you submitting the application? Could you paste the code that you are running? Thanks Best Regards On Mon, Jun 22, 2015 at 5:37 PM, Sean Barzilay sesnbarzi...@gmail.com wrote: I am trying to run a function on every line of a parquet file. The function is in an object. When I run the program, I get an exception that the object is not serializable. I read around the internet and found that I should use Kryo Serializer. I changed the setting in the spark conf and registered the object to the Kryo Serializer. When I run the program I still get the same exception (from the stack trace: at org.apache.spark.serializer.JavaSerializationStream.write object(JavaSerializer.scala:47)). For some reason, the program is still trying to serialize using the default java Serializer. I am working with spark 1.4.
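A minimal sketch of what switching the data serializer (but not the closure serializer) looks like; the Record class is a stand-in for whatever type was registered:

import org.apache.spark.SparkConf

case class Record(id: Long, text: String)   // placeholder for the type that needs Kryo

val conf = new SparkConf()
  .setAppName("kryo-example")
  // affects serialization of shuffled/cached data only
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[Record]))
// Closures are still handled by spark.closureSerializer (Java serialization), so the object
// enclosing a function used in a transformation must itself be serializable, or the function
// should be moved so that it no longer captures that object.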
Re: Confusion matrix for binary classification
Hi Burak, Thanks for the response. I am using Spark version 1.3.0 through Java API. Regards, CD On Tue, Jun 23, 2015 at 5:11 AM, Burak Yavuz brk...@gmail.com wrote: Hi, In Spark 1.4, you may use DataFrame.stat.crosstab to generate the confusion matrix. This would be very simple if you are using the ML Pipelines Api, and are working with DataFrames. Best, Burak On Mon, Jun 22, 2015 at 4:21 AM, CD Athuraliya cdathural...@gmail.com wrote: Hi, I am looking for a way to get confusion matrix for binary classification. I was able to get confusion matrix for multiclass classification using this [1]. But I could not find a proper way to get confusion matrix in similar class available for binary classification [2]. Later I found this class [3] which corresponds to my requirement but I am not sure about the way I should use that class to get evaluation metrics for binary classification. e.g. Given the constructor BinaryConfusionMatrixImpl(BinaryLabelCounter count, BinaryLabelCounter totalCount), from where I can get this count and totalCount? Appreciate any help on this. [1] http://spark.apache.org/docs/1.3.0/api/java/org/apache/spark/mllib/evaluation/MulticlassMetrics.html#confusionMatrix() [2] http://spark.apache.org/docs/1.3.0/api/java/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.html [3] http://spark.apache.org/docs/1.3.0/api/java/org/apache/spark/mllib/evaluation/binary/BinaryConfusionMatrixImpl.html Thanks -- *CD Athuraliya* Software Engineer WSO2, Inc. Mobile: +94 716288847 94716288847 LinkedIn http://lk.linkedin.com/in/cdathuraliya | Twitter https://twitter.com/cdathuraliya | Blog http://cdathuraliya.tumblr.com/ -- *CD Athuraliya* Software Engineer WSO2, Inc. Mobile: +94 716288847 94716288847 LinkedIn http://lk.linkedin.com/in/cdathuraliya | Twitter https://twitter.com/cdathuraliya | Blog http://cdathuraliya.tumblr.com/
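For reference, a minimal Scala sketch of the crosstab suggestion, assuming Spark 1.4 and a hypothetical DataFrame predictions with numeric "label" and "prediction" columns:

// rows = distinct prediction values, columns = distinct label values, cells = counts,
// which for 0/1 labels is exactly the binary confusion matrix
val confusion = predictions.stat.crosstab("prediction", "label")
confusion.show()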
Any way to retrieve time of message arrival to Kafka topic, in Spark Streaming?
Is there any way to retrieve the time of each message's arrival into a Kafka topic, when streaming in Spark, whether with receiver-based or direct streaming? Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Any-way-to-retrieve-time-of-message-arrival-to-Kafka-topic-in-Spark-Streaming-tp23442.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: Support for Windowing and Analytics functions in Spark SQL
Yes, it should be with HiveContext, not SQLContext. From: ayan guha [mailto:guha.a...@gmail.com] Sent: Tuesday, June 23, 2015 2:51 AM To: smazumder Cc: user Subject: Re: Support for Windowing and Analytics functions in Spark SQL 1.4 supports it On 23 Jun 2015 02:59, Sourav Mazumder sourav.mazumde...@gmail.com wrote: Hi, Though the documentation does not explicitly mention support for Windowing and Analytics function in Spark SQL, looks like it is not supported. I tried running a query like Select Lead(column name, 1) over (Partition By column name order by column name) from table name and I got error saying that this feature is unsupported. I tried it in Databricks cloud and that supports Spark 1.4. Can anyone please confirm this ? Regards, Sourav
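A minimal Scala sketch of that, assuming Spark 1.4 and a hypothetical table events with columns id and ts already registered:

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
val withLead = hiveContext.sql(
  """SELECT id, ts,
    |       LEAD(ts, 1) OVER (PARTITION BY id ORDER BY ts) AS next_ts
    |FROM events""".stripMargin)
withLead.show()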
Re: Velox Model Server
How large are your models? Spark job server does allow synchronous job execution and with a warm long-lived context it will be quite fast - but still in the order of a second or a few seconds usually (depending on model size - for very large models possibly quite a lot more than that). What are your use cases for SQL during recommendation? Filtering? If your recommendation needs are real-time (1s) I am not sure job server and computing the refs with spark will do the trick (though those new BLAS-based methods may have given sufficient speed up). — Sent from Mailbox On Mon, Jun 22, 2015 at 11:17 PM, Debasish Das debasish.da...@gmail.com wrote: Models that I am looking for are mostly factorization based models (which includes both recommendation and topic modeling use-cases). For recommendation models, I need a combination of Spark SQL and ml model prediction api...I think spark job server is what I am looking for and it has fast http rest backend through spray which will scale fine through akka. Out of curiosity why netty? What model are you serving? Velox doesn't look like it is optimized for cases like ALS recs, if that's what you mean. I think scoring ALS at scale in real time takes a fairly different approach. The servlet engine probably doesn't matter at all in comparison. On Sat, Jun 20, 2015, 9:40 PM Debasish Das debasish.da...@gmail.com wrote: After getting used to Scala, writing Java is too much work :-) I am looking for scala based project that's using netty at its core (spray is one example). prediction.io is an option but that also looks quite complicated and not using all the ML features that got added in 1.3/1.4 Velox built on top of ML / Keystone ML pipeline API and that's useful but it is still using javax servlets which is not netty based. On Sat, Jun 20, 2015 at 10:25 AM, Sandy Ryza sandy.r...@cloudera.com wrote: Oops, that link was for Oryx 1. Here's the repo for Oryx 2: https://github.com/OryxProject/oryx On Sat, Jun 20, 2015 at 10:20 AM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Debasish, The Oryx project (https://github.com/cloudera/oryx), which is Apache 2 licensed, contains a model server that can serve models built with MLlib. -Sandy On Sat, Jun 20, 2015 at 8:00 AM, Charles Earl charles.ce...@gmail.com wrote: Is velox NOT open source? On Saturday, June 20, 2015, Debasish Das debasish.da...@gmail.com wrote: Hi, The demo of end-to-end ML pipeline including the model server component at Spark Summit was really cool. I was wondering if the Model Server component is based upon Velox or it uses a completely different architecture. https://github.com/amplab/velox-modelserver We are looking for an open source version of model server to build upon. Thanks. Deb -- - Charles
RE: Question about SPARK_WORKER_CORES and spark.task.cpus
It's actually not that tricky. SPARK_WORKER_CORES is the max size of the executor's task thread pool, which is the same as saying "one executor with 32 cores could execute 32 tasks simultaneously". Spark doesn't care how many real physical CPUs/cores you have (the OS does), so the user needs to give a value that reflects the real physical machine settings, otherwise thread context switching will probably be an overhead for CPU-intensive tasks.

"spark.task.cpus": I copied how it's used from the Spark source code:

// TODO: The default value of 1 for spark.executor.cores works right now because dynamic
// allocation is only supported for YARN and the default number of cores per executor in YARN is
// 1, but it might need to be attained differently for different cluster managers
private val tasksPerExecutor =
  conf.getInt("spark.executor.cores", 1) / conf.getInt("spark.task.cpus", 1)

It means the "number of tasks per executor" (the number of tasks that can run in parallel per executor) = SPARK_WORKER_CORES / "spark.task.cpus". For example, with SPARK_WORKER_CORES = 32 and spark.task.cpus = 2, at most 32 / 2 = 16 tasks run concurrently per executor. "spark.task.cpus" gives the user an opportunity to reserve resources for a task which probably creates more running threads internally (for example, running a multithreaded external app within each task). Hope it is helpful.

From: Rui Li [mailto:spark.ru...@gmail.com] Sent: Tuesday, June 23, 2015 8:56 AM To: user@spark.apache.org Subject: Question about SPARK_WORKER_CORES and spark.task.cpus Hi, I was running a WordCount application on Spark, and the machine I used has 4 physical cores. However, in spark-env.sh file, I set SPARK_WORKER_CORES = 32. The web UI says it launched one executor with 32 cores and the executor could execute 32 tasks simultaneously. Does spark create 32 vCores out of 4 physical cores? How much physical CPU resource can each task get then? Also, I found a parameter "spark.task.cpus", but I don't quite understand this parameter. If I set it to 2, does Spark allocate 2 CPU cores for one task? I think "task" is a "thread" within executor ("process"), so how can a thread utilize two CPU cores simultaneously? I am looking forward to your reply, thanks! Best, Rui
Re: Serializer not switching
How are you submitting the application? Could you paste the code that you are running? Thanks Best Regards On Mon, Jun 22, 2015 at 5:37 PM, Sean Barzilay sesnbarzi...@gmail.com wrote: I am trying to run a function on every line of a parquet file. The function is in an object. When I run the program, I get an exception that the object is not serializable. I read around the internet and found that I should use Kryo Serializer. I changed the setting in the spark conf and registered the object to the Kryo Serializer. When I run the program I still get the same exception (from the stack trace: at org.apache.spark.serializer.JavaSerializationStream.write object(JavaSerializer.scala:47)). For some reason, the program is still trying to serialize using the default java Serializer. I am working with spark 1.4.
Re: How to get and parse whole xml file in HDFS by Spark Streaming
Like this? val rawXmls = ssc.fileStream(path, classOf[XmlInputFormat], classOf[LongWritable], classOf[Text]) Thanks Best Regards On Mon, Jun 22, 2015 at 5:45 PM, Yong Feng fengyong...@gmail.com wrote: Thanks a lot, Akhil I saw this mail thread before, but still do not understand how to use XmlInputFormatof mahout in Spark Streaming (I am not Spark Streaming Expert yet ;-)). Can you show me some sample code for explanation. Thanks in advance, Yong On Mon, Jun 22, 2015 at 6:44 AM, Akhil Das ak...@sigmoidanalytics.com wrote: You can use fileStream for that, look at the XMLInputFormat https://github.com/apache/mahout/blob/ad84344e4055b1e6adff5779339a33fa29e1265d/examples/src/main/java/org/apache/mahout/classifier/bayes/XmlInputFormat.java of mahout. It should give you full XML object as on record, (as opposed to an XML record spread across multiple line records in textFileStream). Also this thread http://apache-spark-user-list.1001560.n3.nabble.com/Parsing-a-large-XML-file-using-Spark-td19239.html has some discussion around it. Thanks Best Regards On Mon, Jun 22, 2015 at 12:23 AM, Yong Feng fengyong...@gmail.com wrote: Hi Spark Experts I have a customer who wants to monitor coming data files (with xml format), and then analysize them after that put analysized data into DB. The size of each file is about 30MB (or even less in future). Spark streaming seems promising. After learning Spark Streaming and also google-ing how Spark Streaming handle xml files, I found there seems no existing Spark Stream utility to recognize whole xml file and parse it. The fileStream seems line-oriented. There is suggestion of putting whole xml file into one line, however it requires pre-processing files which will bring unexpected I/O. Can anyone throw some light on it? If will be great if there are some sample codes for me to start with. Thanks Yong
Spark and HDFS ( Worker and Data Nodes Combination )
Hi All, What is the best way to install a Spark cluster alongside a Hadoop cluster? Any recommendation for the deployment topologies below will be a great help. *Also, is it necessary to put the Spark Workers on the DataNodes so that the blocks they read from HDFS are local to the Server/Worker, or can I put the Workers on other nodes, and if I do that will it affect the performance of the Spark data processing?*

Hadoop Option 1
Server 1 - NameNode + Spark Master
Server 2 - DataNode 1 + Spark Worker
Server 3 - DataNode 2 + Spark Worker
Server 4 - DataNode 3 + Spark Worker

Hadoop Option 2
Server 1 - NameNode
Server 2 - Spark Master
Server 2 - DataNode 1
Server 3 - DataNode 2
Server 4 - DataNode 3
Server 5 - Spark Worker 1
Server 6 - Spark Worker 2
Server 7 - Spark Worker 3

Thanks.
Re: Serializer not switching
My program is written in Scala. I am creating a jar and submitting it using spark-submit. My code is on a computer in an internal network withe no internet so I can't send it. On Mon, Jun 22, 2015, 3:19 PM Akhil Das ak...@sigmoidanalytics.com wrote: How are you submitting the application? Could you paste the code that you are running? Thanks Best Regards On Mon, Jun 22, 2015 at 5:37 PM, Sean Barzilay sesnbarzi...@gmail.com wrote: I am trying to run a function on every line of a parquet file. The function is in an object. When I run the program, I get an exception that the object is not serializable. I read around the internet and found that I should use Kryo Serializer. I changed the setting in the spark conf and registered the object to the Kryo Serializer. When I run the program I still get the same exception (from the stack trace: at org.apache.spark.serializer.JavaSerializationStream.write object(JavaSerializer.scala:47)). For some reason, the program is still trying to serialize using the default java Serializer. I am working with spark 1.4.
Re: How to get and parse whole xml file in HDFS by Spark Streaming
Thanks a lot, Akhil I saw this mail thread before, but still do not understand how to use XmlInputFormatof mahout in Spark Streaming (I am not Spark Streaming Expert yet ;-)). Can you show me some sample code for explanation. Thanks in advance, Yong On Mon, Jun 22, 2015 at 6:44 AM, Akhil Das ak...@sigmoidanalytics.com wrote: You can use fileStream for that, look at the XMLInputFormat https://github.com/apache/mahout/blob/ad84344e4055b1e6adff5779339a33fa29e1265d/examples/src/main/java/org/apache/mahout/classifier/bayes/XmlInputFormat.java of mahout. It should give you full XML object as on record, (as opposed to an XML record spread across multiple line records in textFileStream). Also this thread http://apache-spark-user-list.1001560.n3.nabble.com/Parsing-a-large-XML-file-using-Spark-td19239.html has some discussion around it. Thanks Best Regards On Mon, Jun 22, 2015 at 12:23 AM, Yong Feng fengyong...@gmail.com wrote: Hi Spark Experts I have a customer who wants to monitor coming data files (with xml format), and then analysize them after that put analysized data into DB. The size of each file is about 30MB (or even less in future). Spark streaming seems promising. After learning Spark Streaming and also google-ing how Spark Streaming handle xml files, I found there seems no existing Spark Stream utility to recognize whole xml file and parse it. The fileStream seems line-oriented. There is suggestion of putting whole xml file into one line, however it requires pre-processing files which will bring unexpected I/O. Can anyone throw some light on it? If will be great if there are some sample codes for me to start with. Thanks Yong
Re: Spark and HDFS ( Worker and Data Nodes Combination )
Option 1 should be fine, Option 2 would bound a lot on network as the data increase in time. Thanks Best Regards On Mon, Jun 22, 2015 at 5:59 PM, Ashish Soni asoni.le...@gmail.com wrote: Hi All , What is the Best Way to install and Spark Cluster along side with Hadoop Cluster , Any recommendation for below deployment topology will be a great help *Also Is it necessary to put the Spark Worker on DataNodes as when it read block from HDFS it will be local to the Server / Worker or I can put the Worker on any other nodes and if i do that will it affect the performance of the Spark Data Processing ..* Hadoop Option 1 Server 1 - NameNodeSpark Master Server 2 - DataNode 1 Spark Worker Server 3 - DataNode 2 Spark Worker Server 4 - DataNode 3 Spark Worker Hadoop Option 2 Server 1 - NameNode Server 2 - Spark Master Server 2 - DataNode 1 Server 3 - DataNode 2 Server 4 - DataNode 3 Server 5 - Spark Worker 1 Server 6 - Spark Worker 2 Server 7 - Spark Worker 3 Thanks.
Re: How to get and parse whole xml file in HDFS by Spark Streaming
Thanks Akhil I will have a try and then go back to you Yong On Mon, Jun 22, 2015 at 8:25 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Like this? val rawXmls = ssc.fileStream(path, classOf[XmlInputFormat], classOf[LongWritable], classOf[Text]) Thanks Best Regards On Mon, Jun 22, 2015 at 5:45 PM, Yong Feng fengyong...@gmail.com wrote: Thanks a lot, Akhil I saw this mail thread before, but still do not understand how to use XmlInputFormatof mahout in Spark Streaming (I am not Spark Streaming Expert yet ;-)). Can you show me some sample code for explanation. Thanks in advance, Yong On Mon, Jun 22, 2015 at 6:44 AM, Akhil Das ak...@sigmoidanalytics.com wrote: You can use fileStream for that, look at the XMLInputFormat https://github.com/apache/mahout/blob/ad84344e4055b1e6adff5779339a33fa29e1265d/examples/src/main/java/org/apache/mahout/classifier/bayes/XmlInputFormat.java of mahout. It should give you full XML object as on record, (as opposed to an XML record spread across multiple line records in textFileStream). Also this thread http://apache-spark-user-list.1001560.n3.nabble.com/Parsing-a-large-XML-file-using-Spark-td19239.html has some discussion around it. Thanks Best Regards On Mon, Jun 22, 2015 at 12:23 AM, Yong Feng fengyong...@gmail.com wrote: Hi Spark Experts I have a customer who wants to monitor coming data files (with xml format), and then analysize them after that put analysized data into DB. The size of each file is about 30MB (or even less in future). Spark streaming seems promising. After learning Spark Streaming and also google-ing how Spark Streaming handle xml files, I found there seems no existing Spark Stream utility to recognize whole xml file and parse it. The fileStream seems line-oriented. There is suggestion of putting whole xml file into one line, however it requires pre-processing files which will bring unexpected I/O. Can anyone throw some light on it? If will be great if there are some sample codes for me to start with. Thanks Yong
Re: Using Accumulators in Streaming
I stumbled upon zipWithUniqueId/zipWithIndex. Is this what you are looking for? https://spark.apache.org/docs/latest/api/java/org/apache/spark/api/java/JavaRDDLike.html#zipWithUniqueId() On 22 June 2015 at 06:16, Michal Čizmazia mici...@gmail.com wrote: If I am not mistaken, one way to look at accumulators is that they are write-only for the workers, and their value can only be read by the driver. Therefore they cannot be used for ID generation as you wish. On 22 June 2015 at 04:30, anshu shukla anshushuk...@gmail.com wrote: But I just want to update the RDD by appending a unique message ID to each element of the RDD, which should be incremented automatically (m++ ...) every time a new element arrives in the RDD. On Mon, Jun 22, 2015 at 7:05 AM, Michal Čizmazia mici...@gmail.com wrote: StreamingContext.sparkContext() On 21 June 2015 at 21:32, Will Briggs wrbri...@gmail.com wrote: It sounds like accumulators are not necessary in Spark Streaming - see this post ( http://apache-spark-user-list.1001560.n3.nabble.com/Shared-variable-in-Spark-Streaming-td11762.html) for more details. On June 21, 2015, at 7:31 PM, anshu shukla anshushuk...@gmail.com wrote: In Spark Streaming, since we already have a StreamingContext, which does not allow us to create accumulators, we have to get the SparkContext to initialize the accumulator value. But having two SparkContexts will not solve the problem. Please help!! -- Thanks Regards, Anshu Shukla -- Thanks Regards, Anshu Shukla
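In case a concrete sketch helps (my own, not from the thread): since accumulators are effectively write-only on the executors, one alternative is zipWithUniqueId inside transform. Note the IDs are only unique within each batch RDD; making them unique across batches would need an extra per-batch offset (e.g. derived from the batch time), which this sketch does not do. The socket source is just a placeholder:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val sparkConf = new SparkConf().setAppName("MessageIds")
    val ssc = new StreamingContext(sparkConf, Seconds(10))

    val lines = ssc.socketTextStream("localhost", 9999)  // hypothetical source

    // transform exposes the RDD behind each batch; zipWithUniqueId assigns every
    // element a unique Long without any shared mutable state on the executors.
    val withIds = lines.transform { rdd =>
      rdd.zipWithUniqueId().map { case (msg, id) => (id, msg) }
    }

    withIds.print()
    ssc.start()
    ssc.awaitTermination()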
Re: understanding on the waiting batches and scheduling delay in Streaming UI
Hi Das, Thanks for your reply. Somehow I missed it. I am using Spark 1.3. The data source is Kafka. Yeah, not sure why the delay is 0. I'll run against 1.4 and send a screenshot. Thanks, Mike From: Akhil Das ak...@sigmoidanalytics.com Date: Thursday, June 18, 2015 at 6:05 PM To: Mike Fang chyfan...@gmail.com Cc: user@spark.apache.org Subject: Re: understanding on the waiting batches and scheduling delay in Streaming UI Which version of Spark, and what is your data source? For some reason your processing delay is exceeding the batch duration, and it is strange that you are not seeing any scheduling delay. Thanks Best Regards On Thu, Jun 18, 2015 at 7:29 AM, Mike Fang chyfan...@gmail.com wrote: Hi, I have a Spark Streaming program that has been running for ~25 hrs. When I check the Streaming UI tab, I see that Waiting batches is 144, but the scheduling delay is 0. I am a bit confused: if Waiting batches is 144, doesn't that mean many batches are waiting in the queue to be processed? If that is the case, the scheduling delay should be high rather than 0. Am I missing anything? Thanks, Mike
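If it helps to cross-check the numbers outside the UI, here is a minimal sketch (not from the thread) of a StreamingListener that logs each batch's scheduling delay and processing time; it assumes an existing StreamingContext ssc and should be registered before ssc.start():

    import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

    // Logs per-batch delays so they can be compared with what the Streaming UI reports.
    class DelayLogger extends StreamingListener {
      override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
        val info = batchCompleted.batchInfo
        println(s"batch ${info.batchTime}: " +
          s"scheduling delay = ${info.schedulingDelay.getOrElse(-1L)} ms, " +
          s"processing time = ${info.processingDelay.getOrElse(-1L)} ms")
      }
    }

    ssc.addStreamingListener(new DelayLogger)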
Re: Registering custom metrics
Hi Gerard, Have there been any responses? Any insights as to what you ended up doing to enable custom metrics? I'm thinking of implementing a custom metrics sink, not sure how doable that is yet... Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Registering-custom-metrics-tp17765p23426.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
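Not an answer from the list, but for anyone landing here: metrics sinks, built-in or custom, are wired up through Spark's metrics config file, so a custom sink is mostly a matter of putting its class on the classpath and referencing it there. A hedged sketch of such a metrics.properties follows; the custom class name is purely a placeholder:

    # Hypothetical metrics.properties; point spark.metrics.conf at this file
    # (e.g. --conf spark.metrics.conf=metrics.properties, shipping the file with --files).
    # Built-in sinks live in org.apache.spark.metrics.sink (CsvSink, JmxSink, GraphiteSink, ...).
    *.sink.csv.class=org.apache.spark.metrics.sink.CsvSink
    *.sink.csv.period=10
    *.sink.csv.unit=seconds
    *.sink.csv.directory=/tmp/spark-metrics

    # A custom sink is declared the same way; com.example.MyMetricsSink is a placeholder class.
    *.sink.mysink.class=com.example.MyMetricsSink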
Re: Custom Metrics Sink
Hi, I was wondering if there've been any responses to this? Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Custom-Metrics-Sink-tp10068p23425.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org