how to print auc & prc for GBTClassifier, which is okay for RandomForestClassifier
Hi All,

I need to print the AUC and PRC for a GBTClassifier model. The same code works for RandomForestClassifier but fails for GBTClassifier, even though the rawPrediction column is not part of the original data in either case. The code is:

// Set up Pipeline
val stages = new mutable.ArrayBuffer[PipelineStage]()

val labelColName = if (algo == "GBTClassification") "indexedLabel" else "label"
if (algo == "GBTClassification") {
  val labelIndexer = new StringIndexer()
    .setInputCol("label")
    .setOutputCol(labelColName)
  stages += labelIndexer
}

val rawFeatureSize = data.select("rawFeatures").first().toString().split(",").length
val indices: Array[Int] = new Array[Int](rawFeatureSize)
for (i <- 0 until rawFeatureSize) {
  indices(i) = i
}
val featuresSlicer = new VectorSlicer()
  .setInputCol("rawFeatures")
  .setOutputCol("features")
  .setIndices(indices)
stages += featuresSlicer

val dt = algo match {
  // THE PROBLEM IS HERE:
  // GBTClassifier does not work; the error says the field rawPrediction does not exist,
  // and it is thrown from the last line, pipeline.fit(data).
  // The same code is fine for RandomForestClassifier.
  // In fact the rawPrediction column is not in the original data; it is generated
  // automatically by the pipeline model and consumed by BinaryClassificationEvaluator.
  case "GBTClassification" =>
    new GBTClassifier()
      .setFeaturesCol("features")
      .setLabelCol(labelColName)
  case _ =>
    throw new IllegalArgumentException(s"Algo ${params.algo} not supported.")
}

val grid = new ParamGridBuilder()
  .addGrid(dt.maxDepth, Array(1))
  .addGrid(dt.subsamplingRate, Array(0.5))
  .build()

val cv = new CrossValidator()
  .setEstimator(dt)
  .setEstimatorParamMaps(grid)
  .setEvaluator(new BinaryClassificationEvaluator)
  .setNumFolds(6)
stages += cv

val pipeline = new Pipeline().setStages(stages.toArray)

// Fit the Pipeline
val pipelineModel = pipeline.fit(data)

Thanks in advance ~~ Zhiliang
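As a possible workaround (a hedged sketch, not code from this thread): GBTClassificationModel in these Spark versions only outputs a prediction column, so instead of relying on rawPrediction you can feed the predictions into the RDD-based BinaryClassificationMetrics to print AUC and PRC once a model is fit. The column names "prediction" and "indexedLabel" and the variable pipelineModel are taken from the code above; everything else is an assumption.

import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

val predictions = pipelineModel.transform(data)
// (score, label) pairs; the hard 0/1 prediction is used as the score here,
// which is coarser than a real probability but enough to print AUC / PRC
val scoreAndLabels = predictions
  .select("prediction", "indexedLabel")
  .rdd
  .map(row => (row.getDouble(0), row.getDouble(1)))

val metrics = new BinaryClassificationMetrics(scoreAndLabels)
println(s"AUC = ${metrics.areaUnderROC()}")
println(s"PRC = ${metrics.areaUnderPR()}")

Note this only covers printing the metrics after fitting; for pipeline.fit(data) to get through at all, the CrossValidator inside the pipeline would still need an evaluator that does not require rawPrediction (for example MulticlassClassificationEvaluator on the prediction column).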
Re: how to see Pipeline model information
I have worked it out, by letting the Java code call a Scala class function. Thanks a lot, Xiaomeng ~~

On Friday, November 25, 2016 1:50 AM, Xiaomeng Wan wrote:

Here is the Scala code I use to get the best model (I never used Java):

val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(new RegressionEvaluator)
  .setEstimatorParamMaps(paramGrid)
val cvModel = cv.fit(data)
val plmodel = cvModel.bestModel.asInstanceOf[PipelineModel]
val lrModel = plmodel.stages(0).asInstanceOf[LinearRegressionModel]

On 24 November 2016 at 10:23, Zhiliang Zhu wrote:

Hi Xiaomeng,

Thanks very much for your comment, which is helpful for me. However, it seems there is a further issue around XXXClassifier versus XXXClassificationModel, as in the code below:

...
GBTClassifier gbtModel = new GBTClassifier();
ParamMap[] grid = new ParamGridBuilder()
    .addGrid(gbtModel.maxIter(), new int[] {5})
    .addGrid(gbtModel.maxDepth(), new int[] {5})
    .build();
CrossValidator crossValidator = new CrossValidator()
    .setEstimator(gbtModel) // rfModel
    .setEstimatorParamMaps(grid)
    .setEvaluator(new BinaryClassificationEvaluator())
    .setNumFolds(6);
Pipeline pipeline = new Pipeline()
    .setStages(new PipelineStage[] {labelIndexer, vectorSlicer, crossValidator});
PipelineModel plModel = pipeline.fit(data);

ArrayList<PipelineModel> m = new ArrayList<>();
m.add(plModel);
JAVA_SPARK_CONTEXT.parallelize(m, 1).saveAsObjectFile(this.outputPath + POST_MODEL_PATH);

Transformer[] stages = plModel.stages();
Transformer cvStage = stages[2];
CrossValidator crossV = new TR2CVConversion(cvStage).getInstanceOfCrossValidator(); // call a self-defined Scala class
Estimator estimator = crossV.getEstimator();
GBTClassifier gbt = (GBTClassifier) estimator;

// all of the above compiles, but the next line does not;
// GBTClassifier itself does not seem to expose much detailed model description,
// whereas GBTClassificationModel.toString() would give the specific trees, which is exactly what I want
GBTClassificationModel model = (GBTClassificationModel) get; // does not compile

Then how to get the specific trees or forest from the model? Thanks in advance~

Zhiliang

On Thursday, November 24, 2016 2:15 AM, Xiaomeng Wan wrote:

You can use pipelinemodel.stages(0).asInstanceOf[RandomForestModel]. The number (0 in the example) depends on the order in which you call setStages.

Shawn

On 23 November 2016 at 10:21, Zhiliang Zhu wrote:

Dear All,

I am building a model with a Spark pipeline, and the pipeline uses the Random Forest algorithm as one of its stages. If I just use Random Forest directly, not through a pipeline, I can see information about the forest via rfModel.toDebugString() and rfModel.toString(). However, with a pipeline, how can I check the algorithm's information, such as the trees, or the threshold selected by lr, etc.?

Thanks in advance~~

zhiliang
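For reference, a minimal Scala sketch of the same extraction without the Java-to-Scala conversion class (it assumes the fitted pipeline plModel from the code above, with the CrossValidator as the third stage; this is a suggestion, not code from the thread). In a fitted PipelineModel the stages are already fitted transformers, so the third stage is a CrossValidatorModel whose bestModel can be cast to the fitted GBTClassificationModel:

import org.apache.spark.ml.tuning.CrossValidatorModel
import org.apache.spark.ml.classification.GBTClassificationModel

// stages of a fitted pipeline hold models, not the original estimators
val cvModel  = plModel.stages(2).asInstanceOf[CrossValidatorModel]
val gbtModel = cvModel.bestModel.asInstanceOf[GBTClassificationModel]

println(gbtModel.toDebugString)   // full description of the boosted trees
// gbtModel.trees and gbtModel.treeWeights expose the individual trees as well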
Re: get specific tree or forest structure from pipeline model
Scala code would also be fine for me, if there is some solution.

On Friday, November 25, 2016 1:27 AM, Zhiliang Zhu wrote:

Hi All,

I want to print the specific tree or forest structure from a pipeline model. However, it seems there is a further issue around XXXClassifier versus XXXClassificationModel, as in the code below:

...
GBTClassifier gbtModel = new GBTClassifier();
ParamMap[] grid = new ParamGridBuilder()
    .addGrid(gbtModel.maxIter(), new int[] {5})
    .addGrid(gbtModel.maxDepth(), new int[] {5})
    .build();
CrossValidator crossValidator = new CrossValidator()
    .setEstimator(gbtModel) // rfModel
    .setEstimatorParamMaps(grid)
    .setEvaluator(new BinaryClassificationEvaluator())
    .setNumFolds(6);
Pipeline pipeline = new Pipeline()
    .setStages(new PipelineStage[] {labelIndexer, vectorSlicer, crossValidator});
PipelineModel plModel = pipeline.fit(data);

ArrayList<PipelineModel> m = new ArrayList<>();
m.add(plModel);
JAVA_SPARK_CONTEXT.parallelize(m, 1).saveAsObjectFile(this.outputPath + POST_MODEL_PATH);

Transformer[] stages = plModel.stages();
Transformer cvStage = stages[2];
CrossValidator crossV = new TR2CVConversion(cvStage).getInstanceOfCrossValidator(); // call a self-defined Scala class
Estimator estimator = crossV.getEstimator();
GBTClassifier gbt = (GBTClassifier) estimator;

// all of the above compiles, but the next line does not;
// GBTClassifier itself does not seem to expose much detailed model description,
// whereas GBTClassificationModel.toString() would give the specific trees, which is exactly what I want
GBTClassificationModel model = (GBTClassificationModel) get; // does not compile

Then how to get the specific trees or forest from the model? Thanks in advance~

Zhiliang
get specific tree or forest structure from pipeline model
Hi All,

I want to print the specific tree or forest structure from a pipeline model. However, it seems there is a further issue around XXXClassifier versus XXXClassificationModel, as in the code below:

...
GBTClassifier gbtModel = new GBTClassifier();
ParamMap[] grid = new ParamGridBuilder()
    .addGrid(gbtModel.maxIter(), new int[] {5})
    .addGrid(gbtModel.maxDepth(), new int[] {5})
    .build();
CrossValidator crossValidator = new CrossValidator()
    .setEstimator(gbtModel) // rfModel
    .setEstimatorParamMaps(grid)
    .setEvaluator(new BinaryClassificationEvaluator())
    .setNumFolds(6);
Pipeline pipeline = new Pipeline()
    .setStages(new PipelineStage[] {labelIndexer, vectorSlicer, crossValidator});
PipelineModel plModel = pipeline.fit(data);

ArrayList<PipelineModel> m = new ArrayList<>();
m.add(plModel);
JAVA_SPARK_CONTEXT.parallelize(m, 1).saveAsObjectFile(this.outputPath + POST_MODEL_PATH);

Transformer[] stages = plModel.stages();
Transformer cvStage = stages[2];
CrossValidator crossV = new TR2CVConversion(cvStage).getInstanceOfCrossValidator(); // call a self-defined Scala class
Estimator estimator = crossV.getEstimator();
GBTClassifier gbt = (GBTClassifier) estimator;

// all of the above compiles, but the next line does not;
// GBTClassifier itself does not seem to expose much detailed model description,
// whereas GBTClassificationModel.toString() would give the specific trees, which is exactly what I want
GBTClassificationModel model = (GBTClassificationModel) get; // does not compile

Then how to get the specific trees or forest from the model? Thanks in advance~

Zhiliang
Re: how to see Pipeline model information
Hi Xiaomeng,

Thanks very much for your comment, which is helpful for me. However, it seems there is a further issue around XXXClassifier versus XXXClassificationModel, as in the code below:

...
GBTClassifier gbtModel = new GBTClassifier();
ParamMap[] grid = new ParamGridBuilder()
    .addGrid(gbtModel.maxIter(), new int[] {5})
    .addGrid(gbtModel.maxDepth(), new int[] {5})
    .build();
CrossValidator crossValidator = new CrossValidator()
    .setEstimator(gbtModel) // rfModel
    .setEstimatorParamMaps(grid)
    .setEvaluator(new BinaryClassificationEvaluator())
    .setNumFolds(6);
Pipeline pipeline = new Pipeline()
    .setStages(new PipelineStage[] {labelIndexer, vectorSlicer, crossValidator});
PipelineModel plModel = pipeline.fit(data);

ArrayList<PipelineModel> m = new ArrayList<>();
m.add(plModel);
JAVA_SPARK_CONTEXT.parallelize(m, 1).saveAsObjectFile(this.outputPath + POST_MODEL_PATH);

Transformer[] stages = plModel.stages();
Transformer cvStage = stages[2];
CrossValidator crossV = new TR2CVConversion(cvStage).getInstanceOfCrossValidator(); // call a self-defined Scala class
Estimator estimator = crossV.getEstimator();
GBTClassifier gbt = (GBTClassifier) estimator;

// all of the above compiles, but the next line does not;
// GBTClassifier itself does not seem to expose much detailed model description,
// whereas GBTClassificationModel.toString() would give the specific trees, which is exactly what I want
GBTClassificationModel model = (GBTClassificationModel) get; // does not compile

Then how to get the specific trees or forest from the model? Thanks in advance~

Zhiliang

On Thursday, November 24, 2016 2:15 AM, Xiaomeng Wan wrote:

You can use pipelinemodel.stages(0).asInstanceOf[RandomForestModel]. The number (0 in the example) depends on the order in which you call setStages.

Shawn

On 23 November 2016 at 10:21, Zhiliang Zhu wrote:

Dear All,

I am building a model with a Spark pipeline, and the pipeline uses the Random Forest algorithm as one of its stages. If I just use Random Forest directly, not through a pipeline, I can see information about the forest via rfModel.toDebugString() and rfModel.toString(). However, with a pipeline, how can I check the algorithm's information, such as the trees, or the threshold selected by lr, etc.?

Thanks in advance~~

zhiliang
how to see Pipeline model information
Dear All,

I am building a model with a Spark pipeline, and the pipeline uses the Random Forest algorithm as one of its stages. If I just use Random Forest directly, not through a pipeline, I can see information about the forest via rfModel.toDebugString() and rfModel.toString(). However, with a pipeline, how can I check the algorithm's information, such as the trees, or the threshold selected by lr, etc.?

Thanks in advance~~

zhiliang
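A small hedged Scala sketch of Xiaomeng's suggestion (the stage index 0 and the variable name pipelineModel are assumptions; the index depends on the order passed to setStages). Note that inside a spark.ml pipeline the fitted stage is a RandomForestClassificationModel; RandomForestModel is the older spark.mllib class name:

import org.apache.spark.ml.classification.RandomForestClassificationModel

val rfModel = pipelineModel.stages(0).asInstanceOf[RandomForestClassificationModel]
println(rfModel.toDebugString)   // prints every tree in the forest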
spark ml : auc on extreme distributed data
Hi All,

I have a lot of data, around 1,000,000 rows; 97% of them are in the negative class and 3% in the positive class. I applied the Random Forest algorithm to build the model and predict on the testing data.

For the data preparation:
i. first, randomly split all the data into training and testing data at a 0.7 : 0.3 ratio
ii. leave the testing data unchanged, so its negative-to-positive class ratio stays 97:3
iii. rebalance the training data to a 50:50 negative-to-positive ratio, by sampling within each class
iv. fit the RF model on the training data and predict on the testing data, adjusting algorithm parameters and feature work (PCA etc.); the AUC on the testing data is always above 0.8, or much higher ...

This leaves me confused. It seems that the model, or the AUC, depends a lot on the original data distribution. In effect, I would like to know: for this data distribution, what would the AUC be for a random guess? And what would the AUC be for an arbitrary data distribution?

Thanks in advance~~
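For what it is worth: the AUC of a pure random-guess scorer is about 0.5 regardless of the class ratio (the ROC curve is insensitive to class imbalance), while the area under the PR curve of a random guess is roughly the positive fraction, about 0.03 here. A hedged sketch to confirm the baseline empirically (testLabels is an assumed RDD[Double] of 0/1 labels from the test split; none of this is code from the thread):

import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

// assign a random score to every test row, then measure the resulting curves
val randomScoresAndLabels = testLabels.map(label => (scala.util.Random.nextDouble(), label))
val baseline = new BinaryClassificationMetrics(randomScoresAndLabels)

println(s"random-guess AUC ~ ${baseline.areaUnderROC()}")  // expected around 0.5
println(s"random-guess PRC ~ ${baseline.areaUnderPR()}")   // expected around the positive rate (~0.03)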
Re: the spark job is so slow - almost frozen
Thanks a lot for your kind help.

On Wednesday, July 20, 2016 11:35 AM, Andrew Ehrlich wrote:

Try:
- filtering down the data as soon as possible in the job, dropping columns you don't need
- processing fewer partitions of the hive tables at a time
- caching frequently accessed data, for example dimension tables, lookup tables, or other datasets that are repeatedly accessed
- using the Spark UI to identify the bottlenecked resource
- removing features or columns from the output data until it runs, then adding them back in one at a time
- creating a static dataset small enough to work, then editing the query and retesting repeatedly until you cut the execution time by a significant fraction
- using the Spark UI or the spark shell to check the skew and make sure partitions are evenly distributed

On Jul 18, 2016, at 3:33 AM, Zhiliang Zhu wrote:

Thanks a lot for your reply.

In effect, here we tried to run the SQL on Kettle, Hive and Spark Hive (via HiveContext) respectively; the job seems frozen and never finishes. Among the 6 tables, we need to read different columns from different tables for the specific information, then do some simple calculation before output. The join operation is used most in the SQL.

Best wishes!

On Monday, July 18, 2016 6:24 PM, Chanh Le wrote:

Hi,
What about the network (bandwidth) between Hive and Spark? Does it run in Hive before you move it to Spark? Because it's complex, you can use something like the EXPLAIN command to show what is going on.

On Jul 18, 2016, at 5:20 PM, Zhiliang Zhu wrote:

The SQL logic in the program is very complex, so I will not describe the detailed code here.

On Monday, July 18, 2016 6:04 PM, Zhiliang Zhu wrote:

Hi All,

Here we have one application which needs to extract different columns from 6 Hive tables and then do some easy calculation; there are around 100,000 rows in each table, and finally it needs to output another table or file (with a consistent column format).

However, after many days of trying, the Spark Hive job is unthinkably slow - sometimes almost frozen. There are 5 nodes in the Spark cluster.

Could anyone offer some help? Any idea or clue would be good.

Thanks in advance~

Zhiliang
the spark job is so slow during shuffle - almost frozen
Hi All,

Referring to the Spark UI, one stage is displayed as 198/200 - almost frozen... During the shuffle stage of that task, most executors are at 0 bytes, but one executor is at 1 GB. Moreover, in several of the join operations the situation is like this: one table or pair RDD has only about 40 keys, while the other table has about 10,000 keys.

So, could this be diagnosed as a data skew issue?

Any help or comment will be deeply appreciated. Thanks in advance ~

Here we have one application which needs to extract different columns from 6 Hive tables and then do some easy calculation; there are around 100,000 rows in each table, and finally it needs to output another table or file (with a consistent column format). However, after many days of trying, the Spark Hive job is unthinkably slow - sometimes almost frozen. There are 5 nodes in the Spark cluster. Could anyone offer some help? Any idea or clue would be good. Thanks in advance~

On Tuesday, July 19, 2016 11:05 AM, Zhiliang Zhu wrote:

Hi Mungeol,

Thanks a lot for your help. I will try that.

On Tuesday, July 19, 2016 9:21 AM, Mungeol Heo wrote:

Try to run an action at an intermediate stage of your job process, like save, insertInto, etc. Hope it can help you out.

On Mon, Jul 18, 2016 at 7:33 PM, Zhiliang Zhu wrote:
> Thanks a lot for your reply.
>
> In effect, here we tried to run the SQL on Kettle, Hive and Spark Hive (via
> HiveContext) respectively; the job seems frozen and never finishes.
>
> Among the 6 tables, we need to read different columns from different tables
> for the specific information, then do some simple calculation before output.
> The join operation is used most in the SQL.
>
> Best wishes!
>
> On Monday, July 18, 2016 6:24 PM, Chanh Le wrote:
>
> Hi,
> What about the network (bandwidth) between Hive and Spark?
> Does it run in Hive before you move it to Spark?
> Because it's complex, you can use something like the EXPLAIN command to show
> what is going on.
>
> On Jul 18, 2016, at 5:20 PM, Zhiliang Zhu wrote:
>
> The SQL logic in the program is very complex, so I will not describe the
> detailed code here.
>
> On Monday, July 18, 2016 6:04 PM, Zhiliang Zhu wrote:
>
> Hi All,
>
> Here we have one application which needs to extract different columns from 6
> Hive tables and then do some easy calculation; there are around 100,000
> rows in each table, and finally it needs to output another table or file
> (with a consistent column format).
>
> However, after many days of trying, the Spark Hive job is unthinkably slow
> - sometimes almost frozen. There are 5 nodes in the Spark cluster.
>
> Could anyone offer some help? Any idea or clue would be good.
>
> Thanks in advance~
>
> Zhiliang
>
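A quick hedged way to check for skew before the join (the table name bigTable and the key column "k" are stand-ins, not from the thread): count rows per join key on the large side; if a handful of keys dominate, the 198/200 "stuck" tasks are the expected symptom.

// Spark 1.x style; bigTable is a DataFrame read from Hive
val keyCounts = bigTable.rdd
  .map(row => (row.getAs[String]("k"), 1L))
  .reduceByKey(_ + _)

val heaviest = keyCounts.sortBy(_._2, ascending = false).take(20)   // 20 heaviest keys
heaviest.foreach(println)

If one or two keys carry most of the rows, salting those keys or broadcasting the small 40-key side of the join are the usual mitigations.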
Re: Spark driver getting out of memory
Try to set --driver-memory Xg, where X is as large as can be set.

On Monday, July 18, 2016 6:31 PM, Saurav Sinha wrote:

Hi,

I am running a Spark job. Master memory - 5G, executor memory - 10G (running on 4 nodes). My job is getting killed as the number of partitions increases to 20K.

16/07/18 14:53:13 INFO DAGScheduler: Got job 17 (foreachPartition at WriteToKafka.java:45) with 13524 output partitions (allowLocal=false)
16/07/18 14:53:13 INFO DAGScheduler: Final stage: ResultStage 640 (foreachPartition at WriteToKafka.java:45)
16/07/18 14:53:13 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 518, ShuffleMapStage 639)
16/07/18 14:53:23 INFO DAGScheduler: Missing parents: List()
16/07/18 14:53:23 INFO DAGScheduler: Submitting ResultStage 640 (MapPartitionsRDD[271] at map at BuildSolrDocs.java:209), which has no missing parents
16/07/18 14:53:23 INFO MemoryStore: ensureFreeSpace(8248) called with curMem=41923262, maxMem=2778778828
16/07/18 14:53:23 INFO MemoryStore: Block broadcast_90 stored as values in memory (estimated size 8.1 KB, free 2.5 GB)
Exception in thread "dag-scheduler-event-loop" java.lang.OutOfMemoryError: Java heap space
        at org.apache.spark.util.io.ByteArrayChunkOutputStream.allocateNewChunkIfNeeded(ByteArrayChunkOutputStream.scala:66)
        at org.apache.spark.util.io.ByteArrayChunkOutputStream.write(ByteArrayChunkOutputStream.scala:55)
        at org.xerial.snappy.SnappyOutputStream.dumpOutput(SnappyOutputStream.java:294)
        at org.xerial.snappy.SnappyOutputStream.flush(SnappyOutputStream.java:273)
        at org.apache.spark.io.SnappyOutputStreamWrapper.flush(CompressionCodec.scala:197)
        at java.io.ObjectOutputStream$BlockDataOutputStream.flush(ObjectOutputStream.java:1822)

Help needed.

--
Thanks and Regards,
Saurav Sinha
Contact: 9742879062
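The OutOfMemoryError here is in the driver's dag-scheduler thread while it prepares task data for 13524 output partitions, so besides raising --driver-memory, reducing the number of output partitions before the action can help. A hedged Scala sketch (the RDD name solrDocs and the partition count are illustrative, not from the job):

// coalesce merges partitions without a shuffle, so the driver tracks far fewer tasks
val fewerPartitions = solrDocs.coalesce(2000)
fewerPartitions.foreachPartition { records =>
  // open a Kafka producer and send each record here (omitted in this sketch)
  records.foreach(record => ())
}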
Re: the spark job is so slow - almost frozen
Thanks a lot for your reply.

In effect, here we tried to run the SQL on Kettle, Hive and Spark Hive (via HiveContext) respectively; the job seems frozen and never finishes.

Among the 6 tables, we need to read different columns from different tables for the specific information, then do some simple calculation before output. The join operation is used most in the SQL.

Best wishes!

On Monday, July 18, 2016 6:24 PM, Chanh Le wrote:

Hi,
What about the network (bandwidth) between Hive and Spark? Does it run in Hive before you move it to Spark? Because it's complex, you can use something like the EXPLAIN command to show what is going on.

On Jul 18, 2016, at 5:20 PM, Zhiliang Zhu wrote:

The SQL logic in the program is very complex, so I will not describe the detailed code here.

On Monday, July 18, 2016 6:04 PM, Zhiliang Zhu wrote:

Hi All,

Here we have one application which needs to extract different columns from 6 Hive tables and then do some easy calculation; there are around 100,000 rows in each table, and finally it needs to output another table or file (with a consistent column format).

However, after many days of trying, the Spark Hive job is unthinkably slow - sometimes almost frozen. There are 5 nodes in the Spark cluster.

Could anyone offer some help? Any idea or clue would be good.

Thanks in advance~

Zhiliang
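Following Chanh's EXPLAIN suggestion, a hedged sketch of inspecting the plan before running the whole job (Spark 1.x HiveContext; the query is a stand-in for the real multi-table join):

val df = hiveContext.sql(
  "SELECT a.id, b.col1, c.col2 FROM t1 a JOIN t2 b ON a.id = b.id JOIN t3 c ON a.id = c.id")
df.explain(true)   // prints the logical and physical plans; big shuffles and the join order show up here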
Re: the spark job is so slow - almost frozen
The SQL logic in the program is very complex, so I will not describe the detailed code here.

On Monday, July 18, 2016 6:04 PM, Zhiliang Zhu wrote:

Hi All,

Here we have one application which needs to extract different columns from 6 Hive tables and then do some easy calculation; there are around 100,000 rows in each table, and finally it needs to output another table or file (with a consistent column format).

However, after many days of trying, the Spark Hive job is unthinkably slow - sometimes almost frozen. There are 5 nodes in the Spark cluster.

Could anyone offer some help? Any idea or clue would be good.

Thanks in advance~

Zhiliang
the spark job is so slow - almost frozen
Hi All,

Here we have one application which needs to extract different columns from 6 Hive tables and then do some easy calculation; there are around 100,000 rows in each table, and finally it needs to output another table or file (with a consistent column format).

However, after many days of trying, the Spark Hive job is unthinkably slow - sometimes almost frozen. There are 5 nodes in the Spark cluster.

Could anyone offer some help? Any idea or clue would be good.

Thanks in advance~

Zhiliang
Re: spark job automatically killed without rhyme or reason
Thanks a lot for all the comments and the useful information.

Yes, I have a fair amount of experience writing and running Spark jobs; something unstable shows up when a job runs on more data or for more time. Sometimes it is not okay after resetting some parameter on the command line, but is okay again after removing it and using the default setting; sometimes it is the opposite, and a proper parameter setting needs to be found. Spark 1.5 was installed here by someone else.

On Wednesday, June 22, 2016 1:59 PM, Nirav Patel wrote:

Spark is a memory hogger and suicidal if you have a job processing a bigger dataset. However, Databricks claims that Spark > 1.6 has optimizations for memory footprint as well as processing. They are only available if you use DataFrame or Dataset; if you are using RDDs you have to do a lot of testing and tuning.

On Mon, Jun 20, 2016 at 1:34 AM, Sean Owen wrote:

I'm not sure that's the conclusion. It's not trivial to tune and configure YARN and Spark to match your app's memory needs and profile, but it's also just a matter of setting them properly. I'm not clear you've set the executor memory, for example, in particular spark.yarn.executor.memoryOverhead. Everything else you mention is a symptom of YARN shutting down your jobs because your memory settings don't match what your app does. They're not problems per se, based on what you have provided.

On Mon, Jun 20, 2016 at 9:17 AM, Zhiliang Zhu wrote:
> Hi Alexander,
>
> Thanks a lot for your comments.
>
> Spark seems not that stable when it comes to running a big job, with too much
> data or taking too much time; yes, the problem is gone when reducing the scale.
> Sometimes resetting a job parameter (such as --driver-memory, which may help
> with GC issues) helps, and sometimes the code has to be rewritten with another
> algorithm.
>
> As you commented on the shuffle operation, it sounds like part of the reason ...
>
> Best Wishes!
>
> On Friday, June 17, 2016 8:45 PM, Alexander Kapustin wrote:
>
> Hi Zhiliang,
>
> Yes, finding the exact reason for the failure is very difficult. We had an issue
> with similar behavior; due to limited time for investigation, we reduced the
> amount of processed data and the problem went away.
>
> Some points which may help you in your investigation:
> · If you start the spark-history-server (or monitor the running
> application on port 4040), look into the failed stages (if any). By default
> Spark retries stage execution 2 times; after that the job fails.
> · Some useful information may be contained in the YARN logs on the Hadoop nodes
> (yarn-<user>-nodemanager-<host>.log), but this is only information about the
> killed container, not about the reasons why this stage took so much memory.
>
> As I can see in your logs, the failed step relates to a shuffle operation; could
> you change your job to avoid the massive shuffle operation?
>
> --
> WBR, Alexander
>
> From: Zhiliang Zhu
> Sent: 17 June 2016 14:10
> To: User; kp...@hotmail.com
> Subject: Re: spark job automatically killed without rhyme or reason
>
> Hi Alexander,
>
> Is your YARN userlog just the executor log?
>
> Those logs seem a little difficult to use to pin down the exact point of failure,
> because sometimes a successful job may also show the same kinds of errors ...
> but repair itself. Spark seems not that stable currently ...
>
> Thank you in advance~
>
> On Friday, June 17, 2016 6:53 PM, Zhiliang Zhu wrote:
>
> Hi Alexander,
>
> Thanks a lot for your reply.
>
> Yes, submitted by yarn.
> Do you just mean in the executor log file by way of yarn logs -applicationId > id, > > in this file, both in some containers' stdout and stderr : > > 16/06/17 14:05:40 INFO client.TransportClientFactory: Found inactive > connection to ip-172-31-20-104/172.31.20.104:49991, creating a new one. > 16/06/17 14:05:40 ERROR shuffle.RetryingBlockFetcher: Exception while > beginning fetch of 1 outstanding blocks > java.io.IOException: Failed to connect to > ip-172-31-20-104/172.31.20.104:49991 <-- may it be due to > that spark is not stable, and spark may repair itself for these kinds of > error ? (saw some in successful run ) > > at > org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193) > at > org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156) > > Caused by: java.net.ConnectException: Connection refused: > ip-172-31-20-104/172.31.20.104:49991 > at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) > at > sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelIm
Re: spark job automatically killed without rhyme or reason
Hi Alexander,

Thanks a lot for your comments.

Spark seems not that stable when it comes to running a big job, with too much data or taking too much time; yes, the problem is gone when reducing the scale. Sometimes resetting a job parameter (such as --driver-memory, which may help with GC issues) helps, and sometimes the code has to be rewritten with another algorithm.

As you commented on the shuffle operation, it sounds like part of the reason ...

Best Wishes!

On Friday, June 17, 2016 8:45 PM, Alexander Kapustin wrote:

Hi Zhiliang,

Yes, finding the exact reason for the failure is very difficult. We had an issue with similar behavior; due to limited time for investigation, we reduced the amount of processed data and the problem went away.

Some points which may help you in your investigation:
· If you start the spark-history-server (or monitor the running application on port 4040), look into the failed stages (if any). By default Spark retries stage execution 2 times; after that the job fails.
· Some useful information may be contained in the YARN logs on the Hadoop nodes (yarn-<user>-nodemanager-<host>.log), but this is only information about the killed container, not about the reasons why this stage took so much memory.

As I can see in your logs, the failed step relates to a shuffle operation; could you change your job to avoid the massive shuffle operation?

--
WBR, Alexander

From: Zhiliang Zhu
Sent: 17 June 2016 14:10
To: User; kp...@hotmail.com
Subject: Re: spark job automatically killed without rhyme or reason

Hi Alexander,

Is your YARN userlog just the executor log?

Those logs seem a little difficult to use to pin down the exact point of failure, because sometimes a successful job may also show the same kinds of errors ... but repair itself. Spark seems not that stable currently ...

Thank you in advance~

On Friday, June 17, 2016 6:53 PM, Zhiliang Zhu wrote:

Hi Alexander,

Thanks a lot for your reply.
Yes, submitted by YARN. Do you just mean the executor log file obtained by way of yarn logs -applicationId id?

In this file, in some containers' stdout and stderr:

16/06/17 14:05:40 INFO client.TransportClientFactory: Found inactive connection to ip-172-31-20-104/172.31.20.104:49991, creating a new one.
16/06/17 14:05:40 ERROR shuffle.RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks
java.io.IOException: Failed to connect to ip-172-31-20-104/172.31.20.104:49991   <-- may it be because Spark is not stable, and Spark may repair itself for these kinds of errors? (saw some in a successful run)
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
Caused by: java.net.ConnectException: Connection refused: ip-172-31-20-104/172.31.20.104:49991
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
        at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
        at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
        at io.netty.channel.nio.NioEventLoop.processSel
Re: spark job automatically killed without rhyme or reason
Hi Alexander,

Is your YARN userlog just the executor log?

Those logs seem a little difficult to use to pin down the exact point of failure, because sometimes a successful job may also show the same kinds of errors ... but repair itself. Spark seems not that stable currently ...

Thank you in advance~

On Friday, June 17, 2016 6:53 PM, Zhiliang Zhu wrote:

Hi Alexander,

Thanks a lot for your reply.

Yes, submitted by YARN. Do you just mean the executor log file obtained by way of yarn logs -applicationId id?

In this file, in some containers' stdout and stderr:

16/06/17 14:05:40 INFO client.TransportClientFactory: Found inactive connection to ip-172-31-20-104/172.31.20.104:49991, creating a new one.
16/06/17 14:05:40 ERROR shuffle.RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks
java.io.IOException: Failed to connect to ip-172-31-20-104/172.31.20.104:49991   <-- may it be because Spark is not stable, and Spark may repair itself for these kinds of errors? (saw some in a successful run)
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
Caused by: java.net.ConnectException: Connection refused: ip-172-31-20-104/172.31.20.104:49991
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
        at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
        at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)

16/06/17 11:54:38 ERROR executor.Executor: Managed memory leak detected; size = 16777216 bytes, TID = 100323   <- would it be a memory leak issue? though no GC exception was thrown, unlike other normal kinds of out-of-memory
16/06/17 11:54:38 ERROR executor.Executor: Exception in task 145.0 in stage 112.0 (TID 100323)
java.io.IOException: Filesystem closed
        at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:837)
        at org.apache.hadoop.hdfs.DFSInputStream.close(DFSInputStream.java:679)
        at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:903)
        at java.io.DataInputStream.readFully(DataInputStream.java:195)
        at org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.readStripeFooter(RecordReaderImpl.java:2265)
        at org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.readStripe(RecordReaderImpl.java:2635)
...

Sorry, this information is in the middle of the log file, but everything is okay at the end of the log.

In the run log file, log_file, generated by the command:
nohup spark-submit --driver-memory 20g --num-executors 20 --class com.dianrong.Main --master yarn-client dianrong-retention_2.10-1.0.jar doAnalysisExtremeLender /tmp/drretention/test/output 0.96 /tmp/drretention/evaluation/test_karthik/lgmodel /tmp/drretention/input/feature_6.0_20151001_20160531_behavior_201511_201604_summary/lenderId_feature_live 50 > log_file

executor 40 lost   <-- would it be due to this? sometimes the job may fail for this reason ..
        at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:903)
        at java.io.DataInputStream.readFully(DataInputStream.java:195)
        at org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.readStripeFooter(RecordReaderImpl.java:2265)
        at org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.readStripe(RecordReaderImpl.java:2635)
..

Thanks in advance!

On Friday, June 17, 2016 3:52 PM, Alexander Kapustin wrote:
Re: spark job automatically killed without rhyme or reason
Hi Alexander,

Thanks a lot for your reply.

Yes, submitted by YARN. Do you just mean the executor log file obtained by way of yarn logs -applicationId id?

In this file, in some containers' stdout and stderr:

16/06/17 14:05:40 INFO client.TransportClientFactory: Found inactive connection to ip-172-31-20-104/172.31.20.104:49991, creating a new one.
16/06/17 14:05:40 ERROR shuffle.RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks
java.io.IOException: Failed to connect to ip-172-31-20-104/172.31.20.104:49991   <-- may it be because Spark is not stable, and Spark may repair itself for these kinds of errors? (saw some in a successful run)
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
Caused by: java.net.ConnectException: Connection refused: ip-172-31-20-104/172.31.20.104:49991
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
        at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
        at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)

16/06/17 11:54:38 ERROR executor.Executor: Managed memory leak detected; size = 16777216 bytes, TID = 100323   <- would it be a memory leak issue? though no GC exception was thrown, unlike other normal kinds of out-of-memory
16/06/17 11:54:38 ERROR executor.Executor: Exception in task 145.0 in stage 112.0 (TID 100323)
java.io.IOException: Filesystem closed
        at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:837)
        at org.apache.hadoop.hdfs.DFSInputStream.close(DFSInputStream.java:679)
        at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:903)
        at java.io.DataInputStream.readFully(DataInputStream.java:195)
        at org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.readStripeFooter(RecordReaderImpl.java:2265)
        at org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.readStripe(RecordReaderImpl.java:2635)
...

Sorry, this information is in the middle of the log file, but everything is okay at the end of the log.

In the run log file, log_file, generated by the command:
nohup spark-submit --driver-memory 20g --num-executors 20 --class com.dianrong.Main --master yarn-client dianrong-retention_2.10-1.0.jar doAnalysisExtremeLender /tmp/drretention/test/output 0.96 /tmp/drretention/evaluation/test_karthik/lgmodel /tmp/drretention/input/feature_6.0_20151001_20160531_behavior_201511_201604_summary/lenderId_feature_live 50 > log_file

executor 40 lost   <-- would it be due to this? sometimes the job may fail for this reason ..

        at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:903)
        at java.io.DataInputStream.readFully(DataInputStream.java:195)
        at org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.readStripeFooter(RecordReaderImpl.java:2265)
        at org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.readStripe(RecordReaderImpl.java:2635)
..

Thanks in advance!
On Friday, June 17, 2016 3:52 PM, Alexander Kapustin wrote:

Hi,

Did you submit the Spark job via YARN? In some cases (memory configuration, probably), YARN can kill containers where Spark tasks are executed. In this situation, please check the YARN userlogs for more information…

--
WBR, Alexander

From: Zhiliang Zhu
Sent: 17 June 2016 9:36
To: Zhiliang Zhu; User
Subject: Re: spark job automatically killed without rhyme or reason

anyone ever met the similar problem, which is quite strange ...
Re: spark job automatically killed without rhyme or reason
Has anyone ever met a similar problem? It is quite strange ...

On Friday, June 17, 2016 2:13 PM, Zhiliang Zhu wrote:

Hi All,

I have a big job which takes more than one hour to run in full; however, it unreasonably exits and finishes midway (almost 80% of the job is actually finished, but not all of it), without any apparent error or exception in the log. I have submitted the same job many times and it is always the same. The last line of the run log is just the single word "killed", or sometimes there is no other suspicious log at all - everything looks okay, but the job should not have finished.

What is the way to deal with this problem? Has anyone else ever met a similar issue ...

Thanks in advance!
spark job killed without rhyme or reason
Hi All,

I have a big job which takes more than one hour to run in full; however, it unreasonably exits and finishes midway (almost 80% of the job is actually finished, but not all of it), without any apparent error or exception in the log. I have submitted the same job many times and it is always the same. The last line of the run log is just the single word "killed", or sometimes there is no other suspicious log at all - everything looks okay, but the job should not have finished.

What is the way to deal with this problem? Has anyone else ever met a similar issue ...

Thanks in advance!
Re: test - what is the wrong while adding one column in the dataframe
Just a test, since the user mailing list seemed to have a problem a while ago; it is okay now.

On Friday, June 17, 2016 12:18 PM, Zhiliang Zhu wrote:

On Tuesday, May 17, 2016 10:44 AM, Zhiliang Zhu wrote:

Hi All,

For a given DataFrame created by Hive SQL, it is required to add one more column based on an existing column, while also keeping all the previous columns in the resulting DataFrame.

final double DAYS_30 = 1000 * 60 * 60 * 24 * 30.0; // DAYS_30 seems difficult to reference inside the SQL?

DataFrame behavior_df = jhql.sql("SELECT cast (user_id as double) as user_id, cast (server_timestamp as double) as server_timestamp, url, referer, source, app_version, params FROM log.request");

// it is okay to run, but behavior_df.printSchema() is not changed at all
behavior_df.withColumn("daysLater30", behavior_df.col("server_timestamp").plus(DAYS_30));

// it is okay to run, but behavior_df.printSchema() only has one column, daysLater30;
// it should be the schema with all the previous columns plus daysLater30
behavior_df = behavior_df.withColumn("daysLater30", behavior_df.col("server_timestamp").plus(DAYS_30));

Then, how should I do it?

Thank you, the issue was resolved.
test - what is wrong when adding one column in the dataframe
On Tuesday, May 17, 2016 10:44 AM, Zhiliang Zhu wrote:

Hi All,

For a given DataFrame created by Hive SQL, it is required to add one more column based on an existing column, while also keeping all the previous columns in the resulting DataFrame.

final double DAYS_30 = 1000 * 60 * 60 * 24 * 30.0; // DAYS_30 seems difficult to reference inside the SQL?

DataFrame behavior_df = jhql.sql("SELECT cast (user_id as double) as user_id, cast (server_timestamp as double) as server_timestamp, url, referer, source, app_version, params FROM log.request");

// it is okay to run, but behavior_df.printSchema() is not changed at all
behavior_df.withColumn("daysLater30", behavior_df.col("server_timestamp").plus(DAYS_30));

// it is okay to run, but behavior_df.printSchema() only has one column, daysLater30;
// it should be the schema with all the previous columns plus daysLater30
behavior_df = behavior_df.withColumn("daysLater30", behavior_df.col("server_timestamp").plus(DAYS_30));

Then, how should I do it?

Thank you,
what is wrong when adding one column in the dataframe
Hi All,

For a given DataFrame created by Hive SQL, it is required to add one more column based on an existing column, while also keeping all the previous columns in the resulting DataFrame.

final double DAYS_30 = 1000 * 60 * 60 * 24 * 30.0; // DAYS_30 seems difficult to reference inside the SQL?

DataFrame behavior_df = jhql.sql("SELECT cast (user_id as double) as user_id, cast (server_timestamp as double) as server_timestamp, url, referer, source, app_version, params FROM log.request");

// it is okay to run, but behavior_df.printSchema() is not changed at all
behavior_df.withColumn("daysLater30", behavior_df.col("server_timestamp").plus(DAYS_30));

// it is okay to run, but behavior_df.printSchema() only has one column, daysLater30;
// it should be the schema with all the previous columns plus daysLater30
behavior_df = behavior_df.withColumn("daysLater30", behavior_df.col("server_timestamp").plus(DAYS_30));

Then, how should I do it?

Thank you,
how to add one more column in DataFrame
Hi All,

For a given DataFrame created by Hive SQL, it is required to add one more column based on an existing column, while also keeping all the previous columns in the resulting DataFrame.

final double DAYS_30 = 1000 * 60 * 60 * 24 * 30.0; // DAYS_30 seems difficult to reference inside the SQL?

DataFrame behavior_df = jhql.sql("SELECT cast (user_id as double) as user_id, cast (server_timestamp as double) as server_timestamp, url, referer, source, app_version, params FROM log.request");

// it is okay to run, but behavior_df.printSchema() is not changed at all
behavior_df.withColumn("daysLater30", behavior_df.col("server_timestamp").plus(DAYS_30));

// it is okay to run, but behavior_df.printSchema() only has one column, daysLater30;
// it should be the schema with all the previous columns plus daysLater30
behavior_df = behavior_df.withColumn("daysLater30", behavior_df.col("server_timestamp").plus(DAYS_30));

Then, how should I do it?

Thank you,
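A hedged Scala restatement of the point that resolved this (the names behavior_df, server_timestamp and DAYS_30 are taken from the message above): withColumn returns a new DataFrame rather than modifying the existing one, so the result has to be reassigned or bound to a new variable, and it keeps all the original columns plus the new one.

val DAYS_30 = 1000L * 60 * 60 * 24 * 30
val withExtra = behavior_df.withColumn("daysLater30",
  behavior_df("server_timestamp") + DAYS_30)
withExtra.printSchema()   // shows all the original columns plus daysLater30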
copy/mv hdfs file to another directory by spark program
For some file on HDFS, it is necessary to copy/move it to another specific HDFS directory, with the file name kept unchanged. It needs to be done inside a Spark program, not with hdfs commands. Is there any code for this? It does not seem to be covered by searching the Spark docs ...

Thanks in advance!
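One way that seems to work is calling the Hadoop FileSystem API from the Spark driver, so no hdfs shell command is needed. A hedged sketch (the paths are illustrative; sc is an existing SparkContext):

import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

val conf = sc.hadoopConfiguration
val fs = FileSystem.get(conf)
val src = new Path("/data/in/part-00000")
val dstDir = new Path("/data/archive")

// move (rename) keeps the file name and only changes the directory
fs.rename(src, new Path(dstDir, src.getName))

// or copy instead of move:
// FileUtil.copy(fs, src, fs, new Path(dstDir, src.getName), false, conf)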
what are the proper numbers to set for --num-executors etc
In order to make a job run faster, some parameters can be specified on the command line, such as --executor-cores, --executor-memory and --num-executors ... However, as tested, it seems that those numbers cannot be set arbitrarily, or trouble is caused for the cluster. What is more, the run speed does not seem to get faster when those numbers are simply set bigger.

Would someone provide help? Thanks in advance!
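A hedged rule of thumb rather than an official recipe: the totals have to fit the cluster. Executors-per-node times --executor-cores should leave a core or so per node for the OS and the NodeManager, and --executor-memory plus the YARN memory overhead, times executors-per-node, should stay under each node's memory; asking for more than the cluster can host just delays scheduling or gets containers killed, which is why bigger numbers do not automatically mean faster runs. All the numbers below are illustrative, for a 5-node cluster:

# illustrative only; adjust to the actual node sizes
spark-submit --master yarn-client \
  --num-executors 15 \
  --executor-cores 5 \
  --executor-memory 18g \
  --conf spark.yarn.executor.memoryOverhead=2048 \
  --class com.example.Main app.jar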
Re: [Beg for help] spark job with very low efficiency
Dear Sab,

I appreciate your kind reply very much; it is very helpful.

On Monday, December 21, 2015 8:49 PM, Sabarish Sasidharan wrote:

collect() will bring everything to the driver and is costly. Instead of using collect() + parallelize, you could use rdd1.checkpoint() along with a more efficient action like rdd1.count(). This you can do within the for loop.

- Do you mean to apply checkpoint to cut out the lineage of the DAG? However, as tested, it seemed that checkpoint is more costly than collect ...

Hopefully you are using the Kryo serializer already.

- That would be all right. From your experience, does Kryo improve efficiency noticeably ...

Regards
Sab

On Mon, Dec 21, 2015 at 5:51 PM, Zhiliang Zhu wrote:

Dear All,

I have some kind of iterative job, that is, the next stage's input is the previous stage's output, and it must iterate quite a lot of times.

JavaRDD<Vector> rdd1 = ...; // rdd1 may have one or more partitions
JavaRDD<Vector> rdd2 = rdd1;
for (int i = 0; i < N; ++i) {
  JavaRDD<Vector> rdd3 = rdd2.map(new MapName1(...)); // 1
  JavaRDD<Vector> rdd4 = rdd3.map(new MapName2());    // 2
  // however, N is very big, so this collect line is VERY costly
  List<Vector> list = rdd4.collect();
  // Could checkpoint be used on the rdd generated after lots of steps?
  // Here rdd2 or rdd1 does not seem proper to checkpoint.
  rdd2 = jsc.parallelize(list, M).cache();
}

Is there a way to properly improve the run speed?

In fact, I would like to apply Spark to mathematical optimization by a genetic algorithm. In the code above, the RDD stores the Vector lines; step 1 computes the fitness, and step 2 breeds and mutates. To get a good solution, the iteration count will be big (for example more than 1000) ...

Thanks in advance!
Zhiliang

On Monday, December 21, 2015 7:44 PM, Zhiliang Zhu wrote:

Dear All,

I need to iterate some job / RDD quite a lot of times, but I am stuck on the problem that Spark only accepts around 350 calls to map before it meets one action Function; besides, dozens of actions will obviously increase the run time. Is there any proper way ...

As tested, here is a piece of the code:

int count = 0;
JavaRDD<Integer> dataSet = jsc.parallelize(list, 1).cache(); // with only 1 partition
int m = 350;
JavaRDD<Integer> r = dataSet.cache();
JavaRDD<Integer> t = null;

for (int j = 0; j < m; ++j) { // outer loop to temporarily convert the rdd r to t
  if (null != t) {
    r = t;
  }
  // inner loop to call map 350 times; if m is much more than 350 (for instance around 400),
  // the job throws "15/12/21 19:36:17 ERROR yarn.ApplicationMaster: User class threw exception:
  // java.lang.StackOverflowError"
  for (int i = 0; i < m; ++i) {
    r = r.map(new Function<Integer, Integer>() {
      @Override
      public Integer call(Integer integer) {
        double x = Math.random() * 2 - 1;
        double y = Math.random() * 2 - 1;
        return (x * x + y * y < 1) ? 1 : 0;
      }
    });
  }

  // then collect this rdd to get another rdd; however, dozens of actions such as collect are VERY costly
  List<Integer> lt = r.collect();
  t = jsc.parallelize(lt, 1).cache();
}

Thanks very much in advance!
Zhiliang

Thanks in advance!

--
Architect - Big Data
Ph: +91 99805 99458
Manthan Systems | Company of the year - Analytics (2014 Frost and Sullivan India ICT)
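A hedged Scala sketch of the checkpoint-based variant Sab describes (the names initialList, numPartitions, iterations and step are all illustrative, not from the thread): truncate the lineage every so many iterations instead of doing collect() + parallelize(); checkpoint() has to be called before an action, and caching first avoids recomputing the RDD when it is written out.

sc.setCheckpointDir("/tmp/spark-checkpoints")

var rdd = sc.parallelize(initialList, numPartitions).cache()
for (i <- 1 to iterations) {
  rdd = rdd.map(step)            // one iteration of the genetic-algorithm update
  if (i % 50 == 0) {             // every 50 iterations, cut the DAG to avoid StackOverflowError
    rdd.cache()
    rdd.checkpoint()
    rdd.count()                  // an action so the checkpoint is actually materialized
  }
}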
Re: number limit of map for spark
Thanks a lot for Zhan's comment, it really offered much help. On Tuesday, December 22, 2015 5:11 AM, Zhan Zhang wrote: What I mean is to combine multiple map functions into one. Don’t know how exactly your algorithms works. Did your one iteration result depend on last iteration? If so, how do they depend on?I think either you can optimize your implementation, or Spark is not the right one for your specific application. Thanks. Zhan Zhang On Dec 21, 2015, at 10:43 AM, Zhiliang Zhu wrote: What is difference between repartition / collect and collapse ...Is collapse the same costly as collect or repartition ? Thanks in advance ~ On Tuesday, December 22, 2015 2:24 AM, Zhan Zhang wrote: In what situation, you have such cases? If there is no shuffle, you can collapse all these functions into one, right? In the meantime, it is not recommended to collectall data to driver. Thanks. Zhan Zhang On Dec 21, 2015, at 3:44 AM, Zhiliang Zhu wrote: Dear All, I need to iterator some job / rdd quite a lot of times, but just lost in the problem of spark only accept to call around 350 number of map before it meets one action Function , besides, dozens of action will obviously increase the run time.Is there any proper way ... As tested, there is piece of codes as follows: .. 83 int count = 0; 84 JavaRDD dataSet = jsc.parallelize(list, 1).cache(); //with only 1 partition 85 int m = 350; 86 JavaRDD r = dataSet.cache(); 87 JavaRDD t = null; 88 89 for(int j=0; j < m; ++j) { //outer loop to temporarily convert the rdd r to t 90 if(null != t) { 91 r = t; 92 } //inner loop to call map 350 times , if m is much more than 350 (for instance, around 400), then the job will throw exception message "15/12/21 19:36:17 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.StackOverflowError java.lang.StackOverflowError") 93 for(int i=0; i < m; ++i) { 94 r = r.map(new Function() { 95 @Override 96 public Integer call(Integer integer) { 97 double x = Math.random() * 2 - 1; 98 double y = Math.random() * 2 - 1; 99 return (x * x + y * y < 1) ? 1 : 0;100 }101 }); 104 }105106 List lt = r.collect(); //then collect this rdd to get another rdd, however, dozens of action Function as collect is VERY MUCH COST107 t = jsc.parallelize(lt, 1).cache();108109 }110.. Thanks very much in advance!Zhiliang
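A small Java sketch of what "combine multiple map functions into one" can look like for the test code quoted above (it reuses the dataSet variable and the Monte-Carlo body from that code, and m stays 350): doing the repeated work inside a single map keeps the lineage short, so the StackOverflowError from chaining hundreds of map calls does not arise.

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;

    final int m = 350;                          // must be final to be captured by the anonymous class
    JavaRDD<Integer> r = dataSet.map(new Function<Integer, Integer>() {
      @Override
      public Integer call(Integer value) {
        Integer v = value;
        for (int i = 0; i < m; ++i) {           // the same per-element work as m chained map() calls
          double x = Math.random() * 2 - 1;
          double y = Math.random() * 2 - 1;
          v = (x * x + y * y < 1) ? 1 : 0;
        }
        return v;
      }
    });

This only works when one iteration's result does not have to be exchanged between elements; if each iteration needs the whole previous generation (as in a genetic algorithm), some action or shuffle per generation is unavoidable.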
Re: number limit of map for spark
What is difference between repartition / collect and collapse ...Is collapse the same costly as collect or repartition ? Thanks in advance ~ On Tuesday, December 22, 2015 2:24 AM, Zhan Zhang wrote: In what situation, you have such cases? If there is no shuffle, you can collapse all these functions into one, right? In the meantime, it is not recommended to collectall data to driver. Thanks. Zhan Zhang On Dec 21, 2015, at 3:44 AM, Zhiliang Zhu wrote: Dear All, I need to iterator some job / rdd quite a lot of times, but just lost in the problem of spark only accept to call around 350 number of map before it meets one action Function , besides, dozens of action will obviously increase the run time.Is there any proper way ... As tested, there is piece of codes as follows: .. 83 int count = 0; 84 JavaRDD dataSet = jsc.parallelize(list, 1).cache(); //with only 1 partition 85 int m = 350; 86 JavaRDD r = dataSet.cache(); 87 JavaRDD t = null; 88 89 for(int j=0; j < m; ++j) { //outer loop to temporarily convert the rdd r to t 90 if(null != t) { 91 r = t; 92 } //inner loop to call map 350 times , if m is much more than 350 (for instance, around 400), then the job will throw exception message "15/12/21 19:36:17 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.StackOverflowError java.lang.StackOverflowError") 93 for(int i=0; i < m; ++i) { 94 r = r.map(new Function() { 95 @Override 96 public Integer call(Integer integer) { 97 double x = Math.random() * 2 - 1; 98 double y = Math.random() * 2 - 1; 99 return (x * x + y * y < 1) ? 1 : 0;100 }101 }); 104 }105106 List lt = r.collect(); //then collect this rdd to get another rdd, however, dozens of action Function as collect is VERY MUCH COST107 t = jsc.parallelize(lt, 1).cache();108109 }110.. Thanks very much in advance!Zhiliang
Re: rdd only with one partition
You may just refer to my other letter with the title: [Beg for help] spark job with very low efficiency

On Tuesday, December 22, 2015 1:49 AM, Ted Yu wrote:

I am not familiar with your use case; is it possible to perform the randomized combination operation based on a subset of the rows in rdd0? That way you can increase the parallelism. Cheers

On Mon, Dec 21, 2015 at 9:40 AM, Zhiliang Zhu wrote:

Hi Ted, Thanks a lot for your kind reply. I need to convert this rdd0 into another rdd1, whose rows are generated by randomly combining rdd0's rows. From that perspective, rdd0 would need to have one partition in order to operate randomly on all of its rows; however, that would also lose Spark's parallelism benefit. Best Wishes! Zhiliang

On Monday, December 21, 2015 11:17 PM, Ted Yu wrote:

Have you tried the following method?

 * Note: With shuffle = true, you can actually coalesce to a larger number
 * of partitions. This is useful if you have a small number of partitions,
 * say 100, potentially with a few partitions being abnormally large. Calling
 * coalesce(1000, shuffle = true) will result in 1000 partitions with the
 * data distributed using a hash partitioner.
 */
def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)

Cheers

On Mon, Dec 21, 2015 at 2:47 AM, Zhiliang Zhu wrote:

Dear All, For an RDD with just one partition, the operations and arithmetic run as a single task, so the RDD has lost all the parallelism benefit of the Spark system ... Is it exactly like that? Thanks very much in advance! Zhiliang
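A two-line Java illustration of the coalesce variant Ted quotes above (rdd0 is the single-partition RDD from this thread, its element type is written as Vector only for concreteness, and 100 is an arbitrary target partition count): with shuffle = true, coalesce can increase the number of partitions, and repartition(n) is a convenience method for the same thing.

    // spread the single-partition RDD back out across the cluster
    JavaRDD<Vector> spread = rdd0.coalesce(100, true);   // shuffle = true allows growing the partition count
    JavaRDD<Vector> same   = rdd0.repartition(100);      // equivalent convenience method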
Re: number limit of map for spark
Dear Zhan, Thanks very much for your kind reply! You may just refer to my another letter with title : [Beg for help] spark job with very low efficiency I just need to apply spark to mathematica optimization by genetic algorithm , and theoretically the algorithm would iterate for lots of times.Then I lost into the problem:1) spark job will only have limited number of map successive calling before it meets one action ;2) action Function as collect / reduce will increase run time VERY MUCH ;3) if for parallelism, I understand rdd only with one partition will lose all the parallelism provided by spark , is it ... if it is with many partitions then it is difficult to randomly combine all its rows to generate another rdd. Thank you,Zhiliang On Tuesday, December 22, 2015 2:24 AM, Zhan Zhang wrote: In what situation, you have such cases? If there is no shuffle, you can collapse all these functions into one, right? In the meantime, it is not recommended to collectall data to driver. Thanks. Zhan Zhang On Dec 21, 2015, at 3:44 AM, Zhiliang Zhu wrote: Dear All, I need to iterator some job / rdd quite a lot of times, but just lost in the problem of spark only accept to call around 350 number of map before it meets one action Function , besides, dozens of action will obviously increase the run time.Is there any proper way ... As tested, there is piece of codes as follows: .. 83 int count = 0; 84 JavaRDD dataSet = jsc.parallelize(list, 1).cache(); //with only 1 partition 85 int m = 350; 86 JavaRDD r = dataSet.cache(); 87 JavaRDD t = null; 88 89 for(int j=0; j < m; ++j) { //outer loop to temporarily convert the rdd r to t 90 if(null != t) { 91 r = t; 92 } //inner loop to call map 350 times , if m is much more than 350 (for instance, around 400), then the job will throw exception message "15/12/21 19:36:17 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.StackOverflowError java.lang.StackOverflowError") 93 for(int i=0; i < m; ++i) { 94 r = r.map(new Function() { 95 @Override 96 public Integer call(Integer integer) { 97 double x = Math.random() * 2 - 1; 98 double y = Math.random() * 2 - 1; 99 return (x * x + y * y < 1) ? 1 : 0;100 }101 }); 104 }105106 List lt = r.collect(); //then collect this rdd to get another rdd, however, dozens of action Function as collect is VERY MUCH COST107 t = jsc.parallelize(lt, 1).cache();108109 }110.. Thanks very much in advance!Zhiliang
Re: rdd only with one partition
Hi Ted, Thanks a lot for your kind reply. I needs to convert this rdd0 into another rdd1, rows of rdd1 are generated from rdd0's row randomly combination operation.From that perspective, rdd0 would be with one partition in order to randomly operate on its all rows, however, it would also lose spark parallelism benefit . Best Wishes!Zhiliang On Monday, December 21, 2015 11:17 PM, Ted Yu wrote: Have you tried the following method ? * Note: With shuffle = true, you can actually coalesce to a larger number * of partitions. This is useful if you have a small number of partitions, * say 100, potentially with a few partitions being abnormally large. Calling * coalesce(1000, shuffle = true) will result in 1000 partitions with the * data distributed using a hash partitioner. */ def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null) Cheers On Mon, Dec 21, 2015 at 2:47 AM, Zhiliang Zhu wrote: Dear All, For some rdd, while there is just one partition, then the operation & arithmetic would only be single, the rdd has lose all the parallelism benefit from spark system ... Is it exactly like that? Thanks very much in advance!Zhiliang
[Beg for help] spark job with very low efficiency
Dear All,

I have some kind of iteration job, that is, the next stage's input is the previous stage's output, and it must iterate quite a lot of times.

    JavaRDD<Vector> rdd1 = ... ;                  // rdd1 may have one or more partitions
    JavaRDD<Vector> rdd2 = rdd1;
    for (int i = 0; i < N; ++i) {
      JavaRDD<Vector> rdd3 = rdd2.map(new MapName1(...));   // 1
      JavaRDD<Vector> rdd4 = rdd3.map(new MapName2());      // 2
      // however, N is very big, so this line is VERY costly
      List<Vector> list = rdd4.collect();
      rdd2 = jsc.parallelize(list, M).cache();
    }

Is there a way to properly improve the run speed?

In fact, I would like to apply Spark to mathematical optimization by a genetic algorithm; in the above code the RDD stores the Vector lines, step 1 counts the fitness number, and step 2 breeds and mutates. To get a good solution the iteration number will be big (for example more than 1000) ...

Thanks in advance!
Zhiliang

On Monday, December 21, 2015 7:44 PM, Zhiliang Zhu wrote:

Dear All,

I need to iterate some job / RDD quite a lot of times, but I am stuck on the problem that Spark only accepts around 350 successive map calls before it meets one action Function; besides, dozens of actions will obviously increase the run time. Is there any proper way ...

As tested, there is a piece of code as follows:

    int count = 0;
    JavaRDD<Integer> dataSet = jsc.parallelize(list, 1).cache();   // with only 1 partition
    int m = 350;
    JavaRDD<Integer> r = dataSet.cache();
    JavaRDD<Integer> t = null;

    // outer loop: temporarily convert the rdd r to t
    for (int j = 0; j < m; ++j) {
      if (null != t) {
        r = t;
      }
      // inner loop: call map 350 times; if m is much more than 350 (for instance around 400),
      // the job throws "15/12/21 19:36:17 ERROR yarn.ApplicationMaster: User class threw
      // exception: java.lang.StackOverflowError"
      for (int i = 0; i < m; ++i) {
        r = r.map(new Function<Integer, Integer>() {
          @Override
          public Integer call(Integer integer) {
            double x = Math.random() * 2 - 1;
            double y = Math.random() * 2 - 1;
            return (x * x + y * y < 1) ? 1 : 0;
          }
        });
      }

      // then collect this rdd to rebuild another rdd; however, dozens of actions like
      // collect are VERY costly
      List<Integer> lt = r.collect();
      t = jsc.parallelize(lt, 1).cache();
    }

Thanks very much in advance!
Zhiliang
number limit of map for spark
Dear All,

I need to iterate some job / RDD quite a lot of times, but I am stuck on the problem that Spark only accepts around 350 successive map calls before it meets one action Function; besides, dozens of actions will obviously increase the run time. Is there any proper way ...

As tested, there is a piece of code as follows:

    int count = 0;
    JavaRDD<Integer> dataSet = jsc.parallelize(list, 1).cache();   // with only 1 partition
    int m = 350;
    JavaRDD<Integer> r = dataSet.cache();
    JavaRDD<Integer> t = null;

    // outer loop: temporarily convert the rdd r to t
    for (int j = 0; j < m; ++j) {
      if (null != t) {
        r = t;
      }
      // inner loop: call map 350 times; if m is much more than 350 (for instance around 400),
      // the job throws "15/12/21 19:36:17 ERROR yarn.ApplicationMaster: User class threw
      // exception: java.lang.StackOverflowError"
      for (int i = 0; i < m; ++i) {
        r = r.map(new Function<Integer, Integer>() {
          @Override
          public Integer call(Integer integer) {
            double x = Math.random() * 2 - 1;
            double y = Math.random() * 2 - 1;
            return (x * x + y * y < 1) ? 1 : 0;
          }
        });
      }

      // then collect this rdd to rebuild another rdd; however, dozens of actions like
      // collect are VERY costly
      List<Integer> lt = r.collect();
      t = jsc.parallelize(lt, 1).cache();
    }

Thanks very much in advance!
Zhiliang
rdd only with one partition
Dear All, For an RDD with just one partition, the operations and arithmetic run as a single task, so the RDD has lost all the parallelism benefit of the Spark system ... Is it exactly like that? Thanks very much in advance! Zhiliang
Re: Inverse of the matrix
Use matrix SVD decomposition; Spark has the library for it: http://spark.apache.org/docs/latest/mllib-dimensionality-reduction.html#singular-value-decomposition-svd

On Thursday, December 10, 2015 7:33 PM, Arunkumar Pillai wrote:

Hi, I need to find the inverse of the (X(Transpose) * X) matrix. I have worked out X transpose and the matrix multiplication; is there any way to find the inverse of the matrix? -- Thanks and Regards, Arun
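A minimal Java sketch of that suggestion, assuming rows is an existing JavaRDD<Vector> holding the rows of X: compute the SVD X = U S V^T with MLlib's RowMatrix, then (X^T X)^-1 = V S^-2 V^T. If X^T X is rank-deficient, the truncated sum below gives the pseudo-inverse instead.

    import org.apache.spark.mllib.linalg.Matrix;
    import org.apache.spark.mllib.linalg.SingularValueDecomposition;
    import org.apache.spark.mllib.linalg.distributed.RowMatrix;

    RowMatrix X = new RowMatrix(rows.rdd());
    int n = (int) X.numCols();

    // X = U * S * V^T  =>  X^T X = V * S^2 * V^T  =>  (X^T X)^-1 = V * S^-2 * V^T
    SingularValueDecomposition<RowMatrix, Matrix> svd = X.computeSVD(n, false, 1e-9);
    double[] s = svd.s().toArray();   // singular values, length r <= n
    double[] v = svd.V().toArray();   // V stored column-major, n rows x r columns
    int r = s.length;

    double[][] inv = new double[n][n];
    for (int i = 0; i < n; i++) {
      for (int j = 0; j < n; j++) {
        double sum = 0.0;
        for (int k = 0; k < r; k++) {
          // V(i,k) * V(j,k) / s_k^2 ; column-major layout: V(i,k) = v[k * n + i]
          sum += v[k * n + i] * v[k * n + j] / (s[k] * s[k]);
        }
        inv[i][j] = sum;
      }
    }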
Re: is repartition very cost
Thanks very much for Young's help. Sorry, one more issue: is it the case that different partitions must be on different nodes? That is, would each node only have one partition, in cluster mode ...

On Wednesday, December 9, 2015 6:41 AM, "Young, Matthew T" wrote:

Shuffling large amounts of data over the network is expensive, yes. The cost is lower if you are just using a single node where no networking needs to be involved to do the repartition (using Spark as a multithreading engine).

In general you need to do performance testing to see if a repartition is worth the shuffle time. A common model is to repartition the data once after ingest to achieve parallelism and avoid shuffles whenever possible later.

From: Zhiliang Zhu [mailto:zchl.j...@yahoo.com.INVALID] Sent: Tuesday, December 08, 2015 5:05 AM To: User Subject: is repartition very cost

Hi All, I need to optimize an objective function with some linear constraints by a genetic algorithm. I would like to get as much parallelism for it as possible with Spark. repartition / shuffle may be needed sometimes; however, is the repartition API very costly? Thanks in advance! Zhiliang
is repartition very cost
Hi All, I need to optimize an objective function with some linear constraints by a genetic algorithm. I would like to get as much parallelism for it as possible with Spark. repartition / shuffle may be needed sometimes; however, is the repartition API very costly? Thanks in advance! Zhiliang
what's the way to access the last element from another partition
In some given partition, it seems difficult to access the last element in another partition, but in my application I need do as that.Exactly how to do it ? Just by repartition /shuffle the rdd into one partition and get the specific "last" element ? Will this will change the previous order among the elements, and will it also not work ? Thanks very much in advance! On Monday, December 7, 2015 11:32 AM, Zhiliang Zhu wrote: On Monday, December 7, 2015 10:37 AM, DB Tsai wrote: Only beginning and ending part of data. The rest in the partition can be compared without shuffle. Would you help write a few pseudo-code about it...It seems that there is not shuffle related API , or repartition ? Thanks a lot in advance! Sincerely, DB Tsai -- Web: https://www.dbtsai.com PGP Key ID: 0xAF08DF8D On Sun, Dec 6, 2015 at 6:27 PM, Zhiliang Zhu wrote: > > > > > On Saturday, December 5, 2015 3:00 PM, DB Tsai wrote: > > > This is tricky. You need to shuffle the ending and beginning elements > using mapPartitionWithIndex. > > > Does this mean that I need to shuffle the all elements in different > partitions into one partition, then compare them by way of any two adjacent > elements? > It seems good, if it is like that. > > One more issue, will it loss parallelism since there become only one > partition ... > > Thanks very much in advance! > > > > > > > Sincerely, > > DB Tsai > -- > Web: https://www.dbtsai.com > PGP Key ID: 0xAF08DF8D > > > On Fri, Dec 4, 2015 at 10:30 PM, Zhiliang Zhu wrote: >> Hi All, >> >> I would like to compare any two adjacent elements in one given rdd, just >> as >> the single machine code part: >> >> int a[N] = {...}; >> for (int i=0; i < N - 1; ++i) { >> compareFun(a[i], a[i+1]); >> } >> ... >> >> mapPartitions may work for some situations, however, it could not compare >> elements in different partitions. >> foreach also seems not work. >> >> Thanks, >> Zhiliang > >> >> > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > > > > - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: the way to compare any two adjacent elements in one rdd
On Monday, December 7, 2015 10:37 AM, DB Tsai wrote: Only beginning and ending part of data. The rest in the partition can be compared without shuffle. Would you help write a few pseudo-code about it...It seems that there is not shuffle related API , or repartition ? Thanks a lot in advance! Sincerely, DB Tsai -- Web: https://www.dbtsai.com PGP Key ID: 0xAF08DF8D On Sun, Dec 6, 2015 at 6:27 PM, Zhiliang Zhu wrote: > > > > > On Saturday, December 5, 2015 3:00 PM, DB Tsai wrote: > > > This is tricky. You need to shuffle the ending and beginning elements > using mapPartitionWithIndex. > > > Does this mean that I need to shuffle the all elements in different > partitions into one partition, then compare them by way of any two adjacent > elements? > It seems good, if it is like that. > > One more issue, will it loss parallelism since there become only one > partition ... > > Thanks very much in advance! > > > > > > > Sincerely, > > DB Tsai > -- > Web: https://www.dbtsai.com > PGP Key ID: 0xAF08DF8D > > > On Fri, Dec 4, 2015 at 10:30 PM, Zhiliang Zhu wrote: >> Hi All, >> >> I would like to compare any two adjacent elements in one given rdd, just >> as >> the single machine code part: >> >> int a[N] = {...}; >> for (int i=0; i < N - 1; ++i) { >> compareFun(a[i], a[i+1]); >> } >> ... >> >> mapPartitions may work for some situations, however, it could not compare >> elements in different partitions. >> foreach also seems not work. >> >> Thanks, >> Zhiliang > >> >> > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > > > > - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: the way to compare any two adjacent elements in one rdd
On Saturday, December 5, 2015 3:00 PM, DB Tsai wrote: This is tricky. You need to shuffle the ending and beginning elements using mapPartitionWithIndex. Does this mean that I need to shuffle the all elements in different partitions into one partition, then compare them by way of any two adjacent elements?It seems good, if it is like that. One more issue, will it loss parallelism since there become only one partition ... Thanks very much in advance! Sincerely, DB Tsai -- Web: https://www.dbtsai.com PGP Key ID: 0xAF08DF8D On Fri, Dec 4, 2015 at 10:30 PM, Zhiliang Zhu wrote: > Hi All, > > I would like to compare any two adjacent elements in one given rdd, just as > the single machine code part: > > int a[N] = {...}; > for (int i=0; i < N - 1; ++i) { > compareFun(a[i], a[i+1]); > } > ... > > mapPartitions may work for some situations, however, it could not compare > elements in different partitions. > foreach also seems not work. > > Thanks, > Zhiliang > > - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: the way to compare any two adjacent elements in one rdd
For this, mapPartitionsWithIndex would also work properly for filtering. Here is the code copied from Stack Overflow, which is used to remove the first line of a CSV file:

    JavaRDD<String> rawInputRdd = sparkContext.textFile(dataFile);

    Function2<Integer, Iterator<String>, Iterator<String>> removeHeader =
        new Function2<Integer, Iterator<String>, Iterator<String>>() {
          @Override
          public Iterator<String> call(Integer index, Iterator<String> iterator) throws Exception {
            if (index == 0 && iterator.hasNext()) {
              // for my usage: skip the first element; one could also compare any two adjacent
              // elements here, or do a filter, so the index parameter is only needed to pick the
              // partition and each iterator can be viewed as one logical partition -- is that right?
              iterator.next();
              return iterator;
            } else {
              return iterator;
            }
          }
        };

    JavaRDD<String> inputRdd = rawInputRdd.mapPartitionsWithIndex(removeHeader, false);

On Saturday, December 5, 2015 3:52 PM, Zhiliang Zhu wrote:

Hi DB Tsai, Thanks very much for your kind reply! Sorry, one more issue: as tested, it seems that filter can only return a JavaRDD of the same element type, not a JavaRDD of some other type, is that right? Then it is not very convenient to do a general filter for an RDD; mapPartitions could work to some extent, but if some partition is left with no elements after filtering by mapPartitions, there will be some problem. Best Wishes! Zhiliang

On Saturday, December 5, 2015 3:00 PM, DB Tsai wrote:

This is tricky. You need to shuffle the ending and beginning elements using mapPartitionWithIndex. Sincerely, DB Tsai -- Web: https://www.dbtsai.com PGP Key ID: 0xAF08DF8D

On Fri, Dec 4, 2015 at 10:30 PM, Zhiliang Zhu wrote:
> Hi All,
>
> I would like to compare any two adjacent elements in one given rdd, just as
> the single machine code part:
>
> int a[N] = {...};
> for (int i = 0; i < N - 1; ++i) {
>   compareFun(a[i], a[i+1]);
> }
> ...
>
> mapPartitions may work for some situations, however, it could not compare
> elements in different partitions.
> foreach also seems not to work.
>
> Thanks,
> Zhiliang
Re: the way to compare any two adjacent elements in one rdd
Hi DB Tsai, Thanks very much for your kind reply! Sorry that for one more issue, as tested it seems that filter could only return JavaRDD but not any JavaRDD , is it ?Then it is not much convenient to do general filter for RDD, mapPartitions could work some, but if some partition will left and return none element after filter by mapPartitions, some problemwill be there. Best Wishes!Zhiliang On Saturday, December 5, 2015 3:00 PM, DB Tsai wrote: This is tricky. You need to shuffle the ending and beginning elements using mapPartitionWithIndex. Sincerely, DB Tsai -- Web: https://www.dbtsai.com PGP Key ID: 0xAF08DF8D On Fri, Dec 4, 2015 at 10:30 PM, Zhiliang Zhu wrote: > Hi All, > > I would like to compare any two adjacent elements in one given rdd, just as > the single machine code part: > > int a[N] = {...}; > for (int i=0; i < N - 1; ++i) { > compareFun(a[i], a[i+1]); > } > ... > > mapPartitions may work for some situations, however, it could not compare > elements in different partitions. > foreach also seems not work. > > Thanks, > Zhiliang > > - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
the way to compare any two adjacent elements in one rdd
Hi All,

I would like to compare any two adjacent elements in one given RDD, just as in the single-machine code:

    int a[N] = {...};
    for (int i = 0; i < N - 1; ++i) {
      compareFun(a[i], a[i+1]);
    }
    ...

mapPartitions may work for some situations; however, it cannot compare elements that sit in different partitions. foreach also does not seem to work.

Thanks,
Zhiliang
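One possible Java sketch (element type Integer is my own assumption, and this is not necessarily the mapPartitionsWithIndex approach suggested in the replies above): index the elements with zipWithIndex, re-key a shifted copy, and join, which produces every pair (a[i], a[i+1]) at the cost of one shuffle.

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;

    // rdd: JavaRDD<Integer> in the order whose adjacent elements should be compared
    JavaPairRDD<Long, Integer> byIndex = rdd.zipWithIndex().mapToPair(
        new PairFunction<Tuple2<Integer, Long>, Long, Integer>() {
          @Override
          public Tuple2<Long, Integer> call(Tuple2<Integer, Long> t) {
            return new Tuple2<Long, Integer>(t._2(), t._1());       // key = position i
          }
        });

    JavaPairRDD<Long, Integer> shifted = byIndex.mapToPair(
        new PairFunction<Tuple2<Long, Integer>, Long, Integer>() {
          @Override
          public Tuple2<Long, Integer> call(Tuple2<Long, Integer> t) {
            return new Tuple2<Long, Integer>(t._1() - 1, t._2());   // a[i+1] re-keyed under i
          }
        });

    // each value is the pair (a[i], a[i+1]) for i = 0 .. N-2, across partition boundaries too
    JavaPairRDD<Long, Tuple2<Integer, Integer>> adjacent = byIndex.join(shifted);

The mapPartitionsWithIndex route avoids the shuffle for the interior of each partition and only exchanges the boundary elements, so it is cheaper when the RDD is large.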
Re: what is algorithm to optimize function with nonlinear constraints
Thanks a lot for Ushnish's kind reply.

I am considering applying the simulated annealing algorithm to this question. However, there may be one issue - sensitivity; that is to say, for exactly the same system of function / constraints, the solution may differ between two runs of the program. As is known, simulated annealing is based on random search. Theoretically it will reach the globally optimized solution, but in practice it is difficult to satisfy all the theoretical conditions, so only a locally optimized solution is obtained. Then I think a different locally optimized solution may be obtained on each run. Is there any comment?

Best Wishes!

On Friday, November 20, 2015 2:02 AM, Ushnish De wrote:

Have you looked into Gurobi? Supposedly it can solve QCP (quadratically constrained programming) problems, which is what your example problem looks like based on your constraint 3). However, if the problem is non-convex then it'll be hard to solve in most cases.

On Thu, Nov 19, 2015, 9:42 AM 'Zhiliang Zhu' via All ADATAO Team Members wrote:

Hi all,

I have some optimization problem; I have googled a lot but still did not find the exact algorithm or third-party open package to apply to it. Its type is like this:

Objective function: f(x1, x2, ..., xn)   (n >= 100, and f may be linear or non-linear)

Constraint functions:
1) x1 + x2 + ... + xn = 1,
2) b1 * x1 + b2 * x2 + ... + bn * xn = b,
3) c1 * x1 * x1 + c2 * x2 * x2 + ... + cn * xn * xn = c,   <- nonlinear constraint
with x1, x2, ..., xn >= 0.

To find the solution x which makes the objective function the biggest, globally or locally.

I was thinking about applying gradient descent or the Levenberg-Marquardt algorithm to solve it; however, those two are for unconstrained problems. I also considered the Lagrange multiplier method, but the system of gradient equations is nonlinear, which seems difficult to solve.

Which algorithm would be proper to apply here, and is there any open package like Breeze for it? Any comment or link will be helpful.

Thanks a lot in advance!

Zhiliang
what is algorithm to optimize function with nonlinear constraints
Hi all,

I have some optimization problem; I have googled a lot but still did not find the exact algorithm or third-party open package to apply to it. Its type is like this:

Objective function: f(x1, x2, ..., xn)   (n >= 100, and f may be linear or non-linear)

Constraint functions:
1) x1 + x2 + ... + xn = 1,
2) b1 * x1 + b2 * x2 + ... + bn * xn = b,
3) c1 * x1 * x1 + c2 * x2 * x2 + ... + cn * xn * xn = c,   <- nonlinear constraint
with x1, x2, ..., xn >= 0.

To find the solution x which makes the objective function the biggest, globally or locally.

I was thinking about applying gradient descent or the Levenberg-Marquardt algorithm to solve it; however, those two are for unconstrained problems. I also considered the Lagrange multiplier method, but the system of gradient equations is nonlinear, which seems difficult to solve.

Which algorithm would be proper to apply here, and is there any open package like Breeze for it? Any comment or link will be helpful.

Thanks a lot in advance!

Zhiliang
Re: spark with breeze error of NoClassDefFoundError
Dear Ted, I just looked at the link you provided, it is great! To confirm my understanding: I could also directly use the other parts of Breeze (besides the Spark mllib linalg package) in a Spark (Scala or Java) program after importing the Breeze package, is that right? Thanks a lot in advance again! Zhiliang

On Thursday, November 19, 2015 1:46 PM, Ted Yu wrote:

Have you looked at https://github.com/scalanlp/breeze/wiki Cheers

On Nov 18, 2015, at 9:34 PM, Zhiliang Zhu wrote:

Dear Jack, As is known, Breeze is a numerical calculation package written in Scala, and Spark mllib also uses it as the underlying package for algebra. Here I am also preparing to use Breeze for nonlinear equation optimization; however, it seems I could not find the exact doc or API for Breeze apart from the Spark linalg package ... Could you help by providing the official doc or API website for Breeze? Thank you in advance! Zhiliang

On Thursday, November 19, 2015 7:32 AM, Jack Yang wrote:

If I tried to change "provided" to "compile"..
then the error changed to : Exception in thread "main" java.lang.IncompatibleClassChangeError: Implementing class at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:800) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at java.net.URLClassLoader.defineClass(URLClassLoader.java:449) at java.net.URLClassLoader.access$100(URLClassLoader.java:71) at java.net.URLClassLoader$1.run(URLClassLoader.java:361) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) atsmartapp.smart.sparkwithscala.textMingApp.main(textMingApp.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 15/11/19 10:28:29 INFO util.Utils: Shutdown hook called Meanwhile, I will prefer to use maven to compile the jar file rather than sbt, although it is indeed another option. Best regards, Jack From: Fengdong Yu [mailto:fengdo...@everstring.com] Sent: Wednesday, 18 November 2015 7:30 PM To: Jack Yang Cc: Ted Yu; user@spark.apache.org Subject: Re: spark with breeze error of NoClassDefFoundError The simplest way is remove all “provided” in your pom. then ‘sbt assembly” to build your final package. then get rid of ‘—jars’ because assembly already includes all dependencies. On Nov 18, 2015, at 2:15 PM, Jack Yang wrote: So weird. Is there anything wrong with the way I made the pom file (I labelled them as provided)? Is there missing jar I forget to add in “--jar”? See the trace below: Exception in thread "main" java.lang.NoClassD
Re: spark with breeze error of NoClassDefFoundError
Dear Jack, As is known, Breeze is numerical calculation package wrote by scala , spark mllib also use it as underlying package for algebra usage.Here I am also preparing to use Breeze for nonlinear equation optimization, however, it seemed that I could not find the exact doc or API for Breeze except spark linalg package... Could you help some to provide me the official doc or API website for Breeze ?Thank you in advance! Zhiliang On Thursday, November 19, 2015 7:32 AM, Jack Yang wrote: If I tried to change “provided” to “compile”.. then the error changed to : Exception in thread "main" java.lang.IncompatibleClassChangeError: Implementing class at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:800) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at java.net.URLClassLoader.defineClass(URLClassLoader.java:449) at java.net.URLClassLoader.access$100(URLClassLoader.java:71) at java.net.URLClassLoader$1.run(URLClassLoader.java:361) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) atsmartapp.smart.sparkwithscala.textMingApp.main(textMingApp.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 15/11/19 10:28:29 INFO util.Utils: Shutdown hook called Meanwhile, I will prefer to use maven to compile the jar file rather than sbt, although it is indeed another option. Best regards, Jack From: Fengdong Yu [mailto:fengdo...@everstring.com] Sent: Wednesday, 18 November 2015 7:30 PM To: Jack Yang Cc: Ted Yu; user@spark.apache.org Subject: Re: spark with breeze error of NoClassDefFoundError The simplest way is remove all “provided” in your pom. then ‘sbt assembly” to build your final package. then get rid of ‘—jars’ because assembly already includes all dependencies. On Nov 18, 2015, at 2:15 PM, Jack Yang wrote: So weird. Is there anything wrong with the way I made the pom file (I labelled them as provided)? Is there missing jar I forget to add in “--jar”? 
See the trace below: Exception in thread "main" java.lang.NoClassDefFoundError: breeze/storage/DefaultArrayValue at smartapp.smart.sparkwithscala.textMingApp.main(textMingApp.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.ClassNotFoundException: breeze.storage.DefaultArrayValue at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) ... 10 more 15/11/18 17:15:15 INFO util.Utils: Shutdown hook called From: Ted Yu [mailto:yuzhih...@gmail.com] Sent: Wednesday, 18 November 2015 4:01 PM To: Jack Yang Cc: user@spark.apache.org Subject: Re: spark with breeze error of NoClassDefFoundError Looking in local maven repo, breeze_2.10-0.7.jar contains DefaultArrayValue : jar tvf /Users/tyu/.m2/repository//org/scalanlp/breeze_2.10/0.7/breeze_2.10-0.7.jar | grep !$ jar tvf /User
Re: How to properly read the first number lines of file into a RDD
Thanks a lot for your reply. I have also worked it out in some other ways. In fact, at first I was thinking about using filter to do it, but failed.

On Monday, November 9, 2015 9:52 PM, Akhil Das wrote:

There are multiple ways to achieve this:

1. Read the N lines from the driver and then do a sc.parallelize(nlines) to create an RDD out of it.
2. Create an RDD with the N+M lines, do a take on N and then broadcast or parallelize the returned list.
3. Something like this if the file is in HDFS:

    val n_f = (5, file_name)
    val n_lines = sc.parallelize(Array(n_f))
    val n_linesRDD = n_lines.map(n => {
      // read and return 5 lines (n._1) from the file (n._2)
    })

Thanks, Best Regards

On Thu, Oct 29, 2015 at 9:51 PM, Zhiliang Zhu wrote:

Hi All, There is a file with N + M lines, and I need to read the first N lines into one RDD.

1. i) read all the N + M lines as one RDD, ii) select the RDD's top N rows - that may be one solution;
2. introduce a broadcast variable set to N, then use it to decide, while mapping the file RDD, to only map its first N rows - this may not work, however.

Is there a better solution? Thank you, Zhiliang
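Another possible way, sketched here in Java under the assumption that jsc is a JavaSparkContext, path points at the file, and n = 1000 is a made-up value of N: pair every line with its position via zipWithIndex (which follows the file's line order for a text file), keep the first n positions, and drop the index again.

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import scala.Tuple2;

    final long n = 1000L;                 // made-up value for N
    JavaRDD<String> firstN = jsc.textFile(path)
        .zipWithIndex()                   // pairs each line with its position in the file
        .filter(new Function<Tuple2<String, Long>, Boolean>() {
          @Override
          public Boolean call(Tuple2<String, Long> t) {
            return t._2() < n;            // keep only positions 0 .. n-1
          }
        })
        .map(new Function<Tuple2<String, Long>, String>() {
          @Override
          public String call(Tuple2<String, Long> t) {
            return t._1();                // drop the index again
          }
        });

Note that zipWithIndex itself triggers one small job to count the elements per partition, but nothing is collected to the driver.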
Re: could not understand issue about static spark Function (map / sortBy ...)
I have got the issues all, after quite a lot of test. Function would only be defined in static normal function body, or defined as static member variable.Function would also be defined as inner static class, some its own member variable or functions could be defined, the variable can be passed while new the Function obj, and in the Function inner class the inner normal function can be called. On Tuesday, November 10, 2015 5:12 PM, Zhiliang Zhu wrote: As more test, the Function call by map/sortBy etc must be defined as static, or it can be defined as non-static and must be called by other static normal function.I am really confused by it. On Tuesday, November 10, 2015 4:12 PM, Zhiliang Zhu wrote: Hi All, I have met some bug not understandable as follows: class A { private JavaRDD _com_rdd; ... ... //here it must be static, but not every Function as map etc would be static, as the code examples in spark self official doc static Function mapParseRow = new Function() { @Override public Vector call (Vector v) { System.out.println("mark. map log is here"); Vector rt; ... //if here needs to call some other non-static function, how can it be ? return rt; } }; public void run() { //it will be called outside some other public class by A object ... JavaRDD rdd = (this._com_rdd).map(mapParseRow); //it will cause failure while map is not static ... } } Would you help comment some for it? What would be done? Thank you in advance!Zhiliang On Tuesday, November 10, 2015 11:42 AM, Deng Ching-Mallete wrote: Hi Zhiliang, You should be able to see them in the executor logs, which you can view via the Spark UI, in the Executors page (stderr log). HTH,Deng On Tue, Nov 10, 2015 at 11:33 AM, Zhiliang Zhu wrote: Hi All, I need debug spark job, my general way is to print out the log, however, some bug is in spark functions as mapPartitions etc, and not any log printed from those functionscould be found...Would you help point what is way to the log in the spark own function as mapPartitions? Or, what is general way to debug spark job. Thank you in advance! Zhiliang
could not understand issue about static spark Function (map / sortBy ...)
As more test, the Function call by map/sortBy etc must be defined as static, or it can be defined as non-static and must be called by other static normal function.I am really confused by it. On Tuesday, November 10, 2015 4:12 PM, Zhiliang Zhu wrote: Hi All, I have met some bug not understandable as follows: class A { private JavaRDD _com_rdd; ... ... //here it must be static, but not every Function as map etc would be static, as the code examples in spark self official doc static Function mapParseRow = new Function() { @Override public Vector call (Vector v) { System.out.println("mark. map log is here"); Vector rt; ... //if here needs to call some other non-static function, how can it be ? return rt; } }; public void run() { //it will be called outside some other public class by A object ... JavaRDD rdd = (this._com_rdd).map(mapParseRow); //it will cause failure while map is not static ... } } Would you help comment some for it? What would be done? Thank you in advance!Zhiliang On Tuesday, November 10, 2015 11:42 AM, Deng Ching-Mallete wrote: Hi Zhiliang, You should be able to see them in the executor logs, which you can view via the Spark UI, in the Executors page (stderr log). HTH,Deng On Tue, Nov 10, 2015 at 11:33 AM, Zhiliang Zhu wrote: Hi All, I need debug spark job, my general way is to print out the log, however, some bug is in spark functions as mapPartitions etc, and not any log printed from those functionscould be found...Would you help point what is way to the log in the spark own function as mapPartitions? Or, what is general way to debug spark job. Thank you in advance! Zhiliang
static spark Function as map
Hi All,

I have met a bug I cannot understand, as follows:

    class A {
      private JavaRDD<Vector> _com_rdd;
      ...

      // here it must be static, but not every Function used with map etc. is static
      // in the code examples of Spark's own official docs
      static Function<Vector, Vector> mapParseRow = new Function<Vector, Vector>() {
        @Override
        public Vector call(Vector v) {
          System.out.println("mark. map log is here");
          Vector rt;
          ... // if this needs to call some other non-static function, how can it be done?
          return rt;
        }
      };

      public void run() {   // it will be called from some other public class through an A object
        ...
        JavaRDD<Vector> rdd = (this._com_rdd).map(mapParseRow);  // it fails when map's Function is not static
        ...
      }
    }

Would you help comment on it? What should be done? Thank you in advance! Zhiliang

On Tuesday, November 10, 2015 11:42 AM, Deng Ching-Mallete wrote:

Hi Zhiliang, You should be able to see them in the executor logs, which you can view via the Spark UI, in the Executors page (stderr log). HTH, Deng

On Tue, Nov 10, 2015 at 11:33 AM, Zhiliang Zhu wrote:

Hi All, I need to debug a Spark job; my general way is to print out logs. However, some bugs are inside Spark functions such as mapPartitions etc., and no log printed from those functions can be found ... Would you help point out the way to see the logs from Spark's own functions such as mapPartitions? Or, what is the general way to debug a Spark job? Thank you in advance! Zhiliang
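A sketch of the static-nested-class pattern described in the resolution earlier in this thread (the _threshold field and the adjust helper are purely illustrative assumptions, not from the original code): move the per-row logic into a static nested class that implements Spark's Function interface, pass in whatever state it needs through the constructor, and keep helper methods on that class, so nothing from the enclosing class A has to be serialized.

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.mllib.linalg.Vector;

    class A {
      private JavaRDD<Vector> _com_rdd;
      private double _threshold;                 // illustrative field

      // Static nested class: it captures nothing from A, only what is passed in,
      // and org.apache.spark.api.java.function.Function is already Serializable.
      static class ParseRow implements Function<Vector, Vector> {
        private final double threshold;

        ParseRow(double threshold) {
          this.threshold = threshold;
        }

        private Vector adjust(Vector v) {        // an ordinary helper method callable from call()
          return v;
        }

        @Override
        public Vector call(Vector v) {
          System.out.println("mark. map log is here");   // shows up in the executor's stderr
          return adjust(v);
        }
      }

      public void run() {
        JavaRDD<Vector> rdd = _com_rdd.map(new ParseRow(_threshold));
        // ...
      }
    }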
Re: could not see the print out log in spark functions as mapPartitions
Hi Ching-Mallete, I have found the log and the reason for that. Thanks a lot!Zhiliang On Tuesday, November 10, 2015 12:23 PM, Zhiliang Zhu wrote: Also for Spark UI , that is, log from other places could be found, but the log from the functions as mapPartitions could not. On Tuesday, November 10, 2015 11:52 AM, Zhiliang Zhu wrote: Dear Ching-Mallete , There are machines master01, master02 and master03 for the cluster, I could see the stderr log and stdout log in the directories /.../yarn/logs under those machines.In the stderr there are some notes, I just use log to follow the track of the bug, however log in mapPartitions could not be found, and any other log is there.Will only Spark UI will see them? Or some places in the machine will also see ... Thank you in advance!Zhiliang On Tuesday, November 10, 2015 11:42 AM, Deng Ching-Mallete wrote: Hi Zhiliang, You should be able to see them in the executor logs, which you can view via the Spark UI, in the Executors page (stderr log). HTH,Deng On Tue, Nov 10, 2015 at 11:33 AM, Zhiliang Zhu wrote: Hi All, I need debug spark job, my general way is to print out the log, however, some bug is in spark functions as mapPartitions etc, and not any log printed from those functionscould be found...Would you help point what is way to the log in the spark own function as mapPartitions? Or, what is general way to debug spark job. Thank you in advance! Zhiliang
Re: could not see the print out log in spark functions as mapPartitions
Also for Spark UI , that is, log from other places could be found, but the log from the functions as mapPartitions could not. On Tuesday, November 10, 2015 11:52 AM, Zhiliang Zhu wrote: Dear Ching-Mallete , There are machines master01, master02 and master03 for the cluster, I could see the stderr log and stdout log in the directories /.../yarn/logs under those machines.In the stderr there are some notes, I just use log to follow the track of the bug, however log in mapPartitions could not be found, and any other log is there.Will only Spark UI will see them? Or some places in the machine will also see ... Thank you in advance!Zhiliang On Tuesday, November 10, 2015 11:42 AM, Deng Ching-Mallete wrote: Hi Zhiliang, You should be able to see them in the executor logs, which you can view via the Spark UI, in the Executors page (stderr log). HTH,Deng On Tue, Nov 10, 2015 at 11:33 AM, Zhiliang Zhu wrote: Hi All, I need debug spark job, my general way is to print out the log, however, some bug is in spark functions as mapPartitions etc, and not any log printed from those functionscould be found...Would you help point what is way to the log in the spark own function as mapPartitions? Or, what is general way to debug spark job. Thank you in advance! Zhiliang
Re: could not see the print out log in spark functions as mapPartitions
Dear Ching-Mallete , There are machines master01, master02 and master03 for the cluster, I could see the stderr log and stdout log in the directories /.../yarn/logs under those machines.In the stderr there are some notes, I just use log to follow the track of the bug, however log in mapPartitions could not be found, and any other log is there.Will only Spark UI will see them? Or some places in the machine will also see ... Thank you in advance!Zhiliang On Tuesday, November 10, 2015 11:42 AM, Deng Ching-Mallete wrote: Hi Zhiliang, You should be able to see them in the executor logs, which you can view via the Spark UI, in the Executors page (stderr log). HTH,Deng On Tue, Nov 10, 2015 at 11:33 AM, Zhiliang Zhu wrote: Hi All, I need debug spark job, my general way is to print out the log, however, some bug is in spark functions as mapPartitions etc, and not any log printed from those functionscould be found...Would you help point what is way to the log in the spark own function as mapPartitions? Or, what is general way to debug spark job. Thank you in advance! Zhiliang
could not see the print out log in spark functions as mapPartitions
Hi All, I need to debug a Spark job; my general way is to print out logs. However, some bugs are inside Spark functions such as mapPartitions etc., and no log printed from those functions can be found ... Would you help point out the way to see the logs printed inside Spark's own functions such as mapPartitions? Or, what is the general way to debug a Spark job? Thank you in advance! Zhiliang
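For reference, a small Java sketch of the point made in the replies above (lines is an assumed JavaRDD<String>): anything printed inside a function passed to mapPartitions runs on an executor, so it lands in that executor's stderr (the Executors page of the Spark UI, or the YARN container logs), not in the driver's console.

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.FlatMapFunction;

    JavaRDD<String> parsed = lines.mapPartitions(
        new FlatMapFunction<Iterator<String>, String>() {
          @Override
          public Iterable<String> call(Iterator<String> it) {
            List<String> out = new ArrayList<String>();
            while (it.hasNext()) {
              String line = it.next();
              // runs on the executor: look in that executor's stderr log, not the driver console
              System.err.println("processing: " + line);
              out.add(line.trim());
            }
            return out;
          }
        });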
Re: apply simplex method to fix linear programming in spark
Dear Debasish Das, Thanks very much for your kind reply. I am very sorry that, but may you clearify a little more about the places, since I could not find them. On Thursday, November 5, 2015 5:50 AM, Debasish Das wrote: Yeah for this you can use breeze quadratic minimizer...that's integrated with spark in one of my spark pr. You have quadratic objective with equality which is primal and your proximal is positivity that we already support. I have not given an API for linear objective but that should be simple to add. You can add an issue in breeze for the enhancememt. Where is the API or link site for the breeze quadratic minimizer integrated with spark?And where is the breeze lpsolver... Alternatively you can use breeze lpsolver as well that uses simplex from apache math. Thank you,Zhiliang On Nov 4, 2015 1:05 AM, "Zhiliang Zhu" wrote: Hi Debasish Das, Firstly I must show my deep appreciation towards you kind help. Yes, my issue is some typical LP related, it is as:Objective function:f(x1, x2, ..., xn) = a1 * x1 + a2 * x2 + ... + an * xn, (n would be some number bigger than 100) There are only 4 constraint functions,x1 + x2 + ... + xn = 1, 1) b1 * x1 + b2 * x2 + ... + bn * xn = b, 2) c1 * x1 + c2 * x2 + ... + cn * xn = c, 3) x1, x2, ..., xn >= 0 . To find the solution of x which lets objective function the biggest. Since simplex method may not be supported by spark. Then I may switch to the way as, since the likely solution x must be on the boundary of 1), 2) and 3) geometry,that is to say, only three xi may be >= 0, all the others must be 0. Just look for all that kinds of solutions of 1), 2) and 3), the number would be C(n, 3) + C(n, 2) + C(n, 1), at last to select the most optimized one. Since the constraint number is not that large, I think this might be some way. Thank you,Zhiliang On Wednesday, November 4, 2015 2:25 AM, Debasish Das wrote: Spark has nnls in mllib optimization. I have refactored nnls to breeze as well but we could not move out nnls from mllib due to some runtime issues from breeze.Issue in spark or breeze nnls is that it takes dense gram matrix which does not scale if rank is high but it has been working fine for nnmf till 400 rank.I agree with Sean that you need to see if really simplex is needed. Many constraints can be formulated as proximal operator and then you can use breeze nonlinearminimizer or spark-tfocs package if it is stable.On Nov 2, 2015 10:13 AM, "Sean Owen" wrote: I might be steering this a bit off topic: does this need the simplex method? this is just an instance of nonnegative least squares. I don't think it relates to LDA either. Spark doesn't have any particular support for NNLS (right?) or simplex though. On Mon, Nov 2, 2015 at 6:03 PM, Debasish Das wrote: > Use breeze simplex which inturn uses apache maths simplex...if you want to > use interior point method you can use ecos > https://github.com/embotech/ecos-java-scala ...spark summit 2014 talk on > quadratic solver in matrix factorization will show you example integration > with spark. ecos runs as jni process in every executor. > > On Nov 1, 2015 9:52 AM, "Zhiliang Zhu" wrote: >> >> Hi Ted Yu, >> >> Thanks very much for your kind reply. >> Do you just mean that in spark there is no specific package for simplex >> method? >> >> Then I may try to fix it by myself, do not decide whether it is convenient >> to finish by spark, before finally fix it. 
>> >> Thank you, >> Zhiliang >> >> >> >> >> On Monday, November 2, 2015 1:43 AM, Ted Yu wrote: >> >> >> A brief search in code base shows the following: >> >> TODO: Add simplex constraints to allow alpha in (0,1). >> ./mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala >> >> I guess the answer to your question is no. >> >> FYI >> >> On Sun, Nov 1, 2015 at 9:37 AM, Zhiliang Zhu >> wrote: >> >> Dear All, >> >> As I am facing some typical linear programming issue, and I know simplex >> method is specific in solving LP question, >> I am very sorry that whether there is already some mature package in spark >> about simplex method... >> >> Thank you very much~ >> Best Wishes! >> Zhiliang >> >> >> >> >> >
Re: [Spark MLlib] about linear regression issue
Hi DB Tsai, Firstly I must show my deep appreciation towards your kind help. Did you just mean like this, currently there is no way for users to deal with constrains like all weights >= 0 in spark, though spark also has LBFGS ... Moreover, I did not know whether spark SVD will help some for that issue... But people do LBFGS-B for the constrains like all weights >= 0 , since LBFGS-B is already working. I should get familar to use breeze in the program then . Thank you, Zhiliang On Monday, November 2, 2015 11:12 AM, DB Tsai wrote: For the constrains like all weights >=0, people do LBFGS-B which is supported in our optimization library, Breeze. https://github.com/scalanlp/breeze/issues/323 However, in Spark's LiR, our implementation doesn't have constrain implementation. I do see this is useful given we're experimenting SLIM: Sparse Linear Methods for recommendation, http://www-users.cs.umn.edu/~xning/papers/Ning2011c.pdf which requires all the weights to be positive (Eq. 3) to represent positive relations between items. In summary, it's possible and not difficult to add this constrain to our current linear regression, but currently, there is no open source implementation in Spark. Sincerely, DB Tsai -- Web: https://www.dbtsai.com PGP Key ID: 0xAF08DF8D On Sun, Nov 1, 2015 at 9:22 AM, Zhiliang Zhu wrote: > Dear All, > > As for N dimension linear regression, while the labeled training points > number (or the rank of the labeled point space) is less than N, > then from perspective of math, the weight of the trained linear model may be > not unique. > > However, the output of model.weight() by spark may be with some wi < 0. My > issue is, is there some proper way only to get > some specific output weight with all wi >= 0 ... > > Yes, the above goes same with the issue about solving linear system of > equations, Aw = b, and r(A, b) = r(A) < columnNo(A), then w is > with infinite solutions, but here only needs one solution with all wi >= 0. > When there is only unique solution, both LR and SVD will work perfect. > > I will appreciate your all kind help very much~~ > Best Regards, > Zhiliang > > - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: apply simplex method to fix linear programming in spark
Hi Debasish Das, I found that there are lots of much useful information for me in your kind reply.However, I am sorry that still I could not exactly catch each words you said. I just know spark mllib will use breeze as its underlying package, however, I did not practise and do not know how to directly apply breeze in spark program.What is and where is nnls in mllib optimization, and what is spark-tfocs package , I do not find them in spark official website...Where is to use breeze nonlinearminimizer... Next I will also meet with optimized function with nonlinear constraint functions. I am sorry that I do not know whether it is convenient for you to comment some more for my above issues. Thank you in advance. Zhiliang On Wednesday, November 4, 2015 2:25 AM, Debasish Das wrote: Spark has nnls in mllib optimization. I have refactored nnls to breeze as well but we could not move out nnls from mllib due to some runtime issues from breeze.Issue in spark or breeze nnls is that it takes dense gram matrix which does not scale if rank is high but it has been working fine for nnmf till 400 rank.I agree with Sean that you need to see if really simplex is needed. Many constraints can be formulated as proximal operator and then you can use breeze nonlinearminimizer or spark-tfocs package if it is stable.On Nov 2, 2015 10:13 AM, "Sean Owen" wrote: I might be steering this a bit off topic: does this need the simplex method? this is just an instance of nonnegative least squares. I don't think it relates to LDA either. Spark doesn't have any particular support for NNLS (right?) or simplex though. On Mon, Nov 2, 2015 at 6:03 PM, Debasish Das wrote: > Use breeze simplex which inturn uses apache maths simplex...if you want to > use interior point method you can use ecos > https://github.com/embotech/ecos-java-scala ...spark summit 2014 talk on > quadratic solver in matrix factorization will show you example integration > with spark. ecos runs as jni process in every executor. > > On Nov 1, 2015 9:52 AM, "Zhiliang Zhu" wrote: >> >> Hi Ted Yu, >> >> Thanks very much for your kind reply. >> Do you just mean that in spark there is no specific package for simplex >> method? >> >> Then I may try to fix it by myself, do not decide whether it is convenient >> to finish by spark, before finally fix it. >> >> Thank you, >> Zhiliang >> >> >> >> >> On Monday, November 2, 2015 1:43 AM, Ted Yu wrote: >> >> >> A brief search in code base shows the following: >> >> TODO: Add simplex constraints to allow alpha in (0,1). >> ./mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala >> >> I guess the answer to your question is no. >> >> FYI >> >> On Sun, Nov 1, 2015 at 9:37 AM, Zhiliang Zhu >> wrote: >> >> Dear All, >> >> As I am facing some typical linear programming issue, and I know simplex >> method is specific in solving LP question, >> I am very sorry that whether there is already some mature package in spark >> about simplex method... >> >> Thank you very much~ >> Best Wishes! >> Zhiliang >> >> >> >> >> >
Re: apply simplex method to fix linear programming in spark
Hi Debasish Das, Firstly I must show my deep appreciation for your kind help. Yes, my issue is a typical LP problem. It is: Objective function: f(x1, x2, ..., xn) = a1 * x1 + a2 * x2 + ... + an * xn (n would be some number bigger than 100). There are only 4 constraint functions: 1) x1 + x2 + ... + xn = 1, 2) b1 * x1 + b2 * x2 + ... + bn * xn = b, 3) c1 * x1 + c2 * x2 + ... + cn * xn = c, 4) x1, x2, ..., xn >= 0. The goal is to find the x that makes the objective function the biggest. Since the simplex method may not be supported by Spark, I may switch to another way: the optimal x can be taken at a vertex of the feasible region defined by 1) - 4), that is to say, at most three xi may be nonzero and all the others must be 0. Just enumerate all such candidate solutions of the equality constraints 1), 2) and 3); the number would be C(n, 3) + C(n, 2) + C(n, 1), and at last select the most optimized one. Since the constraint number is not that large, I think this might be a feasible way. Thank you, Zhiliang

On Wednesday, November 4, 2015 2:25 AM, Debasish Das wrote: Spark has nnls in mllib optimization. I have refactored nnls to breeze as well but we could not move nnls out of mllib due to some runtime issues from breeze. The issue in spark or breeze nnls is that it takes a dense gram matrix which does not scale if rank is high, but it has been working fine for nnmf till rank 400. I agree with Sean that you need to see if simplex is really needed. Many constraints can be formulated as a proximal operator and then you can use breeze NonlinearMinimizer or the spark-tfocs package if it is stable.

On Nov 2, 2015 10:13 AM, "Sean Owen" wrote: I might be steering this a bit off topic: does this need the simplex method? this is just an instance of nonnegative least squares. I don't think it relates to LDA either. Spark doesn't have any particular support for NNLS (right?) or simplex though. On Mon, Nov 2, 2015 at 6:03 PM, Debasish Das wrote: > Use breeze simplex which inturn uses apache maths simplex...if you want to > use interior point method you can use ecos > https://github.com/embotech/ecos-java-scala ...spark summit 2014 talk on > quadratic solver in matrix factorization will show you example integration > with spark. ecos runs as jni process in every executor. > > On Nov 1, 2015 9:52 AM, "Zhiliang Zhu" wrote: >> >> Hi Ted Yu, >> >> Thanks very much for your kind reply. >> Do you just mean that in spark there is no specific package for simplex >> method? >> >> Then I may try to fix it by myself, do not decide whether it is convenient >> to finish by spark, before finally fix it. >> >> Thank you, >> Zhiliang >> >> >> >> >> On Monday, November 2, 2015 1:43 AM, Ted Yu wrote: >> >> >> A brief search in code base shows the following: >> >> TODO: Add simplex constraints to allow alpha in (0,1). >> ./mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala >> >> I guess the answer to your question is no. >> >> FYI >> >> On Sun, Nov 1, 2015 at 9:37 AM, Zhiliang Zhu >> wrote: >> >> Dear All, >> >> As I am facing some typical linear programming issue, and I know simplex >> method is specific in solving LP question, >> I am very sorry that whether there is already some mature package in spark >> about simplex method... >> >> Thank you very much~ >> Best Wishes! >> Zhiliang >> >> >> >> >> >
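As a side note on the simplex route discussed in this thread (Breeze's simplex wraps Apache Commons Math), here is a small Scala sketch that solves a tiny n = 3 instance of the LP above with commons-math3's SimplexSolver; the coefficient values are made up, and commons-math3 is assumed to be on the classpath:

import org.apache.commons.math3.optim.MaxIter
import org.apache.commons.math3.optim.linear._
import org.apache.commons.math3.optim.nonlinear.scalar.GoalType

object SimplexExample {
  def main(args: Array[String]): Unit = {
    // maximize a1*x1 + a2*x2 + a3*x3 (toy coefficients)
    val objective = new LinearObjectiveFunction(Array(3.0, 1.0, 2.0), 0.0)

    val constraints = new java.util.ArrayList[LinearConstraint]()
    constraints.add(new LinearConstraint(Array(1.0, 1.0, 1.0), Relationship.EQ, 1.0)) // 1) sum of xi = 1
    constraints.add(new LinearConstraint(Array(2.0, 1.0, 0.5), Relationship.EQ, 1.2)) // 2) b . x = b
    constraints.add(new LinearConstraint(Array(0.5, 1.5, 1.0), Relationship.EQ, 1.0)) // 3) c . x = c

    val solution = new SimplexSolver().optimize(
      new MaxIter(100),
      objective,
      new LinearConstraintSet(constraints),
      GoalType.MAXIMIZE,
      new NonNegativeConstraint(true)) // 4) all xi >= 0

    println(solution.getPoint.mkString(", ") + " -> " + solution.getValue)
  }
}

The NonNegativeConstraint(true) argument carries constraint 4), so the xi >= 0 conditions do not need to be written out one by one.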
spark filter function
Hi All, I would like to filter some elements in a given RDD, so that only the needed ones are left and the row number of the result RDD is smaller. So I selected the filter function; however, by test, the filter function would only accept a Boolean predicate, that is to say, only a JavaRDD would be returned by filter. For my test, the input RDD would also be a JavaRDD; the version is 1.4.1. Would you help comment some? Thanks in advance. Zhiliang
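For what it's worth, a minimal Scala sketch of the same idea (in the Java API the predicate is a Function that returns Boolean, and the returned RDD keeps the original element type, just with fewer rows):

import org.apache.spark.{SparkConf, SparkContext}

object FilterExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("FilterExample"))
    val nums = sc.parallelize(Seq(1, 2, 3, 4, 5, 6))

    // filter keeps the elements for which the predicate returns true;
    // the result has the same element type, only fewer rows.
    val evens = nums.filter(x => x % 2 == 0)
    println(evens.collect().mkString(", ")) // 2, 4, 6
    sc.stop()
  }
}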
Re: apply simplex method to fix linear programming in spark
Hi Ted Yu, Thanks very much for your kind reply. Do you just mean that in Spark there is no specific package for the simplex method? Then I may try to implement it by myself; I have not yet decided whether it is convenient to finish with Spark before I finally fix it. Thank you, Zhiliang

On Monday, November 2, 2015 1:43 AM, Ted Yu wrote: A brief search in the code base shows the following: TODO: Add simplex constraints to allow alpha in (0,1). ./mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala I guess the answer to your question is no. FYI

On Sun, Nov 1, 2015 at 9:37 AM, Zhiliang Zhu wrote: Dear All, As I am facing a typical linear programming issue, and I know the simplex method is specific to solving LP questions, may I ask whether there is already some mature package in Spark for the simplex method... Thank you very much~ Best Wishes! Zhiliang
apply simplex method to fix linear programming in spark
Dear All, As I am facing a typical linear programming issue, and I know the simplex method is specific to solving LP questions, may I ask whether there is already some mature package in Spark for the simplex method... Thank you very much~ Best Wishes! Zhiliang
[Spark MLlib] about linear regression issue
Dear All, As for N-dimension linear regression, while the number of labeled training points (or the rank of the labeled point space) is less than N, then from the perspective of math, the weight of the trained linear model may not be unique. However, the output of model.weight() by Spark may come with some wi < 0. My issue is, is there some proper way to get only a specific output weight with all wi >= 0 ... Yes, the above is the same as the issue of solving a linear system of equations, Aw = b, with r(A, b) = r(A) < columnNo(A); then w has infinitely many solutions, but here only one solution with all wi >= 0 is needed. When there is only a unique solution, both LR and SVD work perfectly. I will appreciate all your kind help very much~~ Best Regards, Zhiliang
How to properly read the first number lines of file into a RDD
Hi All, There is some file with N + M lines, and I need to read the first N lines into one RDD. 1. Read all the N + M lines as one RDD, then select the RDD's top N rows; that may be one solution. 2. Introduce some broadcast variable set to N, then use it to decide, while mapping the file RDD, to only map its first N rows; this may not work, however. Is there some better solution? Thank you, Zhiliang
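One way that is sometimes suggested for this, sketched below under the assumption that the input is a single text file (so zipWithIndex reflects the file's line order); the path and N are hypothetical:

import org.apache.spark.{SparkConf, SparkContext}

object FirstNLines {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("FirstNLines"))
    val n = 1000L // hypothetical N

    // zipWithIndex assigns each line its position in the file (0-based),
    // so keeping indices < n gives exactly the first n lines as an RDD.
    val firstN = sc.textFile("hdfs:///tmp/input.txt") // hypothetical path
      .zipWithIndex()
      .filter { case (_, idx) => idx < n }
      .keys

    println(firstN.count())
    sc.stop()
  }
}

zipWithIndex launches an extra job to count the elements per partition, but the result stays an RDD, so nothing is collected to the driver.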
is it proper to make RDD as function parameter in the codes
Dear All, I will program a small project with Spark, and run speed is a big concern. I have a question: since an RDD is always big on the cluster, is it proper to pass an RDD variable as a parameter in a function call? Thank you, Zhiliang
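For reference, an RDD value is only a small handle describing how to compute the distributed data (its lineage), so passing it as a function parameter copies the reference, not the data; nothing is computed until an action such as count() or collect() runs on the result. A tiny sketch:

import org.apache.spark.rdd.RDD

object RddAsParameter {
  // Passing the RDD here is cheap: only the lightweight handle crosses
  // the call boundary, and the filter itself is still lazy.
  def keepPositive(input: RDD[Double]): RDD[Double] = input.filter(_ > 0.0)
}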
Re: [SPARK MLLIB] could not understand the wrong and inscrutable result of Linear Regression codes
Hi Meihua, I just found that setFitIntercept(false) was introduced in Spark 1.5.0; my current version is 1.4.0. I shall also try that after updating the version (see the short sketch after the quoted messages below). Since you said breeze can probably be used, and I know breeze is used underneath Spark ML, would you help comment some more on how to use it here to solve systems of linear equations ... Thank you very much~ Zhiliang

On Monday, October 26, 2015 2:58 PM, Meihua Wu wrote: please add "setFitIntercept(false)" to your LinearRegression. LinearRegression by default includes an intercept in the model, e.g. label = intercept + features dot weight To get the result you want, you need to force the intercept to be zero. Just curious, are you trying to solve systems of linear equations? If so, you can probably try breeze. On Sun, Oct 25, 2015 at 9:10 PM, Zhiliang Zhu wrote: > > > > On Monday, October 26, 2015 11:26 AM, Zhiliang Zhu > wrote: > > > Hi DB Tsai, > > Thanks very much for your kind help. I get it now. > > I am sorry that there is another issue, the weight/coefficient result is > perfect while A is triangular matrix, however, while A is not triangular > matrix (but > transformed from triangular matrix, still is invertible), the result seems > not perfect and difficult to make it better by resetting the parameter. > Would you help comment some about that... > > List localTraining = Lists.newArrayList( > new LabeledPoint(30.0, Vectors.dense(1.0, 2.0, 3.0, 4.0)), > new LabeledPoint(29.0, Vectors.dense(0.0, 2.0, 3.0, 4.0)), > new LabeledPoint(25.0, Vectors.dense(0.0, 0.0, 3.0, 4.0)), > new LabeledPoint(-3.0, Vectors.dense(0.0, 0.0, -1.0, 0.0))); > ... > LinearRegression lr = new LinearRegression() > .setMaxIter(2) > .setRegParam(0) > .setElasticNetParam(0); > > > -- > > It seems that no matter how to reset the parameters for lr , the output of > x3 and x4 is always nearly the same . > Whether there is some way to make the result a little better... > > > -- > > x3 and x4 could not become better, the output is: > Final w: > [0.999477672867,1.999748740578,3.500112393734,3.50011239377] > > Thank you, > Zhiliang > > > > On Monday, October 26, 2015 10:25 AM, DB Tsai wrote: > > > Column 4 is always constant, so no predictive power resulting zero weight. > > On Sunday, October 25, 2015, Zhiliang Zhu wrote: > > Hi DB Tsai, > > Thanks very much for your kind reply help. > > As for your comment, I just modified and tested the key part of the codes: > > LinearRegression lr = new LinearRegression() > .setMaxIter(1) > .setRegParam(0) > .setElasticNetParam(0); //the number could be reset > > final LinearRegressionModel model = lr.fit(training); > > Now the output is much reasonable, however, x4 is always 0 while repeatedly > reset those parameters in lr , would you help some about it how to properly > set the parameters ... > > Final w: [1.00127825909,1.99979185054,2.3307136,0.0] > > Thank you, > Zhiliang > > > > > On Monday, October 26, 2015 5:14 AM, DB Tsai wrote: > > > LinearRegressionWithSGD is not stable. Please use linear regression in > ML package instead. > http://spark.apache.org/docs/latest/ml-linear-methods.html > > Sincerely, > > DB Tsai > -- > Web: https://www.dbtsai.com > PGP Key ID: 0xAF08DF8D > > > On Sun, Oct 25, 2015 at 10:14 AM, Zhiliang Zhu > wrote: >> Dear All, >> >> I have some program as below which makes me very much confused and >> inscrutable, it is about multiple dimension linear regression mode, the >> weight / coefficient is always perfect while the dimension is smaller than >> 4, otherwise it is wrong all the time. 
>> Or, whether the LinearRegressionWithSGD would be selected for another one? >> >> public class JavaLinearRegression { >> public static void main(String[] args) { >> SparkConf conf = new SparkConf().setAppName("Linear Regression >> Example"); >> JavaSparkContext sc = new JavaSparkContext(conf); >> SQLContext jsql = new SQLContext(sc); >> >> //Ax = b, x = [1, 2, 3, 4] would be the only one out
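A short sketch of the setFitIntercept suggestion above, for Spark 1.5.0 or later (here "training" is assumed to be a DataFrame with "label" and "features" columns built from the LabeledPoint data discussed in this thread):

import org.apache.spark.ml.regression.LinearRegression

val lr = new LinearRegression()
  .setMaxIter(100)
  .setRegParam(0.0)
  .setElasticNetParam(0.0)
  .setFitIntercept(false) // force intercept = 0, available from Spark 1.5.0

val model = lr.fit(training)
println(s"weights: ${model.weights}, intercept: ${model.intercept}")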
Re: [SPARK MLLIB] could not understand the wrong and inscrutable result of Linear Regression codes
Hi Meihua, DB Tsai, Thanks very much for all your kind help. After I added some more LabeledPoints to the training data, the output result also seems much better. I will also try the setFitIntercept(false) way. Currently I have encountered a problem about an algorithm optimization issue: f(x1, x2, ..., xn) = a11 * x1 * x1 + a12 * x1 * x2 + a22 * x2 * x2 + ... + ann * xn * xn, with constraint equations: b1 * x1 + b2 * x2 + ... + bn * xn = 1, xi >= 0, etc. The goal is to find the proper x = [x1, x2, ..., xn] to make f(x1, x2, ..., xn) the biggest. It is required to use Spark to solve it; however, I am not familiar with using Spark directly on algorithm optimization issues, and I am not yet skilled in using the gradient descent way on a multi-dimensional function. If you know this issue, would you help comment some. Yes, I then converted this problem into one about solving systems of linear equations c1 * w1 + c2 * w2 + ... + cn * wn = d. I just view c and w conversely: as w1 * c1 + w2 * c2 + ... + wn * cn = d, so w becomes the coefficients and c becomes the variables; I think Spark Linear Regression would be helpful here. Expert Sujit also kindly pointed out the way to figure out the pseudo-inverse of A for Ax = b, and I will also try it next. Since I would use Spark to fix the issue, and as you said breeze shall be used here, would you help explain or point out the way to use it here (a short sketch follows after the quoted messages below)... Thank you very much! Zhiliang

On Monday, October 26, 2015 2:58 PM, Meihua Wu wrote: please add "setFitIntercept(false)" to your LinearRegression. LinearRegression by default includes an intercept in the model, e.g. label = intercept + features dot weight To get the result you want, you need to force the intercept to be zero. Just curious, are you trying to solve systems of linear equations? If so, you can probably try breeze. On Sun, Oct 25, 2015 at 9:10 PM, Zhiliang Zhu wrote: > > > > On Monday, October 26, 2015 11:26 AM, Zhiliang Zhu > wrote: > > > Hi DB Tsai, > > Thanks very much for your kind help. I get it now. > > I am sorry that there is another issue, the weight/coefficient result is > perfect while A is triangular matrix, however, while A is not triangular > matrix (but > transformed from triangular matrix, still is invertible), the result seems > not perfect and difficult to make it better by resetting the parameter. > Would you help comment some about that... > > List localTraining = Lists.newArrayList( > new LabeledPoint(30.0, Vectors.dense(1.0, 2.0, 3.0, 4.0)), > new LabeledPoint(29.0, Vectors.dense(0.0, 2.0, 3.0, 4.0)), > new LabeledPoint(25.0, Vectors.dense(0.0, 0.0, 3.0, 4.0)), > new LabeledPoint(-3.0, Vectors.dense(0.0, 0.0, -1.0, 0.0))); > ... > LinearRegression lr = new LinearRegression() > .setMaxIter(2) > .setRegParam(0) > .setElasticNetParam(0); > > > -- > > It seems that no matter how to reset the parameters for lr , the output of > x3 and x4 is always nearly the same . > Whether there is some way to make the result a little better... > > > -- > > x3 and x4 could not become better, the output is: > Final w: > [0.999477672867,1.999748740578,3.500112393734,3.50011239377] > > Thank you, > Zhiliang > > > > On Monday, October 26, 2015 10:25 AM, DB Tsai wrote: > > > Column 4 is always constant, so no predictive power resulting zero weight. > > On Sunday, October 25, 2015, Zhiliang Zhu wrote: > > Hi DB Tsai, > > Thanks very much for your kind reply help. 
> > As for your comment, I just modified and tested the key part of the codes: > > LinearRegression lr = new LinearRegression() > .setMaxIter(1) > .setRegParam(0) > .setElasticNetParam(0); //the number could be reset > > final LinearRegressionModel model = lr.fit(training); > > Now the output is much reasonable, however, x4 is always 0 while repeatedly > reset those parameters in lr , would you help some about it how to properly > set the parameters ... > > Final w: [1.00127825909,1.99979185054,2.3307136,0.0] > > Thank you, > Zhiliang > > > > > On Monday, October 26, 2015 5:14 AM, DB Tsai wrote: > > > LinearRegressionWithSGD is not stable. Please use linear regression in > ML package instead. > http://spark.apache.org/docs/latest/ml-linear-methods.ht
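On the "use Breeze to solve systems of linear equations" question above: if the system coming out of the Lagrange conditions fits in memory on one machine, Breeze can solve it directly with the \ operator, with no regression or iteration involved. A minimal sketch with toy numbers:

import breeze.linalg.{DenseMatrix, DenseVector}

object SolveLinearSystem {
  def main(args: Array[String]): Unit = {
    // Toy system A x = b with the unique solution x = [1, 2, 3]
    val A = DenseMatrix(
      (2.0, 1.0, 0.0),
      (1.0, 3.0, 1.0),
      (0.0, 1.0, 2.0))
    val b = DenseVector(4.0, 10.0, 8.0)

    // Breeze's \ operator solves the dense square system directly (LAPACK
    // under the hood), so no iterative fitting is needed when A fits in memory.
    val x = A \ b
    println(x) // approximately DenseVector(1.0, 2.0, 3.0)
  }
}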
Re: [SPARK MLLIB] could not understand the wrong and inscrutable result of Linear Regression codes
On Monday, October 26, 2015 11:26 AM, Zhiliang Zhu wrote: Hi DB Tsai, Thanks very much for your kind help. I get it now. I am sorry that there is another issue, the weight/coefficient result is perfect while A is triangular matrix, however, while A is not triangular matrix (but transformed from triangular matrix, still is invertible), the result seems not perfect and difficult to make it better by resetting the parameter.Would you help comment some about that... List localTraining = Lists.newArrayList( new LabeledPoint(30.0, Vectors.dense(1.0, 2.0, 3.0, 4.0)), new LabeledPoint(29.0, Vectors.dense(0.0, 2.0, 3.0, 4.0)), new LabeledPoint(25.0, Vectors.dense(0.0, 0.0, 3.0, 4.0)), new LabeledPoint(-3.0, Vectors.dense(0.0, 0.0, -1.0, 0.0)));...LinearRegression lr = new LinearRegression() .setMaxIter(2) .setRegParam(0) .setElasticNetParam(0); -- It seems that no matter how to reset the parameters for lr , the output of x3 and x4 is always nearly the same .Whether there is some way to make the result a little better... -- x3 and x4 could not become better, the output is: Final w: [0.999477672867,1.999748740578,3.500112393734,3.50011239377] Thank you,Zhiliang On Monday, October 26, 2015 10:25 AM, DB Tsai wrote: Column 4 is always constant, so no predictive power resulting zero weight. On Sunday, October 25, 2015, Zhiliang Zhu wrote: Hi DB Tsai, Thanks very much for your kind reply help. As for your comment, I just modified and tested the key part of the codes: LinearRegression lr = new LinearRegression() .setMaxIter(1) .setRegParam(0) .setElasticNetParam(0); //the number could be reset final LinearRegressionModel model = lr.fit(training); Now the output is much reasonable, however, x4 is always 0 while repeatedly reset those parameters in lr , would you help some about it how to properly set the parameters ... Final w: [1.00127825909,1.99979185054,2.3307136,0.0] Thank you,Zhiliang On Monday, October 26, 2015 5:14 AM, DB Tsai wrote: LinearRegressionWithSGD is not stable. Please use linear regression in ML package instead. http://spark.apache.org/docs/latest/ml-linear-methods.html Sincerely, DB Tsai -- Web: https://www.dbtsai.com PGP Key ID: 0xAF08DF8D On Sun, Oct 25, 2015 at 10:14 AM, Zhiliang Zhu wrote: > Dear All, > > I have some program as below which makes me very much confused and > inscrutable, it is about multiple dimension linear regression mode, the > weight / coefficient is always perfect while the dimension is smaller than > 4, otherwise it is wrong all the time. > Or, whether the LinearRegressionWithSGD would be selected for another one? 
> > public class JavaLinearRegression { > public static void main(String[] args) { > SparkConf conf = new SparkConf().setAppName("Linear Regression > Example"); > JavaSparkContext sc = new JavaSparkContext(conf); > SQLContext jsql = new SQLContext(sc); > > //Ax = b, x = [1, 2, 3, 4] would be the only one output about weight > //x1 + 2 * x2 + 3 * x3 + 4 * x4 = y would be the multiple linear mode > List localTraining = Lists.newArrayList( > new LabeledPoint(30.0, Vectors.dense(1.0, 2.0, 3.0, 4.0)), > new LabeledPoint(29.0, Vectors.dense(0.0, 2.0, 3.0, 4.0)), > new LabeledPoint(25.0, Vectors.dense(0.0, 0.0, 3.0, 4.0)), > new LabeledPoint(16.0, Vectors.dense(0.0, 0.0, 0.0, 4.0))); > > JavaRDD training = sc.parallelize(localTraining).cache(); > > // Building the model > int numIterations = 1000; //the number could be reset large > final LinearRegressionModel model = > LinearRegressionWithSGD.train(JavaRDD.toRDD(training), numIterations); > > //the coefficient weights are perfect while dimension of LabeledPoint is > SMALLER than 4. > //otherwise the output is always wrong and inscrutable. > //for instance, one output is > //Final w: > [2.537341836047772E25,-7.744333206289736E24,6.697875883454909E23,-2.6704705246777624E22] > System.out.print("Final w: " + model.weights() + "\n\n"); > } > } > > I would appreciate your kind help or guidance very much~~ > > Thank you! > Zhiliang > > -- - DBSent from my iPhone
Re: [SPARK MLLIB] could not understand the wrong and inscrutable result of Linear Regression codes
Hi DB Tsai, Thanks very much for your kind help. I get it now. I am sorry that there is another issue, the weight/coefficient result is perfect while A is triangular matrix, however, while A is not triangular matrix (but transformed from triangular matrix, still is invertible), the result seems not perfect and difficult to make it better by resetting the parameter.Would you help comment some about that... List localTraining = Lists.newArrayList( new LabeledPoint(30.0, Vectors.dense(1.0, 2.0, 3.0, 4.0)), new LabeledPoint(29.0, Vectors.dense(0.0, 2.0, 3.0, 4.0)), new LabeledPoint(25.0, Vectors.dense(0.0, 0.0, 3.0, 4.0)), new LabeledPoint(-3.0, Vectors.dense(0.0, 0.0, -1.0, 0.0)));...LinearRegression lr = new LinearRegression() .setMaxIter(2) .setRegParam(0) .setElasticNetParam(0); x3 and x4 could not become better, the output is: Final w: [0.999477672867,1.999748740578,3.500112393734,3.50011239377] Thank you,Zhiliang On Monday, October 26, 2015 10:25 AM, DB Tsai wrote: Column 4 is always constant, so no predictive power resulting zero weight. On Sunday, October 25, 2015, Zhiliang Zhu wrote: Hi DB Tsai, Thanks very much for your kind reply help. As for your comment, I just modified and tested the key part of the codes: LinearRegression lr = new LinearRegression() .setMaxIter(1) .setRegParam(0) .setElasticNetParam(0); //the number could be reset final LinearRegressionModel model = lr.fit(training); Now the output is much reasonable, however, x4 is always 0 while repeatedly reset those parameters in lr , would you help some about it how to properly set the parameters ... Final w: [1.00127825909,1.99979185054,2.3307136,0.0] Thank you,Zhiliang On Monday, October 26, 2015 5:14 AM, DB Tsai wrote: LinearRegressionWithSGD is not stable. Please use linear regression in ML package instead. http://spark.apache.org/docs/latest/ml-linear-methods.html Sincerely, DB Tsai -- Web: https://www.dbtsai.com PGP Key ID: 0xAF08DF8D On Sun, Oct 25, 2015 at 10:14 AM, Zhiliang Zhu wrote: > Dear All, > > I have some program as below which makes me very much confused and > inscrutable, it is about multiple dimension linear regression mode, the > weight / coefficient is always perfect while the dimension is smaller than > 4, otherwise it is wrong all the time. > Or, whether the LinearRegressionWithSGD would be selected for another one? > > public class JavaLinearRegression { > public static void main(String[] args) { > SparkConf conf = new SparkConf().setAppName("Linear Regression > Example"); > JavaSparkContext sc = new JavaSparkContext(conf); > SQLContext jsql = new SQLContext(sc); > > //Ax = b, x = [1, 2, 3, 4] would be the only one output about weight > //x1 + 2 * x2 + 3 * x3 + 4 * x4 = y would be the multiple linear mode > List localTraining = Lists.newArrayList( > new LabeledPoint(30.0, Vectors.dense(1.0, 2.0, 3.0, 4.0)), > new LabeledPoint(29.0, Vectors.dense(0.0, 2.0, 3.0, 4.0)), > new LabeledPoint(25.0, Vectors.dense(0.0, 0.0, 3.0, 4.0)), > new LabeledPoint(16.0, Vectors.dense(0.0, 0.0, 0.0, 4.0))); > > JavaRDD training = sc.parallelize(localTraining).cache(); > > // Building the model > int numIterations = 1000; //the number could be reset large > final LinearRegressionModel model = > LinearRegressionWithSGD.train(JavaRDD.toRDD(training), numIterations); > > //the coefficient weights are perfect while dimension of LabeledPoint is > SMALLER than 4. > //otherwise the output is always wrong and inscrutable. 
> //for instance, one output is > //Final w: > [2.537341836047772E25,-7.744333206289736E24,6.697875883454909E23,-2.6704705246777624E22] > System.out.print("Final w: " + model.weights() + "\n\n"); > } > } > > I would appreciate your kind help or guidance very much~~ > > Thank you! > Zhiliang > > -- - DBSent from my iPhone
Re: [SPARK MLLIB] could not understand the wrong and inscrutable result of Linear Regression codes
Hi DB Tsai, Thanks very much for your kind reply help. As for your comment, I just modified and tested the key part of the codes: LinearRegression lr = new LinearRegression() .setMaxIter(1) .setRegParam(0) .setElasticNetParam(0); //the number could be reset final LinearRegressionModel model = lr.fit(training); Now the output is much reasonable, however, x4 is always 0 while repeatedly reset those parameters in lr , would you help some about it how to properly set the parameters ... Final w: [1.00127825909,1.99979185054,2.3307136,0.0] Thank you,Zhiliang On Monday, October 26, 2015 5:14 AM, DB Tsai wrote: LinearRegressionWithSGD is not stable. Please use linear regression in ML package instead. http://spark.apache.org/docs/latest/ml-linear-methods.html Sincerely, DB Tsai -- Web: https://www.dbtsai.com PGP Key ID: 0xAF08DF8D On Sun, Oct 25, 2015 at 10:14 AM, Zhiliang Zhu wrote: > Dear All, > > I have some program as below which makes me very much confused and > inscrutable, it is about multiple dimension linear regression mode, the > weight / coefficient is always perfect while the dimension is smaller than > 4, otherwise it is wrong all the time. > Or, whether the LinearRegressionWithSGD would be selected for another one? > > public class JavaLinearRegression { > public static void main(String[] args) { > SparkConf conf = new SparkConf().setAppName("Linear Regression > Example"); > JavaSparkContext sc = new JavaSparkContext(conf); > SQLContext jsql = new SQLContext(sc); > > //Ax = b, x = [1, 2, 3, 4] would be the only one output about weight > //x1 + 2 * x2 + 3 * x3 + 4 * x4 = y would be the multiple linear mode > List localTraining = Lists.newArrayList( > new LabeledPoint(30.0, Vectors.dense(1.0, 2.0, 3.0, 4.0)), > new LabeledPoint(29.0, Vectors.dense(0.0, 2.0, 3.0, 4.0)), > new LabeledPoint(25.0, Vectors.dense(0.0, 0.0, 3.0, 4.0)), > new LabeledPoint(16.0, Vectors.dense(0.0, 0.0, 0.0, 4.0))); > > JavaRDD training = sc.parallelize(localTraining).cache(); > > // Building the model > int numIterations = 1000; //the number could be reset large > final LinearRegressionModel model = > LinearRegressionWithSGD.train(JavaRDD.toRDD(training), numIterations); > > //the coefficient weights are perfect while dimension of LabeledPoint is > SMALLER than 4. > //otherwise the output is always wrong and inscrutable. > //for instance, one output is > //Final w: > [2.537341836047772E25,-7.744333206289736E24,6.697875883454909E23,-2.6704705246777624E22] > System.out.print("Final w: " + model.weights() + "\n\n"); > } > } > > I would appreciate your kind help or guidance very much~~ > > Thank you! > Zhiliang > >
[SPARK MLLIB] could not understand the wrong and inscrutable result of Linear Regression codes
Dear All, I have the program below which leaves me very confused; it is about a multi-dimensional linear regression model. The weight / coefficient is always perfect while the dimension is smaller than 4, otherwise it is wrong all the time. Or, should LinearRegressionWithSGD be replaced by something else?

public class JavaLinearRegression {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("Linear Regression Example");
    JavaSparkContext sc = new JavaSparkContext(conf);
    SQLContext jsql = new SQLContext(sc);

    //Ax = b, x = [1, 2, 3, 4] would be the only one output about weight
    //x1 + 2 * x2 + 3 * x3 + 4 * x4 = y would be the multiple linear mode
    List<LabeledPoint> localTraining = Lists.newArrayList(
      new LabeledPoint(30.0, Vectors.dense(1.0, 2.0, 3.0, 4.0)),
      new LabeledPoint(29.0, Vectors.dense(0.0, 2.0, 3.0, 4.0)),
      new LabeledPoint(25.0, Vectors.dense(0.0, 0.0, 3.0, 4.0)),
      new LabeledPoint(16.0, Vectors.dense(0.0, 0.0, 0.0, 4.0)));

    JavaRDD<LabeledPoint> training = sc.parallelize(localTraining).cache();

    // Building the model
    int numIterations = 1000; //the number could be reset large
    final LinearRegressionModel model =
      LinearRegressionWithSGD.train(JavaRDD.toRDD(training), numIterations);

    //the coefficient weights are perfect while dimension of LabeledPoint is SMALLER than 4.
    //otherwise the output is always wrong and inscrutable.
    //for instance, one output is
    //Final w: [2.537341836047772E25,-7.744333206289736E24,6.697875883454909E23,-2.6704705246777624E22]
    System.out.print("Final w: " + model.weights() + "\n\n");
  }
}

I would appreciate your kind help or guidance very much~~ Thank you! Zhiliang
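Following the advice in the replies, the same toy data can be fitted with the ML-package LinearRegression instead of LinearRegressionWithSGD. A rough Scala sketch (assuming Spark 1.4+ with a SQLContext; on 1.5.0+ one can additionally call setFitIntercept(false), as discussed later in this thread):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.ml.regression.LinearRegression

object MLLinearRegressionExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("MLLinearRegression"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Same toy system as above: x1 + 2*x2 + 3*x3 + 4*x4 = label
    val training = sc.parallelize(Seq(
      LabeledPoint(30.0, Vectors.dense(1.0, 2.0, 3.0, 4.0)),
      LabeledPoint(29.0, Vectors.dense(0.0, 2.0, 3.0, 4.0)),
      LabeledPoint(25.0, Vectors.dense(0.0, 0.0, 3.0, 4.0)),
      LabeledPoint(16.0, Vectors.dense(0.0, 0.0, 0.0, 4.0)))).toDF()

    val lr = new LinearRegression()
      .setMaxIter(100)
      .setRegParam(0.0)
      .setElasticNetParam(0.0)

    // Note: with the default intercept, the constant fourth column gets zero
    // weight, as DB Tsai points out earlier in this thread.
    val model = lr.fit(training)
    println(s"weights: ${model.weights} intercept: ${model.intercept}")
    sc.stop()
  }
}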
Re: How to get inverse Matrix / RDD or how to solve linear system of equations
Hi Sujit, Firstly, I must show my deep appreciation and respect for your kind help and excellent knowledge. If you and I were in the same place, I would specially go to express my thanks and respect to you in person. I will try your way with Spark MLlib SVD. For Linear Regression, Ax = b, in fact I want to view the variables and coefficients conversely, as (1): x1 * a1 + x2 * a2 + ... + xn * an = b; there is only one linear formula for it. There is also a training data set with n point tuples like [a11, a21, ..., an1, b1], taken from [A, b] (as the variables), and then the coefficients x = [x1, x2, ..., xn]T may be obtained by MLlib linear regression. However, I tested Spark MLlib LR, and when the point tuple dimension is more than 6, it needs more than 100 000 iterations to get an accurate enough solution for the coefficients; the time complexity is too much, and the time cost would be tremendous when the dimension is in the hundreds. In effect, I am working on algorithm optimization with a specific model not in MLlib, that is, an objective quadratic function f(x1, x2, ..., xn) with lots of linear constraint conditions; I then use the Lagrange way to convert the question into a linear system of equations. My last problem is whether Spark is properly suited to algorithm optimization, whether to directly use org.apache.spark.mllib.optimization, or some other way, or whether it is just not very convenient for this application... Thank you very much~~ Zhiliang

On Saturday, October 24, 2015 12:41 AM, Sujit Pal wrote: Hi Zhiliang, For a system of equations AX = y, Linear Regression will give you a best-fit estimate for A (coefficient vector) for a matrix of feature variables X and corresponding target variable y for a subset of your data. OTOH, what you are looking for here is to solve for x a system of equations Ax = b, where A and b are known and you want the vector x. This Math Stackexchange page [2] explains the math in more detail, but basically... A * x = b can be rewritten as x = A.I * b. You can get the pseudo-inverse of A using SVD (Spark MLLib supports SVD [1]). So the SVD decomposition would make A a product of three other matrices. A = U * S * V.T and the pseudo-inverse can be written as: A.I = V * S^-1 * U.T Then x can be found by multiplying A.I with b. -sujit [1] https://spark.apache.org/docs/1.2.0/mllib-dimensionality-reduction.html [2] http://math.stackexchange.com/questions/458404/how-can-we-compute-pseudoinverse-for-any-matrix

On Fri, Oct 23, 2015 at 2:19 AM, Zhiliang Zhu wrote: Hi Sujit, and All, Currently I am stuck in a big difficulty, and I am eager to get some help from you. There is some big linear system of equations: Ax = b, where A has N rows and N columns, N is very large, and b = [0, 0, ..., 0, 1]T. Then I will solve it to get x = [x1, x2, ..., xn]T. The simple solution would be to get inverse(A), and then x = inverse(A) * b. A would be some JavaRDD; however, for an RDD/matrix there are add/multiply/transpose APIs, but no inverse API! Then, how would one conveniently get inverse(A), or just solve the linear system of equations some other way... In Spark MLlib there is linear regression; the training process might be to solve the coefficients to get some specific linear model, that is, Ax = y, training on (x, y) to get A, and this might be used to solve the linear system of equations. Is it like that? I could not decide. I must show my deep appreciation towards all your help. Thank you very much! Zhiliang
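A rough sketch of the pseudo-inverse route Sujit describes, using MLlib's distributed RowMatrix.computeSVD (toy 3x3 numbers; it is assumed here that the row order of svd.U matches the order of the input rows, and small singular values are dropped for numerical stability):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.{DenseVector, Vectors}
import org.apache.spark.mllib.linalg.distributed.RowMatrix

object PseudoInverseSolve {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("PseudoInverseSolve"))

    // Toy 3x3 system A x = b (rows of A as an RDD of vectors)
    val rows = sc.parallelize(Seq(
      Vectors.dense(2.0, 1.0, 0.0),
      Vectors.dense(1.0, 3.0, 1.0),
      Vectors.dense(0.0, 1.0, 2.0)))
    val b = Array(4.0, 10.0, 8.0)
    val bBc = sc.broadcast(b)

    val mat = new RowMatrix(rows)
    val svd = mat.computeSVD(3, computeU = true) // A = U * diag(s) * V^T

    // U^T b = sum over rows i of b_i * U(i, :)
    val utb: Array[Double] = svd.U.rows.zipWithIndex().map { case (u, i) =>
      u.toArray.map(_ * bBc.value(i.toInt))
    }.reduce((p, q) => p.zip(q).map { case (x, y) => x + y })

    // x = V * diag(1/s) * U^T b  (drop tiny singular values for stability)
    val scaled = utb.zip(svd.s.toArray).map { case (v, si) =>
      if (si > 1e-9) v / si else 0.0
    }
    val x = svd.V.multiply(new DenseVector(scaled))
    println(x) // approximately [1.0, 2.0, 3.0]
    sc.stop()
  }
}

If A is small enough to fit on one machine, breeze.linalg.pinv(A) * b gives the same answer with far less ceremony; the distributed version only pays off when A is genuinely large.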
How to get inverse Matrix / RDD or how to solve linear system of equations
Hi Sujit, and All, Currently I am stuck in a big difficulty, and I am eager to get some help from you. There is some big linear system of equations: Ax = b, where A has N rows and N columns, N is very large, and b = [0, 0, ..., 0, 1]T. Then I will solve it to get x = [x1, x2, ..., xn]T. The simple solution would be to get inverse(A), and then x = inverse(A) * b. A would be some JavaRDD; however, for an RDD/matrix there are add/multiply/transpose APIs, but no inverse API! Then, how would one conveniently get inverse(A), or just solve the linear system of equations some other way... In Spark MLlib there is linear regression; the training process might be to solve the coefficients to get some specific linear model, that is, Ax = y, training on (x, y) to get A, and this might be used to solve the linear system of equations. Is it like that? I could not decide. I must show my deep appreciation towards all your help. Thank you very much! Zhiliang
[Spark MLlib] How to apply spark ml given models for questions with general background
Dear All, I am new to Spark ML. There is a project for me: for some given math model, I would like to get its optimized solution. It is very similar to a Spark MLlib application. However, the key problem for me is that the given math model does not obviously belong to the models (such as classification, regression, clustering, collaborative filtering, dimensionality reduction) provided in Spark ML... For some specific application, I think the most important thing is to find the proper model for it from the known Spark MLlib models; then all the rest follows the usual steps, since the optimizer is already under MLlib. However, my question is, generally how would it go if the specific application does not exactly belong to the given models in MLlib? Is it generally convenient to split up the specific background and convert it into a given model? What is the general way to apply MLlib to some specific backgrounds? I must appreciate your help very much! Thank you, Zhiliang
[Spark ML] How to extend MLlib's optimization algorithms
Dear All, I would like to use Spark ML to develop a project related to optimization algorithms; however, in Spark 1.4.1 it seems that under ML's optimizer there are only about 2 optimization algorithms. My project may need more kinds of optimization algorithms, so how would I use Spark ML to develop it? And for a given optimization algorithm, there would be different constraint conditions (math formulas); exactly how would I implement and solve those complex math formulas in the optimization algorithm? If there are some comments, or some examples / links, that would be very useful. I must appreciate your help very much! Thank you, Zhiliang
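One extension point that already exists in mllib (1.4.x) is the optimization package itself: LBFGS and GradientDescent are assembled from a Gradient (the loss) and an Updater (the regularization / update step), both DeveloperApi classes that can be subclassed with your own formulas. A minimal sketch using the built-in least-squares gradient on toy data, which is where a custom Gradient or Updater would be plugged in:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.optimization.{LBFGS, LeastSquaresGradient, SquaredL2Updater}

object DirectOptimizerExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("DirectOptimizer"))

    // (label, features) pairs for a toy least-squares problem: y = 1*x1 + 2*x2
    val data = sc.parallelize(Seq(
      (5.0, Vectors.dense(1.0, 2.0)),
      (8.0, Vectors.dense(2.0, 3.0)),
      (11.0, Vectors.dense(3.0, 4.0)),
      (4.0, Vectors.dense(2.0, 1.0))))

    // The optimizer is assembled from a Gradient (the loss) and an Updater
    // (the regularization / projection step); replacing either with your own
    // subclass is how a different objective or constraint can be encoded.
    val optimizer = new LBFGS(new LeastSquaresGradient(), new SquaredL2Updater())
      .setNumIterations(100)
      .setRegParam(0.0)
      .setConvergenceTol(1e-6)

    val weights = optimizer.optimize(data, Vectors.zeros(2))
    println(weights) // roughly [1.0, 2.0]
    sc.stop()
  }
}

The same Gradient/Updater pair also drives GradientDescent, so a custom loss written once can be used with either optimizer.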
Re: How to properly set conf/spark-env.sh for spark to run on yarn
It is working properly now, by command "spark-submit --master yarn-cluster " .It seems that it will not run by way of "spark-submit --master yarn-client " on yarn. Thanks a lot for all your help . On Saturday, September 26, 2015 2:27 PM, Gavin Yue wrote: It is working, We are doing the same thing everyday. But the remote server needs to able to talk with ResourceManager. If you are using Spark-submit, your will also specify the hadoop conf directory in your Env variable. Spark would rely on that to locate where the cluster's resource manager is. I think this tutorial is pretty clear: http://spark.apache.org/docs/latest/running-on-yarn.html On Fri, Sep 25, 2015 at 7:11 PM, Zhiliang Zhu wrote: Hi Yue, Thanks very much for your kind reply. I would like to submit spark job remotely on another machine outside the cluster,and the job will run on yarn, similar as hadoop job is already done, could youconfirm it could exactly work for spark... Do you mean that I would print those variables on linux command side? Best Regards,Zhiliang On Saturday, September 26, 2015 10:07 AM, Gavin Yue wrote: Print out your env variables and check first Sent from my iPhone On Sep 25, 2015, at 18:43, Zhiliang Zhu wrote: Hi All, I would like to submit spark job on some another remote machine outside the cluster,I also copied hadoop/spark conf files under the remote machine, then hadoopjob would be submitted, but spark job would not. In spark-env.sh, it may be due to that SPARK_LOCAL_IP is not properly set,or for some other reasons... This issue is urgent for me, would some expert provide some help about this problem... I will show sincere appreciation towards your help. Thank you!Best Regards,Zhiliang On Friday, September 25, 2015 7:53 PM, Zhiliang Zhu wrote: Hi all, The spark job will run on yarn. While I do not set SPARK_LOCAL_IP any, or just set asexport SPARK_LOCAL_IP=localhost #or set as the specific node ip on the specific spark install directory It will work well to submit spark job on master node of cluster, however, it will fail by way of some gateway machine remotely. The gateway machine is already configed, it works well to submit hadoop job.It is set as: export SCALA_HOME=/usr/lib/scala export JAVA_HOME=/usr/java/jdk1.7.0_45 export R_HOME=/usr/lib/r export HADOOP_HOME=/usr/lib/hadoop export YARN_CONF_DIR=/usr/lib/hadoop/etc/hadoop export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop export SPARK_MASTER_IP=master01 #export SPARK_LOCAL_IP=master01 #if no SPARK_LOCAL_IP is set, SparkContext will not start export SPARK_LOCAL_IP=localhost #if localhost is set, SparkContext is started, but failed later export SPARK_LOCAL_DIRS=/data/spark_local_dir ... The error messages: 15/09/25 19:07:12 INFO util.Utils: Successfully started service 'sparkYarnAM' on port 48133. 15/09/25 19:07:12 INFO yarn.ApplicationMaster: Waiting for Spark driver to be reachable. 15/09/25 19:07:12 ERROR yarn.ApplicationMaster: Failed to connect to driver at 127.0.0.1:35706, retrying ... 15/09/25 19:07:12 ERROR yarn.ApplicationMaster: Failed to connect to driver at 127.0.0.1:35706, retrying ... 15/09/25 19:07:12 ERROR yarn.ApplicationMaster: Failed to connect to driver at 127.0.0.1:35706, retrying ... I shall sincerely appreciate your kind help very much!Zhiliang
Re: How to properly set conf/spark-env.sh for spark to run on yarn
Hi All, Would some expert help me some about the issue... I shall appreciate you kind help very much! Thank you! Zhiliang On Sunday, September 27, 2015 7:40 PM, Zhiliang Zhu wrote: Hi Alexis, Gavin, Thanks very much for your kind comment.My spark command is : spark-submit --class com.zyyx.spark.example.LinearRegression --master yarn-client LinearRegression.jar Both spark-shell and spark-submit will not run, all is hanging during the stage, 15/09/27 19:18:06 INFO yarn.Client: Application report for application_1440676456544_0727 (state: ACCEPTED)... The more deeper error log under /hdfs/yarn/logs/: 15/09/27 19:10:37 INFO util.Utils: Successfully started service 'sparkYarnAM' on port 53882. 15/09/27 19:10:37 INFO yarn.ApplicationMaster: Waiting for Spark driver to be reachable. 15/09/27 19:10:37 ERROR yarn.ApplicationMaster: Failed to connect to driver at 127.0.0.1:39581, retrying ... 15/09/27 19:10:37 ERROR yarn.ApplicationMaster: Failed to connect to driver at 127.0.0.1:39581, retrying ... For the all machine nodes, I just installed hadoop and spark, with same path & file & configuration, and copied one of the hadoop & spark directory to the remote gateway machine, the all would be with same path & file name & configuration under different nodes. In the link Running Spark on YARN - Spark 1.5.0 Documentation, there is some words as:Ensure that HADOOP_CONF_DIR or YARN_CONF_DIR points to the directory which contains the (client side) configuration files for the Hadoop cluster.These configs are used to write to HDFS and connect to the YARN ResourceManager. I do not exactly catch the first sentence. hadoop version is 2.5.2, spark version is 1.4.1 The spark-env.sh setting, export SCALA_HOME=/usr/lib/scala export JAVA_HOME=/usr/java/jdk1.7.0_45 export R_HOME=/usr/lib/r export HADOOP_HOME=/usr/lib/hadoop export YARN_CONF_DIR=/usr/lib/hadoop/etc/hadoop export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop export SPARK_MASTER_IP=master01 #export SPARK_LOCAL_IP=master02 export SPARK_LOCAL_IP=localhost export SPARK_LOCAL_DIRS=/data/spark_local_dir Would you help point out what is wrong place I made...I must show sincere appreciation towards your help. Best Regards,Zhiliang On Saturday, September 26, 2015 2:27 PM, Gavin Yue wrote: It is working, We are doing the same thing everyday. But the remote server needs to able to talk with ResourceManager. If you are using Spark-submit, your will also specify the hadoop conf directory in your Env variable. Spark would rely on that to locate where the cluster's resource manager is. I think this tutorial is pretty clear: http://spark.apache.org/docs/latest/running-on-yarn.html On Fri, Sep 25, 2015 at 7:11 PM, Zhiliang Zhu wrote: Hi Yue, Thanks very much for your kind reply. I would like to submit spark job remotely on another machine outside the cluster,and the job will run on yarn, similar as hadoop job is already done, could youconfirm it could exactly work for spark... Do you mean that I would print those variables on linux command side? Best Regards,Zhiliang On Saturday, September 26, 2015 10:07 AM, Gavin Yue wrote: Print out your env variables and check first Sent from my iPhone On Sep 25, 2015, at 18:43, Zhiliang Zhu wrote: Hi All, I would like to submit spark job on some another remote machine outside the cluster,I also copied hadoop/spark conf files under the remote machine, then hadoopjob would be submitted, but spark job would not. In spark-env.sh, it may be due to that SPARK_LOCAL_IP is not properly set,or for some other reasons... 
This issue is urgent for me, would some expert provide some help about this problem... I will show sincere appreciation towards your help. Thank you!Best Regards,Zhiliang On Friday, September 25, 2015 7:53 PM, Zhiliang Zhu wrote: Hi all, The spark job will run on yarn. While I do not set SPARK_LOCAL_IP any, or just set asexport SPARK_LOCAL_IP=localhost #or set as the specific node ip on the specific spark install directory It will work well to submit spark job on master node of cluster, however, it will fail by way of some gateway machine remotely. The gateway machine is already configed, it works well to submit hadoop job.It is set as: export SCALA_HOME=/usr/lib/scala export JAVA_HOME=/usr/java/jdk1.7.0_45 export R_HOME=/usr/lib/r export HADOOP_HOME=/usr/lib/hadoop export YARN_CONF_DIR=/usr/lib/hadoop/etc/hadoop export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop export SPARK_MASTER_IP=master01 #export SPARK_LOCAL_IP=master01 #if no SPARK_LOCAL_IP is set, SparkContext will not start export SPARK_LOCAL_IP=localhost #if localhost is set, SparkContext is started, but failed later export SPARK_LOCAL_DIRS=/data/spark_local_dir ... The error messages: 15/09/25 19:07:12 INFO util.Utils: Successfully started service 'sparkYarnAM' on port 48133. 15/09/25 19:07:12 INFO yarn.Appl
Re: How to properly set conf/spark-env.sh for spark to run on yarn
Hi Alexis, Gavin, Thanks very much for your kind comment.My spark command is : spark-submit --class com.zyyx.spark.example.LinearRegression --master yarn-client LinearRegression.jar Both spark-shell and spark-submit will not run, all is hanging during the stage, 15/09/27 19:18:06 INFO yarn.Client: Application report for application_1440676456544_0727 (state: ACCEPTED)... The more deeper error log under /hdfs/yarn/logs/: 15/09/27 19:10:37 INFO util.Utils: Successfully started service 'sparkYarnAM' on port 53882. 15/09/27 19:10:37 INFO yarn.ApplicationMaster: Waiting for Spark driver to be reachable. 15/09/27 19:10:37 ERROR yarn.ApplicationMaster: Failed to connect to driver at 127.0.0.1:39581, retrying ... 15/09/27 19:10:37 ERROR yarn.ApplicationMaster: Failed to connect to driver at 127.0.0.1:39581, retrying ... For the all machine nodes, I just installed hadoop and spark, with same path & file & configuration, and copied one of the hadoop & spark directory to the remote gateway machine, the all would be with same path & file name & configuration under different nodes. In the link Running Spark on YARN - Spark 1.5.0 Documentation, there is some words as:Ensure that HADOOP_CONF_DIR or YARN_CONF_DIR points to the directory which contains the (client side) configuration files for the Hadoop cluster.These configs are used to write to HDFS and connect to the YARN ResourceManager. I do not exactly catch the first sentence. hadoop version is 2.5.2, spark version is 1.4.1 The spark-env.sh setting, export SCALA_HOME=/usr/lib/scala export JAVA_HOME=/usr/java/jdk1.7.0_45 export R_HOME=/usr/lib/r export HADOOP_HOME=/usr/lib/hadoop export YARN_CONF_DIR=/usr/lib/hadoop/etc/hadoop export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop export SPARK_MASTER_IP=master01 #export SPARK_LOCAL_IP=master02 export SPARK_LOCAL_IP=localhost export SPARK_LOCAL_DIRS=/data/spark_local_dir Would you help point out what is wrong place I made...I must show sincere appreciation towards your help. Best Regards,Zhiliang On Saturday, September 26, 2015 2:27 PM, Gavin Yue wrote: It is working, We are doing the same thing everyday. But the remote server needs to able to talk with ResourceManager. If you are using Spark-submit, your will also specify the hadoop conf directory in your Env variable. Spark would rely on that to locate where the cluster's resource manager is. I think this tutorial is pretty clear: http://spark.apache.org/docs/latest/running-on-yarn.html On Fri, Sep 25, 2015 at 7:11 PM, Zhiliang Zhu wrote: Hi Yue, Thanks very much for your kind reply. I would like to submit spark job remotely on another machine outside the cluster,and the job will run on yarn, similar as hadoop job is already done, could youconfirm it could exactly work for spark... Do you mean that I would print those variables on linux command side? Best Regards,Zhiliang On Saturday, September 26, 2015 10:07 AM, Gavin Yue wrote: Print out your env variables and check first Sent from my iPhone On Sep 25, 2015, at 18:43, Zhiliang Zhu wrote: Hi All, I would like to submit spark job on some another remote machine outside the cluster,I also copied hadoop/spark conf files under the remote machine, then hadoopjob would be submitted, but spark job would not. In spark-env.sh, it may be due to that SPARK_LOCAL_IP is not properly set,or for some other reasons... This issue is urgent for me, would some expert provide some help about this problem... I will show sincere appreciation towards your help. 
Thank you!Best Regards,Zhiliang On Friday, September 25, 2015 7:53 PM, Zhiliang Zhu wrote: Hi all, The spark job will run on yarn. While I do not set SPARK_LOCAL_IP any, or just set asexport SPARK_LOCAL_IP=localhost #or set as the specific node ip on the specific spark install directory It will work well to submit spark job on master node of cluster, however, it will fail by way of some gateway machine remotely. The gateway machine is already configed, it works well to submit hadoop job.It is set as: export SCALA_HOME=/usr/lib/scala export JAVA_HOME=/usr/java/jdk1.7.0_45 export R_HOME=/usr/lib/r export HADOOP_HOME=/usr/lib/hadoop export YARN_CONF_DIR=/usr/lib/hadoop/etc/hadoop export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop export SPARK_MASTER_IP=master01 #export SPARK_LOCAL_IP=master01 #if no SPARK_LOCAL_IP is set, SparkContext will not start export SPARK_LOCAL_IP=localhost #if localhost is set, SparkContext is started, but failed later export SPARK_LOCAL_DIRS=/data/spark_local_dir ... The error messages: 15/09/25 19:07:12 INFO util.Utils: Successfully started service 'sparkYarnAM' on port 48133. 15/09/25 19:07:12 INFO yarn.ApplicationMaster: Waiting for Spark driver to be reachable. 15/09/25 19:07:12 ERROR yarn.ApplicationMaster: Failed to connect to driver at 127.0.0.1:35706, retrying ... 15/09/25 19:07:12 ERROR yarn.Appl
Re: How to properly set conf/spark-env.sh for spark to run on yarn
Hi Yue, Thanks very much for your kind reply. I would like to submit spark job remotely on another machine outside the cluster,and the job will run on yarn, similar as hadoop job is already done, could youconfirm it could exactly work for spark... Do you mean that I would print those variables on linux command side? Best Regards,Zhiliang On Saturday, September 26, 2015 10:07 AM, Gavin Yue wrote: Print out your env variables and check first Sent from my iPhone On Sep 25, 2015, at 18:43, Zhiliang Zhu wrote: Hi All, I would like to submit spark job on some another remote machine outside the cluster,I also copied hadoop/spark conf files under the remote machine, then hadoopjob would be submitted, but spark job would not. In spark-env.sh, it may be due to that SPARK_LOCAL_IP is not properly set,or for some other reasons... This issue is urgent for me, would some expert provide some help about this problem... I will show sincere appreciation towards your help. Thank you!Best Regards,Zhiliang On Friday, September 25, 2015 7:53 PM, Zhiliang Zhu wrote: Hi all, The spark job will run on yarn. While I do not set SPARK_LOCAL_IP any, or just set asexport SPARK_LOCAL_IP=localhost #or set as the specific node ip on the specific spark install directory It will work well to submit spark job on master node of cluster, however, it will fail by way of some gateway machine remotely. The gateway machine is already configed, it works well to submit hadoop job.It is set as: export SCALA_HOME=/usr/lib/scala export JAVA_HOME=/usr/java/jdk1.7.0_45 export R_HOME=/usr/lib/r export HADOOP_HOME=/usr/lib/hadoop export YARN_CONF_DIR=/usr/lib/hadoop/etc/hadoop export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop export SPARK_MASTER_IP=master01 #export SPARK_LOCAL_IP=master01 #if no SPARK_LOCAL_IP is set, SparkContext will not start export SPARK_LOCAL_IP=localhost #if localhost is set, SparkContext is started, but failed later export SPARK_LOCAL_DIRS=/data/spark_local_dir ... The error messages: 15/09/25 19:07:12 INFO util.Utils: Successfully started service 'sparkYarnAM' on port 48133. 15/09/25 19:07:12 INFO yarn.ApplicationMaster: Waiting for Spark driver to be reachable. 15/09/25 19:07:12 ERROR yarn.ApplicationMaster: Failed to connect to driver at 127.0.0.1:35706, retrying ... 15/09/25 19:07:12 ERROR yarn.ApplicationMaster: Failed to connect to driver at 127.0.0.1:35706, retrying ... 15/09/25 19:07:12 ERROR yarn.ApplicationMaster: Failed to connect to driver at 127.0.0.1:35706, retrying ... I shall sincerely appreciate your kind help very much!Zhiliang
How to properly set conf/spark-env.sh for spark to run on yarn
Hi All, I would like to submit a spark job from another remote machine outside the cluster. I also copied the hadoop/spark conf files onto the remote machine; a hadoop job can then be submitted, but a spark job can not. In spark-env.sh, it may be due to SPARK_LOCAL_IP not being properly set, or to some other reasons... This issue is urgent for me; would some expert provide some help with this problem... I will show sincere appreciation towards your help. Thank you! Best Regards, Zhiliang
How to properly set conf/spark-env.sh for spark to run on yarn
Hi All, I would like to submit spark job on some another remote machine outside the cluster,I also copied hadoop/spark conf files under the remote machine, then hadoopjob would be submitted, but spark job would not. In spark-env.sh, it may be due to that SPARK_LOCAL_IP is not properly set,or for some other reasons... This issue is urgent for me, would some expert provide some help about this problem... I will show sincere appreciation towards your help. Thank you!Best Regards,Zhiliang On Friday, September 25, 2015 7:53 PM, Zhiliang Zhu wrote: Hi all, The spark job will run on yarn. While I do not set SPARK_LOCAL_IP any, or just set asexport SPARK_LOCAL_IP=localhost #or set as the specific node ip on the specific spark install directory It will work well to submit spark job on master node of cluster, however, it will fail by way of some gateway machine remotely. The gateway machine is already configed, it works well to submit hadoop job.It is set as: export SCALA_HOME=/usr/lib/scala export JAVA_HOME=/usr/java/jdk1.7.0_45 export R_HOME=/usr/lib/r export HADOOP_HOME=/usr/lib/hadoop export YARN_CONF_DIR=/usr/lib/hadoop/etc/hadoop export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop export SPARK_MASTER_IP=master01 #export SPARK_LOCAL_IP=master01 #if no SPARK_LOCAL_IP is set, SparkContext will not start export SPARK_LOCAL_IP=localhost #if localhost is set, SparkContext is started, but failed later export SPARK_LOCAL_DIRS=/data/spark_local_dir ... The error messages: 15/09/25 19:07:12 INFO util.Utils: Successfully started service 'sparkYarnAM' on port 48133. 15/09/25 19:07:12 INFO yarn.ApplicationMaster: Waiting for Spark driver to be reachable. 15/09/25 19:07:12 ERROR yarn.ApplicationMaster: Failed to connect to driver at 127.0.0.1:35706, retrying ... 15/09/25 19:07:12 ERROR yarn.ApplicationMaster: Failed to connect to driver at 127.0.0.1:35706, retrying ... 15/09/25 19:07:12 ERROR yarn.ApplicationMaster: Failed to connect to driver at 127.0.0.1:35706, retrying ... I shall sincerely appreciate your kind help very much!Zhiliang
Re: How to set spark environment variable SPARK_LOCAL_IP in conf/spark-env.sh
On Friday, September 25, 2015 7:46 PM, Zhiliang Zhu wrote: Hi all, The spark job will run on yarn. While I do not set SPARK_LOCAL_IP any, or just set asexport SPARK_LOCAL_IP=localhost #or set as the specific node ip on the specific spark install directory It will work well to submit spark job on master node of cluster, however, it will fail by way of some gateway machine remotely. The gateway machine is already configed, it works well to submit hadoop job.It is set as: export SCALA_HOME=/usr/lib/scala export JAVA_HOME=/usr/java/jdk1.7.0_45 export R_HOME=/usr/lib/r export HADOOP_HOME=/usr/lib/hadoop export YARN_CONF_DIR=/usr/lib/hadoop/etc/hadoop export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop export SPARK_MASTER_IP=master01 #export SPARK_LOCAL_IP=master01 #if no SPARK_LOCAL_IP is set, SparkContext will not start export SPARK_LOCAL_IP=localhost #if localhost is set, SparkContext is started, but failed later export SPARK_LOCAL_DIRS=/data/spark_local_dir ... The error messages: <--- it is for SPARK_LOCAL_IP=localhost 15/09/25 19:07:12 INFO util.Utils: Successfully started service 'sparkYarnAM' on port 48133. 15/09/25 19:07:12 INFO yarn.ApplicationMaster: Waiting for Spark driver to be reachable. 15/09/25 19:07:12 ERROR yarn.ApplicationMaster: Failed to connect to driver at 127.0.0.1:35706, retrying ... 15/09/25 19:07:12 ERROR yarn.ApplicationMaster: Failed to connect to driver at 127.0.0.1:35706, retrying ... 15/09/25 19:07:12 ERROR yarn.ApplicationMaster: Failed to connect to driver at 127.0.0.1:35706, retrying ... I shall sincerely appreciate your kind help very much!Zhiliang
How to set spark environment variable SPARK_LOCAL_IP in conf/spark-env.sh
Hi all, The spark job will run on yarn. While I do not set SPARK_LOCAL_IP any, or just set asexport SPARK_LOCAL_IP=localhost #or set as the specific node ip on the specific spark install directory It will work well to submit spark job on master node of cluster, however, it will fail by way of some gateway machine remotely. The gateway machine is already configed, it works well to submit hadoop job.It is set as: export SCALA_HOME=/usr/lib/scala export JAVA_HOME=/usr/java/jdk1.7.0_45 export R_HOME=/usr/lib/r export HADOOP_HOME=/usr/lib/hadoop export YARN_CONF_DIR=/usr/lib/hadoop/etc/hadoop export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop export SPARK_MASTER_IP=master01 #export SPARK_LOCAL_IP=master01 #if no SPARK_LOCAL_IP is set, SparkContext will not start export SPARK_LOCAL_IP=localhost #if localhost is set, SparkContext is started, but failed later export SPARK_LOCAL_DIRS=/data/spark_local_dir ... The error messages: 15/09/25 19:07:12 INFO util.Utils: Successfully started service 'sparkYarnAM' on port 48133. 15/09/25 19:07:12 INFO yarn.ApplicationMaster: Waiting for Spark driver to be reachable. 15/09/25 19:07:12 ERROR yarn.ApplicationMaster: Failed to connect to driver at 127.0.0.1:35706, retrying ... 15/09/25 19:07:12 ERROR yarn.ApplicationMaster: Failed to connect to driver at 127.0.0.1:35706, retrying ... 15/09/25 19:07:12 ERROR yarn.ApplicationMaster: Failed to connect to driver at 127.0.0.1:35706, retrying ... I shall sincerely appreciate your kind help very much!Zhiliang
Re: how to submit the spark job outside the cluster
It seems that is due to spark SPARK_LOCAL_IP setting.export SPARK_LOCAL_IP=localhost will not work. Then, how it would be set. Thank you all~~ On Friday, September 25, 2015 5:57 PM, Zhiliang Zhu wrote: Hi Steve, Thanks a lot for your reply. That is, some commands could work on the remote server gateway installed , but some other commands will not work.As expected, the remote machine is not in the same area network as the cluster, and the cluster's portis forbidden. While I make the remote machine gateway for another local area cluster, it works fine, and the hadoopjob could be submitted on the machine remotedly. However, I want to submit spark jobs remotely as hadoop jobs do In the gateway machine, I also copied the spark install directory from the cluster to it, conf/spark-env.shis also there. But I fail to submit spark job remotely...The error messages: 15/09/25 17:47:47 INFO slf4j.Slf4jLogger: Slf4jLogger started 15/09/25 17:47:47 INFO Remoting: Starting remoting 15/09/25 17:47:48 ERROR netty.NettyTransport: failed to bind to /220.250.64.225:0, shutting down Netty transport 15/09/25 17:47:48 WARN util.Utils: Service 'sparkDriver' could not bind on port 0. Attempting port 1. 15/09/25 17:47:48 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon. 15/09/25 17:47:48 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports. 15/09/25 17:47:48 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down. ... Would you help some about it ... Thank you very much!Zhiliang On Friday, September 25, 2015 5:21 PM, Steve Loughran wrote: On 25 Sep 2015, at 05:25, Zhiliang Zhu wrote: However, I just could use "hadoop fs -ls/-mkdir/-rm XXX" commands to operate at the remote machine with gateway, which means the namenode is reachable; all those commands only need to interact with it. but commands "hadoop fs -cat/-put XXX YYY" would not work with error message as below: put: File /user/zhuzl/wordcount/input/1._COPYING_ could only be replicated to 0 nodes instead of minReplication (=1). There are 2 datanode(s) running and 2 node(s) are excluded in this operation. 15/09/25 10:44:00 INFO hdfs.DFSClient: Exception in createBlockOutputStream org.apache.hadoop.net.ConnectTimeoutException: 6 millis timeout while waiting for channel to be ready for connect. ch : java.nio.channels.SocketChannel[connection-pending remote=/10.6.28.96:50010] the client can't reach the datanodes
Re: how to submit the spark job outside the cluster
Hi Steve,
Thanks a lot for your reply. That is, some commands work on the remote server where the gateway is installed, but other commands do not. As expected, the remote machine is not in the same local area network as the cluster, and the cluster's ports are blocked from outside. When I make the remote machine a gateway for another cluster that is in the same local area network, it works fine, and Hadoop jobs can be submitted from that machine remotely. However, I want to submit Spark jobs remotely the same way Hadoop jobs are submitted.
On the gateway machine I also copied the Spark install directory from the cluster, and conf/spark-env.sh is there too. But submitting the Spark job remotely fails... The error messages:
15/09/25 17:47:47 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/09/25 17:47:47 INFO Remoting: Starting remoting
15/09/25 17:47:48 ERROR netty.NettyTransport: failed to bind to /220.250.64.225:0, shutting down Netty transport
15/09/25 17:47:48 WARN util.Utils: Service 'sparkDriver' could not bind on port 0. Attempting port 1.
15/09/25 17:47:48 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
15/09/25 17:47:48 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
15/09/25 17:47:48 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
...
Would you help some more about it... Thank you very much!
Zhiliang
On Friday, September 25, 2015 5:21 PM, Steve Loughran wrote:
On 25 Sep 2015, at 05:25, Zhiliang Zhu wrote:
However, I can only use the "hadoop fs -ls/-mkdir/-rm XXX" commands on the remote machine with the gateway, which means the namenode is reachable; all those commands only need to interact with it. But the commands "hadoop fs -cat/-put XXX YYY" do not work, with the error message below:
put: File /user/zhuzl/wordcount/input/1._COPYING_ could only be replicated to 0 nodes instead of minReplication (=1). There are 2 datanode(s) running and 2 node(s) are excluded in this operation.
15/09/25 10:44:00 INFO hdfs.DFSClient: Exception in createBlockOutputStream org.apache.hadoop.net.ConnectTimeoutException: 6 millis timeout while waiting for channel to be ready for connect. ch : java.nio.channels.SocketChannel[connection-pending remote=/10.6.28.96:50010]
the client can't reach the datanodes
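[Editor's note: on the -put/-cat failure Steve diagnoses above (the client reaches the namenode but not the datanodes' internal addresses), one client-side option sometimes used in this situation is to have the HDFS client connect to datanodes by hostname, which the remote machine's /etc/hosts can then map to reachable addresses. A minimal sketch, assuming a Hadoop 2.x client where the dfs.client.use.datanode.hostname property is available, that core-site.xml/hdfs-site.xml are on the classpath, and that the datanode hostnames really are reachable from outside; the file paths are placeholders, and this is not something proposed in the thread itself.]

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch: connect to datanodes by hostname instead of their internal IPs, so the
// remote machine's /etc/hosts decides where the block writes/reads actually go.
val conf = new Configuration() // assumes core-site.xml / hdfs-site.xml are on the classpath
conf.setBoolean("dfs.client.use.datanode.hostname", true)

val fs = FileSystem.get(conf)
// Hypothetical paths, only for illustration of a -put equivalent.
fs.copyFromLocalFile(new Path("/tmp/local-file.txt"),
                     new Path("/user/zhuzl/wordcount/input/local-file.txt"))
fs.close()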
Re: how to submit the spark job outside the cluster
And the remote machine is not in the same local area network as the cluster.
On Friday, September 25, 2015 12:28 PM, Zhiliang Zhu wrote:
Hi Zhan,
I have done that following your kind help. However, I can only use the "hadoop fs -ls/-mkdir/-rm XXX" commands on the remote machine with the gateway; the commands "hadoop fs -cat/-put XXX YYY" do not work, with the error message below:
put: File /user/zhuzl/wordcount/input/1._COPYING_ could only be replicated to 0 nodes instead of minReplication (=1). There are 2 datanode(s) running and 2 node(s) are excluded in this operation.
15/09/25 10:44:00 INFO hdfs.DFSClient: Exception in createBlockOutputStream org.apache.hadoop.net.ConnectTimeoutException: 6 millis timeout while waiting for channel to be ready for connect. ch : java.nio.channels.SocketChannel[connection-pending remote=/10.6.28.96:50010]
...
In the cluster, all machines' /etc/hosts:
10.6.32.132 master    # all are local area network IPs
10.6.28.96 core1      # must this use a global IP so the remote machine can operate?
10.6.26.160 core2
In the remote machine's /etc/hosts:
42.62.77.77 master    # all are global IPs, or else no commands work at all
42.62.77.81 core1     # but -cat / -put still do not work
42.62.77.83 core2
Would you help comment some more... Thank you very much!
Zhiliang
On Wednesday, September 23, 2015 11:30 AM, Zhan Zhang wrote:
Hi Zhiliang,
I cannot find a specific doc. But as far as I remember, you can log in to one of your cluster machines, find the hadoop configuration location, for example /etc/hadoop/conf, and copy that directory to your local machine. Typically it has hdfs-site.xml, yarn-site.xml etc. In Spark, the former is used to access HDFS, and the latter is used to launch applications on top of YARN. Then in spark-env.sh, you add export HADOOP_CONF_DIR=/etc/hadoop/conf.
Thanks.
Zhan Zhang
On Sep 22, 2015, at 8:14 PM, Zhiliang Zhu wrote:
Hi Zhan,
Yes, I get it now. I have never deployed the hadoop configuration locally and cannot find the specific doc; would you help provide the doc for doing that...
Thank you,
Zhiliang
On Wednesday, September 23, 2015 11:08 AM, Zhan Zhang wrote:
There is no difference between running the client inside or outside the cluster (assuming there is no firewall or network connectivity issue), as long as you have the hadoop configuration locally. Here is the doc for running on yarn: http://spark.apache.org/docs/latest/running-on-yarn.html
Thanks.
Zhan Zhang
On Sep 22, 2015, at 7:49 PM, Zhiliang Zhu wrote:
Hi Zhan,
Thanks very much for your helpful comment. I also expected it would be similar to hadoop job submission, but I was not sure whether that holds when it comes to Spark. Have you ever tried that for Spark... Would you give me the deployment doc for the hadoop and spark gateway? Since this is the first time for me to do that, I cannot find the specific doc for it.
Best Regards,
Zhiliang
On Wednesday, September 23, 2015 10:20 AM, Zhan Zhang wrote:
It should be similar to other hadoop jobs. You need the hadoop configuration on your client machine, and point HADOOP_CONF_DIR in Spark to that configuration.
Thanks
Zhan Zhang
On Sep 22, 2015, at 6:37 PM, Zhiliang Zhu wrote:
Dear Experts,
The Spark job runs on the cluster via YARN. The job can be submitted on a machine inside the cluster; however, I would like to submit the job from another machine which does not belong to the cluster. I know that for Hadoop this can be done from another machine on which a Hadoop gateway is installed and used to connect to the cluster.
Then what about Spark? Is it the same as for Hadoop? And where is the instruction doc for installing this gateway?
Thank you very much~~
Zhiliang
Re: how to submit the spark job outside the cluster
Hi Zhan,
I have done that following your kind help. However, I can only use the "hadoop fs -ls/-mkdir/-rm XXX" commands on the remote machine with the gateway; the commands "hadoop fs -cat/-put XXX YYY" do not work, with the error message below:
put: File /user/zhuzl/wordcount/input/1._COPYING_ could only be replicated to 0 nodes instead of minReplication (=1). There are 2 datanode(s) running and 2 node(s) are excluded in this operation.
15/09/25 10:44:00 INFO hdfs.DFSClient: Exception in createBlockOutputStream org.apache.hadoop.net.ConnectTimeoutException: 6 millis timeout while waiting for channel to be ready for connect. ch : java.nio.channels.SocketChannel[connection-pending remote=/10.6.28.96:50010]
...
In the cluster, all machines' /etc/hosts:
10.6.32.132 master    # all are local area network IPs
10.6.28.96 core1      # must this use a global IP so the remote machine can operate?
10.6.26.160 core2
In the remote machine's /etc/hosts:
42.62.77.77 master    # all are global IPs, or else no commands work at all
42.62.77.81 core1     # but -cat / -put still do not work
42.62.77.83 core2
Would you help comment some more... Thank you very much!
Zhiliang
On Wednesday, September 23, 2015 11:30 AM, Zhan Zhang wrote:
Hi Zhiliang,
I cannot find a specific doc. But as far as I remember, you can log in to one of your cluster machines, find the hadoop configuration location, for example /etc/hadoop/conf, and copy that directory to your local machine. Typically it has hdfs-site.xml, yarn-site.xml etc. In Spark, the former is used to access HDFS, and the latter is used to launch applications on top of YARN. Then in spark-env.sh, you add export HADOOP_CONF_DIR=/etc/hadoop/conf.
Thanks.
Zhan Zhang
On Sep 22, 2015, at 8:14 PM, Zhiliang Zhu wrote:
Hi Zhan,
Yes, I get it now. I have never deployed the hadoop configuration locally and cannot find the specific doc; would you help provide the doc for doing that...
Thank you,
Zhiliang
On Wednesday, September 23, 2015 11:08 AM, Zhan Zhang wrote:
There is no difference between running the client inside or outside the cluster (assuming there is no firewall or network connectivity issue), as long as you have the hadoop configuration locally. Here is the doc for running on yarn: http://spark.apache.org/docs/latest/running-on-yarn.html
Thanks.
Zhan Zhang
On Sep 22, 2015, at 7:49 PM, Zhiliang Zhu wrote:
Hi Zhan,
Thanks very much for your helpful comment. I also expected it would be similar to hadoop job submission, but I was not sure whether that holds when it comes to Spark. Have you ever tried that for Spark... Would you give me the deployment doc for the hadoop and spark gateway? Since this is the first time for me to do that, I cannot find the specific doc for it.
Best Regards,
Zhiliang
On Wednesday, September 23, 2015 10:20 AM, Zhan Zhang wrote:
It should be similar to other hadoop jobs. You need the hadoop configuration on your client machine, and point HADOOP_CONF_DIR in Spark to that configuration.
Thanks
Zhan Zhang
On Sep 22, 2015, at 6:37 PM, Zhiliang Zhu wrote:
Dear Experts,
The Spark job runs on the cluster via YARN. The job can be submitted on a machine inside the cluster; however, I would like to submit the job from another machine which does not belong to the cluster. I know that for Hadoop this can be done from another machine on which a Hadoop gateway is installed and used to connect to the cluster.
Then what about Spark? Is it the same as for Hadoop? And where is the instruction doc for installing this gateway?
Thank you very much~~
Zhiliang
Re: How to subtract two RDDs with same size
Hi Sujit,
That is wonderful! I must show my sincere appreciation for your kind help. Thank you very much!
Best Regards,
Zhiliang
On Wednesday, September 23, 2015 10:15 PM, Sujit Pal wrote:
Hi Zhiliang,
How about doing something like this?
val rdd3 = rdd1.zip(rdd2).map(p => p._1.zip(p._2).map(z => z._1 - z._2))
The first zip joins the two RDDs and produces an RDD of (Array[Float], Array[Float]) pairs. On each pair, we zip the two Array[Float] components together to form an Array[(Float, Float)] and then subtract the second element from the first in the inner map (the inner map is a Scala map, not a Spark one). I tried this out in a notebook:
val rdd1 = sc.parallelize(List(Array(1.0, 2.0, 3.0), Array(4.0, 5.0, 6.0), Array(7.0, 8.0, 9.0)))
val rdd2 = sc.parallelize(List(Array(1.0, 4.0, 3.0), Array(4.0, 10.0, 6.0), Array(7.0, 16.0, 9.0)))
val rdd3 = rdd1.zip(rdd2).map(p => p._1.zip(p._2).map(z => z._1 - z._2))
rdd3.collect()
gives me:
res0: Array[Array[Double]] = Array(Array(0.0, -2.0, 0.0), Array(0.0, -5.0, 0.0), Array(0.0, -8.0, 0.0))
-sujit
On Wed, Sep 23, 2015 at 12:23 AM, Zhiliang Zhu wrote:
There is a matrix add API; maybe map each row element of rdd2 to its negative, then take rdd1 and rdd2 and call add? Or is there some better way...
On Wednesday, September 23, 2015 3:11 PM, Zhiliang Zhu wrote:
Hi All,
There are two RDDs, rdd1 and rdd2; that is to say, rdd1 and rdd2 are like DataFrames, or matrices with the same number of rows and columns. I would like to get an RDD rdd3 in which each element is the difference between the elements of rdd1 and rdd2 at the same position, similar to matrix subtraction: rdd3 = rdd1 - rdd2 ...
It seems very difficult to do this kind of matrix arithmetic, even for add, subtract, multiply, diff etc...
I shall appreciate your help very much~~
Zhiliang
Re: How to subtract two RDDs with same size
There is a matrix add API; maybe map each row element of rdd2 to its negative, then take rdd1 and rdd2 and call add? Or is there some better way...
On Wednesday, September 23, 2015 3:11 PM, Zhiliang Zhu wrote:
Hi All,
There are two RDDs, rdd1 and rdd2; that is to say, rdd1 and rdd2 are like DataFrames, or matrices with the same number of rows and columns. I would like to get an RDD rdd3 in which each element is the difference between the elements of rdd1 and rdd2 at the same position, similar to matrix subtraction: rdd3 = rdd1 - rdd2 ...
It seems very difficult to do this kind of matrix arithmetic, even for add, subtract, multiply, diff etc...
I shall appreciate your help very much~~
Zhiliang
How to subtract two RDDs with same size
Hi All,
There are two RDDs, rdd1 and rdd2; that is to say, rdd1 and rdd2 are like DataFrames, or matrices with the same number of rows and columns. I would like to get an RDD rdd3 in which each element is the difference between the elements of rdd1 and rdd2 at the same position, similar to matrix subtraction: rdd3 = rdd1 - rdd2 ...
It seems very difficult to do this kind of matrix arithmetic, even for add, subtract, multiply, diff etc...
I shall appreciate your help very much~~
Zhiliang
Re: how to submit the spark job outside the cluster
Hi Zhan,
I really appreciate your help; I will do that next. And on the local machine, does no hadoop/spark need to be installed, only the copied /etc/hadoop/conf directory? Should any information about the local machine (for example its IP, hostname, etc.) be set in those conf files?
Moreover, do you have any experience submitting a hadoop/spark job via a Java program deployed on the gateway node, rather than via the hadoop/spark command...
Thank you very much~
Best Regards,
Zhiliang
On Wednesday, September 23, 2015 11:30 AM, Zhan Zhang wrote:
Hi Zhiliang,
I cannot find a specific doc. But as far as I remember, you can log in to one of your cluster machines, find the hadoop configuration location, for example /etc/hadoop/conf, and copy that directory to your local machine. Typically it has hdfs-site.xml, yarn-site.xml etc. In Spark, the former is used to access HDFS, and the latter is used to launch applications on top of YARN. Then in spark-env.sh, you add export HADOOP_CONF_DIR=/etc/hadoop/conf.
Thanks.
Zhan Zhang
On Sep 22, 2015, at 8:14 PM, Zhiliang Zhu wrote:
Hi Zhan,
Yes, I get it now. I have never deployed the hadoop configuration locally and cannot find the specific doc; would you help provide the doc for doing that...
Thank you,
Zhiliang
On Wednesday, September 23, 2015 11:08 AM, Zhan Zhang wrote:
There is no difference between running the client inside or outside the cluster (assuming there is no firewall or network connectivity issue), as long as you have the hadoop configuration locally. Here is the doc for running on yarn: http://spark.apache.org/docs/latest/running-on-yarn.html
Thanks.
Zhan Zhang
On Sep 22, 2015, at 7:49 PM, Zhiliang Zhu wrote:
Hi Zhan,
Thanks very much for your helpful comment. I also expected it would be similar to hadoop job submission, but I was not sure whether that holds when it comes to Spark. Have you ever tried that for Spark... Would you give me the deployment doc for the hadoop and spark gateway? Since this is the first time for me to do that, I cannot find the specific doc for it.
Best Regards,
Zhiliang
On Wednesday, September 23, 2015 10:20 AM, Zhan Zhang wrote:
It should be similar to other hadoop jobs. You need the hadoop configuration on your client machine, and point HADOOP_CONF_DIR in Spark to that configuration.
Thanks
Zhan Zhang
On Sep 22, 2015, at 6:37 PM, Zhiliang Zhu wrote:
Dear Experts,
The Spark job runs on the cluster via YARN. The job can be submitted on a machine inside the cluster; however, I would like to submit the job from another machine which does not belong to the cluster. I know that for Hadoop this can be done from another machine on which a Hadoop gateway is installed and used to connect to the cluster.
Then what about Spark? Is it the same as for Hadoop? And where is the instruction doc for installing this gateway?
Thank you very much~~
Zhiliang
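[Editor's note: on the question of submitting from a program on the gateway rather than via the spark-submit command, one option (assuming Spark 1.4 or later, where org.apache.spark.launcher.SparkLauncher is available) is to drive spark-submit programmatically. The paths, main class, and driver address below are hypothetical placeholders; this is a sketch of the approach, not the method discussed in the thread.]

import org.apache.spark.launcher.SparkLauncher

// Sketch: launch a Spark job on YARN from a program running on the gateway node.
// All paths, the main class, and the driver host are hypothetical placeholders.
val proc = new SparkLauncher()
  .setSparkHome("/usr/lib/spark")
  .setAppResource("/path/to/my-spark-job.jar")
  .setMainClass("com.example.MySparkJob")
  .setMaster("yarn-client")
  .setConf("spark.driver.host", "42.62.77.90") // gateway's routable address
  .launch()                                    // returns a java.lang.Process

// Block until the launched spark-submit process exits.
val exitCode = proc.waitFor()
println(s"spark-submit exited with code $exitCode")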
Re: how to submit the spark job outside the cluster
Hi Zhan,
Yes, I get it now. I have never deployed the hadoop configuration locally and cannot find the specific doc; would you help provide the doc for doing that...
Thank you,
Zhiliang
On Wednesday, September 23, 2015 11:08 AM, Zhan Zhang wrote:
There is no difference between running the client inside or outside the cluster (assuming there is no firewall or network connectivity issue), as long as you have the hadoop configuration locally. Here is the doc for running on yarn: http://spark.apache.org/docs/latest/running-on-yarn.html
Thanks.
Zhan Zhang
On Sep 22, 2015, at 7:49 PM, Zhiliang Zhu wrote:
Hi Zhan,
Thanks very much for your helpful comment. I also expected it would be similar to hadoop job submission, but I was not sure whether that holds when it comes to Spark. Have you ever tried that for Spark... Would you give me the deployment doc for the hadoop and spark gateway? Since this is the first time for me to do that, I cannot find the specific doc for it.
Best Regards,
Zhiliang
On Wednesday, September 23, 2015 10:20 AM, Zhan Zhang wrote:
It should be similar to other hadoop jobs. You need the hadoop configuration on your client machine, and point HADOOP_CONF_DIR in Spark to that configuration.
Thanks
Zhan Zhang
On Sep 22, 2015, at 6:37 PM, Zhiliang Zhu wrote:
Dear Experts,
The Spark job runs on the cluster via YARN. The job can be submitted on a machine inside the cluster; however, I would like to submit the job from another machine which does not belong to the cluster. I know that for Hadoop this can be done from another machine on which a Hadoop gateway is installed and used to connect to the cluster.
Then what about Spark? Is it the same as for Hadoop? And where is the instruction doc for installing this gateway?
Thank you very much~~
Zhiliang
Re: how to submit the spark job outside the cluster
Hi Zhan,
Thanks very much for your helpful comment. I also expected it would be similar to hadoop job submission, but I was not sure whether that holds when it comes to Spark. Have you ever tried that for Spark... Would you give me the deployment doc for the hadoop and spark gateway? Since this is the first time for me to do that, I cannot find the specific doc for it.
Best Regards,
Zhiliang
On Wednesday, September 23, 2015 10:20 AM, Zhan Zhang wrote:
It should be similar to other hadoop jobs. You need the hadoop configuration on your client machine, and point HADOOP_CONF_DIR in Spark to that configuration.
Thanks
Zhan Zhang
On Sep 22, 2015, at 6:37 PM, Zhiliang Zhu wrote:
Dear Experts,
The Spark job runs on the cluster via YARN. The job can be submitted on a machine inside the cluster; however, I would like to submit the job from another machine which does not belong to the cluster. I know that for Hadoop this can be done from another machine on which a Hadoop gateway is installed and used to connect to the cluster.
Then what about Spark? Is it the same as for Hadoop? And where is the instruction doc for installing this gateway?
Thank you very much~~
Zhiliang
how to submit the spark job outside the cluster
Dear Experts,
The Spark job runs on the cluster via YARN. The job can be submitted on a machine inside the cluster; however, I would like to submit the job from another machine which does not belong to the cluster. I know that for Hadoop this can be done from another machine on which a Hadoop gateway is installed and used to connect to the cluster.
Then what about Spark? Is it the same as for Hadoop? And where is the instruction doc for installing this gateway?
Thank you very much~~
Zhiliang
Re: How to get a new RDD by ordinarily subtract its adjacent rows
Dear Sujit,
Since you have deep experience with Spark, would it be convenient for you to comment on my dilemma in using Spark for an application with an R background...
Thank you very much!
Zhiliang
On Tuesday, September 22, 2015 1:45 AM, Zhiliang Zhu wrote:
Hi Romi,
I must show my sincere appreciation for your kind and helpful reply.
One more question: currently I am using Spark for financial data analysis, so lots of operations on R data.frame/matrix and stat/regression functions are always needed. However, SparkR is currently not that strong; most of its functions come from Spark SQL and MLlib. SQL and DataFrame are not as flexible and easy as R's operations on data.frame/matrix, and moreover I have not yet decided how much of MLlib can cover R-specific stat/regression. I have also thought of operating on the data only through the Spark Java API, but it is quite hard for that to act like R's data.frame/matrix. I think I have taken on risk with all of this.
Would you help comment some on my points...
Thank you very much!
Zhiliang
On Tuesday, September 22, 2015 1:21 AM, Sujit Pal wrote:
Hi Zhiliang,
I haven't used the Java API, but found this Javadoc page, which may be helpful to you:
https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/mllib/rdd/RDDFunctions.html
I think the equivalent Java code snippet might go something like this:
RDDFunctions.fromRDD(rdd1, ClassTag$.apply(Class)).sliding(2)
(the second parameter of fromRDD comes from this discussion thread):
http://apache-spark-user-list.1001560.n3.nabble.com/how-to-construct-a-ClassTag-object-as-a-method-parameter-in-Java-td6768.html
There is also the SlidingRDD decorator:
https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/mllib/rdd/SlidingRDD.html
So maybe something like this:
new SlidingRDD(rdd1, 2, ClassTag$.apply(Class))
-sujit
On Mon, Sep 21, 2015 at 9:16 AM, Zhiliang Zhu wrote:
Hi Sujit,
I must appreciate your kind help very much~ It seems to be OK; however, do you know the corresponding Spark Java API? Is there any Java API like Scala's sliding? It also seems that I cannot find Spark's Scala doc about sliding...
Thank you very much~
Zhiliang
On Monday, September 21, 2015 11:48 PM, Sujit Pal wrote:
Hi Zhiliang,
Would something like this work?
val rdd2 = rdd1.sliding(2).map(v => v(1) - v(0))
-sujit
On Mon, Sep 21, 2015 at 7:58 AM, Zhiliang Zhu wrote:
Hi Romi,
Thanks very much for your kind comment~~ In fact there is a valid background for the application; it is about R data analysis:
# fund_nav_daily is an M x N (or M x 1) matrix or data.frame; each column is one fund's daily series, each row is a date
# fund_return_daily is each fund's daily return: the difference between each day's and the previous day's log value
fund_return_daily <- diff(log(fund_nav_daily))
# the first row should be all 0, since there is no previous row before the first row
fund_return_daily <- rbind(matrix(0, ncol = ncol(fund_return_daily)), fund_return_daily)
...
I need to port the R program exactly to Spark, with RDD/DataFrame replacing the R data.frame; however, I have found it VERY difficult to make the Spark program flexibly describe and transform applications with an R background. I think I have seriously put myself at risk with this...
Would you help direct me some about the above coding issue, and about my risk in practice with the Spark/R application...
I must show all my sincere thanks for your kind help.
P.S. Currently, with SparkR in Spark 1.4.1, there are many bugs in the createDataFrame/except/unionAll APIs, and it seems that the Spark Java API has more functions than SparkR. Also, no R-specific regression algorithm is included in SparkR.
Best Regards,
Zhiliang
On Monday, September 21, 2015 7:36 PM, Romi Kuntsman wrote:
An RDD is a set of data rows (in your case numbers); there is no meaning to the order of the items. What exactly are you trying to accomplish?
Romi Kuntsman, Big Data Engineer
http://www.totango.com
On Mon, Sep 21, 2015 at 2:29 PM, Zhiliang Zhu wrote:
Dear all,
I have taken many days to think about this issue, but without any success... I shall appreciate all your kind help.
There is an RDD rdd1; I would like to get a new RDD rdd2 where each row rdd2[i] = rdd1[i] - rdd1[i-1]. What kind of API or function would I use...
Thanks very much!
John
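[Editor's note: for the adjacent-row subtraction itself (the diff(log(...)) step), here is a minimal Scala sketch using the sliding window from org.apache.spark.mllib.rdd.RDDFunctions, which Sujit references above. It assumes the RDD's row order is meaningful (e.g. the data was sorted by date beforehand); the sample NAV values are made up for illustration.]

import org.apache.spark.mllib.rdd.RDDFunctions._ // brings sliding() onto RDDs via an implicit

// Hypothetical daily NAV series, already in date order.
val fundNavDaily = sc.parallelize(Seq(100.0, 101.0, 99.5, 102.0))

// Equivalent of R's diff(log(fund_nav_daily)): log-return of each day versus the previous day.
val fundReturnDaily = fundNavDaily
  .sliding(2)
  .map(w => math.log(w(1)) - math.log(w(0)))

// Prepend a 0 for the first day, mirroring the rbind(matrix(0, ...), ...) step.
// Note: ++ is a plain union; a real pipeline would carry an explicit date key to preserve order.
val withLeadingZero = sc.parallelize(Seq(0.0)) ++ fundReturnDaily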
Re: how to get RDD from two different RDDs with cross column
Hi Romi,
Yes, you understand it correctly. rdd1's keys overlap with rdd2's keys; that is, many keys are shared between rdd1 and rdd2, some keys are in rdd1 but not in rdd2, and some keys are in rdd2 but not in rdd1. rdd3's keys would be the same as rdd1's keys, so rdd3 will not include the keys that are in rdd2 but not in rdd1. rdd3's values will come from rdd2; if a key in rdd3 is not in rdd2, its value will not exist (NULL).
You are always spot-on with Spark and always have a solution for these questions; I really appreciate it very much.
Thank you very much~
Zhiliang
On Tuesday, September 22, 2015 4:08 AM, Romi Kuntsman wrote:
Hi,
If I understand correctly: rdd1 contains keys (of type StringDate), rdd2 contains keys and values, and rdd3 should contain all the keys of rdd1 with the values from rdd2? I think you should make rdd1 and rdd2 PairRDDs, and then use an outer join. Does that make sense?
On Mon, Sep 21, 2015 at 8:37 PM Zhiliang Zhu wrote:
Dear Romi, Priya, Sujit and Shivaram and all,
I have taken many days to think about this issue, but without any good enough solution... I shall appreciate all your kind help.
There is an RDD rdd1 and another RDD rdd2 (rdd2 can be a PairRDD, or a DataFrame with two columns, StringDate and float). The StringDate values of rdd1 and rdd2 overlap but are not the same.
I would like to get a new RDD rdd3: the StringDate keys in rdd3 would all come from (be the same as) rdd1, and the float in rdd3 would come from rdd2 if its StringDate is in rdd2, or else NULL would be assigned.
Each row rdd3[i] = (StringDate, float), where rdd3[i].StringDate is the same as rdd1[i].StringDate, and rdd2's float for that StringDate is assigned to rdd3[i]'s float part.
What kind of API or function would I use...
Thanks very much!
Zhiliang
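[Editor's note: a minimal Scala sketch of the PairRDD + outer join approach Romi suggests. The dates and values are invented for illustration, and Option[Double] stands in for the NULL mentioned in the thread.]

import org.apache.spark.rdd.RDD

// Hypothetical inputs: rdd1 holds only the dates, rdd2 holds (date, value) pairs.
val rdd1: RDD[String] = sc.parallelize(Seq("2015-09-21", "2015-09-22", "2015-09-23"))
val rdd2: RDD[(String, Double)] = sc.parallelize(Seq(("2015-09-21", 1.5), ("2015-09-23", 2.5)))

// Keep exactly rdd1's keys; values come from rdd2 when present, None otherwise.
val rdd3: RDD[(String, Option[Double])] = rdd1
  .map(date => (date, ()))        // turn rdd1 into a PairRDD so it can be joined
  .leftOuterJoin(rdd2)            // yields (date, ((), Option[Double]))
  .mapValues { case (_, v) => v } // drop the placeholder, keep the Option[Double]

If a concrete default is preferred over None, the values can then be unwrapped with getOrElse.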
Re: How to get a new RDD by ordinarily subtract its adjacent rows
Hi Romi,
I must show my sincere appreciation for your kind and helpful reply.
One more question: currently I am using Spark for financial data analysis, so lots of operations on R data.frame/matrix and stat/regression functions are always needed. However, SparkR is currently not that strong; most of its functions come from Spark SQL and MLlib. SQL and DataFrame are not as flexible and easy as R's operations on data.frame/matrix, and moreover I have not yet decided how much of MLlib can cover R-specific stat/regression. I have also thought of operating on the data only through the Spark Java API, but it is quite hard for that to act like R's data.frame/matrix. I think I have taken on risk with all of this.
Would you help comment some on my points...
Thank you very much!
Zhiliang
On Tuesday, September 22, 2015 1:21 AM, Sujit Pal wrote:
Hi Zhiliang,
I haven't used the Java API, but found this Javadoc page, which may be helpful to you:
https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/mllib/rdd/RDDFunctions.html
I think the equivalent Java code snippet might go something like this:
RDDFunctions.fromRDD(rdd1, ClassTag$.apply(Class)).sliding(2)
(the second parameter of fromRDD comes from this discussion thread):
http://apache-spark-user-list.1001560.n3.nabble.com/how-to-construct-a-ClassTag-object-as-a-method-parameter-in-Java-td6768.html
There is also the SlidingRDD decorator:
https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/mllib/rdd/SlidingRDD.html
So maybe something like this:
new SlidingRDD(rdd1, 2, ClassTag$.apply(Class))
-sujit
On Mon, Sep 21, 2015 at 9:16 AM, Zhiliang Zhu wrote:
Hi Sujit,
I must appreciate your kind help very much~ It seems to be OK; however, do you know the corresponding Spark Java API? Is there any Java API like Scala's sliding? It also seems that I cannot find Spark's Scala doc about sliding...
Thank you very much~
Zhiliang
On Monday, September 21, 2015 11:48 PM, Sujit Pal wrote:
Hi Zhiliang,
Would something like this work?
val rdd2 = rdd1.sliding(2).map(v => v(1) - v(0))
-sujit
On Mon, Sep 21, 2015 at 7:58 AM, Zhiliang Zhu wrote:
Hi Romi,
Thanks very much for your kind comment~~ In fact there is a valid background for the application; it is about R data analysis:
# fund_nav_daily is an M x N (or M x 1) matrix or data.frame; each column is one fund's daily series, each row is a date
# fund_return_daily is each fund's daily return: the difference between each day's and the previous day's log value
fund_return_daily <- diff(log(fund_nav_daily))
# the first row should be all 0, since there is no previous row before the first row
fund_return_daily <- rbind(matrix(0, ncol = ncol(fund_return_daily)), fund_return_daily)
...
I need to port the R program exactly to Spark, with RDD/DataFrame replacing the R data.frame; however, I have found it VERY difficult to make the Spark program flexibly describe and transform applications with an R background. I think I have seriously put myself at risk with this...
Would you help direct me some about the above coding issue, and about my risk in practice with the Spark/R application...
I must show all my sincere thanks for your kind help.
P.S. Currently, with SparkR in Spark 1.4.1, there are many bugs in the createDataFrame/except/unionAll APIs, and it seems that the Spark Java API has more functions than SparkR. Also, no R-specific regression algorithm is included in SparkR.
Best Regards,
Zhiliang
On Monday, September 21, 2015 7:36 PM, Romi Kuntsman wrote:
An RDD is a set of data rows (in your case numbers); there is no meaning to the order of the items. What exactly are you trying to accomplish?
Romi Kuntsman, Big Data Engineer
http://www.totango.com
On Mon, Sep 21, 2015 at 2:29 PM, Zhiliang Zhu wrote:
Dear all,
I have taken many days to think about this issue, but without any success... I shall appreciate all your kind help.
There is an RDD rdd1; I would like to get a new RDD rdd2 where each row rdd2[i] = rdd1[i] - rdd1[i-1]. What kind of API or function would I use...
Thanks very much!
John
how to get RDD from two different RDDs with cross column
Dear Romi, Priya, Sujit and Shivaram and all,
I have taken many days to think about this issue, but without any good enough solution... I shall appreciate all your kind help.
There is an RDD rdd1 and another RDD rdd2 (rdd2 can be a PairRDD, or a DataFrame with two columns, StringDate and float). The StringDate values of rdd1 and rdd2 overlap but are not the same.
I would like to get a new RDD rdd3: the StringDate keys in rdd3 would all come from (be the same as) rdd1, and the float in rdd3 would come from rdd2 if its StringDate is in rdd2, or else NULL would be assigned.
Each row rdd3[i] = (StringDate, float), where rdd3[i].StringDate is the same as rdd1[i].StringDate, and rdd2's float for that StringDate is assigned to rdd3[i]'s float part.
What kind of API or function would I use...
Thanks very much!
Zhiliang