Very silly of me. Thank you :)

> (Also, in your combiner, I'm not understanding why you have
> all_terms.add(new_tags[0]). Did you want
> all_terms.update(*new_tags[0]))?

This is because each term entry can be a list of many different terms, so
the entries need not be unique. Titles and abstracts, on the other hand,
should always be unique given the way query_with_keywords() is written.

Sayak Paul | sayak.dev

On Mon, Sep 27, 2021 at 10:42 PM Robert Bradshaw <[email protected]> wrote:

> The problem is in your line
>
> collected_entries = beam.CombineGlobally(GatherRecords())
>
> You're not applying the CombineGlobally transform to anything, just
> assigning it to the variable collected_entries. This should probably
> be
>
> collected_entries = records | beam.CombineGlobally(GatherRecords())
>
> (Also, in your combiner, I'm not understanding why you have
> all_terms.add(new_tags[0]). Did you want
> all_terms.update(*new_tags[0]))?
>
> On Sun, Sep 26, 2021 at 5:41 AM Sayak Paul <[email protected]> wrote:
> >
> > Hi folks,
> >
> > I am currently working on a pipeline with which I want to gather a bunch
> > of paper titles, abstracts, and their term categories from arXiv. I am
> > using a combination of CombineGlobally and a custom CombineFn to maintain
> > three different sets to accumulate these records.
> >
> > I might have written the accumulator in the wrong manner but I am not
> > sure where it's going wrong, i.e. the pipeline is able to collect the
> > entries using the arxiv API but not able to accumulate the results.
> >
> > Here's my notebook for reproducing the issue.
> >
> > Sayak Paul | sayak.dev
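
For anyone finding this thread later, here is a minimal sketch of the pattern discussed above: a custom CombineFn that keeps three sets, applied to a PCollection with CombineGlobally. Only GatherRecords and CombineGlobally are names from the thread. The record shape (title, abstract, terms), the gather() helper, and the step labels are assumptions for illustration, not the code from the original notebook. Because a Python list is unhashable, the sketch stores each paper's term list as a tuple. That is one possible choice, not necessarily what the notebook does. The import fallback is only there so the accumulator logic can be tried without Beam installed.

```python
try:
    import apache_beam as beam
    _CombineFnBase = beam.CombineFn
except ImportError:  # allows exercising the accumulator logic without Beam
    beam = None
    _CombineFnBase = object


class GatherRecords(_CombineFnBase):
    """Accumulates titles, abstracts, and term lists into three sets."""

    def create_accumulator(self):
        # One empty set each for titles, abstracts, and terms.
        return set(), set(), set()

    def add_input(self, accumulator, record):
        titles, abstracts, terms = accumulator
        # Assumed record shape: (title, abstract, list_of_terms).
        title, abstract, record_terms = record
        titles.add(title)
        abstracts.add(abstract)
        # A list cannot go into a set, so keep each term list as a tuple.
        terms.add(tuple(record_terms))
        return titles, abstracts, terms

    def merge_accumulators(self, accumulators):
        titles, abstracts, terms = set(), set(), set()
        for acc_titles, acc_abstracts, acc_terms in accumulators:
            titles |= acc_titles
            abstracts |= acc_abstracts
            terms |= acc_terms
        return titles, abstracts, terms

    def extract_output(self, accumulator):
        return accumulator


def gather(pipeline, records):
    """Hypothetical helper: pipes the input into the transform, as suggested above."""
    return (
        pipeline
        | "CreateRecords" >> beam.Create(records)
        | "GatherRecords" >> beam.CombineGlobally(GatherRecords())
    )
```

The key point from Robert's reply is the `|` in gather(): the transform has to be applied to a PCollection, not just constructed and assigned to a variable.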
