Executor still on the UI even if the worker is dead
Hi TD/Cody,

Why is it that in Spark Streaming the executors are still shown on the UI even when the worker has been killed and is no longer in the cluster? This severely impacts my running jobs, which take much longer, and the stages fail with the exception:

java.io.IOException: Failed to connect to --- (dead worker)

Is this a bug in Spark? The version is 1.4.0. This seems entirely against the fault tolerance of the workers: killing one worker in a cluster of 5 impacts the entire job.

Thanks,
Kundan
Logistic Regression in Spark Streaming
Hi,

Do we have a streaming version of Logistic Regression in Spark? I can see it exists for Linear Regression.

Has anyone used logistic regression on streaming data? It would be really helpful if you could share your insights on how to train on the incoming data.

In my use case I am trying to use logistic regression for click-through rate prediction using Spark. The reason to go for an online streaming mode is that we have new advertisers and items coming in and old items leaving.

Any insights would be helpful.

Regards,
Kundan
Re: Logistic Regression in Spark Streaming
Agreed, we do have a (batch) logistic regression example. I was looking for its counterpart to "StreamingLinearRegressionWithSGD".

On Fri, May 27, 2016 at 1:16 PM, Alonso Isidoro Roman wrote:
> I do not have any experience using LR in spark, but you can see that LR is
> already implemented in mllib.
>
> http://spark.apache.org/docs/latest/mllib-linear-methods.html
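For reference, MLlib does include a StreamingLogisticRegressionWithSGD alongside StreamingLinearRegressionWithSGD. Below is a minimal sketch of how it is typically wired up, assuming trainingStream and testStream are DStream[LabeledPoint]s supplied by the surrounding application and that numFeatures is known up front:

```scala
import org.apache.spark.mllib.classification.StreamingLogisticRegressionWithSGD
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.streaming.dstream.DStream

def wireUp(trainingStream: DStream[LabeledPoint],
           testStream: DStream[LabeledPoint],
           numFeatures: Int): Unit = {
  val model = new StreamingLogisticRegressionWithSGD()
    .setInitialWeights(Vectors.zeros(numFeatures)) // weights are updated as each micro-batch arrives
    .setNumIterations(50)
    .setStepSize(0.1)

  model.trainOn(trainingStream)                                        // online training
  model.predictOnValues(testStream.map(lp => (lp.label, lp.features))) // (label, prediction) pairs
    .print()
}
```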
Handling categorical variables in StreamingLogisticRegressionwithSGD
Hi,

I am trying to use StreamingLogisticRegressionWithSGD to build a CTR prediction model.

The documentation (http://spark.apache.org/docs/latest/mllib-linear-methods.html#streaming-linear-regression) mentions that numFeatures should be *constant*.

The problem that I am facing is: since most of my variables are categorical, numFeatures should be the final number of features after encoding and parsing the categorical variables into labeled-point format.

Suppose that for a categorical variable x1 I have 10 distinct values in the current window, but in the next window some new values/items get added to x1 and the number of distinct values increases. How should I handle numFeatures in this case, given that it will now change?

Basically, my question is how I should handle new values of the categorical variables in a streaming model.

Thanks,
Kundan
Re: Handling categorical variables in StreamingLogisticRegressionwithSGD
Hi Sean,

Thanks for the reply!

Is there anything already available in Spark that can fix the dimensionality of categorical variables? The OneHotEncoder changes the length of the vector it creates depending on the number of distinct values seen in the stream. Is there any parameter available on the StringIndexer so that I can fix the number of levels of a categorical variable, or will I need to write an implementation of my own?

Thanks,
Kundan

On Tue, Jul 12, 2016 at 5:43 PM, Sean Owen wrote:
> Yeah, for this to work, you need to know the number of distinct values
> a categorical feature will take on, ever. Sometimes that's known,
> sometimes it's not.
>
> One option is to use an algorithm that can use categorical features
> directly, like decision trees.
>
> You could consider hashing your features if so. So, you'd have maybe
> 10 indicator columns and you hash the feature into one of those 10
> columns to figure out which one it corresponds to. Of course, when you
> have an 11th value it collides with one of them and they get
> conflated, but, at least you can sort of proceed.
>
> This is more usually done with a large number of feature values, but
> maybe that's what you have. It's more problematic the smaller your
> hash space is.
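To illustrate the hashing approach Sean describes, here is a small hand-rolled sketch (not an existing Spark API): each categorical value is mapped into one of a fixed number of indicator slots, so numFeatures never changes even when new values appear in a later window.

```scala
import org.apache.spark.mllib.linalg.{Vector, Vectors}

// A toy hashing-trick encoder: `buckets` fixes the vector length up front.
// New categorical values may collide with existing ones, as Sean notes.
def hashEncode(value: String, buckets: Int): Vector = {
  val slot = math.abs(value.hashCode % buckets)   // which indicator column this value falls into
  Vectors.sparse(buckets, Array(slot), Array(1.0))
}

// Example: both encodings have length 10, regardless of how many distinct values ever show up.
val v1 = hashEncode("advertiser_42", 10)
val v2 = hashEncode("brand_new_advertiser", 10)
```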
Re: Getting kafka offsets at beginning of spark streaming application
Hi Cody,

My use case is as follows: my application dies at time X and I write the offsets to a DB. When my application starts again at time Y (a few minutes later), Spark Streaming reads the latest offsets using the createDirectStream method. Here I want to get the exact offsets that createDirectStream picks up at the beginning of the batch. I need this to create an initial RDD.

Please let me know if anything is unclear.

Thanks!

On Mon, Jan 11, 2016 at 8:54 PM, Cody Koeninger wrote:
> I'm not 100% sure what you're asking.
>
> If you're asking if it's possible to start a stream at a particular set of
> offsets, yes, one of the createDirectStream methods takes a map from
> topicpartition to starting offset.
>
> If you're asking if it's possible to query Kafka for the offset
> corresponding to a particular time, yes, but the granularity for that API
> is very poor, because it's based on filesystem timestamp. You're better
> off keeping an index of time to offset on your own.
>
> On Mon, Jan 11, 2016 at 3:09 AM, Abhishek Anand wrote:
>
>> Hi,
>>
>> Is there a way so that I can fetch the offsets from where the spark
>> streaming starts reading from Kafka when my application starts ?
>>
>> What I am trying is to create an initial RDD with offsets at a particular
>> time passed as input from the command line and the offsets from where my
>> spark streaming starts.
>>
>> Eg -
>>
>> Partition 0 -> 1000 to (offset at which my spark streaming starts)
>>
>> Thanks !!
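For what it's worth, the offsets a direct stream actually starts each batch with can be read off the batch's RDD itself. A sketch of the documented HasOffsetRanges pattern, assuming `stream` is the DStream returned by KafkaUtils.createDirectStream with String keys and values:

```scala
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

def logStartingOffsets(stream: DStream[(String, String)]): Unit = {
  var offsetRanges = Array.empty[OffsetRange]

  stream.transform { rdd =>
    // The underlying KafkaRDD knows exactly which offsets this batch covers.
    offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    rdd
  }.foreachRDD { _ =>
    offsetRanges.foreach { o =>
      println(s"${o.topic} partition ${o.partition}: starts at ${o.fromOffset}, ends before ${o.untilOffset}")
    }
  }
}
```

The fromOffset values captured here are what could seed the initial RDD mentioned above.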
org.apache.spark.shuffle.FetchFailedException: Failed to connect to ..... on worker failure
Hi,

I am running a Spark Streaming job and was testing fault tolerance by killing one of the workers with the kill -9 command.

My understanding is that when I kill a worker the job should not die, but should resume execution. Instead, I am getting the following error and my job halts:

org.apache.spark.shuffle.FetchFailedException: Failed to connect to .

Now, when I restart the same worker (2 workers were running on the machine and I killed just one of them), execution resumes and the job completes.

Please help me understand why my job is not fault tolerant on a worker failure. Am I missing something? Basically, I need the job to resume even if a worker is lost.

Regards,
Kundan
Batch Recovering from Checkpoint is taking longer runtime than usual
Hi,

Below is the code snippet where I am using the checkpointing feature of Spark Streaming. The SPARK_DURATION that I am using is 5 minutes and the batch duration is 15 minutes. I am checkpointing the data at each SPARK_DURATION (5 minutes).

When I kill the job and start the next batch, it takes longer than usual. The normal runtime is approximately 2.5 minutes; after killing the job and restarting, it takes around 4.5 minutes or more. In the stage information I can see that mapToPair (inside the mapAndReduce function) is called three times, once for each 5 minute window. Is it correct for these to be recomputed when I have already checkpointed the previous RDDs, or am I missing something?

Also, do I need to checkpoint kafkaStreamRdd and advDataObjectsRdd when I am already checkpointing advDashboardAggKeyVsMetricRdd?

Please let me know if any other information is required. I am using Spark 1.4.0.

JavaPairDStream kafkaStreamRdd = KafkaConnector.getKafkaStream(jsc);
JavaPairDStream kafkaStream = null;
if (CommandLineArguments.DO_REPARTITION_OF_RAW_STREAM_NB) {
    kafkaStream = kafkaStreamRdd.repartition(CommandLineArguments.FINAL_SPARK_PARTITIONS_OF_RAW_STREAM_NB);
} else {
    kafkaStream = kafkaStreamRdd;
}
kafkaStreamRdd.checkpoint(new Duration(CommandLineArguments.SPARK_DURATION));

JavaPairDStream filteredDataObjectsRdd = FilterInvalidAdlog.kafkaStreamToAdLogMapper(kafkaStream);
filteredDataObjectsRdd.checkpoint(new Duration(CommandLineArguments.SPARK_DURATION));

JavaDStream advDataObjectsRdd = AdvAggregation.kafkaStreamToAdLogMapper(filteredDataObjectsRdd);
advDataObjectsRdd.checkpoint(new Duration(CommandLineArguments.SPARK_DURATION));

JavaPairDStream advDashboardAggKeyVsMetricRdd = AdvDashboardV1.mapAndReduce(advDataObjectsRdd); // mapToPair applied inside mapAndReduce
advDashboardAggKeyVsMetricRdd.checkpoint(new Duration(CommandLineArguments.SPARK_DURATION));

JavaDStream advDashboardAggDataRdd = AdvDashboardV1.cassandraOutputRowMapper(advDashboardAggKeyVsMetricRdd, CommandLineArguments.SPARK_BATCH_DURATION_NB);

Thanks !!
Kundan
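As an aside, driver recovery from a checkpoint only works as documented if the whole DStream graph is built inside a factory function passed to StreamingContext.getOrCreate. A minimal Scala sketch of that pattern, where the app name and checkpoint path are hypothetical placeholders:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, StreamingContext}

val sparkConf     = new SparkConf().setAppName("checkpointed-streaming-app")
val checkpointDir = "hdfs:///tmp/streaming-checkpoint"   // hypothetical path on a reliable filesystem

def createContext(): StreamingContext = {
  val ssc = new StreamingContext(sparkConf, Minutes(15))
  ssc.checkpoint(checkpointDir)
  // Build the Kafka stream and every transformation/output operation here,
  // so that the whole graph can be reconstructed on recovery from the checkpoint.
  ssc
}

// On a clean start this calls createContext(); on restart it rebuilds the
// context (and any queued batches) from the checkpoint data instead.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()
```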
Kafka Offsets after application is restarted using Spark Streaming Checkpointing
Hi,

I am using the Spark Streaming checkpointing mechanism and reading the data from Kafka. The window duration for my application is 2 hrs with a sliding interval of 15 minutes.

So, my batches run at the following intervals:
09:45
10:00
10:15
10:30 and so on

Suppose my running batch dies at 09:55 and I restart the application at 12:05; then the flow is something like:

At 12:05 it would run the 10:00 batch -> would this read the Kafka offsets from the time it went down (or 9:45) to 12:00, or just up to 10:10?
Then the next would be the 10:15 batch - what would be the offsets used as input for this batch? ...and so on for all the queued batches.

Basically, my requirement is that when the application is restarted at 12:05 it should read the Kafka offsets up to 10:00, then the next queued batch should take offsets from 10:00 to 10:15, and so on until all the queued batches are processed.

If this is the way offsets are handled for all the queued batches, then I am fine. Otherwise, please suggest how this can be done.

Thanks!!!
Re: Kafka Offsets after application is restarted using Spark Streaming Checkpointing
Hi Cody,

Thanks for the clarification. I will try to come up with some workaround.

I have another doubt. When my job is restarted and recovers from the checkpoint, it does the re-partitioning step twice for each 15-minute job until the 2-hour window is complete; after that the re-partitioning takes place only once.

For example, when the job recovers at 16:15 it does re-partitioning for the 16:15 Kafka stream and for the 14:15 Kafka stream as well. Also, all the other intermediate stages are computed for the 16:15 batch. I am using reduceByKeyAndWindow with an inverse function. Once the 2-hour window is complete, i.e. at 18:15, repartitioning takes place only once. It seems the checkpoint does not have RDDs stored beyond 2 hours, which is my window duration. Because of this my job takes more time than usual.

Is there a way, or some configuration parameter, that would help avoid repartitioning twice?

I am attaching the snapshot for the same.

Thanks !!
Kundan

On Fri, Nov 13, 2015 at 8:48 PM, Cody Koeninger wrote:
> Unless you change maxRatePerPartition, a batch is going to contain all of
> the offsets from the last known processed to the highest available.
>
> Offsets are not time-based, and Kafka's time-based api currently has very
> poor granularity (it's based on filesystem timestamp of the log segment).
> There's a kafka improvement proposal to add time-based indexing, but I
> wouldn't expect it soon.
>
> Basically, if you want batches to relate to time even while your spark job
> is down, you need an external process to index Kafka and do some custom
> work to use that index to generate batches.
>
> Or (preferably) embed a time in your message, and do any time-based
> calculations using that time, not time of processing.
Re: Kafka Offsets after application is restarted using Spark Streaming Checkpointing
Sure, thanks !!

On Sun, Nov 15, 2015 at 9:13 PM, Cody Koeninger wrote:
> Not sure on that, maybe someone else can chime in
ReduceByKeyAndWindow does repartitioning twice on recovering from checkpoint
Hi,

I am using the Spark Streaming checkpointing mechanism and reading the data from Kafka. The window duration for my application is 2 hrs with a sliding interval of 15 minutes.

So, my batches run at the following intervals:
- 09:45
- 10:00
- 10:15
- 10:30
- and so on

When my job is restarted and recovers from the checkpoint, it does the re-partitioning step twice for each 15-minute job until the 2-hour window is complete; after that the re-partitioning takes place only once.

For example, when the job recovers at 16:15 it does re-partitioning for the 16:15 Kafka stream and for the 14:15 Kafka stream as well. Also, all the other intermediate stages are computed for the 16:15 batch. I am using reduceByKeyAndWindow with an inverse function. Once the 2-hour window is complete, from 18:15 onward re-partitioning takes place only once. It seems the checkpoint does not have RDDs stored beyond 2 hours, which is my window duration. Because of this my job takes more time than usual.

Is there a way, or some configuration parameter, that would help avoid repartitioning twice?

Attaching the snaps where repartitioning takes place twice after recovery from the checkpoint.

Thanks !!
Kundan
Re: org.apache.spark.shuffle.FetchFailedException
I have set spark.sql.shuffle.partitions=1000 but it is still failing.

On Tue, Aug 25, 2015 at 11:36 AM, Raghavendra Pandey <raghavendra.pan...@gmail.com> wrote:
> Did you try increasing sql partitions?
>
> On Tue, Aug 25, 2015 at 11:06 AM, kundan kumar wrote:
>
>> I am running this query on a data size of 4 billion rows and
>> getting org.apache.spark.shuffle.FetchFailedException errors.
>>
>> select adid,position,userid,price
>> from (
>>   select adid,position,userid,price,
>>   dense_rank() OVER (PARTITION BY adlocationid ORDER BY price DESC) as rank
>>   FROM trainInfo) as tmp
>> WHERE rank <= 2
>>
>> I have attached the error logs from the spark-sql terminal.
>>
>> Please suggest what the reason for these kinds of errors is and how I can
>> resolve them.
>>
>> Regards,
>> Kundan
Unable to get raw probabilities after clearing model threshold
Hi,

I am unable to get the raw probabilities despite clearing the threshold; it is still printing the predicted label.

Can someone help resolve this issue? Here is the code snippet.

LogisticRegressionWithSGD lrLearner = new LogisticRegressionWithSGD();
LogisticRegressionModel model = lrLearner.run(labeledPointTrain.rdd());
model.clearThreshold();

JavaRDD<Tuple2<Double, Double>> predictionAndLabels = labeledPointTrain.map(
    new Function<LabeledPoint, Tuple2<Double, Double>>() {
        public Tuple2<Double, Double> call(LabeledPoint p) {
            Double prediction = model.predict(p.features());
            return new Tuple2<Double, Double>(prediction, p.label());
        }
    }
);

predictionAndLabels.foreach(new VoidFunction<Tuple2<Double, Double>>() {
    @Override
    public void call(Tuple2<Double, Double> pred) throws Exception {
        logger.error("PREDICTION:" + pred._1() + " ACTUAL LABEL:" + pred._2());
    }
});

Thanks,
Kundan
Re: Unable to get raw probabilities after clearing model threshold
Sorry, my bad. The issue got resolved.

Thanks,
Kundan
Convert the feature vector to raw data
I am using:

Dataset<Row> result = model.transform(testData).select("probability", "label", "features");
result.show(1000, false);

In this case the feature vector is printed in the output. Is there a way for my original raw data to be printed instead of the feature vector, or a way to reverse-extract my raw data from the feature vector? All of the features in my dataset are categorical in nature.

Thanks,
Kundan
Re: Convert the feature vector to raw data
Hi Yan,

This doesn't work.

Thanks,
Kundan

On Wed, Jun 7, 2017 at 2:53 PM, 颜发才(Yan Facai) wrote:
> Hi, kumar.
>
> How about removing the `select` in your code?
> Namely,
>
> Dataset<Row> result = model.transform(testData);
> result.show(1000, false);
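One workaround, not from this thread and only a sketch under the assumption that the raw columns are dropped before testData is assembled: tag every raw row with an id before featurization and join the predictions back onto the raw data afterwards. The column name "row_id" and the `featurize` function are hypothetical stand-ins.

```scala
import org.apache.spark.ml.Transformer
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.monotonically_increasing_id

// rawData is the untransformed DataFrame; `featurize` stands in for whatever
// pipeline produces the assembled feature vector and must carry "row_id" through.
def predictionsWithRawColumns(rawData: DataFrame,
                              featurize: DataFrame => DataFrame,
                              model: Transformer): DataFrame = {
  val rawWithId = rawData.withColumn("row_id", monotonically_increasing_id())
  val testData  = featurize(rawWithId)
  val scored    = model.transform(testData).select("row_id", "probability")
  scored.join(rawWithId, "row_id")   // predictions side by side with the original raw columns
}
```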
Output of select in non exponential form.
predictions.select("prediction", "label", "features").show(5)

I have labels that are line numbers, but they are getting printed in exponential format. Is there a way to print them in normal (non-scientific) double notation?

Kundan
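A simple workaround, just a sketch and not the only option: cast the double-typed columns before showing them, so show() renders plain numbers instead of scientific notation.

```scala
import org.apache.spark.sql.functions.col

// Casting to a fixed-precision decimal (or to long, when the values are whole
// line numbers) makes show() print them without the exponent.
predictions
  .select(
    col("prediction").cast("decimal(20,4)").as("prediction"),
    col("label").cast("long").as("label"),
    col("features"))
  .show(5)
```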
Error running multinomial regression on a dataset with a field having constant value
I am running the sample multinomial regression code given in the Spark docs (version 2.2.0):

LogisticRegression lr = new LogisticRegression().setMaxIter(100).setRegParam(0.3).setElasticNetParam(0.8);
LogisticRegressionModel lrModel = lr.fit(training);

But when I add a constant field to the dataset, where all the values are the same, I get an error saying:

2018-03-11 15:42:58,835 [main] ERROR OWLQN - Failure! Resetting history: breeze.optimize.NaNHistory:
2018-03-11 15:42:58,922 [main] INFO OWLQN - Step Size: 1.000
2018-03-11 15:42:58,938 [main] INFO OWLQN - Val and Grad Norm: NaN (rel: NaN) NaN
2018-03-11 15:42:58,940 [main] INFO OWLQN - Converged because max iterations reached

Without the constant field in the dataset everything works fine. Please help me understand the reason behind this error. When I run the binary logistic regression code it runs fine even if there are constant values in a field. Do I really need to get rid of constant fields from my dataset while running multinomial regression? Is it a bug, or is this expected?

Thanks !!
Kundan
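As a practical workaround (a sketch only, not an explanation of the OWLQN behaviour): zero-variance columns carry no information anyway, so they can be detected and dropped before assembling the feature vector. The column names below are hypothetical.

```scala
import org.apache.spark.sql.DataFrame

// Returns the feature columns that have more than one distinct value, so constant
// (zero-variance) columns can be excluded before the VectorAssembler / fit step.
def nonConstantColumns(df: DataFrame, featureCols: Seq[String]): Seq[String] =
  featureCols.filter { c =>
    df.select(c).distinct().limit(2).count() > 1
  }

// Hypothetical usage:
// val keep = nonConstantColumns(training, Seq("f1", "f2", "constantField"))
```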
summary for all columns (numeric, strings) in a dataset
Hi,

Is there something like the summary function in Spark, like the one in "R"? The summary calculation that comes with Spark (MultivariateStatisticalSummary) operates only on numeric types.

I am interested in getting results for string types as well, such as the four most frequently occurring strings (a groupBy kind of operation), the number of unique values, etc.

Is there any pre-existing code for this? If not, please suggest the best way to deal with string types.

Thanks,
Kundan
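There may be nothing pre-packaged that covers strings, but a DataFrame gets most of the way there. A sketch, assuming a DataFrame `df` with a hypothetical string column "city":

```scala
import org.apache.spark.sql.functions.{countDistinct, desc}

// Numeric columns: count, mean, stddev, min, max (the closest built-in to R's summary()).
df.describe().show()

// String column: the four most frequently occurring values.
df.groupBy("city").count().orderBy(desc("count")).show(4)

// String column: number of unique values.
df.agg(countDistinct("city")).show()
```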
foreachActive functionality
Can someone help me understand the usage of the "foreachActive" function introduced for Vectors? I am trying to understand its usage in the MultivariateOnlineSummarizer class for summary statistics.

sample.foreachActive { (index, value) =>
  if (value != 0.0) {
    if (currMax(index) < value) {
      currMax(index) = value
    }
    if (currMin(index) > value) {
      currMin(index) = value
    }
    val prevMean = currMean(index)
    val diff = value - prevMean
    currMean(index) = prevMean + diff / (nnz(index) + 1.0)
    currM2n(index) += (value - currMean(index)) * diff
    currM2(index) += value * value
    currL1(index) += math.abs(value)
    nnz(index) += 1.0
  }
}

Regards,
Kundan
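For context, foreachActive(f) applies f(index, value) only to the entries a vector actually stores, so a sparse vector skips its implicit zeros while a dense vector visits every position. A tiny sketch:

```scala
import org.apache.spark.mllib.linalg.Vectors

val sparse = Vectors.sparse(5, Array(1, 3), Array(2.0, 7.0))

// Only the stored entries are visited: this prints indices 1 and 3, never 0, 2 or 4.
sparse.foreachActive { (index, value) =>
  println(s"index=$index value=$value")
}

// On a dense vector every position is "active", including explicit zeros, which is
// why the summarizer snippet above still guards with `if (value != 0.0)`.
Vectors.dense(0.0, 5.0, 0.0).foreachActive((i, v) => println(s"$i -> $v"))
```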
Index wise most frequently occuring element
I have an array of the form:

val array: Array[(Int, (String, Int))] = Array(
  (idx1, (word1, count1)),
  (idx2, (word2, count2)),
  (idx1, (word1, count1)),
  (idx3, (word3, count1)),
  (idx4, (word4, count4)))

I want to get the top 10 and bottom 10 elements from this array for each index (idx1, idx2, ...). Basically, I want the 10 most occurring and 10 least occurring elements for each index value.

Please suggest how to achieve this in Spark in the most efficient way. I have tried using for loops over each index, but this makes the program too slow and runs sequentially.

Thanks,
Kundan
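One way to avoid the per-index loop is to parallelize the array and do a single grouped pass. A sketch, assuming the array above is a real Array[(Int, (String, Int))] and that the values for any single index fit comfortably in memory (otherwise an aggregateByKey with two bounded heaps would be the safer variant):

```scala
// rdd: RDD[(Int, (String, Int))] built from the local array
val rdd = sc.parallelize(array)

val topAndBottomPerIndex = rdd
  .groupByKey()
  .mapValues { values =>
    val sorted = values.toSeq.sortBy { case (_, count) => -count }  // descending by count
    (sorted.take(10), sorted.takeRight(10).reverse)                 // (top 10, bottom 10)
  }

topAndBottomPerIndex.collect().foreach(println)
```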
Percentile Calculation
Is there any built-in function for calculating percentiles over a dataset? I want to calculate the percentiles for each column in my data.

Regards,
Kundan
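Two options worth checking, sketched here with a hypothetical numeric column "price": newer releases expose approximate quantiles directly on DataFrames, and the Hive percentile_approx UDAF is available through SQL, assuming the data is registered as a table (trainInfo here is just an example name).

```scala
// Spark 2.x: approximate percentiles on a DataFrame column
// (probabilities 0.25/0.5/0.75, with 1% relative error).
val quartiles: Array[Double] = df.stat.approxQuantile("price", Array(0.25, 0.5, 0.75), 0.01)

// Older releases: the Hive UDAF through SQL.
sqlContext.sql("SELECT percentile_approx(price, 0.95) FROM trainInfo").show()
```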
WARN NativeCodeLoader warning in spark shell
Hi,

Whenever I start the Spark shell I get this warning:

WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

What is the meaning of this, and does it impact the execution of my Spark jobs (and how)? Please suggest how I can fix this.

Thanks !!
Kundan
Writing RDD to a csv file
I have an RDD which is of type

org.apache.spark.rdd.RDD[(String, (Array[String], Option[Array[String]]))]

and I want to write it out as a CSV file. Please suggest how this can be done.

myrdd.map(line => (line._1 + "," + line._2._1.mkString(",") + "," + line._2._2.mkString(","))).saveAsTextFile("hdfs://...")

Calling mkString on line._2._1 works, but it does not work for the Option type. Please suggest how this can be done.

Thanks
Kundan
Re: Writing RDD to a csv file
Thanks Gerard !! This is working.

On Tue, Feb 3, 2015 at 6:44 PM, Gerard Maas wrote:
> this is more of a scala question, so probably next time you'd like to
> address a Scala forum e.g. http://stackoverflow.com/questions/tagged/scala
>
> val optArrStr: Option[Array[String]] = ???
> optArrStr.map(arr => arr.mkString(",")).getOrElse("") // empty string or
> whatever default value you have for this.
>
> kr, Gerard.
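Putting Gerard's suggestion back into the original map, the full write might look like this sketch (the empty string stands in for a missing optional array):

```scala
myrdd.map { case (key, (arr, optArr)) =>
  // Flatten the tuple into one comma-separated line; the Option contributes
  // its joined contents when present and an empty field otherwise.
  val optPart = optArr.map(_.mkString(",")).getOrElse("")
  key + "," + arr.mkString(",") + "," + optPart
}.saveAsTextFile("hdfs://...")
```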
Spark Job running on localhost on yarn cluster
Hi,

I am trying to execute my code on a YARN cluster. The command which I am using is:

$SPARK_HOME/bin/spark-submit --class "EDDApp" target/scala-2.10/edd-application_2.10-1.0.jar --master yarn-cluster --num-executors 3 --driver-memory 6g --executor-memory 7g

But I can see that this program is running only on the localhost. It is able to read the file from HDFS. I have tried this in standalone mode and it works fine.

Please suggest where it is going wrong.

Regards,
Kundan
Re: Spark Job running on localhost on yarn cluster
The problem got resolved after removing all the configuration files from all the slave nodes. Earlier we were running in standalone mode and that led to duplicating the configuration on all the slaves. Once that was done it ran as expected in cluster mode, although the performance is not up to the standalone mode.

Compared to standalone mode, Spark on YARN runs very slowly. I am running it as:

$SPARK_HOME/bin/spark-submit --class "EDDApp" --master yarn-cluster --num-executors 10 --executor-memory 14g target/scala-2.10/edd-application_2.10-1.0.jar hdfs://hm41:9000/user/hduser/newtrans.csv hdfs://hm41:9000/user/hduser/trans-out

We have a cluster of 5 nodes, each with 16GB RAM and 8 cores. We have configured the minimum container size as 3GB and the maximum as 14GB in yarn-site.xml. When submitting the job to yarn-cluster we request number of executors = 10 and executor memory = 14GB.

According to my understanding our job should be allocated 4 containers of 14GB, but the Spark UI shows only 3 containers of 7.2GB each. We are unable to control the number of containers and the resources allocated to them. This causes detrimental performance when compared to standalone mode.

Regards,
Kundan

On Thu, Feb 5, 2015 at 12:49 PM, Felix C wrote:
> Is YARN_CONF_DIR set?
>
> --- Original Message ---
>
> From: "Aniket Bhatnagar"
> Sent: February 4, 2015 6:16 AM
> To: "kundan kumar" , "spark users" <user@spark.apache.org>
> Subject: Re: Spark Job running on localhost on yarn cluster
>
> Have you set master in SparkConf/SparkContext in your code? Driver logs
> show in which mode the spark job is running. Double check if the logs
> mention local or yarn-cluster.
> Also, what's the error that you are getting?
Error while querying hive table from spark shell
Hi , I am getting the following error when I am trying query a hive table from spark shell. I have placed my hive-site.xml in the spark/conf directory. Please suggest how to resolve this error. scala> sqlContext.sql("select count(*) from offers_new").collect().foreach(println) 15/02/11 01:48:01 WARN conf.HiveConf: DEPRECATED: hive.metastore.ds.retry.* no longer has any effect. Use hive.hmshandler.retry.* instead 15/02/11 01:48:01 INFO parse.ParseDriver: Parsing command: select count(*) from offers_new 15/02/11 01:48:01 INFO parse.ParseDriver: Parse Completed 15/02/11 01:48:01 INFO metastore.HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore 15/02/11 01:48:01 INFO metastore.ObjectStore: ObjectStore, initialize called org.apache.hadoop.hive.ql.metadata.HiveException: Unable to fetch table offers_new at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:984) at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:950) at org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:70) at org.apache.spark.sql.hive.HiveContext$$anon$1.org $apache$spark$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(HiveContext.scala:253) at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141) at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:141) at org.apache.spark.sql.hive.HiveContext$$anon$1.lookupRelation(HiveContext.scala:253) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:143) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:138) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:191) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:147) at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:138) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:137) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61) at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59) at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111) at scala.collection.immutable.List.foldLeft(List.scala:84) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:51) at scala.collection.immutable.List.foreach(List.scala:318) at org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51) at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:411) at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:411) at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData$lzycompute(SQLContext.scala:412) at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData(SQLContext.scala:412) at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan$lzycompute(SQLContext.scala:413) at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan(SQLContext.scala:413) at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:418) at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:416) at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:422) at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:422) at org.apache.
Unable to query hive tables from spark
I want to create/access Hive tables from Spark. I have placed the hive-site.xml inside the spark/conf directory. Even so, it creates a local metastore in the directory where I run the Spark shell and exits with an error.

I am getting this error when I try to create a new Hive table, and the error also appears when querying an existing table.

sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")

Please suggest what I am doing wrong and a way to resolve this.

15/02/12 10:35:58 ERROR RetryingHMSHandler: MetaException(message:file:/user/hive/warehouse/src is not a directory or unable to create one)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_core(HiveMetaStore.java:1239)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_with_environment_context(HiveMetaStore.java:1294)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
Unable to run hive queries inside spark
Hi , I have placed my hive-site.xml inside spark/conf and i am trying to execute some hive queries given in the documentation. Can you please suggest what wrong am I doing here. scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) hiveContext: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@3340a4b8 scala> hiveContext.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") warning: there were 1 deprecation warning(s); re-run with -deprecation for details 15/02/25 10:30:59 INFO ParseDriver: Parsing command: CREATE TABLE IF NOT EXISTS src (key INT, value STRING) 15/02/25 10:30:59 INFO ParseDriver: Parse Completed 15/02/25 10:30:59 INFO HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore 15/02/25 10:30:59 INFO ObjectStore: ObjectStore, initialize called 15/02/25 10:30:59 INFO Persistence: Property datanucleus.cache.level2 unknown - will be ignored 15/02/25 10:30:59 INFO Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored 15/02/25 10:30:59 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies) 15/02/25 10:30:59 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies) 15/02/25 10:31:08 INFO ObjectStore: Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes="Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order" 15/02/25 10:31:08 INFO MetaStoreDirectSql: MySQL check failed, assuming we are not on mysql: Lexical error at line 1, column 5. Encountered: "@" (64), after : "". 15/02/25 10:31:09 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table. 15/02/25 10:31:09 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table. 15/02/25 10:31:15 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table. 15/02/25 10:31:15 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table. 15/02/25 10:31:17 INFO ObjectStore: Initialized ObjectStore 15/02/25 10:31:17 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 0.13.1aa 15/02/25 10:31:18 INFO HiveMetaStore: Added admin role in metastore 15/02/25 10:31:18 INFO HiveMetaStore: Added public role in metastore 15/02/25 10:31:18 INFO HiveMetaStore: No user is added in admin role, since config is empty 15/02/25 10:31:18 INFO SessionState: No Tez session required at this point. hive.execution.engine=mr. 
15/02/25 10:31:18 INFO PerfLogger: 15/02/25 10:31:18 INFO PerfLogger: 15/02/25 10:31:18 INFO Driver: Concurrency mode is disabled, not creating a lock manager 15/02/25 10:31:18 INFO PerfLogger: 15/02/25 10:31:18 INFO PerfLogger: 15/02/25 10:31:18 INFO ParseDriver: Parsing command: CREATE TABLE IF NOT EXISTS src (key INT, value STRING) 15/02/25 10:31:18 INFO ParseDriver: Parse Completed 15/02/25 10:31:18 INFO PerfLogger: 15/02/25 10:31:18 INFO PerfLogger: 15/02/25 10:31:19 INFO SemanticAnalyzer: Starting Semantic Analysis 15/02/25 10:31:19 INFO SemanticAnalyzer: Creating table src position=27 15/02/25 10:31:19 INFO HiveMetaStore: 0: get_table : db=default tbl=src 15/02/25 10:31:19 INFO audit: ugi=spuser ip=unknown-ip-addr cmd=get_table : db=default tbl=src 15/02/25 10:31:19 INFO HiveMetaStore: 0: get_database: default 15/02/25 10:31:19 INFO audit: ugi=spuser ip=unknown-ip-addr cmd=get_database: default 15/02/25 10:31:19 INFO Driver: Semantic Analysis Completed 15/02/25 10:31:19 INFO PerfLogger: 15/02/25 10:31:19 INFO Driver: Returning Hive schema: Schema(fieldSchemas:null, properties:null) 15/02/25 10:31:19 INFO PerfLogger: 15/02/25 10:31:19 INFO PerfLogger: 15/02/25 10:31:19 INFO Driver: Starting command: CREATE TABLE IF NOT EXISTS src (key INT, value STRING) 15/02/25 10:31:19 INFO PerfLogger: 15/02/25 10:31:19 INFO PerfLogger: 15/02/25 10:31:19 INFO PerfLogger: 15/02/25 10:31:19 INFO DDLTask: Default to LazySimpleSerDe for table src 15/02/25 10:31:19 INFO HiveMetaStore: 0: create_table: Table(tableName:src, dbName:default, owner:spuser, createTime:1424840479, lastAccessTime:0, retention:0, sd:StorageDescriptor(cols:[FieldSchema(name:key, type:int, comment:null), FieldSchema(name:value, type:string, comment:null)], location:null, inputFormat:org.apache.hadoop.mapred.TextInputFormat, outputFormat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat, compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null, serializationLib:org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, parameters:{serialization.format=1}), bucketCols:[], sortCols:[], parameters:{}, skewedInfo:Skewe
Re: Unable to run hive queries inside spark
Hi Denny,

Yes, the user has all the rights to HDFS. I am running all the Spark operations with this user.

My hive-site.xml looks like this:

<property>
  <name>hive.metastore.warehouse.dir</name>
  <value>/user/hive/warehouse</value>
  <description>location of default database for the warehouse</description>
</property>

Do I need to do anything explicitly other than placing hive-site.xml in the spark/conf directory?

Thanks !!

On Wed, Feb 25, 2015 at 11:42 AM, Denny Lee wrote:
> The error message you have is:
>
> FAILED: Execution Error, return code 1 from
> org.apache.hadoop.hive.ql.exec.DDLTask.
> MetaException(message:file:/user/hive/warehouse/src is not a directory or
> unable to create one)
>
> Could you verify that you (the user you are running under) has the rights
> to create the necessary folders within HDFS?
Handling Big data for interactive BI tools
Hi,

I need to store terabytes of data which will be used by BI tools like QlikView. The queries can filter on any column.

Currently, we are using Redshift for this purpose, and I am trying to explore alternatives to Redshift.

Is it possible to get better performance with Spark as compared to Redshift? If yes, please suggest the best way to achieve this.

Thanks!!
Kundan
Re: Handling Big data for interactive BI tools
I am looking for some options and came across http://www.jethrodata.com/

On Thu, Mar 26, 2015 at 5:47 PM, Jörn Franke wrote:
> You can also preaggregate results for the queries by the user - depending
> on what queries they use this might be necessary for any underlying
> technology
Re: Handling Big data for interactive BI tools
I was looking for some options and came across JethroData: http://www.jethrodata.com/

It stores the data while maintaining indexes over all the columns. It seems good and claims to have better performance than Impala.

Earlier I had tried Apache Phoenix because of its secondary indexing feature, but the major challenge I faced there was that secondary indexing was not supported for the bulk loading process. Only the sequential loading process supported secondary indexes, which took a longer time.

Any comments on this?
Equal Height and Depth Binning in Spark
Hi,

I am trying to implement equal-depth and equal-height binning methods in Spark. Any insights or existing code for this would be really helpful.

Thanks,
Kundan
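In case it helps, the ML feature package has building blocks for both styles of binning. A hedged sketch, assuming a DataFrame `df` with a hypothetical numeric column "price": QuantileDiscretizer produces approximately equal-frequency ("equal-depth") bins, while Bucketizer with explicit, evenly spaced splits produces fixed-width bins.

```scala
import org.apache.spark.ml.feature.{Bucketizer, QuantileDiscretizer}

// Equal-frequency ("equal-depth") bins: each bucket holds roughly the same number of rows.
val equalDepth = new QuantileDiscretizer()
  .setInputCol("price")
  .setOutputCol("priceBinDepth")
  .setNumBuckets(10)
  .fit(df)
  .transform(df)

// Fixed-width bins: explicit, evenly spaced split points chosen by hand.
val splits = Array(Double.NegativeInfinity, 0.0, 25.0, 50.0, 75.0, 100.0, Double.PositiveInfinity)
val equalWidth = new Bucketizer()
  .setInputCol("price")
  .setOutputCol("priceBinWidth")
  .setSplits(splits)
  .transform(df)
```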