You can’t call a transform as the very first step of a pipeline. The first 
block must always be a source, followed by any number of transforms depending 
on your use case.
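
For example, here is a minimal, self-contained sketch of that shape (untested; 
the DoFn body is only a placeholder mirroring your draft below, so all names 
and arguments are assumptions): constructor arguments go on the DoFn via 
__init__, expand() applies the ParDo to the incoming PCollection rather than 
to pcoll.pipeline, and the pipeline is seeded with a source (beam.Create) 
before the transform runs:

import apache_beam as beam


class ReadEntityFromPostgresqlFn(beam.DoFn):
    """Sketch stand-in for the DoFn in the draft below (names are assumptions)."""

    def __init__(self, arg1, arg2):
        super(ReadEntityFromPostgresqlFn, self).__init__()
        self._arg1 = arg1
        self._arg2 = arg2

    def process(self, element):
        # Real code would query the database here; yield results instead of returning.
        yield (self._arg1, [])


class ExtractEntity(beam.PTransform):
    def __init__(self, arg1, arg2="enabled"):
        super(ExtractEntity, self).__init__()
        self._arg1 = arg1
        self._arg2 = arg2

    def expand(self, pcoll):
        # Apply the ParDo to the incoming PCollection, not to pcoll.pipeline.
        return pcoll | "Read entities" >> beam.ParDo(
            ReadEntityFromPostgresqlFn(self._arg1, self._arg2)
        )


with beam.Pipeline() as p:
    entities = (
        p
        # A source must be the first step of the pipeline.
        | "Seed" >> beam.Create([None])
        | "Extract entities" >> ExtractEntity(arg1="123456")
    )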

From: Jonathan Perron <jonathan.per...@lumapps.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Monday, 1 October 2018 at 3:14 PM
To: "user@beam.apache.org" <user@beam.apache.org>
Subject: Fwd: [Need advices] Troubles to create custom PTransform in Python


Hello everybody,

I am new to Apache Beam and I need some advice on how to properly write a 
custom PTransform in Python.

I am trying to read entities from a PostgreSQL database using SQLAlchemy. I followed 
the examples in the documentation for Pub/Sub 
(https://beam.apache.org/documentation/sdks/pydoc/2.5.0/_modules/apache_beam/io/gcp/pubsub.html#ReadFromPubSub)
 and Datastore 
(https://beam.apache.org/documentation/sdks/pydoc/2.5.0/_modules/apache_beam/io/gcp/datastore/v1/datastoreio.html#ReadFromDatastore).

Here is what I achieved so far:

class ExtractFromPostgreSQLFn(beam.DoFn):
    """
    Extract PCollection from PostgreSQL
    """

    def start_bundle(self):
        self._session = Session()

    def process(self, element):
        raise NotImplementedError

    def finish_bundle(self):
        self._session.close()


class ReadEntityFromPostgresqlFn(ExtractFromPostgreSQLFn):
    def start_bundle(self, arg1, arg2):
        self._arg1 = arg1
        self._arg2 = arg2

    def process(self, element):
        entities = (
            self._session.query(Entity)
            .filter(Entity.arg1 == self._arg1)
            .filter(Entity.arg2 == self._arg2)
            .all()
        )
        return (self._arg1, entities)


class ExtractEntity(PTransform):
    """
    A ``PTransform`` for extracting the Entity.
    """

    def __init__(self, arg1, arg2="enabled"):
        """
        Initializes ``ExtractEntity``
        Args:
            arg1
            arg2
        """
        if not arg1:
            raise ValueError("arg1 cannot be empty")

        self._arg1 = arg1
        self._arg2 = arg2

    def expand(self, pcoll):
        """
        This composite transform involves the following:
        1. Create a query object
        2. Run the query against the database.
        """
        database_entities = pcoll.pipeline | "Extract datastore users" >> beam.ParDo(
            ReadEntityFromPostgresqlFn(self._arg1, self._arg2)
        )
        return database_entities

The PTransform is called at the very beginning of my pipeline:

p = beam.Pipeline(options=pipeline_options)
database = (
    p
    | "Extract datastore users" >> ExtractDatastoreUsers(arg1="123456")
)

It keeps raising AttributeError: 'PBegin' object has no attribute 'windowing'.

Please note that it's only a draft (I will extract several entities from the 
database, so the query will not be "hard-coded" but passed as a parameter at 
some point).

I thus have several questions:

1) Does anyone know why what I am trying to achieve is not working?

2) Is this the right way to proceed, i.e. creating a custom PTransform that 
executes ParDo operations, or should I use ParDo operations directly?

3) Apart from 
https://beam.apache.org/contribute/ptransform-style-guide/#language-neutral-considerations,
 is there a guide on how to properly write a custom PTransform?

4) Is it better to use the low-level psycopg2 driver here, or is SQLAlchemy 
fine?

Many thanks in advance for your time and help!

Jonathan



