Re: Feature Generation On Spark
Do you have one document per file or multiple documents in the file?

On 4 Jul 2015 23:38, Michal Čizmazia mici...@gmail.com wrote: Spark Context has a method wholeTextFiles. Is that what you need?

On 4 July 2015 at 07:04, rishikesh rishikeshtha...@hotmail.com wrote: [...]
Get Spark version before starting context
Hey all, Is it possible to reliably get the version string of a Spark cluster prior to trying to connect via the SparkContext on the client side? Most of the errors I've seen on mismatched versions have been cryptic, so it would be helpful if I could throw an exception earlier. I know it is contained in the HTML of the master, but an API endpoint would also be helpful. Does this exist? Thanks! -Pat
Re: Get Spark version before starting context
To somewhat answer my own question: it looks like an empty request to the REST API will throw an error which returns the version in JSON as well. Still not ideal, though. Would there be any objection to adding a simple version endpoint to the API?

On Sat, Jul 4, 2015 at 4:00 PM, Patrick Woody patrick.woo...@gmail.com wrote: [...]
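A minimal sketch of that workaround, assuming a standalone master with the REST submission server on its default port (6066): status requests under /v1/submissions return JSON whose serverSparkVersion field carries the cluster version, even on error responses. The endpoint path, port, and field name are assumptions to verify against your deployment, and clusterVersion is just an illustrative helper name.

import java.net.{HttpURLConnection, URL}
import scala.io.Source

// Hedged sketch: probe the standalone master's REST submission server and
// scrape "serverSparkVersion" out of the JSON body. Error responses from
// this server carry the field too, hence the error-stream fallback.
def clusterVersion(masterHost: String, restPort: Int = 6066): Option[String] = {
  val url = new URL(s"http://$masterHost:$restPort/v1/submissions/status/any-id")
  val conn = url.openConnection().asInstanceOf[HttpURLConnection]
  try {
    val stream =
      if (conn.getResponseCode >= 400) conn.getErrorStream else conn.getInputStream
    Option(stream).map(Source.fromInputStream(_).mkString).flatMap { body =>
      """"serverSparkVersion"\s*:\s*"([^"]+)"""".r.findFirstMatchIn(body).map(_.group(1))
    }
  } catch {
    case _: java.io.IOException => None
  } finally {
    conn.disconnect()
  }
}

Called before constructing the SparkContext, something of this shape would allow failing fast with a clear version-mismatch message instead of a cryptic error later.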
Authorisation issue in Spark while using SQL based Authorization
Though I have set hive.security.authorization.enabled=true and hive.security.authorization.manager=org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory, a user X can select from tables belonging to user Y, because for some reason the Spark SQL Thrift server is not doing authorization. This causes a data security issue: user X can select from any table even though permissions have NOT been granted. I checked the same setup against the Hive Thrift server, and there authorization does happen. Thanks for your help in advance; below are the arguments passed while starting the Spark Thrift server. I would appreciate it if anyone can suggest a workaround.

--hiveconf hive.security.authorization.manager=org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory
--hiveconf hive.server2.enable.doAs=false
--hiveconf hive.security.authorization.enabled=true
--hive.server2.thrift.port=10001
--hiveconf hostname.compute.amazonaws.com
--hiveconf hive.security.authenticator.manager=org.apache.hadoop.hive.ql.security.SessionStateUserAuthenticator
calling HiveContext.table or running a query reads files unnecessarily in S3
Hi, I'm just getting started with Spark, so apologies if I'm missing something obvious. In the below, I'm using Spark 1.4.

I've created a partitioned table in S3 (call it 'dataset'), with basic structure like so:

s3://bucket/dataset/pk=a
s3://bucket/dataset/pk=b
s3://bucket/dataset/pk=c

In each partition, I wrote a parquet file with some data. I also created a table in the Hive metastore I'm running, using the command

hiveContext.sql("CREATE EXTERNAL TABLE dataset(k string, v bigint) PARTITIONED BY (pk string) STORED AS PARQUET LOCATION 's3a://bucket/dataset'")

I also added the partitions pk={a, b, c} using ALTER TABLE commands. In a different session, I create a hiveContext and call

dataset = hiveContext.table('dataset')

When I do this, I see logs indicating that all parquet files were opened. Why is this? E.g.,

15/07/04 21:52:54 INFO S3AFileSystem: Actually opening file dataset/pk=a/part-r-3.gz.parquet at pos 497
15/07/04 21:52:54 INFO S3AFileSystem: Reopening dataset/pk=a/part-r-2.gz.parquet to seek to new offset 483
15/07/04 21:52:54 INFO S3AFileSystem: Actually opening file dataset/pk=b/part-r-2.gz.parquet at pos 483
15/07/04 21:52:54 INFO S3AFileSystem: Reopening dataset/pk=b/part-r-4.gz.parquet to seek to new offset 483

...and so on. This isn't too much trouble when I only have 3 partitions, but my real dataset will have thousands (think partitioned by date for 20 years). It then becomes super slow just to get a handle to the table. I would have thought that the metastore would have sufficient schema data to create the DataFrame; it's supposed to enable fast schema discovery, right? I feel like I must be missing something.

I also tried a second approach, after dropping the table and removing the data from my first attempt. Before writing the partitions, I used the new DataFrameWriter object to add the table to the metastore and register the path in S3 (using an empty DataFrame with the correct schema):

my_data.filter('FALSE').write.partitionBy('pk').saveAsTable('dataset', format='parquet', path='s3a://bucket/dataset')

I then used a DataFrameWriter to write each partition:

my_data.filter(my_data.pk == 'a').write.partitionBy('pk').insertInto('dataset')

and so on. (Obviously in this toy example I could write it all at once, but not in the more general case.) Now, when I start a different session and get a handle to the table, no files are touched! Hooray. But then I run into a different but similar problem: when I run a query, all files are touched even though the system recognizes they can be pruned:

df = dataset.filter(dataset.pk == 'b').toPandas()

15/07/04 21:52:54 INFO S3AFileSystem: List status for path: s3a://bucket/dataset/pk=a
15/07/04 21:52:54 INFO S3AFileSystem: Getting path status for s3a://bucket/dataset/pk=a (dataset/pk=a)
15/07/04 21:52:54 INFO S3AFileSystem: List status for path: s3a://bucket/dataset/pk=b
15/07/04 21:52:54 INFO S3AFileSystem: Getting path status for s3a://bucket/dataset/pk=b (dataset/pk=b)
15/07/04 21:52:54 INFO S3AFileSystem: List status for path: s3a://bucket/dataset/pk=c
15/07/04 21:52:54 INFO S3AFileSystem: Getting path status for s3a://bucket/dataset/pk=c (dataset/pk=c)
15/07/04 21:52:54 INFO DataSourceStrategy: Selected 1 partitions out of 3, pruned -200.0% partitions.

...[so far so good, some other stuff happens here]...

15/07/04 21:52:54 INFO S3AFileSystem: Getting path status for s3a://bucket/dataset/_common_metadata (dataset/_common_metadata)
15/07/04 21:52:54 INFO S3AFileSystem: Reopening dataset/pk=a/part-r-4.gz.parquet to seek to new offset 430
15/07/04 21:52:54 INFO S3AFileSystem: Actually opening file dataset/pk=a/part-r-4.gz.parquet at pos 430
15/07/04 21:52:54 INFO S3AFileSystem: Reopening dataset/pk=b/part-r-1.gz.parquet to seek to new offset 430
15/07/04 21:52:54 INFO S3AFileSystem: Actually opening file dataset/pk=b/part-r-1.gz.parquet at pos 430
15/07/04 21:52:54 INFO S3AFileSystem: Reopening dataset/pk=c/part-r-1.gz.parquet to seek to new offset 430
15/07/04 21:52:54 INFO S3AFileSystem: Actually opening file dataset/pk=c/part-r-1.gz.parquet at pos 430

So even after it recognized that it could prune all but one partition, it went through and opened files in all the rest as well. This ruins the performance of a query over a small number of partitions in a large dataset. Can anyone clarify why this is happening and how I can avoid it? I would like to be able to run queries on tables in the metastore without touching more files than are actually required, based on the partition filter. Thanks, Steve
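Until the pruning behaviour is explained, one hedged workaround is to sidestep the table scan entirely and point the Parquet reader at the single partition directory needed. The sketch below is Scala (the thread uses PySpark, but read.parquet has the same shape there); it re-adds the partition column, which is not stored inside the files, and the table name dataset_pk_b is a placeholder.

import org.apache.spark.sql.functions.lit

// Hedged workaround sketch: read one partition directory directly so that
// S3 listing and file opens stay within that prefix. `hiveContext` is the
// session's HiveContext; the path matches the layout from the question.
val onlyB = hiveContext.read.parquet("s3a://bucket/dataset/pk=b")
  .withColumn("pk", lit("b"))  // partition value is not inside the files

onlyB.registerTempTable("dataset_pk_b")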
Re: All masters are unresponsive issue
Currently the number of retries is hardcoded. You may want to open a JIRA which makes the retry count configurable. Cheers

On Thu, Jul 2, 2015 at 8:35 PM, luohui20...@sina.com wrote: Hi there, I checked the source code and found that org.apache.spark.deploy.client.AppClient has these parameters (line 52):

val REGISTRATION_TIMEOUT = 20.seconds
val REGISTRATION_RETRIES = 3

As I understand it, if I want to increase the retry count, must I modify this value, rebuild the entire Spark project, and then redeploy the Spark cluster with my modified version? Or is there a better way to solve this issue? Thanks.

Thanks & Best regards!
San.Luo

----- Original Message -----
From: luohui20...@sina.com
To: user user@spark.apache.org
Subject: All masters are unresponsive issue
Date: 2 July 2015, 17:31

Hi there: I got a problem: "Application has been killed. Reason: All masters are unresponsive! Giving up." I checked the network I/O and found it is sometimes really high when running my app. Please refer to the attached picture for more info. I also checked http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/connectivity_issues.html and set SPARK_LOCAL_IP in every node's spark-env.sh of my Spark cluster. However, it did not help solve this problem. I am not sure if this parameter is correctly set; my setting is like this:

On node1: export SPARK_LOCAL_IP={node1's IP}
On node2: export SPARK_LOCAL_IP={node2's IP}
...

BTW, I guess that Akka will retry 3 times when communicating between master and slave; is it possible to increase the Akka retries? And other than expanding the network bandwidth, is there another way to solve this problem? Thanks for any ideas.

Thanks & Best regards!
San.Luo
Re: text file stream to HDFS
Please take a look at streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala:

def saveAsHadoopFiles[F <: OutputFormat[K, V]](
    prefix: String,
    suffix: String
  )(implicit fm: ClassTag[F]): Unit = ssc.withScope {

Cheers

On Sat, Jul 4, 2015 at 5:23 PM, ravi tella ddpis...@gmail.com wrote: [...]
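For reference, a minimal sketch of what corrected calls could look like, assuming an existing StreamingContext ssc and the paths from the question: DStream has saveAsTextFiles (plural, taking a prefix and suffix), while saveAsHadoopFiles is only available on key-value streams via PairDStreamFunctions.

// Minimal sketch, assuming an existing StreamingContext `ssc`.
val lines = ssc.textFileStream("hdfs:/user/hadoop/spark/streaming/input/")

// DStream's method is saveAsTextFiles (plural): it writes one directory
// per batch interval, named from the given prefix and suffix.
lines.saveAsTextFiles("hdfs:/user/hadoop/output1", "out")

// saveAsHadoopFiles needs a key-value DStream, so key each line first.
import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapred.TextOutputFormat
lines.map(line => (NullWritable.get(), new Text(line)))
  .saveAsHadoopFiles[TextOutputFormat[NullWritable, Text]](
    "hdfs:/user/hadoop/output1", "out")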
Splitting dataframe using Spark 1.4 for nested json input
Hello, I am having issues with splitting the contents of a dataframe column using Spark 1.4. The dataframe was created by reading a nested complex JSON file. I used df.explode but keep getting an error message.

scala> val df = sqlContext.read.json("/Users/xx/target/statsfile.json")

scala> df.show()
+--------------------+--------------------+
|                  mi|                neid|
+--------------------+--------------------+
|[900,[pmEs,pmS...   |[SubNetwork=ONRM_...|
|[900,[pmIcmpInEr... |[SubNetwork=ONRM_...|
|[900,pmUnsuccessf...|[SubNetwork=ONRM_...|
+--------------------+--------------------+

scala> df.printSchema()
root
 |-- mi: struct (nullable = true)
 |    |-- gp: long (nullable = true)
 |    |-- mt: string (nullable = true)
 |    |-- mts: string (nullable = true)
 |    |-- mv: string (nullable = true)
 |-- neid: struct (nullable = true)
 |    |-- nedn: string (nullable = true)
 |    |-- nesw: string (nullable = true)
 |    |-- neun: string (nullable = true)

scala> val df1 = df.select("mi.mv")
df1: org.apache.spark.sql.DataFrame = [mv: string]

scala> val df1 = df.select("mi.mv").show()
+--------------------+
|                  mv|
+--------------------+
|[{"r":[0,0,0],"mo...|
|{"r":[0,4,0,4],"m...|
|{"r":5,"moid":"Ma...|
+--------------------+

scala> df1.explode("mv","mvnew")(mv => mv.split(","))
<console>:28: error: value split is not a member of Nothing
       df1.explode("mv","mvnew")(mv => mv.split(","))

The JSON file format looks like:

[ { "neid":{ }, "mi":{ "mts":"20100609071500Z", "gp":900, "tMOID":"Aal2Ap", "mt":[ ], "mv":[ { "moid":"ManagedElement=1,TransportNetwork=1,Aal2Sp=1,Aal2Ap=r1552q", "r": [ .] }, { "moid":"ManagedElement=1,TransportNetwork=1,Aal2Sp=1,Aal2Ap=r1556q", "r": [ .] } ] } } ]

Am I doing something wrong? I need to extract the data under mi.mv into separate columns so I can apply some transformations. Regards Mike
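The "value split is not a member of Nothing" error indicates the compiler could not infer the lambda's input type; a minimal sketch of a fix under that assumption is to annotate the parameter explicitly. Whether a plain comma split is the right way to parse the nested JSON inside mv is a separate question; a UDF or re-reading the field with an explicit schema may serve better.

// Hedged sketch: annotate the lambda's parameter type explicitly so the
// compiler does not infer Nothing for explode's input column type.
val mvCol = df.select(df("mi.mv").as("mv"))
val exploded = mvCol.explode("mv", "mvnew") { mv: String => mv.split(",").toSeq }
exploded.select("mvnew").show()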
Restarting Spark Streaming Application with new code
Hi, Just looking for some clarity on the 1.4 documentation below.

"And restarting from earlier checkpoint information of pre-upgrade code cannot be done. The checkpoint information essentially contains serialized Scala/Java/Python objects and trying to deserialize objects with new, modified classes may lead to errors."

Does this mean new code cannot be deployed over the same checkpoints even if there are no serialization-related changes? (In other words, if the new code does not break the previous checkpointed code w.r.t. serialization, would new deploys work?)

"In this case, either start the upgraded app with a different checkpoint directory, or delete the previous checkpoint directory."

Assuming this applies to metadata checkpointing, does it mean that effectively all the computed 'state' is gone? If I am reading from Kafka, does the new code start receiving messages from where it left off? Thanks Vinoth
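On the second quoted option, a minimal sketch of the "different checkpoint directory" route (names and paths below are placeholders): StreamingContext.getOrCreate builds a fresh context when it finds no checkpoint in the directory, which is what lets the upgraded code start cleanly, at the cost of discarding previous state.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Minimal sketch (placeholder names/paths): point the upgraded app at a
// fresh checkpoint directory; getOrCreate finds no checkpoint there and
// builds a new context from createContext(), with no carried-over state.
val newCheckpointDir = "hdfs:///checkpoints/myapp-v2"  // hypothetical path

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("myapp-v2")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint(newCheckpointDir)
  // ... define the upgraded streaming computation here ...
  ssc
}

val ssc = StreamingContext.getOrCreate(newCheckpointDir, createContext _)
ssc.start()
ssc.awaitTermination()

As for the Kafka question: with a fresh checkpoint directory the offsets stored in the old checkpoints are not consulted, so broadly speaking the receiver-based approach resumes from whatever its consumer tracks externally (e.g. ZooKeeper), while the direct approach restarts according to auto.offset.reset unless offsets are persisted elsewhere; worth verifying for your setup.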
RE: Feature Generation On Spark
Hi, Thanks, I guess this will solve my problem. I will load multiple files using wildcards, like *.csv. I guess if I use wholeTextFiles instead of textFile, I will get the whole file contents as the value, which will in turn ensure one feature vector per file.

Thanks
Nitin

Date: Sat, 4 Jul 2015 09:37:52 -0400 Subject: Re: Feature Generation On Spark From: mici...@gmail.com To: rishikeshtha...@hotmail.com CC: user@spark.apache.org

Spark Context has a method wholeTextFiles. Is that what you need?

On 4 July 2015 at 07:04, rishikesh rishikeshtha...@hotmail.com wrote: [...]
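A minimal sketch of that approach, assuming one document per file and a fixed vocabulary (the tokens and path below are placeholders):

import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.Vectors

// Minimal sketch, assuming one document per file and a static token list.
// wholeTextFiles yields (path, fullContents) pairs, so each file becomes
// exactly one feature vector regardless of how the input is partitioned.
def featurize(sc: SparkContext) = {
  val vocab = Seq("spark", "hadoop", "model").zipWithIndex.toMap  // placeholder tokens

  sc.wholeTextFiles("hdfs:///data/docs/*")  // placeholder path; wildcards work here
    .mapValues { text =>
      val counts = new Array[Double](vocab.size)
      text.toLowerCase.split("\\W+").foreach { tok =>
        vocab.get(tok).foreach(i => counts(i) += 1.0)
      }
      Vectors.dense(counts)
    }
}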
RE: Feature Generation On Spark
I have one document per file, and each file is to be converted to a feature vector, pretty much like standard feature construction for document classification.

Thanks
Rishi

Date: Sun, 5 Jul 2015 01:44:04 +1000 Subject: Re: Feature Generation On Spark From: guha.a...@gmail.com To: mici...@gmail.com CC: rishikeshtha...@hotmail.com; user@spark.apache.org

Do you have one document per file or multiple documents in the file?

On 4 Jul 2015 23:38, Michal Čizmazia mici...@gmail.com wrote: [...]
JDBC Streams
Hi All, I have a requirement to connect to a DB every few minutes and bring data to HBase. Can anyone suggest whether Spark Streaming would be appropriate for this scenario, or should I look into jobserver? Thanks in advance -- Best Regards, Ayan Guha
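For the polling pattern described, a hedged sketch of one option that needs no streaming context at all: run a plain Spark job on a schedule and pull the table through JdbcRDD. The URL, credentials, query, and bounds below are placeholders, and pollOnce is an illustrative helper.

import java.sql.{DriverManager, ResultSet}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.JdbcRDD

// Hedged sketch: one polling pass with JdbcRDD. The query must contain two
// '?' markers that JdbcRDD fills with per-partition bounds on a numeric column.
def pollOnce(sc: SparkContext): Unit = {
  val rows = new JdbcRDD(
    sc,
    () => DriverManager.getConnection("jdbc:mysql://dbhost/mydb", "user", "pass"),
    "SELECT id, payload FROM events WHERE id >= ? AND id <= ?",
    1L,        // lower bound (placeholder)
    1000000L,  // upper bound (placeholder)
    4,         // number of partitions
    (rs: ResultSet) => (rs.getLong(1), rs.getString(2)))

  // ... write `rows` out to HBase here (e.g. via saveAsNewAPIHadoopDataset) ...
  println(s"fetched ${rows.count()} rows")
}

Driven by cron or a simple loop, this avoids keeping a streaming app alive; Spark Streaming mainly pays off for continuous ingestion or windowing, so for a fetch every few minutes a scheduled batch job (via jobserver or otherwise) is arguably a reasonable fit.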
Re: mvn build hangs on: Dependency-reduced POM written at bagel/dependency-reduced-pom.xml
See this thread: http://search-hadoop.com/m/q3RTt4CqUGAvnPj2/Spark+master+build&subj=Re+Can+not+build+master

On Jul 4, 2015, at 9:44 PM, Alec Taylor alec.tayl...@gmail.com wrote: [...]
Re: mvn build hangs on: Dependency-reduced POM written at bagel/dependency-reduced-pom.xml
Thanks, will just build from spark-1.4.0.tgz in the meantime.

On Sun, Jul 5, 2015 at 2:52 PM, Ted Yu yuzhih...@gmail.com wrote: [...]
Spark got stuck with BlockManager after computing connected components using GraphX
I'm computing connected components using Spark GraphX on AWS EC2. I believe the computation was successful, as I saw the type information of the final result. However, it looks like Spark was doing some cleanup. The BlockManager removed a bunch of blocks and got stuck at:

15/07/04 21:53:06 INFO storage.BlockManager: Removing block rdd_334_4
15/07/04 21:53:06 INFO storage.MemoryStore: Block rdd_334_4 of size 25986936 dropped from memory (free 15648106262)

There was no error message and no update for about an hour. If I press the Enter key, I get disconnected from the cluster. Does anyone happen to know what's going on here? I used 8 r3.4xlarge instances. I have 7 million edges and 200 million vertices. Thank you!
text file stream to HDFS
Hello, How should I write a text file stream DStream to HDFS? I tried the following:

val lines = ssc.textFileStream("hdfs:/user/hadoop/spark/streaming/input/")
lines.saveAsTextFile("hdfs:/user/hadoop/output1")

val lines = ssc.textFileStream("hdfs:/user/hadoop/spark/streaming/input/")
lines.saveAsHadoopFiles("hdfs:/user/hadoop/output1", "out")

In both cases I received compile errors saying: ... is not a member of org.apache.spark.streaming.dstream.DStream[String]. Thanks in advance for help.
mvn build hangs on: Dependency-reduced POM written at bagel/dependency-reduced-pom.xml
Running: `build/mvn -DskipTests clean package` on Ubuntu 15.04 (amd64, 3.19.0-21-generic) with Apache Maven 3.3.3 starts to build fine, then just keeps outputting these lines: [INFO] Dependency-reduced POM written at: /spark/bagel/dependency-reduced-pom.xml I've kept it running for an hour. How do I build Spark? Thanks for all suggestions
Re: Are Spark Streaming RDDs always processed in order?
I had a similar inquiry, copied below. I was also looking into making an SQS receiver reliable: http://stackoverflow.com/questions/30809975/reliable-sqs-receiver-for-spark-streaming Hope this helps.

-- Forwarded message --
From: Tathagata Das t...@databricks.com
Date: 20 June 2015 at 17:21
Subject: Re: Serial batching with Spark Streaming
To: Michal Čizmazia mici...@gmail.com
Cc: Binh Nguyen Van binhn...@gmail.com, user user@spark.apache.org

No it does not. By default, only after all the retries etc. related to batch X are done will batch X+1 be started. Yes, one RDD per batch per DStream. However, the RDD could be a union of multiple RDDs (e.g. RDDs generated by a windowed DStream, or a unioned DStream). TD

On Fri, Jun 19, 2015 at 3:16 PM, Michal Čizmazia mici...@gmail.com wrote: Thanks Tathagata! I will use foreachRDD/foreachPartition() instead of transform() then. Does the default scheduler initiate the execution of batch X+1 after batch X even if tasks for batch X need to be retried due to failures? If not, could you please suggest workarounds and point me to the code? One more thing was not 100% clear to me from the documentation: Is there exactly 1 RDD published per batch interval in a DStream?

On 3 July 2015 at 22:12, khaledh khal...@gmail.com wrote: I'm writing a Spark Streaming application that uses RabbitMQ to consume events. One feature of RabbitMQ that I intend to make use of is bulk ack of messages, i.e. no need to ack one-by-one, but only ack the last event in a batch and that would ack the entire batch. Before I commit to doing so, I'd like to know if Spark Streaming always processes RDDs in the same order they arrive in, i.e. if RDD1 arrives before RDD2, is it true that RDD2 will never be scheduled/processed before RDD1 is finished? This is crucial to the ack logic, since if RDD2 can potentially be processed while RDD1 is still being processed, then if I ack the last event in RDD2 that would also ack all events in RDD1, even though they may have not been completely processed yet.
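Given TD's answer that batch X+1 starts only after batch X (including retries) completes, acking the highest delivery tag at the end of each batch is consistent with in-order processing. A hedged sketch of the shape this could take; basicAck(tag, multiple = true) is RabbitMQ's real bulk-ack call, but handle() and ackUpTo() below are hypothetical stand-ins for the application's per-event work and channel plumbing, which depend on where the custom receiver keeps its channel.

import org.apache.spark.streaming.dstream.DStream

// Hedged sketch: events carry (deliveryTag, payload); process the whole
// batch first, then bulk-ack up to the highest tag seen. handle() and
// ackUpTo() are hypothetical; ackUpTo would call channel.basicAck(tag, true).
def handle(payload: String): Unit = ()   // placeholder for real per-event work
def ackUpTo(tag: Long): Unit = ()        // placeholder for the RabbitMQ ack

def process(events: DStream[(Long, String)]): Unit = {
  events.foreachRDD { rdd =>
    rdd.foreach { case (_, payload) => handle(payload) }  // failure => batch retried
    if (!rdd.isEmpty()) {
      ackUpTo(rdd.keys.max())  // safe only because batches complete in order
    }
  }
}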
Feature Generation On Spark
Hi, I am new to Spark and am working on document classification. Before model fitting I need to do feature generation. Each document is to be converted to a feature vector. However I am not sure how to do that. While testing locally I have a static list of tokens, and when I parse a file I do a lookup and increment counters. In the case of Spark I can create an RDD which loads all the documents, however I am not sure if one file goes to one executor or multiple. If the file is split then the feature vectors need to be merged, but I am not able to figure out how to do that. Thanks Rishi