How to disable predicate pushdown in a Spark 2.x query
Hi All,

I am trying to read a table from a relational database using Spark 2.x, with code like the following:

sparkContext.read().jdbc(url, table, connectionProperties)
    .select("SELECT_COLUMN")
    .where(whereClause);

What's happening is that the SQL query Spark actually runs against the relational DB is:

select column, (where_clause_columns) from table WHERE SELECT_COLUMN IS NOT NULL;

So I guess it applies my where clause only after fetching all the rows from the DB where SELECT_COLUMN IS NOT NULL. I searched around and found out this happens because of predicate pushdown. Is there a way to load the data into a dataframe using a specific query instead? I found a suggestion that if we provide the actual query instead of the table name in the following code, Spark should run that query exactly:

table = "select SELECT_COLUMN from table " + whereClause;
sparkContext.read().jdbc(url, table, connectionProperties)
    .select("SELECT_COLUMN")
    .where(whereClause);

Does the above seem like a good solution?

Regards,
Mohit
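From what I can tell from the docs, for the second approach to work the query string has to be wrapped in parentheses and given an alias, because Spark splices it into its own SELECT ... FROM clause. A rough, untested sketch of what I mean (table and column names are placeholders for my real ones):

import java.util.Properties;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().appName("JdbcSubquery").getOrCreate();

Properties connectionProperties = new Properties();
connectionProperties.put("user", "dbuser");       // placeholder credentials
connectionProperties.put("password", "dbpass");   // placeholder credentials

// The whole parenthesised subquery is sent to the database verbatim, so the
// filtering happens on the DB side rather than in Spark.
String table = "(select SELECT_COLUMN from my_table where " + whereClause + ") as subq";
Dataset<Row> df = spark.read().jdbc(url, table, connectionProperties);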
How to split a dataframe into two dataframes based on count
Dear All,

I would like to know how, in Spark 2.0, I can split a dataframe into two dataframes when I know the exact counts the two dataframes should have. I tried using limit but got quite weird results. Also, I am looking for exact counts in the child dfs, not an approximate %-based split.

*Following is what I have tried:*

var dfParent = spark.read.parquet("somelocation"); // let's say it has 4000 rows

I want to split the parent into two dfs with the following counts:

var dfChild1Count = 1000
var dfChild2Count = 3000

*I tried this:*

var dfChild1 = dfParent.limit(dfChild1Count);
var dfChild2 = dfParent.except(dfChild1);

*and wrote that to output hdfs directories:*

dfChild1.write.parquet("/outputfilechild1");
dfChild2.write.parquet("/outputfilechild2");

It turns out this results in some duplicates saved in outputfilechild1 & outputfilechild2. Could anyone explain why? When I sorted my parent dataframe before the limit, it worked fine:

*dfParent = dfParent.orderBy(col("unique_col").desc)*

It seems the limit on the parent is executed twice (presumably once for dfChild1 and again when except recomputes it against dfParent) and returns different records each time; without an ordering, limit is not deterministic across runs.

Also, is there a better way to split a df into multiple dfs when we know the exact counts of the child dfs?

Regards,
Mohit
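I also came across window functions as a possibly deterministic way to do this: number the rows by the unique column and cut at the desired count. A rough, untested sketch in Java (assuming dfParent is a Dataset<Row>; the Scala calls are identical):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.row_number;

int dfChild1Count = 1000;

// Number every row by a unique column, then cut at the desired count.
// Caveat: a Window without partitionBy pulls all rows into one partition,
// so this only suits modest data sizes.
WindowSpec w = Window.orderBy(col("unique_col"));
Dataset<Row> numbered = dfParent.withColumn("rn", row_number().over(w));
Dataset<Row> dfChild1 = numbered.filter(col("rn").leq(dfChild1Count)).drop("rn");
Dataset<Row> dfChild2 = numbered.filter(col("rn").gt(dfChild1Count)).drop("rn");

Would that be considered a reasonable approach?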
getting error in spark streaming: java.lang.OutOfMemoryError: unable to create new native thread
Hi Everyone,

I am getting the following error while running a Spark Streaming example on my local machine; the data being ingested is only 506 KB:

*16/11/23 03:05:54 INFO MappedDStream: Slicing from 1479850537180 ms to 1479850537235 ms (aligned to 1479850537180 ms and 1479850537235 ms)*
*Exception in thread "streaming-job-executor-0" java.lang.OutOfMemoryError: unable to create new native thread*

I looked it up and found that it could be related to ulimit, since this particular OutOfMemoryError usually means the OS refused to spawn another thread rather than the JVM heap being full. I even increased the ulimit, but I still get the same error.

Regards
Mohit
Re: newbie question about RDD
Hi Raghav,

Please refer to the following code:

SparkConf sparkConf = new SparkConf().setMaster("local[2]").setAppName("PersonApp");

// creating the Java Spark context
JavaSparkContext sc = new JavaSparkContext(sparkConf);

// reading the file from HDFS into a Spark RDD; the namenode is localhost
JavaRDD<String> personStringRDD =
    sc.textFile("hdfs://localhost:9000/custom/inputPersonFile.txt");

// converting from an RDD of Strings to an RDD of Persons; this is just an
// example, you can replace the parsing with better exception-handled code
JavaRDD<Person> personObjectRDD = personStringRDD.map(personRow -> {
    String[] personValues = personRow.split("\t");
    return new Person(Long.parseLong(personValues[0]), personValues[1],
        personValues[2], personValues[3]);
});

// finally, just printing the count of objects
System.out.println("Person count = " + personObjectRDD.count());

Regards
Mohit

On Tue, Nov 22, 2016 at 11:17 AM, Raghav wrote:

> Sorry, I forgot to ask how I can use the Spark context here. I have the HDFS
> directory path of the files, as well as the namenode of the HDFS cluster.
>
> Thanks for your help.
>
> On Mon, Nov 21, 2016 at 9:45 PM, Raghav wrote:
>
>> Hi
>>
>> I am extremely new to Spark. I have to read a file from HDFS, and get it
>> in memory in RDD format.
>>
>> I have a Java class as follows:
>>
>> class Person {
>>     private long UUID;
>>     private String FirstName;
>>     private String LastName;
>>     private String zip;
>>
>>     // public methods
>> }
>>
>> The file in HDFS is as follows:
>>
>> UUID  FirstName  LastName  Zip
>> 7462  John       Doll      06903
>> 5231  Brad       Finley    32820
>>
>> Can someone point me to how to get a JavaRDD<Person> object by reading the
>> file in HDFS?
>>
>> Thanks.
>>
>> --
>> Raghav
>
> --
> Raghav
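P.S. One thing to watch out for: if the input file still contains that header row, Long.parseLong will throw on the very first line, so drop the header before mapping. A quick sketch:

// skip the header line before parsing the remaining rows into Person objects
String header = personStringRDD.first();
JavaRDD<String> dataRows = personStringRDD.filter(row -> !row.equals(header));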
Re: how do you convert directstream into data frames
Any idea, anyone?

On Fri, Aug 14, 2015 at 10:11 AM, Mohit Durgapal wrote:

> Hi All,
>
> After creating a direct stream like below:
>
> val events = KafkaUtils.createDirectStream[String, String,
>     StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
>
> I would like to convert the above stream into data frames, so that I could
> run hive queries over it. Could anyone please explain how this can be
> achieved? I am using Spark version 1.3.0
how do you convert directstream into data frames
Hi All,

After creating a direct stream like below:

val events = KafkaUtils.createDirectStream[String, String,
    StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

I would like to convert the above stream into data frames, so that I could run hive queries over it. Could anyone please explain how this can be achieved? I am using Spark version 1.3.0.
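For reference, the pattern I have been experimenting with is converting each micro-batch inside foreachRDD, registering it as a temp table, and querying that. A rough, untested Java sketch (EventBean and its parse method are stand-ins for my real message format, and events here is the Java equivalent of the stream above, a JavaPairDStream<String, String> of Kafka key/value pairs):

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

// Spark 1.3: build a DataFrame from a JavaBean RDD per batch; a HiveContext
// should work the same way if actual Hive queries are needed.
events.foreachRDD(rdd -> {
    SQLContext sqlContext = new SQLContext(rdd.context()); // reuse a singleton in real code
    JavaRDD<EventBean> beans = rdd.map(kv -> EventBean.parse(kv._2()));
    DataFrame df = sqlContext.createDataFrame(beans, EventBean.class);
    df.registerTempTable("events");
    sqlContext.sql("select type, count(*) from events group by type").show();
    return null;
});

Is something like this the recommended approach, or is there a better way?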
spark-kafka direct API vs receiver-based API
Hi All,

I just wanted to know how the direct API for Spark Streaming compares with the earlier receiver-based API. Has anyone used the direct-API-based approach in production, or is it still only being used for POCs? Also, since I'm new to Spark, could anyone share a starting point where I could find working code for both of the above APIs?

In my use case I want to analyse a data stream (comma-separated strings) & aggregate over certain fields based on their types. Ideally I would like to push that aggregated data to a column-family-based datastore (like HBase, which we are using currently). But first I'd like to find out how to aggregate that data and how streaming works: does it poll & fetch data in batches, or does it continuously listen to the Kafka queue for any new message? And how can I configure my application for either case?

I hope my questions make sense.

Regards
Mohit
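So far I have only found the two creation calls below (typed up from the docs, so treat this as an untested sketch; jssc, topicMap, kafkaParams and topicsSet are my own variables). What I am still missing is a complete working example:

import java.util.Map;
import java.util.Set;

import kafka.serializer.StringDecoder;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.kafka.KafkaUtils;

// Receiver-based API: offsets are tracked in ZooKeeper and data arrives via a
// long-running receiver; topicMap maps topic name -> number of receiver threads.
JavaPairReceiverInputDStream<String, String> receiverStream =
    KafkaUtils.createStream(jssc, "zkhost:2181", "my-consumer-group", topicMap);

// Direct API: no receiver; each micro-batch reads an exact offset range from
// Kafka (one RDD partition per Kafka partition), so it fetches in batches
// rather than continuously listening.
JavaPairInputDStream<String, String> directStream =
    KafkaUtils.createDirectStream(jssc, String.class, String.class,
        StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet);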
checking
Just wanted to know if my emails are reaching the user list.

Regards
Mohit
spark streaming from kafka real time + batch processing in java
I want to write a Spark Streaming consumer for Kafka in Java. I want to process the data in real time as well as store the data in HDFS in year/month/day/hour/ format. I am not sure how to achieve this. Should I write separate Kafka consumers, one for writing data to HDFS and one for Spark Streaming?

Also, I would like to ask what people generally do with the result of Spark streams after aggregating over them. Is it okay to update a NoSQL DB with aggregated counts per batch interval, or is it generally stored in HDFS?

Is it possible to store the mini-batch data from Spark Streaming to HDFS in such a way that the data is aggregated hourly and put into HDFS in its "hour" folder? I would not want a lot of small files, one per Spark mini-batch per hour; that would be inefficient for running Hadoop jobs later.

Is anyone working on the same problem? Any help and comments would be great.

Regards
Mohit
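To make the HDFS part of the question concrete, this is the kind of thing I have been sketching (untested; events stands for the parsed stream, and the paths and namenode are made up):

import java.text.SimpleDateFormat;
import java.util.Date;

// Write each micro-batch under its hour directory; the same foreachRDD could
// also update the real-time aggregates, so one consumer serves both paths.
// (Spark 1.x signature; newer releases take a VoidFunction2 with no return.)
events.foreachRDD((rdd, time) -> {
    String hourDir = new SimpleDateFormat("yyyy/MM/dd/HH")
        .format(new Date(time.milliseconds()));
    rdd.saveAsTextFile("hdfs://namenode:9000/events/" + hourDir
        + "/batch-" + time.milliseconds());
    return null;
});

Even with this, every batch still becomes its own directory under the hour, which is exactly the many-small-files problem I am worried about; I assume an hourly compaction/aggregation job would still be needed on top.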
connecting spark with ActiveMQ
Hi All,

I have a requirement where I need to consume messages from ActiveMQ and do live stream processing as well as batch processing using Spark. Is there a Spark plugin or library that enables this? If not, do you know any other way this could be done?

Regards
Mohit
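The only approach I have found so far is writing a custom receiver against the JMS API, since Spark does not seem to ship an ActiveMQ source. A rough, untested sketch (broker URL and queue name are placeholders):

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

public class ActiveMQReceiver extends Receiver<String> {
    private final String brokerUrl;
    private final String queueName;
    private transient Connection connection;

    public ActiveMQReceiver(String brokerUrl, String queueName) {
        super(StorageLevel.MEMORY_AND_DISK_2());
        this.brokerUrl = brokerUrl;
        this.queueName = queueName;
    }

    @Override
    public void onStart() {
        try {
            connection = new ActiveMQConnectionFactory(brokerUrl).createConnection();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = session.createConsumer(session.createQueue(queueName));
            // hand every incoming message over to Spark for processing
            consumer.setMessageListener(message -> {
                try {
                    store(((TextMessage) message).getText());
                } catch (JMSException e) {
                    reportError("failed to read message", e);
                }
            });
            connection.start();
        } catch (JMSException e) {
            restart("could not connect to broker", e);
        }
    }

    @Override
    public void onStop() {
        try {
            if (connection != null) connection.close();
        } catch (JMSException ignored) {
        }
    }
}

The stream would then be created with jssc.receiverStream(new ActiveMQReceiver(brokerUrl, queueName)). Does this look like the right direction, or is there something off the shelf?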