????????????????????????????????????????????????????udf????????????????????????????????????????????????????????????????????????????udf????????




------------------ ???????? ------------------
??????:&nbsp;"Benchao Li"<libenc...@apache.org&gt;;
????????:&nbsp;2020??6??9??(??????) ????2:47
??????:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

????:&nbsp;Re: Flink SQL UDF ????????



??????????????????????????pb format??????????????????????????????????????????
??????format????????????pb????????class??????????class????????table??schema??
??????????????????????????pb????????????flink????????????????

??????????????????????????????????????????pb format????????????????????????
????????????????jira????????jira??????????????????????????

1048262223 <1048262...@qq.com&gt; ??2020??6??9?????? ????2:23??????

&gt; Hi
&gt;
&gt;
&gt;
&gt; 
????????????pb????????????????????????????????????????????????pb??schema(descriptor)??????TypeInformation????????env.addSource().returns()??????????????????TypeInformation??????TypeInformation??????????????????????
&gt;
&gt;
&gt;
&gt; 
??????????????????udf????????????udf????????????????????????????????????????????????????????????????????????????????????????????????????????????????????udf??????????????????????????????udf????????????????????????????????udf????????????????????????????????????
&gt;
&gt;
&gt; ??????????????????????????????
&gt;
&gt;
&gt; Best,
&gt; Yichao Yang
&gt;
&gt;
&gt;
&gt;
&gt; ------------------&amp;nbsp;????????&amp;nbsp;------------------
&gt; ??????:&amp;nbsp;"forideal"<fszw...@163.com&amp;gt;;
&gt; ????????:&amp;nbsp;2020??6??9??(??????) ????1:33
&gt; ??????:&amp;nbsp;"user-zh"<user-zh@flink.apache.org&amp;gt;;
&gt;
&gt; ????:&amp;nbsp;Flink SQL UDF ????????
&gt;
&gt;
&gt;
&gt; ????????????????
&gt;
&gt;
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; ?????????? Flink 1.10 
Blink Planer??
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; ????????????Flink UDF 
?????? UDF ??????????????????????????????????
&gt;
&gt;
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; ??????????????????????
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; ????1: ?????????????? 
pb ?? bytes????????????????????????????????????
&gt; string???????????? cast ?????????????????? get_int??get_double??get_string 
????????????????????????????
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; ????2: ?????????????? Json 
????????????
&gt; &amp;nbsp;
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; ??????1???????????? Flink 
?????????? ScalarFunction
&gt; ????????????????????????Flink ?????? scalar function 
????????????????????????
&gt; @Override
&gt; public void initialize(LogicalType[] sqlTypes, String[] paramNames) {
&gt; // ?????????????????????????????????????????? paramNames ???? pb ?? schema 
????????????????????????????????????????????
&gt; }
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; ?????????????????????????? workaround 
????????????????????work????????????????????????
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; ????case 
?????????????????????????????????????? RAW('java.lang.Object', ?)
&gt; ?????????????? cast ??????????????????
&gt; public class TimestampTest extends ScalarFunction {
&gt;
&gt; public Object eval(long timestamp, String pattern, int num) {
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
Timestamp timestamp1 = new
&gt; Timestamp(timestamp);
&gt; SimpleDateFormat sdf = new SimpleDateFormat(pattern);
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; if 
(num < 4) {
&gt; //???? STRING ????
&gt; return String.valueOf(timestamp);
&gt; }
&gt; if (num < 6) {
&gt; //???? BIGINT
&gt; return timestamp - 100;
&gt; }
&gt; if (num < 8) {
&gt; //???? DOUBLE
&gt; double ss = 0.9;
&gt; 
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
 return
&gt; (double) timestamp + ss;
&gt; }
&gt; //???? STRING
&gt; return sdf.format(timestamp1);
&gt; }
&gt; }

回复