Hi,

I'm new to Apache Beam. I want to read and filter data from CommonCrawl. The most recent CommonCrawl dump contains 72,000 files, which are listed in https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2022-05/wat.paths.gz. I want to build a Beam pipeline that reads all of these files. Unfortunately, I am stuck on parallelizing this read on GCP Dataflow. My code is very simple, but I seem to be limited to a single worker at the step that passes the files to be fetched from AWS. My guess is that Beam tries to split the work into bundles, but because the input is only 72,000 small strings (links/URLs), everything ends up in one small bundle that goes to a single worker. Any ideas how I can overcome this problem? Thanks in advance, I will appreciate all the help.
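One idea I had, though I have not verified it: would inserting a beam.Reshuffle right after the step that emits the URLs break fusion, so that Dataflow can redistribute the 72,000 elements across workers? A rough sketch of what I mean (just a sketch, assuming beam.Reshuffle from the Go SDK is the right tool here; the surrounding names are from my code below):

  urls := gzipio.ReadHTTPBeam(s, *input)
  wetSrc := beam.ParDo(s, &extractFn{}, urls)
  // Reshuffle materializes and redistributes the elements, so the read of the
  // path list and the per-file fetch are not fused into one serial step.
  shuffled := beam.Reshuffle(s, wetSrc)
  beam.ParDo(s, &fetchFileFn{}, shuffled)

For reference, my current (simplified) code is below: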
type extractFn struct{}

func (f *extractFn) ProcessElement(ctx context.Context, line string, emit func(string)) {
  emit(line)
}

type fetchFileFn struct{}

func (f *fetchFileFn) ProcessElement(ctx context.Context, url string, emit func(string, string)) {
  if url == "" {
    return
  }
  fullURL := "https://commoncrawl.s3.amazonaws.com/" + strings.TrimSpace(url)
  buf, err := gzipio.ReadHTTP(ctx, fullURL)
  // warcCounter and warcLinksCounter are beam.Counter metrics defined elsewhere in my code.
  warcCounter.Inc(ctx, 1)
  if err == nil {
    for {
      record, err := warc.ReadWARCRecord(buf)
      if err != nil {
        // End of archive (or read error): stop processing this file.
        warcCounter.Dec(ctx, 1)
        return
      }
      warcLinksCounter.Inc(ctx, 1)
      emit(record.URL, record.Body)
    }
  }
  warcCounter.Dec(ctx, 1)
}

func main() {
  flag.Parse()
  beam.Init()

  p := beam.NewPipeline()
  s := p.Root()

  // Read the gzipped path list (wat.paths.gz); *input is a flag defined elsewhere.
  urls := gzipio.ReadHTTPBeam(s, *input)
  // One element per path line.
  wetSrc := beam.ParDo(s, &extractFn{}, urls)
  // Fetch and parse every file from S3; this is the step that stays on one worker.
  beam.ParDo(s, &fetchFileFn{}, wetSrc)

  if err := beamx.Run(context.Background(), p); err != nil {
    log.Fatal(err)
  }
}

--
Paweł Róg
