Re: Beam IO Connector

2023-08-14 Thread Devon Peticolas
Sure thing, Jeremy,

Generally, the workflow is that each of our jobs takes a --consumerMode
and/or --producerMode option. We pass those options to Read and Write
PTransforms that wrap the standard IO PTransforms and apply the correct one
in expand() based on the option's value.
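
A minimal sketch of the pattern (the ModalRead and ConsumerMode names are
made up for illustration, and it assumes PubsubIO as the streaming source
and TextIO as the batch source -- this is not our actual code):

import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;

// Wrapper transform: picks the concrete IO inside expand() based on a
// mode value parsed from a pipeline option such as --consumerMode.
public class ModalRead extends PTransform<PBegin, PCollection<String>> {
  public enum ConsumerMode { STREAMING, BATCH }

  private final ConsumerMode mode;
  private final String topic;       // streaming (normal) source
  private final String filePattern; // batch (recovery) source

  public ModalRead(ConsumerMode mode, String topic, String filePattern) {
    this.mode = mode;
    this.topic = topic;
    this.filePattern = filePattern;
  }

  @Override
  public PCollection<String> expand(PBegin input) {
    switch (mode) {
      case STREAMING:
        return input.apply(PubsubIO.readStrings().fromTopic(topic));
      case BATCH:
      default:
        return input.apply(TextIO.read().from(filePattern));
    }
  }
}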

I have a simplified example in slide 26 of my deck from my talk at the most
recent Beam Summit, here:
https://github.com/x/slides/blob/master/beam-summit-2023/Apache%20Beam%20Summit%20-%20Avro%20and%20Beam%20Schemas.pdf

And I have an example of the current version of the code we use at my
company, Oden, in a gist here:
https://gist.github.com/x/948ac95b768671d342cc3856a3d7c681

The main use case for us is that all of our Dataflow jobs run in both a
Streaming (normal) and a Batch (recovery) mode.


Re: Beam IO Connector

2023-08-14 Thread Jeremy Bloom
Thanks. Is there a github link to Devon's code?


Re: Beam IO Connector

2023-08-14 Thread John Casey via dev
I believe Devon Peticolas wrote a similar tool to create an IO that writes
to configurable sinks; it might fit your use case.


Re: Beam IO Connector

2023-08-12 Thread Bruno Volpato via dev
Hi Jeremy,

Apparently you are trying to use Beam's DirectRunner, which is mostly
focused on small pipelines and testing purposes.
Even though it runs in a single JVM, there are protections in place to make
sure your pipeline will be distributable when you choose a production-ready
runner (e.g., Dataflow, Spark, Flink). From the link above, it does this by:

- enforcing immutability of elements
- enforcing encodability of elements

There are ways to disable those checks (--enforceEncodability=false,
--enforceImmutability=false), as sketched below, but to get the best out of
Beam and keep the option of running the pipeline on one of those runners in
the future, I believe the best approach would be to write to a file and read
it back in the GUI application (for the sink part).
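
For completeness, a sketch of flipping those flags programmatically,
assuming the DirectRunner's DirectOptions interface is on the classpath
(again, not recommended if you plan to move to a distributed runner):

import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// e.g., inside main(String[] args): disable the DirectRunner's
// safety checks before building the pipeline.
DirectOptions options =
    PipelineOptionsFactory.fromArgs(args).as(DirectOptions.class);
options.setEnforceEncodability(false);
options.setEnforceImmutability(false);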

For the source part, you may want to use Create to create a PCollection
with specific elements for the in-memory scenario.
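
For instance, a minimal sketch (the element values here are made up):

import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;

// e.g., inside main(): build a PCollection directly from in-memory strings.
Pipeline p = Pipeline.create();
PCollection<String> json =
    p.apply(Create.of(Arrays.asList("{\"x\": 1}", "{\"x\": 2}")));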

If you are getting exceptions for the supported scenarios you've mentioned,
there are a few things to look at -- for example, if you are using a lambda,
Java will sometimes try to serialize the entire instance that holds the
members being used. Creating your own DoFn classes and passing in only the
Serializable values you need may resolve it.
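
A sketch of that, with a made-up PrefixFn as the example -- its only field
is a String, which is Serializable, so nothing else gets dragged into
serialization the way a capturing lambda can drag in its enclosing instance:

import org.apache.beam.sdk.transforms.DoFn;

public class PrefixFn extends DoFn<String, String> {
  private final String prefix; // Serializable; safe to hold as a field

  public PrefixFn(String prefix) {
    this.prefix = prefix;
  }

  @ProcessElement
  public void processElement(@Element String line, OutputReceiver<String> out) {
    out.output(prefix + line);
  }
}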


Best,
Bruno




On Sat, Aug 12, 2023 at 11:34 AM Jeremy Bloom  wrote:

> Hello-
> I am fairly new to Beam but have been working with Apache Spark for a
> number of years. The application I am developing uses a data pipeline to
> ingest JSON with a particular schema, uses it to prepare data for a service
> that I do not control (a mathematical optimization solver), runs the
> application and recovers its results, and then publishes the results in
> JSON (same schema).  Although I work in Java, colleagues of mine are
> implementing in Python. This is an open-source, non-commercial project.
>
> The application has three kinds of IO sources/sinks: file system files
> (using Windows now, but Unix in the future), URL, and in-memory (string,
> byte buffer, etc). The last is primarily used for debugging, displayed in a
> JTextArea.
>
> I have not found a Beam IO connector that handles all three data
> sources/sinks, particularly the in-memory sink. I have tried adapting
> FileIO and TextIO; however, I continually run up against objects that are
> not serializable, particularly Java OutputStream and its subclasses. I have
> looked at the code for FileIO and TextIO as well as several other custom IO
> implementations, but none of them addresses this particular problem.
>
> The CSVSink example in the FileIO Javadoc uses a PrintWriter, which is not
> serializable; when I tried the same thing, I got a not-serializable
> exception. How does this example actually avoid this error? In the code for
> TextIO.Sink, the PrintWriter field is marked transient, meaning that it is
> not serialized, but again, when I tried the same thing, I got an exception.
>
> Please explain, in particular, how to write a Sink that avoids the
> not-serializable exception. In general, please explain how I can use a Beam
> IO connector for the three kinds of data sources/sinks I want to use (file
> system, URL, and in-memory).
>
> After the frustrations I had with Spark, I have high hopes for Beam. This
> issue is a blocker for me.
>
> Thank you.
> Jeremy Bloom
>
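
For what it's worth, a minimal sketch of the pattern the FileIO Javadoc's
CSVSink relies on (LineSink is a made-up name): the writer field is
transient, so Java serialization skips it, and it is recreated in open(),
which runs on the worker after the sink has been deserialized. The
serialized sink object itself only needs to carry Serializable
configuration.

import java.io.IOException;
import java.io.PrintWriter;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import org.apache.beam.sdk.io.FileIO;

public class LineSink implements FileIO.Sink<String> {
  // transient: skipped when this sink is serialized and shipped to workers.
  private transient PrintWriter writer;

  @Override
  public void open(WritableByteChannel channel) throws IOException {
    // Recreated on the worker, once per file being written.
    writer = new PrintWriter(Channels.newOutputStream(channel));
  }

  @Override
  public void write(String element) throws IOException {
    writer.println(element);
  }

  @Override
  public void flush() throws IOException {
    writer.flush();
  }
}

It would be applied with something like
FileIO.<String>write().via(new LineSink()).to(outputDir).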