streamOperator
        .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<EventItem>() {
            @Override
            public long extractAscendingTimestamp(EventItem eventItem) {
                return eventItem.getWindowEnd();
            }
        })
        .map(eventItem -> Tuple2.of(eventItem.getItemId(), 1L))
        // the lambda erases the Tuple2 generics, so give Flink an explicit type hint
        .returns(Types.TUPLE(Types.STRING, Types.LONG))
        // key by the item id (field 0), not by the constant count (field 1)
        .keyBy(0)
        .timeWindow(Time.minutes(5))
        .aggregate(new AccumulatorAggregateFunction<>(),
                (WindowFunction<Long, EventItem, Tuple, TimeWindow>) (key, timeWindow, iterable, collector) -> {
                    String itemId = ((Tuple1<String>) key).f0;
                    Long count = iterable.iterator().next();
                    collector.collect(EventItem.of(itemId, timeWindow.getEnd(), count));
                })
        // EventItem is a POJO, so key it by field expression instead of a tuple index
        .keyBy("itemId")
        .process(new KeyedProcessFunction<Tuple, EventItem, Tuple2<String, Long>>() {

            private MapState<String, Long> itemState;
            private ValueState<Long> dayState;

            @Override
            public void open(Configuration parameters) throws Exception {
                MapStateDescriptor<String, Long> mapStateDescriptor = new MapStateDescriptor<>(
                        "ei_pv", TypeInformation.of(String.class), TypeInformation.of(Long.class));
                itemState = getRuntimeContext().getMapState(mapStateDescriptor);
                // keyed state cannot be updated in open() (there is no current key yet),
                // so dayState is initialized lazily in onTimer()
                dayState = getRuntimeContext().getState(
                        new ValueStateDescriptor<>("day_state", TypeInformation.of(Long.class)));
            }

            // the argument is a timestamp in milliseconds, so convert to epoch days first
            private int currentDay(long epochMillis) {
                return LocalDate.ofEpochDay(epochMillis / 86_400_000L).getDayOfYear();
            }

            @Override
            public void processElement(EventItem input, Context context,
                                       Collector<Tuple2<String, Long>> collector) throws Exception {
                String itemId = input.getItemId();
                Long cnt = itemState.get(itemId);
                long viewCount = input.getViewCount();
                cnt = cnt != null ? cnt + viewCount : viewCount;
                itemState.put(itemId, cnt);
                context.timerService().registerEventTimeTimer(input.getWindowEnd() + 5000);
            }

            @Override
            public void onTimer(long time, OnTimerContext ctx,
                                Collector<Tuple2<String, Long>> out) throws Exception {
                int currentDay = currentDay(time);
                Long storedDay = dayState.value();
                // reset the per-item counters once the day rolls over
                if (storedDay == null || storedDay != currentDay) {
                    itemState.clear();
                    dayState.update((long) currentDay);
                }
                for (Map.Entry<String, Long> entry : itemState.entries()) {
                    out.collect(Tuple2.of(entry.getKey(), entry.getValue()));
                }
            }
        })
        .addSink(textLongSink);
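
The pipeline above references an AccumulatorAggregateFunction that is not shown in the mail. A minimal sketch of what it might look like, assuming it is a plain per-window counter whose Long result feeds the WindowFunction (the class body below is my assumption, not from the original):

import org.apache.flink.api.common.functions.AggregateFunction;

// Hypothetical stand-in for the AccumulatorAggregateFunction used above:
// it counts the elements of each window and emits that count as a Long.
public class AccumulatorAggregateFunction<T> implements AggregateFunction<T, Long, Long> {

    @Override
    public Long createAccumulator() {
        return 0L;
    }

    @Override
    public Long add(T value, Long accumulator) {
        return accumulator + 1;
    }

    @Override
    public Long getResult(Long accumulator) {
        return accumulator;
    }

    @Override
    public Long merge(Long a, Long b) {
        return a + b;
    }
}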
------------------ Original ------------------
From: <[email protected]>
Date: Tue, Mar 5, 2019 01:32 PM
To: "user-zh" <[email protected]>
Subject: Re: How to sort the elements within a window using the stream api?
My thoughts on this, for reference:

1). A Flink stream processes elements one by one as they arrive, so a plain stream operator cannot by itself sort across elements.
2). You can use a ProcessAllWindowFunction: it receives all of the elements of a Window at once, so they can be sorted inside that window operator.
3). Alternatively, write the results to external storage (e.g. Redis) and let it do the ordering (a sketch of this idea follows the list).
4). Below is an example using ProcessAllWindowFunction (it is a WordCount whose per-window output is sorted by word).
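
For point 3, a minimal sketch of the idea, assuming the Jedis client and a Tuple2 word-count output; the Redis address and the key name "word_count" are placeholders, not from the original mail:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import redis.clients.jedis.Jedis;

// Hypothetical sink: push each (word, count) pair into a Redis sorted set.
// Redis keeps the members ordered by score, so a client can read a ranked
// list back instead of sorting inside Flink.
public class RedisSortedSetSink extends RichSinkFunction<Tuple2<String, Long>> {

    private transient Jedis jedis;

    @Override
    public void open(Configuration parameters) {
        jedis = new Jedis("localhost", 6379); // assumed Redis address
    }

    @Override
    public void invoke(Tuple2<String, Long> value, Context context) {
        // ZADD inserts/updates the member with its count as the score
        jedis.zadd("word_count", value.f1, value.f0);
    }

    @Override
    public void close() {
        if (jedis != null) {
            jedis.close();
        }
    }
}

Reading the ranking back is then e.g. jedis.zrevrangeWithScores("word_count", 0, 9) for a top 10.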
package com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc.sort

import java.time.ZoneId

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}
import org.apache.flink.util.Collector

import scala.collection.mutable

/**
  * Send input with: nc -lk 1234
  */
object SocketWindowWordCountLocalSinkHDFSAndWindowAllAndSorted {

  def getConfiguration(isDebug: Boolean = false): Configuration = {
    val configuration: Configuration = new Configuration()
    if (isDebug) {
      val timeout = "100000 s"
      val timeoutHeartbeatPause = "1000000 s"
      configuration.setString("akka.ask.timeout", timeout)
      configuration.setString("akka.lookup.timeout", timeout)
      configuration.setString("akka.tcp.timeout", timeout)
      configuration.setString("akka.transport.heartbeat.interval", timeout)
      configuration.setString("akka.transport.heartbeat.pause", timeoutHeartbeatPause)
      configuration.setString("akka.watch.heartbeat.pause", timeout)
      configuration.setInteger("heartbeat.interval", 10000000)
      configuration.setInteger("heartbeat.timeout", 50000000)
    }
    configuration
  }

  def main(args: Array[String]): Unit = {
    val port = 1234

    // get the execution environment
    // val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val configuration: Configuration = getConfiguration(true)
    val env: StreamExecutionEnvironment =
      StreamExecutionEnvironment.createLocalEnvironment(1, configuration)

    // get input data by connecting to the socket
    val dataStream = env.socketTextStream("localhost", port, '\n')

    import org.apache.flink.streaming.api.scala._
    val dataStreamDeal = dataStream.flatMap(w => w.split("\\s")).map(w => WordWithCount(w, 1))
      .keyBy("word")
      // windowAll collects every element into a single non-keyed window, so the
      // ProcessAllWindowFunction below sees all of them at once and can sort them
      // (the keyBy above does not group the elements for this window)
      .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .process(new ProcessAllWindowFunction[WordWithCount, WordWithCount, TimeWindow] {
        override def process(context: Context, elements: Iterable[WordWithCount],
                             out: Collector[WordWithCount]): Unit = {
          // sum the counts per word, then emit the words in sorted order
          val counts = new mutable.HashMap[String, Long]()
          for (wordCount <- elements) {
            counts(wordCount.word) = counts.getOrElse(wordCount.word, 0L) + wordCount.count
          }
          for ((word, count) <- counts.toList.sortBy(_._1)) {
            out.collect(WordWithCount(word, count))
          }
        }
      })
    //.countWindow(3)
    //.countWindow(3,1)
    //.countWindowAll(3)

    //textResult.print().setParallelism(1)

    val bucketingSink = new BucketingSink[WordWithCount](
      "file:/opt/n_001_workspaces/bigdata/flink/flink-maven-scala-2/sink-data")
    bucketingSink.setBucketer(
      new DateTimeBucketer[WordWithCount]("yyyy-MM-dd--HHmm", ZoneId.of("Asia/Shanghai")))
    //bucketingSink.setWriter(new SequenceFileWriter[IntWritable, Text]())
    //bucketingSink.setWriter(new SequenceFileWriter[WordWithCount]())
    bucketingSink.setBatchSize(1024 * 1024 * 400) // roll a new part file after 400 MB
    //bucketingSink.setBatchRolloverInterval(20 * 60 * 1000) // this is 20 mins
    bucketingSink.setBatchRolloverInterval(2 * 1000) // roll a new part file after 2 seconds
    //setInactiveBucketCheckInterval
    //setInactiveBucketThreshold
    // close buckets that have received no data for this long, so the sink flushes them to files
    bucketingSink.setInactiveBucketThreshold(2 * 1000)
    bucketingSink.setAsyncTimeout(1 * 1000)

    dataStreamDeal.setParallelism(1)
      .addSink(bucketingSink)

    if (args == null || args.size == 0) {
      env.execute("SocketWindowWordCountLocalSinkHDFSAndWindowAllAndSorted")
      // print the execution plan
      //println(env.getExecutionPlan)
      //StreamGraph
      //println(env.getStreamGraph.getStreamingPlanAsJSON)
      //JsonPlanGenerator.generatePlan(jobGraph)
    } else {
      env.execute(args(0))
    }

    println("done")
  }

  // Data type for words with count
  case class WordWithCount(word: String, count: Long)

  /* abstract private class OrderWindowFunction extends
       ProcessWindowFunction[WordWithCount, WordWithCount, WordWithCount, TimeWindow] {
     }*/
}
---------------------------------------------------------------------------------------------------------------------------------------
> On Mar 5, 2019, at 1:16 PM, <[email protected]> wrote:
>
> Hi:
> How can I sort the elements within a window using the stream api?
> Thanks!
---------------------------------------------------------------------------------------------------------------------------------------