Sorry, I don't follow. What did you expect? I don't see a bug.

The data looks sorted within groups, which is all sortBy does.
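For illustration, here is a plain-Scala model of that contract. It has no Scalding dependency, and `Rec` and `groupSortReduce` are hypothetical names, not Scalding API. Values are grouped by key, and sortBy only fixes the order in which each group's values reach the mapGroup function. It says nothing about how groups are ordered relative to each other.

```scala
object SortWithinGroups {

  // Hypothetical record mirroring the (groupKey, sortKey, payload) tuples in the question.
  final case class Rec(group: Int, sortKey: Float, payload: String)

  // Group by `group`, sort each group's values by `sortKey` (ascending), then hand
  // each group's sorted iterator to `reduce`, the way mapGroup receives its values.
  // sortWith with `<` avoids relying on an implicit Ordering[Float].
  def groupSortReduce[R](records: Seq[Rec])(reduce: (Int, Iterator[Rec]) => R): Map[Int, R] =
    records
      .groupBy(_.group)
      .map { case (k, vs) => k -> reduce(k, vs.sortWith(_.sortKey < _.sortKey).iterator) }

  def main(args: Array[String]): Unit = {
    val data = Seq(
      Rec(0, 2.0f, "w"), Rec(1, 0.5f, "v"), Rec(0, 1.0f, "o"),
      Rec(1, 0.1f, "f"), Rec(0, 3.0f, "v"))
    val out = groupSortReduce(data)((_, it) => it.map(_.payload).mkString(","))
    // Each group sees only its own values, in ascending sortKey order.
    println(out(0)) // o,w,v
    println(out(1)) // f,v
  }
}
```

Note that the printed order within a group follows the sort key, not the payload names. This matches the interleaved `v` parameters in the second log.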

Note: you don't need forceToReducers here. Sorting can only be done on the
reducers anyway.
On Sat, Sep 10, 2016 at 15:12 ravi kiran holur vijay <[email protected]>
wrote:

> Hello,
>
> I am noticing strange behaviour in my Scalding job, which uses groupBy and
> sortBy. Without .sortBy, each reducer gets all of the values for a given
> group key. With .sortBy, however, each reducer gets only part of the values
> for the same group key. Has anyone run into a similar issue, or does anyone
> have a hypothesis about what's happening?
>
> Case 1: Observed behaviour = Expected behaviour, without using sortBy
>
> *Reducer 1 output*:
>
> Processing data for group ... 1
> Initializing FM Model with existing parameters ...
> Processing model param ... o
> Processing model param ... w
> Processing model param ... r
> Processing model param ... s
> Processing model param ... t
> Processing model param ... l
> Processing model param ... f
> Processing model param ... v
> Processing model param ... v
> Processing model param ... v
> Processing model param ... v
> Processing model param ... v
> Initialized FM Model with: w0=-0.181250, w=34531087, v=5, reg0=0.000000,
> regw=0.000000, regv=0.000000, lr=0.010000, statsFreq=1000000, merged
> models=1
>
> *Reducer 2 output*:
> Processing data for group ... 0
> Initializing FM Model with existing parameters ...
> Processing model param ... o
> Processing model param ... w
> Processing model param ... r
> Processing model param ... s
> Processing model param ... t
> Processing model param ... l
> Processing model param ... f
> Processing model param ... v
> Processing model param ... v
> Processing model param ... v
> Processing model param ... v
> Processing model param ... v
> Initialized FM Model with: w0=-0.181250, w=34531087, v=5, reg0=0.000000,
> regw=0.000000, regv=0.000000, lr=0.010000, statsFreq=1000000, merged
> models=1
>
> Case 2: Observed behaviour != Expected behaviour, after using sortBy
> *Reducer 1 output*
> Processing data for group ... 0
> Initializing FM Model with existing parameters ...
> Processing model param ... v
> Processing model param ... v
> Processing model param ... v
> Processing model param ... v
> Processing model param ... v
> Initialized FM Model with: w0=0.000000, w=0, v=5, reg0=0.000000,
> regw=0.000000, regv=0.000000, lr=0.000000, statsFreq=0, merged models=0
> Processing data for group ... 1
> Initializing FM Model with existing parameters ...
> Processing model param ... f
> Processing model param ... l
> Processing model param ... o
> Processing model param ... r
> Processing model param ... s
> Processing model param ... t
> Processing model param ... w
> Initialized FM Model with: w0=-0.181250, w=34531087, v=0, reg0=0.000000,
> regw=0.000000, regv=0.000000, lr=0.010000, statsFreq=1000000, merged
> models=1
>
> *Reducer 2 output*
> Processing data for group ... 0
> Initializing FM Model with existing parameters ...
> Processing model param ... f
> Processing model param ... l
> Processing model param ... o
> Processing model param ... v
> Processing model param ... r
> Processing model param ... s
> Processing model param ... t
> Processing model param ... w
> Initialized FM Model with: w0=-0.181250, w=34531087, v=1, reg0=0.000000,
> regw=0.000000, regv=0.000000, lr=0.010000, statsFreq=1000000, merged
> models=1
> Processing data for group ... 1
> Initializing FM Model with existing parameters ...
> Processing model param ... v
> Processing model param ... v
> Processing model param ... v
> Processing model param ... v
> Initialized FM Model with: w0=0.000000, w=0, v=4, reg0=0.000000,
> regw=0.000000, regv=0.000000, lr=0.000000, statsFreq=0, merged models=0
>
> *Code*
>
> val data: TypedPipe[(Int, Float, Either[FeatureVector, FMModelParameter])] =
>   modelData
> val fmModels: SortedGrouped[Int, FMModel] = data
>   .groupBy { case (id1, id2, modelParam) => id1 }
>   // Secondary sort is needed to ensure model parameters appear before the
>   // actual training data.
>   // TODO: This sortBy is causing problems and has a bug
>   .sortBy { case (id1, id2, modelParam) => id2 }
>   .forceToReducers
>   .mapGroup {
>     case (groupId, records) =>
>       println("Processing data for group ... " + groupId)
>       val trainedModel = aggregateAndUpdateModel(records)
>       Iterator(trainedModel)
>   }
>
> --
> You received this message because you are subscribed to the Google Groups
> "Scalding Development" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
> For more options, visit https://groups.google.com/d/optout.
>
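The comment in the quoted code says that the secondary sort exists so model parameters come before training data. Sorting on `id2` alone guarantees that only if every parameter's `id2` is smaller than every feature's `id2`. If that assumption does not hold, you can make the parameter/feature distinction part of the sort key.

The plain-Scala sketch below illustrates the idea. `FeatureVector` and `FMModelParameter` here are hypothetical stand-ins, since the real types are not shown in the thread, and `ParamsFirst`, `before`, and `sortedGroups` are illustrative names.

```scala
object ParamsFirst {
  // Hypothetical stand-ins for the question's types, whose definitions are not shown.
  final case class FeatureVector(label: String)
  final case class FMModelParameter(name: String)

  type Rec = (Int, Float, Either[FeatureVector, FMModelParameter])

  // Rank 0 for model parameters (Right), 1 for training data (Left).
  private def rank(r: Rec): Int = if (r._3.isRight) 0 else 1

  // Compare by rank first, then by id2, so every parameter precedes every feature.
  def before(a: Rec, b: Rec): Boolean =
    if (rank(a) != rank(b)) rank(a) < rank(b) else a._2 < b._2

  // Group by id1 and sort each group with the comparator above.
  def sortedGroups(records: Seq[Rec]): Map[Int, Seq[Rec]] =
    records.groupBy(_._1).map { case (k, vs) => k -> vs.sortWith(before) }

  def main(args: Array[String]): Unit = {
    val data: Seq[Rec] = Seq(
      (0, 0.1f, Left(FeatureVector("x1"))),
      (0, 5.0f, Right(FMModelParameter("w"))),
      (0, 2.0f, Right(FMModelParameter("v"))))
    // The feature has the smallest id2, yet both parameters are listed before it.
    sortedGroups(data)(0).foreach(r => println(r._3))
  }
}
```

Under the same assumption, the analogous change in the Scalding job would presumably be to sort on a key like `(if (modelParam.isRight) 0 else 1, id2)` rather than on `id2` alone.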
