Hi Folks,
Cross-posting from the Slack channel from the other day.
I started looking at Beam again over the weekend. I have an unbounded
stream with a CassandraIO input and am trying to write files using FileIO
and ParquetIO.
I'm using the following:
Beam: 2.20.0
Flink Runner/Cluster: 1.9(.3)
java -Xmx12g -jar target/fmetrics-1.0-SNAPSHOT.jar --streaming=true
--sdkWorkerParallelism=0 --runner=FlinkRunner
When submitting to a Flink cluster I include --flinkMaster=localhost:8081
in the command.
If I replace the FileIO with a simple log writer, it prints out the records
and makes progress. Using FileIO with ParquetIO, it stalls on the stage:
write/WriteFiles/WriteShardedBundlesToTempFiles/GroupIntoShards
-> write/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles/ParMultiDo(WriteShardsIntoTempFiles)
-> write/WriteFiles/GatherTempFileResults/Add void key/AddKeys/Map/ParMultiDo(Anonymous)
That brings me to ask the following questions:
1. What's the best way to test and monitor a Beam pipeline?
2. What adjustments are required to get this pipeline writing files?
3. Is there a way to evaluate the DAG and identify scenarios where this
kind of stall is likely?
PipelineOptions pipelineOptions = PipelineOptionsFactory
    .fromArgs(args)
    .withValidation()
    .create();

Pipeline p = Pipeline.create(pipelineOptions);
CoderRegistry registry = p.getCoderRegistry();
registry.registerCoderForClass(GenericRecord.class, AvroCoder.of(SCHEMA));

PCollection<Metric> metrics = p.apply("cassandra",
    CassandraIO.<Metric>read()
        .withHosts(hosts)
        .withPort(9042)
        .withLocalDc("datacenter1")
        .withKeyspace(KEY_SPACE)
        .withTable(TABLE)
        .withMinNumberOfSplits(100)
        .withEntity(Metric.class)
        .withCoder(SerializableCoder.of(Metric.class)));

metrics.apply("window",
        Window.<Metric>into(FixedWindows.of(Duration.standardSeconds(30)))
            .withAllowedLateness(Duration.standardSeconds(5))
            .accumulatingFiredPanes())
    .apply("metricToGeneric", ParDo.of(new MetricToGeneric(LOG)))
    .apply("write", FileIO.<GenericRecord>write()
        .via(ParquetIO.sink(SCHEMA))
        .withNumShards(200)
        .to("./metrics/")
        .withPrefix("metrics")
        .withSuffix(".parquet"));

p.run().waitUntilFinish();
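As a side note on scale: 200 shards per 30-second window implies a lot of
small output files. A rough sketch of that arithmetic in plain Java (the
helper below is hypothetical, and it assumes every window fires once with
all 200 shards non-empty):

```java
// Back-of-the-envelope file count implied by withNumShards(200) and
// 30-second fixed windows (assumes one firing per window, all shards used).
public class ShardMath {
    static long filesPerHour(long numShards, long windowSeconds) {
        long windowsPerHour = 3600 / windowSeconds; // 120 windows of 30s each
        return numShards * windowsPerHour;          // files written per hour
    }

    public static void main(String[] args) {
        // 200 shards x 120 windows = 24,000 files per hour
        System.out.println(filesPerHour(200, 30));
    }
}
```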
I also submitted this to a Flink cluster; it appears to stall on the
temporary-file sharding stage described above and eventually fails after
processing roughly 600-700k records.
Rereading the windowing section in the documentation, I changed it to
discardingFiredPanes(), as that seems the more appropriate behaviour for
what I want, but that doesn't appear to have changed the results.
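For anyone unsure of the distinction, here is a toy illustration (plain
Java, not Beam code) of what I understand accumulatingFiredPanes() versus
discardingFiredPanes() to mean: accumulating re-emits everything seen so
far in the window on each firing, while discarding emits only what arrived
since the previous firing.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of Beam pane accumulation modes. Each inner list in `arrivals`
// is a batch of elements that arrives before one trigger firing.
public class PaneModes {
    static List<List<Integer>> fire(List<List<Integer>> arrivals, boolean accumulating) {
        List<List<Integer>> panes = new ArrayList<>();
        List<Integer> buffer = new ArrayList<>();
        for (List<Integer> batch : arrivals) {
            buffer.addAll(batch);
            panes.add(new ArrayList<>(buffer)); // pane emitted at this firing
            if (!accumulating) {
                buffer.clear(); // discarding mode: forget emitted elements
            }
        }
        return panes;
    }

    public static void main(String[] args) {
        List<List<Integer>> arrivals = List.of(List.of(1, 2), List.of(3));
        // accumulating: [[1, 2], [1, 2, 3]]
        System.out.println(fire(arrivals, true));
        // discarding:   [[1, 2], [3]]
        System.out.println(fire(arrivals, false));
    }
}
```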
Regards,
--
Nathan Fisher
w: http://junctionbox.ca/