For the compilation error, have you tried? Long maxLatency = e.get((TupleTag<Long>) (TupleTag) finalOutputTags.get(0));
I'm not sure whether I fully understand the problem. On Sat, May 27, 2017 at 2:15 AM, 郭亚峰(默岭) <yafeng....@alibaba-inc.com> wrote: > > Hi there, > I'm working with a small DSL project on top of beam sdk. In the DSL I will > define some aggregations on top of PCollection typed in "Row", a data type > I defined which contains data stored in "Object" and type stored in > "TypeDescriptor", both are member variables of a "Row". > I tried to leverage/re-use aggregation functions in SDK not re-invent > wheels. However that seems quite challange to me. Especially when I try to > create a ComposedCombineFn, because both "CombineFn" and "TupleTag" will be > generated during expression parse/compile time, and I don't know how to > pass type variable to CombineFns.compose() method. -- I clearly know the > type information, as based on AST generated I will get the TypeDescriptor > of return value of every "CombineFn", but just I don't know how to > manipulate that programmatically to make it work. > > some code framents which I used to verify..(not full code, just something > to demostrate the embracing situation I have), notice this even won't > compile with error: no instance(s) of type variable(s) exist so that > capture etc. at line " > > Long maxLatency = e.get(finalOutputTags.get(0)); > > " > > SqlParsedStatement statement = SqlParser.parse(expressions); > > SqlExecutableStatement exe = SqlParser.compile(statement, schema); > > RowSchema schema = exe.getSchema(); > > List<TupleTag<?>> outputTags = new ArrayList<>(); > > for (CellMeta meta : schema.getCellsMeta()) { > > switch (DslTypes.getTypeName(meta.getType())) { > case "boolean": > outputTags.add(new TupleTag<Boolean>() {}); > > case "bigint": > outputTags.add(new TupleTag<Long>() {}); > > case "double": > outputTags.add(new TupleTag<Double>() {}); > > case "timestamp": > outputTags.add(new TupleTag<Timestamp>() {}); > > case "varchar": > outputTags.add(new TupleTag<String>() {}); > > case "binary": > outputTags.add(new TupleTag<byte[]>() {}); > > default: > outputTags.add(new TupleTag<Void>() {}); > > } > } > > final TupleTagList finalOutputTags = TupleTagList.of(outputTags); > > SimpleFunction<Long, Long> identityFn = > new SimpleFunction<Long, Long>() { > @Override > public Long apply(Long input) { > return input; > } > }; > > > //below code is a mock. in real case I will create CombineFn during > expression compile time. > //and the fns number is uncertain, up to dsl definitions. > CombineFn fn1 = Max.ofLongs(); > CombineFn fn2 = Max.ofLongs(); > > ComposedCombineFn composedCombineFn = CombineFns.compose().with(identityFn, > fn1, finalOutputTags.get(0)); > composedCombineFn = composedCombineFn.with(identityFn, fn2, > finalOutputTags.get(1)); > > PCollection<CoCombineResult> maxAndMean = testPipeline > .apply(Create.of(1L, 2L, 3L, 4L)) > .apply( > Combine.globally(composedCombineFn)); > > PCollection<Long> finalResultCollection = maxAndMean > .apply(ParDo.of( > new DoFn<CoCombineResult, Long>() { > @ProcessElement > public void processElement(ProcessContext c) throws > Exception { > CoCombineResult e = c.element(); > Long maxLatency = e.get(finalOutputTags.get(0)); > Long meanLatency = e.get(finalOutputTags.get(1)); > System.out.println (maxLatency + meanLatency); > c.output(maxLatency + meanLatency); > } > })) > ; > > testPipeline.run(); > >