[jira] [Created] (FLINK-12735) Make shuffle environment implementation independent with IOManager

2019-06-04 Thread zhijiang (JIRA)
zhijiang created FLINK-12735:


 Summary: Make shuffle environment implementation independent with 
IOManager
 Key: FLINK-12735
 URL: https://issues.apache.org/jira/browse/FLINK-12735
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


The current creation of {{NetworkEnvironment}} relies on {{IOManager}} from 
{{TaskManagerServices}}. In order not to rely on specific external components 
for implementing a shuffle environment, the specific implementation should 
create its internal components itself if required.

The current abstract {{IOManager}} has two roles: one is file channel management 
based on the temp directories configuration, and the other is providing 
abstract methods for reading/writing files. We could further extract the file 
channel management into a separate internal class which could be reused by all 
the components that require it, like the current {{BoundedBlockingSubpartition}}. 
To do so, the shuffle environment should create its own internal channel manager 
to break the dependency on {{IOManager}}.
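
A minimal sketch of what such an extracted channel manager could look like (the interface name and methods are illustrative assumptions, not the final design; only {{FileIOChannel.ID}} is an existing Flink class):

{code:java}
import java.io.File;

import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;

/**
 * Illustrative sketch of the file channel management role extracted from IOManager,
 * so that a shuffle environment can create its own instance without depending on
 * the full IOManager. Names and methods are assumptions, not the final design.
 */
public interface FileChannelManager extends AutoCloseable {

    /** Creates an ID for a new temp file, spread across the configured temp directories. */
    FileIOChannel.ID createChannel();

    /** Returns the directories used for spilling, derived from the temp directories configuration. */
    File[] getSpillingDirectories();
}
{code}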





Re: [DISCUSS] Support Local Aggregation in Flink

2019-06-04 Thread Dian Fu
Hi Vino,

Thanks a lot for your reply.

> 1) When, Why and How to judge the memory is exhausted?

My point here is that the local aggregate operator can buffer the inputs in
memory and send out the results AT ANY TIME, e.g. when the element count or
the time interval reaches a pre-configured value, when the memory usage of the
buffered elements reaches a configured value (supposing we can estimate the
object size efficiently), or even when a checkpoint is triggered.

> 
> 2) If the local aggregate operator rarely needs to operate the state, what
> do you think about fault tolerance?

AbstractStreamOperator provides a method `prepareSnapshotPreBarrier` which can
be used here to send out the results to the downstream when a checkpoint is
triggered. Fault tolerance can then work well.

Even if no such method were available, we could still store the buffered
elements or pre-aggregated results to state when a checkpoint is triggered.
The state access would be much less than with the window operator, as only the
elements not yet sent out when the checkpoint occurs have to be written to
state. Suppose the checkpoint interval is 3 minutes and the trigger interval is
10 seconds; then only roughly "10/180" of the elements would be written to state.
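
For illustration, here is a minimal sketch of such an operator (the types, the
sum logic and the flush-on-barrier policy are my assumptions to make the idea
concrete, not a concrete proposal):

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/** Sketch of a local-aggregate operator that buffers partial sums in memory. */
public class LocalSumOperator
        extends AbstractStreamOperator<Tuple2<String, Long>>
        implements OneInputStreamOperator<Tuple2<String, Long>, Tuple2<String, Long>> {

    private final Map<String, Long> buffer = new HashMap<>();

    @Override
    public void processElement(StreamRecord<Tuple2<String, Long>> element) throws Exception {
        Tuple2<String, Long> value = element.getValue();
        buffer.merge(value.f0, value.f1, Long::sum);
        // A real implementation could also flush on element count, timer or memory pressure.
    }

    @Override
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        // Emit the partial results downstream before the barrier, so nothing is written to state.
        flush();
    }

    private void flush() {
        for (Map.Entry<String, Long> entry : buffer.entrySet()) {
            output.collect(new StreamRecord<>(Tuple2.of(entry.getKey(), entry.getValue())));
        }
        buffer.clear();
    }
}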


Thanks,
Dian


> 在 2019年6月5日,上午11:43,Biao Liu  写道:
> 
> Hi Vino,
> 
> +1 for this feature. It's useful for data skew. And it could also reduce
> shuffled datum.
> 
> I have some concerns about the API part. From my side, this feature should
> be more like an improvement. I'm afraid the proposal is an overkill about
> the API part. Many other systems support pre-aggregation as an optimization
> of global aggregation. The optimization might be used automatically or
> manually but with a simple API. The proposal introduces a series of
> flexible local aggregation APIs. They could be independent with global
> aggregation. It doesn't look like an improvement but introduces a lot of
> features. I'm not sure if there is a bigger picture later. As for now the
> API part looks a little heavy for me.
> 
> 
> vino yang  于2019年6月5日周三 上午10:38写道:
> 
>> Hi Litree,
>> 
>> From an implementation level, the localKeyBy API returns a general
>> KeyedStream, you can call all the APIs which KeyedStream provides, we did
>> not restrict its usage, although we can do this (for example returns a new
>> stream object named LocalKeyedStream).
>> 
>> However, to achieve the goal of local aggregation, it only makes sense to
>> call the window API.
>> 
>> Best,
>> Vino
>> 
>> litree  于2019年6月4日周二 下午10:41写道:
>> 
>>> Hi Vino,
>>> 
>>> 
>>> I have read your design,something I want to know is the usage of these
>> new
>>> APIs.It looks like when I use localByKey,i must then use a window
>> operator
>>> to return a datastream,and then use keyby and another window operator to
>>> get the final result?
>>> 
>>> 
>>> thanks,
>>> Litree
>>> 
>>> 
>>> On 06/04/2019 17:22, vino yang wrote:
>>> Hi Dian,
>>> 
>>> Thanks for your reply.
>>> 
>>> I know what you mean. However, if you think deeply, you will find your
>>> implementation need to provide an operator which looks like a window
>>> operator. You need to use state and receive aggregation function and
>>> specify the trigger time. It looks like a lightweight window operator.
>>> Right?
>>> 
>>> We try to reuse Flink provided functions and reduce complexity. IMO, It
>> is
>>> more user-friendly because users are familiar with the window API.
>>> 
>>> Best,
>>> Vino
>>> 
>>> 
>>> Dian Fu  于2019年6月4日周二 下午4:19写道:
>>> 
 Hi Vino,
 
 Thanks a lot for starting this discussion. +1 to this feature as I
>> think
 it will be very useful.
 
 Regarding to using window to buffer the input elements, personally I
>>> don't
 think it's a good solution for the following reasons:
 1) As we know that WindowOperator will store the accumulated results in
 states, this is not necessary for Local Aggregate operator.
 2) For WindowOperator, each input element will be accumulated to
>> states.
 This is also not necessary for Local Aggregate operator and storing the
 input elements in memory is enough.
 
 Thanks,
 Dian
 
> 在 2019年6月4日,上午10:03,vino yang  写道:
> 
> Hi Ken,
> 
> Thanks for your reply.
> 
> As I said before, we try to reuse Flink's state concept (fault
>>> tolerance
> and guarantee "Exactly-Once" semantics). So we did not consider
>> cache.
> 
> In addition, if we use Flink's state, the OOM related issue is not a
>>> key
> problem we need to consider.
> 
> Best,
> Vino
> 
> Ken Krugler  于2019年6月4日周二 上午1:37写道:
> 
>> Hi all,
>> 
>> Cascading implemented this “map-side reduce” functionality with an
>> LLR
>> cache.
>> 
>> That worked well, as then the skewed keys would always be in the
>>> cache.
>> 
>> The API let you decide the size of the cache, in terms of number of
>> entries.
>> 
>> Having a memory limit would have be

Re: [DISCUSS] Support Local Aggregation in Flink

2019-06-04 Thread Biao Liu
Hi Vino,

+1 for this feature. It's useful for data skew, and it could also reduce the
amount of shuffled data.

I have some concerns about the API part. From my side, this feature should
be more of an improvement. I'm afraid the proposal is overkill on the API
side. Many other systems support pre-aggregation as an optimization of global
aggregation; the optimization might be applied automatically or manually, but
with a simple API. The proposal introduces a series of flexible local
aggregation APIs that could be independent of global aggregation. It doesn't
look like an improvement so much as it introduces a lot of new features. I'm
not sure if there is a bigger picture later, but for now the API part looks a
little heavy to me.


vino yang wrote on Wed, Jun 5, 2019 at 10:38 AM:

> Hi Litree,
>
> From an implementation level, the localKeyBy API returns a general
> KeyedStream, you can call all the APIs which KeyedStream provides, we did
> not restrict its usage, although we can do this (for example returns a new
> stream object named LocalKeyedStream).
>
> However, to achieve the goal of local aggregation, it only makes sense to
> call the window API.
>
> Best,
> Vino
>
> litree  于2019年6月4日周二 下午10:41写道:
>
> > Hi Vino,
> >
> >
> > I have read your design,something I want to know is the usage of these
> new
> > APIs.It looks like when I use localByKey,i must then use a window
> operator
> > to return a datastream,and then use keyby and another window operator to
> > get the final result?
> >
> >
> > thanks,
> > Litree
> >
> >
> > On 06/04/2019 17:22, vino yang wrote:
> > Hi Dian,
> >
> > Thanks for your reply.
> >
> > I know what you mean. However, if you think deeply, you will find your
> > implementation need to provide an operator which looks like a window
> > operator. You need to use state and receive aggregation function and
> > specify the trigger time. It looks like a lightweight window operator.
> > Right?
> >
> > We try to reuse Flink provided functions and reduce complexity. IMO, It
> is
> > more user-friendly because users are familiar with the window API.
> >
> > Best,
> > Vino
> >
> >
> > Dian Fu  于2019年6月4日周二 下午4:19写道:
> >
> > > Hi Vino,
> > >
> > > Thanks a lot for starting this discussion. +1 to this feature as I
> think
> > > it will be very useful.
> > >
> > > Regarding to using window to buffer the input elements, personally I
> > don't
> > > think it's a good solution for the following reasons:
> > > 1) As we know that WindowOperator will store the accumulated results in
> > > states, this is not necessary for Local Aggregate operator.
> > > 2) For WindowOperator, each input element will be accumulated to
> states.
> > > This is also not necessary for Local Aggregate operator and storing the
> > > input elements in memory is enough.
> > >
> > > Thanks,
> > > Dian
> > >
> > > > 在 2019年6月4日,上午10:03,vino yang  写道:
> > > >
> > > > Hi Ken,
> > > >
> > > > Thanks for your reply.
> > > >
> > > > As I said before, we try to reuse Flink's state concept (fault
> > tolerance
> > > > and guarantee "Exactly-Once" semantics). So we did not consider
> cache.
> > > >
> > > > In addition, if we use Flink's state, the OOM related issue is not a
> > key
> > > > problem we need to consider.
> > > >
> > > > Best,
> > > > Vino
> > > >
> > > > Ken Krugler  于2019年6月4日周二 上午1:37写道:
> > > >
> > > >> Hi all,
> > > >>
> > > >> Cascading implemented this “map-side reduce” functionality with an
> LLR
> > > >> cache.
> > > >>
> > > >> That worked well, as then the skewed keys would always be in the
> > cache.
> > > >>
> > > >> The API let you decide the size of the cache, in terms of number of
> > > >> entries.
> > > >>
> > > >> Having a memory limit would have been better for many of our use
> > cases,
> > > >> though FWIR there’s no good way to estimate in-memory size for
> > objects.
> > > >>
> > > >> — Ken
> > > >>
> > > >>> On Jun 3, 2019, at 2:03 AM, vino yang 
> wrote:
> > > >>>
> > > >>> Hi Piotr,
> > > >>>
> > > >>> The localKeyBy API returns an instance of KeyedStream (we just
> added
> > an
> > > >>> inner flag to identify the local mode) which is Flink has provided
> > > >> before.
> > > >>> Users can call all the APIs(especially *window* APIs) which
> > KeyedStream
> > > >>> provided.
> > > >>>
> > > >>> So if users want to use local aggregation, they should call the
> > window
> > > >> API
> > > >>> to build a local window that means users should (or say "can")
> > specify
> > > >> the
> > > >>> window length and other information based on their needs.
> > > >>>
> > > >>> I think you described another idea different from us. We did not
> try
> > to
> > > >>> react after triggering some predefined threshold. We tend to give
> > users
> > > >> the
> > > >>> discretion to make decisions.
> > > >>>
> > > >>> Our design idea tends to reuse Flink provided concept and functions
> > > like
> > > >>> state and window (IMO, we do not need to worry about OOM and the
> > issues
> > > >> you
> > > >>> mentioned).
> > > >>>
> > > >>> Best,
> > > >>> V

Re: [DISCUSS] FLIP-33: Standardize connector metrics

2019-06-04 Thread Becket Qin
Hi Piotr,

Thanks for the explanation. Please see some clarifications below.

By time-based metric, I meant the portion of time spent on producing records
to the downstream. For example, a source connector can report that it's
spending 80% of its time emitting records to the downstream processing
pipeline. In another case, a sink connector may report that it's spending 30%
of its time producing records to the external system.

This is in some sense equivalent to the buffer usage metric:
   - 80% of time spent on emitting records to downstream ---> downstream
node is bottleneck ---> output buffer is probably full.
   - 30% of time spent on emitting records to downstream ---> downstream
node is not bottleneck ---> output buffer is probably not full.

However, the time-based metric has a few advantages that the buffer usage
metric may not have.

1. The buffer usage metric may not be applicable to all connector
implementations, while reporting a time-based metric is always doable.
Some source connectors may not have any input buffer, or they may use a
third-party library that does not expose the input buffer at all.
Similarly, for sink connectors, the implementation may not have any output
buffer, or the third-party library may not expose such a buffer.

2. Although both types of metrics can detect bottlenecks, time-based metrics
can be used to derive a more informed action to remove the bottleneck.
For example, when the downstream is the bottleneck, the output buffer usage
metric is likely to be 100%, and the input buffer usage might be 0%. That
means we don't know what the suitable parallelism is to lift the
bottleneck. The time-based metric, on the other hand, would give useful
information, e.g. if 80% of the time was spent on emitting records, we can
roughly increase the downstream node's parallelism by 4 times.

Admittedly, time-based metrics are more expensive than buffer usage metrics,
so we may have to do some sampling to reduce the cost. But in general,
time-based metrics seem worth adding.
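
To make this concrete, here is a rough illustration of how such a ratio could
be computed (plain Java, purely an illustrative sketch and not an existing
Flink metric; a real implementation would likely only time a sampled subset of
records to reduce the System.nanoTime() overhead):

/** Illustrative sketch: fraction of wall-clock time spent in the emit/produce call. */
public final class EmitTimeRatio {

    private long emitNanos;
    private long totalNanos;
    private long lastMark = System.nanoTime();

    /** Wraps every emit/produce call of the connector. */
    public void recordEmit(Runnable emit) {
        long start = System.nanoTime();
        totalNanos += start - lastMark;   // time spent outside emitting since the last call
        emit.run();
        long end = System.nanoTime();
        emitNanos += end - start;
        totalNanos += end - start;
        lastMark = end;
    }

    /** Returns e.g. 0.8 if 80% of the time was spent emitting records. */
    public double getRatio() {
        return totalNanos == 0 ? 0.0 : (double) emitNanos / totalNanos;
    }
}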

That being said, I don't think buffer usage metric and time-based metrics
are mutually exclusive. We can probably have both. It is just that in
practice, features like auto-scaling might prefer time-based metrics for
the reason stated above.

> 1. Define the metrics that would allow us to manually detect bottlenecks.
As I wrote, we already have them in most of the places, except of
sources/sinks.
> 2. Use those metrics, to automatically detect bottlenecks. Currently we
are only automatically detecting back pressure and reporting it to the user
in web UI (is it exposed as a metric at all?). Detecting the root cause of
the back pressure (bottleneck) is one step further.
> 3. Use the knowledge about where exactly the bottleneck is located, to
try to do something with it.

As explained above, I think the time-based metric also addresses items 1
and 2.

Any thoughts?

Thanks,

Jiangjie (Becket) Qin



On Mon, Jun 3, 2019 at 4:14 PM Piotr Nowojski  wrote:

> Hi again :)
>
> >   - pending.bytes, Gauge
> >   - pending.messages, Gauge
>
>
> +1
>
> And true, instead of overloading one of the metric it is better when user
> can choose to provide only one of them.
>
> Re 2:
>
> > If I understand correctly, this metric along with the pending mesages /
> > bytes would answer the questions of:
>
> >  - Does the connector consume fast enough? Lagging behind + empty buffer
> =
> > cannot consume fast enough.
> >  - Does the connector emit fast enough? Lagging behind + full buffer =
> > cannot emit fast enough, i.e. the Flink pipeline is slow.
>
> Yes, exactly. This can also be used to support decisions like changing the
> parallelism of the sources and/or down stream operators.
>
> I’m not sure if I understand your proposal with time based measurements.
> Maybe I’m missing something, but I do not see how measuring time alone
> could answer the problem: where is the bottleneck. Time spent on the
> next/emit might be short or long (depending on how heavy to process the
> record is) and the source can still be bottlenecked/back pressured or not.
> Usually the easiest and the most reliable way how to detect bottlenecks is
> by checking usage of input & output buffers, since when input buffer is
> full while output buffer is empty, that’s the definition of a bottleneck.
> Also this is usually very easy and cheap to measure (it works effectively
> the same way as current’s Flink back pressure monitoring, but more cleanly,
> without probing thread’s stack traces).
>
> Also keep in mind that we are already using the buffer usage metrics for
> detecting the bottlenecks in Flink’s internal network exchanges (manual
> work). That’s the reason why I wanted to extend this to sources/sinks,
> since they are currently our blind spot.
>
> > One feature we are currently working on to scale Flink automatically
> relies
> > on some metrics answering the same question
>
> That would be very helpful feature. I think in order to achieve that we
> would need to:
> 1. De

[jira] [Created] (FLINK-12734) remove getVolcanoPlanner method from FlinkOptimizeContext and RelNodeBlock does not depend on TableEnvironment

2019-06-04 Thread godfrey he (JIRA)
godfrey he created FLINK-12734:
--

 Summary: remove getVolcanoPlanner method from FlinkOptimizeContext 
and RelNodeBlock does not depend on TableEnvironment
 Key: FLINK-12734
 URL: https://issues.apache.org/jira/browse/FLINK-12734
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: godfrey he
Assignee: godfrey he


There are two improvements:
1. Remove the {{getVolcanoPlanner}} method from {{FlinkOptimizeContext}}. 
{{VolcanoPlanner}} requires that the planner a {{RelNode}} tree belongs to and 
the {{VolcanoPlanner}} used to optimize the {{RelNode}} tree be the same 
instance (see {{VolcanoPlanner#registerImpl}}).
So we can use the planner instance from the {{RelNode}}'s cluster directly 
instead of calling {{getVolcanoPlanner}} on {{FlinkOptimizeContext}}.

2. Make {{RelNodeBlock}} not depend on {{TableEnvironment}}.
In {{RelNodeBlock}}, only {{TableConfig}} is used.
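
A rough sketch of what the first change amounts to (an assumption based on the description above, not the actual patch):

{code:java}
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.volcano.VolcanoPlanner;
import org.apache.calcite.rel.RelNode;

final class PlannerAccess {

    /**
     * Obtains the planner from the RelNode's cluster instead of going through
     * FlinkOptimizeContext#getVolcanoPlanner. VolcanoPlanner#registerImpl requires
     * the RelNode tree and the optimizing planner to be the same instance, so the
     * cluster's planner is exactly the right one.
     */
    static VolcanoPlanner plannerOf(RelNode root) {
        RelOptPlanner planner = root.getCluster().getPlanner();
        return (VolcanoPlanner) planner;
    }
}
{code}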







Re: [Discuss] FLIP-43: Savepoint Connector

2019-06-04 Thread Tzu-Li (Gordon) Tai
On Wed, Jun 5, 2019 at 6:39 AM Xiaowei Jiang  wrote:

>  Hi Gordon & Seth, this looks like a very useful feature for analyze and
> manage states.
> I agree that using DataSet is probably the most practical choice right
> now. But in the longer adding the TableAPI support for this will be nice.
>

Agreed. Migrating this API in the future to the TableAPI is definitely
something we have considered.


> When analyzing the savepoint, I assume that the state backend restores the
> state first? This approach is generic and works for any state backend.


Yes, that is correct. The process of reading state in snapshots is
currently:
1) The metadata file is read when creating the input splits for the
InputFormat. Each input split is assigned operator state and key-group
state handles.
2) For each input split, a state backend is launched and restored with
all state of the assigned state handles. Only part of that state is then
transformed into DataSets (using the savepoint.read*State(...) methods).


> However, sometimes it may be more efficient to directly analyzing the
> files on DFS without copying. We can probably add interface to allow state
> backend optimize such behavior in the future.


That sounds like an interesting direction, though at the moment it may only
make sense for full savepoints / checkpoints.
One blocker for enabling this is having the type information of state
available in the snapshot metadata file, so that the schema / type of state
is known before actually reading the state.
Making state schema / type information available in the metadata file is
already a recurring topic in this thread; it would be useful not only for
the feature you mentioned, but also for features like SQL integration in
the future.
Therefore, this seems to be a reasonable next step when extending on top of
the initial scope of the API proposed in the FLIP.


> Also a quick question on the example in wiki: DataSet keyedState =
> operator.readKeyedState("uid", new ReaderFunction());Should
> operator.readKeyedState  be replaced with savepoint.readKeyedState here?
>

Correct, this is indeed a typo. I've corrected this in the FLIP.

Cheers,
Gordon


>
> Regards,Xiaowei
>
> On Tuesday, June 4, 2019, 6:56:00 AM PDT, Aljoscha Krettek <
> aljos...@apache.org> wrote:
>
>  +1 I think is is a very valuable new additional and we should try and not
> get stuck on trying to design the perfect solution for everything
>
> > On 4. Jun 2019, at 13:24, Tzu-Li (Gordon) Tai 
> wrote:
> >
> > +1 to renaming it as State Processing API and adding it under the
> > flink-libraries module.
> >
> > I also think we can start with the development of the feature. From the
> > feedback so far, it seems like we're in a good spot to add in at least
> the
> > initial version of this API, hopefully making it ready for 1.9.0.
> >
> > Cheers,
> > Gordon
> >
> > On Tue, Jun 4, 2019 at 7:14 PM Seth Wiesman  wrote:
> >
> >> It seems like a recurring piece of feedback was a different name. I’d
> like
> >> to propose moving the functionality to the libraries module and naming
> this
> >> the State Processing API.
> >>
> >> Seth
> >>
> >>> On May 31, 2019, at 3:47 PM, Seth Wiesman  wrote:
> >>>
> >>> The SavepointOutputFormat only writes out the savepoint metadata file
> >> and should be mostly ignored.
> >>>
> >>> The actual state is written out by stream operators and tied into the
> >> flink runtime[1, 2, 3].
> >>>
> >>> This is the most important part and the piece that I don’t think can be
> >> reasonably extracted.
> >>>
> >>> Seth
> >>>
> >>> [1]
> >>
> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/operators/KeyedStateBootstrapOperator.java#L84
> >>>
> >>> [2]
> >>
> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/output/SnapshotUtils.java
> >>>
> >>> [3]
> >>
> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/output/BoundedOneInputStreamTaskRunner.java
> >>>
>  On May 31, 2019, at 3:08 PM, Jan Lukavský  wrote:
> 
>  Hi Seth,
> 
>  yes, that helped! :-)
> 
>  What I was looking for is essentially
> >> `org.apache.flink.connectors.savepoint.output.SavepointOutputFormat`. It
> >> would be great if this could be written in a way, that would enable
> reusing
> >> it in different engine (as I mentioned - Apache Spark). There seem to be
> >> some issues though. It uses interface Savepoint, which uses several
> other
> >> objects and interfaces from Flink's runtime. Maybe some convenience API
> >> might help - Apache Beam, handles operator naming, so that definitely
> >> should be transitionable between systems, but I'm not sure, how to
> >> construct OperatorID from this name. Would you think, that it is
> possible
> 

Re: [DISCUSS] Support Local Aggregation in Flink

2019-06-04 Thread vino yang
Hi Litree,

From an implementation level, the localKeyBy API returns a general
KeyedStream, so you can call all the APIs which KeyedStream provides. We did
not restrict its usage, although we could do this (for example, by returning
a new stream object named LocalKeyedStream).

However, to achieve the goal of local aggregation, it only makes sense to
call the window API.
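
To make the intended usage concrete, here is a rough sketch under the proposed
API (localKeyBy is the API from the design doc, not an existing DataStream
method; the window sizes and the sum aggregation are arbitrary examples):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

public class LocalAggregationExample {

    public static DataStream<Tuple2<String, Long>> localThenGlobalSum(
            DataStream<Tuple2<String, Long>> input) {
        return input
                .localKeyBy(value -> value.f0)   // proposed API: key locally, no network shuffle
                .timeWindow(Time.seconds(10))    // local pre-aggregation window
                .sum(1)
                .keyBy(value -> value.f0)        // global shuffle by key
                .timeWindow(Time.seconds(60))    // final aggregation window
                .sum(1);
    }
}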

Best,
Vino

litree wrote on Tue, Jun 4, 2019 at 10:41 PM:

> Hi Vino,
>
>
> I have read your design,something I want to know is the usage of these new
> APIs.It looks like when I use localByKey,i must then use a window operator
> to return a datastream,and then use keyby and another window operator to
> get the final result?
>
>
> thanks,
> Litree
>
>
> On 06/04/2019 17:22, vino yang wrote:
> Hi Dian,
>
> Thanks for your reply.
>
> I know what you mean. However, if you think deeply, you will find your
> implementation need to provide an operator which looks like a window
> operator. You need to use state and receive aggregation function and
> specify the trigger time. It looks like a lightweight window operator.
> Right?
>
> We try to reuse Flink provided functions and reduce complexity. IMO, It is
> more user-friendly because users are familiar with the window API.
>
> Best,
> Vino
>
>
> Dian Fu  于2019年6月4日周二 下午4:19写道:
>
> > Hi Vino,
> >
> > Thanks a lot for starting this discussion. +1 to this feature as I think
> > it will be very useful.
> >
> > Regarding to using window to buffer the input elements, personally I
> don't
> > think it's a good solution for the following reasons:
> > 1) As we know that WindowOperator will store the accumulated results in
> > states, this is not necessary for Local Aggregate operator.
> > 2) For WindowOperator, each input element will be accumulated to states.
> > This is also not necessary for Local Aggregate operator and storing the
> > input elements in memory is enough.
> >
> > Thanks,
> > Dian
> >
> > > 在 2019年6月4日,上午10:03,vino yang  写道:
> > >
> > > Hi Ken,
> > >
> > > Thanks for your reply.
> > >
> > > As I said before, we try to reuse Flink's state concept (fault
> tolerance
> > > and guarantee "Exactly-Once" semantics). So we did not consider cache.
> > >
> > > In addition, if we use Flink's state, the OOM related issue is not a
> key
> > > problem we need to consider.
> > >
> > > Best,
> > > Vino
> > >
> > > Ken Krugler  于2019年6月4日周二 上午1:37写道:
> > >
> > >> Hi all,
> > >>
> > >> Cascading implemented this “map-side reduce” functionality with an LLR
> > >> cache.
> > >>
> > >> That worked well, as then the skewed keys would always be in the
> cache.
> > >>
> > >> The API let you decide the size of the cache, in terms of number of
> > >> entries.
> > >>
> > >> Having a memory limit would have been better for many of our use
> cases,
> > >> though FWIR there’s no good way to estimate in-memory size for
> objects.
> > >>
> > >> — Ken
> > >>
> > >>> On Jun 3, 2019, at 2:03 AM, vino yang  wrote:
> > >>>
> > >>> Hi Piotr,
> > >>>
> > >>> The localKeyBy API returns an instance of KeyedStream (we just added
> an
> > >>> inner flag to identify the local mode) which is Flink has provided
> > >> before.
> > >>> Users can call all the APIs(especially *window* APIs) which
> KeyedStream
> > >>> provided.
> > >>>
> > >>> So if users want to use local aggregation, they should call the
> window
> > >> API
> > >>> to build a local window that means users should (or say "can")
> specify
> > >> the
> > >>> window length and other information based on their needs.
> > >>>
> > >>> I think you described another idea different from us. We did not try
> to
> > >>> react after triggering some predefined threshold. We tend to give
> users
> > >> the
> > >>> discretion to make decisions.
> > >>>
> > >>> Our design idea tends to reuse Flink provided concept and functions
> > like
> > >>> state and window (IMO, we do not need to worry about OOM and the
> issues
> > >> you
> > >>> mentioned).
> > >>>
> > >>> Best,
> > >>> Vino
> > >>>
> > >>> Piotr Nowojski  于2019年6月3日周一 下午4:30写道:
> > >>>
> >  Hi,
> > 
> >  +1 for the idea from my side. I’ve even attempted to add similar
> > feature
> >  quite some time ago, but didn’t get enough traction [1].
> > 
> >  I’ve read through your document and I couldn’t find it mentioning
> >  anywhere, when the pre aggregated result should be emitted down the
> > >> stream?
> >  I think that’s one of the most crucial decision, since wrong
> decision
> > >> here
> >  can lead to decrease of performance or to an explosion of
> memory/state
> >  consumption (both with bounded and unbounded data streams). For
> > >> streaming
> >  it can also lead to an increased latency.
> > 
> >  Since this is also a decision that’s impossible to make
> automatically
> >  perfectly reliably, first and foremost I would expect this to be
> >  configurable via the API. With maybe some predefined triggers, like
> on
> >  watermark (for windowed operations), on checkpoint barrier (

Re: [DISCUSS] Support Local Aggregation in Flink

2019-06-04 Thread vino yang
Hi Dian,

Having a different opinion is fine with me. If there is a better solution or
there are obvious deficiencies in our design, we are very happy to accept
it and improve.

I agree with you that a customized local aggregate operator is more flexible
with regard to the trigger mechanism. However, I have two questions about
your reply.

1) When, why, and how do we judge that the memory is exhausted?

IMO, the operator is at a high abstraction level; when implementing it, we
should not have to care about whether the memory is exhausted.

2) If the local aggregate operator rarely needs to operate on the state, what
do you think about fault tolerance?

We reuse Flink's state concept because we get the benefit of its fault
tolerance. We need to guarantee correct semantics.

Best,
Vino


Dian Fu wrote on Tue, Jun 4, 2019 at 10:31 PM:

> Hi Vino,
>
> It may seem similar to window operator but there are also a few key
> differences. For example, the local aggregate operator can send out the
> results at any time and the window operator can only send out the results
> at the end of window (without early fire). This means that the local
> aggregate operator can send out the results not only when the trigger time
> is reached, but also when the memory is exhausted. This difference makes
> optimization available as it means that the local aggregate operator rarely
> need to operate the state.
>
> I admit that window operator can solve part of the problem (the data skew)
> and just wonder if we can do more. Using window operator at present seems
> OK for me as it can indeed solve part of the problems. We just need to
> think a little more in the design and make sure that the current solution
> is consistent with future optimizations.
>
> Thanks,
>
> Dian
>
> 在 2019年6月4日,下午5:22,vino yang  写道:
>
> Hi Dian,
>
> Thanks for your reply.
>
> I know what you mean. However, if you think deeply, you will find your
> implementation need to provide an operator which looks like a window
> operator. You need to use state and receive aggregation function and
> specify the trigger time. It looks like a lightweight window operator.
> Right?
>
> We try to reuse Flink provided functions and reduce complexity. IMO, It is
> more user-friendly because users are familiar with the window API.
>
> Best,
> Vino
>
>
> Dian Fu  于2019年6月4日周二 下午4:19写道:
>
> Hi Vino,
>
> Thanks a lot for starting this discussion. +1 to this feature as I think
> it will be very useful.
>
> Regarding to using window to buffer the input elements, personally I don't
> think it's a good solution for the following reasons:
> 1) As we know that WindowOperator will store the accumulated results in
> states, this is not necessary for Local Aggregate operator.
> 2) For WindowOperator, each input element will be accumulated to states.
> This is also not necessary for Local Aggregate operator and storing the
> input elements in memory is enough.
>
> Thanks,
> Dian
>
> 在 2019年6月4日,上午10:03,vino yang  写道:
>
> Hi Ken,
>
> Thanks for your reply.
>
> As I said before, we try to reuse Flink's state concept (fault tolerance
> and guarantee "Exactly-Once" semantics). So we did not consider cache.
>
> In addition, if we use Flink's state, the OOM related issue is not a key
> problem we need to consider.
>
> Best,
> Vino
>
> Ken Krugler  于2019年6月4日周二 上午1:37写道:
>
> Hi all,
>
> Cascading implemented this “map-side reduce” functionality with an LLR
> cache.
>
> That worked well, as then the skewed keys would always be in the cache.
>
> The API let you decide the size of the cache, in terms of number of
> entries.
>
> Having a memory limit would have been better for many of our use cases,
> though FWIR there’s no good way to estimate in-memory size for objects.
>
> — Ken
>
> On Jun 3, 2019, at 2:03 AM, vino yang  wrote:
>
> Hi Piotr,
>
> The localKeyBy API returns an instance of KeyedStream (we just added an
> inner flag to identify the local mode) which is Flink has provided
>
> before.
>
> Users can call all the APIs(especially *window* APIs) which KeyedStream
> provided.
>
> So if users want to use local aggregation, they should call the window
>
> API
>
> to build a local window that means users should (or say "can") specify
>
> the
>
> window length and other information based on their needs.
>
> I think you described another idea different from us. We did not try to
> react after triggering some predefined threshold. We tend to give users
>
> the
>
> discretion to make decisions.
>
> Our design idea tends to reuse Flink provided concept and functions
>
> like
>
> state and window (IMO, we do not need to worry about OOM and the issues
>
> you
>
> mentioned).
>
> Best,
> Vino
>
> Piotr Nowojski  于2019年6月3日周一 下午4:30写道:
>
> Hi,
>
> +1 for the idea from my side. I’ve even attempted to add similar
>
> feature
>
> quite some time ago, but didn’t get enough traction [1].
>
> I’ve read through your document and I couldn’t find it mentioning
> anywhere, when the pre aggregated result should be emitted down the
>
> stream?
>
> I think that’s

Re: [DISCUSS] FLIP-39: Flink ML pipeline and ML libs

2019-06-04 Thread Shaoxuan Wang
Stavros,
They share a similar logical concept, but the implementation details are
quite different. It is hard to migrate the interface when the implementations
differ so much. The built-in algorithms are a useful legacy that we will
consider migrating to the new API (but still with different implementations).
BTW, the new API has already been merged via FLINK-12473.

Thanks,
Shaoxuan



On Mon, Jun 3, 2019 at 6:08 PM Stavros Kontopoulos 
wrote:

> Hi,
>
> Some portion of the code could be migrated to the new Table API no?
> I am saying that because the new API design is based on scikit-learn and
> the old one was also inspired by it.
>
> Best,
> Stavros
> On Wed, May 22, 2019 at 1:24 PM Shaoxuan Wang  wrote:
>
> > Another consensus (from the offline discussion) is that we will
> > delete/deprecate flink-libraries/flink-ml. I have started a survey and
> > discussion [1] in dev/user-ml to collect the feedback. Depending on the
> > replies, we will decide if we shall delete it in Flink1.9 or
> > deprecate&delete in the next release after 1.9.
> >
> > [1]
> >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/SURVEY-Usage-of-flink-ml-and-DISCUSS-Delete-flink-ml-td29057.html
> >
> > Regards,
> > Shaoxuan
> >
> >
> > On Tue, May 21, 2019 at 9:22 PM Gen Luo  wrote:
> >
> > > Yes, this is our conclusion. I'd like to add only one point that
> > > registering user defined aggregator is also needed which is currently
> > > provided by 'bridge' and finally will be merged into Table API. It's
> same
> > > with collect().
> > >
> > > I will add a TableEnvironment argument in Estimator.fit() and
> > > Transformer.transform() to get rid of the dependency on
> > > flink-table-planner. This will be committed soon.
> > >
> > > Aljoscha Krettek  于2019年5月21日周二 下午7:31写道:
> > >
> > > > We discussed this in private and came to the conclusion that we
> should
> > > > (for now) have the dependency on flink-table-api-xxx-bridge because
> we
> > > need
> > > > access to the collect() method, which is not yet available in the
> Table
> > > > API. Once that is available the code can be refactored but for now we
> > > want
> > > > to unblock work on this new module.
> > > >
> > > > We also agreed that we don’t need a direct dependency on
> > > > flink-table-planner.
> > > >
> > > > I hope I summarised our discussion correctly.
> > > >
> > > > > On 17. May 2019, at 12:20, Gen Luo  wrote:
> > > > >
> > > > > Thanks for your reply.
> > > > >
> > > > > For the first question, it's not strictly necessary. But I perfer
> not
> > > to
> > > > > have a TableEnvironment argument in Estimator.fit() or
> > > > > Transformer.transform(), which is not part of machine learning
> > concept,
> > > > and
> > > > > may make our API not as clean and pretty as other systems do. I
> would
> > > > like
> > > > > another way other than introducing flink-table-planner to do this.
> If
> > > > it's
> > > > > impossible or severely opposed, I may make the concession to add
> the
> > > > > argument.
> > > > >
> > > > > Other than that, "flink-table-api-xxx-bridge"s are still needed. A
> > vary
> > > > > common case is that an algorithm needs to guarantee that it's
> running
> > > > under
> > > > > a BatchTableEnvironment, which makes it possible to collect result
> > each
> > > > > iteration. A typical algorithm like this is ALS. By flink1.8, this
> > can
> > > be
> > > > > only achieved by converting Table to DataSet than call
> > > DataSet.collect(),
> > > > > which is available in flink-table-api-xxx-bridge. Besides,
> > registering
> > > > > UDAGG is also depending on it.
> > > > >
> > > > > In conclusion, '"planner" can be removed from dependencies but
> > > > introducing
> > > > > "bridge"s are inevitable. Whether and how to acquire
> TableEnvironment
> > > > from
> > > > > a Table can be discussed.
> > > >
> > > >
> > >
> >
>


[jira] [Created] (FLINK-12733) Expose Rest API for TERMINATE/SUSPEND Job with Checkpoint

2019-06-04 Thread Congxian Qiu(klion26) (JIRA)
Congxian Qiu(klion26) created FLINK-12733:
-

 Summary: Expose Rest API for TERMINATE/SUSPEND Job with Checkpoint
 Key: FLINK-12733
 URL: https://issues.apache.org/jira/browse/FLINK-12733
 Project: Flink
  Issue Type: Sub-task
Reporter: Congxian Qiu(klion26)
Assignee: Congxian Qiu(klion26)








Re: [Discuss] FLIP-43: Savepoint Connector

2019-06-04 Thread Xiaowei Jiang
 Hi Gordon & Seth, this looks like a very useful feature for analyzing and 
managing state.
I agree that using DataSet is probably the most practical choice right now, but 
in the longer term, adding TableAPI support for this will be nice.
When analyzing the savepoint, I assume that the state backend restores the 
state first? This approach is generic and works for any state backend. However, 
sometimes it may be more efficient to directly analyze the files on DFS without 
copying. We can probably add an interface to allow the state backend to 
optimize such behavior in the future.
Also a quick question on the example in the wiki: DataSet keyedState = 
operator.readKeyedState("uid", new ReaderFunction()); Should 
operator.readKeyedState be replaced with savepoint.readKeyedState here?

Regards,
Xiaowei

On Tuesday, June 4, 2019, 6:56:00 AM PDT, Aljoscha Krettek wrote:
 
 +1 I think is is a very valuable new additional and we should try and not get 
stuck on trying to design the perfect solution for everything

> On 4. Jun 2019, at 13:24, Tzu-Li (Gordon) Tai  wrote:
> 
> +1 to renaming it as State Processing API and adding it under the
> flink-libraries module.
> 
> I also think we can start with the development of the feature. From the
> feedback so far, it seems like we're in a good spot to add in at least the
> initial version of this API, hopefully making it ready for 1.9.0.
> 
> Cheers,
> Gordon
> 
> On Tue, Jun 4, 2019 at 7:14 PM Seth Wiesman  wrote:
> 
>> It seems like a recurring piece of feedback was a different name. I’d like
>> to propose moving the functionality to the libraries module and naming this
>> the State Processing API.
>> 
>> Seth
>> 
>>> On May 31, 2019, at 3:47 PM, Seth Wiesman  wrote:
>>> 
>>> The SavepointOutputFormat only writes out the savepoint metadata file
>> and should be mostly ignored.
>>> 
>>> The actual state is written out by stream operators and tied into the
>> flink runtime[1, 2, 3].
>>> 
>>> This is the most important part and the piece that I don’t think can be
>> reasonably extracted.
>>> 
>>> Seth
>>> 
>>> [1]
>> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/operators/KeyedStateBootstrapOperator.java#L84
>>> 
>>> [2]
>> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/output/SnapshotUtils.java
>>> 
>>> [3]
>> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/output/BoundedOneInputStreamTaskRunner.java
>>> 
 On May 31, 2019, at 3:08 PM, Jan Lukavský  wrote:
 
 Hi Seth,
 
 yes, that helped! :-)
 
 What I was looking for is essentially
>> `org.apache.flink.connectors.savepoint.output.SavepointOutputFormat`. It
>> would be great if this could be written in a way, that would enable reusing
>> it in different engine (as I mentioned - Apache Spark). There seem to be
>> some issues though. It uses interface Savepoint, which uses several other
>> objects and interfaces from Flink's runtime. Maybe some convenience API
>> might help - Apache Beam, handles operator naming, so that definitely
>> should be transitionable between systems, but I'm not sure, how to
>> construct OperatorID from this name. Would you think, that it is possible
>> to come up with something that could be used in this portable way?
 
 I understand, there are some more conditions, that need to be satisfied
>> (grouping, aggregating, ...), which would of course have to be handled by
>> the target system. But Apache Beam can help leverage that. My idea would
>> be, that there can be runner specified PTransform, that takes PCollection
>> of some tuples of `(operator name, key, state name, value1), (operator
>> name, key, state name, value2)`, and Runner's responsibility would be to
>> group/aggregate this so that it can be written by runner's provided writer
>> (output format).
 
 All of this would need a lot more design, these are just ideas of "what
>> could be possible", I was just wondering if this FLIP can make some first
>> steps towards this.
 
 Many thanks for comments,
 
 Jan
 
> On 5/31/19 8:12 PM, Seth Wiesman wrote:
> @Jan Gotcha,
> 
> So in reusing components it explicitly is not a writer. This is not a
>> savepoint output format in the way we have a parquet output format. The
>> reason for the Transform api is to hide the underlying details, it does not
>> simply append a output writer to the end of a dataset. This gets into the
>> implementation details but at a high level, the dataset is:
> 
> 1) partitioned using key groups
> 2) data is run through a standard stream operator that takes a
>> snapshot of its state after processing all records and outputs metadata
>> handles for each subtask
> 

[jira] [Created] (FLINK-12732) Add savepoint reader for consuming partitioned operator state

2019-06-04 Thread Seth Wiesman (JIRA)
Seth Wiesman created FLINK-12732:


 Summary: Add savepoint reader for consuming partitioned operator 
state
 Key: FLINK-12732
 URL: https://issues.apache.org/jira/browse/FLINK-12732
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataStream, Runtime / State Backends
Reporter: Seth Wiesman
Assignee: Seth Wiesman








Re: [DISCUSS] Support Local Aggregation in Flink

2019-06-04 Thread litree
Hi Vino,


I have read your design; something I want to know is the usage of these new 
APIs. It looks like when I use localKeyBy, I must then use a window operator to 
return a DataStream, and then use keyBy and another window operator to get the 
final result?


thanks,
Litree


On 06/04/2019 17:22, vino yang wrote:
Hi Dian,

Thanks for your reply.

I know what you mean. However, if you think deeply, you will find your
implementation need to provide an operator which looks like a window
operator. You need to use state and receive aggregation function and
specify the trigger time. It looks like a lightweight window operator.
Right?

We try to reuse Flink provided functions and reduce complexity. IMO, It is
more user-friendly because users are familiar with the window API.

Best,
Vino


Dian Fu wrote on Tue, Jun 4, 2019 at 4:19 PM:

> Hi Vino,
>
> Thanks a lot for starting this discussion. +1 to this feature as I think
> it will be very useful.
>
> Regarding to using window to buffer the input elements, personally I don't
> think it's a good solution for the following reasons:
> 1) As we know that WindowOperator will store the accumulated results in
> states, this is not necessary for Local Aggregate operator.
> 2) For WindowOperator, each input element will be accumulated to states.
> This is also not necessary for Local Aggregate operator and storing the
> input elements in memory is enough.
>
> Thanks,
> Dian
>
> > 在 2019年6月4日,上午10:03,vino yang  写道:
> >
> > Hi Ken,
> >
> > Thanks for your reply.
> >
> > As I said before, we try to reuse Flink's state concept (fault tolerance
> > and guarantee "Exactly-Once" semantics). So we did not consider cache.
> >
> > In addition, if we use Flink's state, the OOM related issue is not a key
> > problem we need to consider.
> >
> > Best,
> > Vino
> >
> > Ken Krugler  于2019年6月4日周二 上午1:37写道:
> >
> >> Hi all,
> >>
> >> Cascading implemented this “map-side reduce” functionality with an LLR
> >> cache.
> >>
> >> That worked well, as then the skewed keys would always be in the cache.
> >>
> >> The API let you decide the size of the cache, in terms of number of
> >> entries.
> >>
> >> Having a memory limit would have been better for many of our use cases,
> >> though FWIR there’s no good way to estimate in-memory size for objects.
> >>
> >> — Ken
> >>
> >>> On Jun 3, 2019, at 2:03 AM, vino yang  wrote:
> >>>
> >>> Hi Piotr,
> >>>
> >>> The localKeyBy API returns an instance of KeyedStream (we just added an
> >>> inner flag to identify the local mode) which is Flink has provided
> >> before.
> >>> Users can call all the APIs(especially *window* APIs) which KeyedStream
> >>> provided.
> >>>
> >>> So if users want to use local aggregation, they should call the window
> >> API
> >>> to build a local window that means users should (or say "can") specify
> >> the
> >>> window length and other information based on their needs.
> >>>
> >>> I think you described another idea different from us. We did not try to
> >>> react after triggering some predefined threshold. We tend to give users
> >> the
> >>> discretion to make decisions.
> >>>
> >>> Our design idea tends to reuse Flink provided concept and functions
> like
> >>> state and window (IMO, we do not need to worry about OOM and the issues
> >> you
> >>> mentioned).
> >>>
> >>> Best,
> >>> Vino
> >>>
> >>> Piotr Nowojski  于2019年6月3日周一 下午4:30写道:
> >>>
>  Hi,
> 
>  +1 for the idea from my side. I’ve even attempted to add similar
> feature
>  quite some time ago, but didn’t get enough traction [1].
> 
>  I’ve read through your document and I couldn’t find it mentioning
>  anywhere, when the pre aggregated result should be emitted down the
> >> stream?
>  I think that’s one of the most crucial decision, since wrong decision
> >> here
>  can lead to decrease of performance or to an explosion of memory/state
>  consumption (both with bounded and unbounded data streams). For
> >> streaming
>  it can also lead to an increased latency.
> 
>  Since this is also a decision that’s impossible to make automatically
>  perfectly reliably, first and foremost I would expect this to be
>  configurable via the API. With maybe some predefined triggers, like on
>  watermark (for windowed operations), on checkpoint barrier (to
> decrease
>  state size?), on element count, maybe memory usage (much easier to
> >> estimate
>  with a known/predefined types, like in SQL)… and with some option to
>  implement custom trigger.
> 
>  Also what would work the best would be to have a some form of memory
>  consumption priority. For example if we are running out of memory for
>  HashJoin/Final aggregation, instead of spilling to disk or crashing
> the
> >> job
>  with OOM it would be probably better to prune/dump the pre/local
>  aggregation state. But that’s another story.
> 
>  [1] https://github.com/apache/flink/pull/4626 <
>  https://github.com/apache/flink/pull/462

Re: [DISCUSS] Support Local Aggregation in Flink

2019-06-04 Thread Dian Fu
Hi Vino,

It may seem similar to the window operator, but there are also a few key
differences. For example, the local aggregate operator can send out the
results at any time, while the window operator can only send out the results
at the end of the window (without early firing). This means that the local
aggregate operator can send out the results not only when the trigger time
is reached, but also when the memory is exhausted. This difference makes
optimization possible, as it means that the local aggregate operator rarely
needs to operate on state.

I admit that the window operator can solve part of the problem (the data skew)
and just wonder if we can do more. Using the window operator at present seems
OK to me, as it can indeed solve part of the problem. We just need to
think a little more in the design and make sure that the current solution
is consistent with future optimizations.

Thanks,

Dian

On Jun 4, 2019, at 5:22 PM, vino yang wrote:

Hi Dian,

Thanks for your reply.

I know what you mean. However, if you think deeply, you will find your
implementation need to provide an operator which looks like a window
operator. You need to use state and receive aggregation function and
specify the trigger time. It looks like a lightweight window operator.
Right?

We try to reuse Flink provided functions and reduce complexity. IMO, It is
more user-friendly because users are familiar with the window API.

Best,
Vino


Dian Fu wrote on Tue, Jun 4, 2019 at 4:19 PM:

Hi Vino,

Thanks a lot for starting this discussion. +1 to this feature as I think
it will be very useful.

Regarding to using window to buffer the input elements, personally I don't
think it's a good solution for the following reasons:
1) As we know that WindowOperator will store the accumulated results in
states, this is not necessary for Local Aggregate operator.
2) For WindowOperator, each input element will be accumulated to states.
This is also not necessary for Local Aggregate operator and storing the
input elements in memory is enough.

Thanks,
Dian

On Jun 4, 2019, at 10:03 AM, vino yang wrote:

Hi Ken,

Thanks for your reply.

As I said before, we try to reuse Flink's state concept (fault tolerance
and guarantee "Exactly-Once" semantics). So we did not consider cache.

In addition, if we use Flink's state, the OOM related issue is not a key
problem we need to consider.

Best,
Vino

Ken Krugler wrote on Tue, Jun 4, 2019 at 1:37 AM:

Hi all,

Cascading implemented this “map-side reduce” functionality with an LLR
cache.

That worked well, as then the skewed keys would always be in the cache.

The API let you decide the size of the cache, in terms of number of
entries.

Having a memory limit would have been better for many of our use cases,
though FWIR there’s no good way to estimate in-memory size for objects.

— Ken

On Jun 3, 2019, at 2:03 AM, vino yang  wrote:

Hi Piotr,

The localKeyBy API returns an instance of KeyedStream (we just added an
inner flag to identify the local mode) which is Flink has provided

before.

Users can call all the APIs(especially *window* APIs) which KeyedStream
provided.

So if users want to use local aggregation, they should call the window

API

to build a local window that means users should (or say "can") specify

the

window length and other information based on their needs.

I think you described another idea different from us. We did not try to
react after triggering some predefined threshold. We tend to give users

the

discretion to make decisions.

Our design idea tends to reuse Flink provided concept and functions

like

state and window (IMO, we do not need to worry about OOM and the issues

you

mentioned).

Best,
Vino

Piotr Nowojski wrote on Mon, Jun 3, 2019 at 4:30 PM:

Hi,

+1 for the idea from my side. I’ve even attempted to add similar

feature

quite some time ago, but didn’t get enough traction [1].

I’ve read through your document and I couldn’t find it mentioning
anywhere, when the pre aggregated result should be emitted down the

stream?

I think that’s one of the most crucial decision, since wrong decision

here

can lead to decrease of performance or to an explosion of memory/state
consumption (both with bounded and unbounded data streams). For

streaming

it can also lead to an increased latency.

Since this is also a decision that’s impossible to make automatically
perfectly reliably, first and foremost I would expect this to be
configurable via the API. With maybe some predefined triggers, like on
watermark (for windowed operations), on checkpoint barrier (to

decrease

state size?), on element count, maybe memory usage (much easier to

estimate

with a known/predefined types, like in SQL)… and with some option to
implement custom trigger.

Also what would work the best would be to have a some form of memory
consumption priority. For example if we are running out of memory for
HashJoin/Final aggregation, instead of spilling to disk or crashing

the

job

with OOM it would be probably better to prune/dump the pre/local
aggregation state. But that’s another story.

[

Re: [Discuss] FLIP-43: Savepoint Connector

2019-06-04 Thread Aljoscha Krettek
+1 I think this is a very valuable new addition, and we should try not to get 
stuck on trying to design the perfect solution for everything.

> On 4. Jun 2019, at 13:24, Tzu-Li (Gordon) Tai  wrote:
> 
> +1 to renaming it as State Processing API and adding it under the
> flink-libraries module.
> 
> I also think we can start with the development of the feature. From the
> feedback so far, it seems like we're in a good spot to add in at least the
> initial version of this API, hopefully making it ready for 1.9.0.
> 
> Cheers,
> Gordon
> 
> On Tue, Jun 4, 2019 at 7:14 PM Seth Wiesman  wrote:
> 
>> It seems like a recurring piece of feedback was a different name. I’d like
>> to propose moving the functionality to the libraries module and naming this
>> the State Processing API.
>> 
>> Seth
>> 
>>> On May 31, 2019, at 3:47 PM, Seth Wiesman  wrote:
>>> 
>>> The SavepointOutputFormat only writes out the savepoint metadata file
>> and should be mostly ignored.
>>> 
>>> The actual state is written out by stream operators and tied into the
>> flink runtime[1, 2, 3].
>>> 
>>> This is the most important part and the piece that I don’t think can be
>> reasonably extracted.
>>> 
>>> Seth
>>> 
>>> [1]
>> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/operators/KeyedStateBootstrapOperator.java#L84
>>> 
>>> [2]
>> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/output/SnapshotUtils.java
>>> 
>>> [3]
>> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/output/BoundedOneInputStreamTaskRunner.java
>>> 
 On May 31, 2019, at 3:08 PM, Jan Lukavský  wrote:
 
 Hi Seth,
 
 yes, that helped! :-)
 
 What I was looking for is essentially
>> `org.apache.flink.connectors.savepoint.output.SavepointOutputFormat`. It
>> would be great if this could be written in a way, that would enable reusing
>> it in different engine (as I mentioned - Apache Spark). There seem to be
>> some issues though. It uses interface Savepoint, which uses several other
>> objects and interfaces from Flink's runtime. Maybe some convenience API
>> might help - Apache Beam, handles operator naming, so that definitely
>> should be transitionable between systems, but I'm not sure, how to
>> construct OperatorID from this name. Would you think, that it is possible
>> to come up with something that could be used in this portable way?
 
 I understand, there are some more conditions, that need to be satisfied
>> (grouping, aggregating, ...), which would of course have to be handled by
>> the target system. But Apache Beam can help leverage that. My idea would
>> be, that there can be runner specified PTransform, that takes PCollection
>> of some tuples of `(operator name, key, state name, value1), (operator
>> name, key, state name, value2)`, and Runner's responsibility would be to
>> group/aggregate this so that it can be written by runner's provided writer
>> (output format).
 
 All of this would need a lot more design, these are just ideas of "what
>> could be possible", I was just wondering if this FLIP can make some first
>> steps towards this.
 
 Many thanks for comments,
 
 Jan
 
> On 5/31/19 8:12 PM, Seth Wiesman wrote:
> @Jan Gotcha,
> 
> So in reusing components it explicitly is not a writer. This is not a
>> savepoint output format in the way we have a parquet output format. The
>> reason for the Transform api is to hide the underlying details, it does not
>> simply append a output writer to the end of a dataset. This gets into the
>> implementation details but at a high level, the dataset is:
> 
> 1) partitioned using key groups
> 2) data is run through a standard stream operator that takes a
>> snapshot of its state after processing all records and outputs metadata
>> handles for each subtask
> 3) those metadata handles are aggregated down to a single savepoint
>> handle
> 4) that handle is written out as a final metadata file
> 
> What’s important here is that the api actually depends on the data
>> flow collection and state is written out as a side effect of taking a
>> savepoint. The FLIP describes a lose coupling to the dataset api for
>> eventual migration to BoundedStream, that is true. However, the api does
>> require knowing what concrete data flow is being used to perform these
>> re-partitionings  and post aggregations.
> 
> I’m linking to my prototype implementation, particularly what actually
>> happens when you call write and run the transformations. Let me know if
>> that helps clarify.
> 
> Seth
> 
> 
>> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepo

[jira] [Created] (FLINK-12731) Load shuffle service implementations from plugin manager

2019-06-04 Thread Andrey Zagrebin (JIRA)
Andrey Zagrebin created FLINK-12731:
---

 Summary: Load shuffle service implementations from plugin manager
 Key: FLINK-12731
 URL: https://issues.apache.org/jira/browse/FLINK-12731
 Project: Flink
  Issue Type: Sub-task
Reporter: Andrey Zagrebin


The simple way to load the shuffle service is to load it from the class path with the 
default class loader. Additionally, shuffle service implementations can be 
loaded as plugins with their own class loaders using the PluginManager.
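
For readers unfamiliar with plugin-style loading, here is a minimal sketch of the general 
mechanism: a dedicated class loader per plugin directory plus the standard Java ServiceLoader. 
The ShuffleServiceFactory interface and the jar path below are made up for illustration; 
Flink's actual PluginManager is more elaborate.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ServiceLoader;

// Hypothetical SPI used only for this sketch; not the actual Flink interface.
interface ShuffleServiceFactory {
    String name();
}

public class PluginLoadingSketch {
    public static void main(String[] args) throws Exception {
        // Each plugin directory gets its own class loader, with the application
        // class loader as parent (simplified compared to Flink's PluginManager).
        URL[] pluginJars = { new URL("file:///opt/flink/plugins/my-shuffle/my-shuffle.jar") };
        ClassLoader pluginClassLoader =
                new URLClassLoader(pluginJars, PluginLoadingSketch.class.getClassLoader());

        // Discover implementations through the standard Java SPI mechanism.
        ServiceLoader<ShuffleServiceFactory> loader =
                ServiceLoader.load(ShuffleServiceFactory.class, pluginClassLoader);
        for (ShuffleServiceFactory factory : loader) {
            System.out.println("Found shuffle service factory: " + factory.name());
        }
    }
}
```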



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12730) Combine BitSet implementations in flink-runtime

2019-06-04 Thread Liya Fan (JIRA)
Liya Fan created FLINK-12730:


 Summary: Combine BitSet implementations in flink-runtime
 Key: FLINK-12730
 URL: https://issues.apache.org/jira/browse/FLINK-12730
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Reporter: Liya Fan
Assignee: Liya Fan


There are two implementations of BitSet in the flink-runtime component: one is 
org.apache.flink.runtime.operators.util.BloomFilter#BitSet, while the other is 
org.apache.flink.runtime.operators.util.BitSet.

The two classes are quite similar in their APIs and implementations. The only 
difference is that the former is based on long operations while the latter is 
based on byte operations. This has the following consequences:
 # The byte-based BitSet has better performance for get/set operations.
 # The long-based BitSet has better performance for the clear operation.

We should combine the two implementations and make the best of both worlds.
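
To make the trade-off concrete, here is a small illustrative sketch (not the actual Flink 
classes, which operate on MemorySegments): byte-granular addressing touches a single byte per 
get/set, while long-granular addressing lets clear() sweep the bit space in 8-byte words.

```java
// Illustration only: two ways of addressing the same bit space, roughly
// mirroring the trade-off described above.
public class BitSetSketch {

    // Byte-granular addressing: one byte load/store per get/set.
    static void setByteBased(byte[] bits, int index) {
        bits[index >>> 3] |= (byte) (1 << (index & 7));
    }

    static boolean getByteBased(byte[] bits, int index) {
        return (bits[index >>> 3] & (1 << (index & 7))) != 0;
    }

    // Long-granular addressing: clear() touches 8x fewer words.
    static void clearLongBased(long[] words) {
        java.util.Arrays.fill(words, 0L);
    }

    public static void main(String[] args) {
        byte[] bits = new byte[128];   // 1024 bits, byte-addressed
        long[] words = new long[16];   // 1024 bits, long-addressed

        setByteBased(bits, 42);
        System.out.println(getByteBased(bits, 42)); // true
        clearLongBased(words);                      // one pass over 16 words
    }
}
```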

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12729) Add savepoint reader for consuming non-partitioned operator state

2019-06-04 Thread Seth Wiesman (JIRA)
Seth Wiesman created FLINK-12729:


 Summary: Add savepoint reader for consuming non-partitioned 
operator state
 Key: FLINK-12729
 URL: https://issues.apache.org/jira/browse/FLINK-12729
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataStream, Runtime / State Backends
Reporter: Seth Wiesman
Assignee: Seth Wiesman






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: What does flink session mean ?

2019-06-04 Thread Till Rohrmann
Yes, interactive programming solves the problem by storing the meta
information on the client, whereas in the past we considered keeping the
information on the JM. But that would not allow sharing results between
different clusters. Thus, the interactive programming approach is a bit more
general, I think.

Cheers,
Till

On Tue, Jun 4, 2019 at 11:13 AM Jeff Zhang  wrote:

> Thanks for the reply, @Till Rohrmann .  Regarding
> reuse computed results. I think JM keep all the metadata of intermediate
> data, and interactive programming is also trying to reuse computed results.
> It looks like it may not be necessary to introduce the session concept as
> long as we can achieve reusing computed results. Let me if I understand it
> correctly.
>
>
>
> Till Rohrmann  于2019年6月4日周二 下午4:03写道:
>
>> Hi Jeff,
>>
>> the session functionality which you find in Flink's client are the
>> remnants of an uncompleted feature which was abandoned. The idea was that
>> one could submit multiple parts of a job to the same cluster where these
>> parts are added to the same ExecutionGraph. That way we wanted to allow to
>> reuse computed results when using a notebook for ad-hoc queries, for
>> example. But as I said, this feature has never been completed.
>>
>> Cheers,
>> Till
>>
>> On Sun, Jun 2, 2019 at 3:20 PM Jeff Zhang  wrote:
>>
>>>
>>> Hi Folks,
>>>
>>>
>>> When I read the flink client api code, the concept of session is a
>>> little vague and unclear to me. It looks like the session concept is only
>>> applied in batch mode (I only see it in ExecutionEnvironment but not in
>>> StreamExecutionEnvironment). But for local mode
>>> (LocalExecutionEnvironment), starting one new session is starting one new
>>> MiniCluster, but in remote mode (RemoteExecutionEnvironment), starting one
>>> new session is just starting one new ClusterClient instead of one new
>>> cluster. So I am confused what does flink session really mean. Could anyone
>>> help me understand this ? Thanks.
>>>
>>>
>>>
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>>>
>>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: [Discuss] FLIP-43: Savepoint Connector

2019-06-04 Thread Tzu-Li (Gordon) Tai
+1 to renaming it as State Processing API and adding it under the
flink-libraries module.

I also think we can start with the development of the feature. From the
feedback so far, it seems like we're in a good spot to add in at least the
initial version of this API, hopefully making it ready for 1.9.0.

Cheers,
Gordon

On Tue, Jun 4, 2019 at 7:14 PM Seth Wiesman  wrote:

> It seems like a recurring piece of feedback was a different name. I’d like
> to propose moving the functionality to the libraries module and naming this
> the State Processing API.
>
> Seth
>
> > On May 31, 2019, at 3:47 PM, Seth Wiesman  wrote:
> >
> > The SavepointOutputFormat only writes out the savepoint metadata file
> and should be mostly ignored.
> >
> > The actual state is written out by stream operators and tied into the
> flink runtime[1, 2, 3].
> >
> > This is the most important part and the piece that I don’t think can be
> reasonably extracted.
> >
> > Seth
> >
> > [1]
> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/operators/KeyedStateBootstrapOperator.java#L84
> >
> > [2]
> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/output/SnapshotUtils.java
> >
> > [3]
> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/output/BoundedOneInputStreamTaskRunner.java
> >
> >> On May 31, 2019, at 3:08 PM, Jan Lukavský  wrote:
> >>
> >> Hi Seth,
> >>
> >> yes, that helped! :-)
> >>
> >> What I was looking for is essentially
> `org.apache.flink.connectors.savepoint.output.SavepointOutputFormat`. It
> would be great if this could be written in a way, that would enable reusing
> it in different engine (as I mentioned - Apache Spark). There seem to be
> some issues though. It uses interface Savepoint, which uses several other
> objects and interfaces from Flink's runtime. Maybe some convenience API
> might help - Apache Beam, handles operator naming, so that definitely
> should be transitionable between systems, but I'm not sure, how to
> construct OperatorID from this name. Would you think, that it is possible
> to come up with something that could be used in this portable way?
> >>
> >> I understand, there are some more conditions, that need to be satisfied
> (grouping, aggregating, ...), which would of course have to be handled by
> the target system. But Apache Beam can help leverage that. My idea would
> be, that there can be runner specified PTransform, that takes PCollection
> of some tuples of `(operator name, key, state name, value1), (operator
> name, key, state name, value2)`, and Runner's responsibility would be to
> group/aggregate this so that it can be written by runner's provided writer
> (output format).
> >>
> >> All of this would need a lot more design, these are just ideas of "what
> could be possible", I was just wondering if this FLIP can make some first
> steps towards this.
> >>
> >> Many thanks for comments,
> >>
> >> Jan
> >>
> >>> On 5/31/19 8:12 PM, Seth Wiesman wrote:
> >>> @Jan Gotcha,
> >>>
> >>> So in reusing components it explicitly is not a writer. This is not a
> savepoint output format in the way we have a parquet output format. The
> reason for the Transform api is to hide the underlying details, it does not
> simply append a output writer to the end of a dataset. This gets into the
> implementation details but at a high level, the dataset is:
> >>>
> >>> 1) partitioned using key groups
> >>> 2) data is run through a standard stream operator that takes a
> snapshot of its state after processing all records and outputs metadata
> handles for each subtask
> >>> 3) those metadata handles are aggregated down to a single savepoint
> handle
> >>> 4) that handle is written out as a final metadata file
> >>>
> >>> What’s important here is that the api actually depends on the data
> flow collection and state is written out as a side effect of taking a
> savepoint. The FLIP describes a lose coupling to the dataset api for
> eventual migration to BoundedStream, that is true. However, the api does
> require knowing what concrete data flow is being used to perform these
> re-partitionings  and post aggregations.
> >>>
> >>> I’m linking to my prototype implementation, particularly what actually
> happens when you call write and run the transformations. Let me know if
> that helps clarify.
> >>>
> >>> Seth
> >>>
> >>>
> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/api/WritableSavepoint.java#L63
> >>>
> >>>
> >>>
>  On May 31, 2019, at 7:46 AM, Jan Lukavský  wrote:
> 
>  Hi Seth,
> 
>  that sounds reasonable. What I was asking for was not to reverse
> engineer binary format, but t

Re: [Discuss] FLIP-43: Savepoint Connector

2019-06-04 Thread Seth Wiesman
It seems like a recurring piece of feedback was a different name. I’d like to 
propose moving the functionality to the libraries module and naming this the 
State Processing API. 

Seth

> On May 31, 2019, at 3:47 PM, Seth Wiesman  wrote:
> 
> The SavepointOutputFormat only writes out the savepoint metadata file and 
> should be mostly ignored.
> 
> The actual state is written out by stream operators and tied into the flink 
> runtime[1, 2, 3].
> 
> This is the most important part and the piece that I don’t think can be 
> reasonably extracted.
> 
> Seth
> 
> [1] 
> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/operators/KeyedStateBootstrapOperator.java#L84
> 
> [2] 
> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/output/SnapshotUtils.java
> 
> [3] 
> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/output/BoundedOneInputStreamTaskRunner.java
> 
>> On May 31, 2019, at 3:08 PM, Jan Lukavský  wrote:
>> 
>> Hi Seth,
>> 
>> yes, that helped! :-)
>> 
>> What I was looking for is essentially 
>> `org.apache.flink.connectors.savepoint.output.SavepointOutputFormat`. It 
>> would be great if this could be written in a way, that would enable reusing 
>> it in different engine (as I mentioned - Apache Spark). There seem to be 
>> some issues though. It uses interface Savepoint, which uses several other 
>> objects and interfaces from Flink's runtime. Maybe some convenience API 
>> might help - Apache Beam, handles operator naming, so that definitely should 
>> be transitionable between systems, but I'm not sure, how to construct 
>> OperatorID from this name. Would you think, that it is possible to come up 
>> with something that could be used in this portable way?
>> 
>> I understand, there are some more conditions, that need to be satisfied 
>> (grouping, aggregating, ...), which would of course have to be handled by 
>> the target system. But Apache Beam can help leverage that. My idea would be, 
>> that there can be runner specified PTransform, that takes PCollection of 
>> some tuples of `(operator name, key, state name, value1), (operator name, 
>> key, state name, value2)`, and Runner's responsibility would be to 
>> group/aggregate this so that it can be written by runner's provided writer 
>> (output format).
>> 
>> All of this would need a lot more design, these are just ideas of "what 
>> could be possible", I was just wondering if this FLIP can make some first 
>> steps towards this.
>> 
>> Many thanks for comments,
>> 
>> Jan
>> 
>>> On 5/31/19 8:12 PM, Seth Wiesman wrote:
>>> @Jan Gotcha,
>>> 
>>> So in reusing components it explicitly is not a writer. This is not a 
>>> savepoint output format in the way we have a parquet output format. The 
>>> reason for the Transform api is to hide the underlying details, it does not 
>>> simply append a output writer to the end of a dataset. This gets into the 
>>> implementation details but at a high level, the dataset is:
>>> 
>>> 1) partitioned using key groups
>>> 2) data is run through a standard stream operator that takes a snapshot of 
>>> its state after processing all records and outputs metadata handles for 
>>> each subtask
>>> 3) those metadata handles are aggregated down to a single savepoint handle
>>> 4) that handle is written out as a final metadata file
>>> 
>>> What’s important here is that the api actually depends on the data flow 
>>> collection and state is written out as a side effect of taking a savepoint. 
>>> The FLIP describes a lose coupling to the dataset api for eventual 
>>> migration to BoundedStream, that is true. However, the api does require 
>>> knowing what concrete data flow is being used to perform these 
>>> re-partitionings  and post aggregations.
>>> 
>>> I’m linking to my prototype implementation, particularly what actually 
>>> happens when you call write and run the transformations. Let me know if 
>>> that helps clarify.
>>> 
>>> Seth
>>> 
>>> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/api/WritableSavepoint.java#L63
>>> 
>>> 
>>> 
 On May 31, 2019, at 7:46 AM, Jan Lukavský  wrote:
 
 Hi Seth,
 
 that sounds reasonable. What I was asking for was not to reverse engineer 
 binary format, but to make the savepoint write API a little more reusable, 
 so that it could be wrapped into some other technology. I don't know the 
 details enough to propose a solution, but it seems to me, that it could be 
 possible to use something like Writer instead of Transform. Or maybe the 
 Transform can use the Writer internally, the goal is just to enable to 
 create the

Re: Flink internals

2019-06-04 Thread Piotr Nowojski
Hi,

You can also read the FLIP proposals. Unfortunately, one that is very internal [1], 
about credit-based flow control [2], was not published as an official FLIP :( 
Regarding the network stack and some of the other topics, there are some 
blog posts and Flink Forward talks as well. 

Piotrek
[1] 
https://docs.google.com/document/d/1chTOuOqe0sBsjldA_r-wXYeSIhU2zRGpUaTaik7QZ84/edit
 

[2] https://issues.apache.org/jira/browse/FLINK-7282 


> On 4 Jun 2019, at 11:55, John Tipper  wrote:
> 
> Hi Till, Fan & Rong,
> 
> Thanks for your feedback - I'd seen the Flink internal page but sadly, as 
> Rong pointed out, it's pretty limited and not maintained.  I'll ask on the 
> mailing lists, but I think it would be really helpful if there were a guide 
> for Flink developers who want to contribute to Flink, as opposed to the 
> current documentation which is really aimed at Flink application developers 
> (and is really pretty good).  I'd happily contribute to this, but I can't 
> find my way round the internals...
> 
> Many thanks,
> 
> John
> 
> From: Till Rohrmann 
> Sent: 04 June 2019 09:12
> To: dev
> Subject: Re: Flink internals
> 
> Hi John,
> 
> unfortunately, there are no really good and up to date documents about
> Flink's internals. There was some discussion about updating the internals
> [1] but the community did decide against submitting it as a season of docs
> project. I agree that we should update our documentation about Flink's
> internals, though.
> 
> At the moment, the best way about learning about Flink is the code and the
> user/dev ML where you can ask about concepts and how things work.
> 
> [1]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Apache-Flink-at-Season-of-Docs-td28133.html
> 
> Cheers,
> Till
> 
> On Fri, May 31, 2019 at 2:50 AM Rong Rong  wrote:
> 
>> Hi Fan, John,
>> 
>> The flink internal link[1] seems to be not updated in the last years.
>> I found out some higher level pages in the official documentation here[2]
>> however they are still very limited.
>> 
>> Is there any other well maintained, internal documentations for
>> contributors?
>> I think it might be a good idea to maintain a higher-level guidance doc for
>> new contributors about Flink internals?
>> 
>> Best,
>> Rong
>> 
>> 
>> [1] https://cwiki.apache.org/confluence/display/FLINK/Flink+Internals
>> [2]
>> 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/internals/components.html
>> 
>> On Thu, May 30, 2019 at 4:48 AM Fan Liya  wrote:
>> 
>>> You can find some articles here:
>>> https://cwiki.apache.org/confluence/display/FLINK/Flink+Internals
>>> 
>>> Best,
>>> Liya Fan
>>> 
>>> On Thu, May 30, 2019 at 6:29 PM John Tipper 
>>> wrote:
>>> 
 Hi all,
 
 Is there a guide somewhere for the internals of Flink for developers
 wanting to get involved in core development?  I'm particularly
>> interested
 in any notes on how the codebase is put together so that it's possible
>> to
 learn how it works internally.
 
 I'm particularly interested in what classes are involved in the barrier
 mechanism and state and where these are found, but it's a large
>> codebase
>>> so
 I'm sure it would be helpful for new developers to have pointers to
>> what
 does what inside the codebase or where to start looking for details of
 certain things.
 
 Many thanks,
 
 John
 
>>> 
>> 



[jira] [Created] (FLINK-12728) taskmanager container can't launch on nodemanager machine because of kerberos

2019-06-04 Thread wgcn (JIRA)
wgcn created FLINK-12728:


 Summary: TaskManager container can't launch on NodeManager 
machine because of Kerberos
 Key: FLINK-12728
 URL: https://issues.apache.org/jira/browse/FLINK-12728
 Project: Flink
  Issue Type: Bug
  Components: Deployment / YARN
Affects Versions: 1.7.2
 Environment: linux 

jdk8

hadoop 2.7.2

flink 1.7.2
Reporter: wgcn
 Attachments: AM.log, NM.log

The job can't restart: after the Flink job has been running for a long time and a 
TaskManager restarts, I find in the AM log that the AM keeps requesting TaskManager 
containers all the time. The NodeManager log shows that the newly requested 
containers can't download files from HDFS because of Kerberos. I configured the 
keytab settings on the flink-client machine, and the keytab file exists:

security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /data/sysdir/knit/user/.flink.keytab
security.kerberos.login.principal: flink/client-docker-201-53.hadoop.lq@HADOOP.LQ2.

The AM and NodeManager logs are attached below.

 

 

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: Flink internals

2019-06-04 Thread John Tipper
Hi Till, Fan & Rong,

Thanks for your feedback - I'd seen the Flink internal page but sadly, as Rong 
pointed out, it's pretty limited and not maintained.  I'll ask on the mailing 
lists, but I think it would be really helpful if there were a guide for Flink 
developers who want to contribute to Flink, as opposed to the current 
documentation which is really aimed at Flink application developers (and is 
really pretty good).  I'd happily contribute to this, but I can't find my way 
round the internals...

Many thanks,

John

From: Till Rohrmann 
Sent: 04 June 2019 09:12
To: dev
Subject: Re: Flink internals

Hi John,

unfortunately, there are no really good and up to date documents about
Flink's internals. There was some discussion about updating the internals
[1] but the community did decide against submitting it as a season of docs
project. I agree that we should update our documentation about Flink's
internals, though.

At the moment, the best way about learning about Flink is the code and the
user/dev ML where you can ask about concepts and how things work.

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Apache-Flink-at-Season-of-Docs-td28133.html

Cheers,
Till

On Fri, May 31, 2019 at 2:50 AM Rong Rong  wrote:

> Hi Fan, John,
>
> The flink internal link[1] seems to be not updated in the last years.
> I found out some higher level pages in the official documentation here[2]
> however they are still very limited.
>
> Is there any other well maintained, internal documentations for
> contributors?
> I think it might be a good idea to maintain a higher-level guidance doc for
> new contributors about Flink internals?
>
> Best,
> Rong
>
> 
> [1] https://cwiki.apache.org/confluence/display/FLINK/Flink+Internals
> [2]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/internals/components.html
>
> On Thu, May 30, 2019 at 4:48 AM Fan Liya  wrote:
>
> > You can find some articles here:
> > https://cwiki.apache.org/confluence/display/FLINK/Flink+Internals
> >
> > Best,
> > Liya Fan
> >
> > On Thu, May 30, 2019 at 6:29 PM John Tipper 
> > wrote:
> >
> > > Hi all,
> > >
> > > Is there a guide somewhere for the internals of Flink for developers
> > > wanting to get involved in core development?  I'm particularly
> interested
> > > in any notes on how the codebase is put together so that it's possible
> to
> > > learn how it works internally.
> > >
> > > I'm particularly interested in what classes are involved in the barrier
> > > mechanism and state and where these are found, but it's a large
> codebase
> > so
> > > I'm sure it would be helpful for new developers to have pointers to
> what
> > > does what inside the codebase or where to start looking for details of
> > > certain things.
> > >
> > > Many thanks,
> > >
> > > John
> > >
> >
>


[jira] [Created] (FLINK-12727) Make HiveTableOutputFormat support writing partitioned tables

2019-06-04 Thread Rui Li (JIRA)
Rui Li created FLINK-12727:
--

 Summary: Make HiveTableOutputFormat support writing partitioned 
tables
 Key: FLINK-12727
 URL: https://issues.apache.org/jira/browse/FLINK-12727
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Hive
Reporter: Rui Li
Assignee: Rui Li






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] Support Local Aggregation in Flink

2019-06-04 Thread vino yang
Hi Dian,

Thanks for your reply.

I know what you mean. However, if you think about it more deeply, you will find
that your implementation needs to provide an operator which looks like a window
operator: it needs to use state, accept an aggregation function and specify the
trigger time. It looks like a lightweight window operator, right?

We try to reuse the functions Flink already provides and to reduce complexity.
IMO, it is also more user-friendly because users are familiar with the window API.

Best,
Vino


Dian Fu  于2019年6月4日周二 下午4:19写道:

> Hi Vino,
>
> Thanks a lot for starting this discussion. +1 to this feature as I think
> it will be very useful.
>
> Regarding to using window to buffer the input elements, personally I don't
> think it's a good solution for the following reasons:
> 1) As we know that WindowOperator will store the accumulated results in
> states, this is not necessary for Local Aggregate operator.
> 2) For WindowOperator, each input element will be accumulated to states.
> This is also not necessary for Local Aggregate operator and storing the
> input elements in memory is enough.
>
> Thanks,
> Dian
>
> > 在 2019年6月4日,上午10:03,vino yang  写道:
> >
> > Hi Ken,
> >
> > Thanks for your reply.
> >
> > As I said before, we try to reuse Flink's state concept (fault tolerance
> > and guarantee "Exactly-Once" semantics). So we did not consider cache.
> >
> > In addition, if we use Flink's state, the OOM related issue is not a key
> > problem we need to consider.
> >
> > Best,
> > Vino
> >
> > Ken Krugler  于2019年6月4日周二 上午1:37写道:
> >
> >> Hi all,
> >>
> >> Cascading implemented this “map-side reduce” functionality with an LLR
> >> cache.
> >>
> >> That worked well, as then the skewed keys would always be in the cache.
> >>
> >> The API let you decide the size of the cache, in terms of number of
> >> entries.
> >>
> >> Having a memory limit would have been better for many of our use cases,
> >> though FWIR there’s no good way to estimate in-memory size for objects.
> >>
> >> — Ken
> >>
> >>> On Jun 3, 2019, at 2:03 AM, vino yang  wrote:
> >>>
> >>> Hi Piotr,
> >>>
> >>> The localKeyBy API returns an instance of KeyedStream (we just added an
> >>> inner flag to identify the local mode) which is Flink has provided
> >> before.
> >>> Users can call all the APIs(especially *window* APIs) which KeyedStream
> >>> provided.
> >>>
> >>> So if users want to use local aggregation, they should call the window
> >> API
> >>> to build a local window that means users should (or say "can") specify
> >> the
> >>> window length and other information based on their needs.
> >>>
> >>> I think you described another idea different from us. We did not try to
> >>> react after triggering some predefined threshold. We tend to give users
> >> the
> >>> discretion to make decisions.
> >>>
> >>> Our design idea tends to reuse Flink provided concept and functions
> like
> >>> state and window (IMO, we do not need to worry about OOM and the issues
> >> you
> >>> mentioned).
> >>>
> >>> Best,
> >>> Vino
> >>>
> >>> Piotr Nowojski  于2019年6月3日周一 下午4:30写道:
> >>>
>  Hi,
> 
>  +1 for the idea from my side. I’ve even attempted to add similar
> feature
>  quite some time ago, but didn’t get enough traction [1].
> 
>  I’ve read through your document and I couldn’t find it mentioning
>  anywhere, when the pre aggregated result should be emitted down the
> >> stream?
>  I think that’s one of the most crucial decision, since wrong decision
> >> here
>  can lead to decrease of performance or to an explosion of memory/state
>  consumption (both with bounded and unbounded data streams). For
> >> streaming
>  it can also lead to an increased latency.
> 
>  Since this is also a decision that’s impossible to make automatically
>  perfectly reliably, first and foremost I would expect this to be
>  configurable via the API. With maybe some predefined triggers, like on
>  watermark (for windowed operations), on checkpoint barrier (to
> decrease
>  state size?), on element count, maybe memory usage (much easier to
> >> estimate
>  with a known/predefined types, like in SQL)… and with some option to
>  implement custom trigger.
> 
>  Also what would work the best would be to have a some form of memory
>  consumption priority. For example if we are running out of memory for
>  HashJoin/Final aggregation, instead of spilling to disk or crashing
> the
> >> job
>  with OOM it would be probably better to prune/dump the pre/local
>  aggregation state. But that’s another story.
> 
>  [1] https://github.com/apache/flink/pull/4626 <
>  https://github.com/apache/flink/pull/4626>
> 
>  Piotrek
> 
> > On 3 Jun 2019, at 10:16, sf lee  wrote:
> >
> > Excited and  Big +1 for this feature.
> >
> > SHI Xiaogang  于2019年6月3日周一 下午3:37写道:
> >
> >> Nice feature.
> >> Looking forward to having it in Flink.
> >>
> >> Regards,
> >> Xiaogang
> >>>

Re: What does flink session mean ?

2019-06-04 Thread Jeff Zhang
Thanks for the reply, @Till Rohrmann. Regarding
reusing computed results: I think the JM keeps all the metadata of the intermediate
data, and interactive programming is also trying to reuse computed results.
It looks like it may not be necessary to introduce the session concept as
long as we can achieve reuse of computed results. Let me know if I understand it
correctly.



Till Rohrmann  于2019年6月4日周二 下午4:03写道:

> Hi Jeff,
>
> the session functionality which you find in Flink's client are the
> remnants of an uncompleted feature which was abandoned. The idea was that
> one could submit multiple parts of a job to the same cluster where these
> parts are added to the same ExecutionGraph. That way we wanted to allow to
> reuse computed results when using a notebook for ad-hoc queries, for
> example. But as I said, this feature has never been completed.
>
> Cheers,
> Till
>
> On Sun, Jun 2, 2019 at 3:20 PM Jeff Zhang  wrote:
>
>>
>> Hi Folks,
>>
>>
>> When I read the flink client api code, the concept of session is a little
>> vague and unclear to me. It looks like the session concept is only applied
>> in batch mode (I only see it in ExecutionEnvironment but not in
>> StreamExecutionEnvironment). But for local mode
>> (LocalExecutionEnvironment), starting one new session is starting one new
>> MiniCluster, but in remote mode (RemoteExecutionEnvironment), starting one
>> new session is just starting one new ClusterClient instead of one new
>> cluster. So I am confused what does flink session really mean. Could anyone
>> help me understand this ? Thanks.
>>
>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>

-- 
Best Regards

Jeff Zhang


[jira] [Created] (FLINK-12726) Fix ANY type serialization

2019-06-04 Thread Timo Walther (JIRA)
Timo Walther created FLINK-12726:


 Summary: Fix ANY type serialization
 Key: FLINK-12726
 URL: https://issues.apache.org/jira/browse/FLINK-12726
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Timo Walther
Assignee: Timo Walther


Every logical type needs to be string-serializable. In old versions we used 
Java serialization logic for this. Since an ANY type no longer carries type information 
but only a type serializer, we can use the serializer snapshot to write an ANY 
type into properties in a backward-compatible way.

However, the current serialization logic is wrong.
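
As a rough illustration of the intended pattern (round-tripping an opaque serializer snapshot 
through string properties in a backward-compatible way), a snapshot byte blob can simply be 
Base64-encoded. The class and property names below are hypothetical, not the actual Flink code.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Properties;

// Illustration only: the names are hypothetical. The point is that an opaque
// snapshot byte blob can be written into string properties via Base64 and read back.
public class SerializerSnapshotStringifier {

    static void writeSnapshot(Properties props, String key, byte[] snapshotBytes) {
        props.setProperty(key, Base64.getEncoder().encodeToString(snapshotBytes));
    }

    static byte[] readSnapshot(Properties props, String key) {
        return Base64.getDecoder().decode(props.getProperty(key));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        byte[] fakeSnapshot = "any-type-serializer-snapshot".getBytes(StandardCharsets.UTF_8);

        writeSnapshot(props, "schema.0.type.ANY.snapshot", fakeSnapshot);
        byte[] restored = readSnapshot(props, "schema.0.type.ANY.snapshot");

        System.out.println(new String(restored, StandardCharsets.UTF_8));
    }
}
```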



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] Support Local Aggregation in Flink

2019-06-04 Thread Piotr Nowojski
Hi Vino,

> So if users want to use local aggregation, they should call the window API
> to build a local window that means users should (or say "can") specify the
> window length and other information based on their needs.

It sounds OK to me. It would have to be run by some of the API folks in the 
community, though.

Piotrek

> On 4 Jun 2019, at 10:19, Dian Fu  wrote:
> 
> Hi Vino,
> 
> Thanks a lot for starting this discussion. +1 to this feature as I think it 
> will be very useful.
> 
> Regarding to using window to buffer the input elements, personally I don't 
> think it's a good solution for the following reasons:
> 1) As we know that WindowOperator will store the accumulated results in 
> states, this is not necessary for Local Aggregate operator.
> 2) For WindowOperator, each input element will be accumulated to states. This 
> is also not necessary for Local Aggregate operator and storing the input 
> elements in memory is enough.
> 
> Thanks,
> Dian
> 
>> 在 2019年6月4日,上午10:03,vino yang  写道:
>> 
>> Hi Ken,
>> 
>> Thanks for your reply.
>> 
>> As I said before, we try to reuse Flink's state concept (fault tolerance
>> and guarantee "Exactly-Once" semantics). So we did not consider cache.
>> 
>> In addition, if we use Flink's state, the OOM related issue is not a key
>> problem we need to consider.
>> 
>> Best,
>> Vino
>> 
>> Ken Krugler  于2019年6月4日周二 上午1:37写道:
>> 
>>> Hi all,
>>> 
>>> Cascading implemented this “map-side reduce” functionality with an LLR
>>> cache.
>>> 
>>> That worked well, as then the skewed keys would always be in the cache.
>>> 
>>> The API let you decide the size of the cache, in terms of number of
>>> entries.
>>> 
>>> Having a memory limit would have been better for many of our use cases,
>>> though FWIR there’s no good way to estimate in-memory size for objects.
>>> 
>>> — Ken
>>> 
 On Jun 3, 2019, at 2:03 AM, vino yang  wrote:
 
 Hi Piotr,
 
 The localKeyBy API returns an instance of KeyedStream (we just added an
 inner flag to identify the local mode) which is Flink has provided
>>> before.
 Users can call all the APIs(especially *window* APIs) which KeyedStream
 provided.
 
 So if users want to use local aggregation, they should call the window
>>> API
 to build a local window that means users should (or say "can") specify
>>> the
 window length and other information based on their needs.
 
 I think you described another idea different from us. We did not try to
 react after triggering some predefined threshold. We tend to give users
>>> the
 discretion to make decisions.
 
 Our design idea tends to reuse Flink provided concept and functions like
 state and window (IMO, we do not need to worry about OOM and the issues
>>> you
 mentioned).
 
 Best,
 Vino
 
 Piotr Nowojski  于2019年6月3日周一 下午4:30写道:
 
> Hi,
> 
> +1 for the idea from my side. I’ve even attempted to add similar feature
> quite some time ago, but didn’t get enough traction [1].
> 
> I’ve read through your document and I couldn’t find it mentioning
> anywhere, when the pre aggregated result should be emitted down the
>>> stream?
> I think that’s one of the most crucial decision, since wrong decision
>>> here
> can lead to decrease of performance or to an explosion of memory/state
> consumption (both with bounded and unbounded data streams). For
>>> streaming
> it can also lead to an increased latency.
> 
> Since this is also a decision that’s impossible to make automatically
> perfectly reliably, first and foremost I would expect this to be
> configurable via the API. With maybe some predefined triggers, like on
> watermark (for windowed operations), on checkpoint barrier (to decrease
> state size?), on element count, maybe memory usage (much easier to
>>> estimate
> with a known/predefined types, like in SQL)… and with some option to
> implement custom trigger.
> 
> Also what would work the best would be to have a some form of memory
> consumption priority. For example if we are running out of memory for
> HashJoin/Final aggregation, instead of spilling to disk or crashing the
>>> job
> with OOM it would be probably better to prune/dump the pre/local
> aggregation state. But that’s another story.
> 
> [1] https://github.com/apache/flink/pull/4626 <
> https://github.com/apache/flink/pull/4626>
> 
> Piotrek
> 
>> On 3 Jun 2019, at 10:16, sf lee  wrote:
>> 
>> Excited and  Big +1 for this feature.
>> 
>> SHI Xiaogang  于2019年6月3日周一 下午3:37写道:
>> 
>>> Nice feature.
>>> Looking forward to having it in Flink.
>>> 
>>> Regards,
>>> Xiaogang
>>> 
>>> vino yang  于2019年6月3日周一 下午3:31写道:
>>> 
 Hi all,
 
 As we mentioned in some conference, such as Flink Forward SF 2019 and
>>> QCon
 Beijing 2019, our team has im

Re: [DISCUSS] Support Local Aggregation in Flink

2019-06-04 Thread Dian Fu
Hi Vino,

Thanks a lot for starting this discussion. +1 to this feature as I think it 
will be very useful.

Regarding using a window to buffer the input elements, personally I don't 
think it's a good solution, for the following reasons:
1) As we know, WindowOperator stores the accumulated results in state; this is 
not necessary for a local aggregate operator.
2) In WindowOperator, each input element is accumulated into state. This is also 
not necessary for a local aggregate operator, and buffering the input 
elements in memory is enough.

Thanks,
Dian
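
To make the "buffer in memory and flush on a threshold" idea concrete, here is a minimal, 
illustrative sketch using a plain RichFlatMapFunction with a count-based flush. It only 
demonstrates the buffering idea; fault tolerance / checkpoint integration and the actual 
operator design discussed in this thread are deliberately left out.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

/**
 * Illustration only: local pre-aggregation that buffers partial sums in a
 * plain in-memory map and flushes once a count threshold is reached.
 */
public class LocalPreAggregator
        extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

    private final int flushThreshold;
    private transient Map<String, Long> buffer;
    private transient int count;

    public LocalPreAggregator(int flushThreshold) {
        this.flushThreshold = flushThreshold;
    }

    @Override
    public void open(Configuration parameters) {
        buffer = new HashMap<>();
        count = 0;
    }

    @Override
    public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, Long>> out) {
        // Accumulate the partial sum per key in memory only.
        buffer.merge(value.f0, value.f1, Long::sum);
        if (++count >= flushThreshold) {
            // Flush the partial results downstream and start over.
            for (Map.Entry<String, Long> e : buffer.entrySet()) {
                out.collect(Tuple2.of(e.getKey(), e.getValue()));
            }
            buffer.clear();
            count = 0;
        }
    }
}
```

Used as something like `stream.flatMap(new LocalPreAggregator(1000)).keyBy(0).sum(1)`, this 
reduces the number of records shuffled for hot keys, at the cost of the flush latency.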

> 在 2019年6月4日,上午10:03,vino yang  写道:
> 
> Hi Ken,
> 
> Thanks for your reply.
> 
> As I said before, we try to reuse Flink's state concept (fault tolerance
> and guarantee "Exactly-Once" semantics). So we did not consider cache.
> 
> In addition, if we use Flink's state, the OOM related issue is not a key
> problem we need to consider.
> 
> Best,
> Vino
> 
> Ken Krugler  于2019年6月4日周二 上午1:37写道:
> 
>> Hi all,
>> 
>> Cascading implemented this “map-side reduce” functionality with an LLR
>> cache.
>> 
>> That worked well, as then the skewed keys would always be in the cache.
>> 
>> The API let you decide the size of the cache, in terms of number of
>> entries.
>> 
>> Having a memory limit would have been better for many of our use cases,
>> though FWIR there’s no good way to estimate in-memory size for objects.
>> 
>> — Ken
>> 
>>> On Jun 3, 2019, at 2:03 AM, vino yang  wrote:
>>> 
>>> Hi Piotr,
>>> 
>>> The localKeyBy API returns an instance of KeyedStream (we just added an
>>> inner flag to identify the local mode) which is Flink has provided
>> before.
>>> Users can call all the APIs(especially *window* APIs) which KeyedStream
>>> provided.
>>> 
>>> So if users want to use local aggregation, they should call the window
>> API
>>> to build a local window that means users should (or say "can") specify
>> the
>>> window length and other information based on their needs.
>>> 
>>> I think you described another idea different from us. We did not try to
>>> react after triggering some predefined threshold. We tend to give users
>> the
>>> discretion to make decisions.
>>> 
>>> Our design idea tends to reuse Flink provided concept and functions like
>>> state and window (IMO, we do not need to worry about OOM and the issues
>> you
>>> mentioned).
>>> 
>>> Best,
>>> Vino
>>> 
>>> Piotr Nowojski  于2019年6月3日周一 下午4:30写道:
>>> 
 Hi,
 
 +1 for the idea from my side. I’ve even attempted to add similar feature
 quite some time ago, but didn’t get enough traction [1].
 
 I’ve read through your document and I couldn’t find it mentioning
 anywhere, when the pre aggregated result should be emitted down the
>> stream?
 I think that’s one of the most crucial decision, since wrong decision
>> here
 can lead to decrease of performance or to an explosion of memory/state
 consumption (both with bounded and unbounded data streams). For
>> streaming
 it can also lead to an increased latency.
 
 Since this is also a decision that’s impossible to make automatically
 perfectly reliably, first and foremost I would expect this to be
 configurable via the API. With maybe some predefined triggers, like on
 watermark (for windowed operations), on checkpoint barrier (to decrease
 state size?), on element count, maybe memory usage (much easier to
>> estimate
 with a known/predefined types, like in SQL)… and with some option to
 implement custom trigger.
 
 Also what would work the best would be to have a some form of memory
 consumption priority. For example if we are running out of memory for
 HashJoin/Final aggregation, instead of spilling to disk or crashing the
>> job
 with OOM it would be probably better to prune/dump the pre/local
 aggregation state. But that’s another story.
 
 [1] https://github.com/apache/flink/pull/4626 <
 https://github.com/apache/flink/pull/4626>
 
 Piotrek
 
> On 3 Jun 2019, at 10:16, sf lee  wrote:
> 
> Excited and  Big +1 for this feature.
> 
> SHI Xiaogang  于2019年6月3日周一 下午3:37写道:
> 
>> Nice feature.
>> Looking forward to having it in Flink.
>> 
>> Regards,
>> Xiaogang
>> 
>> vino yang  于2019年6月3日周一 下午3:31写道:
>> 
>>> Hi all,
>>> 
>>> As we mentioned in some conference, such as Flink Forward SF 2019 and
>> QCon
>>> Beijing 2019, our team has implemented "Local aggregation" in our
>> inner
>>> Flink fork. This feature can effectively alleviate data skew.
>>> 
>>> Currently, keyed streams are widely used to perform aggregating
>> operations
>>> (e.g., reduce, sum and window) on the elements that having the same
 key.
>>> When executed at runtime, the elements with the same key will be sent
 to
>>> and aggregated by the same task.
>>> 
>>> The performance of these aggregating operations is very sensitive to
 the
>>>

Re: Flink internals

2019-06-04 Thread Till Rohrmann
Hi John,

unfortunately, there are no really good and up-to-date documents about
Flink's internals. There was some discussion about updating the internals
[1], but the community decided against submitting it as a Season of Docs
project. I agree that we should update our documentation about Flink's
internals, though.

At the moment, the best way of learning about Flink is the code and the
user/dev ML, where you can ask about concepts and how things work.

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Apache-Flink-at-Season-of-Docs-td28133.html

Cheers,
Till

On Fri, May 31, 2019 at 2:50 AM Rong Rong  wrote:

> Hi Fan, John,
>
> The flink internal link[1] seems to be not updated in the last years.
> I found out some higher level pages in the official documentation here[2]
> however they are still very limited.
>
> Is there any other well maintained, internal documentations for
> contributors?
> I think it might be a good idea to maintain a higher-level guidance doc for
> new contributors about Flink internals?
>
> Best,
> Rong
>
> 
> [1] https://cwiki.apache.org/confluence/display/FLINK/Flink+Internals
> [2]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/internals/components.html
>
> On Thu, May 30, 2019 at 4:48 AM Fan Liya  wrote:
>
> > You can find some articles here:
> > https://cwiki.apache.org/confluence/display/FLINK/Flink+Internals
> >
> > Best,
> > Liya Fan
> >
> > On Thu, May 30, 2019 at 6:29 PM John Tipper 
> > wrote:
> >
> > > Hi all,
> > >
> > > Is there a guide somewhere for the internals of Flink for developers
> > > wanting to get involved in core development?  I'm particularly
> interested
> > > in any notes on how the codebase is put together so that it's possible
> to
> > > learn how it works internally.
> > >
> > > I'm particularly interested in what classes are involved in the barrier
> > > mechanism and state and where these are found, but it's a large
> codebase
> > so
> > > I'm sure it would be helpful for new developers to have pointers to
> what
> > > does what inside the codebase or where to start looking for details of
> > > certain things.
> > >
> > > Many thanks,
> > >
> > > John
> > >
> >
>


Re: What does flink session mean ?

2019-06-04 Thread Till Rohrmann
Hi Jeff,

the session functionality which you find in Flink's client are the remnants
of an uncompleted feature which was abandoned. The idea was that one could
submit multiple parts of a job to the same cluster where these parts are
added to the same ExecutionGraph. That way we wanted to allow to reuse
computed results when using a notebook for ad-hoc queries, for example. But
as I said, this feature has never been completed.

Cheers,
Till

On Sun, Jun 2, 2019 at 3:20 PM Jeff Zhang  wrote:

>
> Hi Folks,
>
>
> When I read the flink client api code, the concept of session is a little
> vague and unclear to me. It looks like the session concept is only applied
> in batch mode (I only see it in ExecutionEnvironment but not in
> StreamExecutionEnvironment). But for local mode
> (LocalExecutionEnvironment), starting one new session is starting one new
> MiniCluster, but in remote mode (RemoteExecutionEnvironment), starting one
> new session is just starting one new ClusterClient instead of one new
> cluster. So I am confused what does flink session really mean. Could anyone
> help me understand this ? Thanks.
>
>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: [DISCUSS] Features for Apache Flink 1.9.0

2019-06-04 Thread Till Rohrmann
Thanks for starting this discussion Gordon and Kurt. For the development
threads I'm involved with here are the updates:

* Pluggable scheduler: A good part of the work is completed. Gary is now working
on the glue code to use the new high-level scheduler components. The
estimate to finish this work is the end of June (4 weeks starting
from this week). The changes to the scheduler would benefit from very
thorough testing because they are core to Flink.

* External shuffle service: As Zhijiang said, we hope to finish the work by
the end of this week or early next week (estimate: 1 week from now).

* Result partition life cycle management / fine grained recovery: The
current estimate to complete this feature would be end of next week or
beginning of the week afterwards (estimate: 2 weeks from now). This feature
should enable fine grained recovery for batch.

* Java 9 support: Flink builds with Java 9. Not all e2e tests are running
with Java 9 though.

* Active K8s integration: PRs are open but reviews are still pending.

Cheers,
Till

On Wed, May 29, 2019 at 4:45 AM Biao Liu  wrote:

> Thanks for being the release manager, Gordon & Kurt.
>
> For FLIP-27, there are still some more details need to discuss. I don't
> think it could catch up the release of 1.9. @Aljoscha, @Stephan, do you
> agree that?
>
> zhijiang  于2019年5月28日周二 下午11:28写道:
>
> > Hi Gordon,
> >
> > Thanks for the kind reminder of feature freeze date for 1.9.0. I think
> the
> > date makes sense on my side.
> >
> > For FLIP-31, I and Andrey could be done within two weeks or so.
> > And I already finished my side work for FLIP-1.
> >
> > Best,
> > Zhijiang
> >
> >
> > --
> > From:Timo Walther 
> > Send Time:2019年5月28日(星期二) 19:26
> > To:dev 
> > Subject:Re: [DISCUSS] Features for Apache Flink 1.9.0
> >
> > Thanks for being the release managers, Kurt and Gordon!
> >
> >  From the Table & SQL API side, there are still a lot of open issues
> > that need to be solved to decouple the API from a planner and enable the
> > Blink planner. Also we need to make sure that the Blink planner supports
> > at least everything of Flink 1.8 to not introduce a regression. We might
> > need to focus more on the main features which is a runnable Blink
> > planner and might need to postpone other discussions such as DDL, new
> > source/sink interfaces, or proper type inference logic. However, in many
> > cases there are shortcuts that we could take in order to achieve our
> > goals. So I'm confident that we solve the big blockers until the feature
> > freeze :)
> >
> > I will keep you updated.
> >
> > Thanks,
> > Timo
> >
> >
> > Am 28.05.19 um 05:07 schrieb Kurt Young:
> > > Thanks Gordon for bringing this up.
> > >
> > > I'm glad to say that blink planner merge work is almost done, and i
> will
> > > follow up the work of
> > > integrating blink planner with Table API to co-exist with current flink
> > > planner.
> > >
> > > In addition to this, the following features:
> > > 1. FLIP-32: Restructure flink-table for future contributions [1]
> > > 2. FLIP-37: Rework of the Table API Type System [2]
> > > 3. Hive integration work (including hive meta [3] and connectors)
> > >
> > > are also going well, i will spend some time to keep track of them.
> > >
> > > [1]
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-32%3A+Restructure+flink-table+for+future+contributions
> > > [2]
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-37%3A+Rework+of+the+Table+API+Type+System
> > > [3]
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-30%3A+Unified+Catalog+APIs
> > >
> > > Best,
> > > Kurt
> > >
> > >
> > > On Mon, May 27, 2019 at 7:18 PM jincheng sun  >
> > > wrote:
> > >
> > >> Hi Gordon,
> > >>
> > >> Thanks for mention the feature freeze date for 1.9.0, that's very
> > helpful
> > >> for contributors to evaluate their dev plan!
> > >>
> > >> Regarding FLIP-29, we are glad to do our best to finish the dev of
> > FLIP-29,
> > >> then catch up with the release of 1.9.
> > >>
> > >> Thanks again for push the release of 1.9.0 forward!
> > >>
> > >> Cheers,
> > >> Jincheng
> > >>
> > >>
> > >>
> > >> Tzu-Li (Gordon) Tai  于2019年5月27日周一 下午5:48写道:
> > >>
> > >>> Hi all,
> > >>>
> > >>> I want to kindly remind the community that we're now 5 weeks away
> from
> > >> the
> > >>> proposed feature freeze date for 1.9.0, which is June 28.
> > >>>
> > >>> This is not yet a final date we have agreed on, so I would like to
> > start
> > >>> collecting feedback on how the mentioned features are going, and in
> > >>> general, whether or not the date sounds reasonable given the current
> > >> status
> > >>> of the ongoing efforts.
> > >>> Please let me know what you think!
> > >>>
> > >>> Cheers,
> > >>> Gordon
> > >>>
> > >>>
> > >>> On Mon, May 27, 2019 at 5:40 PM Tzu-Li (Gordon) Tai <
> > tzuli...@apache.org
> > >>>
> > >>> wrote:
> > >>>
> >  @Hequn @Jincheng
> > 
> >  

[jira] [Created] (FLINK-12725) Need to copy flink-hadoop-compatibility jar explicitly to ${FLINK-HOME}/lib location

2019-06-04 Thread arganzheng (JIRA)
arganzheng created FLINK-12725:
--

 Summary: Need to copy flink-hadoop-compatibility jar explicitly to 
${FLINK-HOME}/lib location
 Key: FLINK-12725
 URL: https://issues.apache.org/jira/browse/FLINK-12725
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hadoop Compatibility
Reporter: arganzheng


I am currently working on a Flink application that uses some of the Hadoop 
dependencies to write data to HDFS. In a local environment it works fine; however, when 
I deploy this Flink application on the cluster it throws an exception related to a 
compatibility issue.

The error message that I am getting is:
```
java.lang.RuntimeException: Could not load the TypeInformation for the class 'org.apache.hadoop.io.Writable'. You may be missing the 'flink-hadoop-compatibility' dependency.
    at org.apache.flink.api.java.typeutils.TypeExtractor.createHadoopWritableTypeInfo(TypeExtractor.java:2025)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1649)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1591)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:778)
```
I tried including the Maven dependency of the {{flink-hadoop-compatibility}} jar in the 
POM, but it is not detected. The Flink version I am using is 1.8.0.

However, when I explicitly copy the compatibility JAR to the 
{{${FLINK-HOME}/lib}} location, I do not get any exception and am able to run 
the Flink application successfully.

I dove into the source code and found the problem:

```java
package org.apache.flink.api.java.typeutils;

public class TypeExtractor {

    /** The name of the class representing Hadoop's writable */
    private static final String HADOOP_WRITABLE_CLASS =
        "org.apache.hadoop.io.Writable";
    private static final String HADOOP_WRITABLE_TYPEINFO_CLASS =
        "org.apache.flink.api.java.typeutils.WritableTypeInfo";

    // visible for testing
    public static <T> TypeInformation<T> createHadoopWritableTypeInfo(Class<T> clazz) {
        checkNotNull(clazz);

        Class<?> typeInfoClass;
        try {
            typeInfoClass = Class.forName(HADOOP_WRITABLE_TYPEINFO_CLASS, false,
                TypeExtractor.class.getClassLoader());
        }
        catch (ClassNotFoundException e) {
            throw new RuntimeException("Could not load the TypeInformation for the class '"
                + HADOOP_WRITABLE_CLASS + "'. You may be missing the "
                + "'flink-hadoop-compatibility' dependency.");
        }

        ...
    }
}
```

This is because `org.apache.hadoop.io.Writable` is meant to be loaded by 
`TypeExtractor.class.getClassLoader()`, which is the `AppClassLoader`, while the 
submitted Flink jar is loaded by the `ParentFirstClassLoader`, which is a child of 
`AppClassLoader`, so `AppClassLoader` cannot load 
`org.apache.hadoop.io.Writable` from your Flink jar.

I'm not sure if it's a bug; changing the class loader to 
`Thread.currentThread().getContextClassLoader()` makes it work without copying 
the flink-hadoop-compatibility jar file to the ${FLINK-HOME}/lib location.
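
To see the class-loading difference described above in isolation, here is a small 
self-contained probe (the class name probed and the helper names are just for illustration). 
In a deployed job the thread context class loader is typically the user-code class loader, 
which is why a lookup can succeed there while failing through the defining class loader.

```java
// Self-contained illustration of the class-loading difference described above.
// "org.apache.hadoop.io.Writable" is only resolvable if a Hadoop jar is visible
// to the class loader being used for the lookup.
public class ClassLoaderProbe {

    static boolean canLoad(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String hadoopWritable = "org.apache.hadoop.io.Writable";

        // What TypeExtractor effectively uses: the defining class's loader.
        ClassLoader definingLoader = ClassLoaderProbe.class.getClassLoader();

        // What the reporter suggests: the thread context class loader.
        ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();

        System.out.println("defining loader: " + canLoad(hadoopWritable, definingLoader));
        System.out.println("context loader:  " + canLoad(hadoopWritable, contextLoader));
    }
}
```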



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)