Re: Anomaly in handling late arriving data

2019-09-25 Thread Zhu Zhu
Hi Indraneel,

In your case, ("u1", "e12", 8L) is not considered late and will go into the
session window {e7,e8,e9,e11} (range=11~19).
This is because 8+3(session gap) >= 11, the lower bound of the existing
session window
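
A minimal numeric sketch of that merge check (assuming the usual Flink
session-window semantics: each element is first put into the window
[timestamp, timestamp + gap), and session windows that intersect are merged):

    public class SessionMergeCheck {
        public static void main(String[] args) {
            long gap = 3L;
            long e12Start = 8L, e12End = 8L + gap;        // e12 -> [8, 11)
            long existingStart = 11L, existingEnd = 19L;  // {e7,e8,e9,e11} -> [11, 19)
            // session windows merge when they intersect (a touching end/start counts)
            boolean merges = e12Start <= existingEnd && e12End >= existingStart;
            System.out.println(merges);                   // true, because 8 + 3 >= 11
        }
    }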

Regarding your 3 questions:
*>> 1) Event("u1", "e12", 8L) change to Event("u1", "e12", 7L)*
7 + 3 < 11, so e12 will not go into the session window {e7,e8,e9,e11}.
Since its own window has already expired, it will be emitted to the late-data
side output.

*>> 2) allowedLateness(Time.milliseconds(2L)) change
to allowedLateness(Time.milliseconds(1L)) *
Reducing the allowedLateness causes the window {e7,e8} to be fired when e9
arrives.
So when e12 arrives, the existing session window is {e9,e11} (range=14~17),
and e12 will be considered late in this case.

*>> 3)   Event("u1", "e12", 8L) change to Event("u1", "e12", 7L) AND
allowedLateness(Time.milliseconds(2L)) change
to allowedLateness(Time.milliseconds(4L)) *
The same as case 1).
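
To make the cases above concrete, here is a simplified numeric sketch of the
check involved (assumption: a window counts as expired, so a new element for
it goes to the side output, once window.maxTimestamp() + allowedLateness is
not after the current watermark; the watermark when e12 arrives is 13, because
the source above emits event.timestamp - 1 after e11):

    public class LatenessCheck {
        public static void main(String[] args) {
            long allowedLateness = 2L;
            long watermark = 13L;            // emitted after e11 (14 - 1)

            long mergedMaxTs = 19L - 1;      // original case: e12 merged into [11, 19)
            long ownMaxTs = 10L - 1;         // case 1): e12 at 7 -> its own window [7, 10)

            System.out.println(mergedMaxTs + allowedLateness <= watermark); // false -> kept
            System.out.println(ownMaxTs + allowedLateness <= watermark);    // true  -> late
        }
    }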

Thanks,
Zhu Zhu

Indraneel R wrote on Thu, Sep 26, 2019 at 2:24 AM:

> Hi Everyone,
>
> I am trying to execute this simple sessionization pipeline, with the
> allowed lateness shown below:
>
> def main(args: Array[String]): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> env.setParallelism(2)
>
>
> val source: DataStream[Event] = env.addSource(new
> SourceFunction[Event] {
>   lazy val input: Seq[Event] = Seq(
> Event("u1", "e1", 1L),
> Event("u1", "e5", 6L),
> Event("u1", "e7", 11L),
> Event("u1", "e8", 12L),
> Event("u1", "e9", 16L),
> Event("u1", "e11", 14L),
> *Event("u1", "e12", 8L),*
> Event("u1", "e13", 20L),
>   )
>
>   override def run(ctx: SourceFunction.SourceContext[Event]): Unit = {
> {
>   input.foreach(event => {
> ctx.collectWithTimestamp(event, event.timestamp)
> *ctx.emitWatermark(new Watermark(event.timestamp - 1))*
>   })
>   ctx.emitWatermark(new Watermark(Long.MaxValue))
> }
>   }
>
>   override def cancel(): Unit = {}
> })
>
> val tag: OutputTag[Event] = OutputTag("late-data")
>
> val sessionizedStream: DataStream[Event] = source
>   .keyBy(item => item.userId)
> *  .window(EventTimeSessionWindows.withGap(Time.milliseconds(3L)))*
>   .sideOutputLateData(tag)
> *  .allowedLateness(Time.milliseconds(2L))*
>   .process(new ProcessWindowFunction[Event, Event, String, TimeWindow]
> {
>
> override def process(key: String, context: Context, elements:
> Iterable[Event], out: Collector[Event]): Unit = {
>   val sessionIdForWindow = key + "-" + context.currentWatermark +
> "-" + context.window.getStart
>
>   elements.toSeq
> .sortBy(event => event.timestamp)
> .foreach(event => {
>   out.collect(event.copy(sessionId = sessionIdForWindow, count
> = elements.size))
> })
> }
>   })
>
> sessionizedStream.getSideOutput(tag).print()
> env.execute()
>   }
>
> But here's the problem. I am expecting the event highlighted in red
> above (e12) to be collected in the side output as a late event.
>
> But it isn't. The event is not printed.
>
> What's interesting is that if I make *any one* of the following changes, the
> event e12 is considered late and is printed.
>1) Event("u1", "e12", 8L) *change to *Event("u1", "e12", *7L*)
>2) allowedLateness(Time.milliseconds(2L))   change
> to allowedLateness(Time.milliseconds(*1L*))
>   3)   Event("u1", "e12", 8L) *change to *Event("u1", "e12",
> *7L*) *AND*
> allowedLateness(Time.milliseconds(2L))   *change to *
> allowedLateness(Time.milliseconds(4*L*))   // or anything less than 7L
>
> Can someone explain what's going on? What am I missing here?
>
>
> regards
> -Indraneel
>
>


Re: Instances of org.apache.flink.streaming.api.operators.TimerHeapInternalTimer in a windowed job keep increasing, causing an out-of-memory error

2019-09-25 Thread Terry Wang
Could it be that your data volume is rather large while the configured heap memory is relatively small? Have you tried increasing the memory and the parallelism to see whether the OOM still occurs?
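
As a concrete illustration of that suggestion (the values below are
placeholders, not recommendations), the heap sizes can be raised in
flink-conf.yaml and the parallelism increased when submitting the job:

    jobmanager.heap.size: 2048m
    taskmanager.heap.size: 4096m
    parallelism.default: 4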

Best,
Terry Wang



> On Sep 26, 2019, at 9:25 AM, claylin <1012539...@qq.com> wrote:
> 
> I wrote a deduplication job; the code is as follows:
> 
> StreamQueryConfig queryConfig = tabEnv.queryConfig();
>queryConfig.withIdleStateRetentionTime(Time.seconds(20), 
> Time.minutes(6));
> 
> 
>DataStream source = env.socketTextStream("localhost", 10028)
>.map(new MapFunction() {
>@Override
>public Student map(String value) throws Exception {
>String[] vals = value.split(",");
>if (vals.length < 2) {
>return null;
>}
>Student st = new Student();
>st.stNo = vals[0];
>st.name = vals[1];
>return st;
>}
>}).returns(Student.class);
> 
> 
>Table table = tabEnv.fromDataStream(source, "stNo, name");
> 
> 
>Table distinctTab = table.groupBy("stNo, name").select("stNo, 
> name");//.select("name, name.count as cnt");
> 
> 
>DataStream> distinctStream = 
> tabEnv.toRetractStream(distinctTab, Student.class);
> 
> 
>DataStream distintOutStrem = distinctStream.map(tuple2 -> {
>if (tuple2.f0) {
>return tuple2.f1;
>}
>return null;
>}).filter(Objects::nonNull);
> 
> 
>Table after = tabEnv.fromDataStream(distintOutStrem, "stNo, name, 
> proctime.proctime");
> 
> 
>Table result = 
> after.window(Tumble.over("10.seconds").on("proctime").as("w"))
>.groupBy("name, w")
>.select("name, name.count as cnt, w.start as wStart, w.end as 
> wEnd, w.proctime as wProctime");
> 
> 
>DataStream resultStream = tabEnv.toAppendStream(result, 
> Result.class);
>resultStream.print();
>env.execute(TestState.class.getSimpleName());
> 
> 
> 
> But I don't know where the problem is. After running for a long time, the JVM memory
> gets used up. A later heap dump shows that the number of
> org.apache.flink.streaming.api.operators.TimerHeapInternalTimer instances keeps increasing.
> In theory, once a window's time is up, the corresponding TimerHeapInternalTimer instances
> should be deleted as the job runs, but here they keep increasing.
> num     #instances         #bytes  class name
> ----------------------------------------------
>    1:          5937       44249552  [B
>    2:        214238       18291832  [C
>    3:        141199        5647960  org.apache.flink.runtime.state.heap.CopyOnWriteStateTable$StateTableEntry
>    4:        213521        5124504  java.lang.String
>    5:        118727        4397272  [Ljava.lang.Object;
>    6:        108138        3460416  java.util.HashMap$Node
>    7:         19440        1667688  [Ljava.util.HashMap$Node;
>    8:         94253        1508048  org.apache.flink.types.Row
>    9:         47066        1506112  org.apache.flink.streaming.api.operators.TimerHeapInternalTimer
>   10:         12924        1426104  java.lang.Class
>   11:            49        1229592  [Lorg.apache.flink.runtime.state.heap.CopyOnWriteStateTable$StateTableEntry;
>   12:         48072        1153728  java.lang.Long
>   13:         34657        1109024  java.util.concurrent.ConcurrentHashMap$Node
>   14:          7772        1078360  [I
>   15:         26591        1063640  java.util.LinkedHashMap$Entry
>   16:         15301         856856  java.util.LinkedHashMap
>   17:         11771         847512  java.lang.reflect.Field
>   18:         13172         843008  java.nio.DirectByteBuffer
>   19:          8570         754160  java.lang.reflect.Method
>   20:            20         655680  [Lscala.concurrent.forkjoin.ForkJoinTask;
>   21:         13402         643296  java.util.HashMap
>   22:         12945         621360  org.apache.flink.core.memory.HybridMemorySegment
>   23:         13275         531000  sun.misc.Cleaner
>   24:         15840         506880  com.esotericsoftware.kryo.Registration
>   25:           393         450928  [Ljava.nio.ByteBuffer;
>   26:         13166         421312  java.nio.DirectByteBuffer$Deallocator
>   27:         25852         413632  java.lang.Object
>   28:         14137         339288  java.util.ArrayList
>   29:          6410         307680  org.apache.kafka.common.metrics.stats.SampledStat$Sample
>   30:          4572         292608  com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeObjectField
>   31:           392         288576  [Ljava.util.concurrent.ConcurrentHashMap$Node;
>   32:          8412         269184  org.apache.kafka.common.MetricName
>   33:          8412         269184  org.apache.kafka.common.metrics.KafkaMetric
>   34:            72         268704  [Lorg.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
>   35:         10070         241680  org.apache.kafka.common.requests.ApiVersionsResponse$ApiVersion
>   36:          9828         225040  [Ljava.lang.Class;
>   37:          9360         224640  com.esotericsoftware.kryo.Kryo$DefaultSerializerEntry
>   38:          7905         189720  org.apache.flink.api.java.tuple.Tuple2

Instances of org.apache.flink.streaming.api.operators.TimerHeapInternalTimer in a windowed job keep increasing, causing an out-of-memory error

2019-09-25 Thread claylin
I wrote a deduplication job; the code is as follows:

 StreamQueryConfig queryConfig = tabEnv.queryConfig();
queryConfig.withIdleStateRetentionTime(Time.seconds(20), 
Time.minutes(6));


DataStream<String> source = env.socketTextStream("localhost", 10028)
.map(new MapFunction<String, Student>() {
@Override
public Student map(String value) throws Exception {
String[] vals = value.split(",");
if (vals.length < 2) {
return null;
}
Student st = new Student();
st.stNo = vals[0];
st.name = vals[1];
return st;
}
}).returns(Student.class);


Table table = tabEnv.fromDataStream(source, "stNo, name");


Table distinctTab = table.groupBy("stNo, name").select("stNo, 
name");//.select("name, name.count as cnt");


DataStream<Tuple2<Boolean, Student>> distinctStream = 
tabEnv.toRetractStream(distinctTab, Student.class);


DataStream<Student> distintOutStrem = distinctStream.map(tuple2 -> {
if (tuple2.f0) {
return tuple2.f1;
}
return null;
}).filter(Objects::nonNull);


Table after = tabEnv.fromDataStream(distintOutStrem, "stNo, name, 
proctime.proctime");


Table result = 
after.window(Tumble.over("10.seconds").on("proctime").as("w"))
.groupBy("name, w")
.select("name, name.count as cnt, w.start as wStart, w.end as 
wEnd, w.proctime as wProctime");


DataStream<Result> resultStream = tabEnv.toAppendStream(result, 
Result.class);
resultStream.print();
env.execute(TestState.class.getSimpleName());



But I don't know where the problem is. After running for a long time, the JVM memory
gets used up. A later heap dump shows that the number of
org.apache.flink.streaming.api.operators.TimerHeapInternalTimer instances keeps increasing.
In theory, once a window's time is up, the corresponding TimerHeapInternalTimer instances
should be deleted as the job runs, but here they keep increasing.
 num     #instances         #bytes  class name
----------------------------------------------
   1:          5937       44249552  [B
   2:        214238       18291832  [C
   3:        141199        5647960  org.apache.flink.runtime.state.heap.CopyOnWriteStateTable$StateTableEntry
   4:        213521        5124504  java.lang.String
   5:        118727        4397272  [Ljava.lang.Object;
   6:        108138        3460416  java.util.HashMap$Node
   7:         19440        1667688  [Ljava.util.HashMap$Node;
   8:         94253        1508048  org.apache.flink.types.Row
   9:         47066        1506112  org.apache.flink.streaming.api.operators.TimerHeapInternalTimer
  10:         12924        1426104  java.lang.Class
  11:            49        1229592  [Lorg.apache.flink.runtime.state.heap.CopyOnWriteStateTable$StateTableEntry;
  12:         48072        1153728  java.lang.Long
  13:         34657        1109024  java.util.concurrent.ConcurrentHashMap$Node
  14:          7772        1078360  [I
  15:         26591        1063640  java.util.LinkedHashMap$Entry
  16:         15301         856856  java.util.LinkedHashMap
  17:         11771         847512  java.lang.reflect.Field
  18:         13172         843008  java.nio.DirectByteBuffer
  19:          8570         754160  java.lang.reflect.Method
  20:            20         655680  [Lscala.concurrent.forkjoin.ForkJoinTask;
  21:         13402         643296  java.util.HashMap
  22:         12945         621360  org.apache.flink.core.memory.HybridMemorySegment
  23:         13275         531000  sun.misc.Cleaner
  24:         15840         506880  com.esotericsoftware.kryo.Registration
  25:           393         450928  [Ljava.nio.ByteBuffer;
  26:         13166         421312  java.nio.DirectByteBuffer$Deallocator
  27:         25852         413632  java.lang.Object
  28:         14137         339288  java.util.ArrayList
  29:          6410         307680  org.apache.kafka.common.metrics.stats.SampledStat$Sample
  30:          4572         292608  com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeObjectField
  31:           392         288576  [Ljava.util.concurrent.ConcurrentHashMap$Node;
  32:          8412         269184  org.apache.kafka.common.MetricName
  33:          8412         269184  org.apache.kafka.common.metrics.KafkaMetric
  34:            72         268704  [Lorg.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
  35:         10070         241680  org.apache.kafka.common.requests.ApiVersionsResponse$ApiVersion
  36:          9828         225040  [Ljava.lang.Class;
  37:          9360         224640  com.esotericsoftware.kryo.Kryo$DefaultSerializerEntry
  38:          7905         189720  org.apache.flink.api.java.tuple.Tuple2
  39:          2358         150912  org.apache.kafka.common.metrics.Sensor
  40:          1855         148400  java.lang.reflect.Constructor
  41:          1464         143936  [J
 

Re: Challenges Deploying Flink With Savepoints On Kubernetes

2019-09-25 Thread Aleksandar Mastilovic
Would you guys (Flink devs) be interested in our solution for zookeeper-less 
HA? I could ask the managers how they feel about open-sourcing the improvement.

> On Sep 25, 2019, at 11:49 AM, Yun Tang  wrote:
> 
> As Aleksandar said, k8s with an HA configuration could solve your problem. There 
> have already been some discussions about how to implement such HA on k8s without 
> a ZooKeeper service: FLINK-11105 [1] and FLINK-12884 [2]. 
> For now, ZooKeeper is likely your only choice for the high-availability 
> service.
> 
> [1] https://issues.apache.org/jira/browse/FLINK-11105 
> 
> [2] https://issues.apache.org/jira/browse/FLINK-12884 
> 
> 
> Best
> Yun Tang
> From: Aleksandar Mastilovic 
> Sent: Thursday, September 26, 2019 1:57
> To: Sean Hester 
> Cc: Hao Sun ; Yuval Itzchakov ; user 
> 
> Subject: Re: Challenges Deploying Flink With Savepoints On Kubernetes
>  
> Can’t you simply use JobManager in HA mode? It would pick up where it left 
> off if you don’t provide a Savepoint.
> 
>> On Sep 25, 2019, at 6:07 AM, Sean Hester > > wrote:
>> 
>> thanks for all replies! i'll definitely take a look at the Flink k8s 
>> Operator project.
>> 
>> i'll try to restate the issue to clarify. this issue is specific to starting 
>> a job from a savepoint in job-cluster mode. in these cases the Job Manager 
>> container is configured to run a single Flink job at start-up. the savepoint 
>> needs to be provided as an argument to the entrypoint. the Flink 
>> documentation for this approach is here:
>> 
>> https://github.com/apache/flink/tree/master/flink-container/kubernetes#resuming-from-a-savepoint
>>  
>> 
>> 
>> the issue is that taking this approach means that the job will always start 
>> from the savepoint provided as the start argument in the Kubernetes YAML. 
>> this includes unplanned restarts of the job manager, but we'd really prefer 
>> any unplanned restarts resume for the most recent checkpoint instead of 
>> restarting from the configured savepoint. so in a sense we want the 
>> savepoint argument to be transient, only being used during the initial 
>> deployment, but this runs counter to the design of Kubernetes which always 
>> wants to restore a deployment to the "goal state" as defined in the YAML.
>> 
>> i hope this helps. if you want more details please let me know, and thanks 
>> again for your time.
>> 
>> 
>> On Tue, Sep 24, 2019 at 1:09 PM Hao Sun > > wrote:
>> I think I overlooked it. Good point. I am using Redis to save the path to my 
>> savepoint, I might be able to set a TTL to avoid such issue.
>> 
>> Hao Sun
>> 
>> 
>> On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov > > wrote:
>> Hi Hao,
>> 
>> I think he's exactly talking about the usecase where the JM/TM restart and 
>> they come back up from the latest savepoint which might be stale by that 
>> time.
>> 
>> On Tue, 24 Sep 2019, 19:24 Hao Sun, > > wrote:
>> We always make a savepoint before we shutdown the job-cluster. So the 
>> savepoint is always the latest. When we fix a bug or change the job graph, 
>> it can resume well.
>> We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM, 
>> uncaught exception, etc.
>> 
>> Maybe I do not understand your use case well, I do not see a need to start 
>> from checkpoint after a bug fix.
>> From what I know, currently you can use checkpoint as a savepoint as well
>> 
>> Hao Sun
>> 
>> 
>> On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov > > wrote:
>> AFAIK there's currently nothing implemented to solve this problem, but 
>> working on a possible fix can be implemented on top of 
>> https://github.com/lyft/flinkk8soperator 
>>  which already has a pretty fancy 
>> state machine for rolling upgrades. I'd love to be involved as this is an 
>> issue I've been thinking about as well.
>> 
>> Yuval
>> 
>> On Tue, Sep 24, 2019 at 5:02 PM Sean Hester > > wrote:
>> hi all--we've run into a gap (knowledge? design? tbd?) for our use cases 
>> when deploying Flink jobs to start from savepoints using the job-cluster 
>> mode in Kubernetes.
>> 
>> we're running a ~15 different jobs, all in job-cluster mode, using a mix of 
>> Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these are all 
>> long-running streaming jobs, all essentially acting as microservices. we're 
>> using Helm charts to configure all of our deployments.
>> 
>> we have a number of use cases where we want to restart jobs from a savepoint 
>> to replay recent events, i.e. when we've enhanced the job logic or fixed a 
>> bug. but after the deployment we want to have the job resume it's 
>> 

Re: Challenges Deploying Flink With Savepoints On Kubernetes

2019-09-25 Thread Yun Tang
As Aleksandar said, k8s with an HA configuration could solve your problem. There 
have already been some discussions about how to implement such HA on k8s without 
a ZooKeeper service: FLINK-11105 [1] and FLINK-12884 [2]. For now, ZooKeeper is 
likely your only choice for the high-availability service.

[1] https://issues.apache.org/jira/browse/FLINK-11105
[2] https://issues.apache.org/jira/browse/FLINK-12884
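
For reference, a minimal ZooKeeper-based HA configuration in flink-conf.yaml
looks roughly like this (quorum addresses and paths are placeholders):

    high-availability: zookeeper
    high-availability.zookeeper.quorum: zk-0:2181,zk-1:2181,zk-2:2181
    high-availability.storageDir: hdfs:///flink/ha/
    high-availability.cluster-id: /my-job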

Best
Yun Tang

From: Aleksandar Mastilovic 
Sent: Thursday, September 26, 2019 1:57
To: Sean Hester 
Cc: Hao Sun ; Yuval Itzchakov ; user 

Subject: Re: Challenges Deploying Flink With Savepoints On Kubernetes

Can’t you simply use JobManager in HA mode? It would pick up where it left off 
if you don’t provide a Savepoint.

On Sep 25, 2019, at 6:07 AM, Sean Hester 
mailto:sean.hes...@bettercloud.com>> wrote:

thanks for all replies! i'll definitely take a look at the Flink k8s Operator 
project.

i'll try to restate the issue to clarify. this issue is specific to starting a 
job from a savepoint in job-cluster mode. in these cases the Job Manager 
container is configured to run a single Flink job at start-up. the savepoint 
needs to be provided as an argument to the entrypoint. the Flink documentation 
for this approach is here:

https://github.com/apache/flink/tree/master/flink-container/kubernetes#resuming-from-a-savepoint

the issue is that taking this approach means that the job will always start 
from the savepoint provided as the start argument in the Kubernetes YAML. this 
includes unplanned restarts of the job manager, but we'd really prefer any 
unplanned restarts resume for the most recent checkpoint instead of restarting 
from the configured savepoint. so in a sense we want the savepoint argument to 
be transient, only being used during the initial deployment, but this runs 
counter to the design of Kubernetes which always wants to restore a deployment 
to the "goal state" as defined in the YAML.

i hope this helps. if you want more details please let me know, and thanks 
again for your time.


On Tue, Sep 24, 2019 at 1:09 PM Hao Sun 
mailto:ha...@zendesk.com>> wrote:
I think I overlooked it. Good point. I am using Redis to save the path to my 
savepoint, I might be able to set a TTL to avoid such issue.

Hao Sun


On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov 
mailto:yuva...@gmail.com>> wrote:
Hi Hao,

I think he's exactly talking about the usecase where the JM/TM restart and they 
come back up from the latest savepoint which might be stale by that time.

On Tue, 24 Sep 2019, 19:24 Hao Sun, 
mailto:ha...@zendesk.com>> wrote:
We always make a savepoint before we shutdown the job-cluster. So the savepoint 
is always the latest. When we fix a bug or change the job graph, it can resume 
well.
We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM, uncaught 
exception, etc.

Maybe I do not understand your use case well, I do not see a need to start from 
checkpoint after a bug fix.
From what I know, currently you can use checkpoint as a savepoint as well

Hao Sun


On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov 
mailto:yuva...@gmail.com>> wrote:
AFAIK there's currently nothing implemented to solve this problem, but working 
on a possible fix can be implemented on top of 
https://github.com/lyft/flinkk8soperator which already has a pretty fancy state 
machine for rolling upgrades. I'd love to be involved as this is an issue I've 
been thinking about as well.

Yuval

On Tue, Sep 24, 2019 at 5:02 PM Sean Hester 
mailto:sean.hes...@bettercloud.com>> wrote:
hi all--we've run into a gap (knowledge? design? tbd?) for our use cases when 
deploying Flink jobs to start from savepoints using the job-cluster mode in 
Kubernetes.

we're running a ~15 different jobs, all in job-cluster mode, using a mix of 
Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these are all 
long-running streaming jobs, all essentially acting as microservices. we're 
using Helm charts to configure all of our deployments.

we have a number of use cases where we want to restart jobs from a savepoint to 
replay recent events, i.e. when we've enhanced the job logic or fixed a bug. 
but after the deployment we want to have the job resume it's "long-running" 
behavior, where any unplanned restarts resume from the latest checkpoint.

the issue we run into is that any obvious/standard/idiomatic Kubernetes 
deployment includes the savepoint argument in the configuration. if the Job 
Manager container(s) have an unplanned restart, when they come back up they 
will start from the savepoint instead of resuming from the latest checkpoint. 
everything is working as configured, but that's not exactly what we want. we 
want the savepoint argument to be transient somehow (only used during the 
initial deployment), but Kubernetes doesn't really support the concept of 
transient configuration.

i can see a couple of potential solutions that either involve custom code in 
the jobs or 

Anomaly in handling late arriving data

2019-09-25 Thread Indraneel R
Hi Everyone,

I am trying to execute this simple sessionization pipeline, with the
allowed lateness shown below:

def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(2)


val source: DataStream[Event] = env.addSource(new SourceFunction[Event]
{
  lazy val input: Seq[Event] = Seq(
Event("u1", "e1", 1L),
Event("u1", "e5", 6L),
Event("u1", "e7", 11L),
Event("u1", "e8", 12L),
Event("u1", "e9", 16L),
Event("u1", "e11", 14L),
*Event("u1", "e12", 8L),*
Event("u1", "e13", 20L),
  )

  override def run(ctx: SourceFunction.SourceContext[Event]): Unit = {
{
  input.foreach(event => {
ctx.collectWithTimestamp(event, event.timestamp)
*ctx.emitWatermark(new Watermark(event.timestamp - 1))*
  })
  ctx.emitWatermark(new Watermark(Long.MaxValue))
}
  }

  override def cancel(): Unit = {}
})

val tag: OutputTag[Event] = OutputTag("late-data")

val sessionizedStream: DataStream[Event] = source
  .keyBy(item => item.userId)
*  .window(EventTimeSessionWindows.withGap(Time.milliseconds(3L)))*
  .sideOutputLateData(tag)
*  .allowedLateness(Time.milliseconds(2L))*
  .process(new ProcessWindowFunction[Event, Event, String, TimeWindow] {

override def process(key: String, context: Context, elements:
Iterable[Event], out: Collector[Event]): Unit = {
  val sessionIdForWindow = key + "-" + context.currentWatermark +
"-" + context.window.getStart

  elements.toSeq
.sortBy(event => event.timestamp)
.foreach(event => {
  out.collect(event.copy(sessionId = sessionIdForWindow, count
= elements.size))
})
}
  })

sessionizedStream.getSideOutput(tag).print()
env.execute()
  }

But here's the problem. I am expecting the event highlighted in red
above (e12) to be collected in the side output as a late event.

But it isn't. The event is not printed.

What's interesting is that if I make *any one* of the following changes, the
event e12 is considered late and is printed.
   1) Event("u1", "e12", 8L) *change to *Event("u1", "e12", *7L*)
   2) allowedLateness(Time.milliseconds(2L))   change
to allowedLateness(Time.milliseconds(*1L*))
  3)   Event("u1", "e12", 8L) *change to *Event("u1", "e12",
*7L*) *AND*
allowedLateness(Time.milliseconds(2L))   *change to *
allowedLateness(Time.milliseconds(4*L*))   // or anything less than 7L

Can someone explain what's going on? What am I missing here?


regards
-Indraneel


Re: Challenges Deploying Flink With Savepoints On Kubernetes

2019-09-25 Thread Aleksandar Mastilovic
Can’t you simply use JobManager in HA mode? It would pick up where it left off 
if you don’t provide a Savepoint.

> On Sep 25, 2019, at 6:07 AM, Sean Hester  wrote:
> 
> thanks for all replies! i'll definitely take a look at the Flink k8s Operator 
> project.
> 
> i'll try to restate the issue to clarify. this issue is specific to starting 
> a job from a savepoint in job-cluster mode. in these cases the Job Manager 
> container is configured to run a single Flink job at start-up. the savepoint 
> needs to be provided as an argument to the entrypoint. the Flink 
> documentation for this approach is here:
> 
> https://github.com/apache/flink/tree/master/flink-container/kubernetes#resuming-from-a-savepoint
>  
> 
> 
> the issue is that taking this approach means that the job will always start 
> from the savepoint provided as the start argument in the Kubernetes YAML. 
> this includes unplanned restarts of the job manager, but we'd really prefer 
> any unplanned restarts resume for the most recent checkpoint instead of 
> restarting from the configured savepoint. so in a sense we want the savepoint 
> argument to be transient, only being used during the initial deployment, but 
> this runs counter to the design of Kubernetes which always wants to restore a 
> deployment to the "goal state" as defined in the YAML.
> 
> i hope this helps. if you want more details please let me know, and thanks 
> again for your time.
> 
> 
> On Tue, Sep 24, 2019 at 1:09 PM Hao Sun  > wrote:
> I think I overlooked it. Good point. I am using Redis to save the path to my 
> savepoint, I might be able to set a TTL to avoid such issue.
> 
> Hao Sun
> 
> 
> On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov  > wrote:
> Hi Hao,
> 
> I think he's exactly talking about the usecase where the JM/TM restart and 
> they come back up from the latest savepoint which might be stale by that time.
> 
> On Tue, 24 Sep 2019, 19:24 Hao Sun,  > wrote:
> We always make a savepoint before we shutdown the job-cluster. So the 
> savepoint is always the latest. When we fix a bug or change the job graph, it 
> can resume well.
> We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM, 
> uncaught exception, etc.
> 
> Maybe I do not understand your use case well, I do not see a need to start 
> from checkpoint after a bug fix.
> From what I know, currently you can use checkpoint as a savepoint as well
> 
> Hao Sun
> 
> 
> On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov  > wrote:
> AFAIK there's currently nothing implemented to solve this problem, but 
> working on a possible fix can be implemented on top of 
> https://github.com/lyft/flinkk8soperator 
>  which already has a pretty fancy 
> state machine for rolling upgrades. I'd love to be involved as this is an 
> issue I've been thinking about as well.
> 
> Yuval
> 
> On Tue, Sep 24, 2019 at 5:02 PM Sean Hester  > wrote:
> hi all--we've run into a gap (knowledge? design? tbd?) for our use cases when 
> deploying Flink jobs to start from savepoints using the job-cluster mode in 
> Kubernetes.
> 
> we're running a ~15 different jobs, all in job-cluster mode, using a mix of 
> Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these are all 
> long-running streaming jobs, all essentially acting as microservices. we're 
> using Helm charts to configure all of our deployments.
> 
> we have a number of use cases where we want to restart jobs from a savepoint 
> to replay recent events, i.e. when we've enhanced the job logic or fixed a 
> bug. but after the deployment we want to have the job resume it's 
> "long-running" behavior, where any unplanned restarts resume from the latest 
> checkpoint.
> 
> the issue we run into is that any obvious/standard/idiomatic Kubernetes 
> deployment includes the savepoint argument in the configuration. if the Job 
> Manager container(s) have an unplanned restart, when they come back up they 
> will start from the savepoint instead of resuming from the latest checkpoint. 
> everything is working as configured, but that's not exactly what we want. we 
> want the savepoint argument to be transient somehow (only used during the 
> initial deployment), but Kubernetes doesn't really support the concept of 
> transient configuration.
> 
> i can see a couple of potential solutions that either involve custom code in 
> the jobs or custom logic in the container (i.e. a custom entrypoint script 
> that records that the configured savepoint has already been used in a file on 
> a persistent volume or GCS, and potentially when/why/by which deployment). 
> but these seem like unexpected and hacky solutions. before we head down that 
> road i wanted to ask:
> is this is already a solved 

Re: [SURVEY] How many people are using customized RestartStrategy(s)

2019-09-25 Thread Zhu Zhu
We will then keep the decision that we do not support customized restart
strategy in Flink 1.10.

Thanks Steven for the inputs!

Thanks,
Zhu Zhu
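
For context on the Meter/MeterView suggestion in the quoted discussion below,
here is a minimal sketch of registering such a metric in a user function (the
class and metric names are made up for illustration; a MeterView exposes both
a count and a per-second rate to the configured reporters):

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.metrics.Meter;
    import org.apache.flink.metrics.MeterView;

    public class AlertableMapper extends RichMapFunction<String, String> {
        private transient Meter meter;

        @Override
        public void open(Configuration parameters) {
            // register a meter that reports "count" and "rate" to the metric reporters
            this.meter = getRuntimeContext()
                .getMetricGroup()
                .meter("eventsProcessed", new MeterView(60));
        }

        @Override
        public String map(String value) {
            meter.markEvent();
            return value;
        }
    }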

Steven Wu wrote on Thu, Sep 26, 2019 at 12:13 AM:

> Zhu Zhu, that is correct.
>
> On Tue, Sep 24, 2019 at 8:04 PM Zhu Zhu  wrote:
>
>> Hi Steven,
>>
>> As a conclusion, since we will have a meter metric[1] for restarts,
>> customized restart strategy is not needed in your case.
>> Is that right?
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-14164
>>
>> Thanks,
>> Zhu Zhu
>>
>> Steven Wu wrote on Wed, Sep 25, 2019 at 2:30 AM:
>>
>>> Zhu Zhu,
>>>
>>> Sorry, I was using different terminology. yes, Flink meter is what I was
>>> talking about regarding "fullRestarts" for threshold based alerting.
>>>
>>> On Mon, Sep 23, 2019 at 7:46 PM Zhu Zhu  wrote:
>>>
 Steven,

 In my mind, Flink counter only stores its accumulated count and reports
 that value. Are you using an external counter directly?
 Maybe Flink Meter/MeterView is what you need? It stores the count and
 calculates the rate. And it will report its "count" as well as "rate" to
 external metric services.

 The counter "task_failures" only works if the individual failover
 strategy is enabled. However, it is not a public interface and is not
 suggested to use, as the fine grained recovery (region failover) now
 supersedes it.
 I've opened a ticket[1] to add a metric to show failovers that respects
 fine grained recovery.

 [1] https://issues.apache.org/jira/browse/FLINK-14164

 Thanks,
 Zhu Zhu

 Steven Wu wrote on Tue, Sep 24, 2019 at 6:41 AM:

>
> When we setup alert like "fullRestarts > 1" for some rolling window,
> we want to use counter. if it is a Gauge, "fullRestarts" will never go
> below 1 after a first full restart. So alert condition will always be true
> after first job restart. If we can apply a derivative to the Gauge value, 
> I
> guess alert can probably work. I can explore if that is an option or not.
>
> Yeah. Understood that "fullRestart" won't increment when fine grained
> recovery happened. I think "task_failures" counter already exists in 
> Flink.
>
>
>
> On Sun, Sep 22, 2019 at 7:59 PM Zhu Zhu  wrote:
>
>> Steven,
>>
>> Thanks for the information. If we can determine this a common issue,
>> we can solve it in Flink core.
>> To get to that state, I have two questions which need your help:
>> 1. Why is gauge not good for alerting? The metric "fullRestart" is a
>> Gauge. Does the metric reporter you use report Counter and
>> Gauge to external services in different ways? Or anything else can 
>> be
>> different due to the metric type?
>> 2. Is the "number of restarts" what you actually need, rather than
>> the "fullRestart" count? If so, I believe we will have such a counter
>> metric in 1.10, since the previous "fullRestart" metric value is not the
>> number of restarts when grained recovery (feature added 1.9.0) is 
>> enabled.
>> "fullRestart" reveals how many times entire job graph has been
>> restarted. If grained recovery (feature added 1.9.0) is enabled, the 
>> graph
>> would not be restarted when task failures happen and the "fullRestart"
>> value will not increment in such cases.
>>
>> I'd appreciate if you can help with these questions and we can make
>> better decisions for Flink.
>>
>> Thanks,
>> Zhu Zhu
>>
>> Steven Wu wrote on Sun, Sep 22, 2019 at 3:31 AM:
>>
>>> Zhu Zhu,
>>>
>>> Flink fullRestart metric is a Gauge, which is not good for alerting
>>> on. We publish an equivalent Counter metric for alerting purpose.
>>>
>>> Thanks,
>>> Steven
>>>
>>> On Thu, Sep 19, 2019 at 7:45 PM Zhu Zhu  wrote:
>>>
 Thanks Steven for the feedback!
 Could you share more information about the metrics you add in you
 customized restart strategy?

 Thanks,
 Zhu Zhu

 Steven Wu wrote on Fri, Sep 20, 2019 at 7:11 AM:

> We do use config like "restart-strategy:
> org.foobar.MyRestartStrategyFactoryFactory". Mainly to add additional
> metrics than the Flink provided ones.
>
> On Thu, Sep 19, 2019 at 4:50 AM Zhu Zhu  wrote:
>
>> Thanks everyone for the input.
>>
>> The RestartStrategy customization is not recognized as a public
>> interface as it is not explicitly documented.
>> As it is not used from the feedbacks of this survey, I'll
>> conclude that we do not need to support customized RestartStrategy 
>> for the
>> new scheduler in Flink 1.10
>>
>> Other usages are still supported, including all the strategies
>> and configuring ways described in
>> 

Re: [SURVEY] How many people are using customized RestartStrategy(s)

2019-09-25 Thread Steven Wu
Zhu Zhu, that is correct.

On Tue, Sep 24, 2019 at 8:04 PM Zhu Zhu  wrote:

> Hi Steven,
>
> As a conclusion, since we will have a meter metric[1] for restarts,
> customized restart strategy is not needed in your case.
> Is that right?
>
> [1] https://issues.apache.org/jira/browse/FLINK-14164
>
> Thanks,
> Zhu Zhu
>
> Steven Wu wrote on Wed, Sep 25, 2019 at 2:30 AM:
>
>> Zhu Zhu,
>>
>> Sorry, I was using different terminology. yes, Flink meter is what I was
>> talking about regarding "fullRestarts" for threshold based alerting.
>>
>> On Mon, Sep 23, 2019 at 7:46 PM Zhu Zhu  wrote:
>>
>>> Steven,
>>>
>>> In my mind, Flink counter only stores its accumulated count and reports
>>> that value. Are you using an external counter directly?
>>> Maybe Flink Meter/MeterView is what you need? It stores the count and
>>> calculates the rate. And it will report its "count" as well as "rate" to
>>> external metric services.
>>>
>>> The counter "task_failures" only works if the individual failover
>>> strategy is enabled. However, it is not a public interface and is not
>>> suggested to use, as the fine grained recovery (region failover) now
>>> supersedes it.
>>> I've opened a ticket[1] to add a metric to show failovers that respects
>>> fine grained recovery.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-14164
>>>
>>> Thanks,
>>> Zhu Zhu
>>>
>>> Steven Wu wrote on Tue, Sep 24, 2019 at 6:41 AM:
>>>

 When we setup alert like "fullRestarts > 1" for some rolling window, we
 want to use counter. if it is a Gauge, "fullRestarts" will never go below 1
 after a first full restart. So alert condition will always be true after
 first job restart. If we can apply a derivative to the Gauge value, I guess
 alert can probably work. I can explore if that is an option or not.

 Yeah. Understood that "fullRestart" won't increment when fine grained
 recovery happened. I think "task_failures" counter already exists in Flink.



 On Sun, Sep 22, 2019 at 7:59 PM Zhu Zhu  wrote:

> Steven,
>
> Thanks for the information. If we can determine this a common issue,
> we can solve it in Flink core.
> To get to that state, I have two questions which need your help:
> 1. Why is gauge not good for alerting? The metric "fullRestart" is a
> Gauge. Does the metric reporter you use report Counter and
> Gauge to external services in different ways? Or anything else can 
> be
> different due to the metric type?
> 2. Is the "number of restarts" what you actually need, rather than
> the "fullRestart" count? If so, I believe we will have such a counter
> metric in 1.10, since the previous "fullRestart" metric value is not the
> number of restarts when grained recovery (feature added 1.9.0) is enabled.
> "fullRestart" reveals how many times entire job graph has been
> restarted. If grained recovery (feature added 1.9.0) is enabled, the graph
> would not be restarted when task failures happen and the "fullRestart"
> value will not increment in such cases.
>
> I'd appreciate if you can help with these questions and we can make
> better decisions for Flink.
>
> Thanks,
> Zhu Zhu
>
> Steven Wu wrote on Sun, Sep 22, 2019 at 3:31 AM:
>
>> Zhu Zhu,
>>
>> Flink fullRestart metric is a Gauge, which is not good for alerting
>> on. We publish an equivalent Counter metric for alerting purpose.
>>
>> Thanks,
>> Steven
>>
>> On Thu, Sep 19, 2019 at 7:45 PM Zhu Zhu  wrote:
>>
>>> Thanks Steven for the feedback!
>>> Could you share more information about the metrics you add in you
>>> customized restart strategy?
>>>
>>> Thanks,
>>> Zhu Zhu
>>>
>>> Steven Wu wrote on Fri, Sep 20, 2019 at 7:11 AM:
>>>
 We do use config like "restart-strategy:
 org.foobar.MyRestartStrategyFactoryFactory". Mainly to add additional
 metrics than the Flink provided ones.

 On Thu, Sep 19, 2019 at 4:50 AM Zhu Zhu  wrote:

> Thanks everyone for the input.
>
> The RestartStrategy customization is not recognized as a public
> interface as it is not explicitly documented.
> As it is not used from the feedbacks of this survey, I'll conclude
> that we do not need to support customized RestartStrategy for the new
> scheduler in Flink 1.10
>
> Other usages are still supported, including all the strategies and
> configuring ways described in
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/task_failure_recovery.html#restart-strategies
> .
>
> Feel free to share in this thread if you has any concern for it.
>
> Thanks,
> Zhu Zhu
>
> Zhu Zhu wrote on Thu, Sep 12, 2019 at 10:33 PM:
>
>> Thanks Oytun for the reply!
>>
>> Sorry for not have stated it clearly. When saying "customized

Re: Setting environment variables of the taskmanagers (yarn)

2019-09-25 Thread Peter Huang
Hi Richard,

Good suggestion. I just created a Jira ticket. I will find a time this week
to update docs.



Best Regards
Peter Huang

On Wed, Sep 25, 2019 at 8:05 AM Richard Deurwaarder  wrote:

> Hi Peter and Jiayi,
>
> Thanks for the answers this worked perfectly, I just added
>
> containerized.master.env.GOOGLE_APPLICATION_CREDENTIALS=xyz
> and
> containerized.taskmanager.env.GOOGLE_APPLICATION_CREDENTIALS=xyz
>
> to my flink config and they got picked up.
>
> Do you know why this is missing from the docs? If it's not intentional it
> might be nice to add it.
>
> Richard
>
> On Tue, Sep 24, 2019 at 5:53 PM Peter Huang 
> wrote:
>
>> Hi Richard,
>>
>> For the first question, I don't think you need to explicitly specify
>> fs.hdfs.hadoopconf as each file in the ship folder is copied as a yarn
>> local resource for containers. The configuration path is
>> overridden internally in Flink.
>>
>> For the second question of setting TM environment variables, please use
>> these two configurations in your flink conf.
>>
>> /**
>>  * Prefix for passing custom environment variables to Flink's master process.
>>  * For example for passing LD_LIBRARY_PATH as an env variable to the 
>> AppMaster, set:
>>  * containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native"
>>  * in the flink-conf.yaml.
>>  */
>> public static final String CONTAINERIZED_MASTER_ENV_PREFIX = 
>> "containerized.master.env.";
>>
>> /**
>>  * Similar to the {@see CONTAINERIZED_MASTER_ENV_PREFIX}, this configuration 
>> prefix allows
>>  * setting custom environment variables for the workers (TaskManagers).
>>  */
>> public static final String CONTAINERIZED_TASK_MANAGER_ENV_PREFIX = 
>> "containerized.taskmanager.env.";
>>
>>
>>
>> Best Regards
>>
>> Peter Huang
>>
>>
>>
>>
>> On Tue, Sep 24, 2019 at 8:02 AM Richard Deurwaarder 
>> wrote:
>>
>>> Hello,
>>>
>>> We have our flink job (1.8.0) running on our hadoop 2.7 cluster with
>>> yarn. We would like to add the GCS connector to use GCS rather than HDFS.
>>> Following the documentation of the GCS connector[1] we have to specify
>>> which credentials we want to use and there are two ways of doing this:
>>>   * Edit core-site.xml
>>>   * Set an environment variable: GOOGLE_APPLICATION_CREDENTIALS
>>>
>>> Because we're on a company shared hadoop cluster we do not want to
>>> change the cluster wide core-site.xml.
>>>
>>> This leaves me with two options:
>>>
>>> 1. Create a custom core-site.xml and use --yarnship to send it to all
>>> the taskmanager contains. If I do this, to what value should I set
>>> fs.hdfs.hadoopconf[2] in flink-conf ?
>>> 2. The second option would be to set an environment variable, however
>>> because the taskmanagers are started via yarn I'm having trouble figuring
>>> out how to make sure this environment variable is set for each yarn
>>> container / taskmanager.
>>>
>>> I would appreciate any help you can provide.
>>>
>>> Thank you,
>>>
>>> Richard
>>>
>>> [1]
>>> https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/gcs/INSTALL.md#configure-hadoop
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/config.html#hdfs
>>>
>>


Re: Flink job manager doesn't remove stale checkmarks

2019-09-25 Thread Fabian Hueske
Hi,

You enabled incremental checkpoints.
This means that parts of older checkpoints that did not change since the
last checkpoint are not removed because they are still referenced by the
incremental checkpoints.
Flink will automatically remove them once they are not needed anymore.

Are you sure that the size of your application's state is not growing too
large?
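
One way to check that is to temporarily disable incremental checkpoints in a
test run and compare the reported checkpoint sizes (this reuses the key that
already appears in the configuration quoted below; with incremental
checkpoints enabled, the reported size typically covers only the delta):

    state.backend.incremental: false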

Best, Fabian

Am Di., 24. Sept. 2019 um 10:47 Uhr schrieb Clay Teeter <
clay.tee...@maalka.com>:

> Oh geez,  checkmarks  = checkpoints... sorry.
>
> What i mean by stale "checkpoints" are checkpoints that should be reaped
> by: "state.checkpoints.num-retained: 3".
>
> What is happening is that directories:
>   - state.checkpoints.dir: file:///opt/ha/49/checkpoints
>   - high-availability.storageDir: file:///opt/ha/49/ha
> are growing with every checkpoint and i'm running out of disk space.
>
> On Tue, Sep 24, 2019 at 4:55 AM Biao Liu  wrote:
>
>> Hi Clay,
>>
>> Sorry I don't get your point. I'm not sure what the "stale checkmarks"
>> exactly means. The HA storage and checkpoint directory left after shutting
>> down cluster?
>>
>> Thanks,
>> Biao /'bɪ.aʊ/
>>
>>
>>
>> On Tue, 24 Sep 2019 at 03:12, Clay Teeter  wrote:
>>
>>> I'm trying to get my standalone cluster to remove stale checkmarks.
>>>
>>> The cluster is composed of a single job and task manager backed by
>>> rocksdb with high availability.
>>>
>>> The configuration on both the job and task manager are:
>>>
>>> state.backend: rocksdb
>>> state.checkpoints.dir: file:///opt/ha/49/checkpoints
>>> state.backend.incremental: true
>>> state.checkpoints.num-retained: 3
>>> jobmanager.heap.size: 1024m
>>> taskmanager.heap.size: 2048m
>>> taskmanager.numberOfTaskSlots: 24
>>> parallelism.default: 1
>>> high-availability.jobmanager.port: 6123
>>> high-availability.zookeeper.path.root: _49
>>> high-availability: zookeeper
>>> high-availability.storageDir: file:///opt/ha/49/ha
>>> high-availability.zookeeper.quorum: **t:2181
>>>
>>> Both machines have access to /opt/ha/49 and /opt/ha/49/checkpoints via
>>> NFS and are owned by the flink user.  Also, there are no errors that i can
>>> find.
>>>
>>> Does anyone have any ideas that i could try?
>>>
>>>


Re: Running flink on AWS ECS

2019-09-25 Thread Navneeth Krishnan
Thanks Terry. The reason I asked is that I saw somewhere that running one
slot per container is beneficial, but I couldn't find where I saw that.
Also, I think running it with multiple slots will reduce IPC since some of
the data will be processed within the same JVM.
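
For reference, the per-container slot count is just a flink-conf.yaml setting
(the value below is only an example, e.g. matching a 4-vCPU container):

    taskmanager.numberOfTaskSlots: 4

With several slots in one TaskManager, more subtasks share the same JVM, which
is what makes local, in-process data exchange possible.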

Thanks

On Wed, Sep 25, 2019 at 1:16 AM Terry Wang  wrote:

> Hi, Navneeth,
>
> I think both are OK.
> IMO, running one container with the number of slots equal to the number of
> virtual cores may be better, because the slots can share the Flink framework
> and thus reduce memory cost.
>
> Best,
> Terry Wang
>
>
>
> > On Sep 25, 2019, at 3:26 PM, Navneeth Krishnan wrote:
> >
> > Hi All,
> >
> > I’m currently running flink on amazon ecs and I have assigned task slots
> based on vcpus per instance. Is it beneficial to run a separate container
> with one slot each or one container with number of slots same as virtual
> cores?
> >
> > Thanks
>
>


Re: Setting environment variables of the taskmanagers (yarn)

2019-09-25 Thread Richard Deurwaarder
Hi Peter and Jiayi,

Thanks for the answers this worked perfectly, I just added

containerized.master.env.GOOGLE_APPLICATION_CREDENTIALS=xyz
and
containerized.taskmanager.env.GOOGLE_APPLICATION_CREDENTIALS=xyz

to my flink config and they got picked up.

Do you know why this is missing from the docs? If it's not intentional it
might be nice to add it.

Richard

On Tue, Sep 24, 2019 at 5:53 PM Peter Huang 
wrote:

> Hi Richard,
>
> For the first question, I don't think you need to explicitly specify
> fs.hdfs.hadoopconf as each file in the ship folder is copied as a yarn
> local resource for containers. The configuration path is
> overridden internally in Flink.
>
> For the second question of setting TM environment variables, please use
> these two configurations in your flink conf.
>
> /**
>  * Prefix for passing custom environment variables to Flink's master process.
>  * For example for passing LD_LIBRARY_PATH as an env variable to the 
> AppMaster, set:
>  * containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native"
>  * in the flink-conf.yaml.
>  */
> public static final String CONTAINERIZED_MASTER_ENV_PREFIX = 
> "containerized.master.env.";
>
> /**
>  * Similar to the {@see CONTAINERIZED_MASTER_ENV_PREFIX}, this configuration 
> prefix allows
>  * setting custom environment variables for the workers (TaskManagers).
>  */
> public static final String CONTAINERIZED_TASK_MANAGER_ENV_PREFIX = 
> "containerized.taskmanager.env.";
>
>
>
> Best Regards
>
> Peter Huang
>
>
>
>
> On Tue, Sep 24, 2019 at 8:02 AM Richard Deurwaarder 
> wrote:
>
>> Hello,
>>
>> We have our flink job (1.8.0) running on our hadoop 2.7 cluster with
>> yarn. We would like to add the GCS connector to use GCS rather than HDFS.
>> Following the documentation of the GCS connector[1] we have to specify
>> which credentials we want to use and there are two ways of doing this:
>>   * Edit core-site.xml
>>   * Set an environment variable: GOOGLE_APPLICATION_CREDENTIALS
>>
>> Because we're on a company shared hadoop cluster we do not want to change
>> the cluster wide core-site.xml.
>>
>> This leaves me with two options:
>>
>> 1. Create a custom core-site.xml and use --yarnship to send it to all the
>> taskmanager contains. If I do this, to what value should I set
>> fs.hdfs.hadoopconf[2] in flink-conf ?
>> 2. The second option would be to set an environment variable, however
>> because the taskmanagers are started via yarn I'm having trouble figuring
>> out how to make sure this environment variable is set for each yarn
>> container / taskmanager.
>>
>> I would appreciate any help you can provide.
>>
>> Thank you,
>>
>> Richard
>>
>> [1]
>> https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/gcs/INSTALL.md#configure-hadoop
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/config.html#hdfs
>>
>


Re: Challenges Deploying Flink With Savepoints On Kubernetes

2019-09-25 Thread Vijay Bhaskar
One way to do this is to have a separate job-management program in Kubernetes
which actually manages the jobs, so that you can decouple the job control.
While restarting the job, make sure to follow the steps below:

a) First, the manager program takes a savepoint by cancelling the job and
notes down the savepoint path using the savepoint REST API.
b) After that, it starts the new job, supplying that savepoint path so that
the job starts from the latest savepoint.

That way you do not need to rely on the YAML configuration.
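
A hedged sketch of steps a) and b) against Flink's monitoring REST API
(endpoints as documented for Flink 1.9; the host, job id and target directory
below are placeholders):

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class SavepointTrigger {
        public static void main(String[] args) throws Exception {
            String jm = "http://flink-jobmanager:8081";   // placeholder JM address
            String jobId = "<job-id>";                    // placeholder job id
            HttpClient client = HttpClient.newHttpClient();

            // step a): trigger a savepoint and cancel the job; the response carries a request id
            HttpRequest trigger = HttpRequest.newBuilder()
                .uri(URI.create(jm + "/jobs/" + jobId + "/savepoints"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(
                    "{\"target-directory\":\"s3://bucket/savepoints\",\"cancel-job\":true}"))
                .build();
            System.out.println(client.send(trigger, HttpResponse.BodyHandlers.ofString()).body());

            // step b): poll GET /jobs/<job-id>/savepoints/<request-id> until it is COMPLETED,
            // read the returned savepoint path, and pass it to the new job on start-up.
        }
    }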

Also, the above steps only help for manual restarts of the Flink job.
There are two other possible cases:

case 1 => Your job restarts by itself with the help of the Flink cluster. Then
the latest checkpoint takes care of the job state, so there is nothing to
worry about.
case 2 => Your job has failed and its state is lost. To overcome this, as per
the documentation, the best approach is to take periodic savepoints, so that
when restarting the job after a crash you can pass the latest savepoint path
as an argument to your job-management program.

So the key is to have a separate manager for the Flink jobs so that you have
this flexibility.

Regards
Bhaskar


On Wed, Sep 25, 2019 at 6:38 PM Sean Hester 
wrote:

> thanks for all replies! i'll definitely take a look at the Flink k8s
> Operator project.
>
> i'll try to restate the issue to clarify. this issue is specific to
> starting a job from a savepoint in job-cluster mode. in these cases the Job
> Manager container is configured to run a single Flink job at start-up. the
> savepoint needs to be provided as an argument to the entrypoint. the Flink
> documentation for this approach is here:
>
>
> https://github.com/apache/flink/tree/master/flink-container/kubernetes#resuming-from-a-savepoint
>
> the issue is that taking this approach means that the job will *always*
> start from the savepoint provided as the start argument in the Kubernetes
> YAML. this includes unplanned restarts of the job manager, but we'd really
> prefer any *unplanned* restarts resume for the most recent checkpoint
> instead of restarting from the configured savepoint. so in a sense we want
> the savepoint argument to be transient, only being used during the initial
> deployment, but this runs counter to the design of Kubernetes which always
> wants to restore a deployment to the "goal state" as defined in the YAML.
>
> i hope this helps. if you want more details please let me know, and thanks
> again for your time.
>
>
> On Tue, Sep 24, 2019 at 1:09 PM Hao Sun  wrote:
>
>> I think I overlooked it. Good point. I am using Redis to save the path to
>> my savepoint, I might be able to set a TTL to avoid such issue.
>>
>> Hao Sun
>>
>>
>> On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov 
>> wrote:
>>
>>> Hi Hao,
>>>
>>> I think he's exactly talking about the usecase where the JM/TM restart
>>> and they come back up from the latest savepoint which might be stale by
>>> that time.
>>>
>>> On Tue, 24 Sep 2019, 19:24 Hao Sun,  wrote:
>>>
 We always make a savepoint before we shutdown the job-cluster. So the
 savepoint is always the latest. When we fix a bug or change the job graph,
 it can resume well.
 We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM,
 uncaught exception, etc.

 Maybe I do not understand your use case well, I do not see a need to
 start from checkpoint after a bug fix.
 From what I know, currently you can use checkpoint as a savepoint as
 well

 Hao Sun


 On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov 
 wrote:

> AFAIK there's currently nothing implemented to solve this problem, but
> working on a possible fix can be implemented on top of
> https://github.com/lyft/flinkk8soperator which already has a pretty
> fancy state machine for rolling upgrades. I'd love to be involved as this
> is an issue I've been thinking about as well.
>
> Yuval
>
> On Tue, Sep 24, 2019 at 5:02 PM Sean Hester <
> sean.hes...@bettercloud.com> wrote:
>
>> hi all--we've run into a gap (knowledge? design? tbd?) for our use
>> cases when deploying Flink jobs to start from savepoints using the
>> job-cluster mode in Kubernetes.
>>
>> we're running a ~15 different jobs, all in job-cluster mode, using a
>> mix of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these
>> are all long-running streaming jobs, all essentially acting as
>> microservices. we're using Helm charts to configure all of our 
>> deployments.
>>
>> we have a number of use cases where we want to restart jobs from a
>> savepoint to replay recent events, i.e. when we've enhanced the job logic
>> or fixed a bug. but after the deployment we want to have the job resume
>> it's "long-running" behavior, where any unplanned restarts resume from 
>> the
>> latest checkpoint.
>>
>> the issue we run into is that any 

Re: Multiple Job Managers in Flink HA Setup

2019-09-25 Thread Gary Yao
Hi Steve,

> I also tried attaching a shared NFS folder between the two machines and
> tried to set their web.tmpdir property to the shared folder, however it
> appears that each job manager creates a seperate job inside that
directory.

You can create a fixed upload directory via the config option
'web.upload.dir'
[1]. To avoid race conditions, it is probably best to make sure that the
directory already exists before starting the JMs (if the path does not
exist,
both JMs may attempt to create it).

Alternatively you can try one of the following:
- Do not use stand-by masters
- Find the leader address from ZooKeeper, and issue a request directly [2]
- Use Flink CLI, which will resolve the leading JM from ZooKeeper. Note that
 the CLI submits the job by uploading a serialized JobGraph [2][3][4][5]
(you
 could also rebuild that part of the CLI if you need programmatic job
 submission).

Lastly, I want to point out that the programmatic job submission is
currently
being reworked (see [6] for details).

> 2) provide a persistent storage directory for the Jar file so I can
perform
> rescaling without needing to re-upload the jar file.

Can you describe how you are rescaling?


Best,
Gary

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/config.html#web-upload-dir
[2]
https://github.com/apache/flink/blob/b6e32a317ec273ee3f3085728a02bc3922c22db6/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java#L162
[3]
https://github.com/apache/flink/blob/b6e32a317ec273ee3f3085728a02bc3922c22db6/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java#L215
[4]
https://github.com/apache/flink/blob/b6e32a317ec273ee3f3085728a02bc3922c22db6/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java#L79
[5]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/rest_api.html#jobs-1
[6]
https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E

On Fri, Sep 20, 2019 at 10:57 PM Steven Nelson 
wrote:

> Hello!
>
> I am having some difficulty with multiple job managers in an HA setup
> using Flink 1.9.0.
>
> I have 2 job managers and have setup the HA setup with the following config
>
> high-availability: zookeeper
> high-availability.cluster-id: /imet-enhance
> high-availability.storageDir: hdfs:///flink/ha/
> high-availability.zookeeper.quorum:
> flink-state-hdfs-zookeeper-1.flink-state-hdfs-zookeeper-headless.default.svc.cluster.local:2181,flink-state-hdfs-zookeeper-2.flink-state-hdfs-zookeeper-headless.default.svc.cluster.local:2181,flink-state-hdfs-zookeeper-0.flink-state-hdfs-zookeeper-headless.default.svc.cluster.local:2181
> high-availability.zookeeper.path.root: /flink
> high-availability.jobmanager.port: 5-50025
>
> I have the job managers behind a load balancer inside a kubernetes cluster
>
> They work great except for one thing. When I use the website (or API) to
> upload the Jar file and start the job sometimes the request goes to a
> different job manager, which doesn't have the jar file in it's temporary
> directory, so it fails to start.
>
> In the 1.7 version of this setup the second Job Manager would return a
> Redirect request. I put an HAProxy in front of it that only allowed traffic
> to flow to the Job Manager that wasn't returning a 300 and this worked well
> for everything. In 1.9 it appears that both Job Managers are able to
> respond (via the internal proxy mechanism I have seen in prior emails).
> However it appears the web file cache is still shared.
>
> I also tried attaching a shared NFS folder between the two machines and
> tried to set their web.tmpdir property to the shared folder, however it
> appears that each job manager creates a seperate job inside that directory.
>
> My end goals are:
> 1) Provide a fault tolerant Flink Cluster
> 2) provide a persistent storage directory for the Jar file so I can
> perform rescaling without needing to re-upload the jar file.
>
> Thoughts?
> -Steve
>


Re: Challenges Deploying Flink With Savepoints On Kubernetes

2019-09-25 Thread Sean Hester
thanks for all replies! i'll definitely take a look at the Flink k8s
Operator project.

i'll try to restate the issue to clarify. this issue is specific to
starting a job from a savepoint in job-cluster mode. in these cases the Job
Manager container is configured to run a single Flink job at start-up. the
savepoint needs to be provided as an argument to the entrypoint. the Flink
documentation for this approach is here:

https://github.com/apache/flink/tree/master/flink-container/kubernetes#resuming-from-a-savepoint

the issue is that taking this approach means that the job will *always*
start from the savepoint provided as the start argument in the Kubernetes
YAML. this includes unplanned restarts of the job manager, but we'd really
prefer any *unplanned* restarts resume from the most recent checkpoint
instead of restarting from the configured savepoint. so in a sense we want
the savepoint argument to be transient, only being used during the initial
deployment, but this runs counter to the design of Kubernetes which always
wants to restore a deployment to the "goal state" as defined in the YAML.

i hope this helps. if you want more details please let me know, and thanks
again for your time.


On Tue, Sep 24, 2019 at 1:09 PM Hao Sun  wrote:

> I think I overlooked it. Good point. I am using Redis to save the path to
> my savepoint, so I might be able to set a TTL to avoid such an issue.
>
> Hao Sun
>
>
> On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov  wrote:
>
>> Hi Hao,
>>
>> I think he's exactly talking about the usecase where the JM/TM restart
>> and they come back up from the latest savepoint which might be stale by
>> that time.
>>
>> On Tue, 24 Sep 2019, 19:24 Hao Sun,  wrote:
>>
>>> We always make a savepoint before we shutdown the job-cluster. So the
>>> savepoint is always the latest. When we fix a bug or change the job graph,
>>> it can resume well.
>>> We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM,
>>> uncaught exception, etc.
>>>
>>> Maybe I do not understand your use case well, I do not see a need to
>>> start from checkpoint after a bug fix.
>>> From what I know, currently you can use checkpoint as a savepoint as well
>>>
>>> Hao Sun
>>>
>>>
>>> On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov 
>>> wrote:
>>>
 AFAIK there's currently nothing implemented to solve this problem, but
 working on a possible fix can be implemented on top of
 https://github.com/lyft/flinkk8soperator which already has a pretty
 fancy state machine for rolling upgrades. I'd love to be involved as this
 is an issue I've been thinking about as well.

 Yuval

 On Tue, Sep 24, 2019 at 5:02 PM Sean Hester <
 sean.hes...@bettercloud.com> wrote:

> hi all--we've run into a gap (knowledge? design? tbd?) for our use
> cases when deploying Flink jobs to start from savepoints using the
> job-cluster mode in Kubernetes.
>
> we're running ~15 different jobs, all in job-cluster mode, using a
> mix of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these
> are all long-running streaming jobs, all essentially acting as
> microservices. we're using Helm charts to configure all of our 
> deployments.
>
> we have a number of use cases where we want to restart jobs from a
> savepoint to replay recent events, i.e. when we've enhanced the job logic
> or fixed a bug. but after the deployment we want to have the job resume
> its "long-running" behavior, where any unplanned restarts resume from the
> latest checkpoint.
>
> the issue we run into is that any obvious/standard/idiomatic
> Kubernetes deployment includes the savepoint argument in the 
> configuration.
> if the Job Manager container(s) have an unplanned restart, when they come
> back up they will start from the savepoint instead of resuming from the
> latest checkpoint. everything is working as configured, but that's not
> exactly what we want. we want the savepoint argument to be transient
> somehow (only used during the initial deployment), but Kubernetes doesn't
> really support the concept of transient configuration.
>
> i can see a couple of potential solutions that either involve custom
> code in the jobs or custom logic in the container (i.e. a custom 
> entrypoint
> script that records that the configured savepoint has already been used in
> a file on a persistent volume or GCS, and potentially when/why/by which
> deployment). but these seem like unexpected and hacky solutions. before we
> head down that road i wanted to ask:
>
>- is this already a solved problem that i've missed?
>- is this issue already on the community's radar?
>
> thanks in advance!
>
> --
> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
>  

About using Hive UDFs in 1.9

2019-09-25 Thread like
Hi all,
I am currently running into the following problems when using Hive UDFs with version 1.9:
1. Hive UDFs are all registered in the default database, and as soon as the SQL contains
the keyword default, the Flink program throws an error.
I worked around the default keyword problem with tableEnv.useCatalog("hive") and
tableEnv.useDatabase("default").
I also found that without calling tableEnv.useDatabase("xx_db"), referencing a function
directly as xx_db.fun does not find it.


2. With the approach above I can use the UDFs of a specific Hive database, but it becomes
cumbersome when the query also needs tables registered in Flink:
the SQL then has to be written as default_catalog.default_database.xx_table.


Does anyone have a better way of using this, or any suggestions? Thanks!
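
For reference, a minimal sketch of the workaround described above; the catalog name
"hive", the database "xx_db", the function "xx_udf" and the table "xx_table" are
placeholder names, and the HiveCatalog is assumed to be registered already:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
// A HiveCatalog named "hive" is assumed to be registered via tableEnv.registerCatalog(...).
tableEnv.useCatalog("hive");      // make the Hive catalog the current catalog
tableEnv.useDatabase("xx_db");    // make xx_db current so its UDFs resolve without a prefix
// Tables registered in Flink's own catalog now need fully qualified names:
Table result = tableEnv.sqlQuery(
    "SELECT xx_udf(f0) FROM default_catalog.default_database.xx_table");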

Re: Joins Usage Clarification

2019-09-25 Thread Fabian Hueske
Hi Nishant,

To answer your questions:
1) yes, the SQL time-windowed join and the DataStream API Interval Join are
the same (with different implementations though)
2) DataStream Session-window joins are not directly supported in SQL. You
can play some tricks to make it work, but it wouldn't be elegant and to be
honest, the semantics of the session-window join are not really meaningful,
IMO.
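
For illustration, a minimal DataStream sketch of an interval join, i.e. the DataStream
counterpart of the SQL time-windowed join from point 1). Order and Shipment are
hypothetical POJOs with a String orderId and a long ts field, and orderStream /
shipmentStream are assumed to be event-time DataStreams of those types:

orderStream
    .keyBy(o -> o.orderId)
    .intervalJoin(shipmentStream.keyBy(s -> s.orderId))
    // join each order with shipments that happen between the order time and 4 hours later
    .between(Time.hours(0), Time.hours(4))
    .process(new ProcessJoinFunction<Order, Shipment, String>() {
        @Override
        public void processElement(Order o, Shipment s, Context ctx, Collector<String> out) {
            out.collect(o.orderId + " shipped at " + s.ts);
        }
    });

// The equivalent SQL time-windowed join predicate would look like:
//   SELECT o.orderId FROM Orders o, Shipments s
//   WHERE o.orderId = s.orderId
//     AND s.rowtime BETWEEN o.rowtime AND o.rowtime + INTERVAL '4' HOUR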

Fabian

On Wed, Sep 25, 2019 at 11:26 AM, Nishant Gupta <nishantgupta1...@gmail.com> wrote:

> Hi Team,
>
> There are 3 types of window join (Tumbling, Session, and Sliding) and 1
> interval Join as mentioned in (For Table API)
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/joining.html
> 
>
> Plus, there is 1 Time window Join as mentioned in (For SQL)
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#joins
>
>
> I need some clarification on the following doubts:
> 1. Can someone help me understand how the time-windowed join differs from
> the interval join? They look the same to me.
> 2. How do I implement a session-window join in Flink SQL? An example
> would be appreciated.
>
> Thanks
> Nishant
>
>
>
>
>
>
>
>
>


Joins Usage Clarification

2019-09-25 Thread Nishant Gupta
Hi Team,

There are 3 types of window join (Tumbling, Session, and Sliding) and 1
interval Join as mentioned in (For Table API)
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/joining.html


Plus, there is 1 Time window Join as mentioned in (For SQL)
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#joins


I need some clarification on the following doubts:
1. Can someone help me understand how the time-windowed join differs from
the interval join? They look the same to me.
2. How do I implement a session-window join in Flink SQL? An example
would be appreciated.

Thanks
Nishant


Re: NoClassDefFoundError in failing-restarting job that uses url classloader

2019-09-25 Thread Zhu Zhu
Yes. 1.8.2 contains all commits in 1.8.1.

Subramanyam Ramanathan wrote on Wed, Sep 25, 2019 at 5:03 PM:

> Hi Zhu,
>
>
>
> Thanks a lot !
>
> Since 1.8.2 is also available, would it be right to assume 1.8.2 would
> also contain the fix ?
>
>
>
> Thanks,
>
> Subbu
>
>
>
>
>
> *From:* Zhu Zhu [mailto:reed...@gmail.com]
> *Sent:* Tuesday, September 24, 2019 9:39 PM
> *To:* Subramanyam Ramanathan 
> *Cc:* Dian Fu ; user@flink.apache.org
> *Subject:* Re: NoClassDefFoundError in failing-restarting job that uses
> url classloader
>
>
>
> Hi Subramanyam,
>
>
>
> I checked the commits.
>
> There are 2 fixes in FLINK-10455, only release 1.8.1 and release 1.9.0
> contain both of them.
>
>
>
> Thanks,
>
> Zhu Zhu
>
>
>
> Subramanyam Ramanathan wrote on Tue, Sep 24, 2019 at 11:02 PM:
>
> Hi Zhu,
>
>
>
> We also use FlinkKafkaProducer(011), hence I felt this fix would also be
> needed for us.
>
>
>
> I agree that the fix for the issue I had originally mentioned would not be
> fixed by this, but I felt that I should be consuming this fix also.
>
>
>
> Thanks,
>
> Subbu
>
>
>
> *From:* Zhu Zhu [mailto:reed...@gmail.com]
> *Sent:* Tuesday, September 24, 2019 6:13 PM
> *To:* Subramanyam Ramanathan 
> *Cc:* Dian Fu ; user@flink.apache.org
> *Subject:* Re: NoClassDefFoundError in failing-restarting job that uses
> url classloader
>
>
>
> Hi Subramanyam,
>
>
>
> I think you do not need the fix in FLINK-10455, which is for Kafka only;
> it is just an issue similar to the one you hit.
> As you said, we need to make sure that the threads spawned by the
> operator/UDF are stopped in the close() method. This way we avoid the
> thread throwing a NoClassDefFoundError after the class loader is closed.
> throw NoClassDefFoundError due to the class loader gets closed.
>
>
>
> Thanks,
>
> Zhu Zhu
>
>
>
>
>
> Subramanyam Ramanathan wrote on Tue, Sep 24, 2019 at 8:07 PM:
>
> Hi,
>
>
>
> Thank you.
>
> I think the takeaway for us is that we need to make sure that the threads
> are stopped in the close() method.
>
>
>
> With regard to FLINK-10455, I see that the fix versions say : 1.5.6,
> 1.7.0, 1.7.3, 1.8.1, 1.9.0
>
>
>
> However, I’m unable to find 1.7.3 in the downloads page(
> https://flink.apache.org/downloads.html). Is it yet to be released, or
> perhaps I am not looking in the right place ?
>
> We’re currently using 1.7.2. Could you please let me know what is the
> minimal upgrade for me to consume the fix for FLINK-10455 ?
>
>
>
> Thanks,
>
> Subbu
>
>
>
> *From:* Dian Fu [mailto:dian0511...@gmail.com]
> *Sent:* Monday, September 23, 2019 1:54 PM
> *To:* Subramanyam Ramanathan 
> *Cc:* Zhu Zhu ; user@flink.apache.org
> *Subject:* Re: NoClassDefFoundError in failing-restarting job that uses
> url classloader
>
>
>
> Hi Subbu,
>
>
>
> The issue you encountered is very similar to the one fixed in FLINK-10455
> [1]. Could you check whether that fix solves your problem? The root cause
> of that issue is that the close() method did not clean up everything.
> After "close()" is called, the classloader (URLClassLoader) is closed. If
> a thread is still running after "close()" has been called, it may access
> classes from the user-provided jars; since the URLClassLoader has already
> been closed, a NoClassDefFoundError is thrown.
>
>
>
> Regards,
>
> Dian
>
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-10455
>
>
>
> On Sep 23, 2019, at 2:50 PM, Subramanyam Ramanathan
> <subramanyam.ramanat...@microfocus.com> wrote:
>
>
>
> Hi,
>
>
>
> I was able to simulate the issue again and understand the cause a little
> better.
>
>
>
> The issue occurs when:
>
> - One of the RichMapFunction transformations uses a third-party
> library in the open() method that spawns a thread.
>
> - The thread doesn't get properly closed in the close() method.
>
> - Once the job starts failing, we start seeing a NoClassDefFoundError
> from that thread.
>
>
>
> I understand that cleanup should be done in the close() method. However, I
> just wanted to know: do we have some kind of configuration setting that
> would help us clean up such threads?
>
> I can attach the code if required.
>
>
>
> Thanks,
>
> Subbu
>
>
>
> *From:* Zhu Zhu [mailto:reed...@gmail.com ]
> *Sent:* Friday, August 9, 2019 7:43 AM
> *To:* Subramanyam Ramanathan 
> *Cc:* user@flink.apache.org
> *Subject:* Re: NoClassDefFoundError in failing-restarting job that uses
> url classloader
>
>
>
> Hi Subramanyam,
>
>
>
> Could you share more information? including:
>
> 1. the URL pattern
>
> 2. the detailed exception and the log around it
>
> 3. the cluster the job is running on, e.g. standalone, yarn, k8s
>
> 4. it's session mode or per job mode
>
>
>
> This information would be helpful to identify the failure cause.
>
>
>
> Thanks,
>
> Zhu Zhu
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> Subramanyam Ramanathan wrote on Fri, Aug 9, 2019 at 1:45 AM:
>
>
>
> Hello,
>
>
>
> I'm currently using flink 1.7.2.
>
>
>
> I'm trying to run a job that's submitted programmatically using the
> ClusterClient API.
>
>public JobSubmissionResult 

RE: NoClassDefFoundError in failing-restarting job that uses url classloader

2019-09-25 Thread Subramanyam Ramanathan
Hi Zhu,

Thanks a lot !
Since 1.8.2 is also available, would it be right to assume 1.8.2 would also 
contain the fix ?

Thanks,
Subbu


From: Zhu Zhu [mailto:reed...@gmail.com]
Sent: Tuesday, September 24, 2019 9:39 PM
To: Subramanyam Ramanathan 
Cc: Dian Fu ; user@flink.apache.org
Subject: Re: NoClassDefFoundError in failing-restarting job that uses url 
classloader

Hi Subramanyam,

I checked the commits.
There are 2 fixes in FLINK-10455, only release 1.8.1 and release 1.9.0 contain 
both of them.

Thanks,
Zhu Zhu

Subramanyam Ramanathan <subramanyam.ramanat...@microfocus.com> wrote on Tue, Sep 24,
2019 at 11:02 PM:
Hi Zhu,

We also use FlinkKafkaProducer(011), hence I felt this fix would also be needed 
for us.

I agree that the fix for the issue I had originally mentioned would not be 
fixed by this, but I felt that I should be consuming this fix also.

Thanks,
Subbu

From: Zhu Zhu [mailto:reed...@gmail.com]
Sent: Tuesday, September 24, 2019 6:13 PM
To: Subramanyam Ramanathan <subramanyam.ramanat...@microfocus.com>
Cc: Dian Fu <dian0511...@gmail.com>; user@flink.apache.org
Subject: Re: NoClassDefFoundError in failing-restarting job that uses url 
classloader

Hi Subramanyam,

I think you do not need the fix in FLINK-10455, which is for Kafka only; it is
just an issue similar to the one you hit.
As you said, we need to make sure that the threads spawned by the operator/UDF are
stopped in the close() method. This way we avoid the thread throwing a
NoClassDefFoundError after the class loader is closed.

Thanks,
Zhu Zhu


Subramanyam Ramanathan <subramanyam.ramanat...@microfocus.com> wrote on Tue, Sep 24,
2019 at 8:07 PM:
Hi,

Thank you.
I think the takeaway for us is that we need to make sure that the threads are 
stopped in the close() method.

With regard to FLINK-10455, I see that the fix versions say : 1.5.6, 1.7.0, 
1.7.3, 1.8.1, 1.9.0

However, I’m unable to find 1.7.3 in the downloads 
page(https://flink.apache.org/downloads.html). Is it yet to be released, or 
perhaps I am not looking in the right place ?
We’re currently using 1.7.2. Could you please let me know what is the minimal 
upgrade for me to consume the fix for FLINK-10455 ?

Thanks,
Subbu

From: Dian Fu [mailto:dian0511...@gmail.com]
Sent: Monday, September 23, 2019 1:54 PM
To: Subramanyam Ramanathan <subramanyam.ramanat...@microfocus.com>
Cc: Zhu Zhu <reed...@gmail.com>; user@flink.apache.org
Subject: Re: NoClassDefFoundError in failing-restarting job that uses url 
classloader

Hi Subbu,

The issue you encountered is very similar to the one fixed in FLINK-10455 [1].
Could you check whether that fix solves your problem? The root cause of that issue
is that the close() method did not clean up everything. After "close()" is called,
the classloader (URLClassLoader) is closed. If a thread is still running after
"close()" has been called, it may access classes from the user-provided jars;
since the URLClassLoader has already been closed, a NoClassDefFoundError is thrown.

Regards,
Dian

[1] https://issues.apache.org/jira/browse/FLINK-10455

On Sep 23, 2019, at 2:50 PM, Subramanyam Ramanathan
<subramanyam.ramanat...@microfocus.com> wrote:

Hi,

I was able to simulate the issue again and understand the cause a little better.

The issue occurs when:
- One of the RichMapFunction transformations uses a third-party library
in the open() method that spawns a thread.
- The thread doesn't get properly closed in the close() method.
- Once the job starts failing, we start seeing a NoClassDefFoundError
from that thread.

I understand that cleanup should be done in the close() method. However, I just
wanted to know: do we have some kind of configuration setting that would help us
clean up such threads?
I can attach the code if required.
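
For reference, a minimal sketch of the cleanup pattern being discussed, with a
single-threaded executor standing in for whatever thread the third-party library
spawns (all names here are hypothetical):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class EnrichmentMapper extends RichMapFunction<String, String> {

    private transient ExecutorService executor;  // stands in for the library's background thread

    @Override
    public void open(Configuration parameters) {
        executor = Executors.newSingleThreadExecutor();
    }

    @Override
    public String map(String value) {
        return value;  // real logic would hand work to the background thread
    }

    @Override
    public void close() throws Exception {
        // Stop everything spawned in open(); otherwise the thread can outlive the
        // task's user-code classloader and fail with NoClassDefFoundError once the
        // job restarts and the old URLClassLoader is closed.
        if (executor != null) {
            executor.shutdownNow();
            executor.awaitTermination(10, TimeUnit.SECONDS);
        }
        super.close();
    }
}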

Thanks,
Subbu

From: Zhu Zhu [mailto:reed...@gmail.com]
Sent: Friday, August 9, 2019 7:43 AM
To: Subramanyam Ramanathan <subramanyam.ramanat...@microfocus.com>
Cc: user@flink.apache.org
Subject: Re: NoClassDefFoundError in failing-restarting job that uses url 
classloader

Hi Subramanyam,

Could you share more information? including:
1. the URL pattern
2. the detailed exception and the log around it
3. the cluster the job is running on, e.g. standalone, yarn, k8s
4. it's session mode or per job mode

This information would be helpful to identify the failure cause.

Thanks,
Zhu Zhu











Subramanyam Ramanathan <subramanyam.ramanat...@microfocus.com> wrote on Fri, Aug 9,
2019 at 1:45 AM:

Hello,

I'm currently using flink 1.7.2.

I'm trying to run a job that's submitted programmatically using the 
ClusterClient API.
   public JobSubmissionResult run(PackagedProgram prog, int parallelism)


The job makes use of some jars which I add to the packaged program through the 

Re: Question about reading ORC file in Flink

2019-09-25 Thread Fabian Hueske
Thank you very much for coming back and reporting the good news! :-)
If you think that there is something that we can do to improve Flink's ORC
input format, for example log a warning, please open a Jira.

Thank you,
Fabian

On Wed, Sep 25, 2019 at 5:14 AM, 163 wrote:

> Hi Fabian,
>
> After debugging in local mode, I found that the Flink ORC connector is not
> the problem: some fields in our schema are in capital letters, so those
> fields could not be matched.
> A program that reads the ORC file directly, however, uses the includeColumns
> method, which matches columns with equalsIgnoreCase, so it can read the fields.
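>
> For illustration, a sketch of the relevant part of the builder; the field names
> are placeholders, the point being that each name passed to forOrcSchema should
> match the casing in the ORC file exactly, since this path seems to match columns
> case-sensitively:
>
> OrcTableSource orcSource = OrcTableSource
>     .builder()
>     .path("hdfs:///path/to/data.orc")
>     // "TAGINDUSTRY" must be spelled exactly as in the file's schema
>     .forOrcSchema(TypeDescription.fromString(
>         "struct<content:string,TAGINDUSTRY:string>"))
>     .build();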
>
> Thanks for your Help!
>
> Qi Shu
>
>
> On Sep 24, 2019, at 4:36 PM, Fabian Hueske wrote:
>
> Hi QiShu,
>
> It might be that Flink's OrcInputFormat has a bug.
> Can you open a Jira issue to report the problem?
> In order to be able to fix this, we need as much information as possible.
> It would be great if you could create a minimal example of an ORC file and
> a program that reproduces the issue.
> If that's not possible, we need the schema of an Orc file that cannot be
> correctly read.
>
> Thanks,
> Fabian
>
> On Mon, Sep 23, 2019 at 11:40 AM, ShuQi wrote:
>
>> Hi Guys,
>>
>> The Flink version is 1.9.0. I use OrcTableSource to read an ORC file in HDFS
>> and the job executes successfully, without any exception or error. But some
>> fields (such as tagIndustry) are always null, although these fields are
>> actually not null. I can read them by reading the file directly. Below is my code:
>>
>> //main
>>  final ParameterTool params = ParameterTool.fromArgs(args);
>>
>> final ExecutionEnvironment env =
>> ExecutionEnvironment.getExecutionEnvironment();
>>
>> env.getConfig().setGlobalJobParameters(params);
>>
>> Configuration config = new Configuration();
>>
>>
>> OrcTableSource orcTableSource = OrcTableSource
>> .builder()
>> .path(params.get("input"))
>> .forOrcSchema(TypeDescription.fromString(TYPE_INFO))
>> .withConfiguration(config)
>> .build();
>>
>> DataSet<Row> dataSet = orcTableSource.getDataSet(env);
>>
>> DataSet<Tuple2<String, Integer>> counts = dataSet.flatMap(new
>> Tokenizer()).groupBy(0).sum(1);
>>
>> //read field
>> public void flatMap(Row row, Collector<Tuple2<String, Integer>> out) {
>>
>> String content = ((String) row.getField(6));
>> String tagIndustry = ((String) row.getField(35));
>>
>> LOGGER.info("arity: " + row.getArity());
>> LOGGER.info("content: " + content);
>> LOGGER.info("tagIndustry: " + tagIndustry);
>> LOGGER.info("===");
>>
>> if (Strings.isNullOrEmpty(content) ||
>> Strings.isNullOrEmpty(tagIndustry) || !tagIndustry.contains("FD001")) {
>> return;
>> }
>> // normalize and split the line
>> String[] tokens = content.toLowerCase().split("\\W+");
>>
>> // emit the pairs
>> for (String token : tokens) {
>> if (token.length() > 0) {
>> out.collect(new Tuple2<>(token, 1));
>> }
>> }
>> }
>>
>> Thanks for your help!
>>
>> QiShu
>>
>>
>>
>>
>>
>
>


Re: Running flink on AWS ECS

2019-09-25 Thread Terry Wang
Hi, Navneeth,

I think both are OK.
IMO, running one container with as many slots as virtual cores may be better,
since the slots can share the Flink framework and thus reduce memory cost.

Best,
Terry Wang



> On Sep 25, 2019, at 3:26 PM, Navneeth Krishnan wrote:
> 
> Hi All,
> 
> I'm currently running Flink on Amazon ECS and have assigned task slots
> based on vCPUs per instance. Is it better to run a separate container
> with one slot each, or one container with as many slots as there are virtual
> cores?
> 
> Thanks



Running flink on AWS ECS

2019-09-25 Thread Navneeth Krishnan
Hi All,

I'm currently running Flink on Amazon ECS and have assigned task slots
based on vCPUs per instance. Is it better to run a separate container
with one slot each, or one container with as many slots as there are virtual
cores?

Thanks


Explain time based windows

2019-09-25 Thread srikanth flink
Hi,

I'm trying to join a dynamic table and a static (periodically batch-updated) table
using:
SELECT K.* FROM KafkaSource AS K, BadIP AS B WHERE K.sourceip = B.bad_ip
AND B.b_proctime BETWEEN K.k_proctime - INTERVAL '65' MINUTE AND
K.k_proctime + INTERVAL '5' MINUTE.

Note: KafkaSource is a dynamic table and BadIP is a static table. The data rate is
2.2G per minute.
1) Help me understand the query with respect to the time intervals.
2) Would there be a possibility of duplicate data, or a chance of missing data,
due to the time intervals?
3) Would Flink evict records older than the defined time interval?


Thanks
Srikanth