Re: Leveraging S3 select
Hey Steve,

Happen to have a link to the TPC-DS benchmark data w/ random S3 reads? I've done a decent amount of digging, but all I've found is a reference in a slide deck and some JIRA tickets.

From: Steve Loughran
Date: Tuesday, December 5, 2017 at 09:44
To: "Lalwani, Jayesh"
Cc: Apache Spark Dev
Subject: Re: Leveraging S3 select

On 29 Nov 2017, at 21:45, Lalwani, Jayesh wrote:

> AWS announced at re:Invent that they are launching S3 Select. This can allow Spark to push down predicates to S3, rather than read the entire file in memory. Are there any plans to update Spark to use S3 Select?

1. ORC and Parquet don't read the whole file into memory anyway, except in the special case that the file is gzipped.

2. Hadoop's s3a <= 2.7 doesn't handle the aggressive seeks of those columnar formats that well, as it does a GET pos-EOF and has to abort the TCP connection if the seek is backwards.

3. Hadoop 2.8+ with spark.hadoop.fs.s3a.experimental.fadvise=random switches to random IO and only does smaller GET reads of the data requested (actually min(min-read-length, buffer-size)). This delivers a ~3x performance boost in TPC-DS benchmarks.

I don't yet know how much more efficient the new mechanism will be against columnar data, given those facts. You'd need to do experiments.

The place to implement this would be through predicate pushdown from the file format to the FS. ORC & Parquet support predicate pushdown, so they'd need to recognise when the underlying store could do some of the work for them, open the store input stream differently, and use a whole new (undefined?) API for the queries. Most likely: s3a would add a way to specify a predicate to select on in open(), as well as the expected file type. This would need the underlying mechanism to also support those formats, though, which the announcement doesn't.

Someone could do something more immediate, though, through some modified CSV data source which did the pushdown.
However, if you are using CSV for your datasets, there's something fundamental w.r.t. your data storage policy you need to look at. CSV works sometimes as an exchange format, though I prefer Avro there due to its schemas and support for more complex structures. As a format you run queries over? No.
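For reference, the random-IO switch mentioned in point 3 above can be set cluster-wide in spark-defaults.conf (the property name is as given in the thread; Spark passes the `spark.hadoop.` suffix through to the Hadoop configuration):

```properties
# spark-defaults.conf: switch s3a into random-IO mode (Hadoop 2.8+ only).
# Spark strips the "spark.hadoop." prefix and hands the rest to Hadoop.
spark.hadoop.fs.s3a.experimental.fadvise  random
```

The same property can be set per-job with `--conf` on spark-submit.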
Re: Broadcast big dataset
Have you tried upping executor memory? There's a separate Spark conf for that: spark.executor.memory. In general, driver configurations don't automatically apply to executors.

On Wed, Sep 28, 2016 at 7:03 AM -0700, "WangJianfei" wrote:

Hi Devs,

In my application, I just broadcast a dataset (about 500 MB) to the executors (100+), and I got a Java heap error:

Jmartad-7219.hadoop.jd.local:53591 (size: 4.0 MB, free: 3.3 GB)
16/09/28 15:56:48 INFO BlockManagerInfo: Added broadcast_9_piece19 in memory on BJHC-Jmartad-9012.hadoop.jd.local:53197 (size: 4.0 MB, free: 3.3 GB)
16/09/28 15:56:49 INFO BlockManagerInfo: Added broadcast_9_piece8 in memory on BJHC-Jmartad-84101.hadoop.jd.local:52044 (size: 4.0 MB, free: 3.3 GB)
16/09/28 15:56:58 INFO BlockManagerInfo: Removed broadcast_8_piece0 on 172.22.176.114:37438 in memory (size: 2.7 KB, free: 3.1 GB)
16/09/28 15:56:58 WARN TaskSetManager: Lost task 125.0 in stage 7.0 (TID 130, BJHC-Jmartad-9376.hadoop.jd.local): java.lang.OutOfMemoryError: Java heap space
    at java.io.ObjectInputStream$HandleTable.grow(ObjectInputStream.java:3465)
    at java.io.ObjectInputStream$HandleTable.assign(ObjectInputStream.java:3271)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1789)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)

My configuration is 4 GB of memory for the driver.
Any advice is appreciated. Thank you!

--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Broadcast-big-dataset-tp19127.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
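A minimal sketch of the suggestion in the reply above: size the executor heap explicitly, since driver settings do not propagate to executors. The 4g values are illustrative, not a recommendation:

```properties
# spark-defaults.conf: give executors (not just the driver) enough heap
# for the ~500 MB broadcast plus deserialization headroom.
spark.driver.memory    4g
spark.executor.memory  4g
```

Equivalently, per-job: `spark-submit --executor-memory 4g ...` or `--conf spark.executor.memory=4g`.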
Re: What's the use of RangePartitioner.hashCode
Pedantic note about hashCode and equals: the equality doesn't need to be bidirectional; you just need to ensure that a.hashCode == b.hashCode when a.equals(b). The bidirectional case is usually harder to satisfy due to the possibility of collisions. Good info: http://www.programcreek.com/2011/07/java-equals-and-hashcode-contract/

From: Jakob Odersky
Sent: Wednesday, September 21, 2016 15:12
Subject: Re: What's the use of RangePartitioner.hashCode
To: WangJianfei
Cc: dev

Hi,

It is used jointly with a custom implementation of the `equals` method. In Scala, you can override the `equals` method to change the behaviour of `==` comparison. One example of this would be to compare classes based on their parameter values (i.e. what case classes do). Partitioners aren't case classes; however, it makes sense to have a value comparison between them (see RDD.subtract for an example), and hence they redefine the equals method. When redefining an equals method, it is good practice to also redefine the hashCode method so that `a == b` iff `a.hashCode == b.hashCode` (e.g. this is useful when your objects will be stored in a hash map). You can learn more about redefining the equals method and hashcodes here: https://www.safaribooksonline.com/library/view/scala-cookbook/9781449340292/ch04s16.html

regards,
--Jakob

On Thu, Sep 15, 2016 at 6:17 PM, WangJianfei wrote:
> who can give me an example of the use of RangePartitioner.hashCode, thank you!
>
> --
> View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/What-s-the-use-of-RangePartitioner-hashCode-tp18953.html
> Sent from the Apache Spark Developers List mailing list archive at Nabble.com.
>
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
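A minimal, self-contained sketch of the contract discussed above, using a hypothetical `KeyRange` class (not Spark's actual RangePartitioner) that redefines `equals` for value comparison and derives `hashCode` from the same fields:

```scala
// Hypothetical KeyRange class illustrating the equals/hashCode contract:
// a == b must imply a.hashCode == b.hashCode; the converse need not hold,
// since distinct values are allowed to collide.
class KeyRange(val start: Int, val end: Int) {
  override def equals(other: Any): Boolean = other match {
    case that: KeyRange => start == that.start && end == that.end
    case _              => false
  }
  // Derive the hash from exactly the fields compared in equals.
  override def hashCode: Int = 31 * start + end
}

val a = new KeyRange(0, 10)
val b = new KeyRange(0, 10)
assert(a == b)                   // value equality via the overridden equals
assert(a.hashCode == b.hashCode) // required whenever a == b
```

Without the `hashCode` override, `a` and `b` would compare equal with `==` yet land in different buckets of a hash map, which is exactly the breakage the contract guards against.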
master snapshots not publishing?
I’m trying to use a snapshot build off of master, and after looking through Jenkins it appears that the last commit for which a snapshot was built was 757dc2c09d23400dacac22e51f52062bbe471136, 22 days ago:

https://amplab.cs.berkeley.edu/jenkins/view/Spark%20Packaging/job/spark-master-maven-snapshots/

The Jenkins page says that the master-maven build is disabled; is this intentional?

-Andrew
Re: Support for local disk columnar storage for DataFrames
Relevant link: http://spark.apache.org/docs/latest/sql-programming-guide.html#parquet-files

On Wed, Nov 11, 2015 at 7:31 PM, Reynold Xin wrote:
> Thanks for the email. Can you explain what the difference is between this and existing formats such as Parquet/ORC?
>
> On Wed, Nov 11, 2015 at 4:59 AM, Cristian O <cristian.b.op...@googlemail.com> wrote:
>> Hi,
>>
>> I was wondering if there's any planned support for local disk columnar storage.
>>
>> This could be an extension of the in-memory columnar store, or possibly something similar to the recently added local checkpointing for RDDs.
>>
>> This could also have the added benefit of enabling iterative usage for DataFrames by pruning the query plan through local checkpoints.
>>
>> A further enhancement would be to add update support to the columnar format (in the immutable copy-on-write sense, of course), by maintaining references to unchanged row blocks and only copying and mutating the ones that have changed.
>>
>> A use case here is streaming and merging updates in a large dataset that can be efficiently stored internally in a columnar format, rather than accessing a more inefficient external data store like HDFS or Cassandra.
>>
>> Thanks,
>> Cristian