Hey,
Flink is super useful for Beam development, but I’m having trouble writing data
to Parquet. Everything works fine on DirectRunner, DataflowRunner, and
FlinkRunner against a local Flink 1.10.2 cluster. However, when I use FlinkRunner
in embedded mode, only a subset of my data arrives in the file store. Setup:
local machine is a Mac, Beam 2.25.0, Java SDK, classic (non-portable) runner
flavour. A Gist with the code and log output is here:
https://gist.github.com/chrishinds/aba17d314fa6561b0904ee783090947f
In this test I read ~8000 files, but I only get data back for ~1000 of them; it
looks like I’ve received only one of what ought to have been several Parquet
shards.
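For reference, a multi-shard write should end with one distinct final file per shard. The sketch below is a hypothetical helper (`ShardNames` is not a Beam API) that rebuilds names in the `output-00000-of-00001.parquet` shape seen in the log lines quoted below. The eight-shard count in `main` is an assumption, picked only because keeping one of eight roughly even shards would leave about 8000 / 8 = 1000 files' worth of data, in the same ballpark as what I see.

```java
import java.util.ArrayList;
import java.util.List;

class ShardNames {
    // Formats a final filename like "output-00000-of-00001.parquet":
    // five-digit, zero-padded shard index and shard count. This mirrors the
    // names in the log; it is not Beam's own naming code.
    static String shardName(String prefix, int shard, int numShards, String suffix) {
        return String.format("%s-%05d-of-%05d%s", prefix, shard, numShards, suffix);
    }

    // Lists every final filename a write with numShards shards would produce.
    static List<String> expectedNames(String prefix, int numShards, String suffix) {
        List<String> names = new ArrayList<>();
        for (int shard = 0; shard < numShards; shard++) {
            names.add(shardName(prefix, shard, numShards, suffix));
        }
        return names;
    }

    public static void main(String[] args) {
        // Hypothetical 8-shard write: eight distinct names, 00000 to 00007.
        expectedNames("output", 8, ".parquet").forEach(System.out::println);
        // The single name every temp file in my log is copied to.
        System.out.println(shardName("output", 0, 1, ".parquet"));
    }
}
```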
The logs point the same way. Multiple temp files, all tagged shard=0, get
copied to the same final filename:
Will copy temporary file
FileResult{tempFilename=/Users/chrish/Source/github.com/wrhs/out/IngestTest/.temp-beam-656dba88-5dce-4697-9c63-805067cecdba/10e28959-2db1-431f-835f-7b3192243852,
shard=0,
window=org.apache.beam.sdk.transforms.windowing.GlobalWindow@2a3591c5,
paneInfo=PaneInfo.NO_FIRING} to final location
/Users/chrish/Source/github.com/wrhs/out/IngestTest/output-00000-of-00001.parquet
Will copy temporary file
FileResult{tempFilename=/Users/chrish/Source/github.com/wrhs/out/IngestTest/.temp-beam-656dba88-5dce-4697-9c63-805067cecdba/7ab5a9ab-b46f-4bdd-a5c0-9c6b1fdb4638,
shard=0,
window=org.apache.beam.sdk.transforms.windowing.GlobalWindow@2a3591c5,
paneInfo=PaneInfo.NO_FIRING} to final location
/Users/chrish/Source/github.com/wrhs/out/IngestTest/output-00000-of-00001.parquet
Will copy temporary file
FileResult{tempFilename=/Users/chrish/Source/github.com/wrhs/out/IngestTest/.temp-beam-656dba88-5dce-4697-9c63-805067cecdba/2d3c495a-313b-4ca7-a398-5b32f321f4b7,
shard=0,
window=org.apache.beam.sdk.transforms.windowing.GlobalWindow@2a3591c5,
paneInfo=PaneInfo.NO_FIRING} to final location
/Users/chrish/Source/github.com/wrhs/out/IngestTest/output-00000-of-00001.parquet
Will copy temporary file
FileResult{tempFilename=/Users/chrish/Source/github.com/wrhs/out/IngestTest/.temp-beam-656dba88-5dce-4697-9c63-805067cecdba/de23ceeb-04d1-4cb6-a0a9-0507c112b93a,
shard=0,
window=org.apache.beam.sdk.transforms.windowing.GlobalWindow@2a3591c5,
paneInfo=PaneInfo.NO_FIRING} to final location
/Users/chrish/Source/github.com/wrhs/out/IngestTest/output-00000-of-00001.parquet
Will copy temporary file
FileResult{tempFilename=/Users/chrish/Source/github.com/wrhs/out/IngestTest/.temp-beam-656dba88-5dce-4697-9c63-805067cecdba/a40489ce-7432-443b-9828-9511ab9db0cb,
shard=0,
window=org.apache.beam.sdk.transforms.windowing.GlobalWindow@2a3591c5,
paneInfo=PaneInfo.NO_FIRING} to final location
/Users/chrish/Source/github.com/wrhs/out/IngestTest/output-00000-of-00001.parquet
Will copy temporary file
FileResult{tempFilename=/Users/chrish/Source/github.com/wrhs/out/IngestTest/.temp-beam-656dba88-5dce-4697-9c63-805067cecdba/c3819566-4edc-4362-b4ca-55e2e2b70f4c,
shard=0,
window=org.apache.beam.sdk.transforms.windowing.GlobalWindow@2a3591c5,
paneInfo=PaneInfo.NO_FIRING} to final location
/Users/chrish/Source/github.com/wrhs/out/IngestTest/output-00000-of-00001.parquet
Will copy temporary file
FileResult{tempFilename=/Users/chrish/Source/github.com/wrhs/out/IngestTest/.temp-beam-656dba88-5dce-4697-9c63-805067cecdba/3fd67eee-ea50-4b5b-bf3f-fe253dfcf3a4,
shard=0,
window=org.apache.beam.sdk.transforms.windowing.GlobalWindow@2a3591c5,
paneInfo=PaneInfo.NO_FIRING} to final location
/Users/chrish/Source/github.com/wrhs/out/IngestTest/output-00000-of-00001.parquet
Will copy temporary file
FileResult{tempFilename=/Users/chrish/Source/github.com/wrhs/out/IngestTest/.temp-beam-656dba88-5dce-4697-9c63-805067cecdba/7d5e4096-f5ce-4a79-aec5-e5e4293bc601,
shard=0,
window=org.apache.beam.sdk.transforms.windowing.GlobalWindow@2a3591c5,
paneInfo=PaneInfo.NO_FIRING} to final location
/Users/chrish/Source/github.com/wrhs/out/IngestTest/output-00000-of-00001.parquet
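To make the suspicion concrete, here is a rough, stdlib-only model of the finalize step the log seems to describe. `LastCopyWins` is hypothetical, not Beam code. It assumes each temp file is copied to a name built from its shard index, and that a copy onto an existing file replaces it (`REPLACE_EXISTING`), which is what losing all but one shard would suggest. Under those assumptions, eight results all tagged shard=0 of 1 leave a single file holding only the last bundle's data, while distinct shard indices 0..7 of 8 leave eight files.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.stream.Stream;

class LastCopyWins {
    // Writes one temp file per bundle, copies it to
    // "output-SSSSS-of-NNNNN.parquet" using that bundle's shard index,
    // deletes the temp file, and returns how many final files remain.
    static long finalizeAll(Path dir, int[] shards, int numShards) throws IOException {
        for (int i = 0; i < shards.length; i++) {
            // The temp file's content records which bundle wrote it.
            Path temp = Files.writeString(dir.resolve("temp-" + i), "bundle " + i);
            String finalName = String.format("output-%05d-of-%05d.parquet", shards[i], numShards);
            // Assumed overwrite semantics: a later copy silently replaces an earlier one.
            Files.copy(temp, dir.resolve(finalName), StandardCopyOption.REPLACE_EXISTING);
            Files.delete(temp);
        }
        try (Stream<Path> files = Files.list(dir)) {
            return files.filter(p -> p.getFileName().toString().startsWith("output-")).count();
        }
    }

    public static void main(String[] args) throws IOException {
        // Eight results all tagged shard=0 of 1, as in my log.
        Path collided = Files.createTempDirectory("collided");
        System.out.println(finalizeAll(collided, new int[8], 1)); // prints 1
        System.out.println(Files.readString(collided.resolve("output-00000-of-00001.parquet"))); // prints "bundle 7"
        // Eight results with distinct shard indices 0..7 of 8.
        Path distinct = Files.createTempDirectory("distinct");
        System.out.println(finalizeAll(distinct, new int[] {0, 1, 2, 3, 4, 5, 6, 7}, 8)); // prints 8
    }
}
```

If this model holds, the missing data isn't dropped during processing but overwritten at finalize time, so the real question is why embedded mode hands every result shard=0 with a shard count of 1.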
I’m no Java pro, but this looks like a catastrophic failure. Or am I doing
something unspeakably stupid?
Cheers,
Chris.