??????????????kafka??????????????????????????????????????????????????????????????????????????????????
????????????????????????????????????????????????????????????????????
????????????????
// Build the String source stream through KafkaConfigUtil.buildKafka(env)
// and pin the source operator's parallelism to 1.
// NOTE(review): presumably a Kafka consumer source — confirm in KafkaConfigUtil.
DataStreamSource<String> dataStreamSource =
KafkaConfigUtil.buildKafka(env).setParallelism(1);
// Flat-map each raw String record into zero or more OrderDetail elements
// using OrderSplitService. Operator parallelism is read from parameterTool
// under the STREAM_PARALLELISM key, falling back to 5 when the key is absent.
SingleOutputStreamOperator<OrderDetail> orderDetails =
dataStreamSource.flatMap(new OrderSplitService())
.setParallelism(parameterTool.getInt(STREAM_PARALLELISM, 5));
// Run the OrderDetail stream through OrderDetailFilterService (a flatMap, so
// each input may yield zero or more outputs), with the same configurable
// parallelism (STREAM_PARALLELISM, default 5).
// NOTE(review): presumably drops unwanted records — confirm in OrderDetailFilterService.
SingleOutputStreamOperator<OrderDetail> simpleResults =
orderDetails.flatMap(new OrderDetailFilterService())
.setParallelism(parameterTool.getInt(STREAM_PARALLELISM, 5));
// Expose the simpleResults stream as a Table with the listed columns.
// The final column, userActionTime, is declared via proctime(), i.e. as a
// processing-time attribute rather than a field carried by the records.
Table orderDetailTable = tableEnv.fromDataStream(simpleResults,
$("orderNo"),
$("memberId"),$("merchantId"),$("storeId"),$("internalId"),$("type"),
$("quantity"),$("unitPrice"),$("gmtPaidLong"),$("gmtPaid"),$("gmtPaidTimeStamp"),$("userActionTime").proctime());????????????????????????????????????????????kafka??????????????????????????????????????????????????????