Re: Custom Partitioning in Catalyst
Seems like a great idea to do?

On Fri, Jun 16, 2017 at 12:03 PM, Russell Spitzer wrote:

> I considered adding this to the DataSource API V2 ticket, but I didn't
> want to be first :P Do you think there will be any issues with opening up
> the partitioning as well?
>
> On Fri, Jun 16, 2017 at 11:58 AM Reynold Xin wrote:
>
>> Perhaps we should extend the data source API to support that.
>>
>> On Fri, Jun 16, 2017 at 11:37 AM, Russell Spitzer
>> <russell.spit...@gmail.com> wrote:
>>
>>> I've been trying to make Catalyst aware of Cassandra partitioning.
>>> There seem to be two major blockers.
>>>
>>> The first is that DataSourceScanExec is unable to learn what the
>>> underlying partitioning should be from the BaseRelation it comes from.
>>> I'm currently able to get around this by using the DataSourceStrategy
>>> plan and then transforming the resultant DataSourceScanExec.
>>>
>>> The second is that the Partitioning trait is sealed. I want to define a
>>> new partitioning which is clustered on certain columns but is not
>>> hash-based. It would look almost identical to the HashPartitioning
>>> class, except that the expression which returns a valid partition ID
>>> for given expressions would be different.
>>>
>>> Does anyone have ideas on how to get around the second issue? Would it
>>> be worthwhile to make changes to allow BaseRelations to advertise a
>>> particular Partitioner?
Re: Custom Partitioning in Catalyst
Perhaps we should extend the data source API to support that.

On Fri, Jun 16, 2017 at 11:37 AM, Russell Spitzer wrote:

> I've been trying to make Catalyst aware of Cassandra partitioning. There
> seem to be two major blockers.
>
> The first is that DataSourceScanExec is unable to learn what the
> underlying partitioning should be from the BaseRelation it comes from.
> I'm currently able to get around this by using the DataSourceStrategy
> plan and then transforming the resultant DataSourceScanExec.
>
> The second is that the Partitioning trait is sealed. I want to define a
> new partitioning which is clustered on certain columns but is not
> hash-based. It would look almost identical to the HashPartitioning class,
> except that the expression which returns a valid partition ID for given
> expressions would be different.
>
> Does anyone have ideas on how to get around the second issue? Would it be
> worthwhile to make changes to allow BaseRelations to advertise a
> particular Partitioner?
Custom Partitioning in Catalyst
I've been trying to make Catalyst aware of Cassandra partitioning. There seem to be two major blockers.

The first is that DataSourceScanExec is unable to learn what the underlying partitioning should be from the BaseRelation it comes from. I'm currently able to get around this by using the DataSourceStrategy plan and then transforming the resultant DataSourceScanExec.

The second is that the Partitioning trait is sealed. I want to define a new partitioning which is clustered on certain columns but is not hash-based. It would look almost identical to the HashPartitioning class, except that the expression which returns a valid partition ID for given expressions would be different.

Does anyone have ideas on how to get around the second issue? Would it be worthwhile to make changes to allow BaseRelations to advertise a particular Partitioner?
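[Since Catalyst's real Partitioning hierarchy is sealed, here is only a self-contained conceptual sketch of the proposal, with invented stand-in names (Partitioning, HashPartitioning, SourceDefinedPartitioning are modeled locally, not Spark's actual classes): a partitioning that satisfies clustering on the same columns as a hash partitioning, but derives the partition ID from a source-specific function, e.g. a token-range lookup.]

```scala
// Minimal stand-in for Catalyst's Partitioning contract (illustrative only).
trait Partitioning {
  def numPartitions: Int
  // True if rows sharing values of `clusterKeys` land in the same partition.
  def satisfiesClustering(clusterKeys: Seq[String]): Boolean
}

// Hash-based variant: partition id = hash of the key columns.
case class HashPartitioning(columns: Seq[String], numPartitions: Int)
    extends Partitioning {
  def partitionIdFor(key: Seq[Any]): Int =
    Math.floorMod(key.hashCode, numPartitions)
  def satisfiesClustering(clusterKeys: Seq[String]): Boolean =
    columns.toSet.subsetOf(clusterKeys.toSet)
}

// The proposed variant: clustered on the same columns, but the partition id
// comes from a caller-supplied function (e.g. a Cassandra token-range lookup)
// instead of a hash.
case class SourceDefinedPartitioning(
    columns: Seq[String],
    numPartitions: Int,
    partitionIdExpr: Seq[Any] => Int) extends Partitioning {
  def partitionIdFor(key: Seq[Any]): Int = partitionIdExpr(key)
  def satisfiesClustering(clusterKeys: Seq[String]): Boolean =
    columns.toSet.subsetOf(clusterKeys.toSet)
}

// Toy "token map": route keys by first-letter range rather than by hash.
val byToken = SourceDefinedPartitioning(Seq("pk"), 2,
  key => if (key.head.toString < "m") 0 else 1)
assert(byToken.satisfiesClustering(Seq("pk")))
assert(byToken.partitionIdFor(Seq("apple")) == 0)
assert(byToken.partitionIdFor(Seq("zebra")) == 1)
```

Both classes report the same clustering guarantee, so a planner that only checked `satisfiesClustering` could avoid a shuffle for either; only the partition-ID expression differs, which is exactly the delta the email describes.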
Re: structured streaming documentation does not match behavior
I created https://issues.apache.org/jira/browse/SPARK-21123. A PR is welcome.

On Thu, Jun 15, 2017 at 10:55 AM, Shixiong(Ryan) Zhu
<shixi...@databricks.com> wrote:

> Good catch. These are file source options. Could you submit a PR to fix
> the doc? Thanks!
>
> On Thu, Jun 15, 2017 at 10:46 AM, Mendelson, Assaf
> <assaf.mendel...@rsa.com> wrote:
>
>> Hi,
>>
>> I have started to play around with Structured Streaming, and it seems
>> the documentation (the Structured Streaming programming guide) does not
>> match the actual behavior I am seeing.
>>
>> The documentation says that maxFilesPerTrigger (as well as latestFirst)
>> are options for the file sink. However, maxFilesPerTrigger does not seem
>> to have any real effect there. On the other hand, the streaming source
>> (readStream), which has no documentation for this option, does limit the
>> number of files.
>>
>> This behavior actually makes more sense than the documentation: I would
>> expect the file reader to define how files are read rather than the sink
>> (e.g. if I used a Kafka sink or a foreach sink, they should still get the
>> same behavior from the reading side).
>>
>> Thanks,
>> Assaf.
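[For readers following along, a minimal sketch of where these options actually belong. The paths, app name, and schema here are illustrative placeholders; `maxFilesPerTrigger` and `latestFirst` are file *source* options, set on `readStream` as the thread concludes. This is a configuration fragment, not a runnable job.]

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructType}

val spark = SparkSession.builder().appName("demo").getOrCreate()

// Streaming file sources require an explicit schema.
val schema = new StructType().add("value", StringType)

val input = spark.readStream
  .format("parquet")
  .schema(schema)
  .option("maxFilesPerTrigger", 10) // at most 10 new files per micro-batch
  .option("latestFirst", "true")    // process newest files first
  .load("/data/in")

// The sink only controls where and how results are written; swapping it for
// a Kafka or foreach sink would not change the reading behavior above.
val query = input.writeStream
  .format("parquet")
  .option("path", "/data/out")
  .option("checkpointLocation", "/data/checkpoint")
  .start()
```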
Re: How does MapWithStateRDD distribute the data
Hello,

I have encountered a situation just like the one described above. I am running a Spark Streaming application with 2 executors, 16 cores and 10G of memory per executor, and the input Kafka topic has 64 partitions. My code looks like this:

  KafkaUtils.createDirectStream(...)
    ...
    .map(s => (k, v))
    .mapWithState(...numPartitions(32))
    ...
    .foreachRDD(_.foreachPartition(output))

I was also expecting the 32 partitions of the MapWithStateRDD to be distributed evenly between the two executors, but it turned out that all 32 were on one of them. I noticed that you replied 'Are you using KafkaUtils.createDirectStream?', and I was wondering whether the Kafka direct stream leads to this situation, or whether there is something else. Thanks a lot!

--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/How-does-MapWithStateRDD-distribute-the-data-tp18544p21770.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.
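[A sketch of the StateSpec setup the code above abbreviates, assuming String keys and Int values (the email elides the actual types, and `trackState` is an invented name). `numPartitions(32)` only sets how many partitions the state RDD has; passing an explicit Partitioner via `StateSpec.partitioner` is the other knob the API offers, though where those partitions are physically placed is still decided by the scheduler and data locality, not by the partitioner.]

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.{State, StateSpec}

// Hypothetical state-update function: keep a running sum per key.
def trackState(key: String, value: Option[Int], state: State[Int]): (String, Int) = {
  val sum = state.getOption.getOrElse(0) + value.getOrElse(0)
  state.update(sum)
  (key, sum)
}

// Equivalent to .numPartitions(32), but with the partitioner made explicit.
val spec = StateSpec.function(trackState _)
  .partitioner(new HashPartitioner(32))

// stream.map(s => (k, v)).mapWithState(spec)
```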