Hello,

If I understood correctly, you read some parameters from a file that you then use to prepare an HBase Scan. If that is the case, you cannot do this with the current HBaseIO API, but there is ongoing work to support it transparently with the new SDF API. If you want to track the progress, this is the JIRA: https://issues.apache.org/jira/browse/BEAM-4020 Hopefully it will be ready in the coming days/weeks.
In the meantime you can work around it by applying a ParDo after you extract the scan parameters from the files, and then use a DoFn to request the data directly, similar to what the SDF does. For reference: https://github.com/iemejia/beam/blob/2f9b54c6efa1c97c4b030a9b1af44b1327541e5f/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseReadSplittableDoFn.java#L37

Hope this helps,
Ismaël

On Thu, Jul 5, 2018 at 4:53 AM Frank Li <[email protected]> wrote:
> Hello everyone,
> I'm running a Beam pipeline that uses TextIO to read lines from a text
> file; for each line, a PTransform searches HBase. The result is a
> PCollection<PCollection<KV<String, RecordData>>>:
>
> @Override
> public PCollection<PCollection<KV<String, RecordData>>>
>     expand(PCollection<String> lines) {
>
>   PCollection<PCollection<KV<String, RecordData>>> results = lines
>       .apply(ParDo.of(new DoFn<String, PCollection<KV<String, RecordData>>>() {
>         @ProcessElement
>         public void processElement(ProcessContext c) {
>           String vin = c.element();
>
>           Pipeline pipelineHbase = Pipeline.create(c.getPipelineOptions());
>
>           HBaseIO.Read read = HBaseIO.read()
>               .withConfiguration(conf)
>               .withTableId(hbaseTable)
>               .withKeyRange(
>                   Bytes.toBytes(String.format("%s-%s", vin, startTime)),
>                   Bytes.toBytes(String.format("%s-%s", vin, endTime)));
>           PCollection<Result> results = pipelineHbase.apply(read);
>
>           PCollection<KV<String, RecordData>> recordResults = results
>               .apply(ParDo.of(new Result2RecordNoModifyDataFn()));
>
>           c.output(recordResults);
>         }
>       }));
>
>   return results;
> }
>
> How do I process a PCollection<PCollection<KV<String, RecordData>>>?
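To make the workaround concrete, here is a rough sketch of a DoFn that performs the scan directly with the plain HBase client instead of building a nested pipeline, so the output is a flat PCollection<KV<String, Result>>. The row-key format ("<vin>-<time>"), table id, and start/end times come from the quoted code; the class name, the ZooKeeper quorum parameter, and the connection handling here are assumptions for illustration, not the HBaseIO internals. Note that Configuration is not Serializable, so this sketch passes the quorum as a String and rebuilds the configuration in @Setup.

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Illustrative sketch: scans one HBase key range per input element using the
 * plain HBase client, instead of creating a nested Pipeline inside the DoFn.
 */
class ScanPerElementFn extends DoFn<String, KV<String, Result>> {
  private final String quorum;   // assumed parameter: ZooKeeper quorum, passed
  private final String tableId;  // explicitly because Configuration itself is
  private final String startTime; // not Serializable
  private final String endTime;
  private transient Connection connection;

  ScanPerElementFn(String quorum, String tableId, String startTime, String endTime) {
    this.quorum = quorum;
    this.tableId = tableId;
    this.startTime = startTime;
    this.endTime = endTime;
  }

  @Setup
  public void setup() throws Exception {
    // One connection per DoFn instance, reused across bundles.
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.quorum", quorum);
    connection = ConnectionFactory.createConnection(conf);
  }

  @ProcessElement
  public void processElement(ProcessContext c) throws Exception {
    String vin = c.element();
    // Same key-range construction as in the quoted code.
    Scan scan = new Scan()
        .withStartRow(Bytes.toBytes(String.format("%s-%s", vin, startTime)))
        .withStopRow(Bytes.toBytes(String.format("%s-%s", vin, endTime)));
    try (Table table = connection.getTable(TableName.valueOf(tableId));
         ResultScanner scanner = table.getScanner(scan)) {
      for (Result result : scanner) {
        // Emit flat elements; no nested PCollection is needed.
        c.output(KV.of(vin, result));
      }
    }
  }

  @Teardown
  public void teardown() throws Exception {
    if (connection != null) {
      connection.close();
    }
  }
}
```

With this, `lines.apply(ParDo.of(new ScanPerElementFn(...)))` yields a flat PCollection<KV<String, Result>> that a downstream ParDo (e.g. the existing Result2RecordNoModifyDataFn) can convert to RecordData, avoiding the PCollection<PCollection<...>> problem entirely.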
