???y???l?F????flink??????????????????????
??????????????????????????????????????????????????????????????????????????????
??????????????1??????map??????????????2?? ????????????????????
??????????????????????????????????????????sink??????????1??
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
/**
 * Minimal Flink streaming job: a test source feeds a rich filter that opens a Hive JDBC
 * connection, the tuples are mapped to strings, and a rich sink that opens a MySQL JDBC
 * connection logs every record. The whole pipeline runs with parallelism 1.
 */
public class TimerMain4 {
    // Static so the anonymous functions below reference it through the class instead of
    // capturing a local variable as part of their (serialized) closure state.
    private static final Logger LOG = LoggerFactory.getLogger(TimerMain4.class);

    /**
     * Builds and executes the job.
     *
     * @param args unused
     * @throws Exception if job construction or execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        env.addSource(new MySourceTuple())
            .filter(new RichFilterFunction<Tuple2<String, Long>>() {
                private transient volatile Statement sts1;
                private transient volatile Connection conn1;

                /** Loads the Hive JDBC driver and opens a connection plus a statement. */
                @Override
                public void open(Configuration parameters) throws Exception {
                    super.open(parameters);
                    Class.forName("org.apache.hive.jdbc.HiveDriver");
                    conn1 = DriverManager.getConnection("", "", "");
                    LOG.info("connection --- " + conn1);
                    sts1 = conn1.createStatement();
                }

                /** Accepts every record; the filter exists only to exercise open()/close(). */
                @Override
                public boolean filter(Tuple2<String, Long> value) {
                    return true;
                }

                /**
                 * Releases the JDBC handles. Either handle may still be null if open()
                 * failed part-way, and the connection is closed even when closing the
                 * statement throws.
                 */
                @Override
                public void close() throws Exception {
                    super.close();
                    try {
                        if (sts1 != null) {
                            sts1.close();
                        }
                    } finally {
                        if (conn1 != null) {
                            conn1.close();
                        }
                    }
                }
            })
            .map(Tuple2::toString)
            .setParallelism(1)
            .addSink(new RichSinkFunction<String>() {
                private transient volatile PreparedStatement sts2;
                private transient volatile Connection conn2;

                /** Loads the MySQL JDBC driver and opens a connection plus a prepared statement. */
                @Override
                public void open(Configuration parameters) throws Exception {
                    super.open(parameters);
                    Class.forName("com.mysql.jdbc.Driver");
                    conn2 = DriverManager.getConnection("", "", "");
                    LOG.info("connection --- " + conn2);
                    sts2 = conn2.prepareStatement("");
                }

                /**
                 * Releases the JDBC handles. Either handle may still be null if open()
                 * failed part-way, and the connection is closed even when closing the
                 * statement throws.
                 */
                @Override
                public void close() throws Exception {
                    super.close();
                    try {
                        if (sts2 != null) {
                            sts2.close();
                        }
                    } finally {
                        if (conn2 != null) {
                            conn2.close();
                        }
                    }
                }

                /** Logs the record; nothing is written to the database. */
                @Override
                public void invoke(String value, Context context) {
                    LOG.info(value);
                }
            }).setParallelism(1);

        env.execute();
    }
}
/**
 * Test source that emits one (name, sequence number) tuple per loop iteration, sleeping
 * one second between emissions, until {@link #cancel()} is called.
 */
class MySourceTuple implements SourceFunction<Tuple2<String, Long>> {
    // volatile primitive: cancel() writes this flag while run() is looping on it, possibly
    // from another thread, so the write must be visible to the loop.
    private volatile boolean isRunning = true;
    List<String> names = new ArrayList<>();
    private final Random random = new Random();
    Long number = 1L;

    /**
     * Populates the name list, then emits tuples until cancelled.
     *
     * @param ctx context used to emit records
     * @throws Exception if the sleep between emissions is interrupted
     */
    @Override
    public void run(SourceContext<Tuple2<String, Long>> ctx) throws Exception {
        names.add("??");
        names.add("??");
        names.add("??");
        names.add("??");
        while (isRunning) {
            // Bound the index by the actual list size instead of a hard-coded 4.
            int index = random.nextInt(names.size());
            ctx.collect(new Tuple2<>(names.get(index), number));
            number += 1;
            Thread.sleep(1000);
        }
    }

    /** Requests the emit loop in {@link #run} to stop after its current iteration. */
    @Override
    public void cancel() {
        isRunning = false;
    }
}
[email protected]
发件人： Yichao Yang
发送时间： 2020-07-09 13:11
收件人： user-zh
?????? ?????? ?????????? richfunction????????????????????????
Hi,
??????????????????????????????????????????????????????????????????????????
Best,
Yichao Yang
------------------ 原始邮件 ------------------
发件人: "[email protected]"<[email protected]>;
发送时间: 2020年7月9日(星期四) 上午11:31
收件人: "user-zh"<[email protected]>;
????: ????: ?????????? richfunction????????????????????????
// Job wiring fragment (not compilable as-is: process(***) and process.map() are elided
// placeholders). Pipeline: Kafka source -> HiveFilterFunction -> keyBy -> process -> map -> RdsFlowSink.

// Kafka consumer for TrafficConstants.BILLTOPIC, deserialized with SchargeConsumerSchema.
FlinkKafkaConsumer<Bill> kafkaConsumer = new
FlinkKafkaConsumer<>(TrafficConstants.BILLTOPIC,new SchargeConsumerSchema(),
props);
kafkaConsumer.setStartFromLatest();
// setParallelism(4) is applied to the result of addSource(...), i.e. the source operator.
SingleOutputStreamOperator<Bill> process =
env.addSource(kafkaConsumer).setParallelism(4)
// Rich filter constructed with the Hive JDBC URL and credentials.
.filter(new HiveFilterFunction(TrafficConstants.HIVEURL,
TrafficConstants.HIVEUSERNAME,
TrafficConstants.HIVEPASSWORD)).name("??????????")
// Key is the plain concatenation of user id, serial number and province code.
.keyBy((KeySelector<Bill, String>) value -> value.getUser_id() +
value.getSerial_number() + value.getProvince_code())
.process(***);
SingleOutputStreamOperator<BillInfo> map = process.map();
// Sink constructed with the JDBC URL and credentials; parallelism 1, operator name "sinkRds".
map.addSink(new RdsFlowSink(TrafficConstants.URL, TrafficConstants.USERNAME,
TrafficConstants.PASSWORD))
.setParallelism(1).name("sinkRds");
??????????????Kafka????-->??????richfilter????????hive??????????????????-->keyby-->process-->??????sink????
/**
 * Rich filter that, on open, loads a set of codes from Hive over JDBC and afterwards keeps
 * only those {@code Bill} records whose integrate-item code is in that set and whose
 * province code is contained in {@code TrafficConstants.getProCode()}.
 */
public class HiveFilterFunction extends RichFilterFunction<Bill> {
    // Static so the logger is not carried as per-instance state of the function object.
    static final Logger LOG = LoggerFactory.getLogger(HiveFilterFunction.class);

    private final String jdbcUrl;
    private final String username;
    private final String password;
    private transient volatile Statement sts;
    private transient volatile Connection connection;
    // Used as a set: only the keys matter, every value is the empty string.
    Map<String, String> map = new ConcurrentHashMap<>();

    /**
     * @param jdbcUrl  JDBC URL passed to {@link DriverManager#getConnection}
     * @param username user name for the connection
     * @param password password for the connection
     */
    public HiveFilterFunction(String jdbcUrl, String username, String password) {
        this.jdbcUrl = jdbcUrl;
        this.username = username;
        this.password = password;
    }

    /**
     * Loads the Hive JDBC driver, opens the connection and statement, and fills the code set.
     *
     * @throws Exception if the driver cannot be loaded, the connection fails, or the query fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        connection = DriverManager.getConnection(jdbcUrl, username, password);
        LOG.info("hive connection --- " + connection);
        sts = connection.createStatement();
        query();
    }

    /**
     * @param value record to test
     * @return true when the record's integrate-item code was loaded from Hive and its province
     *         code is contained in {@code TrafficConstants.getProCode()}
     */
    @Override
    public boolean filter(Bill value) {
        return map.containsKey(value.getIntegrate_item_code())
            && TrafficConstants.getProCode().contains(value.getProvince_code());
    }

    /**
     * Releases the JDBC handles. Explicit null checks replace the former {@code assert}s,
     * which do nothing unless the JVM runs with {@code -ea}; the connection is closed even
     * when closing the statement throws.
     */
    @Override
    public void close() throws Exception {
        super.close();
        try {
            if (sts != null) {
                sts.close();
            }
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }

    /**
     * Runs {@code TrafficConstants.SETSQL}, then {@code TrafficConstants.CODESQL}, and stores
     * the {@code charge_code_cbss} column of every returned row as a key of {@link #map}.
     *
     * @throws Exception wrapping any failure, after logging it
     */
    private void query() throws Exception {
        try {
            sts.execute(TrafficConstants.SETSQL);
            // try-with-resources: the previous finally block called close() on a null
            // ResultSet when the query itself failed, replacing the real error with an NPE.
            try (ResultSet resultSet = sts.executeQuery(TrafficConstants.CODESQL)) {
                while (resultSet.next()) {
                    map.put(resultSet.getString("charge_code_cbss"), "");
                }
            }
        } catch (Exception e) {
            LOG.error("hive error", e);
            throw new Exception(e);
        }
        LOG.info("hive ????????????????");
    }
}
/**
 * Rich sink that inserts each {@code BillInfo} record over JDBC using the prepared statement
 * {@code TrafficConstants.FLOWSQL}, and increments a metric counter named "counter" per record.
 */
public class RdsFlowSink extends RichSinkFunction<BillInfo> {
    // Static so the logger is not carried as per-instance state of the function object.
    static final Logger LOG = LoggerFactory.getLogger(RdsFlowSink.class);

    private final String url;
    private final String name;
    private final String password;
    private transient volatile PreparedStatement insertStatement;
    private transient volatile Connection connection;
    private transient volatile Counter counter = null;

    /**
     * @param url      JDBC URL passed to {@link DriverManager#getConnection}
     * @param name     user name for the connection
     * @param password password for the connection
     */
    public RdsFlowSink(String url, String name, String password) {
        this.url = url;
        this.name = name;
        this.password = password;
    }

    /**
     * Loads the MySQL JDBC driver, opens the connection, registers the counter and prepares
     * the insert statement.
     *
     * @throws Exception if the driver cannot be loaded or a JDBC call fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        // Delegate to the superclass first, as HiveFilterFunction.open() does.
        super.open(parameters);
        Class.forName("com.mysql.jdbc.Driver");
        connection = DriverManager.getConnection(url, name, password);
        LOG.info("connection --- " + connection);
        counter = getRuntimeContext().getMetricGroup().counter("counter");
        insertStatement = connection.prepareStatement(TrafficConstants.FLOWSQL);
    }

    /**
     * Binds the five string parameters from the record and executes the insert.
     *
     * @param value   record to insert
     * @param context sink context (unused)
     * @throws Exception wrapping any failure, after logging it
     */
    @Override
    public void invoke(BillInfo value, Context context) throws Exception {
        try {
            insertStatement.setString(1, value.getSerial_number());
            insertStatement.setString(2, value.getUser_id());
            insertStatement.setString(3, value.getIntegrate_item_code());
            insertStatement.setString(4, value.getFee());
            insertStatement.setString(5, value.getCity_code());
            // Incremented before execute(), so attempts that fail are counted as well.
            counter.inc(1);
            insertStatement.execute();
        } catch (Exception e) {
            LOG.info("invoke --- " + connection);
            // Pass the throwable so the stack trace is logged, not only the message.
            LOG.error(e.getMessage(), e);
            throw new Exception(e);
        }
    }

    /**
     * Releases the JDBC handles. Explicit null checks replace the former {@code assert}s,
     * which do nothing unless the JVM runs with {@code -ea}; the connection is closed even
     * when closing the statement throws.
     */
    @Override
    public void close() throws Exception {
        super.close();
        try {
            if (insertStatement != null) {
                insertStatement.close();
            }
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }
}
???????????????????? Class.forName("org.apache.hive.jdbc.HiveDriver"); ????
Class.forName("com.mysql.jdbc.Driver"); ????
[email protected]
发件人： JasonLee
发送时间： 2020-07-08 18:46
收件人： user-zh
?????? ?????????? richfunction????????????????????????
hi
???????????????????????????????????????????? ????????????????????????
????????????????????????
| |
JasonLee
|
|
[email protected]
|
Signature is customized by Netease Mail Master
??2020??07??08?? 18:[email protected] ??????
??????
??????flink1.10.1????streamapi????????????????????richfunction??
????????Class.forName("*****");
????????????????????????????????????????????????????????????????????????????????????
[email protected]