Re: Dynamically allocating right-sized task resources

2019-08-05 Thread Yang Wang
Hi Chad,

Just as Xintong said, fine-grained resource management has not been
introduced to Flink yet, and I think it is the elegant solution for
your scenario. Task managers with different resource specifications will be
allocated and started by the Yarn/K8s resource manager according to your
operators' resource requests, so your resource-intensive tasks will be
deployed to task managers with more resources.

>> If integrated with k8s natively, Flink’s resource manager will call
kubernetes API to allocate/release new pods and adjust the resource usage
on demand, and then support scale-up and scale down.

We already have an internal implementation of FLINK-9953 and are trying to
merge it into Flink. However, the K8s resource manager will do the same as
Yarn: allocate task managers with the same resource specification. Of course,
it will become more powerful once fine-grained resource management has been
introduced to Flink. You could then specify the resources for every operator
to meet its performance requirements.

BTW, FLINK-9953 does not include auto-scaling. It just allocates resources
according to the Flink job and will not automatically scale up/down based on
backlogs or metrics. Auto-scaling a Flink cluster is a more general topic and
should be designed independently of the resource management system
(Yarn/K8s/Mesos).

Xintong Song wrote on Mon, Aug 5, 2019 at 11:24 PM:

> Hi Chad,
>
> If I understand correctly, the scenarios you talked about are running
> batch jobs, right?
>
> At the moment (Flink 1.8 and earlier), Flink does not differentiate between
> the workloads of different tasks. It uses a slot-sharing approach [1] to
> balance workloads among workers. The general idea is to put tasks with
> different workloads into the same slot and spread tasks with similar
> workloads across different slots, resulting in slots with similar total
> workload. This approach works fine for streaming jobs, where all the tasks
> are running at the same time. However, it might not work that well for batch
> jobs, where tasks are scheduled stage by stage.
>
> You can also refer to the resource management strategy in the Blink branch.
> Blink was the internal version of Flink at Alibaba, which was open-sourced
> earlier this year. It customizes task manager resources (on Yarn) according
> to tasks' resource requirements. The community and Alibaba are currently
> working together to bring the good features of Blink into Flink master. One
> of those is fine-grained resource management, which could help resolve
> resource management and load balancing issues for both streaming and batch
> jobs.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/concepts/runtime.html#task-slots-and-resources
>
> Thank you~
>
> Xintong Song
>
>
>
> On Sun, Aug 4, 2019 at 9:40 PM Chad Dombrova  wrote:
>
>> Hi all,
>> First time poster, so go easy on me :)
>>
>> What is Flink's story for accommodating task workloads with vastly
>> disparate resource requirements: e.g. some require very little CPU and RAM,
>> while others require quite a lot.
>>
>> Our current strategy is to bundle resource-intensive tasks and send them
>> to a different batch-execution framework.  For this we use AWS |
>> Thinkbox Deadline  [1].
>> Deadline's scheduler supports numerous strategies for pairing work with a
>> right-sized worker -- criteria (numeric metadata like min/max RAM and CPU
>> requirements) and pools (basically named resource tags) -- as well as when
>> to schedule tasks -- priorities and limits (a check-in/check-out system for
>> finite resources, like a software license).  Are others using a similar
>> strategy, or are you provisioning your task managers for the worst case
>> scenario?
>>
>> Outsourcing to a separate batch framework for resource intensive tasks
>> complicates the design of our pipeline and bifurcates our resource pool, so
>> I'd rather use Flink for the whole process. I searched around and found two
>> Jira tickets which could form the foundations of a solution to this problem:
>>
>> - https://issues.apache.org/jira/browse/FLINK-9953:  Active Kubernetes
>> integration
>> - https://issues.apache.org/jira/browse/FLINK-10240: Pluggable
>> scheduling strategy for batch jobs
>>
>> Sadly, the latter seems to be stalled.
>>
>> I read the design doc [2]
>> for the active K8s integration, and this statement seemed crucial:
>>
>> > If integrated with k8s natively, Flink’s resource manager will call
>> kubernetes API to allocate/release new pods and adjust the resource usage
>> on demand, and then support scale-up and scale down.
>>
>> This is particularly powerful when your k8s cluster is itself backed by
>> auto-scaling of nodes (as with GKE autoscaler [3]),
>> but it's unclear from the doc *when and

Re: An ArrayIndexOutOfBoundsException after a few message with Flink 1.8.1

2019-08-05 Thread Yun Gao
Hi Nicolas:

 Are you using a custom partitioner? If so, you might need to check whether 
Partitioner#partition has returned a value that is greater than or equal to 
the parallelism of the downstream task. The expected return value should be in 
the interval [0, parallelism of the downstream task).
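
For example, a minimal sketch of a partitioner that always stays inside that
interval (the key type and the hashing scheme are just illustrative, not taken
from your job):

import org.apache.flink.api.common.functions.Partitioner;

public class SafeModuloPartitioner implements Partitioner<String> {
    @Override
    public int partition(String key, int numPartitions) {
        // Mask the sign bit before taking the modulo: Math.abs(Integer.MIN_VALUE)
        // is still negative, so Math.abs(hashCode) % numPartitions can go out of range.
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

// e.g. stream.partitionCustom(new SafeModuloPartitioner(), value -> value);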

Best,
Yun


--
From:Nicolas Lalevée 
Send Time:2019 Aug. 5 (Mon.) 22:58
To:user 
Subject:An ArrayIndexOutOfBoundsException after a few message with Flink 1.8.1

Hi,

I have got a weird error after a few messages. I first saw this error on a 
deployed Flink 1.7.1 cluster. Trying to figure it out, I am now trying with a 
local Flink 1.8.1 and I still get this ArrayIndexOutOfBoundsException. I don't 
have a precise scenario to reproduce it, but it is happening often.
Any idea what could be going wrong there?

The full stack trace:

Exception in thread "main" java.lang.RuntimeException: 
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
 at 
com.mycompany.myproject.controljob.ControlTopology.main(ControlTopology.java:61)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
 at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
 at 
org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
 at 
org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
 at 
com.mycompany.myproject.controljob.ControlTopology.run(ControlTopology.java:137)
 at 
com.mycompany.myproject.controljob.ControlTopology.main(ControlTopology.java:53)
Caused by: java.lang.RuntimeException: Index 2 out of bounds for length 2
 at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
 at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
 at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
 at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
 at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
 at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
 at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
 at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
 at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
 at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
 at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:712)
 at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:93)
 at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
 at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
 at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 2 out of bounds for 
length 2
 at 
org.apache.flink.runtime.io.netwo

Re: Memory constrains running Flink on Kubernetes

2019-08-05 Thread Yun Tang
You are correct, the default value of the write buffer size is 64 MB [1]. 
However, the Java doc for this value is not correct [2]. I have already created 
a PR to fix this.

[1] 
https://github.com/facebook/rocksdb/blob/30edf1874c11762a6cacf4434112ce34d13100d3/include/rocksdb/options.h#L191
[2] 
https://github.com/facebook/rocksdb/blob/30edf1874c11762a6cacf4434112ce34d13100d3/java/src/main/java/org/rocksdb/MutableColumnFamilyOptionsInterface.java#L24

Best
Yun Tang

From: wvl 
Sent: Monday, August 5, 2019 17:55
To: Yu Li 
Cc: Yun Tang ; Yang Wang ; Xintong 
Song ; user 
Subject: Re: Memory constrains running Flink on Kubernetes

Btw, with regard to:

> The default writer-buffer-number is 2 at most for each column family, and the 
> default write-buffer-memory size is 4MB.

This isn't what I see when looking at the OPTIONS-XX file in the rocksdb 
directories in state:

[CFOptions "xx"]
  ttl=0
  report_bg_io_stats=false
  
compaction_options_universal={allow_trivial_move=false;size_ratio=1;min_merge_width=2;max_size_amplification_percent=200;max_merge_width=4294967295;compression_size_percent=-1;stop_style=kCompactionStopStyleTotalSize;}
  table_factory=BlockBasedTable
  paranoid_file_checks=false
  compression_per_level=
  inplace_update_support=false
  soft_pending_compaction_bytes_limit=68719476736
  max_successive_merges=0
  max_write_buffer_number=2
  level_compaction_dynamic_level_bytes=false
  max_bytes_for_level_base=268435456
  optimize_filters_for_hits=false
  force_consistency_checks=false
  disable_auto_compactions=false
  max_compaction_bytes=1677721600
  hard_pending_compaction_bytes_limit=274877906944
  
compaction_options_fifo={allow_compaction=false;max_table_files_size=1073741824;ttl=0;}
  max_bytes_for_level_multiplier=10.00
  level0_file_num_compaction_trigger=4
  level0_slowdown_writes_trigger=20
  compaction_pri=kByCompensatedSize
  compaction_filter=nullptr
  level0_stop_writes_trigger=36
  write_buffer_size=67108864
  min_write_buffer_number_to_merge=1
  num_levels=7
  target_file_size_multiplier=1
  arena_block_size=8388608
  memtable_huge_page_size=0
  bloom_locality=0
  inplace_update_num_locks=1
  memtable_prefix_bloom_size_ratio=0.00
  max_sequential_skip_in_iterations=8
  max_bytes_for_level_multiplier_additional=1:1:1:1:1:1:1
  compression=kSnappyCompression
  max_write_buffer_number_to_maintain=0
  bottommost_compression=kDisableCompressionOption
  comparator=leveldb.BytewiseComparator
  prefix_extractor=nullptr
  target_file_size_base=67108864
  merge_operator=StringAppendTESTOperator
  memtable_insert_with_hint_prefix_extractor=nullptr
  memtable_factory=SkipListFactory
  compaction_filter_factory=nullptr
  compaction_style=kCompactionStyleLevel

Are these options somehow not applied or overridden?

On Mon, Jul 29, 2019 at 4:42 PM wvl  wrote:
Excellent. Thanks for all the answers so far.

So there was another issue I mentioned which we made some progress gaining 
insight into, namely our metaspace growth when faced with job restarts.

We can easily hit 1Gb metaspace usage within 15 minutes if we restart often.
We attempted to troubleshoot this issue by looking at all the classes in 
metaspace using `jcmd  GC.class_stats`.

Here we observed that after every job restart another entry is created for 
every class in our job, where the old classes have InstBytes=0. So far so good, 
but the Total column for these entries shows that memory is still being used.
Also, adding up all entries in the Total column indeed corresponds to our 
metaspace usage. So far we could only conclude that none of our job classes 
were being unloaded.

Then we stumbled upon this ticket. Now here are our results running the 
SocketWindowWordCount jar in a flink 1.8.0 cluster with one taskmanager.

We obtain a class count by running jcmd 3052 GC.class_stats | grep -i 
org.apache.flink.streaming.examples.windowing.SessionWindowing | wc -l

First run:
  Class Count: 1
  Metaspace: 30695K

After 800~ runs:
  Class Count: 802
  Metaspace: 39406K

Interestingly, when we looked a bit later the class count slowly went down, 
step by step; just to be sure, we used `jcmd  GC.run` to force GC every 
30s or so. If I had to guess, it took about 20 minutes to go from 800~ to 170~, 
with metaspace dropping to 35358K. In a sense we've seen this behavior before, 
but with much, much larger increases in metaspace usage over far fewer job 
restarts.

I've added this information to 
https://issues.apache.org/jira/browse/FLINK-11205.

That said, I'd really like to confirm the following:
- classes should usually only appear once in GC.class_stats output
- flink / the jvm has very slow cleanup of the metaspace
- something clearly is leaking during restarts

On Mon, Jul 29, 2019 at 9:52 AM Yu Li  wrote:
For the memory usage of RocksDB, there's already some discussion in 
FLINK-7289

Re: Dynamically allocating right-sized task resources

2019-08-05 Thread Xintong Song
Hi Chad,

If I understand correctly, the scenarios you talked about are running batch
jobs, right?

At the moment (Flink 1.8 and earlier), Flink does not differentiate between
the workloads of different tasks. It uses a slot-sharing approach [1] to
balance workloads among workers. The general idea is to put tasks with
different workloads into the same slot and spread tasks with similar workloads
across different slots, resulting in slots with similar total workload. This
approach works fine for streaming jobs, where all the tasks are running at
the same time. However, it might not work that well for batch jobs, where
tasks are scheduled stage by stage.
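
As a side note, slot sharing can also be influenced explicitly from the
DataStream API. A minimal sketch (the operators and the group name "heavy" are
purely illustrative): putting a heavy operator into its own slot sharing group
guarantees it never shares a slot with the other operators.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SlotSharingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> lines = env.socketTextStream("localhost", 9999);

        // Light operator: stays in the default slot sharing group and may share
        // a slot with the source.
        DataStream<String> trimmed = lines.map(String::trim).name("light-map");

        // Heavy operator: its own slot sharing group means it never shares a slot
        // with the operators above, i.e. it gets its own slots.
        trimmed.map(value -> value.toUpperCase())
               .slotSharingGroup("heavy")
               .name("heavy-map")
               .print();

        env.execute("Slot sharing sketch");
    }
}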

You can also refer to the resource management strategy in the Blink branch.
Blink was the internal version of Flink at Alibaba, which was open-sourced
earlier this year. It customizes task manager resources (on Yarn) according
to tasks' resource requirements. The community and Alibaba are currently
working together to bring the good features of Blink into Flink master. One
of those is fine-grained resource management, which could help resolve
resource management and load balancing issues for both streaming and batch
jobs.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/concepts/runtime.html#task-slots-and-resources

Thank you~

Xintong Song



On Sun, Aug 4, 2019 at 9:40 PM Chad Dombrova  wrote:

> Hi all,
> First time poster, so go easy on me :)
>
> What is Flink's story for accommodating task workloads with vastly
> disparate resource requirements: e.g. some require very little CPU and RAM,
> while others require quite a lot.
>
> Our current strategy is to bundle resource-intensive tasks and send them
> to a different batch-execution framework.  For this we use AWS | Thinkbox
> Deadline  [1].  Deadline's
> scheduler supports numerous strategies for pairing work with a right-sized
> worker -- criteria (numeric metadata like min/max RAM and CPU requirements)
> and pools (basically named resource tags) -- as well as when to schedule
> tasks -- priorities and limits (a check-in/check-out system for finite
> resources, like a software license).  Are others using a similar strategy,
> or are you provisioning your task managers for the worst case scenario?
>
> Outsourcing to a separate batch framework for resource intensive tasks
> complicates the design of our pipeline and bifurcates our resource pool, so
> I'd rather use Flink for the whole process. I searched around and found two
> Jira tickets which could form the foundations of a solution to this problem:
>
> - https://issues.apache.org/jira/browse/FLINK-9953:  Active Kubernetes
> integration
> - https://issues.apache.org/jira/browse/FLINK-10240: Pluggable scheduling
> strategy for batch jobs
>
> Sadly, the latter seems to be stalled.
>
> I read the design doc [2]
> for the active K8s integration, and this statement seemed crucial:
>
> > If integrated with k8s natively, Flink’s resource manager will call
> kubernetes API to allocate/release new pods and adjust the resource usage
> on demand, and then support scale-up and scale down.
>
> This is particularly powerful when your k8s cluster is itself backed by
> auto-scaling of nodes (as with GKE autoscaler [3]),
> but it's unclear from the doc *when and how* resources are adjusted based
> on demand.  Will it simply scale up a shared pool of resource-identical
> task managers based on the size of the task backlog (or some other metric
> that determines "falling behind"), or does a task have a way of specifying
> and acquiring an execution resource that meets its specific performance
> profile?
>
> Based on the docs for the YARN resource manager [4], it acquires a pool of
> task managers with identical specs, so if this model is also used for the
> K8s resource manager, task managers would continue to be provisioned for
> the worst-case scenario (particularly in terms of RAM per process), which
> for us would mean they are drastically over-provisioned for common tasks.
>
> I'm new to Flink, so there's a good chance I've overlooked something
> important, so I'm looking forward to learning more!
>
> -thanks
> chad
>
>
> [1] https://www.awsthinkbox.com/deadline
> [2]
> https://docs.google.com/document/d/1Zmhui_29VASPcBOEqyMWnF3L6WEWZ4kedrCqya0WaAk/edit
> [3]
> https://cloud.google.com/kubernetes-engine/docs/concepts/cluster-autoscaler
> [4]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/yarn_setup.html
>
>
>
>
>


An ArrayIndexOutOfBoundsException after a few message with Flink 1.8.1

2019-08-05 Thread Nicolas Lalevée
Hi,

I have got a weird error after a few messages. I first saw this error on a 
deployed Flink 1.7.1 cluster. Trying to figure it out, I am now trying with a 
local Flink 1.8.1 and I still get this ArrayIndexOutOfBoundsException. I don't 
have a precise scenario to reproduce it, but it is happening often.
Any idea what could be going wrong there?

The full stack trace:

Exception in thread "main" java.lang.RuntimeException: 
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at 
com.mycompany.myproject.controljob.ControlTopology.main(ControlTopology.java:61)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at 
org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
at 
org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
at 
com.mycompany.myproject.controljob.ControlTopology.run(ControlTopology.java:137)
at 
com.mycompany.myproject.controljob.ControlTopology.main(ControlTopology.java:53)
Caused by: java.lang.RuntimeException: Index 2 out of bounds for length 2
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:712)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:93)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 2 out of bounds for 
length 2
at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.getBufferBuilder(RecordWriter.java:254)
at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:177)
at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWrite

Re: getting an exception

2019-08-05 Thread Gaël Renoux
Hi Avi and Victor,

I just opened this ticket on JIRA:
https://issues.apache.org/jira/browse/FLINK-13586 (I hadn't seen these
e-mails). Backward compatibility is broken between 1.8.0 and 1.8.1 if you
use Kafka connectors.

Can you upgrade your flink-connector-kafka dependency to 1.8.1? It won't
deploy on a 1.8.0 server any more, if that's a concern for you.

Gaël

On Mon, Aug 5, 2019 at 4:37 PM Wong Victor 
wrote:

> Hi Avi:
>
>
>
> It seems you are submitting your job with an older Flink version (< 1.8),
> please check your flink-dist version.
>
>
>
> Regards,
>
> Victor
>
>
>
> From: Avi Levi 
> Date: Monday, August 5, 2019 at 9:11 PM
> To: user 
> Subject: getting an exception
>
>
>
> Hi,
>
> I'm using Flink 1.8.1. Our code is mostly using Scala.
>
> When I try to submit my job (on my local machine) it crashes with the
> error below (BTW, in the IDE it runs perfectly).
>
> Any assistance would be appreciated.
>
> Thanks
>
> Avi
>
> 2019-08-05 12:58:03.783 [Flink-DispatcherRestEndpoint-thread-3] ERROR 
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler  - Unhandled 
> exception.
>
> org.apache.flink.client.program.ProgramInvocationException: The program 
> caused an error:
>
> at 
> org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:93)
>
> at 
> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
>
> at 
> org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
>
> at 
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
>
> at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>
> at java.lang.Thread.run(Thread.java:748)
>
> Caused by: java.lang.NoSuchMethodError: 
> org.apache.flink.api.java.ClosureCleaner.clean(Ljava/lang/Object;Lorg/apache/flink/api/common/ExecutionConfig$ClosureCleanerLevel;Z)V
>
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.(FlinkKafkaProducer011.java:494)
>
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.(FlinkKafkaProducer011.java:448)
>
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.(FlinkKafkaProducer011.java:383)
>
> at 
> com.bluevoyant.commons.queueHandlers.KeyedKafkaProducerImpl.producer(KafkaImpl.scala:18)
>
> at 
> com.bluevoyant.commons.queueHandlers.KeyedKafkaProducerImpl.producer$(KafkaImpl.scala:18)
>
> at 
> com.bluevoyant.lookalike.analytic.queueHandlers.QueueHandlerImpl$.producer(QueueHandlerImpl.scala:13)
>
> at 
> com.bluevoyant.lookalike.analytic.StreamingJob$.delayedEndpoint$com$bluevoyant$lookalike$analytic$StreamingJob$1(StreamingJob.scala:42)
>
> at 
> com.bluevoyant.lookalike.analytic.StreamingJob$delayedInit$body.apply(StreamingJob.scala:14)
>
> at scala.Function0.apply$mcV$sp(Function0.scala:34)
>
> at scala.Function0.apply$mcV$sp$(Function0.scala:34)
>
> at 
> scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>
> at scala.App.$anonfun$main$1$adapted(App.scala:76)
>
> at scala.collection.immutable.List.foreach(List.scala:388)
>
> at scala.App.main(App.scala:76)
>
> at scala.App.main$(App.scala:74)
>
> at 
> com.bluevoyant.lookalike.analytic.StreamingJob$.main(StreamingJob.scala:14)
>
> at 
> com.bluevoyant.lookalike.analytic.StreamingJob.main(StreamingJob.scala)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:498)
>
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
>
> at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
>
> at 
> org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
>
> ... 7 common frames omitted
>
>
>


-- 
Gaël Renoux
Senior R&D Engineer, DataDome
M +33 6 76 89 16 52
E gael.ren...@datadome.co  
W www.datadome.co





Re: getting an exception

2019-08-05 Thread Wong Victor
Hi Avi:

It seems you are submitting your job with an older Flink version (< 1.8), 
please check your flink-dist version.

Regards,
Victor

From: Avi Levi 
Date: Monday, August 5, 2019 at 9:11 PM
To: user 
Subject: getting an exception

Hi,
I'm using Flink 1.8.1. Our code is mostly using Scala.
When I try to submit my job (on my local machine) it crashes with the error 
below (BTW, in the IDE it runs perfectly).
Any assistance would be appreciated.
Thanks
Avi


2019-08-05 12:58:03.783 [Flink-DispatcherRestEndpoint-thread-3] ERROR 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler  - Unhandled 
exception.

org.apache.flink.client.program.ProgramInvocationException: The program caused 
an error:

at 
org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:93)

at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)

at 
org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)

at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)

at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)

at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)

Caused by: java.lang.NoSuchMethodError: 
org.apache.flink.api.java.ClosureCleaner.clean(Ljava/lang/Object;Lorg/apache/flink/api/common/ExecutionConfig$ClosureCleanerLevel;Z)V

at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.(FlinkKafkaProducer011.java:494)

at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.(FlinkKafkaProducer011.java:448)

at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.(FlinkKafkaProducer011.java:383)

at 
com.bluevoyant.commons.queueHandlers.KeyedKafkaProducerImpl.producer(KafkaImpl.scala:18)

at 
com.bluevoyant.commons.queueHandlers.KeyedKafkaProducerImpl.producer$(KafkaImpl.scala:18)

at 
com.bluevoyant.lookalike.analytic.queueHandlers.QueueHandlerImpl$.producer(QueueHandlerImpl.scala:13)

at 
com.bluevoyant.lookalike.analytic.StreamingJob$.delayedEndpoint$com$bluevoyant$lookalike$analytic$StreamingJob$1(StreamingJob.scala:42)

at 
com.bluevoyant.lookalike.analytic.StreamingJob$delayedInit$body.apply(StreamingJob.scala:14)

at scala.Function0.apply$mcV$sp(Function0.scala:34)

at scala.Function0.apply$mcV$sp$(Function0.scala:34)

at 
scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)

at scala.App.$anonfun$main$1$adapted(App.scala:76)

at scala.collection.immutable.List.foreach(List.scala:388)

at scala.App.main(App.scala:76)

at scala.App.main$(App.scala:74)

at 
com.bluevoyant.lookalike.analytic.StreamingJob$.main(StreamingJob.scala:14)

at 
com.bluevoyant.lookalike.analytic.StreamingJob.main(StreamingJob.scala)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)

at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)

at 
org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)

... 7 common frames omitted



StreamingFileSink not committing file to S3

2019-08-05 Thread Ravi Bhushan Ratnakar
Thanks for your quick response. I am using a custom implementation of
BoundedOutOfOrdernessTimestampExtractor and have also tweaked it to return an
initial watermark that is not a negative value.

One more observation: when the job's parallelism is around 120, it works well
even with an idle stream and the Flink UI shows the watermark. But when I
increase the parallelism above 180, it doesn't write any file to S3 with an
idle stream. The moment I remove the idle stream, it works fine with any
parallelism.

I have also observed that when the parallelism is above 180, the Flink UI never
shows a watermark, although everything works fine without the idle stream.

Regards,
Ravi

On Sun 4 Aug, 2019, 09:53 Rafi Aroch,  wrote:

> Hi Ravi,
>
> This sounds related to an issue where the watermark is not advancing. This
> may happen when you have an idle source. An idle source would report a
> Long.MIN_VALUE, and therefore the overall min watermark across all consumer
> subtasks will never advance.
>
> First, verify this is indeed the case by looking at the watermarks
> reported. You can try to assign a custom watermark emitter logic as seen
> here [1].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html#event-time-for-consumed-records
>
> Thanks,
> Rafi
>
>
> On Sat, Aug 3, 2019 at 8:23 PM Ravi Bhushan Ratnakar <
> ravibhushanratna...@gmail.com> wrote:
>
>> Hi All,
>>
>> I am designing a streaming pipeline using Flink 1.8.1, which consumes
>> messages from Kinesis and applies some business logic on a per-key basis
>> using KeyedProcessFunction and checkpointing (HeapStateBackend). It is
>> consuming around 7 GB of messages per minute from multiple Kinesis streams.
>> I am using only one Kinesis source which is configured with multiple streams.
>>
>> The pipeline processes data and writes output to S3 as expected, but I am
>> experiencing a very weird issue: when one of the streams is completely empty,
>> it doesn't flush any file to S3, even though it is consuming data from the
>> rest of the streams. When I remove only this empty stream and submit the job
>> again, everything works fine and it writes output to S3.
>>
>> Regards,
>> Ravi
>>
>


Re: [DataStream API] Best practice to handle custom session window - time gap based but handles event for ending session

2019-08-05 Thread Fabian Hueske
Hi Jungtaek,

I would recommend implementing the logic in a ProcessFunction and avoiding
Flink's windowing API.
IMO, the windowing API is difficult to use, because there are many pieces
like WindowAssigner, Window, Trigger, Evictor, WindowFunction that are
orchestrated by Flink.
This makes it very hard to understand what exactly is going on and to
ensure that no state is leaked.

For example, I think your solution is not 100% correct, because a
MergingWindowAssigner lacks the ability to split a window.
In case of out-of-order events, you might have the situation that a LOG OUT
event for 12:00:00 arrives after a game event for 12:00:01 was assigned to
a window.
In that case, you'd need to split the window and the game event at 12:00:01
would need to go to the next session window.

As I said, I would use a ProcessFunction because it is a single function
and provides access to state and timers, which is all you need.
The logic you would need to implement would be a bit more involved, but it
would be much easier to reason about how the data is processed.
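
To illustrate the idea, here is a minimal sketch (not a drop-in replacement for
your window logic): it assumes the input is keyed by user, that event-time
timestamps have been assigned, and that events arrive as (userId, eventType)
pairs. The aggregate is just an event count to keep the example short.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class SessionProcessFunction
        extends KeyedProcessFunction<String, Tuple2<String, String>, Tuple2<String, Long>> {

    private static final long GAP = 10 * 60 * 1000; // inactivity gap, illustrative

    private transient ValueState<Long> count;
    private transient ValueState<Long> lastEventTime;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
        lastEventTime = getRuntimeContext().getState(new ValueStateDescriptor<>("lastEventTime", Long.class));
    }

    @Override
    public void processElement(Tuple2<String, String> event, Context ctx,
                               Collector<Tuple2<String, Long>> out) throws Exception {
        long current = count.value() == null ? 0L : count.value();
        current += 1;

        if ("LOGOUT".equals(event.f1)) {
            // Explicit session end: emit the aggregate immediately and clear all state.
            out.collect(Tuple2.of(event.f0, current));
            count.clear();
            lastEventTime.clear();
        } else {
            count.update(current);
            lastEventTime.update(ctx.timestamp());
            // Inactivity timeout: register a timer GAP after this event's timestamp.
            ctx.timerService().registerEventTimeTimer(ctx.timestamp() + GAP);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<Tuple2<String, Long>> out) throws Exception {
        // Only emit if no newer event extended the session in the meantime.
        Long last = lastEventTime.value();
        if (last != null && timestamp >= last + GAP) {
            out.collect(Tuple2.of(ctx.getCurrentKey(), count.value()));
            count.clear();
            lastEventTime.clear();
        }
    }
}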

Best,
Fabian


On Mon, Aug 5, 2019 at 05:18, Jungtaek Lim wrote:

> Thanks Dongwon to provide feedback and share your approach!
>
> I'm not sure whether it's possible (I'm not an expert), but if we could reset
> the intermediate (aggregated) result after processing the "fire event", I
> guess it would work as expected: the window would still expand even after the
> "session end", but it would provide the same effect in terms of outputs. Nice
> approach! I'll play with this approach too.
>
> Thanks again,
> Jungtaek Lim (HeartSaVioR)
>
> On Mon, Aug 5, 2019 at 12:01 AM Dongwon Kim  wrote:
>
>> Hi Jungtaek,
>>
>> I've faced a similar problem in the past; we need to calculate an
>> aggregate upon receiving an end message from each user.
>>
>> While you're trying to solve the problem by defining a custom window
>> assigner, I took a different approach by implementing a custom trigger.
>>
>> You can see my implementation in the following link (but I'm not quite
>> sure if my implementation could solve your case):
>>
>> https://github.com/eastcirclek/flink-examples/blob/master/src/main/scala/com/github/eastcirclek/examples/customtrigger/trigger/TrackingEarlyResultEventTimeTrigger.scala
>>
>> Best,
>> Dongwon
>>
>> p.s. FYI, I presented the background of the problem and the general idea
>> last year at FlinkForward 2017 Berlin. Hope this presentation helps you:
>> https://www.youtube.com/watch?v=wPQWFy5JENw
>>
>>
>>
>> On Sun, Aug 4, 2019 at 10:57 PM Jungtaek Lim  wrote:
>>
>>> Hi Flink users,
>>>
>>> I've been spending time to learn and play with Flink DataStream API, not
>>> an expert level but as a beginner. :)
>>>
>>> To play with the custom window API, I just created a small example: a
>>> session window based on a fixed time gap, but taking into account the type
>>> of event, which may indicate "end of session". I guess it's not unusual to
>>> see this kind of thing (like manual logout and login), though I don't have
>>> a concrete real use case.
>>>
>>> This is an implementation based on Flink DataStream API:
>>> https://gist.github.com/HeartSaVioR/1d865b1a444af1ef7cae201bbdb196b0
>>>
>>> The custom window works pretty well and I could leverage side output very
>>> easily. One thing that made the code a bit longer was the new definition of
>>> TimeWindow (to deal with the "close session" event type). Even though I
>>> tried to leverage TimeWindow via inheritance, the code gets a bit verbose
>>> as I needed to implement a new Serializer as well.
>>> (Actually it required implementing a new Trigger as well, but I took a
>>> workaround to leverage the existing EventTimeTrigger.)
>>>
>>> Assuming this pattern is not unusual (it would be pretty OK if it is
>>> unusual), could someone point out ways to improve or simplify the code?
>>> It would be really nice if there's something I could contribute here.
>>>
>>> Thanks,
>>> Jungtaek Lim (HeartSaVioR)
>>>
>>> ps. This is an implementation based on Spark Structured Streaming (no
>>> custom window API, so had to put everything in state function of
>>> flatMapGroupsWithState):
>>> https://gist.github.com/HeartSaVioR/133c3bdc163f1fd5332397c5cd4b8b29
>>>
>>
>
> --
> Name : Jungtaek Lim
> Blog : http://medium.com/@heartsavior
> Twitter : http://twitter.com/heartsavior
> LinkedIn : http://www.linkedin.com/in/heartsavior
>


getting an exception

2019-08-05 Thread Avi Levi
Hi,
I'm using Flink 1.8.1. Our code is mostly using Scala.
When I try to submit my job (on my local machine) it crashes with the
error below (BTW, in the IDE it runs perfectly).
Any assistance would be appreciated.
Thanks
Avi

2019-08-05 12:58:03.783 [Flink-DispatcherRestEndpoint-thread-3] ERROR
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler  -
Unhandled exception.
org.apache.flink.client.program.ProgramInvocationException: The
program caused an error:
at 
org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:93)
at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
at 
org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoSuchMethodError:
org.apache.flink.api.java.ClosureCleaner.clean(Ljava/lang/Object;Lorg/apache/flink/api/common/ExecutionConfig$ClosureCleanerLevel;Z)V
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.(FlinkKafkaProducer011.java:494)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.(FlinkKafkaProducer011.java:448)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.(FlinkKafkaProducer011.java:383)
at 
com.bluevoyant.commons.queueHandlers.KeyedKafkaProducerImpl.producer(KafkaImpl.scala:18)
at 
com.bluevoyant.commons.queueHandlers.KeyedKafkaProducerImpl.producer$(KafkaImpl.scala:18)
at 
com.bluevoyant.lookalike.analytic.queueHandlers.QueueHandlerImpl$.producer(QueueHandlerImpl.scala:13)
at 
com.bluevoyant.lookalike.analytic.StreamingJob$.delayedEndpoint$com$bluevoyant$lookalike$analytic$StreamingJob$1(StreamingJob.scala:42)
at 
com.bluevoyant.lookalike.analytic.StreamingJob$delayedInit$body.apply(StreamingJob.scala:14)
at scala.Function0.apply$mcV$sp(Function0.scala:34)
at scala.Function0.apply$mcV$sp$(Function0.scala:34)
at 
scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App.$anonfun$main$1$adapted(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:388)
at scala.App.main(App.scala:76)
at scala.App.main$(App.scala:74)
at 
com.bluevoyant.lookalike.analytic.StreamingJob$.main(StreamingJob.scala:14)
at 
com.bluevoyant.lookalike.analytic.StreamingJob.main(StreamingJob.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
at 
org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
... 7 common frames omitted


Re: Memory constrains running Flink on Kubernetes

2019-08-05 Thread wvl
Btw, with regard to:

> The default writer-buffer-number is 2 at most for each column family, and
the default write-buffer-memory size is 4MB.

This isn't what I see when looking at the OPTIONS-XX file in the
rocksdb directories in state:

[CFOptions "xx"]
  ttl=0
  report_bg_io_stats=false

compaction_options_universal={allow_trivial_move=false;size_ratio=1;min_merge_width=2;max_size_amplification_percent=200;max_merge_width=4294967295;compression_size_percent=-1;stop_style=kCompactionStopStyleTotalSize;}
  table_factory=BlockBasedTable
  paranoid_file_checks=false
  compression_per_level=
  inplace_update_support=false
  soft_pending_compaction_bytes_limit=68719476736
  max_successive_merges=0
  max_write_buffer_number=2
  level_compaction_dynamic_level_bytes=false
  max_bytes_for_level_base=268435456
  optimize_filters_for_hits=false
  force_consistency_checks=false
  disable_auto_compactions=false
  max_compaction_bytes=1677721600
  hard_pending_compaction_bytes_limit=274877906944

compaction_options_fifo={allow_compaction=false;max_table_files_size=1073741824;ttl=0;}
  max_bytes_for_level_multiplier=10.00
  level0_file_num_compaction_trigger=4
  level0_slowdown_writes_trigger=20
  compaction_pri=kByCompensatedSize
  compaction_filter=nullptr
  level0_stop_writes_trigger=36
 * write_buffer_size=67108864*
  min_write_buffer_number_to_merge=1
  num_levels=7
  target_file_size_multiplier=1
  arena_block_size=8388608
  memtable_huge_page_size=0
  bloom_locality=0
  inplace_update_num_locks=1
  memtable_prefix_bloom_size_ratio=0.00
  max_sequential_skip_in_iterations=8
  max_bytes_for_level_multiplier_additional=1:1:1:1:1:1:1
  compression=kSnappyCompression
  max_write_buffer_number_to_maintain=0
  bottommost_compression=kDisableCompressionOption
  comparator=leveldb.BytewiseComparator
  prefix_extractor=nullptr
  target_file_size_base=67108864
  merge_operator=StringAppendTESTOperator
  memtable_insert_with_hint_prefix_extractor=nullptr
  memtable_factory=SkipListFactory
  compaction_filter_factory=nullptr
  compaction_style=kCompactionStyleLevel

Are these options somehow not applied or overridden?
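
In case it helps pin this down, here is a minimal sketch (values and the class
name are illustrative, and this is just how I understand the Flink 1.8 RocksDB
state backend API) of setting the column family options explicitly via an
OptionsFactory, to rule out the defaults being silently overridden somewhere:

import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

public class ExplicitWriteBufferOptions implements OptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions) {
        return currentOptions;
    }

    @Override
    public ColumnFamilyOptions createColumnFamilyOptions(ColumnFamilyOptions currentOptions) {
        // Cap memtable memory explicitly: 32 MB write buffers, at most 2 per column family.
        return currentOptions
                .setWriteBufferSize(32 * 1024 * 1024)
                .setMaxWriteBufferNumber(2);
    }
}

// usage (checkpoint path illustrative; RocksDBStateBackend is in
// org.apache.flink.contrib.streaming.state):
// RocksDBStateBackend backend = new RocksDBStateBackend("s3://bucket/checkpoints", true);
// backend.setOptions(new ExplicitWriteBufferOptions());
// env.setStateBackend(backend);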

On Mon, Jul 29, 2019 at 4:42 PM wvl  wrote:

> Excellent. Thanks for all the answers so far.
>
> So there was another issue I mentioned which we made some progress gaining
> insight into, namely our metaspace growth when faced with job restarts.
>
> We can easily hit 1Gb metaspace usage within 15 minutes if we restart
> often.
> We attempted to troubleshoot this issue by looking at all the classes in
> metaspace using `jcmd  GC.class_stats`.
>
> Here we observed that after every job restart another entry is created for
> every class in our job, where the old classes have InstBytes=0. So far so
> good, but the Total column for these entries shows that memory is still
> being used.
> Also, adding up all entries in the Total column indeed corresponds to our
> metaspace usage. So far we could only conclude that none of our job classes
> were being unloaded.
>
> Then we stumbled upon this ticket. Now here are our results running the
> SocketWindowWordCount jar in a flink 1.8.0 cluster with one taskmanager.
>
> We obtain a class count by running jcmd 3052 GC.class_stats | grep -i
> org.apache.flink.streaming.examples.windowing.SessionWindowing | wc -l
>
> *First* run:
>   Class Count: 1
>   Metaspace: 30695K
>
> After *800*~ runs:
>   Class Count: 802
>   Metaspace: 39406K
>
>
> Interestingly, when we looked a bit later the class count *slowly* went
> down, step by step; just to be sure, we used `jcmd 
> GC.run` to force GC every 30s or so. If I had to guess, it took about 20
> minutes to go from 800~ to 170~, with metaspace dropping to 35358K. In a
> sense we've seen this behavior before, but with much, much larger increases
> in metaspace usage over far fewer job restarts.
>
> I've added this information to
> https://issues.apache.org/jira/browse/FLINK-11205.
>
> That said, I'd really like to confirm the following:
> - classes should usually only appear once in GC.class_stats output
> - flink / the jvm has very slow cleanup of the metaspace
> - something clearly is leaking during restarts
>
> On Mon, Jul 29, 2019 at 9:52 AM Yu Li  wrote:
>
>> For the memory usage of RocksDB, there's already some discussion in
>> FLINK-7289  and a good
>> suggestion
>> 
>> from Mike to use the WriteBufferManager to limit the total memory usage,
>> FYI.
>>
>> We will drive to make the memory management of state backends more "hands
>> free" in latter release (probably in release 1.10) and please watch the
>> release plan and/or the weekly community update [1] threads.
>>
>> [1] https://s.apache.org/ix7iv
>>
>> Best Regards,
>> Yu
>>
>>
>> On Thu, 25 Jul 2019 at 15:12, Yun Tang  wrote

Re: some confuse for data exchange of parallel operator

2019-08-05 Thread Biao Liu
Hi Kylin,

> Can this map record all the data? Or does this map only record data from one
> parallel instance of the upstream operator?
Neither of your guesses is correct. It depends on the partitioner between the
map operator and the upstream operator. You can find more in this document
[1].

1.
https://ci.apache.org/projects/flink/flink-docs-master/concepts/programming-model.html
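
For example, a minimal sketch (the socket source, parallelism and names are
illustrative) showing how the partitioning between the upstream operator and
the map decides which records each parallel map subtask sees:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PartitioningSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> words = env.socketTextStream("localhost", 9999);

        // keyBy: all records with the same key go to the same parallel map subtask,
        // so a count kept inside that subtask is complete for the keys it owns.
        words.keyBy((KeySelector<String, String>) value -> value)
             .map((MapFunction<String, String>) word -> "keyed: " + word)
             .setParallelism(4)
             .print();

        // rebalance: records are spread round-robin, so each map subtask sees only
        // an arbitrary subset of the stream, and a local count covers that subset.
        words.rebalance()
             .map((MapFunction<String, String>) word -> "rebalanced: " + word)
             .setParallelism(4)
             .print();

        env.execute("Partitioning sketch");
    }
}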

Thanks,
Biao /'bɪ.aʊ/



On Mon, Aug 5, 2019 at 4:59 PM tangkailin  wrote:

> Hello,
>
> I don't know how parallel operators exchange data in Flink. For example,
> I define a map in an operator with n (n > 1) parallelism for counting. Can
> this map record all the data? Or does this map only record data from one
> parallel instance of the upstream operator?
>
>
>
> Thanks,
>
> Kylin
>


Re: StreamingFileSink not committing file to S3

2019-08-05 Thread Theo Diefenthal
Hi Ravi, 

Please check out [1] and [2]. They are related to Kafka but probably apply to 
Kinesis as well. If one stream is empty, there is no way for Flink to know 
the watermark of that stream, so Flink can't advance the watermark. 
Downstream operators thus cannot know whether there will be any more data 
coming from the empty stream. (Think of a source which is just offline or 
has network issues for some time and, once back online, will deliver all the 
old data.) This leads to Flink being unable to commit the final result until 
some data comes in from the empty stream.
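
If you need the pipeline to make progress despite the idle stream, one common
workaround is a watermark assigner that falls back to wall-clock time when a
subtask has seen no events for a while. A minimal sketch (the element type
Tuple2<String, Long> with the event timestamp in f1, and all the values, are
illustrative; be aware that late data arriving on the idle stream afterwards
may then be considered late and dropped):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class IdleAwareWatermarkAssigner
        implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> {

    private final long maxOutOfOrderness;
    private final long idleTimeout;

    private long maxTimestamp = Long.MIN_VALUE;
    private long lastEventWallClock = System.currentTimeMillis();

    public IdleAwareWatermarkAssigner(long maxOutOfOrderness, long idleTimeout) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        this.idleTimeout = idleTimeout;
    }

    @Override
    public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, element.f1);
        lastEventWallClock = System.currentTimeMillis();
        return element.f1;
    }

    @Override
    public Watermark getCurrentWatermark() {
        long now = System.currentTimeMillis();
        if (now - lastEventWallClock > idleTimeout) {
            // No events for a while: keep advancing the watermark with wall-clock
            // time so this idle subtask does not hold back downstream operators.
            return new Watermark(now - maxOutOfOrderness);
        }
        return maxTimestamp == Long.MIN_VALUE
                ? new Watermark(Long.MIN_VALUE)
                : new Watermark(maxTimestamp - maxOutOfOrderness);
    }
}

// e.g. stream.assignTimestampsAndWatermarks(new IdleAwareWatermarkAssigner(5_000, 60_000));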

Best regards 
Theo 

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
[2] https://issues.apache.org/jira/browse/FLINK-5479 


Von: "Ravi Bhushan Ratnakar"  
An: "user"  
Gesendet: Samstag, 3. August 2019 19:23:25 
Betreff: StreamingFileSink not committing file to S3 

Hi All, 
I am designing a streaming pipeline using Flink 1.8.1, which consumes messages 
from Kinesis and applies some business logic on a per-key basis using 
KeyedProcessFunction and checkpointing (HeapStateBackend). It is consuming 
around 7 GB of messages per minute from multiple Kinesis streams. I am using 
only one Kinesis source which is configured with multiple streams. 

The pipeline processes data and writes output to S3 as expected, but I am 
experiencing a very weird issue: when one of the streams is completely empty, 
it doesn't flush any file to S3, even though it is consuming data from the rest 
of the streams. When I remove only this empty stream and submit the job again, 
everything works fine and it writes output to S3. 

Regards, 
Ravi 


some confuse for data exchange of parallel operator

2019-08-05 Thread tangkailin
Hello,
I don't know how parallel operators exchange data in Flink. For example, 
I define a map in an operator with n (n > 1) parallelism for counting. Can this 
map record all the data? Or does this map only record data from one parallel 
instance of the upstream operator?

Thanks,
Kylin