Re: Process for backports?

2018-04-24 Thread Reynold Xin
1. We don't backport features.

2. In general we don't bump dependencies, unless they are for critical bug
fixes.

3. We weigh the risk of new regressions against the value of the bug fix. To
state the obvious, we wouldn't backport a bug fix if it affects only a very
small number of use cases but requires very complex changes. There is a huge
gray zone in between that relies on the committer's judgement.





On Tue, Apr 24, 2018 at 3:56 PM, Cody Koeninger  wrote:

>  https://issues.apache.org/jira/browse/SPARK-24067
>
> is asking to backport a change to the 2.3 branch.
>
> My questions
>
> - In general are there any concerns about what qualifies for backporting?
> This adds a configuration variable but shouldn't change default behavior.
>
> - Is a separate jira + pr actually necessary?
> Seems like the merge_spark_pr.py script is set up to handle cherry
> picking the original merged PR in a case like this.
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>


Process for backports?

2018-04-24 Thread Cody Koeninger
 https://issues.apache.org/jira/browse/SPARK-24067

is asking to backport a change to the 2.3 branch.

My questions

- In general are there any concerns about what qualifies for backporting?
This adds a configuration variable but shouldn't change default behavior.

- Is a separate jira + pr actually necessary?
Seems like the merge_spark_pr.py script is set up to handle cherry
picking the original merged PR in a case like this.




Re: [MLLib] Logistic Regression and standadization

2018-04-24 Thread DB Tsai
As one of the original authors, let me chime in with some comments.

Without standardization, LBFGS will be unstable. For example, if a 
feature is multiplied by 10, then the corresponding coefficient should be 
divided by 10 to make the same prediction. But without standardization, LBFGS 
will converge to a different solution because of numerical instability.
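
The scaling argument above can be checked with a few lines of arithmetic. The sketch below is illustrative only and is not MLlib's code; the helper names (`predict`, `to_original_scale`) and all numbers are made up for the example. It shows that a model fitted on features divided by their standard deviation makes the same prediction on the raw features once each coefficient is divided by that same standard deviation.

```python
import math


def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))


def predict(weights, intercept, features):
    # Logistic regression prediction: sigmoid of the linear score.
    z = intercept + sum(w * x for w, x in zip(weights, features))
    return sigmoid(z)


def to_original_scale(scaled_weights, stds):
    # A model trained on x / std is the same model on raw x with weights w / std.
    return [w / s for w, s in zip(scaled_weights, stds)]


# Hypothetical per-feature standard deviations and one raw feature vector.
stds = [10.0, 0.5]
raw = [30.0, 1.5]
scaled = [x / s for x, s in zip(raw, stds)]

# Hypothetical coefficients, as if learned on the scaled features.
scaled_weights = [0.8, -1.2]
intercept = 0.1

original_weights = to_original_scale(scaled_weights, stds)
p_scaled = predict(scaled_weights, intercept, scaled)
p_raw = predict(original_weights, intercept, raw)
print(original_weights, p_scaled, p_raw)
```

The two predictions agree up to floating-point rounding, which is why coefficients can be reported on the original scale even when training runs on scaled features. Note that this equivalence is exact only for the unregularized prediction; with a penalty term, the fitted coefficients themselves depend on the scaling.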

TL;DR: this can be implemented in the optimizer or in the trainer. We chose to 
implement it in the trainer because the LBFGS optimizer in breeze suffers from 
this issue. As a user, you don't need to care much even if you have one-hot 
encoded features, and the result should match R. 

DB Tsai  |  Siri Open Source Technologies [not a contribution]  |   Apple, Inc

> On Apr 20, 2018, at 5:56 PM, Weichen Xu  wrote:
> 
> Right. If the regularization term isn't zero, then enabling or disabling 
> standardization will give different results.
> But when comparing results between R's glmnet and MLlib, if we set the same 
> parameters for regularization, standardization, etc., then we should get the 
> same result. If not, then maybe there's a bug. In this case you can paste 
> your testing code and I can help fix it.
> 
> On Sat, Apr 21, 2018 at 1:06 AM, Valeriy Avanesov wrote:
> Hi all.
> 
> Filipp, do you use l1/l2/elastic-net penalization? I believe in this case 
> standardization matters.
> 
> Best,
> 
> Valeriy.
> 
> 
> On 04/17/2018 11:40 AM, Weichen Xu wrote:
>> Not a bug.
>> 
>> When standardization is disabled, MLlib LR will still standardize the 
>> features, but it will scale the coefficients back at the end (after training 
>> has finished). So it will get the same result as training without 
>> standardization. The purpose is to improve the rate of convergence. So the 
>> result should always be exactly the same as R's glmnet, whether 
>> standardization is enabled or disabled. 
>> 
>> Thanks!
>> 
>> On Sat, Apr 14, 2018 at 2:21 AM, Yanbo Liang wrote:
>> Hi Filipp,
>> 
>> MLlib's LR implementation handles standardization the same way as R's 
>> glmnet. 
>> Actually you don't need to care about the implementation detail, as the 
>> coefficients are always returned on the original scale, so it should 
>> return the same result as other popular ML libraries.
>> Could you point me to where glmnet doesn't scale features? 
>> I suspect other issues caused your prediction quality to drop. If you can 
>> share the code and data, I can help to check it.
>> 
>> Thanks
>> Yanbo
>> 
>> 
>>> On Apr 8, 2018, at 1:09 PM, Filipp Zhinkin wrote:
>>> 
>>> Hi all,
>>> 
>>> While migrating from a custom LR implementation to MLlib's LR implementation, 
>>> my colleagues noticed that prediction quality dropped (according to 
>>> different business metrics).
>>> It turned out that this issue is caused by the feature standardization 
>>> performed by MLlib's LR: regardless of the 'standardization' option's value, 
>>> all features are scaled during loss and gradient computation (as well as in 
>>> a few other places): 
>>> https://github.com/apache/spark/blob/6cc7021a40b64c41a51f337ec4be9545a25e838c/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala#L229
>>>  
>>> 
>>> 
>>> According to comments in the code, standardization should be implemented 
>>> the same way it is implemented in R's glmnet package. I've looked through 
>>> the corresponding Fortran code, and it seems that glmnet doesn't scale 
>>> features when you disable standardization (but MLlib still does).
>>> 
>>> Our models contain multiple one-hot encoded features, and scaling them is a 
>>> pretty bad idea.
>>> 
>>> Why does MLlib's LR always scale all features? From my POV it's a bug.
>>> 
>>> Thanks in advance,
>>> Filipp.
>>> 
>> 
>> 
> 
> 



Re: Transform plan with scope

2018-04-24 Thread Marco Gaido
Hi Joseph, Herman,

thanks for your answers. The specific rule I was looking at
is FoldablePropagation. What it does is first collect an AttributeMap
containing all the possible foldable aliases, and then replace them in the
whole plan (it is a bit more complex than this, I know; this is just a
simplification). So in this scenario, if we have two aliases in completely
separate trees, we nonetheless replace both, since we have no idea of the
scope in which each of them is available.

I know that this specific problem can be solved in a much easier way: we
are facing a weird situation where there are two aliases with the same
exprId, and this is not a very good situation (we can just fix it in the
analyzer, as you proposed, Herman). But logically, it would make more sense
for a replacement like this to be applied only where an attribute is in
scope (in other places it should not occur), and I think that if we do
things as they are logically needed, we are less likely to introduce
weird bugs caused by situations which we never thought possible (or
which are not possible now, but may become so). So I was thinking that if
such an operation could also be useful in other places, then we could
probably introduce it. That is the reason for this email thread:
understanding whether we need it or whether my idea is worthless (in which
case, I apologize for wasting your time).

Yes, Herman, the management of the state is the hardest point. My best idea
so far (but I am not satisfied with it) is to require that the state which
is passed from each child to the parent extends both Growable and
Shrinkable, and to pass two functions which, for each node, return
respectively the items to discard from the state and the items to add to
it. But if we decide that the operation I proposed might be useful, we can
probably spend more time investigating the best solution (any suggestion
would be very welcome).
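
To make the idea above concrete, here is a minimal sketch in Python rather than Scala, using a toy tree instead of Spark's plan classes. Everything in it is hypothetical: `PlanNode`, `walk_up_with_state`, the `adds`/`hides` fields, and the attribute names are invented for illustration. It also assumes that the states of several children are merged by set union, which is exactly the open question Herman raised, so treat that as a placeholder rather than an answer.

```python
from dataclasses import dataclass, field
from typing import Callable, FrozenSet, List, Set


@dataclass
class PlanNode:
    """Toy stand-in for a logical plan node (not Spark's LogicalPlan)."""
    name: str
    adds: Set[str] = field(default_factory=set)    # attributes this node introduces
    hides: Set[str] = field(default_factory=set)   # attributes it stops exposing
    children: List["PlanNode"] = field(default_factory=list)


def walk_up_with_state(
    node: PlanNode,
    to_discard: Callable[[PlanNode], Set[str]],
    to_add: Callable[[PlanNode], Set[str]],
    visit: Callable[[PlanNode, FrozenSet[str]], None],
) -> Set[str]:
    """Post-order walk that hands each parent the merged state of its children."""
    state: Set[str] = set()
    for child in node.children:
        # Assumption: states coming from several children are merged by union.
        state |= walk_up_with_state(child, to_discard, to_add, visit)
    state -= to_discard(node)  # shrink the state
    state |= to_add(node)      # grow the state
    visit(node, frozenset(state))
    return state


scan1 = PlanNode("ScanTable1", adds={"t1.x"})
scan2 = PlanNode("ScanTable2", adds={"t2.y"})
project1 = PlanNode("Project1", adds={"a#1", "b#2"}, hides={"t1.x"}, children=[scan1])
project2 = PlanNode("Project2", adds={"a#3", "b#4"}, hides={"t2.y"}, children=[scan2])
union = PlanNode("Union", children=[project1, project2])

seen = {}
walk_up_with_state(
    union,
    to_discard=lambda n: n.hides,
    to_add=lambda n: n.adds,
    visit=lambda n, state: seen.update({n.name: state}),
)
for name, state in seen.items():
    print(name, sorted(state))
```

In this toy run, Project1 only ever sees its own aliases, while Union sees the aliases of both branches. A rule written against `visit` could therefore refuse to touch an attribute that is not in the state it was handed.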

Any more thoughts on this?

Thanks for your answers and your time,
Marco

2018-04-24 19:47 GMT+02:00 Herman van Hövell tot Westerflier <
her...@databricks.com>:

> Hi Marco,
>
> In the case of SPARK-24015 we should perhaps fix this in the analyzer. It
> seems that the plan is somewhat invalid.
>
> If you do want to fix it in the optimizer we might be able to fix it using
> an approach similar to RemoveRedundantAliases (which manually recurses
> down the tree).
>
> As for your proposal we could explore this a little bit. My main question
> would be how would you pass information up the tree (from child to parent),
> and how would you merge such information if there are multiple children? It
> might be kind of painful to generalize.
>
> - Herman
>
> On Tue, Apr 24, 2018 at 7:37 PM, Joseph Torres <
> joseph.tor...@databricks.com> wrote:
>
>> Is there some transformation we'd want to apply to that tree, but can't
>> because we have no concept of scope? It's already possible for a plan rule
>> to traverse each node's subtree if it wants.
>>
>> On Tue, Apr 24, 2018 at 10:18 AM, Marco Gaido 
>> wrote:
>>
>>> Hi all,
>>>
>>> working on SPARK-24051 I realized that currently in the Optimizer and in
>>> all the places where we are transforming a query plan, we are lacking the
>>> context information of what is in scope and what is not.
>>>
>>> Coming back to the ticket, the bug reported there has two main
>>> causes:
>>>  1 - we have two aliases in different places of the plan;
>>>  2 - (the focus of this email) we apply all the rules globally over the
>>> whole plan, without any notion of scope where something is
>>> reachable/visible or not.
>>>
>>> I will start with an easy example to explain what I mean. If we have a
>>> simple query like:
>>>
>>> select a, b from (
>>>   select 1 as a, 2 as b from table1
>>> union
>>>   select 3 as a, 4 as b from table2) q
>>>
>>> We produce a tree which is logically something like:
>>>
>>> Project0(a, b)
>>> - Union
>>> -- Project1(a, b)
>>> --- ScanTable1
>>> -- Project2(a, b)
>>> --- ScanTable2
>>>
>>> So when we apply a transformation on Project1, for instance, we have no
>>> information about what is coming from ScanTable1 (or in general from any
>>> node in the subtree whose root is Project1): we lack a stateful
>>> transform that allows the children to tell the parent, grandparents, and
>>> so on what is in their scope. This is particularly true for
>>> Attributes: in a node we have no idea whether an Attribute comes from its
>>> subtree (it is in scope) or not.
>>>
>>> So, the point of this email is: do you think it might in general be useful
>>> to introduce a way of navigating the tree which allows the children to keep
>>> a state to be used by their parents? Or do you think it is useful in
>>> general to introduce the concept of scope (whether an attribute can be
>>> accessed by a node of a plan)?
>>>
>>> Thanks,
>>> Marco
>>>
>>>
>>>
>>
>


Re: Sorting on a streaming dataframe

2018-04-24 Thread Chayapan Khannabha
Perhaps your use case is a better fit for Apache Kafka.

More info at:
https://kafka.apache.org/documentation/streams/ 


Everything really comes down to the architecture design and algorithm spec. 
However, from my experience with Spark, there are many good reasons why this 
requirement is not supported ;)

Best,

Chayapan (A)


> On Apr 24, 2018, at 2:18 PM, Hemant Bhanawat  wrote:
> 
> Thanks Chris. There are many ways in which I can solve this problem but they 
> are cumbersome. The easiest way would have been to sort the streaming 
> dataframe. I asked this question because I could not find a reason why 
> sorting on a streaming dataframe is disallowed. 
> 
> Hemant
> 
> On Mon, Apr 16, 2018 at 6:09 PM, Bowden, Chris wrote:
> You can happily sort the underlying RDD of InternalRow(s) inside a sink, 
> assuming you are willing to implement and maintain your own sink(s). That is, 
> just grabbing the parquet sink, etc. isn’t going to work out of the box. 
> Alternatively map/flatMapGroupsWithState is probably sufficient and requires 
> less working knowledge to make effective reuse of internals. Just group by 
> foo and then sort accordingly and assign ids. The id counter can be stateful 
> per group. Sometimes this problem may not need to be solved at all. For 
> example, if you are using kafka, a proper partitioning scheme and message 
> offsets may be “good enough”. 
> From: Hemant Bhanawat
> Sent: Thursday, April 12, 2018 11:42:59 PM
> To: Reynold Xin
> Cc: dev
> Subject: Re: Sorting on a streaming dataframe
>  
> Well, we want to assign snapshot ids (incrementing counters) to the incoming 
> records. For that, we are zipping the streaming RDDs with that counter using 
> a modified version of ZippedWithIndexRDD. We are OK if the records in the 
> streaming dataframe get counters in random order, but the counter should 
> always be incrementing. 
> 
> This works fine until we have a failure. When we have a failure, we 
> re-assign the records to snapshot ids, and this time the same snapshot id can 
> get assigned to a different record. This is a problem because the primary key 
> in our storage engine is . So we want to sort the 
> dataframe so that the records always get the same snapshot id. 
> 
> 
> 
> On Fri, Apr 13, 2018 at 11:43 AM, Reynold Xin wrote:
> Can you describe your use case more?
> 
> On Thu, Apr 12, 2018 at 11:12 PM Hemant Bhanawat wrote:
> Hi Guys, 
> 
> Why is sorting on streaming dataframes not supported (unless it is in 
> complete mode)? My downstream needs me to sort the streaming dataframe.
> 
> Hemant 
> 
> 



Transform plan with scope

2018-04-24 Thread Marco Gaido
Hi all,

working on SPARK-24051 I realized that currently in the Optimizer and in
all the places where we are transforming a query plan, we are lacking the
context information of what is in scope and what is not.

Coming back to the ticket, the bug reported there has two main
causes:
 1 - we have two aliases in different places of the plan;
 2 - (the focus of this email) we apply all the rules globally over the
whole plan, without any notion of scope where something is
reachable/visible or not.

I will start with an easy example to explain what I mean. If we have a
simple query like:

select a, b from (
  select 1 as a, 2 as b from table1
union
  select 3 as a, 4 as b from table2) q

We produce a tree which is logically something like:

Project0(a, b)
- Union
-- Project1(a, b)
--- ScanTable1
-- Project2(a, b)
--- ScanTable2

So when we apply a transformation on Project1, for instance, we have no
information about what is coming from ScanTable1 (or in general from any
node in the subtree whose root is Project1): we lack a stateful
transform that allows the children to tell the parent, grandparents, and
so on what is in their scope. This is particularly true for
Attributes: in a node we have no idea whether an Attribute comes from its
subtree (it is in scope) or not.
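
As a rough illustration of the "is this attribute in scope here?" question, the following Python sketch models the example tree as nested tuples and answers the question by collecting everything produced in a node's subtree. This is not Spark code: the tuple layout, the function names, and the attribute ids such as `a#1` are assumptions made for the example.

```python
def attributes_in_subtree(node):
    """Return every attribute produced by `node` or by any node below it."""
    _name, produced, children = node
    in_scope = set(produced)
    for child in children:
        in_scope |= attributes_in_subtree(child)
    return in_scope


def is_in_scope(node, attribute):
    """True if `attribute` originates somewhere in the subtree rooted at `node`."""
    return attribute in attributes_in_subtree(node)


# (name, attributes produced by this node, children); ids are made up.
scan_table1 = ("ScanTable1", [], [])
scan_table2 = ("ScanTable2", [], [])
project1 = ("Project1", ["a#1", "b#2"], [scan_table1])
project2 = ("Project2", ["a#3", "b#4"], [scan_table2])
union = ("Union", [], [project1, project2])
project0 = ("Project0", [], [union])

print(is_in_scope(project1, "a#1"))  # alias defined in Project1 itself
print(is_in_scope(project1, "a#3"))  # alias from the other Union branch
print(is_in_scope(project0, "a#3"))  # visible from the root
```

Recomputing the subtree set at every node is quadratic in the worst case, which is one reason a single traversal that carries state upward looks attractive.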

So, the point of this email is: do you think it might in general be useful to
introduce a way of navigating the tree which allows the children to keep a
state to be used by their parents? Or do you think it is useful in general
to introduce the concept of scope (whether an attribute can be accessed by a
node of a plan)?

Thanks,
Marco


Re: Sorting on a streaming dataframe

2018-04-24 Thread Arun Mahadevan
I guess sorting would make sense only when you have the complete data set. In 
streaming you don't know what record is coming next, so it doesn't make sense 
to sort it (except in the aggregated complete output mode, where the entire 
result table is emitted each time and the results can be sorted).
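
A small simulation may help show why complete mode is the exception. The sketch below is plain Python, not Spark: `complete_mode_outputs` and the sample batches are invented for illustration. It keeps a running count per key and, after every micro-batch, emits the whole result table, which is the only point at which a total order over the result is well defined.

```python
from collections import Counter


def complete_mode_outputs(batches):
    """Emit the full, sorted aggregate table after each micro-batch."""
    counts = Counter()
    outputs = []
    for batch in batches:
        counts.update(batch)
        # The whole table is available here, so it can be sorted:
        # by descending count, then by key to break ties.
        table = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))
        outputs.append(table)
    return outputs


outputs = complete_mode_outputs([["b", "a", "b"], ["c", "a", "a"]])
for table in outputs:
    print(table)
```

In an append-style output, rows already emitted cannot be reordered when a later batch arrives, so there is no equivalent point at which to sort.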

Thanks,
Arun

From:  Hemant Bhanawat 
Date:  Tuesday, April 24, 2018 at 12:18 AM
To:  "Bowden, Chris" 
Cc:  Reynold Xin , dev 
Subject:  Re: Sorting on a streaming dataframe

Thanks Chris. There are many ways in which I can solve this problem but they 
are cumbersome. The easiest way would have been to sort the streaming 
dataframe. I asked this question because I could not find a reason why 
sorting on a streaming dataframe is disallowed. 

Hemant

On Mon, Apr 16, 2018 at 6:09 PM, Bowden, Chris  
wrote:
You can happily sort the underlying RDD of InternalRow(s) inside a sink, 
assuming you are willing to implement and maintain your own sink(s). That is, 
just grabbing the parquet sink, etc. isn’t going to work out of the box. 
Alternatively map/flatMapGroupsWithState is probably sufficient and requires 
less working knowledge to make effective reuse of internals. Just group by foo 
and then sort accordingly and assign ids. The id counter can be stateful per 
group. Sometimes this problem may not need to be solved at all. For example, if 
you are using kafka, a proper partitioning scheme and message offsets may be 
“good enough”.

From: Hemant Bhanawat
Sent: Thursday, April 12, 2018 11:42:59 PM
To: Reynold Xin
Cc: dev
Subject: Re: Sorting on a streaming dataframe
 
Well, we want to assign snapshot ids (incrementing counters) to the incoming 
records. For that, we are zipping the streaming RDDs with that counter using a 
modified version of ZippedWithIndexRDD. We are OK if the records in the 
streaming dataframe get counters in random order, but the counter should always 
be incrementing. 

This works fine until we have a failure. When we have a failure, we 
re-assign the records to snapshot ids, and this time the same snapshot id can 
get assigned to a different record. This is a problem because the primary key 
in our storage engine is . So we want to sort the dataframe 
so that the records always get the same snapshot id. 
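
The replay problem described above can be reproduced in a few lines. This is a sketch in plain Python, not the modified ZippedWithIndexRDD: `assign_snapshot_ids`, the record names, and the starting id are all hypothetical. It assigns consecutive ids in arrival order and shows that a replay delivering the same records in a different order yields different ids, unless the batch is first sorted by some stable key.

```python
def assign_snapshot_ids(records, first_id, sort_key=None):
    """Zip records with consecutive ids, optionally after a deterministic sort."""
    ordered = sorted(records, key=sort_key) if sort_key is not None else list(records)
    return {record: first_id + offset for offset, record in enumerate(ordered)}


# Same batch, delivered in a different order after a simulated failure.
first_attempt = ["r2", "r1", "r3"]
replay = ["r3", "r2", "r1"]

unsorted_first = assign_snapshot_ids(first_attempt, first_id=100)
unsorted_replay = assign_snapshot_ids(replay, first_id=100)

sorted_first = assign_snapshot_ids(first_attempt, first_id=100, sort_key=lambda r: r)
sorted_replay = assign_snapshot_ids(replay, first_id=100, sort_key=lambda r: r)

print(unsorted_first == unsorted_replay)  # ids depend on arrival order
print(sorted_first == sorted_replay)      # ids are stable across the replay
```

The sort only needs to be deterministic within one batch, and it relies on the replayed batch containing exactly the same records as the failed attempt.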



On Fri, Apr 13, 2018 at 11:43 AM, Reynold Xin  wrote:
Can you describe your use case more?

On Thu, Apr 12, 2018 at 11:12 PM Hemant Bhanawat  wrote:
Hi Guys, 

Why is sorting on streaming dataframes not supported (unless it is in 
complete mode)? My downstream needs me to sort the streaming dataframe.

Hemant 





Re: Block Missing Exception while connecting Spark with HDP

2018-04-24 Thread Marco Gaido
Hi Jasbir,

As a first note, if you are using a vendor distribution, please
contact the vendor about any issue you are facing. This mailing list is for
the community, so we focus on the community edition of Spark.

Anyway, the error seems to be quite clear: your file on HDFS has a missing
block. This might happen if you lose a datanode or the block gets
corrupted and there are no more replicas of it available. The exact
root cause of the problem is hard to tell, but in any case you have to
investigate what is going on in your HDFS. Spark has nothing to do with this
problem.

Thanks,
Marco

On Tue, 24 Apr 2018, 09:21 Sing, Jasbir,  wrote:

> I am using HDP 2.6.3 and 2.6.4 with the code below –
>
>
>
> 1. Creating a SparkContext object
> 2. Reading a text file using – rdd = sc.textFile("hdfs://192.168.142.129:8020/abc/test1.txt");
> 3. println(rdd.count);
>
> After executing the 3rd line I get the error below –
>
> Caused by: org.apache.hadoop.hdfs.BlockMissingException: Could not obtain
> block: BP-32082187-172.17.0.2-1517480669419:blk_1073742897_2103
> file=/abc/test1.txt
> at org.apache.hadoop.hdfs.DFSInputStream.chooseDataNode(DFSInputStream.java:838)
> at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:526)
> at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:749)
> at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:793)
> at java.io.DataInputStream.read(Unknown Source)
> at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:211)
> at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
> at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:206)
> at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:45)
> at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:246)
> at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:208)
> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
> at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1595)
> at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1143)
> at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1143)
> at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
> at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> at java.lang.Thread.run(Unknown Source)
>
> Can you please help me out in this.
>
>
>
> Regards,
>
> Jasbir Singh
>
>
>
> --
>
> This message is for the designated recipient only and may contain
> privileged, proprietary, or otherwise confidential information. If you have
> received it in error, please notify the sender immediately and delete the
> original. Any other use of the e-mail by you is prohibited. Where allowed
> by local law, electronic communications with Accenture and its affiliates,
> including e-mail and instant messaging (including content), may be scanned
> by our systems for the purposes of information security and assessment of
> internal compliance with Accenture policy. Your privacy is important to us.
> Accenture uses your personal data only in compliance with data protection
> laws. For further information on how Accenture processes your personal
> data, please see our privacy statement at
> https://www.accenture.com/us-en/privacy-policy.
>
> __
>
> www.accenture.com
>


Block Missing Exception while connecting Spark with HDP

2018-04-24 Thread Sing, Jasbir
I am using HDP 2.6.3 and 2.6.4 with the code below –



1. Creating a SparkContext object
2. Reading a text file using – rdd = sc.textFile("hdfs://192.168.142.129:8020/abc/test1.txt");
3. println(rdd.count);

After executing the 3rd line I get the error below –

Caused by: org.apache.hadoop.hdfs.BlockMissingException: Could not obtain 
block: BP-32082187-172.17.0.2-1517480669419:blk_1073742897_2103 
file=/abc/test1.txt
at org.apache.hadoop.hdfs.DFSInputStream.chooseDataNode(DFSInputStream.java:838)
at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:526)
at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:749)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:793)
at java.io.DataInputStream.read(Unknown Source)
at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:211)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:206)
at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:45)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:246)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:208)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1595)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1143)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1143)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)

Can you please help me out in this.

Regards,
Jasbir Singh






Re: Sorting on a streaming dataframe

2018-04-24 Thread Hemant Bhanawat
Thanks Chris. There are many ways in which I can solve this problem but
they are cumbersome. The easiest way would have been to sort the streaming
dataframe. I asked this question because I could not find a reason why
sorting on a streaming dataframe is disallowed.

Hemant

On Mon, Apr 16, 2018 at 6:09 PM, Bowden, Chris 
wrote:

> You can happily sort the underlying RDD of InternalRow(s) inside a sink,
> assuming you are willing to implement and maintain your own sink(s). That
> is, just grabbing the parquet sink, etc. isn’t going to work out of the
> box. Alternatively map/flatMapGroupsWithState is probably sufficient and
> requires less working knowledge to make effective reuse of internals. Just
> group by foo and then sort accordingly and assign ids. The id counter can
> be stateful per group. Sometimes this problem may not need to be solved at
> all. For example, if you are using kafka, a proper partitioning scheme and
> message offsets may be “good enough”.
> --
> *From:* Hemant Bhanawat 
> *Sent:* Thursday, April 12, 2018 11:42:59 PM
> *To:* Reynold Xin
> *Cc:* dev
> *Subject:* Re: Sorting on a streaming dataframe
>
> Well, we want to assign snapshot ids (incrementing counters) to the
> incoming records. For that, we are zipping the streaming RDDs with that
> counter using a modified version of ZippedWithIndexRDD. We are OK if the
> records in the streaming dataframe get counters in random order, but the
> counter should always be incrementing.
>
> This works fine until we have a failure. When we have a failure, we
> re-assign the records to snapshot ids, and this time the same snapshot id
> can get assigned to a different record. This is a problem because the
> primary key in our storage engine is . So we want to sort the
> dataframe so that the records always get the same snapshot id.
>
>
>
> On Fri, Apr 13, 2018 at 11:43 AM, Reynold Xin  wrote:
>
> Can you describe your use case more?
>
> On Thu, Apr 12, 2018 at 11:12 PM Hemant Bhanawat 
> wrote:
>
> Hi Guys,
>
> Why is sorting on streaming dataframes not supported (unless it is in
> complete mode)? My downstream needs me to sort the streaming dataframe.
>
> Hemant
>
>
>