Hi, I'm new to Apache Beam. I'm going to read and filter data from
CommonCrawl. The recent CommonCrawl dump contains 72,000 files, which are listed in
https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2022-05/wat.paths.gz
I want to build a Beam pipeline that reads all of these files. Unfortunately, I am stuck
on parallelizing this read on GCP Dataflow. My code is very simple, but it seems I'm
limited to a single worker at the step responsible for handing out the files to be
fetched from AWS. I suspect this is because Beam splits the work into bundles, and since
the input is only 72,000 small strings (links/URLs), the whole list ends up in bundles
small enough to be handled by a single worker. Any ideas how I can overcome this problem?
Thanks in advance, I will appreciate any help.

Here is my code:
// extractFn passes every line of the paths file (one WAT file path per line) through unchanged.
type extractFn struct{}

func (f *extractFn) ProcessElement(ctx context.Context, line string, emit func(string)) {
    emit(line)
}

// fetchFileFn downloads one gzipped WAT file from S3 and emits a (URL, body) pair
// for every WARC record it contains.
type fetchFileFn struct{}

func (f *fetchFileFn) ProcessElement(ctx context.Context, url string, emit func(string, string)) {
    if url == "" {
        return
    }
    fullURL := "https://commoncrawl.s3.amazonaws.com/" + strings.TrimSpace(url)
    buf, err := gzipio.ReadHTTP(ctx, fullURL)
    warcCounter.Inc(ctx, 1)
    if err == nil {
        for {
            record, err := warc.ReadWARCRecord(buf)
            if err != nil {
                warcCounter.Dec(ctx, 1)
                return
            }
            warcLinksCounter.Inc(ctx, 1)
            emit(record.URL, record.Body)
        }
    }
    warcCounter.Dec(ctx, 1)
}

func main() {
    // (imports, flag parsing and beam.Init() omitted)
    p := beam.NewPipeline()
    s := p.Root()

    urls := gzipio.ReadHTTPBeam(s, *input)
    wetSrc := beam.ParDo(s, &extractFn{}, urls)
    _ = beam.ParDo(s, &fetchFileFn{}, wetSrc)

    beamx.Run(context.Background(), p)
}
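One workaround I've been wondering about, but have not been able to verify, is putting a
beam.Reshuffle between the step that extracts the URLs and the step that fetches them, in
the hope that it breaks fusion and lets Dataflow spread the 72,000 elements across
workers. Roughly like this (only main() changes, the DoFns stay the same):

    urls := gzipio.ReadHTTPBeam(s, *input)
    wetSrc := beam.ParDo(s, &extractFn{}, urls)
    // Force a redistribution point so the fetch step is not fused
    // with the single-worker read of the URL list.
    shuffled := beam.Reshuffle(s, wetSrc)
    _ = beam.ParDo(s, &fetchFileFn{}, shuffled)

I don't know whether beam.Reshuffle is the right tool for this on Dataflow, so any
confirmation or a better approach would be much appreciated.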
--
Paweł Róg