For the compilation error, have you tried casting the tag explicitly, like this?
Long maxLatency = e.get((TupleTag<Long>) (TupleTag) finalOutputTags.get(0));

I'm not sure whether I fully understand the problem.
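
To illustrate why the cast should help, here is a minimal sketch that does not depend on Beam. Tag and Result are hypothetical stand-ins I made up for TupleTag and CoCombineResult, and narrow() is just a helper name for the double cast above; the real Beam classes are richer. The point is only that a lookup through a wildcard-typed tag returns a captured type, which cannot be assigned to Long until the tag is narrowed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TagCastSketch {

    /** Hypothetical stand-in for Beam's TupleTag<V>: a typed key. */
    static final class Tag<V> {
        final String id;
        Tag(String id) { this.id = id; }
    }

    /** Hypothetical stand-in for CoCombineResult: values looked up by typed tag. */
    static final class Result {
        private final Map<Tag<?>, Object> values = new HashMap<>();

        <V> void put(Tag<V> tag, V value) { values.put(tag, value); }

        @SuppressWarnings("unchecked")
        <V> V get(Tag<V> tag) { return (V) values.get(tag); }
    }

    /** Narrows a wildcard tag. Unchecked: the caller must know the real type. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    static <V> Tag<V> narrow(Tag<?> tag) {
        return (Tag<V>) (Tag) tag;
    }

    public static long sumOfFirstTwo() {
        // Tags collected in a wildcard-typed list, as in the quoted code.
        List<Tag<?>> tags = new ArrayList<>();
        Tag<Long> first = new Tag<>("first");
        Tag<Long> second = new Tag<>("second");
        tags.add(first);
        tags.add(second);

        Result result = new Result();
        result.put(first, 4L);
        result.put(second, 3L);

        // Long bad = result.get(tags.get(0)); // rejected: V is a wildcard capture
        Long a = result.get(TagCastSketch.<Long>narrow(tags.get(0)));
        Long b = result.get(TagCastSketch.<Long>narrow(tags.get(1)));
        return a + b;
    }

    public static void main(String[] args) {
        System.out.println(sumOfFirstTwo()); // prints 7
    }
}
```

The cast is unchecked, so the compiler cannot verify it. If the tag at that index was actually created for another type, the failure only shows up as a ClassCastException when the value is read.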



On Sat, May 27, 2017 at 2:15 AM, 郭亚峰(默岭) <yafeng....@alibaba-inc.com> wrote:

>
> Hi there,
> I'm working on a small DSL project on top of the Beam SDK. In the DSL I
> define aggregations over a PCollection of "Row", a data type I defined
> that stores its data as "Object" and its type as "TypeDescriptor"; both
> are member variables of a "Row".
> I am trying to reuse the aggregation functions in the SDK rather than
> reinvent the wheel, but that is proving quite challenging, especially
> when I try to create a ComposedCombineFn: both the "CombineFn" and the
> "TupleTag" are generated at expression parse/compile time, and I don't
> know how to pass the type variable to the CombineFns.compose() method. I
> do know the type information, since from the generated AST I can get the
> TypeDescriptor of the return value of every "CombineFn"; I just don't
> know how to use it programmatically to make this work.
>
> Here are some code fragments I used to verify this (not the full code,
> just enough to demonstrate the embarrassing situation I am in). Note that
> this does not even compile; it fails with the error: no instance(s) of
> type variable(s) exist so that capture etc. at the line "
>
>                         Long maxLatency = e.get(finalOutputTags.get(0));
>
> "
>
> SqlParsedStatement statement = SqlParser.parse(expressions);
>
> SqlExecutableStatement exe = SqlParser.compile(statement, schema);
>
> RowSchema schema = exe.getSchema();
>
> List<TupleTag<?>> outputTags = new ArrayList<>();
>
> for (CellMeta meta : schema.getCellsMeta()) {
>
>     // One tag per cell; each case needs a break to avoid falling through.
>     switch (DslTypes.getTypeName(meta.getType())) {
>         case "boolean":
>             outputTags.add(new TupleTag<Boolean>() {});
>             break;
>         case "bigint":
>             outputTags.add(new TupleTag<Long>() {});
>             break;
>         case "double":
>             outputTags.add(new TupleTag<Double>() {});
>             break;
>         case "timestamp":
>             outputTags.add(new TupleTag<Timestamp>() {});
>             break;
>         case "varchar":
>             outputTags.add(new TupleTag<String>() {});
>             break;
>         case "binary":
>             outputTags.add(new TupleTag<byte[]>() {});
>             break;
>         default:
>             outputTags.add(new TupleTag<Void>() {});
>
>     }
> }
>
> final TupleTagList finalOutputTags = TupleTagList.of(outputTags);
>
> SimpleFunction<Long, Long> identityFn =
>     new SimpleFunction<Long, Long>() {
>         @Override
>         public Long apply(Long input) {
>             return input;
>         }
>     };
>
>
> // The code below is a mock. In the real case I will create the CombineFns
> // at expression compile time, and their number is not fixed; it depends
> // on the DSL definitions.
> CombineFn fn1 = Max.ofLongs();
> CombineFn fn2 = Max.ofLongs();
>
> ComposedCombineFn composedCombineFn = CombineFns.compose().with(identityFn, 
> fn1, finalOutputTags.get(0));
> composedCombineFn = composedCombineFn.with(identityFn, fn2, 
> finalOutputTags.get(1));
>
> PCollection<CoCombineResult> maxAndMean = testPipeline
>     .apply(Create.of(1L, 2L, 3L, 4L))
>     .apply(
>         Combine.globally(composedCombineFn));
>
> PCollection<Long> finalResultCollection = maxAndMean
>         .apply(ParDo.of(
>                 new DoFn<CoCombineResult, Long>() {
>                     @ProcessElement
>                     public void processElement(ProcessContext c) throws Exception {
>                         CoCombineResult e = c.element();
>                         Long maxLatency = e.get(finalOutputTags.get(0));
>                         Long meanLatency = e.get(finalOutputTags.get(1));
>                         System.out.println (maxLatency + meanLatency);
>                         c.output(maxLatency + meanLatency);
>                     }
>                 }))
>     ;
>
> testPipeline.run();
>
>
