Re: Poor performance with large keys using RocksDB and MapState

2020-10-01 Thread Yun Tang
Hi

The 'setCacheIndexAndFilterBlocks' option is used to ensure that we can manage 
the memory usage of RocksDB. Could you share logs or a more detailed description of why 
setCacheIndexAndFilterBlocks seems to make the hash index stop working properly?

I guess this might be due to the index and filter blocks being more likely to be 
evicted when competing with data blocks for the block cache [1], although Flink tries its 
best to minimize this regression. Please consider increasing the total block cache size 
or decreasing state.backend.rocksdb.memory.write-buffer-ratio [2].

[1] 
https://github.com/facebook/rocksdb/wiki/Block-Cache#caching-index-filter-and-compression-dictionary-blocks
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/state_backends.html#memory-management
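
For example, a rough sketch of the related settings in flink-conf.yaml (the values below are 
only illustrative, please tune them to your setup):

# keep RocksDB under Flink's managed memory control (this is the default)
state.backend.rocksdb.memory.managed: true
# give RocksDB a larger shared budget, which also enlarges the block cache
taskmanager.memory.managed.size: 2gb
# reserve a smaller share for write buffers, leaving more room for data/index/filter
# blocks in the block cache (the default ratio is 0.5)
state.backend.rocksdb.memory.write-buffer-ratio: 0.4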

Best
Yun Tang

From: ירון שני 
Sent: Tuesday, September 29, 2020 17:49
To: Yun Tang 
Cc: user@flink.apache.org 
Subject: Re: Poor performance with large keys using RocksDB and MapState

Thanks Yun!
I used this option, and it greatly helped


val be = new RocksDBStateBackend("file:///tmp")

class MyConfig extends DefaultConfigurableOptionsFactory {
  override def createColumnOptions(currentOptions: ColumnFamilyOptions,
      handlesToClose: util.Collection[AutoCloseable]): ColumnFamilyOptions = {
    super.createColumnOptions(currentOptions, handlesToClose).optimizeForPointLookup(2000)
  }
}
be.setRocksDBOptions(new MyConfig)
be.getMemoryConfiguration.setUseManagedMemory(false)

But now I can't use the RocksDBSharedResources, because setCacheIndexAndFilterBlocks 
seems to make the hash index not work properly and the performance is bad again.
It only works when using be.getMemoryConfiguration.setUseManagedMemory(false) and 
skipping setCacheIndexAndFilterBlocks :(





On Fri, Sep 25, 2020 at 9:56 AM Yun Tang  wrote:
Hi

If you want to improve the performance of point lookups, you could try to use the 
additional hash index. This feature requires passing a prefix extractor; however, the 
original interface is not exposed directly in the Java API.

You could try to call 
columnFamilyOptions.optimizeForPointLookup(blockCacheSizeMb), which uses the 
NoopTransform prefix extractor by default [1].
Please also consider using this feature only on Flink 1.10.2 or later, due to a RocksDB 
internal bug [2].

[1] 
https://github.com/dataArtisans/frocksdb/blob/c724d41fab7f9f09f9676dfccc6d210a191da4d6/options/options.cc#L477
[2] https://issues.apache.org/jira/browse/FLINK-17800

Best
Yun Tang



From: ירון שני 
Sent: Wednesday, September 23, 2020 23:56
To: user@flink.apache.org 
Subject: Poor performance with large keys using RocksDB and MapState

Hello,
I have a poor throughput issue, and I think I managed to reproduce it using the 
following code:

val conf = new Configuration()
conf.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.ofMebiBytes(6 * 1000))
conf.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(8 * 1000))
conf.set(TaskManagerOptions.NETWORK_MEMORY_MIN, MemorySize.ofMebiBytes(256))
conf.set(RocksDBConfigurableOptions.BLOCK_SIZE, new MemorySize(8 * 1000))

val be = new RocksDBStateBackend("file:///tmp")
val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
  .setStateBackend(be)

env.setParallelism(3)
env.getConfig.enableObjectReuse()

val r = new scala.util.Random(31)
val randStr = r.nextString(4992)
val s = env.fromElements(1).process(
  (value: Int,
   ctx: org.apache.flink.streaming.api.functions.ProcessFunction[Int, String]#Context,
   out: org.apache.flink.util.Collector[String]) => {
    for (a <- 1 to 1000 * 1000 * 10) {
      out.collect(randStr + r.nextString(8))
    }
  }).keyBy(a => a).process(new ProcessFunction[String, String] {

  private var someState: MapState[String, String] = _

  override def open(parameters: Configuration): Unit = {
    someState = getRuntimeContext.getMapState(
      new MapStateDescriptor[String, String]("someState",
        createTypeInformation[String], createTypeInformation[String])
    )
  }

  override def processElement(
      value: String,
      ctx: org.apache.flink.streaming.api.functions.ProcessFunction[String, String]#Context,
      out: org.apache.flink.util.Collector[String]): Unit = {
    if (!someState.contains(value)) {
      someState.put(value, value)
    }
  }
})

env.execute()

This has really poor throughput.
Now changing
out.collect( randStr + r.nextString(8) )

to
out.collect( r.nextString(8) + randStr)
solves the issue.
Is there any easy way to fix this?
I tried to use the hash index, but it required a RocksDB option called "prefix 

Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

2020-10-01 Thread Yang Wang
3. Makes sense to me. And we could add a new HA solution "StatefulSet + PV +
FileSystem" at any time in the future if we need it.

Since there are no more open questions, I will start the voting now.
Thanks all for your comments and feedback. Feel free to continue the
discussion if you have other concerns.


Best,
Yang

Till Rohrmann  wrote on Thu, Oct 1, 2020 at 4:52 PM:

> 3. We could avoid force deletions from within Flink. If the user does it,
> then we don't give guarantees.
>
> I am fine with your current proposal. +1 for moving forward with it.
>
> Cheers,
> Till
>
> On Thu, Oct 1, 2020 at 2:32 AM Yang Wang  wrote:
>
> > 2. Yes. This is exactly what I mean: storing the HA information relevant
> > to a specific component in a single ConfigMap and ensuring that “Get (check
> > the leader)-and-Update (write back to the ConfigMap)” is a transactional
> > operation. Since we only store the job graph state handle (not the real
> > data) in the ConfigMap, I think 1MB is big enough for the dispatcher-leader
> > ConfigMap (the biggest one, with multiple jobs). I roughly calculated that
> > we could have more than 1000 Flink jobs in a Flink session cluster.
> >
> > 3. Actually, K8s has a stronger guarantee than YARN, and the StatefulSet
> > could provide at-most-one semantics if no manual force-deletion
> > happens [1]. Based on the previous discussion, we have successfully avoided
> > the "lock-and-release" in the implementation. So I still insist on using
> > the current Deployment.
> >
> >
> > [1].
> >
> https://kubernetes.io/docs/tasks/run-application/force-delete-stateful-set-pod/#force-deletion
> >
> >
> > Best,
> > Yang
> >
> > Till Rohrmann  wrote on Wed, Sep 30, 2020 at 11:57 PM:
> >
> >> Thanks for the clarifications Yang Wang.
> >>
> >> 2. Keeping the HA information relevant for a component (Dispatcher,
> >> JobManager, ResourceManager) in a single ConfigMap sounds good. We
> should
> >> check that we don't exceed the 1 MB size limit with this approach
> though.
> >> The Dispatcher's ConfigMap would then contain the current leader, the
> >> running jobs and the pointers to the persisted JobGraphs. The
> JobManager's
> >> ConfigMap would then contain the current leader, the pointers to the
> >> checkpoints and the checkpoint ID counter, for example.
> >>
> >> 3. Ah ok, I somehow thought that K8s would give us stronger
> >> guarantees than Yarn in this regard. That's a pity.
> >>
> >> Cheers,
> >> Till
> >>
> >> On Wed, Sep 30, 2020 at 10:03 AM tison  wrote:
> >>
> >>> Thanks for your explanation. It would be fine if only checking
> >>> leadership & actually write information is atomic.
> >>>
> >>> Best,
> >>> tison.
> >>>
> >>>
> >>> Yang Wang  wrote on Wed, Sep 30, 2020 at 3:57 PM:
> >>>
>  Thanks till and tison for your comments.
> 
>  @Till Rohrmann 
>  1. I am afraid we could not do this if we are going to use the fabric8
>  Kubernetes client SDK for the leader election. The official Kubernetes Java
>  client [1] also does not support it, unless we implement a new
>  LeaderElector in Flink based on the very basic Kubernetes API. But it seems
>  that we would not gain much from this.
> 
>  2. Yes, the implementation will be a little complicated if we want to
>  completely eliminate the residual job graphs or checkpoints. Inspired
> by
>  your suggestion, another different solution has come into my mind. We
> could
>  use a same ConfigMap storing the JobManager leader, job graph,
>  checkpoint-counter, checkpoint. Each job will have a specific
> ConfigMap for
>  the HA meta storage. Then it will be easier to guarantee that only the
>  leader could write the ConfigMap in a transactional operation. Since
>  “Get(check the leader)-and-Update(write back to the ConfigMap)” is a
>  transactional operation.
> 
>  3. Yes, StatefulSet(1) + ConfigMap + HDFS/S3 is also a solution.
>  However, we still have the chances that two JobManager are running and
>  trying to get/delete a key in the same ConfigMap concurrently.
> Imagine that
>  the kubelet(like NodeManager in YARN) is down, and then the JobManager
>  could not be deleted. A new JobManager pod will be launched. We are
> just in
>  the similar situation as Deployment(1) + ConfigMap + HDFS/S3. The only
>  benefit is we do not need to implement a leader election/retrieval
> service.
> 
>  @tison
>  Actually, I do not think we will have such an issue in the Kubernetes HA
>  service. In the Kubernetes LeaderElector [2], the leader information is
>  stored in an annotation of the leader ConfigMap, so it cannot happen that the
>  old leader wrongly overrides the leader information. Whenever a JobManager
>  wants to write its leader information to the ConfigMap, it will first check
>  whether it is still the leader; if not, nothing will happen. Moreover, the
>  Kubernetes Resource Version [3] ensures that no one else has snuck in and
>  written a different update while 

Re: Flink 1.12 snapshot throws ClassNotFoundException

2020-10-01 Thread Lian Jiang
Thanks Till.  Making the scala version consistent using 2.11 solved the
ClassNotFoundException.
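
In case it helps others, the dependency block now looks roughly like this (coordinates
abbreviated; the point is to use a single Scala suffix, _2.11, everywhere and let Gradle pull
in flink-runtime's transitive Akka/Scala dependencies rather than pinning them separately):

dependencies {
    // one Scala suffix everywhere, so the Akka built against Scala 2.11 comes in transitively
    implementation "org.apache.flink:flink-clients_2.11:1.12-SNAPSHOT"
    implementation "org.apache.flink:flink-streaming-scala_2.11:1.12-SNAPSHOT"
    implementation "org.apache.flink:flink-connector-kafka_2.11:1.12-SNAPSHOT"
    // Scala-free modules carry no suffix
    implementation "org.apache.flink:flink-avro-confluent-registry:1.12-SNAPSHOT"
}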

On Tue, Sep 29, 2020 at 11:58 PM Till Rohrmann  wrote:

> Hi Lian,
>
> I suspect that it is caused by an incompatible Akka version. Flink uses
> Akka 2.5.21 instead of 2.5.12. Moreover, you are mixing Flink jars which
> use Scala 2.11 with Akka dependencies which are built against Scala 2.12.
>
> I am not a Gradle expert, but can't Gradle simply pull in the transitive
> dependencies of flink-runtime?
>
> Cheers,
> Till
>
> On Wed, Sep 30, 2020 at 2:22 AM Lian Jiang  wrote:
>
>> Hi,
>>
>> I use Flink source master to build a snapshot and use the jars in my
>> project. The goal is to avoid hacky deserialization code caused by avro 1.8
>> in old Flink versions since Flink 1.12 uses avro 1.10. Unfortunately, the
>> code throws below ClassNotFoundException. I have verified that the
>> akka-actor jar 2.5.12 is available and specified in -classpath. I can even
>> create an object using akka/serialization/NullSerializer class in my
>> application, indicating there is no problem for this app to use any class
>> under namespace akka/serialization.
>>
>> Caused by: java.lang.NoClassDefFoundError:
>> akka/serialization/BaseSerializer$class
>> at
>> akka.remote.serialization.MiscMessageSerializer.(MiscMessageSerializer.scala:25)
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>> at
>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>> at
>> akka.actor.ReflectiveDynamicAccess.$anonfun$createInstanceFor$1(ReflectiveDynamicAccess.scala:33)
>> at scala.util.Try$.apply(Try.scala:213)
>> at
>> akka.actor.ReflectiveDynamicAccess.createInstanceFor(ReflectiveDynamicAccess.scala:28)
>> at
>> akka.actor.ReflectiveDynamicAccess.$anonfun$createInstanceFor$4(ReflectiveDynamicAccess.scala:39)
>> at scala.util.Success.flatMap(Try.scala:251)
>> at
>> akka.actor.ReflectiveDynamicAccess.createInstanceFor(ReflectiveDynamicAccess.scala:39)
>> at akka.serialization.Serialization.serializerOf(Serialization.scala:320)
>> at
>> akka.serialization.Serialization.$anonfun$serializers$2(Serialization.scala:346)
>> at
>> scala.collection.TraversableLike$WithFilter.$anonfun$map$2(TraversableLike.scala:874)
>> at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:394)
>> at
>> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:721)
>> at
>> scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:873)
>> at akka.serialization.Serialization.(Serialization.scala:346)
>> at
>> akka.serialization.SerializationExtension$.createExtension(SerializationExtension.scala:16)
>> at
>> akka.serialization.SerializationExtension$.createExtension(SerializationExtension.scala:13)
>> at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:913)
>> at
>> akka.actor.ActorSystemImpl.$anonfun$loadExtensions$1(ActorSystem.scala:946)
>> at scala.collection.Iterator.foreach(Iterator.scala:943)
>> at scala.collection.Iterator.foreach$(Iterator.scala:943)
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>> at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>> at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>> at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>> at akka.actor.ActorSystemImpl.loadExtensions$1(ActorSystem.scala:944)
>> at akka.actor.ActorSystemImpl.loadExtensions(ActorSystem.scala:961)
>> at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:833)
>> at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:823)
>> at akka.actor.ActorSystemImpl._start(ActorSystem.scala:823)
>> at akka.actor.ActorSystemImpl.start(ActorSystem.scala:842)
>> at akka.actor.RobustActorSystem$.internalApply(RobustActorSystem.scala:96)
>> at akka.actor.RobustActorSystem$.apply(RobustActorSystem.scala:70)
>> at akka.actor.RobustActorSystem$.create(RobustActorSystem.scala:55)
>> at
>> org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem(AkkaUtils.scala:125)
>> at
>> org.apache.flink.runtime.akka.AkkaUtils.createActorSystem(AkkaUtils.scala)
>> at
>> org.apache.flink.runtime.clusterframework.BootstrapTools.startActorSystem(BootstrapTools.java:276)
>> at
>> org.apache.flink.runtime.clusterframework.BootstrapTools.startLocalActorSystem(BootstrapTools.java:260)
>> ... 11 more
>>
>>
>> This is my gradle:
>>
>> implementation files('lib/flink-avro-confluent-registry-1.12-SNAPSHOT.jar')
>> implementation files('lib/flink-clients_2.11-1.12-SNAPSHOT.jar')
>> implementation files('lib/flink-connector-kafka_2.11-1.12-SNAPSHOT.jar')
>> implementation files('lib/flink-connector-wikiedits_2.11-1.12-SNAPSHOT.jar')
>> implementation files('lib/flink-core-1.12-SNAPSHOT.jar')
>> implementation 

Re: Streaming SQL Job Switches to FINISHED before all records processed

2020-10-01 Thread Austin Cawley-Edwards
Hey Till,

Just a quick question on time characteristics -- this should work for
IngestionTime as well, correct? Is there anything special I need to do to
have the CsvTableSource / toRetractStream call carry through the assigned
timestamps, or do I have to re-assign timestamps during the conversion? I'm
currently getting the `Record has Long.MIN_VALUE timestamp (= no timestamp
marker)` error, though I'm seeing timestamps being assigned if I step
through the AutomaticWatermarkContext.

Thanks,
Austin

On Thu, Oct 1, 2020 at 10:52 AM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Perfect, thanks so much Till!
>
> On Thu, Oct 1, 2020 at 5:13 AM Till Rohrmann  wrote:
>
>> Hi Austin,
>>
>> I believe that the problem is the processing time window. Unlike for
>> event time where we send a MAX_WATERMARK at the end of the stream to
>> trigger all remaining windows, this does not happen for processing time
>> windows. Hence, if your stream ends and you still have an open processing
>> time window, then it will never get triggered.
>>
>> The problem should disappear if you use event time or if you process
>> unbounded streams which never end.
>>
>> Cheers,
>> Till
>>
>> On Thu, Oct 1, 2020 at 12:01 AM Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>>> Hey all,
>>>
>>> Thanks for your patience. I've got a small repo that reproduces the
>>> issue here: https://github.com/austince/flink-1.10-sql-windowing-error
>>>
>>> Not sure what I'm doing wrong but it feels silly.
>>>
>>> Thanks so much!
>>> Austin
>>>
>>> On Tue, Sep 29, 2020 at 3:48 PM Austin Cawley-Edwards <
>>> austin.caw...@gmail.com> wrote:
>>>
 Hey Till,

 Thanks for the reply -- I'll try to see if I can reproduce this in a
 small repo and share it with you.

 Best,
 Austin

 On Tue, Sep 29, 2020 at 3:58 AM Till Rohrmann 
 wrote:

> Hi Austin,
>
> could you share with us the exact job you are running (including the
> custom window trigger)? This would help us to better understand your
> problem.
>
> I am also pulling in Klou and Timo who might help with the windowing
> logic and the Table to DataStream conversion.
>
> Cheers,
> Till
>
> On Mon, Sep 28, 2020 at 11:49 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Hey all,
>>
>> I'm not sure if I've missed something in the docs, but I'm having a
>> bit of trouble with a streaming SQL job that starts w/ raw SQL queries 
>> and
>> then transitions to a more traditional streaming job. I'm on Flink 1.10
>> using the Blink planner, running locally with no checkpointing.
>>
>> The job looks roughly like:
>>
>> CSV 1 -->
>> CSV 2 -->  SQL Query to Join --> toRetractStream --> keyed time
>> window w/ process func & custom trigger --> some other ops
>> CSV 3 -->
>>
>>
>> When I remove the windowing directly after the `toRetractStream`, the
>> records make it to the "some other ops" stage, but with the windowing,
>> those operations are sometimes not sent any data. I can also get data 
>> sent
>> to the downstream operators by putting in a no-op map before the window 
>> and
>> placing some breakpoints in there to manually slow down processing.
>>
>>
>> The logs don't seem to indicate anything went wrong and generally
>> look like:
>>
>> 4819 [Source: Custom File source (1/1)] INFO
>>  org.apache.flink.runtime.taskmanager.Task  - Source: Custom File source
>> (1/1) (3578629787c777320d9ab030c004abd4) switched from RUNNING to
>> FINISHED.\4819 [Source: Custom File source (1/1)] INFO
>>  org.apache.flink.runtime.taskmanager.Task  - Freeing task resources for
>> Source: Custom File source (1/1) (3578629787c777320d9ab030c004abd4).
>> 4819 [Source: Custom File source (1/1)] INFO
>>  org.apache.flink.runtime.taskmanager.Task  - Ensuring all FileSystem
>> streams are closed for task Source: Custom File source (1/1)
>> (3578629787c777320d9ab030c004abd4) [FINISHED]
>> 4820 [flink-akka.actor.default-dispatcher-5] INFO
>>  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Un-registering 
>> task
>> and sending final execution state FINISHED to JobManager for task Source:
>> Custom File source (1/1) 3578629787c777320d9ab030c004abd4.
>> ...
>> 4996 [Window(TumblingProcessingTimeWindows(6), TimedCountTrigger,
>> ProcessWindowFunction$1) (1/1)] INFO
>>  org.apache.flink.runtime.taskmanager.Task  -
>> Window(TumblingProcessingTimeWindows(6), TimedCountTrigger,
>> ProcessWindowFunction$1) (1/1) (907acf9bfa2f4a9bbd13997b8b34d91f) 
>> switched
>> from RUNNING to FINISHED.
>> 4996 [Window(TumblingProcessingTimeWindows(6), TimedCountTrigger,
>> ProcessWindowFunction$1) (1/1)] INFO
>>  org.apache.flink.runtime.taskmanager.Task  - Freeing task resources for
>> 

RE: Blobserver dying mid-application

2020-10-01 Thread Hailu, Andreas
@Chesnay:
I see. I actually had a separate thread with Robert Metzger a while ago regarding 
connection-related issues we’re plagued with at higher parallelisms, and his 
guidance led us to look into our somaxconn config. We increased it from 128 to 
1024 in early September. We use the same generic JAR for all of our apps, so I 
don’t think JAR size is the cause. Just so I’m clear: when you say Flink 
session cluster – if we have 2 independent Flink applications A & B with 
JobManagers that just happen to be running on the same DataNode, they don’t 
share Blobservers, right?

In regard to historical behavior, no, I haven’t seen these Blobserver 
connection problems until after the somaxconn config change. From an app 
perspective, the only way these ones are different is that they’re wide rather 
than deep i.e. large # of jobs to submit instead of a small handful of jobs 
with large amounts of data to process. If we have many jobs to submit, as soon 
as one is complete, we’re trying to submit the next.

I saw an example today of an application using 10 TaskManagers w/ 2 slots with 
a total 194 jobs to submit with at most 20 running in parallel fail with the 
same error. I’m happy to try increasing both the concurrent connections and 
backlog to 128 and 2048 respectively, but I still can’t make sense of how a 
backlog of 1,000 connections is being met by 10 Task Managers/20 connections at 
worst.

$ sysctl -a | grep net.core.somaxconn
net.core.somaxconn = 1024
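
For reference, this is roughly what we plan to put into flink-conf.yaml, if my reading of the 
docs is right that the defaults are 50 concurrent fetches and a backlog of 1000:

# allow more simultaneous BLOB fetches before new connections land in the backlog
blob.fetch.num-concurrent: 128
# hint a larger accept backlog to the OS (still capped by net.core.somaxconn)
blob.fetch.backlog: 2048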

// ah

From: Chesnay Schepler 
Sent: Thursday, October 1, 2020 1:41 PM
To: Hailu, Andreas [Engineering] ; Till Rohrmann 

Cc: user@flink.apache.org; Nico Kruber 
Subject: Re: Blobserver dying mid-application

All jobs running in a Flink session cluster talk to the same blob server.

The time when tasks are submitted depends on the job; for streaming jobs all 
tasks are deployed when the job starts running; in case of batch jobs the 
submission can be staggered.

I'm only aware of 2 cases where we transfer data via the blob server;
a) retrieval of jars required for the user code to run  (this is what you see 
in the stack trace)
b) retrieval of TaskInformation, which _should_ only happen if your job is 
quite large, but let's assume it does.

For a) there should be at most numberOfSlots * numberOfTaskExecutors concurrent 
connections, in the worst case of each slot working on a different job, as each 
would download the jars for their respective job. If multiple slots are used 
for the same job at the same time, then the job jar is only retrieved once.

For b) the limit should also be numberOfSlots * numberOfTaskExecutors; it is 
done once per task, and there are only so many tasks that can run at the same 
time.

Thus from what I can tell there should be at most 104 (26 task executors * 2 
slots * 2) concurrent attempts, of which only 54 should land in the backlog.

Did you run into this issue before?
If not, is this application different than your existing applications? Is the 
jar particularly big, jobs particularly short running or more complex than 
others.

One thing to note is that the backlog relies entirely on OS functionality, 
which can be subject to an upper limit enforced by the OS.
The configured backlog size is just a hint to the OS, but it may ignore it; it 
appears that 128 is not an uncommon upper limit, but maybe there are lower 
settings out there.
You can check this limit via sysctl -a | grep net.core.somaxconn
Maybe this value is set to 0, effectively disabling the backlog?

It may also be worthwhile to monitor the number of such connections. (netstat 
-ant | grep -c SYN_REC)

@Nico Do you have any ideas?

On 10/1/2020 6:26 PM, Hailu, Andreas wrote:
Hi Chesnay, Till, thanks for responding.

@Chesnay:
Apologies, I said cores when I meant slots ☺ So a total of 26 Task managers 
with 2 slots each for a grand total of 52 parallelism.

@Till:
For this application, we have a grand total of 78 jobs, with some of them 
demanding more parallelism than others. Each job has multiple operators – 
depending on the size of the data we’re operating on, we could submit 1 whopper 
with 52 parallelism, or multiple smaller jobs submitted in parallel with a sum 
of 52 parallelism. When does a task submission to a `TaskExecutor` take place? 
Is that on job submission or something else? I’m just curious as a parallelism 
of 52 seems on the lower side to breach 1K connections in the queue, unless 
interactions with the Blobserver are much more frequent than I think. Is it 
possible that separate Flink jobs share the same Blobserver? Because we have 
thousands of Flink applications running concurrently in our YARN cluster.

// ah

From: Chesnay Schepler 
Sent: Thursday, October 1, 2020 5:42 AM
To: Till Rohrmann ; Hailu, 
Andreas [Engineering] 

Cc: user@flink.apache.org
Subject: Re: Blobserver dying mid-application

It would also be 

Re: Experiencing different watermarking behaviour in local/standalone modes when reading from kafka topic with idle partitions

2020-10-01 Thread David Anderson
If you were to use per-partition watermarking, which you can do by
calling assignTimestampsAndWatermarks directly on the Flink Kafka consumer
[1], then I believe the idle partition(s) would consistently hold back the
overall watermark.

With per-partition watermarking, each Kafka source task will apply the
watermarking separately to each partition it is handling, and then emit as
its watermark the minimum of those per-partition watermarks. At least one
of the Kafka source tasks will therefore have a watermark of 0, and
assuming you have a keyBy after the watermarking and before the process
function, that will hold back the watermark at the process function.
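
For example, a rough sketch of wiring this up (env, MyEventSchema, the topic, the properties,
the key selector, and MyKeyedProcessFunction are placeholders for your own setup; MyWatermarker
is the assigner from your message):

import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092")
props.setProperty("group.id", "my-group")

// MyEventSchema is assumed to be a DeserializationSchema[MyEvent]
val consumer = new FlinkKafkaConsumer[MyEvent]("my-topic", new MyEventSchema, props)

// Assigning the watermarks on the consumer itself makes them per-partition: each partition
// tracks its own max timestamp and the source emits the minimum across its partitions.
consumer.assignTimestampsAndWatermarks(new MyWatermarker(maxTimeLag = 0))

val stream = env
  .addSource(consumer)
  .keyBy(_.key)                        // keyBy before the process function, as assumed above
  .process(new MyKeyedProcessFunction) // your existing KeyedProcessFunction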

Otherwise, if you apply watermarking to the output of the Kafka source
tasks, then whether the watermarking tasks have a watermark of 0 or not
depends on whether their corresponding Kafka source task has any non-idle
partitions. If the assignment of partitions to instances isn't
deterministic, this could explain why you are seeing different results in
IntelliJ.

Note that the handling of idle sources has been reworked in Flink 1.11 [2],
and bug fixes related to that are still pending [3].

Regards,
David

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#dealing-with-idle-sources
[3] https://issues.apache.org/jira/browse/FLINK-18934

On Thu, Oct 1, 2020 at 12:10 PM Salva Alcántara 
wrote:

> I am considering this watermarker:
>
> ```scala
> class MyWatermarker(val maxTimeLag: Long = 0)
> extends AssignerWithPeriodicWatermarks[MyEvent] {
>   var maxTs: Long = 0
>
>   override def extractTimestamp(e: MyEvent, previousElementTimestamp:
> Long):
> Long = {
> val timestamp = e.timestamp
> maxTs = maxTs.max(timestamp)
> timestamp
>   }
>
>   override def getCurrentWatermark: WatermarkOld = {
> println(s"event watermark: ${maxTs - maxTimeLag}")
> new WatermarkOld(maxTs - maxTimeLag)
>   }
> ```
>
> The underlying events come from a kafka source, and are then handed to a
> process function. The implementation is irrelevant for the question, I will
> just share the relevant bit:
>
> ```scala
>   override def processElement(
> event: MyEvent,
> ctx: KeyedProcessFunction[String, MyEvent, MyEvent]#Context,
> out: Collector[StreamEvent]
>   ): Unit = {
> println(
>   s"In process function, got event: $event, ctx.timestamp:
> ${ctx.timestamp}, currentWatermark: ${ctx.timerService.currentWatermark}"
> )
>   ...
>   }
> ```
>
> When I run this app on a real kubernetes cluster using a kafka source topic
> having idle partitions, the watermark is held back to 0 as expected:
>
> ```
> In process function, got event: xxx, ctx.timestamp: 1601475710619,
> currentWatermark: 0
> ```
>
> I can also see these logs generated in the watermarker:
>
> ```
> event watermark: 1601475710619
> event watermark: 0
> event watermark: 1601475710619
> event watermark: 0
> ```
>
> The funny thing is that when I run the same application locally on
> IntelliJ,
> and also have idle kafka partitions for the same topic, I am getting also
> the above logs from the watermarker, with the watermark oscillating between
> 0 and the ts of the latest received element, since `maxLag = 0`. However,
> quite unexpectedly for me, the logs from the process function show that the
> watermark is yet advancing:
>
> ```
> In process function, got event: xxx, ctx.timestamp: 1601475710619,
> currentWatermark: 1601475710618
> ```
>
> Why is this happening? FYI, I am using Flink 1.10 with the environment
> parallelism set to 2 and event time semantics in both cases.
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Blobserver dying mid-application

2020-10-01 Thread Chesnay Schepler

All jobs running in a Flink session cluster talk to the same blob server.

The time when tasks are submitted depends on the job; for streaming jobs 
all tasks are deployed when the job starts running; in case of batch 
jobs the submission can be staggered.


I'm only aware of 2 cases where we transfer data via the blob server;
a) retrieval of jars required for the user code to run  (this is what 
you see in the stack trace)
b) retrieval of TaskInformation, which _should_ only happen if your job 
is quite large, but let's assume it does.


For a) there should be at most numberOfSlots * numberOfTaskExecutors 
concurrent connections, in the worst case of each slot working on a 
different job, as each would download the jars for their respective job. 
If multiple slots are used for the same job at the same time, then the 
job jar is only retrieved once.


For b) the limit should also be numberOfSlots * numberOfTaskExecutors; 
it is done once per task, and there are only so many tasks that can run 
at the same time.


Thus from what I can tell there should be at most 104 (26 task executors 
* 2 slots * 2) concurrent attempts, of which only 54 should land in the 
backlog.


Did you run into this issue before?
If not, is this application different than your existing applications? 
Is the jar particularly big, jobs particularly short running or more 
complex than others.


One thing to note is that the backlog relies entirely on OS 
functionality, which can be subject to an upper limit enforced by the OS.
The configured backlog size is just a hint to the OS, but it may ignore 
it; it appears that 128 is not an uncommon upper limit, but maybe there 
are lower settings out there.

You can check this limit via sysctl -a | grep net.core.somaxconn
Maybe this value is set to 0, effectively disabling the backlog?

It may also be worthwhile to monitor the number of such 
connections. (netstat -ant | grep -c SYN_REC)


@Nico Do you have any ideas?

On 10/1/2020 6:26 PM, Hailu, Andreas wrote:


Hi Chesnay, Till, thanks for responding.

@Chesnay:

Apologies, I said cores when I meant slots ☺ So a total of 26 Task 
managers with 2 slots each for a grand total of 52 parallelism.


@Till:

For this application, we have a grand total of 78 jobs, with some of 
them demanding more parallelism than others. Each job has multiple 
operators – depending on the size of the data we’re operating on, we 
could submit 1 whopper with 52 parallelism, or multiple smaller jobs 
submitted in parallel with a sum of 52 parallelism. When does a task 
submission to a `TaskExecutor` take place? Is that on job submission 
or something else? I’m just curious as a parallelism of 52 seems on 
the lower side to breach 1K connections in the queue, unless 
interactions with the Blobserver are much more frequent than I think. 
Is it possible that separate Flink jobs share the same Blobserver? 
Because we have thousands of Flink applications running concurrently 
in our YARN cluster.


// ah

From: Chesnay Schepler 
Sent: Thursday, October 1, 2020 5:42 AM
To: Till Rohrmann ; Hailu, Andreas [Engineering] 

Cc: user@flink.apache.org
Subject: Re: Blobserver dying mid-application

It would also be good to know how many slots you have on each task 
executor.


On 10/1/2020 11:21 AM, Till Rohrmann wrote:

Hi Andreas,

do the logs of the JM contain any information?

Theoretically, each task submission to a `TaskExecutor` can
trigger a new connection to the BlobServer. This depends a bit on
how large your TaskInformation is and whether this information is
being offloaded to the BlobServer. What you can definitely try to
do is to increase the blob.fetch.backlog in order to see whether
this solves the problem.

How many jobs and with what timeline do you submit them to the
Flink cluster? Maybe you can share a bit more details about the
application you are running.

Cheers,

Till

On Wed, Sep 30, 2020 at 11:49 PM Hailu, Andreas  wrote:

Hello folks, I’m seeing application failures where our
Blobserver is refusing connections mid application:

2020-09-30 13:56:06,227 INFO
[flink-akka.actor.default-dispatcher-18]
org.apache.flink.runtime.taskexecutor.TaskExecutor -
Un-registering task and sending final execution state FINISHED
to JobManager for task DataSink (TextOutputFormat

(hdfs:/user/p2epda/lake/delp_prod/PROD/APPROVED/staging/datastore/MandateTradingLine/tmp_pipeline_collapse)
- UTF-8) 3d1890b47f4398d805cf0c1b54286f71.

2020-09-30 13:56:06,423 INFO
[flink-akka.actor.default-dispatcher-18]
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable -
Free slot TaskSlot(index:0, state:ACTIVE, resource profile:
ResourceProfile{cpuCores=1.7976931348623157E308,
heapMemoryInMB=2147483647, directMemoryInMB=2147483647,

Re: Stateful Functions + ML model prediction

2020-10-01 Thread John Morrow
Hi Flink Users,

I was watching Tzu-Li Tai's talk on stateful functions from Flink Forward 
(https://www.youtube.com/watch?v=tuSylBadNSo) which mentioned that Kafka & 
Kinesis are supported, and looking at 
https://repo.maven.apache.org/maven2/org/apache/flink/ I can see IO packages 
for those two: statefun-kafka-io & statefun-kinesis-io


Is it possible to use Apache Pulsar as a Statefun ingress & egress?

Thanks,
John.


From: John Morrow 
Sent: Wednesday 23 September 2020 11:37
To: Igal Shilman 
Cc: user 
Subject: Re: Stateful Functions + ML model prediction

Thanks very much Igal - that sounds like a good solution!

I'm new to StateFun so I'll have to dig into it a bit more, but this sounds 
like a good direction.

Thanks again,
John.


From: Igal Shilman 
Sent: Wednesday 23 September 2020 09:06
To: John Morrow 
Cc: user 
Subject: Re: Stateful Functions + ML model prediction

Hi John,

Thank you for sharing your interesting use case!

Let me start from your second question:
Are stateful functions available to all Flink jobs within a cluster?

Yes, the remote functions are some logic exposed behind an HTTP endpoint, and 
Flink would forward any message addressed to them via an HTTP request.
The way StateFun works is, for every invocation, StateFun would attach the 
necessary context (any previous state for a key, and the message) to the HTTP 
request.
So practically speaking the same remote function can be contacted by different 
Jobs, as the remote functions are effectively stateless.

 Does this sound like a good use case for stateful functions?

The way I would approach this is, I would consider moving the business rules 
and the enrichment to the remote function.
This would:

a) Eliminate the need for a broadcast stream; you can simply deploy a new 
version of the remote function container, as they can be independently restarted 
(without the need to restart the Flink job that contacts them)
b) You can perform the enrichment immediately without going through a 
RichAsyncFunction, as StateFun, by default, invokes many remote functions in 
parallel (but never for the same key)
c) You can contact the remote service that hosts the machine learning model, or 
even load the model in the remote function's process on startup.

So, in kubernetes terms:

1. You would need a set of pods (a deployment) that are able to serve HTTP 
traffic and expose a StateFun endpoint.
2. You would need a separate deployment for Flink that runs a StateFun job
3. The StateFun job would need to know how to contact these pods, so you would 
also need a kubernetes service (or a LoadBalancer) that
balances the requests from (2) to (1).

If you need to change your business rules, or the enrichment logic you can 
simply roll a new version of (1).
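
For (3), a minimal sketch of such a Service could look like this (all names, labels, and the 
port are placeholders for your setup):

apiVersion: v1
kind: Service
metadata:
  name: my-remote-fn        # referenced from the endpoint URL configured for the StateFun job
spec:
  selector:
    app: my-remote-fn       # must match the labels of the remote-function Deployment's pods
  ports:
    - port: 8000            # the port on which the remote functions serve the HTTP endpoint
      targetPort: 8000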


Good luck,
Igal.

On Tue, Sep 22, 2020 at 10:22 PM John Morrow  wrote:
Hi Flink Users,

I'm using Flink to process a stream of records containing a text field. The 
records are sourced from a message queue, enriched as they flow through the 
pipeline based on business rules and finally written to a database. We're using 
the Ververica platform so it's running on Kubernetes.

The initial business rules were straightforward, e.g. if field X contains a 
certain word then set field Y to a certain value. For the implementation I 
began by looking at 
https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html for 
inspiration. I ended up implementing a business rule as a Java class with a 
match-predicate & an action. The records enter the pipeline on a data stream 
which is joined with the rules in a broadcast stream and a ProcessFunction 
checks each record to see if it matches any rule predicates. If the record 
doesn't match any business rule predicates it continues on in the pipeline. If 
the record does match one or more business rule predicates it is sent to a side 
output with the list of business rules that it matched. The side output data 
stream goes through a RichAsyncFunction which loops through the matched rules 
and applies each one's action to the record. At the end, that enriched 
side-output record stream is unioned back with the non-enriched record stream. 
This all worked fine.

I have some new business rules which are more complicated and require sending 
the record's text field to different pre-trained NLP models for prediction, 
e.g. if a model predicts the text language is X then update field Y to that 
value, if another model predicts the sentiment is positive then set some other 
field to another value. I'm planning on using seldon-core to serve these 
pre-trained models, so they'll also be available in the k8s cluster.

I'm not sure about the best way to set up these model prediction calls in 
Flink. I could add in a new ProcessFunction in my pipeline before my existing 
enrichment-rule-predicate ProcessFunction and have it send the text to each of 
the prediction 

Re: Error when registering a UDTF in Flink SQL with ROW type as output

2020-10-01 Thread Xingbo Huang
Hello,
This is more of a usability issue; I created a JIRA for it earlier [1]. Using [DataTypes.STRING(),
DataTypes.STRING()] directly as the resultType is correct. Regarding the input_types question: internally, input_types is derived by matching against the upstream result_type, so it is also fine that yours does not correspond. In 1.12 it will no longer be necessary to specify result_type.

Best,
Xingbo

[1] https://issues.apache.org/jira/browse/FLINK-19138

chenxuying  wrote on Wed, Sep 30, 2020 at 7:18 PM:

> The splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.BIGINT(),
> DataTypes.BIGINT()]) mentioned at the end above needs to be changed to splitStr = udtf(SplitStr(), DataTypes.STRING(),
> [DataTypes.STRING(), DataTypes.STRING()]). Regarding the third argument of udtf,
> it seems to run as long as it matches the sink's field types, but it feels a bit odd that it also runs even though the second argument does not correspond to the source fields.
> On 2020-09-30 19:07:06, "chenxuying" wrote:
> >Version:
> >pyflink==1.0
> >apache-flink==1.11.2
> >The code is as follows:
> >env = StreamExecutionEnvironment.get_execution_environment()
> >env.set_parallelism(1)
> >t_env = StreamTableEnvironment.create(env)
> >t_env.get_config().get_configuration().set_string("python.fn-execution.memory.managed",
> 'true')
> >
> >
> >class SplitStr(TableFunction):
> >def eval(self, data):
> >for row in data:
> >yield row[0], row[1]
> >splitStr = udtf(
> >SplitStr(),
> >DataTypes.ARRAY(
> >DataTypes.ROW(
> >[
> >DataTypes.FIELD("name", DataTypes.STRING()),
> >DataTypes.FIELD("id", DataTypes.STRING())
> >]
> >)
> >),
> >DataTypes.ROW(
> >[
> >DataTypes.FIELD("name", DataTypes.STRING()),
> >DataTypes.FIELD("id", DataTypes.STRING())
> >]
> >)
> >)
> >t_env.register_function("splitStr", splitStr)
> >
> >
> >t_env.sql_update("""
> >CREATE TABLE mySource (
> >id varchar,
> >data array<row<name varchar, id varchar>>
> >) WITH (
> >'connector' = 'kafka',
> >'topic' = 'mytesttopic',
> >'properties.bootstrap.servers' = '172.17.0.2:9092',
> >'properties.group.id' = 'flink-test-cxy',
> >'scan.startup.mode' = 'latest-offset',
> >'format' = 'json'
> >)
> >""")
> >t_env.sql_update("""
> >CREATE TABLE mysqlsink (
> >id varchar
> >,name varchar
> >,age  varchar
> >)
> >with (
> >'connector' = 'print'
> >)
> >""")
> >t_env.sql_update("insert into mysqlsink select id,name,age from mySource
> ,LATERAL TABLE(splitStr(data)) as T(name, age)")
> >t_env.execute("test")
> >
> >
> >The final error is
> >TypeError: Invalid result_type: result_type should be DataType but
> contains RowField(name, VARCHAR)
> >The error is raised at
> >File
> "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\udf.py",
> line 264, in __init__
> >
> >
> >def __init__(self, func, input_types, result_types, deterministic=None,
> name=None):
> >super(UserDefinedTableFunctionWrapper, self).__init__(
> >func, input_types, deterministic, name)
> >
> >
> >if not isinstance(result_types, collections.Iterable):
> >result_types = [result_types]
> >
> >
> >for result_type in result_types:
> >if not isinstance(result_type, DataType):
> >raise TypeError(
> >"Invalid result_type: result_type should be DataType but contains
> {}".format(
> >result_type))
> >
> >
> >self._result_types = result_types
> >self._judtf_placeholder = None
> >
> >
> >At a breakpoint I can see that result_types corresponds to the array of FIELDs inside the ROW, which is why the error is raised. Is this a bug?
> >
> >
> >In addition, if I write the udtf creation above like this instead
> >splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.BIGINT(),
> DataTypes.BIGINT()])
> >it runs fine, but the types obviously do not correspond to what I am actually running
>


Experiencing different watermarking behaviour in local/standalone modes when reading from kafka topic with idle partitions

2020-10-01 Thread Salva Alcántara
I am considering this watermarker:

```scala
class MyWatermarker(val maxTimeLag: Long = 0)
extends AssignerWithPeriodicWatermarks[MyEvent] {
  var maxTs: Long = 0

  override def extractTimestamp(e: MyEvent, previousElementTimestamp: Long):
Long = {
val timestamp = e.timestamp
maxTs = maxTs.max(timestamp)
timestamp
  }

  override def getCurrentWatermark: WatermarkOld = {
println(s"event watermark: ${maxTs - maxTimeLag}")
new WatermarkOld(maxTs - maxTimeLag)
  }
```

The underlying events come from a kafka source, and are then handed to a
process function. The implementation is irrelevant for the question, I will
just share the relevant bit:

```scala
  override def processElement(
event: MyEvent,
ctx: KeyedProcessFunction[String, MyEvent, MyEvent]#Context,
out: Collector[StreamEvent]
  ): Unit = {
println(
  s"In process function, got event: $event, ctx.timestamp:
${ctx.timestamp}, currentWatermark: ${ctx.timerService.currentWatermark}"
)
  ...
  }
```

When I run this app on a real kubernetes cluster using a kafka source topic
having idle partitions, the watermark is held back to 0 as expected:

```
In process function, got event: xxx, ctx.timestamp: 1601475710619,
currentWatermark: 0
```

I can also see these logs generated in the watermarker:

```
event watermark: 1601475710619
event watermark: 0
event watermark: 1601475710619
event watermark: 0
```

The funny thing is that when I run the same application locally on IntelliJ,
and also have idle kafka partitions for the same topic, I am getting also
the above logs from the watermarker, with the watermark oscillating between
0 and the ts of the latest received element, since `maxLag = 0`. However,
quite unexpectedly for me, the logs from the process function show that the
watermark is yet advancing:

```
In process function, got event: xxx, ctx.timestamp: 1601475710619,
currentWatermark: 1601475710618
```

Why is this happening? FYI, I am using Flink 1.10 with the environment
parallelism set to 2 and event time semantics in both cases.




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Blobserver dying mid-application

2020-10-01 Thread Chesnay Schepler

It would also be good to know how many slots you have on each task executor.

On 10/1/2020 11:21 AM, Till Rohrmann wrote:

Hi Andreas,

do the logs of the JM contain any information?

Theoretically, each task submission to a `TaskExecutor` can trigger a 
new connection to the BlobServer. This depends a bit on how large your 
TaskInformation is and whether this information is being offloaded to 
the BlobServer. What you can definitely try to do is to increase the 
blob.fetch.backlog in order to see whether this solves the problem.


How many jobs and with what timeline do you submit them to the 
Flink cluster? Maybe you can share a bit more details about the 
application you are running.


Cheers,
Till

On Wed, Sep 30, 2020 at 11:49 PM Hailu, Andreas  wrote:


Hello folks, I’m seeing application failures where our Blobserver
is refusing connections mid application:

2020-09-30 13:56:06,227 INFO
[flink-akka.actor.default-dispatcher-18]
org.apache.flink.runtime.taskexecutor.TaskExecutor -
Un-registering task and sending final execution state FINISHED to
JobManager for task DataSink (TextOutputFormat

(hdfs:/user/p2epda/lake/delp_prod/PROD/APPROVED/staging/datastore/MandateTradingLine/tmp_pipeline_collapse)
- UTF-8) 3d1890b47f4398d805cf0c1b54286f71.

2020-09-30 13:56:06,423 INFO
[flink-akka.actor.default-dispatcher-18]
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable - Free
slot TaskSlot(index:0, state:ACTIVE, resource profile:
ResourceProfile{cpuCores=1.7976931348623157E308,
heapMemoryInMB=2147483647, directMemoryInMB=2147483647,
nativeMemoryInMB=2147483647, networkMemoryInMB=2147483647,
managedMemoryInMB=3046}, allocationId:
e8be16ec74f16c795d95b89cd08f5c37, jobId:
e808de0373bd515224434b7ec1efe249).

2020-09-30 13:56:06,424 INFO
[flink-akka.actor.default-dispatcher-18]
org.apache.flink.runtime.taskexecutor.JobLeaderService - Remove
job e808de0373bd515224434b7ec1efe249 from job leader monitoring.

2020-09-30 13:56:06,424 INFO
[flink-akka.actor.default-dispatcher-18]
org.apache.flink.runtime.taskexecutor.TaskExecutor - Close
JobManager connection for job e808de0373bd515224434b7ec1efe249.

2020-09-30 13:56:06,426 INFO
[flink-akka.actor.default-dispatcher-18]
org.apache.flink.runtime.taskexecutor.TaskExecutor - Close
JobManager connection for job e808de0373bd515224434b7ec1efe249.

2020-09-30 13:56:06,426 INFO
[flink-akka.actor.default-dispatcher-18]
org.apache.flink.runtime.taskexecutor.JobLeaderService - Cannot
reconnect to job e808de0373bd515224434b7ec1efe249 because it is
not registered.

2020-09-30 13:56:09,918 INFO  [CHAIN DataSource (dataset | Read
Staging From File System | AVRO) -> Map (Map at
readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter
at validateData(DAXTask.java:97)) -> FlatMap (FlatMap at
handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at
collapsePipelineIfRequired(Task.java:160)) (1/1)]
org.apache.flink.runtime.blob.BlobClient - Downloading

48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39
from d43723-430.dc.gs.com/10.48.128.14:46473
 (retry 3)

2020-09-30 13:56:09,920 ERROR [CHAIN DataSource (dataset | Read
Staging From File System | AVRO) -> Map (Map at
readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter
at validateData(DAXTask.java:97)) -> FlatMap (FlatMap at
handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at
collapsePipelineIfRequired(Task.java:160)) (1/1)]
org.apache.flink.runtime.blob.BlobClient - Failed to fetch BLOB

48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39
from d43723-430.dc.gs.com/10.48.128.14:46473
 and store it
under

/fs/htmp/yarn/local/usercache/delp_prod/appcache/application_1599723434208_15328880/blobStore-e2888df1-c7be-4ce6-b6b6-58e7c24a140a/incoming/temp-0004
Retrying...

2020-09-30 13:56:09,920 INFO  [CHAIN DataSource (dataset | Read
Staging From File System | AVRO) -> Map (Map at
readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter
at validateData(DAXTask.java:97)) -> FlatMap (FlatMap at
handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at
collapsePipelineIfRequired(Task.java:160)) (1/1)]
org.apache.flink.runtime.blob.BlobClient - Downloading

48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39
from d43723-430.dc.gs.com/10.48.128.14:46473
 (retry 4)

2020-09-30 13:56:09,922 ERROR [CHAIN DataSource (dataset | Read
Staging From File System | AVRO) 

Re: Blobserver dying mid-application

2020-10-01 Thread Till Rohrmann
Hi Andreas,

do the logs of the JM contain any information?

Theoretically, each task submission to a `TaskExecutor` can trigger a new
connection to the BlobServer. This depends a bit on how large your
TaskInformation is and whether this information is being offloaded to the
BlobServer. What you can definitely try to do is to increase the
blob.fetch.backlog in order to see whether this solves the problem.

How many jobs and with what timeline do you submit them to the Flink
cluster? Maybe you can share a bit more details about the application you
are running.

Cheers,
Till

On Wed, Sep 30, 2020 at 11:49 PM Hailu, Andreas 
wrote:

> Hello folks, I’m seeing application failures where our Blobserver is
> refusing connections mid application:
>
>
>
> 2020-09-30 13:56:06,227 INFO  [flink-akka.actor.default-dispatcher-18]
> org.apache.flink.runtime.taskexecutor.TaskExecutor-
> Un-registering task and sending final execution state FINISHED to
> JobManager for task DataSink (TextOutputFormat
> (hdfs:/user/p2epda/lake/delp_prod/PROD/APPROVED/staging/datastore/MandateTradingLine/tmp_pipeline_collapse)
> - UTF-8) 3d1890b47f4398d805cf0c1b54286f71.
>
> 2020-09-30 13:56:06,423 INFO  [flink-akka.actor.default-dispatcher-18]
> org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable  - Free slot
> TaskSlot(index:0, state:ACTIVE, resource profile:
> ResourceProfile{cpuCores=1.7976931348623157E308, heapMemoryInMB=2147483647,
> directMemoryInMB=2147483647, nativeMemoryInMB=2147483647,
> networkMemoryInMB=2147483647, managedMemoryInMB=3046}, allocationId:
> e8be16ec74f16c795d95b89cd08f5c37, jobId: e808de0373bd515224434b7ec1efe249).
>
> 2020-09-30 13:56:06,424 INFO  [flink-akka.actor.default-dispatcher-18]
> org.apache.flink.runtime.taskexecutor.JobLeaderService- Remove job
> e808de0373bd515224434b7ec1efe249 from job leader monitoring.
>
> 2020-09-30 13:56:06,424 INFO  [flink-akka.actor.default-dispatcher-18]
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Close
> JobManager connection for job e808de0373bd515224434b7ec1efe249.
>
> 2020-09-30 13:56:06,426 INFO  [flink-akka.actor.default-dispatcher-18]
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Close
> JobManager connection for job e808de0373bd515224434b7ec1efe249.
>
> 2020-09-30 13:56:06,426 INFO  [flink-akka.actor.default-dispatcher-18]
> org.apache.flink.runtime.taskexecutor.JobLeaderService- Cannot
> reconnect to job e808de0373bd515224434b7ec1efe249 because it is not
> registered.
>
> 2020-09-30 13:56:09,918 INFO  [CHAIN DataSource (dataset | Read Staging
> From File System | AVRO) -> Map (Map at
> readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter at
> validateData(DAXTask.java:97)) -> FlatMap (FlatMap at
> handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at
> collapsePipelineIfRequired(Task.java:160)) (1/1)]
> org.apache.flink.runtime.blob.BlobClient  - Downloading
> 48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39
> from d43723-430.dc.gs.com/10.48.128.14:46473 (retry 3)
>
> 2020-09-30 13:56:09,920 ERROR [CHAIN DataSource (dataset | Read Staging
> From File System | AVRO) -> Map (Map at
> readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter at
> validateData(DAXTask.java:97)) -> FlatMap (FlatMap at
> handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at
> collapsePipelineIfRequired(Task.java:160)) (1/1)]
> org.apache.flink.runtime.blob.BlobClient  - Failed to
> fetch BLOB
> 48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39
> from d43723-430.dc.gs.com/10.48.128.14:46473 and store it under
> /fs/htmp/yarn/local/usercache/delp_prod/appcache/application_1599723434208_15328880/blobStore-e2888df1-c7be-4ce6-b6b6-58e7c24a140a/incoming/temp-0004
> Retrying...
>
> 2020-09-30 13:56:09,920 INFO  [CHAIN DataSource (dataset | Read Staging
> From File System | AVRO) -> Map (Map at
> readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter at
> validateData(DAXTask.java:97)) -> FlatMap (FlatMap at
> handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at
> collapsePipelineIfRequired(Task.java:160)) (1/1)]
> org.apache.flink.runtime.blob.BlobClient  - Downloading
> 48b8ba9f3de039f74483085f90e5ad64/p-0b27cb203799adb76d2a434ed3d64052d832cff3-6915d0cd0fef97f728cd890986b2bf39
> from d43723-430.dc.gs.com/10.48.128.14:46473 (retry 4)
>
> 2020-09-30 13:56:09,922 ERROR [CHAIN DataSource (dataset | Read Staging
> From File System | AVRO) -> Map (Map at
> readAvroFileWithFilter(FlinkReadUtils.java:82)) -> Filter (Filter at
> validateData(DAXTask.java:97)) -> FlatMap (FlatMap at
> handleBloomFilter(PreMergeTask.java:187)) -> FlatMap (FlatMap at
> collapsePipelineIfRequired(Task.java:160)) (1/1)]
> org.apache.flink.runtime.blob.BlobClient  - Failed to
> 

Re: Streaming SQL Job Switches to FINISHED before all records processed

2020-10-01 Thread Till Rohrmann
Hi Austin,

I believe that the problem is the processing time window. Unlike for event
time where we send a MAX_WATERMARK at the end of the stream to trigger all
remaining windows, this does not happen for processing time windows. Hence,
if your stream ends and you still have an open processing time window, then
it will never get triggered.

The problem should disappear if you use event time or if you process
unbounded streams which never end.
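
For illustration, a rough sketch of the event-time variant (env, tableEnv, joinedTable, the
timestamp/key columns, the out-of-orderness bound, and MyProcessWindowFunction are placeholders
for your job; the relevant point is that with event time a final MAX_WATERMARK is emitted once
the bounded input ends, which fires any remaining windows):

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.types.Row

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val windowed = tableEnv
  .toRetractStream[Row](joinedTable) // joinedTable: the result of your SQL join
  .assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor[(Boolean, Row)](Time.seconds(5)) {
      // assumes the row carries an epoch-millis timestamp in column 0
      override def extractTimestamp(e: (Boolean, Row)): Long =
        e._2.getField(0).asInstanceOf[Long]
    })
  .keyBy(_._2.getField(1).toString) // assumes the key is in column 1
  .window(TumblingEventTimeWindows.of(Time.minutes(1)))
  .process(new MyProcessWindowFunction) // your existing window logic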

Cheers,
Till

On Thu, Oct 1, 2020 at 12:01 AM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hey all,
>
> Thanks for your patience. I've got a small repo that reproduces the issue
> here: https://github.com/austince/flink-1.10-sql-windowing-error
>
> Not sure what I'm doing wrong but it feels silly.
>
> Thanks so much!
> Austin
>
> On Tue, Sep 29, 2020 at 3:48 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Hey Till,
>>
>> Thanks for the reply -- I'll try to see if I can reproduce this in a
>> small repo and share it with you.
>>
>> Best,
>> Austin
>>
>> On Tue, Sep 29, 2020 at 3:58 AM Till Rohrmann 
>> wrote:
>>
>>> Hi Austin,
>>>
>>> could you share with us the exact job you are running (including the
>>> custom window trigger)? This would help us to better understand your
>>> problem.
>>>
>>> I am also pulling in Klou and Timo who might help with the windowing
>>> logic and the Table to DataStream conversion.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Mon, Sep 28, 2020 at 11:49 PM Austin Cawley-Edwards <
>>> austin.caw...@gmail.com> wrote:
>>>
 Hey all,

 I'm not sure if I've missed something in the docs, but I'm having a bit
 of trouble with a streaming SQL job that starts w/ raw SQL queries and then
 transitions to a more traditional streaming job. I'm on Flink 1.10 using
 the Blink planner, running locally with no checkpointing.

 The job looks roughly like:

 CSV 1 -->
 CSV 2 -->  SQL Query to Join --> toRetractStream --> keyed time window
 w/ process func & custom trigger --> some other ops
 CSV 3 -->


 When I remove the windowing directly after the `toRetractStream`, the
 records make it to the "some other ops" stage, but with the windowing,
 those operations are sometimes not sent any data. I can also get data sent
 to the downstream operators by putting in a no-op map before the window and
 placing some breakpoints in there to manually slow down processing.


 The logs don't seem to indicate anything went wrong and generally look
 like:

 4819 [Source: Custom File source (1/1)] INFO
  org.apache.flink.runtime.taskmanager.Task  - Source: Custom File source
 (1/1) (3578629787c777320d9ab030c004abd4) switched from RUNNING to
 FINISHED.\4819 [Source: Custom File source (1/1)] INFO
  org.apache.flink.runtime.taskmanager.Task  - Freeing task resources for
 Source: Custom File source (1/1) (3578629787c777320d9ab030c004abd4).
 4819 [Source: Custom File source (1/1)] INFO
  org.apache.flink.runtime.taskmanager.Task  - Ensuring all FileSystem
 streams are closed for task Source: Custom File source (1/1)
 (3578629787c777320d9ab030c004abd4) [FINISHED]
 4820 [flink-akka.actor.default-dispatcher-5] INFO
  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Un-registering task
 and sending final execution state FINISHED to JobManager for task Source:
 Custom File source (1/1) 3578629787c777320d9ab030c004abd4.
 ...
 4996 [Window(TumblingProcessingTimeWindows(6), TimedCountTrigger,
 ProcessWindowFunction$1) (1/1)] INFO
  org.apache.flink.runtime.taskmanager.Task  -
 Window(TumblingProcessingTimeWindows(6), TimedCountTrigger,
 ProcessWindowFunction$1) (1/1) (907acf9bfa2f4a9bbd13997b8b34d91f) switched
 from RUNNING to FINISHED.
 4996 [Window(TumblingProcessingTimeWindows(6), TimedCountTrigger,
 ProcessWindowFunction$1) (1/1)] INFO
  org.apache.flink.runtime.taskmanager.Task  - Freeing task resources for
 Window(TumblingProcessingTimeWindows(6), TimedCountTrigger,
 ProcessWindowFunction$1) (1/1) (907acf9bfa2f4a9bbd13997b8b34d91f).
 4996 [Window(TumblingProcessingTimeWindows(6), TimedCountTrigger,
 ProcessWindowFunction$1) (1/1)] INFO
  org.apache.flink.runtime.taskmanager.Task  - Ensuring all FileSystem
 streams are closed for task Window(TumblingProcessingTimeWindows(6),
 TimedCountTrigger, ProcessWindowFunction$1) (1/1)
 (907acf9bfa2f4a9bbd13997b8b34d91f) [FINISHED]
 ...
 rest of the shutdown
 ...
 Program execution finished
 Job with JobID 889b161e432c0e69a8d760bbed205d5d has finished.
 Job Runtime: 783 ms


 Is there something I'm missing in my setup? Could it be my custom
 window trigger? Bug? I'm stumped.


 Thanks,
 Austin





Re: [DISCUSS] FLIP-144: Native Kubernetes HA for Flink

2020-10-01 Thread Till Rohrmann
3. We could avoid force deletions from within Flink. If the user does it,
then we don't give guarantees.

I am fine with your current proposal. +1 for moving forward with it.

Cheers,
Till

On Thu, Oct 1, 2020 at 2:32 AM Yang Wang  wrote:

> 2. Yes. This is exactly what I mean: storing the HA information relevant
> to a specific component in a single ConfigMap and ensuring that “Get (check
> the leader)-and-Update (write back to the ConfigMap)” is a transactional
> operation. Since we only store the job graph state handle (not the real
> data) in the ConfigMap, I think 1MB is big enough for the dispatcher-leader
> ConfigMap (the biggest one, with multiple jobs). I roughly calculated that
> we could have more than 1000 Flink jobs in a Flink session cluster.
>
> 3. Actually, K8s has a stronger guarantee than YARN, and the StatefulSet
> could provide at-most-one semantics if no manual force-deletion
> happens [1]. Based on the previous discussion, we have successfully avoided
> the "lock-and-release" in the implementation. So I still insist on using
> the current Deployment.
>
>
> [1].
> https://kubernetes.io/docs/tasks/run-application/force-delete-stateful-set-pod/#force-deletion
>
>
> Best,
> Yang
>
> Till Rohrmann  wrote on Wed, Sep 30, 2020 at 11:57 PM:
>
>> Thanks for the clarifications Yang Wang.
>>
>> 2. Keeping the HA information relevant for a component (Dispatcher,
>> JobManager, ResourceManager) in a single ConfigMap sounds good. We should
>> check that we don't exceed the 1 MB size limit with this approach though.
>> The Dispatcher's ConfigMap would then contain the current leader, the
>> running jobs and the pointers to the persisted JobGraphs. The JobManager's
>> ConfigMap would then contain the current leader, the pointers to the
>> checkpoints and the checkpoint ID counter, for example.
>>
>> 3. Ah ok, I somehow thought that K8s would give us stronger
>> guarantees than Yarn in this regard. That's a pity.
>>
>> Cheers,
>> Till
>>
>> On Wed, Sep 30, 2020 at 10:03 AM tison  wrote:
>>
>>> Thanks for your explanation. It would be fine if only checking
>>> leadership & actually write information is atomic.
>>>
>>> Best,
>>> tison.
>>>
>>>
>>> Yang Wang  wrote on Wed, Sep 30, 2020 at 3:57 PM:
>>>
 Thanks till and tison for your comments.

 @Till Rohrmann 
 1. I am afraid we could not do this if we are going to use the fabric8
 Kubernetes client SDK for the leader election. The official Kubernetes Java
 client [1] also does not support it, unless we implement a new
 LeaderElector in Flink based on the very basic Kubernetes API. But it seems
 that we would not gain much from this.

 2. Yes, the implementation will be a little complicated if we want to
 completely eliminate the residual job graphs or checkpoints. Inspired by
 your suggestion, another different solution has come into my mind. We could
 use a same ConfigMap storing the JobManager leader, job graph,
 checkpoint-counter, checkpoint. Each job will have a specific ConfigMap for
 the HA meta storage. Then it will be easier to guarantee that only the
 leader could write the ConfigMap in a transactional operation. Since
 “Get(check the leader)-and-Update(write back to the ConfigMap)” is a
 transactional operation.

 3. Yes, StatefulSet(1) + ConfigMap + HDFS/S3 is also a solution.
 However, we still have the chances that two JobManager are running and
 trying to get/delete a key in the same ConfigMap concurrently. Imagine that
 the kubelet(like NodeManager in YARN) is down, and then the JobManager
 could not be deleted. A new JobManager pod will be launched. We are just in
 the similar situation as Deployment(1) + ConfigMap + HDFS/S3. The only
 benefit is we do not need to implement a leader election/retrieval service.

 @tison
 Actually, I do not think we will have such an issue in the Kubernetes HA
 service. In the Kubernetes LeaderElector [2], the leader information is
 stored in an annotation of the leader ConfigMap, so it cannot happen that the
 old leader wrongly overrides the leader information. Whenever a JobManager
 wants to write its leader information to the ConfigMap, it will first check
 whether it is still the leader; if not, nothing will happen. Moreover, the
 Kubernetes Resource Version [3] ensures that no one else has snuck in and
 written a different update while the client was in the process of
 performing its update.


 [1].
 https://github.com/kubernetes-client/java/blob/master/examples/src/main/java/io/kubernetes/client/examples/LeaderElectionExample.java
 [2].
 https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java