Re: [External Sender] Re: [Question] Error handling for IO Write Functions

2023-11-09 Thread Robert Bradshaw via dev
+1

Specifically, p.run().waitUntilFinish() would throw an exception if there
were errors during pipeline execution.
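
To make that concrete, here is a minimal sketch (not code from this thread; the class name, options handling, and the placeholder comments are assumptions) of catching execution failures around the blocking run:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class RunAndFailFast {  // hypothetical class name
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline p = Pipeline.create(options);

    // Construction only: apply the ParDos and FileIO.write() here.
    // No data is processed until run() is called.

    try {
      // Blocks until the runner finishes; per the discussion above, this is
      // where execution-time exceptions (e.g. from a failing ParDo) surface.
      PipelineResult.State state = p.run().waitUntilFinish();
      if (state != PipelineResult.State.DONE) {
        throw new RuntimeException("Pipeline finished in state " + state);
      }
      // Only at this point is "write completed" an accurate thing to log.
    } catch (Exception e) {
      System.err.println("Pipeline execution failed: " + e.getMessage());
      System.exit(1);  // make the driver/application exit non-zero
    }
  }
}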

On Wed, Nov 8, 2023 at 8:05 AM John Casey via dev 
wrote:

> Yep, that's a common misunderstanding with Beam.
>
> The code that is actually executed in the try block is just for pipeline
> construction, and no data is processed at this point in time.
>
> Once the pipeline is constructed, the various ParDos are serialized and
> sent to the runner, where they are actually executed.
>
> In this case, if there was an exception in the ParDo that converts rows to
> Avro, you would see the "Exception when converting Beam Row to Avro Record"
> log in whatever logs your runner provides you, and the exception would
> propagate up to your runner.
>
> In this case, your log statement log.info("Finished writing Parquet file to path
> {}", writePath); is misleading: it will log when the pipeline is
> constructed, not when the Parquet write completes.
>
> On Wed, Nov 8, 2023 at 10:51 AM Ramya Prasad via dev 
> wrote:
>
>> Hey John,
>>
>> Yes, that's how my code is set up: I have the FileIO.write() in its own
>> try-catch block. I took a second look at where exactly the code is failing,
>> and it's actually in a ParDo function which happens before I call
>> FileIO.write(). But even within that, I've tried adding a try-catch, but the
>> error isn't stopping the actual application run in a Spark cluster. In the
>> cluster, I see that the exception is being thrown from my ParDo, but then
>> immediately after that, I see the line: INFO ApplicationMaster: Final
>> app status: SUCCEEDED, exitCode: 0. This is roughly what my code setup
>> looks like:
>>
>> @Slf4j
>> public class ParquetWriteActionStrategy {
>>
>>     public void executeWriteAction(Pipeline p) throws Exception {
>>
>>         try {
>>             // transform PCollection from type Row to GenericRecords
>>             PCollection<GenericRecord> records = p.apply(
>>                 "transform PCollection from type Row to GenericRecords",
>>                 ParDo.of(new DoFn<Row, GenericRecord>() {
>>                     @ProcessElement
>>                     public void processElement(@Element Row row,
>>                             OutputReceiver<GenericRecord> out) {
>>                         try {
>>                             // (row-to-GenericRecord conversion elided in the original message)
>>                         } catch (Exception e) {
>>                             log.error("Exception when converting Beam Row to Avro Record: {}", e.getMessage());
>>                             throw e;
>>                         }
>>                     }
>>                 })).setCoder(AvroCoder.of(avroSchema));
>>
>>             records.apply("Writing Parquet Output File",
>>                 FileIO.<GenericRecord>write()
>>                     .via(/* sink elided in the original message */)
>>                     .to(writePath)
>>                     .withSuffix(".parquet"));
>>
>>             log.info("Finished writing Parquet file to path {}", writePath);
>>         } catch (Exception e) {
>>             log.error("Error in Parquet Write Action. {}", e.getMessage());
>>             throw e;
>>         }
>>     }
>> }
>>
>>
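For reference, a minimal sketch (an assumption, not Ramya's actual code) of how a method like the one above could observe execution failures: if it calls p.run().waitUntilFinish() inside the try, exceptions thrown on the runner reach the catch block, and the completion log only fires after the Parquet write has actually finished.

// Sketch only: mirrors executeWriteAction above; log and writePath are
// assumed to come from the enclosing @Slf4j class as in the snippet.
public void executeWriteAction(Pipeline p) throws Exception {
    try {
        // ... apply the row-to-GenericRecord ParDo and FileIO.write() as above ...

        p.run().waitUntilFinish();  // blocks; throws if execution failed on the runner
        log.info("Finished writing Parquet file to path {}", writePath);
    } catch (Exception e) {
        log.error("Error in Parquet Write Action. {}", e.getMessage());
        throw e;
    }
}
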
>> On Wed, Nov 8, 2023 at 9:16 AM John Casey via dev 
>> wrote:
>>
>>> There are two execution phases when using Beam. The first is
>>> local, when the pipeline is constructed, and the second is remote on the
>>> runner, processing data.
>>>
>>> Based on what you said, it sounds like you are wrapping pipeline
>>> construction in a try-catch, and constructing FileIO isn't failing.
>>>
>>> e.g.
>>>
>>> try {
>>>
>>> FileIO.write().someOtherConfigs()
>>>
>>> } catch ...
>>>
>>> This will catch any exceptions thrown while constructing FileIO, but the
>>> running pipeline won't propagate exceptions back through this try-catch block.
>>>
>>> On Tue, Nov 7, 2023 at 5:21 PM Robert Bradshaw via dev <
>>> dev@beam.apache.org> wrote:
>>>
 File write failures should throw exceptions that terminate the
 pipeline on failure. (Generally, a distributed runner will make
 multiple attempts before abandoning the entire pipeline, of course.)

 Are you seeing files failing to be written but no exceptions being
 thrown? If so, this is definitely a bug that we want to resolve.


 On Tue, Nov 7, 2023 at 11:17 AM Ramya Prasad via dev <
 dev@beam.apache.org> wrote:

> Hello,
>
> I am a developer using Apache Beam in my Java application, and I need
> some help on how to handle exceptions when writing a file to S3. I have
> tried wrapping my code within a try-catch block, but no exception is being
> thrown within the try block. I'm assuming that FileIO doesn't throw any
> exceptions upon failure. Is there a way I can either terminate the
> program on failure or at least be made aware if any of my write
> operations fail?
>
> Thanks and sincerely,
> Ramya
