Hi Pablo! The file I am trying to ingest is 54.2 GB unzipped. It is available here: https://obis.org/manual/access/
I created a Jira issue for you, but seem unable to assign it: https://issues.apache.org/jira/browse/BEAM-11313

Thank you so much for taking a look at this!

Best Regards
Thomas Li Fredriksen

On Fri, Nov 20, 2020 at 5:43 PM Pablo Estrada <[email protected]> wrote:

> Follow-up: what is the size of the file you're consuming?
> -P.
>
> On Fri, Nov 20, 2020 at 8:40 AM Pablo Estrada <[email protected]> wrote:
>
>> Hi Thomas!
>> This looks like it may be a bug in the azfs implementation. If you look
>> at the code, you're hitting this issue when the byte channel needs to
>> seek backwards.
>> I may take a stab at fixing it. I believe we have to catch a mark
>> expiration and simply reopen the file if that happens.
>>
>> Do you think you could create a JIRA issue to track it and assign it to
>> me?
>>
>> On Thu, Nov 19, 2020 at 10:07 PM Thomas Li Fredriksen
>> <[email protected]> wrote:
>>
>>> Good morning everyone.
>>>
>>> I am attempting to parse a very large CSV (65 million lines) with Beam
>>> (version 2.25) from an Azure blob and have created a pipeline for this.
>>> I am running the pipeline on Dataflow and testing with a smaller
>>> version of the file (10,000 lines).
>>>
>>> I am using FileIO with the filesystem prefix "azfs" to read from Azure
>>> blobs.
>>>
>>> The pipeline works with the small test file, but when I run it on the
>>> bigger file I get a "Stream mark expired" exception (pasted below).
>>> Reading the same file from a GCP bucket works just fine, including when
>>> running on Dataflow.
>>>
>>> Our primary file store is Azure Storage, so moving to GCP buckets is
>>> not an option.
>>>
>>> Is there anyone who can help me resolve this?
>>>
>>> Best Regards
>>> Thomas Li Fredriksen
>>>
>>> The exception referenced above:
>>>
>>> Error message from worker: java.lang.RuntimeException:
>>> org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException:
>>> Stream mark expired.
>>> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:184)
>>> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:108)
>>> org.apache.beam.runners.dataflow.worker.util.BatchGroupAlsoByWindowReshuffleFn.processElement(BatchGroupAlsoByWindowReshuffleFn.java:56)
>>> org.apache.beam.runners.dataflow.worker.util.BatchGroupAlsoByWindowReshuffleFn.processElement(BatchGroupAlsoByWindowReshuffleFn.java:39)
>>> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:121)
>>> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
>>> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:114)
>>> org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
>>> org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
>>> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
>>> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
>>> org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
>>> org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:417)
>>> org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:386)
>>> org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:311)
>>> org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:140)
>>> org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:120)
>>> org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
>>> java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>> java.base/java.lang.Thread.run(Thread.java:834)
>>> Caused by: org.apache.beam.sdk.util.UserCodeException:
>>> java.lang.RuntimeException: Stream mark expired.
>>> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>>> org.apache.beam.sdk.io.ReadAllViaFileBasedSource$ReadFileRangesFn$DoFnInvoker.invokeProcessElement(Unknown Source)
>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
>>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:335)
>>> org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
>>> org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
>>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:280)
>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
>>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:73)
>>> org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)
>>> org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
>>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:335)
>>> org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
>>> org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
>>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:280)
>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
>>> org.apache.beam.runners.dataflow.ReshuffleOverrideFactory$ReshuffleWithOnlyTrigger$1.processElement(ReshuffleOverrideFactory.java:86)
>>> org.apache.beam.runners.dataflow.ReshuffleOverrideFactory$ReshuffleWithOnlyTrigger$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
>>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:335)
>>> org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
>>> org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
>>> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:182)
>>> ... 21 more
>>> Caused by: java.lang.RuntimeException: Stream mark expired.
>>> com.azure.storage.common.StorageInputStream.reset(StorageInputStream.java:366)
>>> org.apache.beam.sdk.io.azure.blobstore.AzureReadableSeekableByteChannel.position(AzureReadableSeekableByteChannel.java:89)
>>> org.apache.beam.sdk.io.TextSource$TextBasedReader.startReading(TextSource.java:152)
>>> org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.startImpl(FileBasedSource.java:476)
>>> org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.start(OffsetBasedSource.java:249)
>>> org.apache.beam.sdk.io.ReadAllViaFileBasedSource$ReadFileRangesFn.process(ReadAllViaFileBasedSource.java:113)
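P.S. For anyone following along, here is a minimal self-contained sketch of the reopen-on-backward-seek idea Pablo describes above, i.e. not relying on mark/reset at all for backward seeks. All names here (`ReopeningSeekableStream`, the `Supplier<InputStream>` opener) are hypothetical illustrations, not the actual `AzureReadableSeekableByteChannel` code:

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.function.Supplier;

// Sketch: a seekable wrapper that handles backward seeks by closing and
// reopening the underlying stream, instead of using mark/reset (whose
// mark can expire on large files, as in the "Stream mark expired" error).
class ReopeningSeekableStream {
    private final Supplier<InputStream> opener; // reopens the source from offset 0
    private InputStream in;
    private long position;

    ReopeningSeekableStream(Supplier<InputStream> opener) {
        this.opener = opener;
        this.in = opener.get();
    }

    /** Seek to an absolute offset; reopens the stream on backward seeks. */
    void seek(long newPosition) throws IOException {
        if (newPosition < position) {
            // Backward seek: reopen instead of reset(), then skip forward.
            in.close();
            in = opener.get();
            position = 0;
        }
        long toSkip = newPosition - position;
        while (toSkip > 0) {
            long skipped = in.skip(toSkip);
            if (skipped <= 0) {
                throw new IOException("seek past end of stream");
            }
            toSkip -= skipped;
        }
        position = newPosition;
    }

    /** Reads one byte, tracking the current position. */
    int read() throws IOException {
        int b = in.read();
        if (b >= 0) {
            position++;
        }
        return b;
    }
}
```

In the real fix one would presumably catch the mark-expiration failure from `StorageInputStream.reset` and fall back to reopening the blob, but the wrapper above shows the recovery idea in isolation.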
