Re: [DISCUSS] Proposal of external shuffle service

2018-08-29 Thread Zhijiang(wangzhijiang999)
Glad to receive your positive feedback, Till!

Actually our motivation is to support batch jobs well, as you mentioned.

For the output level, Flink already has the Subpartition abstraction (writer), and 
currently there are the PipelinedSubpartition (memory output) and 
SpillableSubpartition (one-subpartition-one-file output) implementations. We can 
extend this abstraction to realize other persistent outputs (e.g. sort-merge-file).

For the transport level (shuffle service), the current SubpartitionView 
abstraction (reader) serves as the bridge to the output level, so the 
view can understand and read the different output formats. The current 
NetworkEnvironment takes the role of an internal shuffle service in the 
TaskManager, with the transport server realized internally by Netty. This 
component could also be started in other external containers, like the NodeManager 
of YARN, to take the role of an external shuffle service. Further, we can extend 
the shuffle service abstraction to transport outputs via HTTP or RDMA instead of 
the current Netty. This abstraction should provide a way to register outputs 
so that the results can be read correctly, similar to the current SubpartitionView.
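To make the separation more concrete, here is a very rough sketch of the two levels; none of these interfaces exist in Flink today, and all names are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

/** Output level: how a produced (sub)partition's data is read back.
 *  Hypothetical sketch only; names are illustrative. */
interface PartitionReader {
    /** Returns the next buffer of the given subpartition, or null when exhausted. */
    byte[] nextBuffer(int subpartitionIndex);
}

/** Transport level: serves registered partitions to downstream consumers,
 *  whether it runs inside the TaskManager or in an external container. */
interface ShuffleService {
    void registerPartition(String partitionId, PartitionReader reader);
    PartitionReader getPartition(String partitionId);
    void releasePartition(String partitionId);
}

/** Trivial in-memory stand-in, e.g. for a NodeManager-hosted service. */
class InMemoryShuffleService implements ShuffleService {
    private final Map<String, PartitionReader> partitions = new HashMap<>();

    public void registerPartition(String id, PartitionReader reader) {
        partitions.put(id, reader);
    }

    public PartitionReader getPartition(String id) {
        return partitions.get(id);
    }

    public void releasePartition(String id) {
        partitions.remove(id);
    }
}
```

With such a split, a Netty-, HTTP- or RDMA-based service would only swap the ShuffleService implementation, while output formats plug in behind PartitionReader.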

The above is still a rough idea. Next I plan to create a feature JIRA to cover 
the related changes if possible. It would be great to get help from the 
related committers to review the detailed designs together.

Best,
Zhijiang


--
From: Till Rohrmann 
Sent: Wednesday, August 29, 2018, 17:36
To: dev ; Zhijiang(wangzhijiang999) 

Subject: Re: [DISCUSS] Proposal of external shuffle service

Thanks for starting this design discussion Zhijiang!

I really like the idea to introduce a ShuffleService abstraction which allows 
us to have different implementations depending on the actual use case. Especially 
for batch jobs I can clearly see the benefits of persisting the results 
somewhere else.

Do you already know which interfaces we need to extend and where to introduce 
new abstractions?

Cheers,
Till
On Mon, Aug 27, 2018 at 1:57 PM Zhijiang(wangzhijiang999) 
 wrote:
Hi all!

 The shuffle service is responsible for transporting upstream-produced data to 
the downstream side. In Flink, the NettyServer is used for the network transport 
service and this component is started in the TaskManager process. That means 
the TaskManager can only support an internal shuffle service, which raises some concerns:
 1. If a task finishes, the ResultPartition of this task remains registered 
in the TaskManager, because the output buffers have to be transported by the 
internal shuffle service in the TaskManager. That means the TaskManager cannot be 
released by the ResourceManager until the ResultPartition is released. It may waste 
container resources and does not support dynamic resource scenarios well.
 2. If we want to add another shuffle service implementation, the current 
mechanism is not easy to extend, because the output level (result partition) 
and the transport level (shuffle service) are not divided clearly and there is 
no abstraction to extend.

 For the above considerations, we propose an external shuffle service which can be 
deployed in any other external containers, e.g. a NodeManager container in YARN. 
Then the TaskManager can be released ASAP if needed once all its internal tasks 
have finished. The persistent output files of these finished tasks can then be 
served by the external shuffle service on the same machine.

 Further, we can abstract both the output level and the transport level to 
support different implementations. E.g., we realized merging the data of all the 
subpartitions into a limited number of persistent local files, instead of 
one-subpartition-one-file, for disk improvements in some scenarios.

 I know it may be a big piece of work, and I just want to point out some ideas 
and would appreciate any feedback from you!

 Best,
 Zhijiang



[ANNOUNCE] Weekly community update #35

2018-08-29 Thread Till Rohrmann
Dear community,

this is the weekly community update thread #35. Please post any news and
updates you want to share with the community to this thread.

# Flink 1.7 community roadmap

The Flink community started discussing which features will be included in
the next upcoming major release. Join the discussion to voice your opinion
and learn more about the next Flink version [1].

# Extend Flink to support external shuffle services

Zhijiang kicked off a discussion about extending Flink so that the shuffle
service responsible for exchanging data between different stages is
pluggable. This extension would allow supporting different implementations
which can be optimized for different use cases. Join the discussion to
learn more about it [2].

# Flink Forward Berlin 2018

Next week Flink Forward Berlin will take place [3]. It is a great
opportunity for the community to meet each other in person, to learn about
the latest Flink features and use cases as well as to discuss Flink's
future development.

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Flink-1-7-Development-Priorities-td23933.html
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Proposal-of-external-shuffle-service-td23988.html
[3] https://berlin-2018.flink-forward.org/

Cheers,
Till


Re: [DISCUSS][TABLE] How to handle empty delete for UpsertSource

2018-08-29 Thread Fabian Hueske
Hi Hequn,

That's great! Yes, let's go with option 2 (from the source's point of view)
and later extend Join and Aggregation to discard empty deletes.
I agree that the filtering at the sink should be optional and configurable
via the query configuration.

Again, thanks for starting this discussion. I think it helped us all to get
a better understanding of how upserts work.

Best, Fabian

On Wed, Aug 29, 2018 at 5:29 PM Hequn Cheng  wrote:

> Hi Fabian
>
> Thanks for your update. The opinions on upsert streams are highly
> enlightening. I think I now agree with you that we can choose option 2
> to solve the problem: throw away empty deletes when the source generates
> retractions, otherwise pass empty deletes down.
>
> As for the UpsertSink, I think we don't need to filter empty deletes in it,
> unless the external system should not receive empty deletes. It would be
> good to provide an optional parameter in the StreamQueryConfig to indicate
> whether to filter empty deletes in upsert sinks (i.e., this is a job
> configuration). In this way, we can also solve the Filter
> problems (FLINK-9528). I will create another subtask about UpsertSink later.
>
> Thanks again for all the suggestions. It really helps me a lot.
> Best, Hequn.
>
>
> On Tue, Aug 28, 2018 at 9:47 PM Fabian Hueske  wrote:
>
> > Hi Hequn, hi Piotr,
> >
> > Thanks for pushing this discussion forward and sorry for not responding
> > earlier.
> >
> > After reading the thread, I agree that we do not *need* to (but may)
> > forward empty deletes.
> > As I pointed out before, I see empty deletes not as a semantic problem
> > that needs to be exactly solved but rather as a performance problem that
> > can be optimized (trade-off state costs vs. handling empty delete).
> >
> > Piotr raised a good point. An upsert delete message may consist only of
> > the key fields and the delete flag.
> > Having no data but the keys means we *cannot* handle them as regular
> > records during a join, filter, projection, or aggregation.
> >
> > Upsert streams are only present if the stream is received by an operator
> > that is able to correctly interpret the upsert messages.
> > Right now, there is only the UpsertSink operator that can handle upsert
> > streams. Join and Aggregation might support upsert inputs in the future
> > as well.
> > There are the following cases:
> >
> > 1. If there is no operator that can handle an upsert stream, upserts need
> > to be converted into retractions.
> > Filtering out empty deletes while converting to retractions comes for free.
> > 2. If the receiving operator is a Join or Aggregation, it has all
> > necessary state to check whether the delete is empty or not.
> > In case of an empty delete, it is simply dropped.
> >
> > In both cases (retract stream conversion and stateful upsert operator) we
> > can filter empty deletes for free.
> >
> > The only case left are UpsertSinks. These do not have built-in state,
> > since it is maintained in an external system.
> > As I said before, empty deletes are not a semantic problem. We could
> > forward all empty deletes and the result would still be consistent.
> > However, I understand that empty deletes can cause severe performance
> > issues.
> > We can address the performance issue with different measures such as
> > best-effort (approximate) filtering or exact state-backed filtering.
> >
> > I think in many cases we can handle empty deletes from upsert sources
> > without adding additional state.
> > As soon as the upsert messages are converted into retraction messages or
> > consumed by a join or aggregation, they can be filtered for free.
> > We only need to add state, if we have an upsert sink AND if that sink
> > wants to remove all empty deletes.
> >
> > There is one more thing that needs to be discussed. How upsert messages
> > are handled by Calc operators.
> > A Calc (projection and/or filter) that receives (and produces) an upsert
> > stream (because it is in front of a Join, Aggregation, UpsertSink) should
> > handle messages as follows:
> > - upsert message/flag=true: upsert messages are handled as regular
> > message. If the predicate evaluates to false, all but the key fields are
> > set to null and the message is forwarded as a delete message
> > - delete message/flag=false: delete messages are converted to the output
> > schema (padded with nulls) and forwarded.
> >
> > What do you think,
> > Fabian
> >
> >
> >
> >
> >
> >
> > On Fri, Aug 24, 2018 at 7:33 AM Hequn Cheng <chenghe...@gmail.com> wrote:
> >
> >> Hi Piotrek,
> >>
> >> Great to see your replies, and really thanks for all your suggestions.
> >> Inline is a good way, I will do the same as you :-)
> >>
> >> *> I’m assuming that we are talking about event time and that `(delete
> 1,
> >> a, 1)` happened before `(add 1, a, 2)`, right?*
> >>
> >> We are talking about processing time (FLINK-8577
> >> ). Event time is the
> >> next 

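Fabian's proposed Calc behaviour on upsert streams (forward passing upserts as-is; when the predicate fails on an upsert, null out everything but the keys and forward it as a delete) could be sketched roughly as below. The message type is a simplified stand-in, not Flink's internal row representation:

```java
import java.util.Arrays;
import java.util.function.Predicate;

/** Simplified stand-in for an upsert message: key fields, value fields,
 *  and an upsert(true)/delete(false) flag. Not Flink's internal types. */
class UpsertMessage {
    final Object[] keys;
    Object[] values;    // null-padded for delete messages
    boolean isUpsert;   // true = upsert, false = delete

    UpsertMessage(Object[] keys, Object[] values, boolean isUpsert) {
        this.keys = keys;
        this.values = values;
        this.isUpsert = isUpsert;
    }
}

/** Sketch of the discussed Calc (filter) behaviour on an upsert stream. */
class UpsertCalc {
    private final Predicate<UpsertMessage> predicate;

    UpsertCalc(Predicate<UpsertMessage> predicate) {
        this.predicate = predicate;
    }

    UpsertMessage process(UpsertMessage msg) {
        if (msg.isUpsert && !predicate.test(msg)) {
            // Predicate false on an upsert: set all but the key fields to
            // null and forward the message as a delete.
            Arrays.fill(msg.values, null);
            msg.isUpsert = false;
        }
        // Delete messages (and passing upserts) are forwarded as-is; a real
        // Calc would additionally project to the output schema.
        return msg;
    }
}
```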
[jira] [Created] (FLINK-10256) Rework JobManagerFailsITCase and JobManagerTest into JobMasterITCase and JobMasterHAITCase

2018-08-29 Thread JIRA
陈梓立 created FLINK-10256:
---

 Summary: Rework JobManagerFailsITCase and JobManagerTest into 
JobMasterITCase and JobMasterHAITCase
 Key: FLINK-10256
 URL: https://issues.apache.org/jira/browse/FLINK-10256
 Project: Flink
  Issue Type: Improvement
  Components: Tests
Affects Versions: 1.7.0
Reporter: 陈梓立
Assignee: 陈梓立
 Fix For: 1.7.0


I am planning to rework JobManagerFailsITCase and JobManagerTest into 
JobMasterITCase and JobMasterHAITCase. That is, reorganize the legacy tests, 
make them neat, and cover cases explicitly. The PR will follow before this 
weekend.

While reworking, I'd like to add more JM failover test cases, listed below, for 
the further implementation of JM failover with the RECONCILING state. By "JM 
failover" I mean a real-world failover (like a power loss or process exit), without 
calling Flink's internal postStop logic or anything like it.

1. Streaming task with jm failover.
2. Streaming task with jm failover concurrent to task fail.
3. Batch task with jm failover.
4. Batch task with jm failover concurrent to task fail.
5. Batch task with jm failover when some vertex has already been FINISHED.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS][TABLE] How to handle empty delete for UpsertSource

2018-08-29 Thread Hequn Cheng
Hi Fabian

Thanks for your update. The opinions on upsert streams are highly
enlightening. I think I now agree with you that we can choose option 2
to solve the problem: throw away empty deletes when the source generates
retractions, otherwise pass empty deletes down.

As for the UpsertSink, I think we don't need to filter empty deletes in it,
unless the external system should not receive empty deletes. It would be
good to provide an optional parameter in the StreamQueryConfig to indicate
whether to filter empty deletes in upsert sinks (i.e., this is a job
configuration). In this way, we can also solve the Filter
problems (FLINK-9528). I will create another subtask about UpsertSink later.

Thanks again for all the suggestions. It really helps me a lot.
Best, Hequn.
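The agreed option 2 above (drop empty deletes when the source's upserts are converted to retractions, otherwise pass them down) could be sketched as follows; this is an illustrative model, not Flink's actual operator code:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Sketch (not actual Flink code) of option 2: the operator converting an
 *  upsert stream into retractions already keeps per-key state, so an empty
 *  delete (a delete for a key it has never seen) is dropped for free. */
class UpsertToRetractions {
    private final Set<String> seenKeys = new HashSet<>();

    /** Returns the retraction-stream messages to emit for one upsert message. */
    List<String> process(String key, boolean isDelete) {
        List<String> out = new ArrayList<>();
        if (isDelete) {
            if (seenKeys.remove(key)) {
                out.add("retract:" + key); // real delete: retract previous version
            }
            // else: empty delete, silently dropped
        } else {
            if (seenKeys.contains(key)) {
                out.add("retract:" + key); // upsert of existing key: retract old
            }
            seenKeys.add(key);
            out.add("add:" + key);
        }
        return out;
    }
}
```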


On Tue, Aug 28, 2018 at 9:47 PM Fabian Hueske  wrote:

> Hi Hequn, hi Piotr,
>
> Thanks for pushing this discussion forward and sorry for not responding
> earlier.
>
> After reading the thread, I agree that we do not *need* to (but may)
> forward empty deletes.
> As I pointed out before, I see empty deletes not as a semantic problem
> that needs to be exactly solved but rather as a performance problem that
> can be optimized (trade-off state costs vs. handling empty delete).
>
> Piotr raised a good point. An upsert delete message may consist only of
> the key fields and the delete flag.
> Having no data but the keys means we *cannot* handle them as regular
> records during a join, filter, projection, or aggregation.
>
> Upsert streams are only present, if the stream is received by an operator
> that is able to correctly interpret the upsert messages.
> Right now, there is only the UpsertSink operator that can handle upsert
> streams. Join and Aggregation might support upsert inputs in the future
> as well.
> There are the following cases:
>
> 1. If there is no operator that can handle an upsert stream, upserts need
> to be converted into retractions.
> Filtering out empty deletes while converting to retractions comes for free.
> 2. If the receiving operator is a Join or Aggregation, it has all
> necessary state to check whether the delete is empty or not.
> In case of an empty delete, it is simply dropped.
>
> In both cases (retract stream conversion and stateful upsert operator) we
> can filter empty deletes for free.
>
> The only case left are UpsertSinks. These do not have built-in state,
> since it is maintained in an external system.
> As I said before, empty deletes are not a semantic problem. We could
> forward all empty deletes and the result would still be consistent.
> However, I understand that empty deletes can cause severe performance
> issues.
> We can address the performance issue with different measures such as
> best-effort (approximate) filtering or exact state-backed filtering.
>
> I think in many cases we can handle empty deletes from upsert sources
> without adding additional state.
> As soon as the upsert messages are converted into retraction messages or
> consumed by a join or aggregation, they can be filtered for free.
> We only need to add state, if we have an upsert sink AND if that sink
> wants to remove all empty deletes.
>
> There is one more thing that needs to be discussed. How upsert messages
> are handled by Calc operators.
> A Calc (projection and/or filter) that receives (and produces) an upsert
> stream (because it is in front of a Join, Aggregation, UpsertSink) should
> handle messages as follows:
> - upsert message/flag=true: upsert messages are handled as regular
> message. If the predicate evaluates to false, all but the key fields are
> set to null and the message is forwarded as a delete message
> - delete message/flag=false: delete messages are converted to the output
> schema (padded with nulls) and forwarded.
>
> What do you think,
> Fabian
>
>
>
>
>
>
> On Fri, Aug 24, 2018 at 7:33 AM Hequn Cheng <chenghe...@gmail.com> wrote:
>
>> Hi Piotrek,
>>
>> Great to see your replies, and really thanks for all your suggestions.
>> Inline is a good way, I will do the same as you :-)
>>
>> *> I’m assuming that we are talking about event time and that `(delete 1,
>> a, 1)` happened before `(add 1, a, 2)`, right?*
>>
>> We are talking about processing time (FLINK-8577
>> ). Event time is the
>> next topic(FLINK-8578 ).
>> And `(delete 1, a, 1)` is an empty delete message that comes before `(add 1, a,
>> 2)`. However, (add 2, a, 1) & (add 2, a, 2) may come before or after (delete
>> 1, a, 1) & (add 1, a, 2).
>>
>> *> Maybe we should say, that’s unspecified behaviour and Flink can either
>> forward empty deletes or filter them out depending on what’s more
>> efficient/easier. In the end this whole topic is very fishy. Look at it
>> from the Flink’s perspective. User is trying to remove the data, that was
>> never there in the first place. If it wasn’t there, why should we
>> retract/produce deletes for 

[jira] [Created] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs

2018-08-29 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-10255:
-

 Summary: Standby Dispatcher locks submitted JobGraphs
 Key: FLINK-10255
 URL: https://issues.apache.org/jira/browse/FLINK-10255
 Project: Flink
  Issue Type: Bug
  Components: Distributed Coordination
Affects Versions: 1.6.0, 1.5.3, 1.7.0
Reporter: Till Rohrmann
 Fix For: 1.6.1, 1.7.0, 1.5.4


Currently, standby {{Dispatchers}} lock submitted {{JobGraphs}} which are added 
to the {{SubmittedJobGraphStore}} if HA mode is enabled. Locking the 
{{JobGraphs}} can prevent their cleanup leaving the system in an inconsistent 
state.

The problem is that we recover the newly added {{JobGraph}} in the 
{{SubmittedJobGraphListener#onAddedJobGraph}} callback, which is also called if 
we don't have the leadership. Recovering the {{JobGraph}} currently locks it. In 
case the {{Dispatcher}} is not the leader, we won't start that job after its 
recovery. However, we also don't release the {{JobGraph}}, leaving it locked.

There are two possible solutions to the problem. Either we check whether we are 
the leader before recovering jobs, or we say that recovering jobs does not lock 
them and only lock a job once we can actually submit it. The latter approach 
has the advantage that it follows quite a similar code path as an initial job 
submission. Moreover, jobs are currently also recovered in other places. In all 
these places we would currently need to release the {{JobGraphs}} if we cannot 
submit the recovered {{JobGraph}} (e.g. {{Dispatcher#grantLeadership}}).

An extension of the first solution could be to stop the 
{{SubmittedJobGraphStore}} while the {{Dispatcher}} is not the leader. Then we 
would have to make sure that no concurrent callback from the 
{{SubmittedJobGraphStore#SubmittedJobGraphListener}} can be executed after 
revoking leadership from the {{Dispatcher}}.
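The first solution (check leadership before recovering, so a standby never locks a JobGraph) could be sketched as below, with simplified stand-ins for the real Dispatcher and SubmittedJobGraphStore types:

```java
import java.util.HashSet;
import java.util.Set;

/** Sketch of the first proposed fix: only recover (and thereby lock) a
 *  JobGraph when this Dispatcher actually holds the leadership. All names
 *  are simplified stand-ins for the real Flink classes. */
class StandbyAwareDispatcher {
    private volatile boolean isLeader;
    private final Set<String> lockedJobGraphs = new HashSet<>();

    void grantLeadership() { isLeader = true; }
    void revokeLeadership() { isLeader = false; }

    /** Callback from the job graph store when a new JobGraph appears. */
    void onAddedJobGraph(String jobId) {
        if (!isLeader) {
            return; // standby: do not recover, hence nothing gets locked
        }
        lockedJobGraphs.add(jobId); // recovery locks the graph
    }

    boolean isLocked(String jobId) { return lockedJobGraphs.contains(jobId); }
}
```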





[jira] [Created] (FLINK-10254) Fix check in stateBackend

2018-08-29 Thread aitozi (JIRA)
aitozi created FLINK-10254:
--

 Summary: Fix check in stateBackend
 Key: FLINK-10254
 URL: https://issues.apache.org/jira/browse/FLINK-10254
 Project: Flink
  Issue Type: Improvement
  Components: State Backends, Checkpointing
Affects Versions: 1.6.0
Reporter: aitozi
Assignee: aitozi
 Fix For: 1.6.0


The checkNotNull on numberOfKeyGroups is unnecessary since it is a primitive 
type; we just have to make sure it is at least 1.
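A sketch of the suggested fix (class and method names are illustrative, not the actual Flink code):

```java
/** numberOfKeyGroups is a primitive int, so a null check is meaningless;
 *  a range check is what is actually needed. Illustrative sketch only. */
class KeyGroupChecks {
    static int checkNumberOfKeyGroups(int numberOfKeyGroups) {
        if (numberOfKeyGroups < 1) {
            throw new IllegalArgumentException(
                    "numberOfKeyGroups must be at least 1, but was " + numberOfKeyGroups);
        }
        return numberOfKeyGroups;
    }
}
```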





Re: [DISCUSS] Proposal of external shuffle service

2018-08-29 Thread Till Rohrmann
Thanks for starting this design discussion Zhijiang!

I really like the idea to introduce a ShuffleService abstraction which
allows us to have different implementations depending on the actual use case.
Especially for batch jobs I can clearly see the benefits of persisting the
results somewhere else.

Do you already know which interfaces we need to extend and where to
introduce new abstractions?

Cheers,
Till

On Mon, Aug 27, 2018 at 1:57 PM Zhijiang(wangzhijiang999)
 wrote:

> Hi all!
>
> The shuffle service is responsible for transporting upstream-produced data
> to the downstream side. In Flink, the NettyServer is used for the network
> transport service and this component is started in the TaskManager process.
> That means the TaskManager can only support an internal shuffle service,
> which raises some concerns:
> 1. If a task finishes, the ResultPartition of this task remains
> registered in the TaskManager, because the output buffers have to be
> transported by the internal shuffle service in the TaskManager. That means the
> TaskManager cannot be released by the ResourceManager until the ResultPartition
> is released. It may waste container resources and does not support
> dynamic resource scenarios well.
> 2. If we want to add another shuffle service implementation, the
> current mechanism is not easy to extend, because the output level (result
> partition) and the transport level (shuffle service) are not divided clearly
> and there is no abstraction to extend.
>
> For the above considerations, we propose an external shuffle service which
> can be deployed in any other external containers, e.g. a NodeManager
> container in YARN. Then the TaskManager can be released ASAP if needed once
> all its internal tasks have finished. The persistent output files of these
> finished tasks can then be served by the external shuffle service on
> the same machine.
>
> Further, we can abstract both the output level and the transport level to
> support different implementations. E.g., we realized merging the data of all
> the subpartitions into a limited number of persistent local files, instead of
> one-subpartition-one-file, for disk improvements in some scenarios.
>
> I know it may be a big piece of work, and I just want to point out some
> ideas and would appreciate any feedback from you!
>
> Best,
> Zhijiang


[jira] [Created] (FLINK-10253) Run MetricQueryService with lower priority

2018-08-29 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-10253:
-

 Summary: Run MetricQueryService with lower priority
 Key: FLINK-10253
 URL: https://issues.apache.org/jira/browse/FLINK-10253
 Project: Flink
  Issue Type: Sub-task
  Components: Metrics
Affects Versions: 1.6.0, 1.5.3, 1.7.0
Reporter: Till Rohrmann
 Fix For: 1.6.1, 1.7.0, 1.5.4


We should run the {{MetricQueryService}} with a lower priority than the main 
Flink components. An idea would be to start the underlying threads with a lower 
priority.
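A sketch of the idea, giving the service its own thread factory that creates threads below the normal priority (names are illustrative, not the actual Flink code):

```java
import java.util.concurrent.ThreadFactory;

/** Sketch: a thread factory for the metric query service that creates
 *  daemon threads with minimum priority, so metric traffic cannot starve
 *  the main Flink components. Illustrative only. */
class LowPriorityThreadFactory implements ThreadFactory {
    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, "metric-query-service");
        t.setPriority(Thread.MIN_PRIORITY); // lower than main Flink threads
        t.setDaemon(true);
        return t;
    }
}
```

Such a factory could then back a dedicated executor, e.g. `Executors.newSingleThreadExecutor(new LowPriorityThreadFactory())`. Note that thread priorities are only a hint to the OS scheduler, so this complements rather than replaces a separate thread pool.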





[jira] [Created] (FLINK-10252) Handle oversized metric messages

2018-08-29 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-10252:
-

 Summary: Handle oversized metric messages
 Key: FLINK-10252
 URL: https://issues.apache.org/jira/browse/FLINK-10252
 Project: Flink
  Issue Type: Sub-task
  Components: Metrics
Affects Versions: 1.6.0, 1.5.3, 1.7.0
Reporter: Till Rohrmann
 Fix For: 1.6.1, 1.7.0, 1.5.4


Since the {{MetricQueryService}} is implemented as an Akka actor, it can only 
send messages of a smaller size than the current {{akka.framesize}}. Similarly to 
FLINK-10251, we should check whether the payload exceeds the maximum framesize 
and fail fast if it does.
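A sketch of such a fail-fast check; plain Java serialization stands in for Flink's actual serialization, and the limit would come from the configured akka.framesize:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Illustrative sketch: serialize the metric payload first and refuse to
 *  send it when it would exceed the maximum framesize. */
class FrameSizeGuard {
    static byte[] serializeWithLimit(Serializable payload, int maxFrameSizeBytes) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(payload);
            }
            byte[] bytes = bos.toByteArray();
            if (bytes.length > maxFrameSizeBytes) {
                // fail fast instead of letting the transport silently drop it
                throw new IllegalStateException("Metric payload of " + bytes.length
                        + " bytes exceeds maximum framesize " + maxFrameSizeBytes);
            }
            return bytes;
        } catch (IOException e) {
            throw new RuntimeException("Serialization failed", e);
        }
    }
}
```

The same shape of check applies to FLINK-10251 for RPC responses in the AkkaRpcActor.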





[jira] [Created] (FLINK-10251) Handle oversized response messages in AkkaRpcActor

2018-08-29 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-10251:
-

 Summary: Handle oversized response messages in AkkaRpcActor
 Key: FLINK-10251
 URL: https://issues.apache.org/jira/browse/FLINK-10251
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Coordination
Affects Versions: 1.6.0, 1.5.3, 1.7.0
Reporter: Till Rohrmann
 Fix For: 1.6.1, 1.7.0, 1.5.4


The {{AkkaRpcActor}} should check whether an RPC response which is sent to a 
remote sender exceeds the maximum framesize of the underlying {{ActorSystem}}. 
If this is the case, we should fail fast instead. We can achieve this by 
serializing the response and sending the serialized byte array.





[jira] [Created] (FLINK-10250) Flink doesn't support functions that extend RichAggregateFunction in a group window

2018-08-29 Thread PengYang (JIRA)
PengYang created FLINK-10250:


 Summary: Flink doesn't support functions that extend 
RichAggregateFunction in a group window
 Key: FLINK-10250
 URL: https://issues.apache.org/jira/browse/FLINK-10250
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API
Affects Versions: 1.6.0, 1.5.3, 1.5.2
Reporter: PengYang


Flink doesn't support functions that extend RichAggregateFunction in a group 
window. In a RichAggregateFunction, one could store data via state and filter out 
duplicate data. Will Flink support this feature in a future release? There is 
another question: will Flink support global state queries in the future? If it 
does, it could replace external caches such as Redis in certain aspects. Maybe my 
thoughts are a little naive, but I hope that the functionality of Flink keeps 
getting better and better.





[jira] [Created] (FLINK-10249) Document hadoop/presto s3 file system configuration forwarding

2018-08-29 Thread Andrey Zagrebin (JIRA)
Andrey Zagrebin created FLINK-10249:
---

 Summary: Document hadoop/presto s3 file system configuration 
forwarding
 Key: FLINK-10249
 URL: https://issues.apache.org/jira/browse/FLINK-10249
 Project: Flink
  Issue Type: Improvement
  Components: Documentation, filesystem-connector
Reporter: Andrey Zagrebin


Flink's hadoop and presto s3 file system factories (S3FileSystemFactory) use the 
HadoopConfigLoader, which automatically converts and prefixes s3.* config 
options to configure the underlying s3 clients. We should leave at least a hint 
about this behaviour for users who want to change the config of these underlying 
s3 clients, e.g. in docs/ops/deployment/aws.md
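As an illustration of what such a hint could look like in flink-conf.yaml (the exact mirrored keys are an assumption and should be verified against the HadoopConfigLoader implementation):

```yaml
# Options under the s3.* prefix are picked up by the shaded s3 file system
# factories and forwarded to the underlying client configuration.
# (Key names below are illustrative; verify against HadoopConfigLoader.)
s3.endpoint: http://localhost:9000     # forwarded to the s3 client endpoint
s3.path.style.access: true             # forwarded as the path-style-access flag
```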





[jira] [Created] (FLINK-10248) Add HBase Table Sink

2018-08-29 Thread Shimin Yang (JIRA)
Shimin Yang created FLINK-10248:
---

 Summary: Add HBase Table Sink
 Key: FLINK-10248
 URL: https://issues.apache.org/jira/browse/FLINK-10248
 Project: Flink
  Issue Type: Sub-task
  Components: Streaming Connectors
Reporter: Shimin Yang
Assignee: Shimin Yang








[jira] [Created] (FLINK-10247) Run MetricQueryService in separate thread pool

2018-08-29 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-10247:
-

 Summary: Run MetricQueryService in separate thread pool
 Key: FLINK-10247
 URL: https://issues.apache.org/jira/browse/FLINK-10247
 Project: Flink
  Issue Type: Sub-task
  Components: Metrics
Affects Versions: 1.6.0, 1.5.3, 1.7.0
Reporter: Till Rohrmann
 Fix For: 1.6.1, 1.7.0, 1.5.4


In order to make the {{MetricQueryService}} run independently of the main Flink 
components, it should get its own dedicated thread pool assigned.





[jira] [Created] (FLINK-10246) Harden and separate MetricQueryService

2018-08-29 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-10246:
-

 Summary: Harden and separate MetricQueryService
 Key: FLINK-10246
 URL: https://issues.apache.org/jira/browse/FLINK-10246
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Coordination, Metrics
Affects Versions: 1.6.0, 1.5.3, 1.7.0
Reporter: Till Rohrmann
 Fix For: 1.6.1, 1.7.0, 1.5.4


This is an umbrella issue to track the effort to harden Flink's 
{{MetricQueryService}} and to separate it from the rest of the system.

The idea is to set up the {{MetricQueryService}} and the metric system in 
general in such a way that it cannot interfere with or even bring down the main 
Flink components. Moreover, the metric system should not degrade performance; it 
should simply use free CPU cycles and no more. Ideally, the user does not see a 
difference between running Flink with the metric query service turned on or off.





[jira] [Created] (FLINK-10245) Add DataStream HBase Sink

2018-08-29 Thread Shimin Yang (JIRA)
Shimin Yang created FLINK-10245:
---

 Summary: Add DataStream HBase Sink
 Key: FLINK-10245
 URL: https://issues.apache.org/jira/browse/FLINK-10245
 Project: Flink
  Issue Type: Sub-task
  Components: Streaming Connectors
Reporter: Shimin Yang
Assignee: Shimin Yang








[jira] [Created] (FLINK-10244) Add option to measure latency as absolute value

2018-08-29 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-10244:


 Summary: Add option to measure latency as absolute value
 Key: FLINK-10244
 URL: https://issues.apache.org/jira/browse/FLINK-10244
 Project: Flink
  Issue Type: Sub-task
  Components: Metrics
Affects Versions: 1.7.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.7.0


Latency metrics are currently organized as a histogram, which is significantly 
more expensive than other metric types.
Depending on the user's metric infrastructure, a histogram in Flink is overkill; 
several metric backends explicitly discourage measuring histograms in the 
application itself. Instead, only the raw latency data should be exposed.

We should add an option to expose latency not as a histogram but as a simple 
gauge that returns the latency based on the last received latency marker.
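A sketch of the proposed gauge; the marker and gauge types here are simplified stand-ins for Flink's metric interfaces:

```java
/** Sketch: keep only the latency derived from the last received latency
 *  marker and expose it through a cheap gauge read, instead of maintaining
 *  a histogram. Illustrative only. */
class LastLatencyGauge {
    private volatile long lastLatencyMillis = -1L; // -1 until the first marker

    /** Called for each incoming latency marker. */
    void onLatencyMarker(long markerEmitTimeMillis, long nowMillis) {
        lastLatencyMillis = nowMillis - markerEmitTimeMillis;
    }

    /** Gauge read path: no histogram bookkeeping at all. */
    long getValue() {
        return lastLatencyMillis;
    }
}
```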





[jira] [Created] (FLINK-10243) Add option to reduce latency metrics granularity

2018-08-29 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-10243:


 Summary: Add option to reduce latency metrics granularity
 Key: FLINK-10243
 URL: https://issues.apache.org/jira/browse/FLINK-10243
 Project: Flink
  Issue Type: Sub-task
  Components: Configuration, Metrics
Affects Versions: 1.7.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.7.0


The latency is currently tracked separately from each operator subtask to each 
source subtask. The total number of latency metrics in the cluster is thus {{(# 
of sources) * (# of operators) * parallelism²}}, i.e. quadratic scaling.

If we ignored the source subtask, the scaling would be a lot more manageable.
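For illustration, the quadratic term dominates quickly; with made-up numbers (2 sources, 10 operators, parallelism 100) the difference is three orders of magnitude:

```java
/** Back-of-the-envelope computation of latency metric counts; the cluster
 *  numbers in the test below are purely illustrative. */
class LatencyMetricCount {
    /** Current scheme: (# of sources) * (# of operators) * parallelism^2. */
    static long perSourceSubtask(int sources, int operators, int parallelism) {
        return (long) sources * operators * parallelism * parallelism;
    }

    /** Ignoring the source subtask drops one factor of parallelism. */
    static long perSource(int sources, int operators, int parallelism) {
        return (long) sources * operators * parallelism;
    }
}
```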





[jira] [Created] (FLINK-10242) Disable latency metrics by default

2018-08-29 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-10242:


 Summary: Disable latency metrics by default
 Key: FLINK-10242
 URL: https://issues.apache.org/jira/browse/FLINK-10242
 Project: Flink
  Issue Type: Sub-task
  Components: Configuration, Metrics
Affects Versions: 1.7.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.7.0


With the plethora of recent issues around latency metrics, we should disable them 
by default.


