Re: Joins in Spark
My suspicion is that your input files, being small, are split into very few partitions, and hence only a small number of tasks are started. Can you provide some more details, such as how you load the files and how the result size comes to around 500 GB?

Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)
https://in.linkedin.com/in/rishiteshmishra

On Thu, Mar 17, 2016 at 12:12 PM, Stuti Awasthi wrote:

> Hi All,
>
> I have to join 2 files, both not very big (say a few MBs only), but the
> result can be huge, say 500 GBs to TBs of data. I have tried using the
> Spark join() function, but I am noticing that the join executes on only 1
> or 2 nodes at most. Since I have a cluster of 5 nodes, I tried to pass
> "join(otherDataset, [numTasks])" with numTasks=10, but again I noticed
> that 9 of the tasks finish instantly and only 1 executor is processing
> all the data.
>
> I searched on the internet and found that we can use a broadcast variable
> to send the data from 1 file to all nodes and then use a map function to
> do the join. In this way I should be able to run multiple tasks on
> different executors.
>
> Now my question is: since Spark provides the join functionality, I have
> assumed that it will handle the data parallelism automatically. Does Spark
> provide some functionality which I can use directly for the join, rather
> than implementing a map-side join using broadcast on my own? Any other
> better way is also welcome.
>
> I assume that this might be a very common problem for all, and I am
> looking for suggestions.
>
> Thanks & Regards
> Stuti Awasthi
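The map-side (broadcast) join mentioned in this thread can be sketched in plain Scala, without a Spark dependency. This is only an illustration under stated assumptions: `joinPartition` and `toLookup` are hypothetical helper names, not Spark APIs, and the sample data is invented. In Spark, the lookup table would typically be shipped with `sc.broadcast(...)` and the function applied inside `mapPartitions` on a repartitioned RDD, so that every executor gets a share of the output.

```scala
// Hypothetical helper: joins one partition of the larger side against an
// in-memory lookup table built from the smaller side (inner join).
def joinPartition[K, A, B](
    part: Iterator[(K, A)],
    small: Map[K, Seq[B]]): Iterator[(K, (A, B))] =
  part.flatMap { case (k, a) =>
    // Keys missing from the lookup table produce no output rows.
    small.getOrElse(k, Seq.empty[B]).iterator.map(b => (k, (a, b)))
  }

// Hypothetical helper: builds the lookup table (key -> all of its values).
def toLookup[K, B](rows: Seq[(K, B)]): Map[K, Seq[B]] =
  rows.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }

// Local demonstration with two "partitions" of the left side.
val leftPartitions = Seq(Seq(1 -> "a", 2 -> "b"), Seq(1 -> "c", 3 -> "d"))
val lookup = toLookup(Seq(1 -> "x", 1 -> "y", 2 -> "z"))
val joined = leftPartitions.flatMap(p => joinPartition(p.iterator, lookup).toList)
```

Because each partition is joined independently, the amount of parallelism is governed by how many partitions the larger side has, not by how the join keys hash.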
Re: sliding Top N window
Hi Alexey,

We are also trying to solve similar problems using approximation. I would like to hear more about your usage. We can discuss this offline without boring others. :)

Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)
https://in.linkedin.com/in/rishiteshmishra

On Tue, Mar 22, 2016 at 1:19 AM, Lars Albertsson wrote:

> Hi,
>
> If you can accept approximate top N results, there is a neat solution
> for this problem: Use an approximate Map structure called
> Count-Min Sketch, in combination with a list of the M top items, where
> M > N. When you encounter an item not in the top M, you look up its
> count in the Count-Min Sketch to determine whether it qualifies.
>
> You will need to break down your event stream into time windows with a
> certain time unit, e.g. minutes or hours, and keep one Count-Min
> Sketch for each unit. The CMSs can be added, so you aggregate them to
> form your sliding windows. You also keep a top M (aka "heavy hitters")
> list for each window.
>
> The data structures required are surprisingly small, and will likely
> fit in memory on a single machine, if it can handle the traffic
> volume, so you might not need Spark at all. If you choose to use Spark
> in order to benefit from windowing, be aware that Spark lumps events
> in micro batches based on processing time, not event time.
>
> I made a presentation on approximate counting a couple of years ago.
> Slides and video here:
> http://www.slideshare.net/lallea/scalable-real-time-processing-techniques-39990105
> You can also search for presentations by Ted Dunning and Mikio Braun,
> who have held good presentations on the subject.
>
> There are AFAIK two open source implementations of Count-Min Sketch,
> one of them in Algebird.
>
> Let me know if anything is unclear.
>
> Good luck, and let us know how it goes.
>
> Regards,
>
> Lars Albertsson
> Data engineering consultant
> www.mapflat.com
> +46 70 7687109
>
> On Fri, Mar 11, 2016 at 9:09 PM, Yakubovich, Alexey wrote:
> > Good day,
> >
> > I have the following task: a stream of "page views" is coming to a Kafka
> > topic. Each view contains a list of product IDs from a visited page. The
> > task: to have the Top N products in "real time".
> >
> > I am interested in a solution that would require minimum intermediate
> > writes. So I need to build a sliding window for the top N products, where
> > the product counters change dynamically and the window presents the TOP
> > products for the specified period of time.
> >
> > I believe there is no way to avoid maintaining all product counters in
> > memory/storage. But at least I would like to do all logic, all
> > calculation on the fly, in memory, not spilling multiple RDDs from memory
> > to disk.
> >
> > So I believe I see one way of doing it:
> > Take the messages from Kafka and line up all elementary actions
> > (increase by 1 the counter for the product PID).
> > Each action will be implemented as a call to HTable.increment() // or
> > easier, with incrementColumnValue()…
> > After each increment I can apply my own "offer" operation, which would
> > ensure that only the top N products with counters are kept in another
> > HBase table (also with atomic operations).
> > But there is another stream of events: decreasing product counters when
> > a view expires past the length of the sliding window….
> >
> > So my question: does anybody know, or have and can share, a piece of code
> > or know-how on how to implement a "sliding Top N window" better?
> > If nothing is offered, I will share what I end up doing myself.
> >
> > Thank you
> > Alexey
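Lars's description (one Count-Min Sketch per time unit, sketches added together to form a window, candidates ranked by estimated count) can be sketched as below. This is a minimal illustration, not Algebird's implementation: the class, the `topN` helper, the sketch dimensions and the sample product IDs are all assumptions made for the example.

```scala
import scala.util.hashing.MurmurHash3

// Minimal Count-Min Sketch: `depth` rows of `width` counters.
// Estimates never undercount; they may overcount on hash collisions.
final class CountMinSketch(val depth: Int, val width: Int,
                           private val table: Array[Array[Long]]) {
  def this(depth: Int, width: Int) =
    this(depth, width, Array.ofDim[Long](depth, width))

  private def bucket(row: Int, item: String): Int = {
    val h = MurmurHash3.stringHash(item, row) // the row index acts as the hash seed
    ((h % width) + width) % width             // map to a non-negative bucket
  }

  def add(item: String, count: Long = 1L): Unit =
    for (r <- 0 until depth) table(r)(bucket(r, item)) += count

  def estimate(item: String): Long =
    (0 until depth).map(r => table(r)(bucket(r, item))).min

  // Sketches with the same shape add cell by cell; this is how per-unit
  // sketches combine into one sliding window.
  def merge(other: CountMinSketch): CountMinSketch = {
    require(depth == other.depth && width == other.width, "shapes must match")
    val sum = Array.tabulate(depth, width)((r, c) => table(r)(c) + other.table(r)(c))
    new CountMinSketch(depth, width, sum)
  }
}

// Hypothetical helper: top N for a window. `units` holds the sketches of the
// time units currently inside the window; `candidates` is the union of the
// per-unit top-M lists.
def topN(units: Seq[CountMinSketch], candidates: Set[String], n: Int): Seq[(String, Long)] = {
  require(units.nonEmpty, "window must contain at least one unit")
  val window = units.reduce(_ merge _)
  candidates.toSeq
    .map(item => item -> window.estimate(item))
    .sortBy { case (item, count) => (-count, item) }
    .take(n)
}

// Two one-minute units with invented page-view data.
val minute1 = new CountMinSketch(4, 1024)
val minute2 = new CountMinSketch(4, 1024)
Seq("p1", "p1", "p2").foreach(minute1.add(_))
Seq("p1", "p3", "p3", "p3").foreach(minute2.add(_))
val top2 = topN(Seq(minute1, minute2), Set("p1", "p2", "p3"), 2)
```

Sliding the window then amounts to dropping the oldest unit's sketch from `units` and appending the newest, which avoids the explicit "decrement on expiry" event stream described in the original question.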
Re: Spark SQL Optimization
What we have observed so far is that Spark picks the join order in the same order as the tables are specified in the FROM clause. Sometimes reordering benefits the join query. This could be a built-in optimization in Spark. But again, it is not going to be straightforward, because the selectivity of the join matters rather than the table size. Probably some kind of heuristic might help.

Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)
https://in.linkedin.com/in/rishiteshmishra

On Mon, Mar 21, 2016 at 11:18 PM, gtinside wrote:

> More details:
>
> Execution plan for the original query
> select distinct pge.portfolio_code
> from table1 pge join table2 p
> on p.perm_group = pge.anc_port_group
> join table3 uge
> on p.user_group=uge.anc_user_group
> where uge.user_name = 'user' and p.perm_type = 'TEST'
>
> == Physical Plan ==
> TungstenAggregate(key=[portfolio_code#14119], functions=[], output=[portfolio_code#14119])
>  TungstenExchange hashpartitioning(portfolio_code#14119)
>   TungstenAggregate(key=[portfolio_code#14119], functions=[], output=[portfolio_code#14119])
>    TungstenProject [portfolio_code#14119]
>     BroadcastHashJoin [user_group#13665], [anc_user_group#13658], BuildRight
>      TungstenProject [portfolio_code#14119,user_group#13665]
>       BroadcastHashJoin [anc_port_group#14117], [perm_group#13667], BuildRight
>        ConvertToUnsafe
>         Scan ParquetRelation[snackfs://shared:9042/aladdin_data_beta/table1.parquet][portfolio_code#14119,anc_port_group#14117]
>        ConvertToUnsafe
>         Project [user_group#13665,perm_group#13667]
>          Filter (perm_type#13666 = TEST)
>           Scan ParquetRelation[snackfs://shared:9042/aladdin_data_beta/table2.parquet][user_group#13665,perm_group#13667,perm_type#13666]
>      ConvertToUnsafe
>       Project [anc_user_group#13658]
>        Filter (user_name#13659 = user)
>         Scan ParquetRelation[snackfs://shared:9042/aladdin_data_beta/table3.parquet][anc_user_group#13658,user_name#13659]
>
> Execution plan for the optimized query
> select distinct pge.portfolio_code
> from table1 uge, table2 p, table3 pge
> where uge.user_name = 'user' and p.perm_type = 'TEST'
> and p.perm_group = pge.anc_port_group
> and p.user_group=uge.anc_user_group
>
> == Physical Plan ==
> TungstenAggregate(key=[portfolio_code#14119], functions=[], output=[portfolio_code#14119])
>  TungstenExchange hashpartitioning(portfolio_code#14119)
>   TungstenAggregate(key=[portfolio_code#14119], functions=[], output=[portfolio_code#14119])
>    TungstenProject [portfolio_code#14119]
>     BroadcastHashJoin [perm_group#13667], [anc_port_group#14117], BuildRight
>      TungstenProject [perm_group#13667]
>       BroadcastHashJoin [anc_user_group#13658], [user_group#13665], BuildRight
>        ConvertToUnsafe
>         Project [anc_user_group#13658]
>          Filter (user_name#13659 = user)
>           Scan ParquetRelation[snackfs://shared:9042/aladdin_data_beta/table3.parquet][anc_user_group#13658,user_name#13659]
>        ConvertToUnsafe
>         Project [perm_group#13667,user_group#13665]
>          Filter (perm_type#13666 = TEST)
>           Scan ParquetRelation[snackfs://shared:9042/aladdin_data_beta/table2.parquet][perm_group#13667,user_group#13665,perm_type#13666]
>      ConvertToUnsafe
>       Scan ParquetRelation[snackfs://shared:9042/aladdin_data_beta/table1.parquet][portfolio_code#14119,anc_port_group#14117]
Re: About nested RDD
rdd.count() is a fairly straightforward operation which can be calculated on the driver, and then the value can be included in the map function. Is your goal to write a generic function which operates on two RDDs, one RDD being evaluated for each partition of the other? Here also you can use a broadcast, if one of your RDDs is small enough. If both the RDDs are fairly big, I would like to understand your use case better.

Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)
https://in.linkedin.com/in/rishiteshmishra

On Fri, Apr 8, 2016 at 1:52 PM, Holden Karau wrote:

> It seems like the union function on RDDs might be what you are looking
> for, or was there something else you were trying to achieve?
>
> On Thursday, April 7, 2016, Tenghuan He wrote:
>
>> Hi all,
>>
>> I know that nested RDDs are not possible, like rdd1.map(x => x +
>> rdd2.count())
>> I tried to create a custom RDD like the following
>>
>> class MyRDD(base: RDD, part: Partitioner) extends RDD[(K, V)] {
>>
>>   var rdds = ArrayBuffer.empty[RDD[(K, (V, Int))]]
>>   def update(rdd: RDD[_]) {
>>     rdds += rdd
>>   }
>>   def compute ...
>>   def getPartitions ...
>> }
>>
>> In the compute method I call the internal rdds' iterators and got a
>> NullPointerException.
>> Is this also a form of nested RDDs, and how do I get rid of this?
>>
>> Thanks.
>>
>> Tenghuan
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
Re: About nested RDD
As mentioned earlier, you can create a broadcast variable containing all the elements of the small RDDs. I hope they are really small. Then you can fire A.update(broadcastVariable).

Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)
https://in.linkedin.com/in/rishiteshmishra

On Fri, Apr 8, 2016 at 2:33 PM, Tenghuan He wrote:

> Hi
>
> Thanks for your reply.
> Yes, it's very much like the union() method, but there is some difference.
>
> I have a very large RDD A, and a lot of small RDDs b, c, d and so on.
> A.update(a) will update some elements in A and return a new RDD.
>
> When calling
> val B = A.update(b).update(c).update(d).update().
> B.count()
>
> the count action will call the compute method,
> and each update will iterate over the large RDD A.
> To avoid this I can merge these small RDDs first into rdds and then call
> A.update(rdds).
> But I would rather not do this merge manually outside; I want it done
> automatically inside RDD A.
>
> I hope I made it clear.
>
> On Fri, Apr 8, 2016 at 4:22 PM, Holden Karau wrote:
>
>> It seems like the union function on RDDs might be what you are looking
>> for, or was there something else you were trying to achieve?
>>
>> On Thursday, April 7, 2016, Tenghuan He wrote:
>>
>>> Hi all,
>>>
>>> I know that nested RDDs are not possible, like rdd1.map(x => x +
>>> rdd2.count())
>>> I tried to create a custom RDD like the following
>>>
>>> class MyRDD(base: RDD, part: Partitioner) extends RDD[(K, V)] {
>>>
>>>   var rdds = ArrayBuffer.empty[RDD[(K, (V, Int))]]
>>>   def update(rdd: RDD[_]) {
>>>     rdds += rdd
>>>   }
>>>   def compute ...
>>>   def getPartitions ...
>>> }
>>>
>>> In the compute method I call the internal rdds' iterators and got a
>>> NullPointerException.
>>> Is this also a form of nested RDDs, and how do I get rid of this?
>>>
>>> Thanks.
>>>
>>> Tenghuan
>>
>> --
>> Cell : 425-233-8271
>> Twitter: https://twitter.com/holdenkarau
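The idea of merging the small update sets first and then touching the large dataset only once can be illustrated in plain Scala. The thread does not say what `update` does exactly, so this sketch assumes it replaces the value of matching keys; `applyUpdates` and `mergeUpdates` are hypothetical names. In Spark, the merged map would be the broadcast value and `applyUpdates` would run inside `mapPartitions` on A.

```scala
// Hypothetical per-partition update: replace the value of every key that
// appears in `updates`, leaving other records untouched.
def applyUpdates[K, V](part: Iterator[(K, V)], updates: Map[K, V]): Iterator[(K, V)] =
  part.map { case (k, v) => (k, updates.getOrElse(k, v)) }

// Merge the small update sets in order; later sets win on conflicting keys,
// which mirrors the chained form A.update(b).update(c).
def mergeUpdates[K, V](sets: Seq[Map[K, V]]): Map[K, V] =
  sets.foldLeft(Map.empty[K, V])(_ ++ _)

// Invented sample data standing in for the large RDD A and small RDDs b, c.
val a = Seq(1 -> "old1", 2 -> "old2", 3 -> "old3")
val b = Map(1 -> "b1")
val c = Map(1 -> "c1", 3 -> "c3")

val merged = mergeUpdates(Seq(b, c))
val updated = applyUpdates(a.iterator, merged).toList
```

With this shape, the large dataset is iterated once regardless of how many small update sets there are.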
Re: Spark Streaming share state between two streams
Hi Shekhar,

As both of your state functions do much the same thing, can't you do a union of the DStreams before applying mapWithState()? It might be difficult if one state function depends on the other's state. That requires a named state, which can be accessed in other state functions. I have not gone through the details, but the PR (https://github.com/apache/spark/pull/11645) from Tathagata seems to be in that direction.

Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)
https://in.linkedin.com/in/rishiteshmishra

On Fri, Apr 8, 2016 at 3:53 PM, Shekhar Bansal <shekhar0...@yahoo.com.invalid> wrote:

> Hi
> Can we share Spark Streaming state between two DStreams?
> Basically I want to create state using the first stream and enrich the
> second stream using that state.
> Example: I have modified the StatefulNetworkWordCount example. I am
> creating state using the first stream and enriching the second stream with
> the count from the first stream.
>
> val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))
>
> val mappingFuncForFirstStream = (batchTime: Time, word: String, one: Option[Int], state: State[Int]) => {
>   val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
>   val output = (word, sum)
>   state.update(sum)
>
>   Some(output)
> }
>
> val mappingFuncForSecondStream = (batchTime: Time, word: String, one: Option[Int], state: State[Int]) => {
>   val sum = state.getOption.getOrElse(0)
>   val output = (word, sum)
>
>   Some(output)
> }
>
> // first stream
> KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
>   .flatMap(r=>r._2.split(" "))
>   .map(x => (x, 1))
>   .mapWithState(StateSpec.function(mappingFuncForFirstStream).initialState(initialRDD).timeout(Minutes(10)))
>   .print(1)
>
> // second stream
> KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams2, mergeTopicSet)
>   .flatMap(r=>r._2.split(" "))
>   .map(x => (x, 1))
>   .mapWithState(StateSpec.function(mappingFuncForSecondStream).initialState(initialRDD).timeout(Minutes(10)))
>   .print(50)
>
> In the checkpointing directory, I can see two different state RDDs.
> I am using spark-1.6.1 and kafka-0.8.2.1
>
> Regards
> Shekhar
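One way to read the union suggestion is to tag every record with the stream it came from, so that a single state function can both build and read the state. The sketch below models that in plain Scala; it is not the Spark Streaming API. `Origin`, `mappingFunc` and `runBatch` are hypothetical names, the state is a plain `Map`, and record ordering within a real micro-batch is not guaranteed the way it is in this sequential model.

```scala
// Tag each record with the stream it came from, so one state function can
// serve both streams after a union.
sealed trait Origin
case object FromFirst extends Origin
case object FromSecond extends Origin

// Plain-Scala stand-in for the mapping function: takes the current count
// for `word` and returns (new state, emitted record).
def mappingFunc(word: String, origin: Origin, one: Int, state: Option[Int])
    : (Option[Int], Option[(String, Int)]) = origin match {
  case FromFirst =>
    val sum = one + state.getOrElse(0)        // first stream builds the state
    (Some(sum), None)
  case FromSecond =>
    (state, Some(word -> state.getOrElse(0))) // second stream only reads it
}

// Drive the function over a unioned batch, keeping one count per word.
def runBatch(state: Map[String, Int], batch: Seq[(String, Origin)])
    : (Map[String, Int], Seq[(String, Int)]) =
  batch.foldLeft((state, Vector.empty[(String, Int)])) {
    case ((st, out), (word, origin)) =>
      val (next, emitted) = mappingFunc(word, origin, 1, st.get(word))
      (next.fold(st)(n => st.updated(word, n)), out ++ emitted.toList)
  }

// Same initial state as initialRDD in the quoted example.
val initial = Map("hello" -> 1, "world" -> 1)
val batch: Seq[(String, Origin)] =
  Seq("hello" -> FromFirst, "hello" -> FromFirst, "hello" -> FromSecond, "world" -> FromSecond)
val (finalState, enriched) = runBatch(initial, batch)
```

In a Spark job the tag would be added with a `map` on each DStream before the union, and the branch on the tag would live inside the function passed to `StateSpec.function`.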
Re: Accumulator question
Your mail does not describe much, but won't a simple reduce function help you? Something like the below:

val data = Seq(1,2,3,4,5,6,7)
val rdd = sc.parallelize(data, 2)
val sum = rdd.reduce((a,b) => a+b)

Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)
https://in.linkedin.com/in/rishiteshmishra

On Tue, May 10, 2016 at 10:44 AM, Abi wrote:

> I am splitting an integer array into 2 partitions and using an accumulator
> to sum the array. The problem is:
>
> 1. I am not seeing the execution time become half of that of a linear
> summation.
>
> 2. The second node (from looking at timestamps) takes 3 times as long as
> the first node. This gives the impression it is "waiting" for the first
> node to finish.
>
> Hence, I am given the impression that using accumulator.sum() in the
> kernel and rdd.foreach(kernel) is making things sequential.
>
> Any API/setting suggestions where I could make things parallel?
Re: Updating Values Inside Foreach Rdd loop
Hi Harsh,

You probably need to maintain some state for your values, as you are updating some of the keys in each batch and then checking a global state of your equation. Can you check the mapWithState API of DStream?

Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)
https://in.linkedin.com/in/rishiteshmishra

On Mon, May 9, 2016 at 8:40 PM, HARSH TAKKAR wrote:

> Hi
>
> Please help.
>
> On Sat, 7 May 2016, 11:43 p.m. HARSH TAKKAR, wrote:
>
>> Hi Ted
>>
>> Following is my use case.
>>
>> I have a prediction algorithm where I need to update some records to
>> predict the target.
>>
>> For example:
>> I have an equation Y = mX + c.
>> I need to change the value of Xi for some records and calculate sum(Yi).
>> If the value of the prediction is not close to the target value, then
>> repeat the process.
>>
>> In each iteration a different set of values is updated, but the result is
>> checked when we sum up the values.
>>
>> On Sat, 7 May 2016, 8:58 a.m. Ted Yu, wrote:
>>
>>> Using RDDs requires some 'low level' optimization techniques,
>>> while using dataframes / Spark SQL allows you to leverage existing code.
>>>
>>> If you can share some more of your use case, that would help other
>>> people provide suggestions.
>>>
>>> Thanks
>>>
>>> On May 6, 2016, at 6:57 PM, HARSH TAKKAR wrote:
>>>
>>> Hi Ted
>>>
>>> I am aware that RDDs are immutable, but in my use case I need to update
>>> the same data set after each iteration.
>>>
>>> Following are the points which I was exploring.
>>>
>>> 1. Generating an RDD in each iteration (it might use a lot of memory).
>>>
>>> 2. Using Hive tables and updating the same table after each iteration.
>>>
>>> Please suggest which one of the methods listed above would be good to
>>> use, or whether there are better ways to accomplish it.
>>>
>>> On Fri, 6 May 2016, 7:09 p.m. Ted Yu, wrote:
>>>
>>>> Please see the doc at the beginning of the RDD class:
>>>>
>>>>  * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
>>>>  * partitioned collection of elements that can be operated on in parallel. This class contains the
>>>>  * basic operations available on all RDDs, such as `map`, `filter`, and `persist`. In addition,
>>>>
>>>> On Fri, May 6, 2016 at 5:25 AM, HARSH TAKKAR wrote:
>>>>
>>>>> Hi
>>>>>
>>>>> Is there a way I can modify an RDD in a for-each loop?
>>>>>
>>>>> Basically, I have a use case in which I need to perform multiple
>>>>> iterations over the data and modify a few values in each iteration.
>>>>>
>>>>> Please help.
Re: partitioner aware subtract
As you have the same partitioner and number of partitions, you can probably use zipPartitions and provide a user-defined function to subtract. A very primitive example:

import org.apache.spark.HashPartitioner

val data1 = Seq(1->1,2->2,3->3,4->4,5->5,6->6,7->7)
val data2 = Seq(1->1,2->2,3->3,4->4,5->5,6->6)
// Co-partition both RDDs on the key, as in your setup.
val part = new HashPartitioner(2)
val rdd1 = sc.parallelize(data1).partitionBy(part)
val rdd2 = sc.parallelize(data2).partitionBy(part)
val diff = rdd1.zipPartitions(rdd2){ (leftItr, rightItr) =>
  // Materialize the right side once; calling contains() on the iterator
  // itself would consume it after the first lookup.
  val right = rightItr.toSet
  leftItr.filter(p => !right.contains(p))
}
diff.foreach(println)

Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)
https://in.linkedin.com/in/rishiteshmishra

On Mon, May 9, 2016 at 7:35 PM, Raghava Mutharaju wrote:

> We tried that but couldn't figure out a way to efficiently filter it. Let's
> take two RDDs.
>
> rdd1:
>
> (1,2)
> (1,5)
> (2,3)
> (3,20)
> (3,16)
>
> rdd2:
>
> (1,2)
> (3,30)
> (3,16)
> (5,12)
>
> rdd1.leftOuterJoin(rdd2) and get rdd1.subtract(rdd2):
>
> (1,(2,Some(2)))
> (1,(5,Some(2)))
> (2,(3,None))
> (3,(20,Some(30)))
> (3,(20,Some(16)))
> (3,(16,Some(30)))
> (3,(16,Some(16)))
>
> case (x, (y, z)) => Apart from allowing z == None and filtering on y == z,
> we also should filter out (3, (16, Some(30))). How can we do that
> efficiently without resorting to a broadcast of any elements of rdd2?
>
> Regards,
> Raghava.
>
> On Mon, May 9, 2016 at 6:27 AM, ayan guha wrote:
>
>> How about outer join?
>> On 9 May 2016 13:18, "Raghava Mutharaju" wrote:
>>
>>> Hello All,
>>>
>>> We have two PairRDDs (rdd1, rdd2) which are hash partitioned on key
>>> (the number of partitions is the same for both RDDs). We would like to
>>> subtract rdd2 from rdd1.
>>>
>>> The subtract code at
>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala
>>> seems to group the elements of both the RDDs using (x, null), where x is
>>> the element of the RDD, and partition them. Then it makes use of
>>> subtractByKey(). This way, the RDDs have to be repartitioned on x (which
>>> in our case is both key and value combined). In our case, both the RDDs
>>> are already hash partitioned on the key of x. Can we take advantage of
>>> this by having a PairRDD/HashPartitioner-aware subtract? Is there a way
>>> to use mapPartitions() for this?
>>>
>>> We tried to broadcast rdd2 and use mapPartitions. But this turns out to
>>> be memory consuming and inefficient. We tried to do a local set
>>> difference between rdd1 and the broadcasted rdd2 (in mapPartitions of
>>> rdd1). We did use destroy() on the broadcasted value, but it does not
>>> help.
>>>
>>> The current subtract method is slow for us. rdd1 and rdd2 are around
>>> 700MB each and the subtract takes around 14 seconds.
>>>
>>> Any ideas on this issue are highly appreciated.
>>>
>>> Regards,
>>> Raghava.
>
> --
> Regards,
> Raghava
> http://raghavam.github.io
Re: Spark 1.6 Catalyst optimizer
It does push the predicate. But as relations are generic and might or might not handle some of the predicates, it needs to apply a filter for the unhandled predicates.

Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)
https://in.linkedin.com/in/rishiteshmishra

On Wed, May 11, 2016 at 6:27 AM, Telmo Rodrigues <telmo.galante.rodrig...@gmail.com> wrote:

> Hello,
>
> I have a question about the Catalyst optimizer in Spark 1.6.
>
> initial logical plan:
>
> !'Project [unresolvedalias(*)]
> !+- 'Filter ('t.id = 1)
> !   +- 'Join Inner, Some(('t.id = 'u.id))
> !      :- 'UnresolvedRelation `t`, None
> !      +- 'UnresolvedRelation `u`, None
>
> logical plan after optimizer execution:
>
> Project [id#0L,id#1L]
> !+- Filter (id#0L = cast(1 as bigint))
> !   +- Join Inner, Some((id#0L = id#1L))
> !      :- Subquery t
> !      :  +- Relation[id#0L] JSONRelation
> !      +- Subquery u
> !         +- Relation[id#1L] JSONRelation
>
> Shouldn't the optimizer push down predicates to subquery t so that the
> filter is executed before the join?
>
> Thanks
Re: Spark 1.6 Catalyst optimizer
I will try with a JSON relation, but with Spark's temp tables (Spark version 1.6) I get the optimized plan you expected, with the filter applied before the join. It should not be much different though.

Query : "select t1.col2, t1.col3 from t1, t2 where t1.col1=t2.col1 and t1.col3=7"

Plan :
Project [COL2#1,COL3#2]
+- Join Inner, Some((COL1#0 = COL1#3))
   :- Filter (COL3#2 = 7)
   :  +- LogicalRDD [col1#0,col2#1,col3#2], MapPartitionsRDD[4] at apply at Transformer.scala:22
   +- Project [COL1#3]
      +- LogicalRDD [col1#3,col2#4,col3#5], MapPartitionsRDD[5] at apply at Transformer.scala:22

Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)
https://in.linkedin.com/in/rishiteshmishra

On Wed, May 11, 2016 at 4:56 PM, Telmo Rodrigues <telmo.galante.rodrig...@gmail.com> wrote:

> In this case, isn't it better to perform the filter as early as possible,
> even if there could be unhandled predicates?
>
> Telmo Rodrigues
>
> On 11/05/2016, at 09:49, Rishi Mishra wrote:
>
> It does push the predicate. But as a relations are generic and might or
> might not handle some of the predicates , it needs to apply filter of
> un-handled predicates.
>
> Regards,
> Rishitesh Mishra,
> SnappyData . (http://www.snappydata.io/)
>
> https://in.linkedin.com/in/rishiteshmishra
>
> On Wed, May 11, 2016 at 6:27 AM, Telmo Rodrigues <
> telmo.galante.rodrig...@gmail.com> wrote:
>
>> Hello,
>>
>> I have a question about the Catalyst optimizer in Spark 1.6.
>>
>> initial logical plan:
>>
>> !'Project [unresolvedalias(*)]
>> !+- 'Filter ('t.id = 1)
>> !   +- 'Join Inner, Some(('t.id = 'u.id))
>> !      :- 'UnresolvedRelation `t`, None
>> !      +- 'UnresolvedRelation `u`, None
>>
>> logical plan after optimizer execution:
>>
>> Project [id#0L,id#1L]
>> !+- Filter (id#0L = cast(1 as bigint))
>> !   +- Join Inner, Some((id#0L = id#1L))
>> !      :- Subquery t
>> !      :  +- Relation[id#0L] JSONRelation
>> !      +- Subquery u
>> !         +- Relation[id#1L] JSONRelation
>>
>> Shouldn't the optimizer push down predicates to subquery t so that the
>> filter is executed before the join?
>>
>> Thanks
Re: RDD and Dataframes
Yes, in the end it will be converted to an RDD internally. However, DataFrame queries are passed through Catalyst, which provides several optimizations, e.g. code generation, intelligent shuffles etc., which is not the case for pure RDDs.

Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)
https://in.linkedin.com/in/rishiteshmishra

On Thu, Jul 7, 2016 at 4:50 PM, brccosta wrote:

> Dear guys,
>
> I'm investigating the differences between RDDs and Dataframes/Datasets. I
> couldn't find the answer to this question: do Dataframes act as a new
> layer in the Spark stack? I mean, is there a conversion to RDD during
> execution?
>
> For example, if I create a Dataframe and perform a query, will it be
> transformed into an RDD in the final step to be executed in Spark?
Re: Extend Dataframe API
Or you can extend SQLContext to add your plans. I am not sure if it fits your requirement, but I am answering to highlight an option.

Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)
https://in.linkedin.com/in/rishiteshmishra

On Thu, Jul 7, 2016 at 8:39 PM, tan shai wrote:

> That was what I was thinking of doing.
>
> Do you have any idea about this? Or any documentation?
>
> Many thanks.
>
> 2016-07-07 17:07 GMT+02:00 Koert Kuipers:
>
>> i dont see any easy way to extend the plans, beyond creating a custom
>> version of spark.
>>
>> On Thu, Jul 7, 2016 at 9:31 AM, tan shai wrote:
>>
>>> Hi,
>>>
>>> I need to add new operations to the dataframe API.
>>> Can anyone explain to me how to extend the plans of query execution?
>>>
>>> Many thanks.
Re: Union of RDDs without the overhead of Union
I agree with Koert that UnionRDD should have only narrow dependencies. A union of two RDDs does, however, increase the number of tasks to be executed (rdd1.partitions + rdd2.partitions). If your two RDDs have the same number of partitions, you can also use zipPartitions, which results in fewer tasks and hence less overhead.

On Wed, Feb 3, 2016 at 9:58 AM, Koert Kuipers wrote:

> i am surprised union introduces a stage. UnionRDD should have only narrow
> dependencies.
>
> On Tue, Feb 2, 2016 at 11:25 PM, Koert Kuipers wrote:
>
>> well the "hadoop" way is to save to a/b and a/c and read from a/* :)
>>
>> On Tue, Feb 2, 2016 at 11:05 PM, Jerry Lam wrote:
>>
>>> Hi Spark users and developers,
>>>
>>> Does anyone know how to union two RDDs without the overhead of it?
>>>
>>> say rdd1.union(rdd2).saveAsTextFile(..)
>>> This requires a stage to union the 2 rdds before saveAsTextFile (2
>>> stages). Is there a way to skip the union step but have the contents of
>>> the two rdds saved to the same output text file?
>>>
>>> Thank you!
>>>
>>> Jerry

--
Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)
https://in.linkedin.com/in/rishiteshmishra
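The difference in task counts between union and zipPartitions can be modelled in plain Scala by treating an RDD as a sequence of partitions. This is a simplified model, not Spark code: `Partitions`, `unionParts` and `zipParts` are hypothetical names and the data is invented. The Spark counterpart of `zipParts` would be along the lines of `rdd1.zipPartitions(rdd2)((a, b) => a ++ b)`.

```scala
// Model an RDD as a sequence of partitions; Spark runs one task per partition.
type Partitions[T] = Seq[Seq[T]]

// union keeps every input partition, so the task count is the sum of both.
def unionParts[T](a: Partitions[T], b: Partitions[T]): Partitions[T] = a ++ b

// zipPartitions pairs partition i of one input with partition i of the other,
// so the task count stays the same. Both inputs need equal partition counts.
def zipParts[T](a: Partitions[T], b: Partitions[T]): Partitions[T] = {
  require(a.length == b.length, "partition counts must match")
  a.zip(b).map { case (x, y) => x ++ y }
}

val rdd1: Partitions[Int] = Seq(Seq(1, 2), Seq(3))
val rdd2: Partitions[Int] = Seq(Seq(4), Seq(5, 6))

val unioned = unionParts(rdd1, rdd2) // four partitions
val zipped = zipParts(rdd1, rdd2)    // two partitions, same elements overall
```

Both results contain the same elements; only the number of partitions, and therefore tasks and output files, differs.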
Re: Unit test with sqlContext
Hi Steve,

Have you cleaned up your SparkContext (sc.stop()) in an afterAll()? The error suggests you are creating more than one SparkContext.

On Fri, Feb 5, 2016 at 10:04 AM, Holden Karau wrote:

> Thanks for recommending spark-testing-base :) Just wanted to add if anyone
> has feature requests for Spark testing please get in touch (or add an issue
> on the github) :)
>
> On Thu, Feb 4, 2016 at 8:25 PM, Silvio Fiorito <silvio.fior...@granturing.com> wrote:
>
>> Hi Steve,
>>
>> Have you looked at the spark-testing-base package by Holden? It's really
>> useful for unit testing Spark apps as it handles all the bootstrapping for
>> you.
>>
>> https://github.com/holdenk/spark-testing-base
>>
>> DataFrame examples are here:
>> https://github.com/holdenk/spark-testing-base/blob/master/src/test/1.3/scala/com/holdenkarau/spark/testing/SampleDataFrameTest.scala
>>
>> Thanks,
>> Silvio
>>
>> From: Steve Annessa
>> Date: Thursday, February 4, 2016 at 8:36 PM
>> To: "user@spark.apache.org"
>> Subject: Unit test with sqlContext
>>
>> I'm trying to unit test a function that reads in a JSON file, manipulates
>> the DF and then returns a Scala Map.
>>
>> The function has signature:
>> def ingest(dataLocation: String, sc: SparkContext, sqlContext: SQLContext)
>>
>> I've created a bootstrap spec for spark jobs that instantiates the Spark
>> Context and SQLContext like so:
>>
>> @transient var sc: SparkContext = _
>> @transient var sqlContext: SQLContext = _
>>
>> override def beforeAll = {
>>   System.clearProperty("spark.driver.port")
>>   System.clearProperty("spark.hostPort")
>>
>>   val conf = new SparkConf()
>>     .setMaster(master)
>>     .setAppName(appName)
>>
>>   sc = new SparkContext(conf)
>>   sqlContext = new SQLContext(sc)
>> }
>>
>> When I do not include sqlContext, my tests run. Once I add the sqlContext
>> I get the following errors:
>>
>> 16/02/04 17:31:58 WARN SparkContext: Another SparkContext is being
>> constructed (or threw an exception in its constructor). This may indicate
>> an error, since only one SparkContext may be running in this JVM (see
>> SPARK-2243). The other SparkContext was created at:
>> org.apache.spark.SparkContext.<init>(SparkContext.scala:81)
>>
>> 16/02/04 17:31:59 ERROR SparkContext: Error initializing SparkContext.
>> akka.actor.InvalidActorNameException: actor name [ExecutorEndpoint] is
>> not unique!
>>
>> and finally:
>>
>> [info] IngestSpec:
>> [info] Exception encountered when attempting to run a suite with class
>> name: com.company.package.IngestSpec *** ABORTED ***
>> [info] akka.actor.InvalidActorNameException: actor name
>> [ExecutorEndpoint] is not unique!
>>
>> What do I need to do to get a sqlContext through my tests?
>>
>> Thanks,
>>
>> -- Steve
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau

--
Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)
https://in.linkedin.com/in/rishiteshmishra
Re: spark.executor.memory ? is used just for cache RDD or both cache RDD and the runtime of cores on worker?
The memory management section of the configuration docs covers this: http://spark.apache.org/docs/latest/configuration.html#memory-management. The other config parameters are explained on the same page. In short, spark.executor.memory sets the whole executor heap, which is shared between cached RDDs and the code running in each task, so only a fraction of the 10 x 2G is available for caching.

On Fri, Feb 5, 2016 at 10:56 AM, charles li wrote:
> if set spark.executor.memory = 2G for each worker [ 10 in total ]
>
> does it mean I can cache 20G RDD in memory ? if so, how about the memory
> for code running in each process on each worker?
>
> thanks.
>
> --
> and is there any materials about memory management or resource management
> in spark ? I want to put spark in production, but have little knowing about
> the resource management in spark, great thanks again

--
Regards,
Rishitesh Mishra
Re: Spark : Unable to connect to Oracle
AFAIK sc.addJar() only adds the jars to the executors' classpath. The data source resolution (createRelation) happens on the driver side, so the driver classpath must also contain ojdbc6.jar. You can use the "spark.driver.extraClassPath" config parameter to set it.

On Wed, Feb 10, 2016 at 3:08 PM, Jorge Machado wrote:
> Hi Divya,
>
> You need to install the Oracle jdbc driver on the cluster into lib folder.
>
> On 10/02/2016, at 09:37, Divya Gehlot wrote:
>
> oracle.jdbc.driver.OracleDrive

--
Regards,
Rishitesh Mishra
Re: SparkSQL parallelism
I am not sure why all 3 nodes would run the query. If you have not specified any partitioning options, the JDBCRDD should have only one partition, and the whole dataset should reside in it.

On Fri, Feb 12, 2016 at 10:15 AM, Madabhattula Rajesh Kumar <
mrajaf...@gmail.com> wrote:
> Hi,
>
> I have a spark cluster with One Master and 3 worker nodes. I have written
> a below code to fetch the records from oracle using sparkSQL
>
> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
> val employees = sqlContext.read.format("jdbc").options(
>   Map("url" -> "jdbc:oracle:thin:@:1525:SID",
>     "dbtable" -> "(select * from employee where name like '%18%')",
>     "user" -> "username",
>     "password" -> "password")).load
>
> I have a submitted this job to spark cluster using spark-submit command.
>
> *Looks like, All 3 workers are executing same query and fetching same
> data. It means, it is making 3 jdbc calls to oracle.*
> *How to make this code to make a single jdbc call to oracle(In case of
> more than one worker) ?*
>
> Please help me to resolve this use case
>
> Regards,
> Rajesh

--
Regards,
Rishitesh Mishra
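One way to check the single-partition claim from the reply above is to print the partition count of the loaded DataFrame. The sketch below is meant for spark-shell on Spark 1.x, where `sc` already exists. The host name `dbhost`, the credentials, the numeric column `emp_id`, and its bounds are placeholders, and the second read uses the JDBC source's `partitionColumn`, `lowerBound`, `upperBound`, and `numPartitions` options only to show the contrast.

```scala
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)

// Placeholder connection details; the query is the one from the question.
val baseOptions = Map(
  "url"      -> "jdbc:oracle:thin:@dbhost:1525:SID",
  "dbtable"  -> "(select * from employee where name like '%18%')",
  "user"     -> "username",
  "password" -> "password")

// No partitioning options: the scan is planned as a single partition,
// so one task (on one executor) reads the whole result.
val single = sqlContext.read.format("jdbc").options(baseOptions).load()
println(single.rdd.partitions.length)

// With partitioning options the read is split into ranges of emp_id,
// one query per partition (emp_id and its bounds are assumptions).
val partitioned = sqlContext.read.format("jdbc").options(baseOptions ++ Map(
  "partitionColumn" -> "emp_id",
  "lowerBound"      -> "1",
  "upperBound"      -> "100000",
  "numPartitions"   -> "3")).load()
println(partitioned.rdd.partitions.length)
```

If the first count comes out as 1 and Oracle still sees several connections, the extra calls probably come from somewhere other than the scan itself, for example schema resolution on the driver or the job being evaluated more than once.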
Re: How to use a custom partitioner in a dataframe in Spark
Unfortunately there is none, at least up to 1.5. I have not gone through the new Dataset API of 1.6. There is some basic support for Parquet, such as partitioning the output by column (DataFrameWriter.partitionBy). If you want to partition your dataset in a certain way, you have to partition it as an RDD and convert that into a DataFrame before storing it in a table.

Regards,
Rishitesh Mishra

On Tue, Feb 16, 2016 at 11:51 PM, SRK wrote:
> Hi,
>
> How do I use a custom partitioner when I do a saveAsTable in a dataframe.
>
> Thanks,
> Swetha
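The RDD round trip described in the reply above could look roughly like the sketch below. `partitionWith` is a hypothetical helper name, and the code assumes Spark 1.4 or later (for `StructType.fieldIndex` and `createDataFrame(RDD[Row], StructType)`).

```scala
import org.apache.spark.Partitioner
import org.apache.spark.sql.{DataFrame, SQLContext}

// Hypothetical helper: repartition `df` by the value of `keyColumn` using any
// RDD Partitioner, then rebuild a DataFrame with the original schema.
def partitionWith(df: DataFrame, keyColumn: String, partitioner: Partitioner,
                  sqlContext: SQLContext): DataFrame = {
  val keyIndex = df.schema.fieldIndex(keyColumn)
  val partitionedRows = df.rdd
    .map(row => (row.get(keyIndex), row))   // key each row by the chosen column
    .partitionBy(partitioner)               // shuffle with the custom partitioner
    .values                                 // drop the key, keep the rows
  sqlContext.createDataFrame(partitionedRows, df.schema)
}
```

A call might look like `partitionWith(df, "userId", new org.apache.spark.HashPartitioner(16), sqlContext).write.saveAsTable("user_records")`, where the column and table names are made up. The rebuilt DataFrame keeps the physical layout of the rows but does not carry the partitioner, so later DataFrame operations may shuffle the data again.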
Re: How to use a custom partitioner in a dataframe in Spark
Michael,

Is there any specific reason why DataFrames do not have partitioners like RDDs? This would be very useful when writing custom data sources that keep data in partitions. While storing data, one could pre-partition it at the Spark level rather than at the data source.

Regards,
Rishitesh Mishra

On Thu, Feb 18, 2016 at 3:50 AM, swetha kasireddy wrote:
> So suppose I have a bunch of userIds and I need to save them as parquet in
> database. I also need to load them back and need to be able to do a join
> on userId. My idea is to partition by userId hashcode first and then on
> userId. So that I don't have to deal with any performance issues because of
> a number of small files and also to be able to scan faster.
>
> Something like ...df.write.format("parquet").partitionBy( "userIdHash"
> , "userId").mode(SaveMode.Append).save("userRecords");
>
> On Wed, Feb 17, 2016 at 2:16 PM, swetha kasireddy <
> swethakasire...@gmail.com> wrote:
>
>> So suppose I have a bunch of userIds and I need to save them as parquet
>> in database. I also need to load them back and need to be able to do a join
>> on userId. My idea is to partition by userId hashcode first and then on
>> userId.
>>
>> On Wed, Feb 17, 2016 at 11:51 AM, Michael Armbrust <
>> mich...@databricks.com> wrote:
>>
>>> Can you describe what you are trying to accomplish? What would the
>>> custom partitioner be?
>>>
>>> On Tue, Feb 16, 2016 at 1:21 PM, SRK wrote:
>>>
>>>> Hi,
>>>>
>>>> How do I use a custom partitioner when I do a saveAsTable in a dataframe.
>>>>
>>>> Thanks,
>>>> Swetha
Re: Join and HashPartitioner question
AFAIK, and from what I can see in the code, both of them should behave the same.

On Sat, Nov 14, 2015 at 2:10 AM, Alexander Pivovarov wrote:
> Hi Everyone
>
> Is there any difference in performance btw the following two joins?
>
> val r1: RDD[(String, String)] = ???
> val r2: RDD[(String, String)] = ???
>
> val partNum = 80
> val partitioner = new HashPartitioner(partNum)
>
> // Join 1
> val res1 = r1.partitionBy(partitioner).join(r2.partitionBy(partitioner))
>
> // Join 2
> val res2 = r1.join(r2, partNum)

--
Regards,
Rishitesh Mishra
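A small spark-shell sketch for checking the reply above on your own cluster; the two input RDDs are made-up sample data and `sc` is the shell's SparkContext. It compares the partitioner of both results and prints their lineage so the shuffle stages can be compared by eye.

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

val r1: RDD[(String, String)] = sc.parallelize(Seq("a" -> "1", "b" -> "2"))
val r2: RDD[(String, String)] = sc.parallelize(Seq("a" -> "x", "b" -> "y"))

val partNum = 80
val partitioner = new HashPartitioner(partNum)

// Join 1: partition both sides explicitly, then join.
val res1 = r1.partitionBy(partitioner).join(r2.partitionBy(partitioner))

// Join 2: let join create the HashPartitioner from the partition count.
val res2 = r1.join(r2, partNum)

// HashPartitioner equality is based on the number of partitions.
println(res1.partitioner == res2.partitioner)
println(res1.partitions.length == res2.partitions.length)

// The lineage shows where each variant shuffles its inputs.
println(res1.toDebugString)
println(res2.toDebugString)
```

In both variants each input is shuffled once into 80 hash partitions. Join 1 does it in `partitionBy` and Join 2 inside the join itself, which is why I would not expect a meaningful performance difference unless the pre-partitioned RDDs are cached and reused in further joins.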
Re: ClassLoader resources on executor
Did you try using *spark.executor.extraClassPath*? Resources on that classpath are accessible through the class loader of the executor that runs your job.

On Wed, Dec 2, 2015 at 2:15 AM, Charles Allen wrote:
> Is there a way to pass configuration file resources to be resolvable
> through the classloader?
>
> For example, if I'm using a library (non-spark) that can use a
> some-lib.properties file in the classpath/classLoader, can I pass that file
> so that when it tries to get the resource from the classloader it is able
> to find it?
>
> One potential solution is to take the files and package them as resources
> in a jar, and include the jar as part of the spark job, but that feels like
> a hack instead of an actual solution.
>
> Is there any support or planned support for such a thing?
>
> https://github.com/apache/spark/pull/9118 seems to tackle a similar
> problem in a hard-coded kind of way.
>
> Thank you,
> Charles Allen

--
Regards,
Rishitesh Mishra
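To verify that a file placed on spark.executor.extraClassPath is visible, a lookup like the one below can be run inside a task. `ClasspathResources` is a hypothetical helper that uses only the standard JVM class loader API, and `some-lib.properties` is the file name from the question.

```scala
import java.util.Properties

object ClasspathResources {
  // Returns the parsed properties if `name` is found on the classpath, else None.
  def load(name: String): Option[Properties] = {
    // Prefer the thread context class loader; fall back to this class's loader.
    val loader = Option(Thread.currentThread().getContextClassLoader)
      .getOrElse(getClass.getClassLoader)
    Option(loader.getResourceAsStream(name)).map { in =>
      try {
        val props = new Properties()
        props.load(in)
        props
      } finally {
        in.close()
      }
    }
  }
}
```

Calling `ClasspathResources.load("some-lib.properties").isDefined` from inside `rdd.foreachPartition` reports whether each executor can see the file. The directory or jar named in spark.executor.extraClassPath has to exist at the same path on every worker node, since the setting only extends the classpath and does not ship files.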
Re: SparkSQL API to insert DataFrame into a static partition?
As long as all your data is inserted by Spark, and hence uses the same hash partitioner, what Fengdong mentioned should work.

On Wed, Dec 2, 2015 at 9:32 AM, Fengdong Yu wrote:
> Hi
> you can try:
>
> if your table under location "/test/table/" on HDFS
> and has partitions:
>
> "/test/table/dt=2012"
> "/test/table/dt=2013"
>
> df.write.mode(SaveMode.Append).partitionBy("date").save("/test/table")
>
> On Dec 2, 2015, at 10:50 AM, Isabelle Phan wrote:
>
>> df.write.partitionBy("date").insertInto("my_table")

--
Regards,
Rishitesh Mishra
Re: How to use collections inside foreach block
Your list is defined on the driver, whereas the function passed to foreach is evaluated on each executor. Each executor appends to its own copy of the list, so the driver's copy stays empty. You might want to use an accumulator, or return the values from each partition and collect them on the driver.

On Wed, Dec 9, 2015 at 11:19 AM, Madabhattula Rajesh Kumar <
mrajaf...@gmail.com> wrote:
> Hi,
>
> I have a below query. Please help me to solve this
>
> I have a 2 ids. I want to join these ids to table. This table contains
> some blob data. So i can not join these 2000 ids to this table in one step.
>
> I'm planning to join this table in a chunks. For example, each step I will
> join 5000 ids.
>
> Below code is not working. I'm not able to add result to ListBuffer.
> Result s giving always ZERO
>
> *Code Block :-*
>
> var listOfIds is a ListBuffer with 2 records
>
> listOfIds.grouped(5000).foreach { x =>
> {
>   var v1 = new ListBuffer[String]()
>   val r = sc.parallelize(x).toDF()
>   r.registerTempTable("r")
>   var result = sqlContext.sql("SELECT r.id, t.data from r, t where r.id =
>     t.id")
>   result.foreach{ y =>
>   {
>     v1 += y
>   }
>   }
>   println(" SIZE OF V1 === "+ v1.size) ==> *THIS VALUE PRINTING AS ZERO*
>
>   *// Save v1 values to other db*
> }
>
> Please help me on this.
>
> Regards,
> Rajesh

--
Regards,
Rishitesh Mishra
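One way to apply the reply above is to bring each chunk's rows back with collect() and append them on the driver. The sketch below is for spark-shell on Spark 1.x, where `sc` and `sqlContext` already exist. The id values are made up, and a table `t` with columns `id` and `data` is assumed to be registered.

```scala
import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.Row
import sqlContext.implicits._

val listOfIds: Seq[String] = (1 to 20000).map(_.toString)   // made-up ids

listOfIds.grouped(5000).foreach { chunk =>
  val v1 = new ListBuffer[Row]()

  sc.parallelize(chunk).toDF("id").registerTempTable("r")
  val result = sqlContext.sql("SELECT r.id, t.data FROM r, t WHERE r.id = t.id")

  // collect() ships the rows back to the driver, so the append happens in the
  // driver JVM rather than on an executor's copy of the buffer.
  v1 ++= result.collect()
  println("SIZE OF V1 === " + v1.size)

  // Save v1 values to the other db here.
}
```

collect() holds a whole chunk in driver memory, so with blob data the chunk size has to be small enough to fit. If the rows only need to be written to another database, writing them from the executors with `result.foreachPartition` avoids the trip through the driver entirely.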
Re: DataFrame joins with Spark-Java
Hi Sushma,

Can you try a left anti join as below? In my example, name and id together form the key.

    df1.alias("a").join(df2.alias("b"),
        col("a.name").equalTo(col("b.name"))
            .and(col("a.id").equalTo(col("b.id"))),
        "left_anti").selectExpr("name", "id").show(10, false);

Regards,
Rishitesh Mishra

On Thu, Nov 30, 2017 at 7:38 AM, sushma spark wrote:
> Dear Friends,
>
> I am new to spark DataFrame. My requirement is i have a dataframe1
> contains the today's records and dataframe2 contains yesterday's records. I
> need to compare the today's records with yesterday's records and find out
> new records which are not exists in the yesterday's records based on the
> primary key of the column. Here, the problem is sometimes there are
> multiple columns having primary keys.
>
> I am receiving primary key columns in a List.
>
> example:
>
> List primaryKeyList = listOfPrimarykeys; // single or multiple
> primary key columns
>
> DataFrame currentDataRecords = queryexecutor.getCurrentRecords(); // this
> contains today's records
> DataFrame yesterdayRecords = queryexecutor.getYesterdayRecords(); // this
> contains yesterday's records
>
> Can you anyone help me how to join these two dataframes and apply WHERE
> conditions on columns dynamically with SPARK-JAVA code.
>
> Thanks
> Sushma
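Since the key columns arrive as a list, the join condition in the reply above can be built dynamically instead of being written out per column. The sketch below is in Scala for brevity and assumes Spark 2.x, where the "left_anti" join type is available. `newRecords` is a hypothetical helper name.

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.col

// Hypothetical helper: rows of `today` whose key columns have no match in `yesterday`.
def newRecords(today: DataFrame, yesterday: DataFrame, keys: Seq[String]): DataFrame = {
  require(keys.nonEmpty, "at least one primary key column is required")
  // Build a.k1 = b.k1 AND a.k2 = b.k2 AND ... from the list of key columns.
  val condition: Column = keys
    .map(k => col(s"a.$k") === col(s"b.$k"))
    .reduce(_ && _)
  // A left anti join keeps only the left-side rows that found no match.
  today.alias("a").join(yesterday.alias("b"), condition, "left_anti")
}
```

The same fold can be written in Java with a loop over the list that chains `equalTo` and `and`. Note that `===` never matches null keys, so if a key column can be null the null-safe `<=>` operator may be the better choice.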
Re: Joining streaming data with static table data.
You can do a join between a streaming dataset and a static dataset. I would prefer your first approach (a DataFrame created with spark.read.format("jdbc")). The problem with this approach is performance: unless you cache the dataset, every time you fire a join query it will fetch the latest records from the table.

Regards,
Rishitesh Mishra

On Tue, Dec 12, 2017 at 6:29 AM, satyajit vegesna <
satyajit.apas...@gmail.com> wrote:
> Hi All,
>
> I working on real time reporting project and i have a question about
> structured streaming job, that is going to stream a particular table
> records and would have to join to an existing table.
>
> Stream ---> query/join to another DF/DS ---> update the Stream data
> record.
>
> Now i have a problem on how do i approach the mid layer(query/join to
> another DF/DS), should i create a DF from spark.read.format("JDBC") or
> "stream and maintain the data in memory sink" or if there is any better way
> to do it.
>
> Would like to know, if anyone has faced a similar scenario and have any
> suggestion on how to go ahead.
>
> Regards,
> Satyajit.
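A rough sketch of the stream-to-static join with a cached static side, as discussed in the reply above. It assumes Spark 2.2 or later. The JDBC URL, table name, credentials, and the `id` join column are placeholders, and the built-in "rate" source merely stands in for the real stream.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("stream-static-join").getOrCreate()

// Static side: read over JDBC and cache, so that micro-batches can reuse the
// in-memory copy instead of querying the database each time.
val staticDf = spark.read.format("jdbc")
  .option("url", "jdbc:postgresql://dbhost:5432/reports")   // placeholder URL
  .option("dbtable", "dimension_table")                     // placeholder table
  .option("user", "username")
  .option("password", "password")
  .load()
  .cache()

// Streaming side: the "rate" source emits (timestamp, value) rows.
val streamDf = spark.readStream.format("rate")
  .option("rowsPerSecond", "10")
  .load()
  .selectExpr("value % 100 AS id", "timestamp")

// Inner join between the stream and the static table on the assumed id column.
val joined = streamDf.join(staticDf, Seq("id"))

val query = joined.writeStream
  .format("console")
  .outputMode("append")
  .start()

query.awaitTermination()
```

The trade-off is freshness: a cached static side does not see rows added to the table after it was first materialized, so it would need to be unpersisted and reloaded periodically if the table changes.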