You can’t call a transform function as the very first step of a pipeline. The first block must always be a source, followed by any number of transforms depending on your use case.
From: Jonathan Perron <jonathan.per...@lumapps.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Monday, 1 October 2018 at 3:14 PM
To: "user@beam.apache.org" <user@beam.apache.org>
Subject: Fwd: [Need advices] Troubles to create custom PTransform in Python

Hello everybody,

I am new to Apache Beam and I need some advice on how to properly write a custom PTransform in Python. I am trying to read entities from a PostgreSQL database using SQLAlchemy. I followed the examples in the documentation for Pub/Sub (https://beam.apache.org/documentation/sdks/pydoc/2.5.0/_modules/apache_beam/io/gcp/pubsub.html#ReadFromPubSub) and Datastore (https://beam.apache.org/documentation/sdks/pydoc/2.5.0/_modules/apache_beam/io/gcp/datastore/v1/datastoreio.html#ReadFromDatastore).

Here is what I have achieved so far:

    class ExtractFromPostgreSQLFn(beam.DoFn):
        """Extract PCollection from PostgreSQL"""

        def start_bundle(self):
            self._session = Session()

        def process(self, element):
            raise NotImplementedError

        def finish_bundle(self):
            self._session.close()


    class ReadEntityFromPostgresqlFn(ExtractFromPostgreSQLFn):
        def start_bundle(self, arg1, arg2):
            self._arg1 = arg1
            self._arg2 = arg2

        def process(self, element):
            entities = (
                self._session.query(Entity)
                .filter(Entity.arg1 == self._arg1)
                .filter(Entity.arg2 == self._arg2)
                .all()
            )
            return (self._arg1, entities)


    class ExtractEntity(PTransform):
        """A ``PTransform`` extracting the Entity."""

        def __init__(self, arg1, arg2="enabled"):
            """Initializes ``ExtractEntity``

            Args:
                arg1
                arg2
            """
            if not arg1:
                raise ValueError("arg1 cannot be empty")
            self._arg1 = arg1
            self._arg2 = arg2

        def expand(self, pcoll):
            """This composite transform involves the following steps:

            1. Create a query object.
            2. Run the query against the database.
            """
            database_entities = pcoll.pipeline | "Extract datastore users" >> beam.ParDo(
                ReadEntityFromPostgresqlFn(self._arg1, self.arg2)
            )
            return database_entities

The PTransform is called at the very beginning of my pipeline:

    p = beam.Pipeline(options=pipeline_options)
    database = (
        p
        | "Extract datastore users" >> ExtractDatastoreUsers(arg1="123456")
    )

It keeps raising AttributeError: 'PBegin' object has no attribute 'windowing'. Please note that this is only a draft (I will extract several entities from the database, so the query will not be hard-coded but passed as a parameter at some point).

I thus have several questions:

1) Does anyone know why what I am trying to achieve is not working?
2) Is this the right way to proceed, i.e. creating a custom PTransform which executes ParDo operations, or should I go directly with ParDo operations?
3) Apart from https://beam.apache.org/contribute/ptransform-style-guide/#language-neutral-considerations, is there a guide on how to properly write a custom PTransform?
4) Is it better to use the low-level psycopg2 driver here, or is using SQLAlchemy fine?

Many thanks in advance for your time and help!

Jonathan