Parallel read parquet file, write to postgresql
Reading the Spark docs (https://spark.apache.org/docs/latest/sql-data-sources-parquet.html), I don't see how to read a Parquet file in parallel with SparkSession. Would --num-executors just work, or do any additional parameters need to be added to SparkSession as well? Also, if I want to write data to the database in parallel, are the options 'numPartitions' and 'batchsize' enough to improve write performance? For example:

mydf.write.format("jdbc")
  .option("driver", "org.postgresql.Driver")
  .option("url", url)
  .option("dbtable", table_name)
  .option("user", username)
  .option("password", password)
  .option("numPartitions", N)
  .option("batchsize", M)
  .save()

From the Spark website (https://spark.apache.org/docs/2.2.0/sql-programming-guide.html#jdbc-to-other-databases), these are the only two parameters I can find that would have an impact on db write performance. I appreciate any suggestions.
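For what it's worth, Parquet reads are already parallel: Spark creates one task per input split, so read parallelism mostly comes from the number of splits and available executor cores rather than an extra read option. For the JDBC write, parallelism is the number of partitions of the DataFrame. A minimal sketch of assembling the write options (all names and values here are illustrative assumptions; the Spark call itself is shown in comments since it needs a cluster):

```scala
// Sketch: assemble JDBC write options. "numPartitions" caps the number of
// concurrent connections, "batchsize" controls rows per INSERT batch.
def jdbcWriteOptions(url: String, table: String, user: String, password: String,
                     numPartitions: Int, batchSize: Int): Map[String, String] =
  Map(
    "driver"        -> "org.postgresql.Driver",
    "url"           -> url,
    "dbtable"       -> table,
    "user"          -> user,
    "password"      -> password,
    "numPartitions" -> numPartitions.toString,
    "batchsize"     -> batchSize.toString
  )

// On a real cluster (not runnable here without Spark):
// mydf.repartition(8)
//   .write.format("jdbc")
//   .options(jdbcWriteOptions(url, table, user, pw, 8, 10000))
//   .mode("append")
//   .save()

val opts = jdbcWriteOptions("jdbc:postgresql://db:5432/mydb", "mytable", "me", "secret", 8, 10000)
println(opts("numPartitions"))  // 8
```

Note that repartition(8) and batch size 10000 are guesses to illustrate the knobs, not recommendations; the right values depend on how many concurrent connections the database can tolerate.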
Re: Convert RDD[Iterable[MyCaseClass]] to RDD[MyCaseClass]
By following your advice to use flatMap, I can now convert the result from RDD[Iterable[MyCaseClass]] to RDD[MyCaseClass]. Basically I just perform flatMap at the end, before converting the RDD back to a DataFrame (i.e. SparkSession.createDataFrame(rddRecordsOfMyCaseClass)). For instance:

df.map { ... }.filter { ... }.flatMap { records => records.flatMap { record => Seq(record) } }

Not the smartest code, but it works for my case. Thanks for the advice!

‐‐‐ Original Message ‐‐‐
On Saturday, December 1, 2018 12:17 PM, Chris Teoh wrote:

> Hi James,
>
> Try flatMap(_.toList). See the example below:
>
> scala> case class MyClass(i: Int)
> defined class MyClass
>
> scala> val r = 1 to 100
> r: scala.collection.immutable.Range.Inclusive = Range(1, 2, 3, ..., 100)
>
> scala> val r2 = 101 to 200
> r2: scala.collection.immutable.Range.Inclusive = Range(101, 102, 103, ..., 200)
>
> scala> val c1 = r.map(MyClass(_)).toIterable
> c1: Iterable[MyClass] = Vector(MyClass(1), MyClass(2), ..., MyClass(100))
>
> scala> val c2 = r2.map(MyClass(_)).toIterable
> c2: Iterable[MyClass] = Vector(MyClass(101), MyClass(102), ..., MyClass(200))
>
> scala> val rddIt = sc.parallelize(Seq(c1, c2))
> rddIt: org.apache.spark.rdd.RDD[Iterable[MyClass]] = ParallelCollectionRDD[2] at parallelize at <console>:28
>
> scala> rddIt.flatMap(_.toList)
> res4: org.apache.spark.rdd.RDD[MyClass] = MapPartitionsRDD[3] at flatMap at <console>:26
>
> res4 is what you're looking for.
>
> On Sat, 1 Dec 2018 at 21:09, Chris Teoh wrote:
>
>> Do you have the full code example?
>>
>> I think this would be similar to the mapPartitions code flow, something like flatMap(_.toList).
>>
>> I haven't tested this out yet, but this is how I'd try it first.
>>
>> On Sat, 1 Dec 2018 at 01:02, James Starks wrote:
>>
>>> When processing data, I create an instance of RDD[Iterable[MyCaseClass]] and I want to convert it to RDD[MyCaseClass] so that it can be further converted to a Dataset or DataFrame with the toDS() function. But I encounter a problem: a SparkContext cannot be instantiated within a SparkSession.map function because one already exists, even with allowMultipleContexts set to true.
>>>
>>> val sc = new SparkConf()
>>> sc.set("spark.driver.allowMultipleContexts", "true")
>>> new SparkContext(sc).parallelize(seq)
>>>
>>> How can I fix this?
>>>
>>> Thanks.
>>
>> --
>> Chris
>
> --
> Chris
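The whole conversion boils down to flattening one level of nesting, and the flatMap(_.toList) shape can be checked on plain Scala collections without a cluster (a hypothetical case class, same pattern as rddIt.flatMap(_.toList) in the thread above):

```scala
case class MyCaseClass(i: Int)

// Nested shape analogous to RDD[Iterable[MyCaseClass]]
val nested: Seq[Iterable[MyCaseClass]] =
  Seq(Seq(MyCaseClass(1), MyCaseClass(2)), Seq(MyCaseClass(3)))

// One flatMap removes the inner Iterable, analogous to rdd.flatMap(_.toList)
val flat: Seq[MyCaseClass] = nested.flatMap(_.toList)

println(flat)  // List(MyCaseClass(1), MyCaseClass(2), MyCaseClass(3))
```

The same one-liner replaces the nested `flatMap { records => records.flatMap { record => Seq(record) } }` from the reply, since wrapping each record in a Seq and flattening again is a no-op.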
Re: Caused by: java.io.NotSerializableException: com.softwaremill.sttp.FollowRedirectsBackend
Shadowing with

object MyObject {
  def mymethod(param: MyParam) = actual_function(param)
}

class MyObject {
  import MyObject._
  session.map { ... => mymethod(...) }
}

does the job. Thanks for the advice!

‐‐‐ Original Message ‐‐‐
On Friday, November 30, 2018 9:26 AM, wrote:

> If it's just a couple of classes, they are actually suitable for serializing, and you have the source code, then you can shadow them in your own project with the Serializable interface added. Your shadowed classes should be on the classpath before the library's versions, which should lead to Spark being able to use the serializable versions.
>
> That's very much a last resort, though!
>
> Chris
>
> On 30 Nov 2018, at 05:08, Koert Kuipers wrote:
>
>> if you only use it in the executors, sometimes using lazy works
>>
>> On Thu, Nov 29, 2018 at 9:45 AM James Starks wrote:
>>
>>> This is not a problem directly caused by Spark, but it's related, so I'm asking here. I use Spark to read data from Parquet and process some HTTP calls with sttp (https://github.com/softwaremill/sttp). However, Spark throws
>>>
>>> Caused by: java.io.NotSerializableException: com.softwaremill.sttp.FollowRedirectsBackend
>>>
>>> It's understandable why this exception is thrown, because FollowRedirectsBackend is not serializable. So I would like to know: in such a case, are there any ways to get around this problem without modifying and recompiling the original code?
>>>
>>> Thanks
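Koert's "lazy works" suggestion can be sketched without Spark at all: keep the non-serializable object out of the serialized closure behind a @transient lazy val, so it is rebuilt on first use after deserialization on the executor. Everything below is hypothetical stand-in code, not the actual sttp types:

```scala
import java.io._

// Stand-in for a non-serializable dependency (playing the role of FollowRedirectsBackend)
class FakeBackend { def send(s: String): String = s.toUpperCase }

class Worker extends Serializable {
  // Not serialized with the closure; re-created lazily after deserialization
  @transient lazy val backend: FakeBackend = new FakeBackend
  def process(s: String): String = backend.send(s)
}

// Simulate what Spark does when shipping a task: serialize, then deserialize
val bos = new ByteArrayOutputStream()
val oos = new ObjectOutputStream(bos)
oos.writeObject(new Worker)
oos.close()

val revived = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
  .readObject().asInstanceOf[Worker]

println(revived.process("ok"))  // OK
```

Without the @transient lazy val, serializing Worker would fail with the same NotSerializableException, because FakeBackend would be part of the object graph.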
Convert RDD[Iterable[MyCaseClass]] to RDD[MyCaseClass]
When processing data, I create an instance of RDD[Iterable[MyCaseClass]] and I want to convert it to RDD[MyCaseClass] so that it can be further converted to a Dataset or DataFrame with the toDS() function. But I encounter a problem: a SparkContext cannot be instantiated within a SparkSession.map function because one already exists, even with allowMultipleContexts set to true.

val sc = new SparkConf()
sc.set("spark.driver.allowMultipleContexts", "true")
new SparkContext(sc).parallelize(seq)

How can I fix this? Thanks.
Caused by: java.io.NotSerializableException: com.softwaremill.sttp.FollowRedirectsBackend
This is not a problem directly caused by Spark, but it's related, so I'm asking here. I use Spark to read data from Parquet and process some HTTP calls with sttp (https://github.com/softwaremill/sttp). However, Spark throws

Caused by: java.io.NotSerializableException: com.softwaremill.sttp.FollowRedirectsBackend

It's understandable why this exception is thrown, because FollowRedirectsBackend is not serializable. So I would like to know: in such a case, are there any ways to get around this problem without modifying and recompiling the original code? Thanks
Re: Spark job's driver program consumes too much memory
Yes, I think I am confused, because my original thought was that if the executor only requires 10g, then the driver should ideally not need more than 10g, or at least not more than 20g. But this is not the case. My configuration sets --driver-memory to 25g and --executor-memory to 10g. And my program basically only uses `filter`, `map`, and `write.mode().parquet`, as below (main logic):

val df = spark.read.format("jdbc")...option("dbtable", "select * from mytable where fieldX <> ''")...load() /* sql returns around 8MM records. */
df.createOrReplaceTempView("newtable")
val newdf = spark.sql("select field1, ..., fieldN from newtable" /* around 50 fields */).as[MyCaseClass].filter { ... }.map { ... }.filter { ... }
newdf.write.mode(...).parquet(...)

So I don't understand why the driver program needs such huge memory. And I haven't found related docs explaining this, either on the Spark website or through Google (perhaps I missed it by using the wrong keywords). Any places that may contain a pointer to this? I appreciate your help.

‐‐‐ Original Message ‐‐‐
On 7 September 2018 4:46 PM, Apostolos N. Papadopoulos wrote:

> You are putting it all together and this does not make sense. Writing data to HDFS does not require that all data be transferred back to the driver and THEN saved to HDFS.
>
> That would be a disaster and it would never scale. I suggest checking the documentation more carefully, because I believe you are a bit confused.
>
> regards,
>
> Apostolos
>
> On 07/09/2018 05:39 μμ, James Starks wrote:
>
> > Is df.write.mode(...).parquet("hdfs://..") also an action? Checking the docs shows that my Spark job doesn't use those action functions. But the save function resembles df.write.mode(overwrite).parquet("hdfs://path/to/parquet-file") used by my Spark job. Therefore I am thinking maybe that's the reason why my Spark job's driver consumes such an amount of memory.
> > https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#actions
> > My spark job's driver program consumes too much memory, so I want to prevent that by writing data to hdfs on the executor side, instead of waiting for that data to be sent back to the driver program (and then writing to hdfs). This is because our worker servers have a bigger memory size than the one that runs the driver program. If I can write data to hdfs from the executor, then the driver memory for my spark job can be reduced.
> > Otherwise, does Spark support streaming reads from a database (i.e. spark streaming + spark sql)?
> > Thanks for your reply.
> > ‐‐‐ Original Message ‐‐‐
> > On 7 September 2018 4:15 PM, Apostolos N. Papadopoulos papad...@csd.auth.gr wrote:
> >
> > > Dear James,
> > >
> > > - Check the Spark documentation to see the actions that return a lot of data back to the driver. One of these actions is collect(). However, take(x) is an action, and reduce() is also an action.
> > > Before executing collect(), find out the size of your RDD/DF.
> > >
> > > - I cannot understand the phrase "hdfs directly from the executor". You can specify an hdfs file as your input, and you can also use hdfs to store your output.
> > > regards,
> > > Apostolos
> > > On 07/09/2018 05:04 μμ, James Starks wrote:
> > >
> > > > I have a Spark job that reads data from a database. By increasing the submit parameter '--driver-memory 25g' the job works without a problem locally, but not in the prod env, because the prod master does not have enough capacity.
> > > > So I have a few questions:
> > > > - What functions, such as collect(), would cause the data to be sent back to the driver program?
> > > > My job so far merely uses `as`, `filter`, `map`, and `filter`.
> > > > - Is it possible to write data (in parquet format, for instance) to hdfs directly from the executor? If so, how can I do that (any code snippet, doc for reference, or keyword to search for, since I can't find anything with e.g. `spark direct executor hdfs write`)?
> > > >
> > > > Thanks
> > >
> > > --
> > > Apostolos N. Papadopoulos, Associate Professor
> > > Department of Informatics
> > > Aristotle University of Thessaloniki
> > > Thessaloniki, GREECE
> > > tel: ++0030312310991918
> > > email: papad...@csd.auth.gr
> > > twitter: @papadopoulos_ap
> > > web: http://datalab.csd.auth.gr/~apostol
> > >
> > > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
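One thing worth noting for this thread: a plain JDBC read with default options funnels the whole table through a single partition, and partitioning the read spreads the work across executors instead. A sketch of the extra read options (column name and bounds are hypothetical; the Spark call is commented out since it needs a cluster):

```scala
// Sketch: options for a partitioned JDBC read. Spark issues one query per
// partition, with WHERE clauses derived from the bounds on the partition column.
def partitionedReadOptions(url: String, table: String, idColumn: String,
                           lower: Long, upper: Long, numPartitions: Int): Map[String, String] =
  Map(
    "url"             -> url,
    "dbtable"         -> table,
    "partitionColumn" -> idColumn,
    "lowerBound"      -> lower.toString,
    "upperBound"      -> upper.toString,
    "numPartitions"   -> numPartitions.toString
  )

// On a real cluster:
// val df = spark.read.format("jdbc")
//   .options(partitionedReadOptions(url, "mytable", "id", 1L, 8000000L, 16))
//   .load()

val opts = partitionedReadOptions("jdbc:postgresql://db/mydb", "mytable", "id", 1L, 8000000L, 16)
println(opts("numPartitions"))  // 16
```

With the read and the subsequent parquet write both running on executors, the driver never has to hold the 8MM rows; the bounds and partition count above are illustrative guesses.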
Re: Spark job's driver program consumes too much memory
Is df.write.mode(...).parquet("hdfs://..") also an action? Checking the docs shows that my Spark job doesn't use those action functions. But the save function resembles df.write.mode(overwrite).parquet("hdfs://path/to/parquet-file") used by my Spark job. Therefore I am thinking maybe that's the reason why my Spark job's driver consumes such an amount of memory. https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#actions

My spark job's driver program consumes too much memory, so I want to prevent that by writing data to hdfs on the executor side, instead of waiting for that data to be sent back to the driver program (and then writing to hdfs). This is because our worker servers have a bigger memory size than the one that runs the driver program. If I can write data to hdfs from the executor, then the driver memory for my spark job can be reduced. Otherwise, does Spark support streaming reads from a database (i.e. spark streaming + spark sql)? Thanks for your reply.

‐‐‐ Original Message ‐‐‐
On 7 September 2018 4:15 PM, Apostolos N. Papadopoulos wrote:

> Dear James,
>
> - Check the Spark documentation to see the actions that return a lot of data back to the driver. One of these actions is collect(). However, take(x) is an action, and reduce() is also an action.
>
> Before executing collect(), find out the size of your RDD/DF.
>
> - I cannot understand the phrase "hdfs directly from the executor". You can specify an hdfs file as your input, and you can also use hdfs to store your output.
>
> regards,
>
> Apostolos
>
> On 07/09/2018 05:04 μμ, James Starks wrote:
>
> > I have a Spark job that reads data from a database. By increasing the submit parameter '--driver-memory 25g' the job works without a problem locally, but not in the prod env, because the prod master does not have enough capacity.
> > So I have a few questions:
> > - What functions, such as collect(), would cause the data to be sent back to the driver program?
> > My job so far merely uses `as`, `filter`, `map`, and `filter`.
> >
> > - Is it possible to write data (in parquet format, for instance) to hdfs directly from the executor? If so, how can I do that (any code snippet, doc for reference, or keyword to search for, since I can't find anything with e.g. `spark direct executor hdfs write`)?
> >
> > Thanks
>
> --
>
> Apostolos N. Papadopoulos, Associate Professor
> Department of Informatics
> Aristotle University of Thessaloniki
> Thessaloniki, GREECE
> tel: ++0030312310991918
> email: papad...@csd.auth.gr
> twitter: @papadopoulos_ap
> web: http://datalab.csd.auth.gr/~apostol

To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Spark job's driver program consumes too much memory
I have a Spark job that reads data from a database. By increasing the submit parameter '--driver-memory 25g' the job works without a problem locally, but not in the prod env, because the prod master does not have enough capacity. So I have a few questions:

- What functions, such as collect(), would cause the data to be sent back to the driver program? My job so far merely uses `as`, `filter`, `map`, and `filter`.

- Is it possible to write data (in parquet format, for instance) to hdfs directly from the executor? If so, how can I do that (any code snippet, doc for reference, or keyword to search for, since I can't find anything with e.g. `spark direct executor hdfs write`)?

Thanks
Re: [External Sender] How to debug Spark job
Got to the root cause eventually: it throws java.lang.OutOfMemoryError: Java heap space. Increasing --driver-memory temporarily fixes the problem. Thanks.

‐‐‐ Original Message ‐‐‐
On 7 September 2018 12:32 PM, Femi Anthony wrote:

> One way I would go about this would be to try running new_df.show(numcols, truncate=False) on a few columns before you try writing to parquet, to force computation of newdf and see whether the hang is occurring at that point or during the write. You may also try doing a newdf.count() as well.
>
> Femi
>
> On Fri, Sep 7, 2018 at 5:48 AM James Starks wrote:
>
>> I have a Spark job that reads from a postgresql (v9.5) table and writes the result to parquet. The code flow is not complicated, basically:
>>
>> case class MyCaseClass(field1: String, field2: String)
>> val df = spark.read.format("jdbc")...load()
>> df.createOrReplaceTempView(...)
>> val newdf = spark.sql("select field1, field2 from mytable").as[MyCaseClass].map { row =>
>>   val fieldX = ... // extract something from field2
>>   (field1, fieldX)
>> }.filter { ... /* filter out field 3 that's not valid */ }
>> newdf.write.mode(...).parquet(destPath)
>>
>> This job worked correctly without a problem. But it doesn't look like it's working (the job seems to hang) after adding more fields. The refactored job looks as below:
>>
>> ...
>> val newdf = spark.sql("select field1, field2, ... fieldN from mytable").as[MyCaseClassWithMoreFields].map { row =>
>>   ...
>>   NewCaseClassWithMoreFields(...) // all fields plus fieldX
>> }.filter { ... }
>> newdf.write.mode(...).parquet(destPath)
>>
>> Basically what the job does is extract some info from one of the fields in the db table, append that newly extracted field to the original row, and then dump the whole new table to parquet.
>>
>> new field + (original field1 + ... + original fieldN)
>>
>> Records loaded by spark sql into the spark job (before the refactoring) are around 8MM; this remains the same, but when the refactored spark job runs, it looks like it hangs there without progress. The only output on the console is (there is no crash, no exceptions thrown)
>>
>> WARN HeartbeatReceiver:66 - Removing executor driver with no recent heartbeats: 137128 ms exceeds timeout 120000 ms
>>
>> Memory in the top command looks like
>>
>> VIRT     RES     SHR    %CPU  %MEM
>> 15.866g  8.001g  41.4m  740.3  25.6
>>
>> The command used to submit the spark job is
>>
>> spark-submit --class ... --master local[*] --driver-memory 10g --executor-memory 10g ... --files ... --driver-class-path ... ...
>>
>> How can I debug or check which part of my code might cause the problem (so I can improve it)?
>>
>> Thanks
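Femi's suggestion of forcing computation with show() or count() between stages is easy to combine with timing, so each stage's cost is visible on the console. A tiny helper like the following (hypothetical, plain Scala, no Spark needed to define it) can wrap e.g. newdf.count():

```scala
// Minimal timing wrapper for debugging: wrap any Spark action, e.g.
//   val n = timed("count after map/filter")(newdf.count())
// to see which stage is slow or hung.
def timed[T](label: String)(body: => T): T = {
  val start = System.nanoTime()
  val result = body
  val secs = (System.nanoTime() - start) / 1e9
  println(f"$label%s took $secs%.2f s")
  result
}

val x = timed("toy computation")((1 to 1000).sum)
println(x)  // 500500
```

Each timed count() triggers a separate job, so this is strictly a debugging aid to localize the hang, not something to leave in production code.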
How to debug Spark job
I have a Spark job that reads from a postgresql (v9.5) table and writes the result to parquet. The code flow is not complicated, basically:

case class MyCaseClass(field1: String, field2: String)
val df = spark.read.format("jdbc")...load()
df.createOrReplaceTempView(...)
val newdf = spark.sql("select field1, field2 from mytable").as[MyCaseClass].map { row =>
  val fieldX = ... // extract something from field2
  (field1, fieldX)
}.filter { ... /* filter out field 3 that's not valid */ }
newdf.write.mode(...).parquet(destPath)

This job worked correctly without a problem. But it doesn't look like it's working (the job seems to hang) after adding more fields. The refactored job looks as below:

...
val newdf = spark.sql("select field1, field2, ... fieldN from mytable").as[MyCaseClassWithMoreFields].map { row =>
  ...
  NewCaseClassWithMoreFields(...) // all fields plus fieldX
}.filter { ... }
newdf.write.mode(...).parquet(destPath)

Basically what the job does is extract some info from one of the fields in the db table, append that newly extracted field to the original row, and then dump the whole new table to parquet.

new field + (original field1 + ... + original fieldN)

Records loaded by spark sql into the spark job (before the refactoring) are around 8MM; this remains the same, but when the refactored spark job runs, it looks like it hangs there without progress. The only output on the console is (there is no crash, no exceptions thrown)

WARN HeartbeatReceiver:66 - Removing executor driver with no recent heartbeats: 137128 ms exceeds timeout 120000 ms

Memory in the top command looks like

VIRT     RES     SHR    %CPU  %MEM
15.866g  8.001g  41.4m  740.3  25.6

The command used to submit the spark job is

spark-submit --class ... --master local[*] --driver-memory 10g --executor-memory 10g ... --files ... --driver-class-path ... ...

How can I debug or check which part of my code might cause the problem (so I can improve it)?

Thanks
Re: Pass config file through spark-submit
I accidentally got it working, though I don't thoroughly understand why (as far as I know, the point is to configure things so that the executor can refer to the conf file after it is copied to the executors' working dir). Basically it's a combination of the parameters --conf, --files, and --driver-class-path, rather than any single parameter.

spark-submit --class pkg.to.MyApp --master local[*] --conf "spark.executor.extraClassPath=-Dconfig.file=" --files --driver-class-path ""

--conf requires passing the conf file name, e.g. myfile.conf, along with the spark executor class path as a directive. --files passes the conf file relative to the context root, e.g. executing under dir , under which it contains folders such as conf, logs, work and so on. The conf file, i.e. myfile.conf, is located under the conf folder. --driver-class-path points to the conf directory with an absolute path.

‐‐‐ Original Message ‐‐‐
On August 17, 2018 3:00 AM, yujhe.li wrote:

> So can you read the file on the executor side?
> I think the file passed by --files my.app.conf would be added under the classpath, and you can use it directly.
>
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Pass config file through spark-submit
I have a config file that uses the Typesafe Config library, located on the local file system, and I want to submit that file through spark-submit so that the Spark program can read customized parameters. For instance:

my.app {
  db {
    host = domain.cc
    port = 1234
    db = dbname
    user = myuser
    passwd = mypass
  }
}

The spark-submit command looks like

spark-submit --class "my.app.Sample" --master local[*] --conf "spark.executor.extraJavaOptions=-Dconfig.file=/path/to/conf/myapp.conf" /path/to/my-app.jar

But the program cannot read the parameters such as db, user, host, and so on in my conf file. Passing it with --files /path/to/myapp.conf doesn't work either. What is the correct way to submit that kind of conf file so that my Spark job can read customized parameters from it? Thanks
Data source jdbc does not support streamed reading
Now my Spark job can perform SQL operations against a database table. Next I want to combine that with a streaming context, so I switched to the readStream() function. But after job submission, Spark throws

Exception in thread "main" java.lang.UnsupportedOperationException: Data source jdbc does not support streamed reading

So it looks like sparkSession.readStream.format("jdbc")... doesn't work; jdbc doesn't support streaming.

val sparkSession = SparkSession.builder().appName("my-test").getOrCreate()
import sparkSession.implicits._
val df = sparkSession.readStream.format("jdbc")...load()
// other operations against df

Checking the example - https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala - and searching on the internet, I don't see any examples close to my need. Any pointers or docs that may discuss this, or code snippets that illustrate such a purpose? Thanks
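The jdbc source indeed isn't a streaming source. One common workaround (an assumption on my part, not an official Spark streaming feature) is to emulate a stream with repeated batch reads over an increasing watermark column, e.g. a monotonically growing id. The query-building part is plain Scala; the Spark call is shown in comments since it needs a cluster:

```scala
// Build the incremental "dbtable" subquery for one poll cycle.
// Assumes a monotonically increasing id column; table/column names are hypothetical.
def incrementalQuery(table: String, idColumn: String, lastSeenId: Long): String =
  s"(SELECT * FROM $table WHERE $idColumn > $lastSeenId) AS incr"

// One poll cycle on a real cluster might look like:
// val batch = spark.read.format("jdbc")
//   .option("url", url)
//   .option("dbtable", incrementalQuery("events", "id", lastSeenId))
//   .load()
// ...process the batch, update lastSeenId from max(id), sleep, repeat...

println(incrementalQuery("events", "id", 42L))
```

This only gives at-least-once, poll-based semantics; if true streaming is required, a change-data-capture pipeline into a supported source (e.g. Kafka) is the usual alternative.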
Re: Newbie question on how to extract column value
Because of some legacy issues I can't immediately upgrade the Spark version. But I tried filtering the data before loading it into Spark, based on the suggestion:

val df = sparkSession.read.format("jdbc").option(...).option("dbtable", "(select .. from ... where url <> '') table_name").load()
df.createOrReplaceTempView("new_table")

Then performing the custom operation does the trick:

sparkSession.sql("select id, url from new_table").as[(String, String)].map { case (id, url) =>
  val derived_data = ... // operation on url
  (id, derived_data)
}.show()

Thanks for the advice, it's really helpful!

‐‐‐ Original Message ‐‐‐
On August 7, 2018 5:33 PM, Gourav Sengupta wrote:

> Hi James,
>
> It is always advisable to use the latest Spark version. That said, can you please try dataframes and a UDF if possible? I think that would be a much more scalable way to address the issue.
>
> Also, where possible, it is always advisable to apply the filter before fetching the data into Spark.
>
> Thanks and Regards,
> Gourav
>
> On Tue, Aug 7, 2018 at 4:09 PM, James Starks wrote:
>
>> I am very new to Spark. I just successfully set up Spark SQL connecting to a postgresql database, and am able to display a table with the code
>>
>> sparkSession.sql("SELECT id, url from table_a where col_b <> '' ").show()
>>
>> Now I want to perform filter and map functions on the col_b value. In plain Scala it would be something like
>>
>> Seq((1, "http://a.com/a"), (2, "http://b.com/b"), (3, "unknown")).filter { case (_, url) => isValid(url) }.map { case (id, url) => (id, pathOf(url)) }
>>
>> where filter removes invalid urls, and map turns (id, url) into (id, path of url).
>>
>> However, when applying this concept to Spark SQL with the code snippet
>>
>> sparkSession.sql("...").filter(isValid($"url"))
>>
>> the compiler complains about a type mismatch because $"url" is of type ColumnName. How can I extract the column value, i.e. the http://... string in the url column, in order to perform the filter function?
>>
>> Thanks
>>
>> Java 1.8.0
>> Scala 2.11.8
>> Spark 2.1.0
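For the filter/map step itself: once the rows are in a typed Dataset via .as[(String, String)], ordinary Scala functions work inside map and filter, exactly as in the plain-Scala version from the thread. A sketch of isValid/pathOf (hypothetical implementations based on java.net.URI, since the originals aren't shown) applied to the same sample data:

```scala
import java.net.URI
import scala.util.Try

// Hypothetical implementations of the helpers discussed in the thread.
def isValid(url: String): Boolean =
  Try(new URI(url)).toOption.exists(u => u.getScheme != null && u.getHost != null)

def pathOf(url: String): String = new URI(url).getPath

val rows = Seq((1, "http://a.com/a"), (2, "http://b.com/b"), (3, "unknown"))
val result = rows.filter { case (_, url) => isValid(url) }
                 .map { case (id, url) => (id, pathOf(url)) }

println(result)  // List((1,/a), (2,/b))
```

The same two functions can be used unchanged inside the Dataset's .filter and .map, which sidesteps the ColumnName type mismatch because the lambda receives plain String values, not Columns.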
Newbie question on how to extract column value
I am very new to Spark. I just successfully set up Spark SQL connecting to a postgresql database, and am able to display a table with the code

sparkSession.sql("SELECT id, url from table_a where col_b <> '' ").show()

Now I want to perform filter and map functions on the col_b value. In plain Scala it would be something like

Seq((1, "http://a.com/a"), (2, "http://b.com/b"), (3, "unknown")).filter { case (_, url) => isValid(url) }.map { case (id, url) => (id, pathOf(url)) }

where filter removes invalid urls, and map turns (id, url) into (id, path of url).

However, when applying this concept to Spark SQL with the code snippet

sparkSession.sql("...").filter(isValid($"url"))

the compiler complains about a type mismatch because $"url" is of type ColumnName. How can I extract the column value, i.e. the http://... string in the url column, in order to perform the filter function?

Thanks

Java 1.8.0
Scala 2.11.8
Spark 2.1.0