Re: Custom Partitioning in Catalyst

2017-06-16 Thread Reynold Xin
Seems like a great idea to do?


On Fri, Jun 16, 2017 at 12:03 PM, Russell Spitzer  wrote:

> I considered adding this to the DataSource API V2 ticket, but I didn't want
> to be first :P Do you think there will be any issues with opening up the
> partitioning as well?
>
> On Fri, Jun 16, 2017 at 11:58 AM Reynold Xin  wrote:
>
>> Perhaps we should extend the data source API to support that.
>>
>>
>> On Fri, Jun 16, 2017 at 11:37 AM, Russell Spitzer <
>> russell.spit...@gmail.com> wrote:
>>
>>> I've been trying to make Catalyst aware of Cassandra partitioning. There
>>> seem to be two major blockers here.
>>>
>>> The first is that DataSourceScanExec is unable to learn what the
>>> underlying partitioning should be from the BaseRelation it comes from. I'm
>>> currently able to get around this by using the DataSourceStrategy plan and
>>> then transforming the resultant DataSourceScanExec.
>>>
>>> The second is that the Partitioning trait is sealed. I want to define a
>>> new partitioning which is Clustered on certain columns but is not
>>> hash-based. It would look almost identical to the HashPartitioning class,
>>> except that the expression which maps the input expressions to a valid
>>> partition ID would be different.
>>>
>>> Does anyone have ideas on how to get around the second issue? Would it be
>>> worthwhile to make changes to allow BaseRelations to advertise a
>>> particular Partitioner?
>>>
>>
>>


Re: Custom Partitioning in Catalyst

2017-06-16 Thread Reynold Xin
Perhaps we should extend the data source API to support that.


On Fri, Jun 16, 2017 at 11:37 AM, Russell Spitzer  wrote:

> I've been trying to make Catalyst aware of Cassandra partitioning. There
> seem to be two major blockers here.
>
> The first is that DataSourceScanExec is unable to learn what the
> underlying partitioning should be from the BaseRelation it comes from. I'm
> currently able to get around this by using the DataSourceStrategy plan and
> then transforming the resultant DataSourceScanExec.
>
> The second is that the Partitioning trait is sealed. I want to define a
> new partitioning which is Clustered on certain columns but is not
> hash-based. It would look almost identical to the HashPartitioning class,
> except that the expression which maps the input expressions to a valid
> partition ID would be different.
>
> Does anyone have ideas on how to get around the second issue? Would it be
> worthwhile to make changes to allow BaseRelations to advertise a
> particular Partitioner?
>


Custom Partitioning in Catalyst

2017-06-16 Thread Russell Spitzer
I've been trying to make Catalyst aware of Cassandra partitioning. There
seem to be two major blockers here.

The first is that DataSourceScanExec is unable to learn what the underlying
partitioning should be from the BaseRelation it comes from. I'm currently
able to get around this by using the DataSourceStrategy plan and then
transforming the resultant DataSourceScanExec.

The second is that the Partitioning trait is sealed. I want to define a new
partitioning which is Clustered on certain columns but is not hash-based. It
would look almost identical to the HashPartitioning class, except that the
expression which maps the input expressions to a valid partition ID would be
different.

Does anyone have ideas on how to get around the second issue? Would it be
worthwhile to make changes to allow BaseRelations to advertise a particular
Partitioner?
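
For concreteness, here is a rough sketch of what such a partitioning could
look like if the trait were open. It mirrors HashPartitioning from Spark 2.x
Catalyst; CassandraTokenId is a hypothetical placeholder expression, not a
real Spark or connector API, and since Partitioning is sealed today a class
like this would have to live inside Catalyst itself:

import org.apache.spark.sql.catalyst.expressions.{Expression, Unevaluable}
import org.apache.spark.sql.catalyst.plans.physical.{
  ClusteredDistribution, Distribution, Partitioning, UnspecifiedDistribution}
import org.apache.spark.sql.types.{DataType, IntegerType}

// Hypothetical placeholder: an expression mapping the partition-key columns
// to the partition ID that the source's own scheme (e.g. Cassandra token
// ranges) would assign.
case class CassandraTokenId(children: Seq[Expression], numPartitions: Int)
  extends Expression with Unevaluable {
  override def nullable: Boolean = false
  override def dataType: DataType = IntegerType
}

// Sketch of a clustered-but-not-hashed partitioning. Everything matches
// HashPartitioning except partitionIdExpression, which delegates to the
// placeholder above instead of Pmod(Murmur3Hash(expressions), numPartitions).
case class CassandraPartitioning(expressions: Seq[Expression], numPartitions: Int)
  extends Expression with Partitioning with Unevaluable {

  override def children: Seq[Expression] = expressions
  override def nullable: Boolean = false
  override def dataType: DataType = IntegerType

  // Same ClusteredDistribution semantics as HashPartitioning: rows sharing
  // values for `expressions` are guaranteed to be in the same partition.
  override def satisfies(required: Distribution): Boolean = required match {
    case UnspecifiedDistribution => true
    case ClusteredDistribution(requiredClustering) =>
      expressions.forall(x => requiredClustering.exists(_.semanticEquals(x)))
    case _ => false
  }

  override def compatibleWith(other: Partitioning): Boolean = other match {
    case o: CassandraPartitioning => this.semanticEquals(o)
    case _ => false
  }

  override def guarantees(other: Partitioning): Boolean = other match {
    case o: CassandraPartitioning => this.semanticEquals(o)
    case _ => false
  }

  // The one piece that actually differs from HashPartitioning.
  def partitionIdExpression: Expression = CassandraTokenId(expressions, numPartitions)
}

With something like this, a scan whose outputPartitioning satisfies a join's
ClusteredDistribution requirement would let the planner skip the Exchange.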


Re: structured streaming documentation does not match behavior

2017-06-16 Thread Shixiong(Ryan) Zhu
I created https://issues.apache.org/jira/browse/SPARK-21123. A PR is welcome.

On Thu, Jun 15, 2017 at 10:55 AM, Shixiong(Ryan) Zhu <
shixi...@databricks.com> wrote:

> Good catch. These are file source options. Could you submit a PR to fix
> the doc? Thanks!
>
> On Thu, Jun 15, 2017 at 10:46 AM, Mendelson, Assaf <
> assaf.mendel...@rsa.com> wrote:
>
>> Hi,
>>
>> I have started to play around with Structured Streaming, and it seems the
>> documentation (the Structured Streaming programming guide) does not match
>> the actual behavior I am seeing.
>>
>> The documentation says that maxFilesPerTrigger (as well as latestFirst) is
>> an option for the file sink. In practice, however, maxFilesPerTrigger does
>> not seem to have any effect there. On the other hand, the streaming source
>> (readStream), for which this option is not documented, does limit the
>> number of files.
>>
>> This behavior actually makes more sense than the documentation, as I would
>> expect the file reader to define how files are read, rather than the sink
>> (e.g. if I used a Kafka sink or a foreach sink, they should still get the
>> same reading behavior).
>>
>>
>>
>> Thanks,
>>
>>   Assaf.
>>
>>
>>
>
>
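
For reference, a minimal sketch of where these options are expected to go,
assuming a hypothetical input directory and schema (both placeholders): they
are set on readStream, i.e. the file source, not on writeStream:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val spark = SparkSession.builder().appName("file-source-options").getOrCreate()

// Streaming file sources require an explicit schema; this one is a placeholder.
val schema = StructType(Seq(StructField("line", StringType)))

val input = spark.readStream
  .format("json")
  .schema(schema)
  .option("maxFilesPerTrigger", "10")  // cap on new files picked up per micro-batch
  .option("latestFirst", "true")       // process the newest files first
  .load("/path/to/input")              // placeholder directory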


Re: How does MapWithStateRDD distribute the data

2017-06-16 Thread coolcoolkid
Hello,

I have encountered a situation just like the one described above. I am
running a Spark Streaming application with 2 executors, each with 16 cores
and 10 GB of memory, and the input Kafka topic has 64 partitions.

My code looks like this:

KafkaUtils.createDirectStream(...)
  ...
  .map(s => (k, v))
  .mapWithState(StateSpec.function(...).numPartitions(32))
  ...
  .foreachRDD(_.foreachPartition(output))


I was also expecting the 32 partitions of the MapWithStateRDD to be
distributed evenly between the two executors, but it turned out that all 32
were on one of them.

I noticed that you replied 'Are you using KafkaUtils.createDirectStream?',
and I was wondering whether the Kafka direct stream leads to this situation,
or whether something else does.

Thanks a lot!
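
For what it's worth, numPartitions(32) amounts to supplying a
HashPartitioner(32), so the keys should already be split evenly across the 32
partitions; which executor each partition runs on is a scheduling/locality
decision, and since the state RDD is checkpointed and cached, partitions tend
to stay wherever they were first computed. A minimal sketch of passing the
partitioner explicitly, where trackCount, pairStream and output are
placeholders for your own update function, (key, value) stream and output
routine:

import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.{State, StateSpec}

// Placeholder update function: keeps a running count per key.
def trackCount(key: String, value: Option[Long], state: State[Long]): (String, Long) = {
  val count = state.getOption.getOrElse(0L) + value.getOrElse(0L)
  state.update(count)
  (key, count)
}

val spec = StateSpec
  .function(trackCount _)
  .partitioner(new HashPartitioner(32))  // same effect as numPartitions(32)

pairStream.mapWithState(spec).foreachRDD(_.foreachPartition(output))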





-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org