Re: [YAML] add timestamp to a bounded PCollection

2024-01-09 Thread Robert Bradshaw via dev
Just created https://github.com/apache/beam/pull/29969



Re: [YAML] add timestamp to a bounded PCollection

2024-01-08 Thread Robert Bradshaw via dev
This does appear to be a significant missing feature. I'll try to make
sure something easier gets in by the next release. See also below.

On Mon, Jan 8, 2024 at 11:30 AM Ferran Fernández Garrido wrote:
>
> Hi Yarden,
>
> Since it's a bounded source, you could try an Sql transform that
> groups by the timestamp column. Here are some examples of grouping:
>
> https://github.com/apache/beam/tree/master/sdks/python/apache_beam/yaml
>
> However, if you want to add a timestamp column alongside the
> original CSV records, there are multiple ways to achieve that.
>
> 1) MapToFields:
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/yaml_mapping.md
> [Your timestamp column could be a callable to get the current
> timestamp on each record]
>
> 2) If you need an extra layer of transformation complexity I would
> recommend creating a custom transformation:
>
> # - type: MyCustomTransform
> #   name: AddDateTimeColumn
> #   config:
> #     prefix: 'whatever'
>
> providers:
>   - type: 'javaJar'
>     config:
>       jar: 'gs://path/of/the/java.jar'
>     transforms:
>       MyCustomTransform: 'beam:transform:org.apache.beam:javatransformation:v1'

Alternatively, you can use PyTransform, if you're more comfortable with
that, by invoking it via its fully qualified name.

pipeline:
  transforms:
    ...
    - type: MyAssignTimestamps
      config:
        kwarg1: ...
        kwarg2: ...

providers:
  - type: python
    config:
      packages: ['py_py_package_identifier']
    transforms:
      MyAssignTimestamps: fully_qualified_package.module.AssignTimestampsPTransform
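
As a rough illustration of what fully_qualified_package.module.AssignTimestampsPTransform might contain, here is a minimal Python sketch. It is not taken from the thread or from the Beam codebase. It assumes the timestamp column holds ISO-8601 strings, that rows expose their columns as attributes, and that the column name arrives as a keyword argument called `field` (a hypothetical name standing in for kwarg1 above). Whether the row schema survives this step in a YAML pipeline has not been checked here.

```python
from datetime import datetime, timezone


def to_unix_seconds(value: str) -> float:
    """Parse an ISO-8601 string into seconds since the Unix epoch.

    Values without a UTC offset are assumed (not known) to be in UTC.
    """
    parsed = datetime.fromisoformat(value)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.timestamp()


def AssignTimestampsPTransform(field: str):
    """Return a transform that stamps each row with the time held in `field`.

    `field` is a hypothetical keyword argument that would be supplied from
    the `config` block of the YAML transform.
    """
    # Imported lazily so the parsing helper above can be used and tested
    # without Apache Beam installed.
    import apache_beam as beam

    # Wrapping an element in TimestampedValue sets its event timestamp,
    # which is what later windowing operates on.
    return beam.Map(
        lambda row: beam.window.TimestampedValue(
            row, to_unix_seconds(getattr(row, field))))
```

With something like this packaged and listed under `packages`, the `config` of MyAssignTimestamps would carry `field: <your timestamp column>` in place of the kwarg placeholders.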



> Best,
> Ferran


Re: [YAML] add timestamp to a bounded PCollection

2024-01-08 Thread Ferran Fernández Garrido
Hi Yarden,

Since it's a bounded source, you could try an Sql transform that
groups by the timestamp column. Here are some examples of grouping:

https://github.com/apache/beam/tree/master/sdks/python/apache_beam/yaml

However, if you want to add a timestamp column alongside the
original CSV records, there are multiple ways to achieve that.

1) MapToFields:
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/yaml_mapping.md
[Your timestamp column could be a callable to get the current
timestamp on each record]

2) If you need an extra layer of transformation complexity I would
recommend creating a custom transformation:

# - type: MyCustomTransform
#   name: AddDateTimeColumn
#   config:
#     prefix: 'whatever'

providers:
  - type: 'javaJar'
    config:
      jar: 'gs://path/of/the/java.jar'
    transforms:
      MyCustomTransform: 'beam:transform:org.apache.beam:javatransformation:v1'

Here is a good example of how to do that in Java:
https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/JavaPrefixRegistrar.java
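
For option 1, the callable could be as small as the following Python sketch. The function name and the `clock` parameter are made up for illustration (the latter only makes the function testable), and how the callable is wired into MapToFields is left to the yaml_mapping.md page linked above. Note that this only produces a value for a new column; it does not by itself change the element's event timestamp.

```python
from datetime import datetime, timezone


def current_utc_timestamp(row=None, clock=None) -> str:
    """Return the current UTC time as an ISO-8601 string.

    `row` is accepted and ignored, since a per-record mapping callable
    receives the input record. `clock` is a hypothetical test hook that
    returns a timezone-aware datetime.
    """
    now = clock() if clock is not None else datetime.now(timezone.utc)
    return now.isoformat()
```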

Best,
Ferran



[YAML] add timestamp to a bounded PCollection

2024-01-08 Thread Yarden BenMoshe
Hi all,
I'm quite new to using Beam YAML. I am working with a CSV file and want to
implement some windowing logic on it.
I was wondering what the right way is to add timestamps to each element,
assuming I have a column that contains a timestamp.

I am aware of the relevant part of the Beam Programming Guide (apache.org),
but I'm not sure how this can be implemented and used from a YAML perspective.

Thanks
Yarden