Hi

I am writing a Generic pipeline which will be executed for 1000's of files and 
tables. 

many of transformations are based on complex rules. So I took out the complex 
rules outside dataflow job and calculated it outside DF job and written complex 
SQL's in a BQ table.

Now i want to read that BQ table, extract metdata from its columns and want to 
use it in Beam pipeline.

// 1. Read Metadata
        PCollection<TableRow> metaData =
                p.apply(format("Read: %s", metadataTable), 
BigQueryIO.readTableRowsWithSchema()
                        .from(metadataTable)
                        .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ)
                        
.withSelectedFields(Lists.newArrayList("stdBQTable","dlpSelectQuery", 
"dlpJoinQuery","dlpHeaderMapStr","deidTemplateURI")));

        // 2. Extract Variables
        String select_metadata_qry = format("select * from PCOLLECTION where 
databaseName = '%s' and tableName = '%s' limit 1", dataBaseName, tableName);

        PCollectionView<Map<String,String>> metaDataRow =
                metaData.apply("Metadata View", 
SqlTransform.query(select_metadata_qry))
                                               .apply(ParDo.of(new 
GetFieldfromBQRow()))
                                               
.apply(View.<String,String>asMap());

Now I want to use these sql queries in SQL transforms and I want to use what I 
read from BQ. 
PCollection<Row> piiData = allData.apply("Select PII", 
SqlTransform.query(<READFROMBQTABLE>));

I am new to Beam, these kind of operations are very easy to do in Spark.
Can you please guide how I can read value of sql query from a BQ row and use 
that in my pipeline downstream operations ?

Reply via email to