Hey Fabian, it seems as simple as this ?
import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger; import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import java.time.Instant; import java.time.ZoneId; import java.time.ZonedDateTime; import java.util.Collection; import java.util.Collections; /** * A {@link WindowAssigner} that windows elements into windows based on the timestamp of the * elements and the date zone the offset it computed off. Windows cannot overlap. * * <p>For example, in order to window into windows of 1 minute: * <pre> {@code * DataStream<Tuple2<String, Integer>> in = ...; * KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...); * WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed = * keyed.window(TimeZoneAwareTumblingEventTimeWindows.of(Time.minutes(1), ZoneId.of(AMERICA_NEW_YORK_ZONE))); * } </pre> */ public class TimeZoneAwareTumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> { private static final long serialVersionUID = 1L; private final long size; private final ZoneId zoneID; protected TimeZoneAwareTumblingEventTimeWindows(long size, ZoneId zoneID) { this.size = size; this.zoneID = zoneID; } /** * Creates a new {@code TumblingEventTimeWindows} {@link WindowAssigner} that assigns * elements to time windows based on the element timestamp. * * @param size The size of the generated windows. * @return The time policy. */ public static TimeZoneAwareTumblingEventTimeWindows of(Time size, ZoneId zoneID) { return new TimeZoneAwareTumblingEventTimeWindows(size.toMilliseconds(), zoneID); } @Override public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) { if (timestamp > Long.MIN_VALUE) { // Long.MIN_VALUE is currently assigned when no timestamp is present ZonedDateTime dateTime = Instant.ofEpochMilli(timestamp).atZone(zoneID); long start = TimeWindow.getWindowStartWithOffset(timestamp, dateTime.getOffset().getTotalSeconds()*1000l, size); return Collections.singletonList(new TimeWindow(start, start + size)); } else { throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " + "Is the time characteristic set to 'ProcessingTime', or did you forget to call " + "'DataStream.assignTimestampsAndWatermarks(...)'?"); } } @Override public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { return EventTimeTrigger.create(); } @Override public String toString() { return "TumblingEventTimeWindows(" + size + ")"; } @Override public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) { return new TimeWindow.Serializer(); } @Override public boolean isEventTime() { return true; } } On Wed, May 2, 2018 at 9:31 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote: > True that. Thanks. Wanted to be sure before I go down that path. > > > On Wed, May 2, 2018 at 9:19 AM, Fabian Hueske <fhue...@gmail.com> wrote: > >> Hi Vishal, >> >> AFAIK it is not possible with Flink's default time windows. >> However, it should be possible to implement a custom WindowAssigner for >> your use case. >> I'd have a look at the TumblingEventTimeWindows class and copy/modify it >> to your needs. >> >> Best, Fabian >> >> 2018-05-02 15:12 GMT+02:00 Vishal Santoshi <vishal.santo...@gmail.com>: >> >>> This does not seem possible but need some confirmation. Anyone ? >>> >>> On Tue, May 1, 2018 at 12:00 PM, Vishal Santoshi < >>> vishal.santo...@gmail.com> wrote: >>> >>>> How do I align a Window with EDT with day light saving correction ? >>>> The offset takes a hardcoded value. I need 6 hour windows aligned to 00, 12 >>>> , 18 and so on but on EDT. >>>> >>> >>> >> >