udf???????????????? org.apache.flink.types.Row[]??????????????????

------------------ ???????? ------------------
??????:&nbsp;"Benchao Li"<[email protected]&gt;;
????????:&nbsp;2020??5??20??(??????) ????6:51
??????:&nbsp;"user-zh"<[email protected]&gt;;

????:&nbsp;Re: flink1.10.x ???? arrar<row&gt; ????



????????????UDF????????????????????????????????????????????????????????????????????????????????????

?????????????? <[email protected]&gt; ??2020??5??20?????? ????4:25??????

&gt; 1.blink_planner ????ddl????array??????????????select ????????????????????
&gt;&nbsp; 2.blink_planner 
????????????????????????????????????????????????flink????????????
&gt;
&gt;
&gt; 3.????????flink-planner????????????
&gt;
&gt;
&gt;
&gt; CREATE TABLE sourceTable (
&gt;
&gt; &amp;nbsp;event_time_line array<ROW (
&gt;
&gt; &amp;nbsp; `rule_name` VARCHAR,
&gt;
&gt; &amp;nbsp; `count` VARCHAR
&gt;
&gt; &amp;nbsp;)&amp;gt;
&gt;
&gt; ) WITH (
&gt;
&gt; &amp;nbsp;'connector.type' = 'kafka',
&gt;
&gt; &amp;nbsp;'connector.version' = 'universal',
&gt;
&gt; &amp;nbsp;'connector.startup-mode' = 'earliest-offset',
&gt;
&gt; &amp;nbsp;'connector.topic' = 'topic_test_1',
&gt;
&gt; &amp;nbsp;'connector.properties.zookeeper.connect' = 'localhost:2181',
&gt;
&gt; &amp;nbsp;'connector.properties.bootstrap.servers' = 'localhost:9092',
&gt;
&gt; &amp;nbsp;'update-mode' = 'append',
&gt;
&gt; &amp;nbsp;'format.type' = 'json',
&gt;
&gt; &amp;nbsp;'format.derive-schema' = 'true'
&gt;
&gt; );
&gt;
&gt; --????????????
&gt;
&gt; select event_time_line from sourceTable ;
&gt;
&gt; --??????????????????????????value??????????????????????size??????????
&gt;
&gt; select type_change(event_time_line) from sourceTable ;
&gt;
&gt; &amp;nbsp;
&gt;
&gt; public class TypeChange extends ScalarFunction {
&gt;
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; /**
&gt;
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; *
&gt; 
??null??????????????????????????????????????????????????????????planner????????????????
&gt;
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; * @param rows
&gt;
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; * @return
&gt;
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; */
&gt;
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; public String eval(Row [] rows){
&gt;
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp;return
&gt; JSONObject.toJSONString(rows);
&gt;
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; }
&gt;
&gt; &amp;nbsp;
&gt;
&gt; }



-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [email protected]; [email protected]

回复