Just to be sure, can you try with Scalding 0.16.0?

On Sun, Sep 11, 2016 at 11:23 ravi kiran holur vijay <[email protected]> wrote:

> Hey Oscar,
>
> Yes, that's correct. I am seeing data with id 0 being distributed to
> multiple reducers, which is contrary to what a groupBy followed by a
> sortBy should do. However, if I comment out the line with sortBy, I see
> data with id 0 ending up at a single reducer. I filed a new issue to track
> this and will work on coming up with a minimal test case for replicating
> this.
>
> I am using Scalding 0.15.0 with Cascading 2.6.3 running
> on the hadoop-0.20.1-dev-qubole distribution.
>
> -Ravi
>
> On Sat, Sep 10, 2016 at 9:13 PM, Oscar Boykin <[email protected]> wrote:
>
>> Wait, sorry. Looking more carefully. Is the bug that originally all data
>> with id 0 was in one reducer but with sorting it winds up on both? That
>> would be a bug. What version of Scalding is this?
>>
>> Can you replicate this bug in a minimal case? Sorting should not change
>> how the keys are partitioned to reducers (which is done by hashCode of
>> the key, which is the same, I suppose).
>>
>> Basically, the test you want to write is that after groupBy with sortBy,
>> if you take only the keys in the output, each key appears exactly once.
>>
>> I have a hard time believing there could have been a bug like this that
>> we didn't notice for 5 years, but I guess it is possible.
>>
>> On Sat, Sep 10, 2016 at 17:33 ravi kiran holur vijay <
>> [email protected]> wrote:
>>
>>> Hey Oscar,
>>>
>>> Sorry, sounds like I might have misunderstood the semantics of groupBy
>>> followed by sortBy.
>>> Is there a way to make sure ALL records having the same key end up at
>>> the same reducer (what groupBy does) and, within each reducer, have them
>>> sorted by value (what sortBy does)?
>>>
>>> -Ravi
>>>
>>> On Sat, Sep 10, 2016 at 8:06 PM, Oscar Boykin <[email protected]> wrote:
>>>
>>>> Sorry, I don't follow. What did you expect? I don't see a bug.
>>>>
>>>> The data looks sorted within groups, which is all sortBy does.
>>>>
>>>> Note, you don't need forceToReducers here. Sorting can only be done on
>>>> the reducers.
>>>>
>>>> On Sat, Sep 10, 2016 at 15:12 ravi kiran holur vijay <
>>>> [email protected]> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I am noticing strange behaviour in my Scalding job, which uses groupBy
>>>>> and sortBy. If I do not have a .sortBy call, each reducer gets all of
>>>>> the values for the same group key. However, if I use .sortBy, each
>>>>> reducer gets only part of the values for the same group key. I was
>>>>> wondering if any of you have run into a similar issue before or
>>>>> have a hypothesis about what's happening?
>>>>>
>>>>> Case 1: Observed behaviour = Expected behaviour, without using sortBy
>>>>>
>>>>> *Reducer 1 output*:
>>>>>
>>>>> Processing data for group ... 1
>>>>> Initializing FM Model with existing parameters ...
>>>>> Processing model param ... o
>>>>> Processing model param ... w
>>>>> Processing model param ... r
>>>>> Processing model param ... s
>>>>> Processing model param ... t
>>>>> Processing model param ... l
>>>>> Processing model param ... f
>>>>> Processing model param ... v
>>>>> Processing model param ... v
>>>>> Processing model param ... v
>>>>> Processing model param ... v
>>>>> Processing model param ... v
>>>>> Initialized FM Model with: w0=-0.181250, w=34531087, v=5,
>>>>> reg0=0.000000, regw=0.000000, regv=0.000000, lr=0.010000,
>>>>> statsFreq=1000000, merged models=1
>>>>>
>>>>> *Reducer 2 output*:
>>>>>
>>>>> Processing data for group ... 0
>>>>> Initializing FM Model with existing parameters ...
>>>>> Processing model param ... o
>>>>> Processing model param ... w
>>>>> Processing model param ... r
>>>>> Processing model param ... s
>>>>> Processing model param ... t
>>>>> Processing model param ... l
>>>>> Processing model param ... f
>>>>> Processing model param ... v
>>>>> Processing model param ... v
>>>>> Processing model param ... v
>>>>> Processing model param ... v
>>>>> Processing model param ... v
>>>>> Initialized FM Model with: w0=-0.181250, w=34531087, v=5,
>>>>> reg0=0.000000, regw=0.000000, regv=0.000000, lr=0.010000,
>>>>> statsFreq=1000000, merged models=1
>>>>>
>>>>> Case 2: Observed behaviour != Expected behaviour, after using sortBy
>>>>>
>>>>> *Reducer 1 output*:
>>>>>
>>>>> Processing data for group ... 0
>>>>> Initializing FM Model with existing parameters ...
>>>>> Processing model param ... v
>>>>> Processing model param ... v
>>>>> Processing model param ... v
>>>>> Processing model param ... v
>>>>> Processing model param ... v
>>>>> Initialized FM Model with: w0=0.000000, w=0, v=5, reg0=0.000000,
>>>>> regw=0.000000, regv=0.000000, lr=0.000000, statsFreq=0, merged models=0
>>>>> Processing data for group ... 1
>>>>> Initializing FM Model with existing parameters ...
>>>>> Processing model param ... f
>>>>> Processing model param ... l
>>>>> Processing model param ... o
>>>>> Processing model param ... r
>>>>> Processing model param ... s
>>>>> Processing model param ... t
>>>>> Processing model param ... w
>>>>> Initialized FM Model with: w0=-0.181250, w=34531087, v=0,
>>>>> reg0=0.000000, regw=0.000000, regv=0.000000, lr=0.010000,
>>>>> statsFreq=1000000, merged models=1
>>>>>
>>>>> *Reducer 2 output*:
>>>>>
>>>>> Processing data for group ... 0
>>>>> Initializing FM Model with existing parameters ...
>>>>> Processing model param ... f
>>>>> Processing model param ... l
>>>>> Processing model param ... o
>>>>> Processing model param ... v
>>>>> Processing model param ... r
>>>>> Processing model param ... s
>>>>> Processing model param ... t
>>>>> Processing model param ... w
>>>>> Initialized FM Model with: w0=-0.181250, w=34531087, v=1,
>>>>> reg0=0.000000, regw=0.000000, regv=0.000000, lr=0.010000,
>>>>> statsFreq=1000000, merged models=1
>>>>> Processing data for group ... 1
>>>>> Initializing FM Model with existing parameters ...
>>>>> Processing model param ... v
>>>>> Processing model param ... v
>>>>> Processing model param ... v
>>>>> Processing model param ... v
>>>>> Initialized FM Model with: w0=0.000000, w=0, v=4, reg0=0.000000,
>>>>> regw=0.000000, regv=0.000000, lr=0.000000, statsFreq=0, merged models=0
>>>>>
>>>>> *Code*
>>>>>
>>>>> val data: TypedPipe[(Int, Float, Either[FeatureVector, FMModelParameter])] = modelData
>>>>> val fmModels: SortedGrouped[Int, FMModel] = data
>>>>>   .groupBy { case (id1, id2, modelParam) => id1 }
>>>>>   .sortBy { case (id1, id2, modelParam) => id2 }
>>>>>   .forceToReducers
>>>>>   // Secondary sort is needed to ensure model parameters appear before
>>>>>   // actual training data
>>>>>   // TODO: This sortBy is causing problems and has a bug
>>>>>   .mapGroup {
>>>>>     case (groupId, records) =>
>>>>>       println("Processing data for group ... " + groupId)
>>>>>       val trainedModel = aggregateAndUpdateModel(records)
>>>>>       Iterator(trainedModel)
>>>>>   }
>>>>>
>>>>> --
>>>>> You received this message because you are subscribed to the Google
>>>>> Groups "Scalding Development" group.
>>>>> To unsubscribe from this group and stop receiving emails from it, send
>>>>> an email to [email protected].
>>>>> For more options, visit https://groups.google.com/d/optout.
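
The check Oscar describes in this thread (after groupBy with sortBy, each key should appear exactly once in the output) can be modeled without a cluster. What follows is a minimal plain-Scala sketch, not Scalding code and not how Scalding or Cascading implement it. It assumes only what the thread states: reducers are chosen from the hashCode of the grouping key, and the secondary sort orders values inside each group. The names Record, reducerFor, run and keysSeen are hypothetical, introduced here for illustration.

```scala
object SecondarySortModel {
  // Hypothetical record shape loosely mirroring the (id1, id2, payload) tuples in the thread.
  final case class Record(id1: Int, id2: Float, payload: String)

  // Assumption: the reducer is picked from the grouping key alone,
  // so the secondary sort field (id2) never influences partitioning.
  def reducerFor(key: Int, numReducers: Int): Int =
    Math.floorMod(key.hashCode, numReducers)

  // Returns, per reducer, the groups it sees; values in each group are ordered by id2.
  def run(records: Seq[Record], numReducers: Int): Map[Int, Seq[(Int, Seq[Record])]] =
    records
      .groupBy(r => reducerFor(r.id1, numReducers))
      .map { case (reducer, rs) =>
        val groups = rs
          .groupBy(_.id1)
          .toSeq
          .sortBy(_._1)
          .map { case (key, vs) => (key, vs.sortWith(_.id2 < _.id2)) }
        reducer -> groups
      }

  // Every key emitted by any reducer, one entry per (reducer, group) pair.
  def keysSeen(output: Map[Int, Seq[(Int, Seq[Record])]]): Seq[Int] =
    output.values.flatMap(_.map(_._1)).toSeq

  def main(args: Array[String]): Unit = {
    val records = Seq(
      Record(0, 3.0f, "c"), Record(1, 2.0f, "y"),
      Record(0, 1.0f, "a"), Record(1, 1.0f, "x"),
      Record(0, 2.0f, "b")
    )
    val keys = keysSeen(run(records, numReducers = 2))
    // The invariant from the thread: no key may show up under two reducers.
    println(s"each key appears exactly once: ${keys.distinct.size == keys.size}")
  }
}
```

If the equivalent check on a real job's output fails, with the same key showing up under more than one reducer, that points at how records are partitioned rather than at the semantics of sortBy.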
