Wait, sorry, looking more carefully: is the bug that originally all the data with id 0 was in one reducer, but with sorting it winds up on both? That would be a bug. What version of scalding is this?
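The partitioning point in this thread can be checked without a cluster. Below is a minimal sketch in plain Scala (no Scalding on the classpath), assuming Hadoop-style hash partitioning, where the reducer for a record is a function of the key's hashCode alone; `PartitionSketch` and its data are illustrative names, not part of the original job:

```scala
// Sketch of Hadoop's default HashPartitioner logic: the target reducer is
// (key.hashCode & Int.MaxValue) % numReducers, a function of the key only.
// Sorting records by value therefore cannot move a key to another reducer.
object PartitionSketch {
  def partition(key: Any, numReducers: Int): Int =
    (key.hashCode & Int.MaxValue) % numReducers

  def main(args: Array[String]): Unit = {
    // (key, value) records with two distinct keys
    val records = Seq((0, 3.0f), (1, 1.0f), (0, 2.0f), (1, 5.0f))
    val numReducers = 2

    val before = records.map { case (k, _) => k -> partition(k, numReducers) }
    // Sorting by the value (as a per-group sortBy does) touches no key:
    val after = records
      .sortBy { case (_, v) => v }
      .map { case (k, _) => k -> partition(k, numReducers) }

    // Every occurrence of a key maps to the same reducer either way.
    assert(before.toMap == after.toMap)
  }
}
```

If a key really did show up on two reducers after a sort, the partitioner (or a non-stable hashCode on the key type) would be the first place to look.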
Can you replicate this bug in a minimal case? Sorting should not change how the keys are partitioned to reducers (which is done by hashCode of the key, which is the same, I suppose). Basically, the test you want to write is that after groupBy with sortBy, if you take only the keys in the output, each key appears exactly once. I have a hard time believing there could have been a bug like this that we didn't notice for 5 years, but I guess it is possible.

On Sat, Sep 10, 2016 at 17:33 ravi kiran holur vijay <[email protected]> wrote:

> Hey Oscar,
>
> Sorry, sounds like I might have misunderstood the semantics of groupBy
> followed by sortBy.
> Is there a way to make sure ALL records having the same key end up at the
> same reducer (what groupBy does) and, within each reducer, have the values
> sorted (what sortBy does)?
>
> -Ravi
>
> On Sat, Sep 10, 2016 at 8:06 PM, Oscar Boykin <[email protected]> wrote:
>
>> Sorry, I don't follow. What did you expect? I don't see a bug.
>>
>> The data looks sorted within groups, which is all sortBy does.
>>
>> Note, you don't need forceToReducers here. Sorting can only be done on
>> the reducers.
>>
>> On Sat, Sep 10, 2016 at 15:12 ravi kiran holur vijay <
>> [email protected]> wrote:
>>
>>> Hello,
>>>
>>> I am noticing strange behaviour in my Scalding job, which uses groupBy
>>> and sortBy. If I do not have a .sortBy call, each of the reducers is
>>> getting all of the values for the same group key. However, if I use
>>> .sortBy, each reducer is getting only part of the values for the same
>>> group key. I was wondering if any of you have run into a similar issue
>>> before or have a hypothesis about what's happening?
>>>
>>> Case 1: Observed behaviour = Expected behaviour, without using sortBy
>>>
>>> *Reducer 1 output*:
>>>
>>> Processing data for group ... 1
>>> Initializing FM Model with existing parameters ...
>>> Processing model param ... o
>>> Processing model param ... w
>>> Processing model param ... r
>>> Processing model param ... s
>>> Processing model param ... t
>>> Processing model param ... l
>>> Processing model param ... f
>>> Processing model param ... v
>>> Processing model param ... v
>>> Processing model param ... v
>>> Processing model param ... v
>>> Processing model param ... v
>>> Initialized FM Model with: w0=-0.181250, w=34531087, v=5, reg0=0.000000,
>>> regw=0.000000, regv=0.000000, lr=0.010000, statsFreq=1000000, merged
>>> models=1
>>>
>>> *Reducer 2 output*:
>>>
>>> Processing data for group ... 0
>>> Initializing FM Model with existing parameters ...
>>> Processing model param ... o
>>> Processing model param ... w
>>> Processing model param ... r
>>> Processing model param ... s
>>> Processing model param ... t
>>> Processing model param ... l
>>> Processing model param ... f
>>> Processing model param ... v
>>> Processing model param ... v
>>> Processing model param ... v
>>> Processing model param ... v
>>> Processing model param ... v
>>> Initialized FM Model with: w0=-0.181250, w=34531087, v=5, reg0=0.000000,
>>> regw=0.000000, regv=0.000000, lr=0.010000, statsFreq=1000000, merged
>>> models=1
>>>
>>> Case 2: Observed behaviour != Expected behaviour, after using sortBy
>>>
>>> *Reducer 1 output*
>>>
>>> Processing data for group ... 0
>>> Initializing FM Model with existing parameters ...
>>> Processing model param ... v
>>> Processing model param ... v
>>> Processing model param ... v
>>> Processing model param ... v
>>> Processing model param ... v
>>> Initialized FM Model with: w0=0.000000, w=0, v=5, reg0=0.000000,
>>> regw=0.000000, regv=0.000000, lr=0.000000, statsFreq=0, merged models=0
>>> Processing data for group ... 1
>>> Initializing FM Model with existing parameters ...
>>> Processing model param ... f
>>> Processing model param ... l
>>> Processing model param ... o
>>> Processing model param ... r
>>> Processing model param ... s
>>> Processing model param ... t
>>> Processing model param ... w
>>> Initialized FM Model with: w0=-0.181250, w=34531087, v=0, reg0=0.000000,
>>> regw=0.000000, regv=0.000000, lr=0.010000, statsFreq=1000000, merged
>>> models=1
>>>
>>> *Reducer 2 output*
>>>
>>> Processing data for group ... 0
>>> Initializing FM Model with existing parameters ...
>>> Processing model param ... f
>>> Processing model param ... l
>>> Processing model param ... o
>>> Processing model param ... v
>>> Processing model param ... r
>>> Processing model param ... s
>>> Processing model param ... t
>>> Processing model param ... w
>>> Initialized FM Model with: w0=-0.181250, w=34531087, v=1, reg0=0.000000,
>>> regw=0.000000, regv=0.000000, lr=0.010000, statsFreq=1000000, merged
>>> models=1
>>> Processing data for group ... 1
>>> Initializing FM Model with existing parameters ...
>>> Processing model param ... v
>>> Processing model param ... v
>>> Processing model param ... v
>>> Processing model param ... v
>>> Initialized FM Model with: w0=0.000000, w=0, v=4, reg0=0.000000,
>>> regw=0.000000, regv=0.000000, lr=0.000000, statsFreq=0, merged models=0
>>>
>>> *Code*
>>>
>>> val data: TypedPipe[(Int, Float, Either[FeatureVector, FMModelParameter])] =
>>>   modelData
>>> val fmModels: SortedGrouped[Int, FMModel] = data
>>>   .groupBy { case (id1, id2, modelParam) => id1 }
>>>   .sortBy { case (id1, id2, modelParam) => id2 }
>>>   .forceToReducers
>>>   // The secondary sort is needed to ensure model parameters appear
>>>   // before the actual training data
>>>   // TODO: This sortBy is causing problems and has a bug
>>>   .mapGroup { case (groupId, records) =>
>>>     println("Processing data for group ... " + groupId)
>>>     val trainedModel = aggregateAndUpdateModel(records)
>>>     Iterator(trainedModel)
>>>   }
>>>
>>> --
>>> You received this message because you are subscribed to the Google
>>> Groups "Scalding Development" group.
>>> To unsubscribe from this group and stop receiving emails from it, send
>>> an email to [email protected].
>>> For more options, visit https://groups.google.com/d/optout.
>>>
>>
>
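The test Oscar proposes (after groupBy with sortBy, taking only the keys of the output should show each key exactly once) can be sketched in memory. This is a minimal, Scalding-free sketch using plain Scala collections in place of TypedPipe; a real test would drive the actual pipeline (e.g. via scalding's JobTest), and `groupSortKeys` and its sample data are illustrative:

```scala
// In-memory sketch of the proposed test: group by id1, sort each group by
// id2 (the secondary sort), emit one result per group (mirroring
// mapGroup's Iterator(trainedModel)), then check every key appears once.
object GroupSortKeyTest {
  // id1 is the grouping key, id2 the secondary sort key, as in the job above
  def groupSortKeys(data: Seq[(Int, Float, String)]): Seq[Int] =
    data
      .groupBy { case (id1, _, _) => id1 }                       // all records for a key together
      .map { case (id1, recs) =>
        id1 -> recs.sortBy { case (_, id2, _) => id2 }           // sort within the group
      }
      .keys                                                      // one output row per group
      .toSeq

  def main(args: Array[String]): Unit = {
    val data = Seq((0, 2.0f, "w"), (1, 1.0f, "o"), (0, 1.0f, "v"), (1, 3.0f, "t"))
    val keys = groupSortKeys(data)

    // The property under test: each group key appears exactly once.
    assert(keys.distinct.size == keys.size)
    assert(keys.sorted == Seq(0, 1))
  }
}
```

If the reported behaviour were a real framework bug, this property would fail on the cluster even though it trivially holds for the in-memory stand-in, which is why the minimal replication Oscar asks for has to run through the actual Scalding planner.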
