Hi Pablo.

I will check out the PR and run a test with the large CSV we talked about,
either tonight or tomorrow.

Fingers crossed :)


On Wed, Dec 2, 2020, 16:05 Pablo Estrada <[email protected]> wrote:

> Hi Thomas!
> This is my guess for a fix for this issue:
> https://github.com/apache/beam/pull/13441
> I have not tested it, so I am not sure that it will work. I wonder if
> you'd be willing to try it? (perhaps I can build a JAR for it... or... I'm
> not sure, what do you think?)
> Best
> -P.
>
> On Fri, Nov 20, 2020 at 11:20 AM Thomas Fredriksen(External) <
> [email protected]> wrote:
>
>> Hi Pablo!
>>
>> The file I am trying to ingest is 54.2 GB unzipped. It is available here:
>> https://obis.org/manual/access/
>>
>> I created a Jira issue for you, but seem unable to assign it:
>> https://issues.apache.org/jira/browse/BEAM-11313
>>
>> Thank you so much for taking a look at this!
>>
>> Best Regards
>> Thomas Li Fredriksen
>>
>> On Fri, Nov 20, 2020 at 5:43 PM Pablo Estrada <[email protected]> wrote:
>>
>>> Follow up: What is the size of the file you're consuming?
>>> -P.
>>>
>>> On Fri, Nov 20, 2020 at 8:40 AM Pablo Estrada <[email protected]>
>>> wrote:
>>>
>>>> Hi Thomas!
>>>> This looks like it may be a bug in the azfs implementation. If you look
>>>> at the code, you're hitting this issue when the byte channel needs to
>>>> seek backwards.
>>>> I may take a stab at fixing it. I believe we have to catch a mark
>>>> expiration, and just reopen the file if that happens.
>>>>
>>>> Do you think you could create a JIRA issue to track it and assign it to
>>>> me?
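
The fix Pablo describes above (catch the mark expiration and just reopen the file) can be sketched generically. The names below (ReopeningReader, the Supplier-based opener) are illustrative only, not Beam or Azure SDK API; this is a minimal sketch of the idea, not the actual patch in the PR:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.function.Supplier;

// Illustrative sketch only: on a backwards seek, instead of relying on the
// stream's mark (which may have expired), reopen the stream from the start
// and skip forward to the requested position.
public class ReopeningReader {
    private final Supplier<InputStream> opener; // reopens the blob from byte 0
    private InputStream in;
    private long position;

    public ReopeningReader(Supplier<InputStream> opener) {
        this.opener = opener;
        this.in = opener.get();
    }

    public int read() throws IOException {
        int b = in.read();
        if (b >= 0) {
            position++;
        }
        return b;
    }

    public void seek(long newPosition) throws IOException {
        if (newPosition < position) {
            // This is where a mark/reset-based backwards seek would throw
            // "Stream mark expired"; fall back to reopening instead.
            in.close();
            in = opener.get();
            position = 0;
        }
        skipFully(newPosition - position);
    }

    private void skipFully(long n) throws IOException {
        while (n > 0) {
            long skipped = in.skip(n);
            if (skipped <= 0) {
                if (in.read() < 0) {
                    throw new IOException("EOF before target position");
                }
                skipped = 1;
            }
            position += skipped;
            n -= skipped;
        }
    }

    static String demo() {
        try {
            ReopeningReader r = new ReopeningReader(
                    () -> new ByteArrayInputStream("abcdef".getBytes()));
            r.seek(3);
            char c1 = (char) r.read(); // 'd'
            r.seek(1);                 // backwards seek: reopen and skip
            char c2 = (char) r.read(); // 'b'
            return "" + c1 + c2;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "db"
    }
}
```

The trade-off is an extra open plus a forward skip on every backwards seek, but for blob stores that is usually cheaper than holding a large mark buffer alive.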
>>>>
>>>> On Thu, Nov 19, 2020 at 10:07 PM Thomas Li Fredriksen <
>>>> [email protected]> wrote:
>>>>
>>>>> Good morning everyone.
>>>>>
>>>>>
>>>>> I am attempting to parse a very large CSV (65 million lines) with Beam
>>>>> (version 2.25) from an Azure Blob and have created a pipeline for this. I
>>>>> am running the pipeline on Dataflow and testing with a smaller version of
>>>>> the file (10,000 lines).
>>>>>
>>>>> I am using FileIO and the filesystem prefix "azfs" to read from azure
>>>>> blobs.
>>>>>
>>>>> The pipeline works with the small test file, but when I run it on the
>>>>> bigger file I get a "Stream mark expired" exception (pasted below).
>>>>> Reading the same file from a GCP bucket works just fine, including when
>>>>> running on Dataflow.
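
(The stack trace below shows the failure coming out of StorageInputStream.reset(), i.e. mark/reset semantics. Plain java.io shows the same failure mode in miniature; this is an analogue with a BufferedInputStream, not the Azure SDK code itself:)

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;

// Analogue of the "Stream mark expired" failure: once you read further past
// the mark than the stream is willing to remember, a backwards "seek" via
// reset() throws. The small buffer size (4) forces the mark to be discarded.
public class MarkExpiryDemo {
    static String demo() {
        byte[] data = "0123456789".getBytes();
        try (BufferedInputStream in =
                new BufferedInputStream(new ByteArrayInputStream(data), 4)) {
            in.mark(2);                 // promise to re-read at most 2 bytes
            for (int i = 0; i < 6; i++) {
                in.read();              // read well past that limit
            }
            in.reset();                 // backwards seek: the mark is gone
            return "reset ok";
        } catch (IOException e) {
            return e.getMessage();      // "Resetting to invalid mark"
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

On a large file split into ranges, Beam's TextSource seeking backwards past the SDK's mark limit triggers exactly this, which would explain why the small test file never hits it.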
>>>>>
>>>>> Our primary filestore is Azure Storage, so moving to GCP buckets is
>>>>> not an option.
>>>>>
>>>>> Is there anyone who can help me resolve this?
>>>>>
>>>>>
>>>>> Best Regards
>>>>> Thomas Li Fredriksen
>>>>>
>>>>>
>>>>> The exception referenced above:
>>>>>
>>>>>
>>>>> Error message from worker: java.lang.RuntimeException:
>>>>> org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException:
>>>>> Stream mark expired.
>>>>> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:184)
>>>>>
>>>>> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:108)
>>>>>
>>>>> org.apache.beam.runners.dataflow.worker.util.BatchGroupAlsoByWindowReshuffleFn.processElement(BatchGroupAlsoByWindowReshuffleFn.java:56)
>>>>>
>>>>> org.apache.beam.runners.dataflow.worker.util.BatchGroupAlsoByWindowReshuffleFn.processElement(BatchGroupAlsoByWindowReshuffleFn.java:39)
>>>>>
>>>>> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:121)
>>>>>
>>>>> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
>>>>>
>>>>> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:114)
>>>>>
>>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
>>>>>
>>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
>>>>>
>>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
>>>>>
>>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
>>>>>
>>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
>>>>>
>>>>> org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:417)
>>>>>
>>>>> org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:386)
>>>>>
>>>>> org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:311)
>>>>>
>>>>> org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:140)
>>>>>
>>>>> org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:120)
>>>>>
>>>>> org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
>>>>>
>>>>> java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>>>>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>>>>>
>>>>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>>>>
>>>>>     java.base/java.lang.Thread.run(Thread.java:834)
>>>>> Caused by: org.apache.beam.sdk.util.UserCodeException:
>>>>> java.lang.RuntimeException: Stream mark expired.
>>>>> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>>>>>
>>>>> org.apache.beam.sdk.io.ReadAllViaFileBasedSource$ReadFileRangesFn$DoFnInvoker.invokeProcessElement(Unknown
>>>>> Source)
>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
>>>>>
>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
>>>>>
>>>>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:335)
>>>>>
>>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
>>>>>
>>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
>>>>>
>>>>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:280)
>>>>>
>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
>>>>>
>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
>>>>>
>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
>>>>>
>>>>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:73)
>>>>>
>>>>> org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)
>>>>>
>>>>> org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown
>>>>> Source)
>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
>>>>>
>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
>>>>>
>>>>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:335)
>>>>>
>>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
>>>>>
>>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
>>>>>
>>>>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:280)
>>>>>
>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
>>>>>
>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
>>>>>
>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
>>>>>
>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
>>>>>
>>>>> org.apache.beam.runners.dataflow.ReshuffleOverrideFactory$ReshuffleWithOnlyTrigger$1.processElement(ReshuffleOverrideFactory.java:86)
>>>>>
>>>>> org.apache.beam.runners.dataflow.ReshuffleOverrideFactory$ReshuffleWithOnlyTrigger$1$DoFnInvoker.invokeProcessElement(Unknown
>>>>> Source)
>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
>>>>>
>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
>>>>>
>>>>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:335)
>>>>>
>>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
>>>>>
>>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
>>>>>
>>>>> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:182)
>>>>>
>>>>>     ... 21 more
>>>>> Caused by: java.lang.RuntimeException: Stream mark expired.
>>>>> com.azure.storage.common.StorageInputStream.reset(StorageInputStream.java:366)
>>>>>
>>>>> org.apache.beam.sdk.io.azure.blobstore.AzureReadableSeekableByteChannel.position(AzureReadableSeekableByteChannel.java:89)
>>>>>
>>>>> org.apache.beam.sdk.io.TextSource$TextBasedReader.startReading(TextSource.java:152)
>>>>>
>>>>> org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.startImpl(FileBasedSource.java:476)
>>>>>
>>>>> org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.start(OffsetBasedSource.java:249)
>>>>>
>>>>> org.apache.beam.sdk.io.ReadAllViaFileBasedSource$ReadFileRangesFn.process(ReadAllViaFileBasedSource.java:113)
>>>>>
>>>>>
>>>>>
