Re: Spark Streaming withWatermark

2018-02-06 Thread Vishnu Viswanath
Could it be that these messages were processed in the same micro-batch? In
that case, the watermark is updated only after the batch finishes, so it has
no effect on late data within the current batch.
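
One way to see this, sketched in Scala and assuming a streaming query built
from the aggregated Dataset in the code quoted below, is to check the
watermark the engine reports for each micro-batch:

val query = aggregated.writeStream
  .outputMode("update")
  .format("console")
  .start()

// lastProgress is null until the first trigger completes. Afterwards, the
// "watermark" entry shows the threshold used for that micro-batch; it only
// advances between batches, based on the max event time of earlier batches.
println(query.lastProgress.eventTime.get("watermark"))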

On Tue, Feb 6, 2018 at 4:18 PM Jiewen Shao <fifistorm...@gmail.com> wrote:

> Ok, Thanks for confirmation.
>
> So based on my code, I have messages with following timestamps (converted
> to more readable format) in the following order:
>
> 2018-02-06 12:00:00
> 2018-02-06 12:00:01
> 2018-02-06 12:00:02
> 2018-02-06 12:00:03
> 2018-02-06 11:59:00  <-- this message should not be counted, right?
> however in my test, this one is still counted
>
>
>
> On Tue, Feb 6, 2018 at 2:05 PM, Vishnu Viswanath <
> vishnu.viswanat...@gmail.com> wrote:
>
>> Yes, that is correct.
>>
>> On Tue, Feb 6, 2018 at 4:56 PM, Jiewen Shao <fifistorm...@gmail.com>
>> wrote:
>>
>>> Vishnu, thanks for the reply
>>> so "event time" and "window end time" have nothing to do with current
>>> system timestamp, watermark moves with the higher value of "timestamp"
>>> field of the input and never moves down, is that correct understanding?
>>>
>>>
>>> On Tue, Feb 6, 2018 at 11:47 AM, Vishnu Viswanath <
>>> vishnu.viswanat...@gmail.com> wrote:
>>>
>>>> Hi
>>>>
>>>> 20 second corresponds to when the window state should be cleared. For
>>>> the late message to be dropped, it should come in after you receive a
>>>> message with event time >= window end time + 20 seconds.
>>>>
>>>> I wrote a post on this recently:
>>>> http://vishnuviswanath.com/spark_structured_streaming.html#watermark
>>>>
>>>> Thanks,
>>>> Vishnu
>>>>
>>>> On Tue, Feb 6, 2018 at 12:11 PM Jiewen Shao <fifistorm...@gmail.com>
>>>> wrote:
>>>>
>>>>> sample code:
>>>>>
>>>>> Let's say Xyz is POJO with a field called timestamp,
>>>>>
>>>>> regarding code withWatermark("timestamp", "20 seconds")
>>>>>
>>>>> I expect the msg with timestamp 20 seconds or older will be dropped,
>>>>> what does 20 seconds compare to? based on my test nothing was dropped no
>>>>> matter how old the timestamp is, what did i miss?
>>>>>
>>>>> Dataset xyz = lines
>>>>> .as(Encoders.STRING())
>>>>> .map((MapFunction<String, Xyz>) value -> mapper.readValue(value, 
>>>>> Xyz.class), Encoders.bean(Xyz.class));
>>>>>
>>>>> Dataset aggregated = xyz.withWatermark("timestamp", "20 seconds")
>>>>> .groupBy(functions.window(xyz.col("timestamp"), "5 seconds"), 
>>>>> xyz.col("x") //tumbling window of size 5 seconds (timestamp)
>>>>> ).count();
>>>>>
>>>>> Thanks
>>>>>
>>>>>
>>>
>>
>


Re: Spark Streaming withWatermark

2018-02-06 Thread Vishnu Viswanath
Yes, that is correct.

On Tue, Feb 6, 2018 at 4:56 PM, Jiewen Shao <fifistorm...@gmail.com> wrote:

> Vishnu, thanks for the reply
> so "event time" and "window end time" have nothing to do with current
> system timestamp, watermark moves with the higher value of "timestamp"
> field of the input and never moves down, is that correct understanding?
>
>
> On Tue, Feb 6, 2018 at 11:47 AM, Vishnu Viswanath <
> vishnu.viswanat...@gmail.com> wrote:
>
>> Hi
>>
>> 20 second corresponds to when the window state should be cleared. For the
>> late message to be dropped, it should come in after you receive a message
>> with event time >= window end time + 20 seconds.
>>
>> I wrote a post on this recently: http://vishnuviswana
>> th.com/spark_structured_streaming.html#watermark
>>
>> Thanks,
>> Vishnu
>>
>> On Tue, Feb 6, 2018 at 12:11 PM Jiewen Shao <fifistorm...@gmail.com>
>> wrote:
>>
>>> sample code:
>>>
>>> Let's say Xyz is POJO with a field called timestamp,
>>>
>>> regarding code withWatermark("timestamp", "20 seconds")
>>>
>>> I expect the msg with timestamp 20 seconds or older will be dropped,
>>> what does 20 seconds compare to? based on my test nothing was dropped no
>>> matter how old the timestamp is, what did i miss?
>>>
>>> Dataset xyz = lines
>>> .as(Encoders.STRING())
>>> .map((MapFunction<String, Xyz>) value -> mapper.readValue(value, 
>>> Xyz.class), Encoders.bean(Xyz.class));
>>>
>>> Dataset aggregated = xyz.withWatermark("timestamp", "20 seconds")
>>> .groupBy(functions.window(xyz.col("timestamp"), "5 seconds"), 
>>> xyz.col("x") //tumbling window of size 5 seconds (timestamp)
>>> ).count();
>>>
>>> Thanks
>>>
>>>
>


Re: Spark Streaming withWatermark

2018-02-06 Thread Vishnu Viswanath
Hi

The 20 seconds corresponds to when the window state should be cleared. For a
late message to be dropped, it must arrive after you have received a message
with event time >= window end time + 20 seconds.
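
A quick worked example (a sketch in Scala, using the 5-second tumbling window
and 20-second delay from the code quoted below):

import java.sql.Timestamp

// withWatermark("timestamp", "20 seconds") and the window [12:00:00, 12:00:05)
val delayMs   = 20 * 1000L
val windowEnd = Timestamp.valueOf("2018-02-06 12:00:05")

// State for that window is dropped only once the watermark (max event time
// seen so far minus the delay) passes the window end, i.e. only after some
// event with event time >= 12:00:25 has been observed.
val dropAfter = new Timestamp(windowEnd.getTime + delayMs)
println(dropAfter)  // 2018-02-06 12:00:25.0

This also explains why a record that is only a minute late can still be
counted: unless the watermark has already passed the end of its window, that
window's state is still open.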

I wrote a post on this recently:
http://vishnuviswanath.com/spark_structured_streaming.html#watermark

Thanks,
Vishnu

On Tue, Feb 6, 2018 at 12:11 PM Jiewen Shao  wrote:

> sample code:
>
> Let's say Xyz is POJO with a field called timestamp,
>
> regarding code withWatermark("timestamp", "20 seconds")
>
> I expect the msg with timestamp 20 seconds or older will be dropped, what
> does 20 seconds compare to? based on my test nothing was dropped no matter
> how old the timestamp is, what did i miss?
>
> Dataset xyz = lines
> .as(Encoders.STRING())
> .map((MapFunction<String, Xyz>) value -> mapper.readValue(value, 
> Xyz.class), Encoders.bean(Xyz.class));
>
> Dataset aggregated = xyz.withWatermark("timestamp", "20 seconds")
> .groupBy(functions.window(xyz.col("timestamp"), "5 seconds"), 
> xyz.col("x") //tumbling window of size 5 seconds (timestamp)
> ).count();
>
> Thanks
>
>


Re: Apache Spark - Spark Structured Streaming - Watermark usage

2018-01-31 Thread Vishnu Viswanath
Hi Mans,

Watermark in Spark is used to decide when to clear the state, so if the
event is delayed beyond the point at which the state is cleared by Spark, it
will be ignored.
I recently wrote a blog post on this :
http://vishnuviswanath.com/spark_structured_streaming.html#watermark

Yes, this state is applicable to aggregations only. If you have only a map
function and don't want to process late records, you could filter based on
the event-time field, but I guess you will have to compare it with the
processing time, since there is no API for the user to access the watermark.
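
For the filtering case, a minimal sketch of that comparison (assuming a
streaming DataFrame df with a timestamp column named eventTime; the 20-second
threshold is just an example value):

import org.apache.spark.sql.functions._

// Keep only records whose event time is within 20 seconds of the current
// processing time. This only approximates watermark-style dropping, since
// the actual watermark is not exposed to user code.
val maxDelaySeconds = 20
val onTime = df.filter(
  col("eventTime").cast("long") >= unix_timestamp() - maxDelaySeconds)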

-Vishnu

On Fri, Jan 26, 2018 at 1:14 PM, M Singh 
wrote:

> Hi:
>
> I am trying to filter out records which are lagging behind (based on event
> time) by a certain amount of time.
>
> Is the watermark API applicable to this scenario (i.e., filtering lagging
> records), or is it only applicable with aggregation? I could not get a
> clear understanding from the documentation, which only refers to its usage
> with aggregation.
>
> Thanks
>
> Mans
>


Handling skewed data

2017-04-17 Thread Vishnu Viswanath
Hello All,

Does anyone know if the skew-handling code mentioned in this talk
https://www.youtube.com/watch?v=bhYV0JOPd9Y was added to Spark?

If so, can I know where to look for more info - a JIRA? A pull request?

Thanks in advance.
Regards,
Vishnu Viswanath.


Re: Installing Spark on Mac

2016-03-04 Thread Vishnu Viswanath
Installing Spark on a Mac is similar to installing it on Linux.

I use a Mac and have written a blog post on how to install Spark; here is the
link: http://vishnuviswanath.com/spark_start.html

Hope this helps.

On Fri, Mar 4, 2016 at 2:29 PM, Simon Hafner  wrote:

> I'd try `brew install spark` or `apache-spark` and see where that gets
> you. https://github.com/Homebrew/homebrew
>
> 2016-03-04 21:18 GMT+01:00 Aida :
> > Hi all,
> >
> > I am a complete novice and was wondering whether anyone would be willing
> to
> > provide me with a step by step guide on how to install Spark on a Mac; on
> > standalone mode btw.
> >
> > I downloaded a prebuilt version, the second version from the top.
> However, I
> > have not installed Hadoop and am not planning to at this stage.
> >
> > I also downloaded Scala from the Scala website, do I need to download
> > anything else?
> >
> > I am very eager to learn more about Spark but am unsure about the best
> way
> > to do it.
> >
> > I would be happy for any suggestions or ideas
> >
> > Many thanks,
> >
> > Aida
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Installing-Spark-on-Mac-tp26397.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Question about MEMORY_AND_DISK persistence

2016-02-28 Thread Vishnu Viswanath
Thank you Ashwin.

On Sun, Feb 28, 2016 at 7:19 PM, Ashwin Giridharan <ashwin.fo...@gmail.com>
wrote:

> Hi Vishnu,
>
> A partition will either be in memory or in disk.
>
> -Ashwin
> On Feb 28, 2016 15:09, "Vishnu Viswanath" <vishnu.viswanat...@gmail.com>
> wrote:
>
>> Hi All,
>>
>> I have a question regarding Persistence (MEMORY_AND_DISK)
>>
>> Suppose I am trying to persist an RDD which has 2 partitions and only 1
>> partition can be fit in memory completely but some part of partition 2 can
>> also be fit, will spark keep the portion of partition 2 in memory and rest
>> in disk, or will the whole 2nd partition be kept in disk.
>>
>> Regards,
>> Vishnu
>>
>


-- 
Thanks and Regards,
Vishnu Viswanath,
*www.vishnuviswanath.com <http://www.vishnuviswanath.com>*


Question about MEMORY_AND_DISK persistence

2016-02-28 Thread Vishnu Viswanath
Hi All,

I have a question regarding Persistence (MEMORY_AND_DISK)

Suppose I am trying to persist an RDD which has 2 partitions, and only 1
partition fits in memory completely but some part of partition 2 can also
fit. Will Spark keep the portion of partition 2 in memory and the rest on
disk, or will the whole 2nd partition be kept on disk?
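
Per the reply in the thread above, storage is per-partition: a partition that
does not fit in memory is written to disk as a whole rather than being split.
A small sketch to observe this, assuming a local spark-shell:

import org.apache.spark.storage.StorageLevel

val rdd = sc.parallelize(1 to 10000000, 2).persist(StorageLevel.MEMORY_AND_DISK)
rdd.count()  // materializes the cache

// Aggregate view from the driver; the Storage tab of the Spark UI additionally
// lists each block (one per partition) as either fully in memory or fully on disk.
sc.getRDDStorageInfo.foreach { info =>
  println(s"cached ${info.numCachedPartitions}/${info.numPartitions}, " +
    s"memory=${info.memSize} bytes, disk=${info.diskSize} bytes")
}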

Regards,
Vishnu


Question on RDD caching

2016-02-04 Thread Vishnu Viswanath
Hello,

When we call cache() or persist(MEMORY_ONLY), how does the request flow to
the nodes?
I am assuming this will happen:

1. The driver knows which nodes hold the partitions for the given
RDD (where is this info stored?)
2. It sends a cache request to each node's executor
3. The executor stores the partition in memory
4. Therefore, each node can have partitions of different RDDs in its cache.

Can someone please tell me if I am correct.
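
Roughly yes, with one nuance on steps 1 and 2: cache() is lazy, and the
driver learns block locations from the executors rather than pushing cache
requests out. A small sketch, with a hypothetical input path:

// cache() only records the desired storage level on the driver; nothing is
// stored yet.
val lines = sc.textFile("hdfs:///some/path").cache()

// The first action makes each executor store the partitions it computes in
// its local BlockManager and register them with the driver's
// BlockManagerMaster, which is where the partition-to-node mapping lives.
// A node can hold cached partitions of many different RDDs at once.
lines.count()

// The driver can then report what was cached:
sc.getRDDStorageInfo.foreach(i =>
  println(s"${i.name}: ${i.numCachedPartitions}/${i.numPartitions} partitions cached"))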

Thanks and Regards,
Vishnu Viswanath,


Re: Spark ML Random Forest output.

2015-12-04 Thread Vishnu Viswanath
Hi,

As per my understanding, the probability matrix gives the probability that a
particular item belongs to each class. So the one with the highest
probability is your predicted class.

Since you have converted your label to an indexed label, according to the
model the classes are 0.0 to 9.0, and I see you are getting the prediction as
a value in [0.0, 1.0, ..., 9.0] - which is correct.

So what you want is a reverse map that can convert your predicted class back
to the String. I don't know if StringIndexer has such an option; maybe you
can create your own map and reverse map of (label to index) and (index to
label) and use this to get back your original label.

Maybe there is a better way to do this.
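
One possible "better way", sketched here assuming Spark ML 1.5+, which ships
an IndexToString transformer that reverses the StringIndexer mapping (the
DataFrames and column names are hypothetical):

import org.apache.spark.ml.feature.{IndexToString, StringIndexer}

val labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(trainingDF)

// ... train the classifier on "indexedLabel" ...

// IndexToString maps the numeric prediction back to the original string
// label, using the labels learned by the fitted indexer.
val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labels)

val readable = labelConverter.transform(predictionsDF)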

Regards,
Vishnu

On Fri, Dec 4, 2015 at 4:56 PM, Eugene Morozov 
wrote:

> Hello,
>
> I've got an input dataset of handwritten digits and working java code that
> uses random forest classification algorithm to determine the numbers. My
> test set is just some lines from the same input dataset - just to be sure
> I'm doing the right thing. My understanding is that having correct
> classifier in this case would give me the correct prediction.
> At the moment overfitting is not an issue.
>
> After applying StringIndexer to my input DataFrame I've applied an ugly
> trick and got "indexedLabel" metadata:
>
> {"ml_attr":{"vals":["1.0","7.0","3.0","9.0","2.0","6.0","0.0","4.0","8.0","5.0"],"type":"nominal","name":"indexedLabel"}}
>
>
> So, my algorithm gives me the following result. The question is I'm not
> sure I understand the meaning of the "prediction" here in the output. It
> looks like it's just an index of the highest probability value in the
> "prob" array. Shouldn't "prediction" be the actual class, i.e. one of the
> "0.0", "1.0", ..., "9.0"? If the prediction is just an ordinal number, then
> I have to manually correspond it to my classes, but for that I have to
> either specify classes manually to know their order or somehow be able to
> get them out of the classifier. Both of these ways seem to be
> inaccessible.
>
> (4.0 -> prediction=7.0,
> prob=[0.004708283878223195,0.08478236104777455,0.03594642191080532,0.19286506771018885,0.038304389235523435,0.02841307797386,0.003334431932056404,0.5685242322346109,0.018564705500837587,0.024557028569980155]
> (9.0 -> prediction=3.0,
> prob=[0.018432404716456248,0.16837195846781422,0.05995559403934031,0.32282148259583565,0.018374168600855455,0.04792285114398864,0.018226352623526704,0.1611650363085499,0.11703073969440755,0.06769941180922535]
> (2.0 -> prediction=4.0,
> prob=[0.017918245251872154,0.029243677407669404,0.06228050320552064,0.03633295481094746,0.45707974962418885,0.09675606366289394,0.03921437851648226,0.043917057390743426,0.14132883075087405,0.0759285393788078]
>
> So, what is the prediction here? How can I specify classes manually or get
> the valid access to them?
> --
> Be well!
> Jean Morozov
>


Re: General question on using StringIndexer in SparkML

2015-12-02 Thread Vishnu Viswanath
Thank you Yanbo,

It looks like this is available in 1.6 version only.
Can you tell me how/when I can download version 1.6?

Thanks and Regards,
Vishnu Viswanath,

On Wed, Dec 2, 2015 at 4:37 AM, Yanbo Liang <yblia...@gmail.com> wrote:

> You can set "handleInvalid" to "skip" which help you skip the labels which
> not exist in training dataset.
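
A minimal sketch of that setting (assuming Spark 1.6+, where StringIndexer
exposes handleInvalid; the DataFrames and column names are hypothetical):

import org.apache.spark.ml.feature.StringIndexer

val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")
  .setHandleInvalid("skip")  // rows with labels unseen during fit() are dropped instead of failing

val model = indexer.fit(trainDF)           // mapping learned from training data only
val indexedTest = model.transform(testDF)  // no "Unseen label" error; such rows are skipped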
>
> 2015-12-02 14:31 GMT+08:00 Vishnu Viswanath <vishnu.viswanat...@gmail.com>
> :
>
>> Hi Jeff,
>>
>> I went through the link you provided and I could understand how the fit()
>> and transform() work.
>> I tried to use the pipeline in my code and I am getting exception  Caused
>> by: org.apache.spark.SparkException: Unseen label:
>>
>> The reason for this error as per my understanding is:
>> For the column on which I am doing StringIndexing, the test data is
>> having values which was not there in train data.
>> Since fit() is done only on the train data, the indexing is failing.
>>
>> Can you suggest me what can be done in this situation.
>>
>> Thanks,
>>
>> On Mon, Nov 30, 2015 at 12:32 AM, Vishnu Viswanath <
>> vishnu.viswanat...@gmail.com> wrote:
>>
>> Thank you Jeff.
>>>
>>> On Sun, Nov 29, 2015 at 7:36 PM, Jeff Zhang <zjf...@gmail.com> wrote:
>>>
>>>> StringIndexer is an estimator which would train a model to be used both
>>>> in training & prediction. So it is consistent between training & 
>>>> prediction.
>>>>
>>>> You may want to read this section of spark ml doc
>>>> http://spark.apache.org/docs/latest/ml-guide.html#how-it-works
>>>>
>>>>
>>>>
>>>> On Mon, Nov 30, 2015 at 12:52 AM, Vishnu Viswanath <
>>>> vishnu.viswanat...@gmail.com> wrote:
>>>>
>>>>> Thanks for the reply Yanbo.
>>>>>
>>>>> I understand that the model will be trained using the indexer map
>>>>> created during the training stage.
>>>>>
>>>>> But since I am getting a new set of data during prediction, and I have
>>>>> to do StringIndexing on the new data also,
>>>>> Right now I am using a new StringIndexer for this purpose, or is there
>>>>> any way that I can reuse the Indexer used for training stage.
>>>>>
>>>>> Note: I am having a pipeline with StringIndexer in it, and I am
>>>>> fitting my train data in it and building the model. Then later when i get
>>>>> the new data for prediction, I am using the same pipeline to fit the data
>>>>> again and do the prediction.
>>>>>
>>>>> Thanks and Regards,
>>>>> Vishnu Viswanath
>>>>>
>>>>>
>>>>> On Sun, Nov 29, 2015 at 8:14 AM, Yanbo Liang <yblia...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Vishnu,
>>>>>>
>>>>>> The string and indexer map is generated at model training step and
>>>>>> used at model prediction step.
>>>>>> It means that the string and indexer map will not changed when
>>>>>> prediction. You will use the original trained model when you do
>>>>>> prediction.
>>>>>>
>>>>>> 2015-11-29 4:33 GMT+08:00 Vishnu Viswanath <
>>>>>> vishnu.viswanat...@gmail.com>:
>>>>>> > Hi All,
>>>>>> >
>>>>>> > I have a general question on using StringIndexer.
>>>>>> > StringIndexer gives an index to each label in the feature starting
>>>>>> from 0 (
>>>>>> > 0 for least frequent word).
>>>>>> >
>>>>>> > Suppose I am building a model, and I use StringIndexer for
>>>>>> transforming on
>>>>>> > of my column.
>>>>>> > e.g., suppose A was most frequent word followed by B and C.
>>>>>> >
>>>>>> > So the StringIndexer will generate
>>>>>> >
>>>>>> > A  0.0
>>>>>> > B  1.0
>>>>>> > C  2.0
>>>>>> >
>>>>>> > After building the model, I am going to do some prediction using
>>>>>> this model,
>>>>>> > So I do the same transformation on my new data which I need to
>>>>>> predict. And
>>>>>> > suppose the new dataset has C as the most frequent word, followed
>>>>>> by B and
>>>>>> > A. So the StringIndexer will assign index as
>>>>>> >
>>>>>> > C 0.0
>>>>>> > B 1.0
>>>>>> > A 2.0
>>>>>> >
>>>>>> > These indexes are different from what we used for modeling. So
>>>>>> won’t this
>>>>>> > give me a wrong prediction if I use StringIndexer?
>>>>>> >
>>>>>> >
>>>>>>
>>>>>
>>>>>
>>>> --
>>>> Best Regards
>>>>
>>>> Jeff Zhang
>>>>
>>>
>>>
>>>
>>> ​
>>
>
>


Re: General question on using StringIndexer in SparkML

2015-12-02 Thread Vishnu Viswanath
Thank you.

On Wed, Dec 2, 2015 at 8:12 PM, Yanbo Liang <yblia...@gmail.com> wrote:

> You can get 1.6.0-RC1 from
> http://people.apache.org/~pwendell/spark-releases/spark-v1.6.0-rc1-bin/
> currently, but it's not the last release version.
>
> 2015-12-02 23:57 GMT+08:00 Vishnu Viswanath <vishnu.viswanat...@gmail.com>
> :
>
>> Thank you Yanbo,
>>
>> It looks like this is available in 1.6 version only.
>> Can you tell me how/when can I download version 1.6?
>>
>> Thanks and Regards,
>> Vishnu Viswanath,
>>
>> On Wed, Dec 2, 2015 at 4:37 AM, Yanbo Liang <yblia...@gmail.com> wrote:
>>
>>> You can set "handleInvalid" to "skip" which help you skip the labels
>>> which not exist in training dataset.
>>>
>>> 2015-12-02 14:31 GMT+08:00 Vishnu Viswanath <
>>> vishnu.viswanat...@gmail.com>:
>>>
>>>> Hi Jeff,
>>>>
>>>> I went through the link you provided and I could understand how the
>>>> fit() and transform() work.
>>>> I tried to use the pipeline in my code and I am getting exception  Caused
>>>> by: org.apache.spark.SparkException: Unseen label:
>>>>
>>>> The reason for this error as per my understanding is:
>>>> For the column on which I am doing StringIndexing, the test data is
>>>> having values which was not there in train data.
>>>> Since fit() is done only on the train data, the indexing is failing.
>>>>
>>>> Can you suggest me what can be done in this situation.
>>>>
>>>> Thanks,
>>>>
>>>> On Mon, Nov 30, 2015 at 12:32 AM, Vishnu Viswanath <
>>>> vishnu.viswanat...@gmail.com> wrote:
>>>>
>>>> Thank you Jeff.
>>>>>
>>>>> On Sun, Nov 29, 2015 at 7:36 PM, Jeff Zhang <zjf...@gmail.com> wrote:
>>>>>
>>>>>> StringIndexer is an estimator which would train a model to be used
>>>>>> both in training & prediction. So it is consistent between training &
>>>>>> prediction.
>>>>>>
>>>>>> You may want to read this section of spark ml doc
>>>>>> http://spark.apache.org/docs/latest/ml-guide.html#how-it-works
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Nov 30, 2015 at 12:52 AM, Vishnu Viswanath <
>>>>>> vishnu.viswanat...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks for the reply Yanbo.
>>>>>>>
>>>>>>> I understand that the model will be trained using the indexer map
>>>>>>> created during the training stage.
>>>>>>>
>>>>>>> But since I am getting a new set of data during prediction, and I
>>>>>>> have to do StringIndexing on the new data also,
>>>>>>> Right now I am using a new StringIndexer for this purpose, or is
>>>>>>> there any way that I can reuse the Indexer used for training stage.
>>>>>>>
>>>>>>> Note: I am having a pipeline with StringIndexer in it, and I am
>>>>>>> fitting my train data in it and building the model. Then later when i 
>>>>>>> get
>>>>>>> the new data for prediction, I am using the same pipeline to fit the 
>>>>>>> data
>>>>>>> again and do the prediction.
>>>>>>>
>>>>>>> Thanks and Regards,
>>>>>>> Vishnu Viswanath
>>>>>>>
>>>>>>>
>>>>>>> On Sun, Nov 29, 2015 at 8:14 AM, Yanbo Liang <yblia...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Vishnu,
>>>>>>>>
>>>>>>>> The string and indexer map is generated at model training step and
>>>>>>>> used at model prediction step.
>>>>>>>> It means that the string and indexer map will not changed when
>>>>>>>> prediction. You will use the original trained model when you do
>>>>>>>> prediction.
>>>>>>>>
>>>>>>>> 2015-11-29 4:33 GMT+08:00 Vishnu Viswanath <
>>>>>>>> vishnu.viswanat...@gmail.com>:
>>>>>>>> > Hi All,
>>>>>>>> >
>>>>>>>> > I have a general question on using StringIndexer.
>>>>>>>> > StringIndexer gives an index to each label in the feature
>>>>>>>> starting from 0 (
>>>>>>>> > 0 for least frequent word).
>>>>>>>> >
>>>>>>>> > Suppose I am building a model, and I use StringIndexer for
>>>>>>>> transforming on
>>>>>>>> > of my column.
>>>>>>>> > e.g., suppose A was most frequent word followed by B and C.
>>>>>>>> >
>>>>>>>> > So the StringIndexer will generate
>>>>>>>> >
>>>>>>>> > A  0.0
>>>>>>>> > B  1.0
>>>>>>>> > C  2.0
>>>>>>>> >
>>>>>>>> > After building the model, I am going to do some prediction using
>>>>>>>> this model,
>>>>>>>> > So I do the same transformation on my new data which I need to
>>>>>>>> predict. And
>>>>>>>> > suppose the new dataset has C as the most frequent word, followed
>>>>>>>> by B and
>>>>>>>> > A. So the StringIndexer will assign index as
>>>>>>>> >
>>>>>>>> > C 0.0
>>>>>>>> > B 1.0
>>>>>>>> > A 2.0
>>>>>>>> >
>>>>>>>> > These indexes are different from what we used for modeling. So
>>>>>>>> won’t this
>>>>>>>> > give me a wrong prediction if I use StringIndexer?
>>>>>>>> >
>>>>>>>> >
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>> --
>>>>>> Best Regards
>>>>>>
>>>>>> Jeff Zhang
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> ​
>>>>
>>>
>>>
>>
>


-- 
Thanks and Regards,
Vishnu Viswanath,
*www.vishnuviswanath.com <http://www.vishnuviswanath.com>*


Re: General question on using StringIndexer in SparkML

2015-12-01 Thread Vishnu Viswanath
Hi Jeff,

I went through the link you provided and I could understand how fit() and
transform() work.
I tried to use the pipeline in my code and I am getting the exception: Caused
by: org.apache.spark.SparkException: Unseen label:

The reason for this error, as per my understanding, is:
For the column on which I am doing StringIndexing, the test data has values
which were not in the training data.
Since fit() is done only on the training data, the indexing is failing.

Can you suggest what can be done in this situation?

Thanks,

On Mon, Nov 30, 2015 at 12:32 AM, Vishnu Viswanath <
vishnu.viswanat...@gmail.com> wrote:

Thank you Jeff.
>
> On Sun, Nov 29, 2015 at 7:36 PM, Jeff Zhang <zjf...@gmail.com> wrote:
>
>> StringIndexer is an estimator which would train a model to be used both
>> in training & prediction. So it is consistent between training & prediction.
>>
>> You may want to read this section of spark ml doc
>> http://spark.apache.org/docs/latest/ml-guide.html#how-it-works
>>
>>
>>
>> On Mon, Nov 30, 2015 at 12:52 AM, Vishnu Viswanath <
>> vishnu.viswanat...@gmail.com> wrote:
>>
>>> Thanks for the reply Yanbo.
>>>
>>> I understand that the model will be trained using the indexer map
>>> created during the training stage.
>>>
>>> But since I am getting a new set of data during prediction, and I have
>>> to do StringIndexing on the new data also,
>>> Right now I am using a new StringIndexer for this purpose, or is there
>>> any way that I can reuse the Indexer used for training stage.
>>>
>>> Note: I am having a pipeline with StringIndexer in it, and I am fitting
>>> my train data in it and building the model. Then later when i get the new
>>> data for prediction, I am using the same pipeline to fit the data again and
>>> do the prediction.
>>>
>>> Thanks and Regards,
>>> Vishnu Viswanath
>>>
>>>
>>> On Sun, Nov 29, 2015 at 8:14 AM, Yanbo Liang <yblia...@gmail.com> wrote:
>>>
>>>> Hi Vishnu,
>>>>
>>>> The string and indexer map is generated at model training step and
>>>> used at model prediction step.
>>>> It means that the string and indexer map will not changed when
>>>> prediction. You will use the original trained model when you do
>>>> prediction.
>>>>
>>>> 2015-11-29 4:33 GMT+08:00 Vishnu Viswanath <
>>>> vishnu.viswanat...@gmail.com>:
>>>> > Hi All,
>>>> >
>>>> > I have a general question on using StringIndexer.
>>>> > StringIndexer gives an index to each label in the feature starting
>>>> from 0 (
>>>> > 0 for least frequent word).
>>>> >
>>>> > Suppose I am building a model, and I use StringIndexer for
>>>> transforming on
>>>> > of my column.
>>>> > e.g., suppose A was most frequent word followed by B and C.
>>>> >
>>>> > So the StringIndexer will generate
>>>> >
>>>> > A  0.0
>>>> > B  1.0
>>>> > C  2.0
>>>> >
>>>> > After building the model, I am going to do some prediction using this
>>>> model,
>>>> > So I do the same transformation on my new data which I need to
>>>> predict. And
>>>> > suppose the new dataset has C as the most frequent word, followed by
>>>> B and
>>>> > A. So the StringIndexer will assign index as
>>>> >
>>>> > C 0.0
>>>> > B 1.0
>>>> > A 2.0
>>>> >
>>>> > These indexes are different from what we used for modeling. So won’t
>>>> this
>>>> > give me a wrong prediction if I use StringIndexer?
>>>> >
>>>> >
>>>>
>>>
>>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>
>
>
> ​


Re: General question on using StringIndexer in SparkML

2015-11-29 Thread Vishnu Viswanath
Thanks for the reply Yanbo.

I understand that the model will be trained using the indexer map created
during the training stage.

But since I get a new set of data during prediction, I have to do
StringIndexing on the new data as well. Right now I am using a new
StringIndexer for this purpose; is there any way that I can reuse the indexer
used in the training stage?

Note: I have a pipeline with StringIndexer in it, and I fit my training data
in it and build the model. Later, when I get the new data for prediction, I
am using the same pipeline to fit the data again and do the prediction.

Thanks and Regards,
Vishnu Viswanath


On Sun, Nov 29, 2015 at 8:14 AM, Yanbo Liang <yblia...@gmail.com> wrote:

> Hi Vishnu,
>
> The string and indexer map is generated at model training step and
> used at model prediction step.
> It means that the string and indexer map will not changed when
> prediction. You will use the original trained model when you do
> prediction.
>
> 2015-11-29 4:33 GMT+08:00 Vishnu Viswanath <vishnu.viswanat...@gmail.com>:
> > Hi All,
> >
> > I have a general question on using StringIndexer.
> > StringIndexer gives an index to each label in the feature starting from
> 0 (
> > 0 for least frequent word).
> >
> > Suppose I am building a model, and I use StringIndexer for transforming
> on
> > of my column.
> > e.g., suppose A was most frequent word followed by B and C.
> >
> > So the StringIndexer will generate
> >
> > A  0.0
> > B  1.0
> > C  2.0
> >
> > After building the model, I am going to do some prediction using this
> model,
> > So I do the same transformation on my new data which I need to predict.
> And
> > suppose the new dataset has C as the most frequent word, followed by B
> and
> > A. So the StringIndexer will assign index as
> >
> > C 0.0
> > B 1.0
> > A 2.0
> >
> > These indexes are different from what we used for modeling. So won’t this
> > give me a wrong prediction if I use StringIndexer?
> >
> > --
> > Thanks and Regards,
> > Vishnu Viswanath,
> > www.vishnuviswanath.com
>



-- 
Thanks and Regards,
Vishnu Viswanath,
*www.vishnuviswanath.com <http://www.vishnuviswanath.com>*


Re: General question on using StringIndexer in SparkML

2015-11-29 Thread Vishnu Viswanath
Thank you Jeff.

On Sun, Nov 29, 2015 at 7:36 PM, Jeff Zhang <zjf...@gmail.com> wrote:

> StringIndexer is an estimator which would train a model to be used both in
> training & prediction. So it is consistent between training & prediction.
>
> You may want to read this section of spark ml doc
> http://spark.apache.org/docs/latest/ml-guide.html#how-it-works
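
In code, that advice amounts to fitting the pipeline once and reusing the
fitted model for prediction (a sketch; the stages and column names are
hypothetical):

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.feature.StringIndexer

val indexer = new StringIndexer().setInputCol("category").setOutputCol("categoryIndex")
val pipeline = new Pipeline().setStages(Array(indexer /*, assembler, classifier, ... */))

// Fit once, on the training data only; this learns the label-to-index mapping.
val model: PipelineModel = pipeline.fit(trainDF)

// For prediction, do not fit again; transform the new data with the same
// fitted model so the training-time mapping is reused.
val predictions = model.transform(newDF)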
>
>
>
> On Mon, Nov 30, 2015 at 12:52 AM, Vishnu Viswanath <
> vishnu.viswanat...@gmail.com> wrote:
>
>> Thanks for the reply Yanbo.
>>
>> I understand that the model will be trained using the indexer map created
>> during the training stage.
>>
>> But since I am getting a new set of data during prediction, and I have to
>> do StringIndexing on the new data also,
>> Right now I am using a new StringIndexer for this purpose, or is there
>> any way that I can reuse the Indexer used for training stage.
>>
>> Note: I am having a pipeline with StringIndexer in it, and I am fitting
>> my train data in it and building the model. Then later when i get the new
>> data for prediction, I am using the same pipeline to fit the data again and
>> do the prediction.
>>
>> Thanks and Regards,
>> Vishnu Viswanath
>>
>>
>> On Sun, Nov 29, 2015 at 8:14 AM, Yanbo Liang <yblia...@gmail.com> wrote:
>>
>>> Hi Vishnu,
>>>
>>> The string and indexer map is generated at model training step and
>>> used at model prediction step.
>>> It means that the string and indexer map will not changed when
>>> prediction. You will use the original trained model when you do
>>> prediction.
>>>
>>> 2015-11-29 4:33 GMT+08:00 Vishnu Viswanath <vishnu.viswanat...@gmail.com
>>> >:
>>> > Hi All,
>>> >
>>> > I have a general question on using StringIndexer.
>>> > StringIndexer gives an index to each label in the feature starting
>>> from 0 (
>>> > 0 for least frequent word).
>>> >
>>> > Suppose I am building a model, and I use StringIndexer for
>>> transforming on
>>> > of my column.
>>> > e.g., suppose A was most frequent word followed by B and C.
>>> >
>>> > So the StringIndexer will generate
>>> >
>>> > A  0.0
>>> > B  1.0
>>> > C  2.0
>>> >
>>> > After building the model, I am going to do some prediction using this
>>> model,
>>> > So I do the same transformation on my new data which I need to
>>> predict. And
>>> > suppose the new dataset has C as the most frequent word, followed by B
>>> and
>>> > A. So the StringIndexer will assign index as
>>> >
>>> > C 0.0
>>> > B 1.0
>>> > A 2.0
>>> >
>>> > These indexes are different from what we used for modeling. So won’t
>>> this
>>> > give me a wrong prediction if I use StringIndexer?
>>> >
>>> > --
>>> > Thanks and Regards,
>>> > Vishnu Viswanath,
>>> > www.vishnuviswanath.com
>>>
>>
>>
>>
>> --
>> Thanks and Regards,
>> Vishnu Viswanath,
>> *www.vishnuviswanath.com <http://www.vishnuviswanath.com>*
>>
>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


General question on using StringIndexer in SparkML

2015-11-28 Thread Vishnu Viswanath
Hi All,

I have a general question on using StringIndexer.
StringIndexer gives an index to each label in a feature column, starting from
0 (0 for the most frequent label).

Suppose I am building a model, and I use StringIndexer for transforming one
of my columns.
e.g., suppose A was most frequent word followed by B and C.

So the StringIndexer will generate

A  0.0
B  1.0
C  2.0

After building the model, I am going to do some prediction using this model,
so I do the same transformation on the new data which I need to predict. Now
suppose the new dataset has C as the most frequent word, followed by B and A;
the StringIndexer will then assign indexes as

C 0.0
B 1.0
A 2.0

These indexes are different from what we used for modeling. So won’t this
give me a wrong prediction if I use StringIndexer?
-- 
Thanks and Regards,
Vishnu Viswanath,
*www.vishnuviswanath.com <http://www.vishnuviswanath.com>*


Adding new column to Dataframe

2015-11-25 Thread Vishnu Viswanath
Hi,

I am trying to add the row number to a spark dataframe.
This is my dataframe:

scala> df.printSchema
root
|-- line: string (nullable = true)

I tried to use df.withColumn but I am getting below exception.

scala> df.withColumn("row",rowNumber)
org.apache.spark.sql.AnalysisException: unresolved operator 'Project
[line#2326,'row_number() AS row#2327];
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:37)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:44)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:174)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:49)

Also, is it possible to add a column from one dataframe to another?
something like

scala> df.withColumn("line2",df2("line"))

org.apache.spark.sql.AnalysisException: resolved attribute(s)
line#2330 missing from line#2326 in operator !Project
[line#2326,line#2330 AS line2#2331];


Thanks and Regards,
Vishnu Viswanath
*www.vishnuviswanath.com <http://www.vishnuviswanath.com>*


Re: Adding new column to Dataframe

2015-11-25 Thread Vishnu Viswanath
Thanks Jeff,

rowNumber is a function in org.apache.spark.sql.functions link
<https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.sql.functions$>

I will try to use monotonicallyIncreasingId and see if it works.

"You'd better to use join to correlate 2 data frames": yes, that's why I
thought of adding a row number to both DataFrames and joining them on the row
number. Is there any better way of doing this? Both DataFrames will always
have the same number of rows, but they are not related by any column I could
join on.
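
One common workaround when two DataFrames have the same number of rows but no
join key is to generate an explicit index with zipWithIndex and join on it (a
sketch; note that monotonicallyIncreasingId produces unique but
non-consecutive ids, so joining two DataFrames on it is not reliable):

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Attach a stable 0-based row index via the RDD API, then join on it.
def withRowIndex(df: DataFrame): DataFrame = {
  val rows = df.rdd.zipWithIndex.map { case (row, idx) => Row.fromSeq(row.toSeq :+ idx) }
  val schema = StructType(df.schema.fields :+ StructField("row_idx", LongType, nullable = false))
  df.sqlContext.createDataFrame(rows, schema)
}

val joined = withRowIndex(df).join(withRowIndex(df2), "row_idx")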

Thanks and Regards,
Vishnu Viswanath

On Wed, Nov 25, 2015 at 6:43 PM, Jeff Zhang <zjf...@gmail.com> wrote:

> >>> I tried to use df.withColumn but I am getting below exception.
>
> What is rowNumber here ? UDF ?  You can use monotonicallyIncreasingId
> for generating id
>
> >>> Also, is it possible to add a column from one dataframe to another?
>
> You can't, because how can you add one dataframe to another if they have
> different number of rows. You'd better to use join to correlate 2 data
> frames.
>
> On Thu, Nov 26, 2015 at 6:39 AM, Vishnu Viswanath <
> vishnu.viswanat...@gmail.com> wrote:
>
>> Hi,
>>
>> I am trying to add the row number to a spark dataframe.
>> This is my dataframe:
>>
>> scala> df.printSchema
>> root
>> |-- line: string (nullable = true)
>>
>> I tried to use df.withColumn but I am getting below exception.
>>
>> scala> df.withColumn("row",rowNumber)
>> org.apache.spark.sql.AnalysisException: unresolved operator 'Project 
>> [line#2326,'row_number() AS row#2327];
>> at 
>> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:37)
>> at 
>> org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:44)
>> at 
>> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:174)
>> at 
>> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:49)
>>
>> Also, is it possible to add a column from one dataframe to another?
>> something like
>>
>> scala> df.withColumn("line2",df2("line"))
>>
>> org.apache.spark.sql.AnalysisException: resolved attribute(s) line#2330 
>> missing from line#2326 in operator !Project [line#2326,line#2330 AS 
>> line2#2331];
>>
>> ​
>>
>> Thanks and Regards,
>> Vishnu Viswanath
>> *www.vishnuviswanath.com <http://www.vishnuviswanath.com>*
>>
>
>
>
> --
> Best Regards
>
> Jeff Zhang
>



-- 
Thanks and Regards,
Vishnu Viswanath
+1 309 550 2311
*www.vishnuviswanath.com <http://www.vishnuviswanath.com>*


Re: Adding new column to Dataframe

2015-11-25 Thread Vishnu Viswanath
Thanks Ted,

It looks like I cannot use row_number then. I tried to run a sample window
function and got the below error:
org.apache.spark.sql.AnalysisException: Could not resolve window function
'avg'. Note that, using window functions currently requires a HiveContext;
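
For reference, a sketch of the same idea once a HiveContext is available
(assuming Spark 1.5/1.6 and a DataFrame df with the single "line" column that
was created through that HiveContext; a global orderBy window uses a single
partition, so it is only reasonable for small data):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.rowNumber

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
val w = Window.orderBy("line")
val withRow = df.withColumn("row", rowNumber().over(w))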

On Wed, Nov 25, 2015 at 8:28 PM, Ted Yu <yuzhih...@gmail.com> wrote:

Vishnu:
> rowNumber (deprecated, replaced with row_number) is a window function.
>
>* Window function: returns a sequential number starting at 1 within a
> window partition.
>*
>* @group window_funcs
>* @since 1.6.0
>*/
>   def row_number(): Column = withExpr {
> UnresolvedWindowFunction("row_number", Nil) }
>
> Sample usage:
>
> df =  sqlContext.range(1<<20)
> df2 = df.select((df.id % 1000).alias("A"), (df.id / 1000).alias('B'))
> ws = Window.partitionBy(df2.A).orderBy(df2.B)
> df3 = df2.select("client", "date",
> rowNumber().over(ws).alias("rn")).filter("rn < 0")
>
> Cheers
>
> On Wed, Nov 25, 2015 at 5:08 PM, Vishnu Viswanath <
> vishnu.viswanat...@gmail.com> wrote:
>
>> Thanks Jeff,
>>
>> rowNumber is a function in org.apache.spark.sql.functions link
>> <https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.sql.functions$>
>>
>> I will try to use monotonicallyIncreasingId and see if it works.
>>
>> You’d better to use join to correlate 2 data frames : Yes, thats why I
>> thought of adding row number in both the DataFrames and join them based on
>> row number. Is there any better way of doing this? Both DataFrames will
>> have same number of rows always, but are not related by any column to do
>> join.
>>
>> Thanks and Regards,
>> Vishnu Viswanath
>> ​
>>
>> On Wed, Nov 25, 2015 at 6:43 PM, Jeff Zhang <zjf...@gmail.com> wrote:
>>
>>> >>> I tried to use df.withColumn but I am getting below exception.
>>>
>>> What is rowNumber here ? UDF ?  You can use monotonicallyIncreasingId
>>> for generating id
>>>
>>> >>> Also, is it possible to add a column from one dataframe to another?
>>>
>>> You can't, because how can you add one dataframe to another if they have
>>> different number of rows. You'd better to use join to correlate 2 data
>>> frames.
>>>
>>> On Thu, Nov 26, 2015 at 6:39 AM, Vishnu Viswanath <
>>> vishnu.viswanat...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am trying to add the row number to a spark dataframe.
>>>> This is my dataframe:
>>>>
>>>> scala> df.printSchema
>>>> root
>>>> |-- line: string (nullable = true)
>>>>
>>>> I tried to use df.withColumn but I am getting below exception.
>>>>
>>>> scala> df.withColumn("row",rowNumber)
>>>> org.apache.spark.sql.AnalysisException: unresolved operator 'Project 
>>>> [line#2326,'row_number() AS row#2327];
>>>> at 
>>>> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:37)
>>>> at 
>>>> org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:44)
>>>> at 
>>>> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:174)
>>>> at 
>>>> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:49)
>>>>
>>>> Also, is it possible to add a column from one dataframe to another?
>>>> something like
>>>>
>>>> scala> df.withColumn("line2",df2("line"))
>>>>
>>>> org.apache.spark.sql.AnalysisException: resolved attribute(s) line#2330 
>>>> missing from line#2326 in operator !Project [line#2326,line#2330 AS 
>>>> line2#2331];
>>>>
>>>> ​
>>>>
>>>> Thanks and Regards,
>>>> Vishnu Viswanath
>>>> *www.vishnuviswanath.com <http://www.vishnuviswanath.com>*
>>>>
>>>
>>>
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>>>
>>
>>
>>
>> --
>> Thanks and Regards,
>> Vishnu Viswanath
>> +1 309 550 2311
>> *www.vishnuviswanath.com <http://www.vishnuviswanath.com>*
>>
>
> ​
-- 
Thanks and Regards,
Vishnu Viswanath,
*www.vishnuviswanath.com <http://www.vishnuviswanath.com>*


how to use DataFrame.na.fill based on condition

2015-11-23 Thread Vishnu Viswanath
Hi

Can someone tell me if there is a way I can use the fill method
in DataFrameNaFunctions based on some condition.

e.g., df.na.fill("value1","column1","condition1")
df.na.fill("value2","column1","condition2")

I want to fill nulls in column1 with values - either value 1 or value 2,
based on some condition.

Thanks,


Re: how to us DataFrame.na.fill based on condition

2015-11-23 Thread Vishnu Viswanath
Thanks for the reply Davies

I think replace replaces one value with another value, but what I want to do
is fill in the null values of a column (I don't have a to_replace here).
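
One way to express this directly (a sketch; the "flag" column stands in for
whatever condition1/condition2 actually are):

import org.apache.spark.sql.functions._

// Fill nulls in column1 with value1 under one condition, with value2 under
// another, and keep the existing value otherwise.
val filled = df.withColumn("column1",
  when(col("column1").isNull && col("flag") === "A", lit("value1"))
    .when(col("column1").isNull && col("flag") === "B", lit("value2"))
    .otherwise(col("column1")))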

Regards,
Vishnu

On Mon, Nov 23, 2015 at 1:37 PM, Davies Liu <dav...@databricks.com> wrote:

> DataFrame.replace(to_replace, value, subset=None)
>
>
> http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.replace
>
> On Mon, Nov 23, 2015 at 11:05 AM, Vishnu Viswanath
> <vishnu.viswanat...@gmail.com> wrote:
> > Hi
> >
> > Can someone tell me if there is a way I can use the fill method in
> > DataFrameNaFunctions based on some condition.
> >
> > e.g., df.na.fill("value1","column1","condition1")
> > df.na.fill("value2","column1","condition2")
> >
> > i want to fill nulls in column1 with values - either value 1 or value 2,
> > based on some condition.
> >
> > Thanks,
>



-- 
Thanks and Regards,
Vishnu Viswanath
+1 309 550 2311
*www.vishnuviswanath.com <http://www.vishnuviswanath.com>*