????????????????????????????????????????????????????udf????????????????????????????????????????????????????????????????????????????udf????????
------------------ ???????? ------------------ ??????: "Benchao Li"<libenc...@apache.org>; ????????: 2020??6??9??(??????) ????2:47 ??????: "user-zh"<user-zh@flink.apache.org>; ????: Re: Flink SQL UDF ???????? ??????????????????????????pb format?????????????????????????????????????????? ??????format????????????pb????????class??????????class????????table??schema?? ??????????????????????????pb????????????flink???????????????? ??????????????????????????????????????????pb format???????????????????????? ????????????????jira????????jira?????????????????????????? 1048262223 <1048262...@qq.com> ??2020??6??9?????? ????2:23?????? > Hi > > > > ????????????pb????????????????????????????????????????????????pb??schema(descriptor)??????TypeInformation????????env.addSource().returns()??????????????????TypeInformation??????TypeInformation?????????????????????? > > > > ??????????????????udf????????????udf????????????????????????????????????????????????????????????????????????????????????????????????????????????????????udf??????????????????????????????udf????????????????????????????????udf???????????????????????????????????? > > > ?????????????????????????????? > > > Best, > Yichao Yang > > > > > ------------------&nbsp;????????&nbsp;------------------ > ??????:&nbsp;"forideal"<fszw...@163.com&gt;; > ????????:&nbsp;2020??6??9??(??????) ????1:33 > ??????:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;; > > ????:&nbsp;Flink SQL UDF ???????? > > > > ???????????????? > > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ?????????? Flink 1.10 Blink Planer?? > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ????????????Flink UDF ?????? UDF ?????????????????????????????????? > > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ?????????????????????? > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ????1: ?????????????? pb ?? bytes???????????????????????????????????? > string???????????? cast ?????????????????? get_int??get_double??get_string ???????????????????????????? > &nbsp;&nbsp;&nbsp;&nbsp; ????2: ?????????????? Json ???????????? > &nbsp; > &nbsp;&nbsp;&nbsp;&nbsp; ??????1???????????? Flink ?????????? ScalarFunction > ????????????????????????Flink ?????? scalar function ???????????????????????? > @Override > public void initialize(LogicalType[] sqlTypes, String[] paramNames) { > // ?????????????????????????????????????????? paramNames ???? pb ?? schema ???????????????????????????????????????????? > } > &nbsp;&nbsp;&nbsp; ?????????????????????????? workaround ????????????????????work???????????????????????? > &nbsp;&nbsp;&nbsp; ????case ?????????????????????????????????????? RAW('java.lang.Object', ?) > ?????????????? cast ?????????????????? > public class TimestampTest extends ScalarFunction { > > public Object eval(long timestamp, String pattern, int num) { > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; Timestamp timestamp1 = new > Timestamp(timestamp); > SimpleDateFormat sdf = new SimpleDateFormat(pattern); > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if (num < 4) { > //???? STRING ???? > return String.valueOf(timestamp); > } > if (num < 6) { > //???? BIGINT > return timestamp - 100; > } > if (num < 8) { > //???? DOUBLE > double ss = 0.9; > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return > (double) timestamp + ss; > } > //???? STRING > return sdf.format(timestamp1); > } > }