Re: [PROPOSAL] Introduce Elastic Bloom Filter For Flink

2018-05-23 Thread Elias Levy
I would suggest you consider alternative data structures: a Cuckoo
Filter or a Golomb Compressed Sequence.

The GCS data structure was introduced in Cache-, Hash- and Space-Efficient
Bloom Filters by F. Putze, P. Sanders, and J. Singler.  See section 4.
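
To make the suggestion concrete, here is a minimal, self-contained Scala sketch
of the core idea behind a GCS (my own illustration, not the paper's exact
construction): hash every key into a bounded range, sort the hash values, and
Golomb/Rice-encode the gaps between consecutive values; membership is tested by
re-hashing and scanning the decoded sequence.

import scala.util.hashing.MurmurHash3

// Illustrative only: a bit string holds the encoded sequence; a real
// implementation would pack bits and index blocks for faster lookups.
final class GolombCompressedSequence(keys: Seq[String], bitsPerKey: Int = 6) {
  require(keys.nonEmpty)
  private val range = keys.size.toLong << bitsPerKey   // n * 2^bitsPerKey
  private val riceParam = bitsPerKey                    // ~ log2(range / n)

  private def hash(k: String): Long =
    (MurmurHash3.stringHash(k).toLong & 0xffffffffL) % range

  // Sorted, deduplicated hashes, gap-encoded as unary(quotient) + binary(remainder).
  private val bits: String = {
    val sb = new StringBuilder
    var prev = 0L
    for (h <- keys.map(hash).distinct.sorted) {
      val gap = h - prev
      sb.append("1" * (gap >> riceParam).toInt).append('0')
      val r = gap & ((1L << riceParam) - 1)
      for (i <- riceParam - 1 to 0 by -1) sb.append(((r >> i) & 1L).toString)
      prev = h
    }
    sb.toString
  }

  // False positives are possible (hash collisions), false negatives are not.
  def mightContain(k: String): Boolean = {
    val target = hash(k)
    var pos = 0
    var value = 0L
    while (pos < bits.length) {
      var q = 0L
      while (bits(pos) == '1') { q += 1; pos += 1 }
      pos += 1                                          // skip the '0' terminator
      var r = 0L
      for (_ <- 0 until riceParam) { r = (r << 1) | (bits(pos) - '0'); pos += 1 }
      value += (q << riceParam) + r
      if (value == target) return true
      if (value > target) return false
    }
    false
  }
}

Note that the structure in this sketch is static, built once from a key set,
which is part of why it can be so compact; deletes would require a rebuild.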



> We should discuss which exact implementation of bloom filters is the best
> fit.
> @Fabian: There are also implementations of bloom filters that use counting
> and therefore support
> deletes, but obviously this comes at the cost of a potentially higher
> space consumption.
>
> On 23.05.2018 at 11:29, Fabian Hueske wrote:
>> IMO, such a feature would be very interesting. However, my concerns with
>> Bloom Filter
>> is that they are insert-only data structures, i.e., it is not possible to
>> remove keys once
>> they were added. This might render the filter useless over time.
>> In a different thread (see discussion in FLINK-8918 [1]), you mentioned
>> that the Bloom
>> Filters would be growing.
>> If we keep them in memory, how can we prevent them from exceeding memory
>> boundaries over
>> time?
>
>


Re: [DISCUSS] Flink 1.6 features

2018-06-08 Thread Elias Levy
Since wishes are free:

- Standalone cluster job isolation:
https://issues.apache.org/jira/browse/FLINK-8886
- Proper sliding window joins (not overlapping hopping window joins):
https://issues.apache.org/jira/browse/FLINK-6243
- Sharing state across operators:
https://issues.apache.org/jira/browse/FLINK-6239
- Synchronizing streams: https://issues.apache.org/jira/browse/FLINK-4558

Seconded:
- Atomic cancel-with-savepoint:
https://issues.apache.org/jira/browse/FLINK-7634
- Support dynamically changing CEP patterns :
https://issues.apache.org/jira/browse/FLINK-7129


On Fri, Jun 8, 2018 at 1:31 PM, Stephan Ewen  wrote:

> Hi all!
>
> Thanks for the discussion and good input. Many suggestions fit well with
> the proposal above.
>
> Please bear in mind that with a time-based release model, we would release
> whatever is mature by end of July.
> The good thing is we could schedule the next release not too far after
> that, so that the features that did not quite make it will not be delayed
> too long.
> In some sense, you could read this as a "*what to do first*" list,
> rather than "*this goes in, other things stay out"*.
>
> Some thoughts on some of the suggestions
>
> *Kubernetes integration:* An opaque integration with Kubernetes should be
> supported through the "as a library" mode. For a deeper integration, I know
> that some committers have experimented with some PoC code. I would let Till
> add some thoughts, he has worked the most on the deployment parts recently.
>
> *Per partition watermarks with idleness:* Good point, could one implement
> that on the current interface, with a periodic watermark extractor?
>
> *Atomic cancel-with-savepoint:* Agreed, this is important. Making this
> work with all sources needs a bit more work. We should have this in the
> roadmap.
>
> *Elastic Bloomfilters:* This seems like an interesting new feature - the
> above suggested feature set was more about addressing some longer standing
> issues/requests. However, nothing should prevent contributors to work on
> that.
>
> Best,
> Stephan
>
>
> On Wed, Jun 6, 2018 at 6:23 AM, Yan Zhou [FDS Science] 
> wrote:
>
>> +1 on https://issues.apache.org/jira/browse/FLINK-5479
>> Reported in ML:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-topic-partition-skewness-causes-watermark-not-being-emitted-td11008.html
>> It's normally not a common case to have Kafka partitions not producing any
>> data, but it'll probably be good to handle this as well. I ...
>>
>> --
>> *From:* Rico Bergmann 
>> *Sent:* Tuesday, June 5, 2018 9:12:00 PM
>> *To:* Hao Sun
>> *Cc:* dev@flink.apache.org; user
>> *Subject:* Re: [DISCUSS] Flink 1.6 features
>>
>> +1 on K8s integration
>>
>>
>>
>> On 06.06.2018 at 00:01, Hao Sun wrote:
>>
>> adding my vote to K8S Job mode, maybe it is this?
>> > Smoothen the integration in Container environment, like "Flink as a
>> Library", and easier integration with Kubernetes services and other proxies.
>>
>>
>>
>> On Mon, Jun 4, 2018 at 11:01 PM Ben Yan 
>> wrote:
>>
>> Hi Stephan,
>>
>> Will  [ https://issues.apache.org/jira/browse/FLINK-5479 ]
>> (Per-partition watermarks in FlinkKafkaConsumer should consider idle
>> partitions) be included in 1.6? As we are seeing more users with this
>> issue on the mailing lists.
>>
>> Thanks.
>> Ben
>>
>> 2018-06-05 5:29 GMT+08:00 Che Lui Shum :
>>
>> Hi Stephan,
>>
>> Will FLINK-7129 (Support dynamically changing CEP patterns) be included
>> in 1.6? There were discussions about possibly including it in 1.6:
>> http://mail-archives.apache.org/mod_mbox/flink-user/201803.mbox/%3cCAMq=OU7gru2O9JtoWXn1Lc1F7NKcxAyN6A3e58kxctb4b508RQ@mail.gmail.com%3e
>>
>> Thanks,
>> Shirley Shum
>>
>>
>> From: Stephan Ewen 
>> To: dev@flink.apache.org, user 
>> Date: 06/04/2018 02:21 AM
>> Subject: [DISCUSS] Flink 1.6 features
>> --
>>
>>
>>
>> Hi Flink Community!
>>
>> The release of Apache Flink 1.5 has happened (yay!) - so it is a good
>> time to start talking about what to do for release 1.6.
>>
>> *== Suggested release timeline ==*
>>
>> I would propose to release around *end of July* (that is 8-9 weeks from
>> now).
>>
>> The rationale behind that: There was a lot of effort in release testing
>> automation (end-to-end tests, scripted stress tests) as part of release
>> 1.5. You may have noticed the big set of new modules under
>> "flink-end-to-end-tests" in the Flink repository. It delayed the 1.5
>> release a bit, and needs to continue as part of the coming release cycle,
>> but should help make 

Re: [VOTE] Release 1.6.0, release candidate #2

2018-08-03 Thread Elias Levy
On Fri, Aug 3, 2018 at 9:23 AM Till Rohrmann  wrote:

> The complete staging area is available for your review, which includes:
> * JIRA release notes [1],
> [1]
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12342760


Shouldn't the release notes only include issues that have been closed, not
simply those that have a Fix Version equal to the release version?  There
are a lot of issues with an assigned Fix Version that are still open.


Re: [Discuss] Outer join support and timestamp assignment for IntervalJoin

2018-08-13 Thread Elias Levy
As a developer, while not quite as succinct, I feel that option A in both
cases is easier to read.

On Mon, Aug 13, 2018 at 4:18 AM Florian Schmidt 
wrote:

> Hello Community,
>
> I’ve recently been working on adding support for outer joins [1] and
> timestamp assignment [2] to the IntervalJoin in the DataStream API.
> As this is a public API and it should be simple and understandable for
> users I wanted to gather some feedback on some variations that I drafted up:
>
> 1. Add outer joins
>
> Approach A
>
> keyedStreamA.intervalJoin(keyedStreamB)
> .leftOuter() // .rightOuter, .fullOuter()
> .between(, )
> .process(new ProcessJoinFunction() { /* … */ }
>
> Approach B
>
> keyedStreamA.intervalLeftJoin(keyedStreamB) // intervalRightJoin,
> intervalFullOuterJoin
> .between(, )
> .process(new ProcessJoinFunction() { /* … */ }
>
> Approach C
>
> keyedStreamA.intervalJoin(keyedStreamB)
> .joinType(JoinType.INNER) // Reuse existing (internally
> used) JoinType
>
>
> Personally I feel like C is the cleanest approach, but it has the problem
> that checking for invalid timestamp strategy & join combinations can only
> be done during runtime, whereas A and B would allow us to express valid
> combinations through the type system.
>
> 2. Assign timestamps to the joined pairs
>
> When two elements are joined together, this will add support for
> specifying which of the elements timestamps should be assigned as the
> results timestamp.
> The four options are MIN, MAX, LEFT and RIGHT, where MIN selects the
> minimum of the two elements timestamps, MAX the maximum, LEFT the left
> elements timestamp and RIGHT the right elements timestamp.
>
> Approach A
>
> keyedStreamA.intervalJoin(streamB)
> .between(, )
> .assignLeftTimestamp() // assignRightTimestamp(),
> assignMinTimestamp(), assignMaxTimestamp()
> .process(new ProcessJoinFunction() { /* … */ }
>
> Approach B
>
> keyedStreamA.intervalJoin(keyedStreamB)
> .between(, )
> .assignTimestamp(TimestampStrategy.LEFT) // .RIGHT, .MIN,
> .MAX
>
> Again I feel like B is the cleanest approach, but has the same caveat with
> runtime vs. type system checks as the approach above. This could be
> especially interesting when it comes to combinations of join types and
> timestamp assignments, where we will have a few combinations that are not
> possible.
>
> Any feedback would be greatly appreciated. I also updated the design doc
> at [3] if anyone wants to hop in on further discussions!
>
> Florian
>
> [1] https://issues.apache.org/jira/browse/FLINK-8483 <
> https://issues.apache.org/jira/browse/FLINK-8483>
> [2] https://issues.apache.org/jira/browse/FLINK-8482 <
> https://issues.apache.org/jira/browse/FLINK-8482>
> [3]
> https://docs.google.com/document/d/16GMH5VM6JJiWj_N0W8y3PtQ1aoJFxsKoOTSYOfqlsRE/edit#heading=h.6pxr0kgtqp3c
> <
> https://docs.google.com/document/d/16GMH5VM6JJiWj_N0W8y3PtQ1aoJFxsKoOTSYOfqlsRE/edit#heading=h.6pxr0kgtqp3c
> >
>
>


Re: [DISCUSS] Release Flink 1.5.3

2018-08-16 Thread Elias Levy
If I may, I think FLINK-10011 should be considered a blocker and included in
1.5.3 and a 1.4.3 release.

On Thu, Aug 16, 2018 at 3:09 AM Dominik Wosiński  wrote:

> +1, I agree that frequent releases are good for users.
> Dominik
>
>
> Sent from the Mail app for Windows 10
>
> From: Stefan Richter
> Sent: Thursday, 16 August 2018 10:57
> To: dev
> Subject: Re: [DISCUSS] Release Flink 1.5.3
>
> +1 sounds good.
>
> Stefan
>
> > On 16.08.2018 at 09:55, Piotr Nowojski wrote:
> >
> > +1 from me
> >
> > Piotrek
> >
> >> On 16 Aug 2018, at 09:40, Timo Walther  wrote:
> >>
> >> Thank you for starting this discussion.
> >>
> >> +1 for this
> >>
> >> Regards,
> >> Timo
> >>
> >> On 16.08.18 at 09:27, vino yang wrote:
> >>> Agree! This sounds very good.
> >>>
> >>> Till Rohrmann wrote on Thursday, August 16, 2018 at 3:14 PM:
> >>>
>  +1 for starting the release process 1.5.3 immediately. We can always
>  create another bug fix release afterwards. I think the more often we
>  release the better it is for our users, because they receive
> incremental
>  improvements faster.
> 
>  Cheers,
>  Till
> 
>  On Thu, Aug 16, 2018 at 8:52 AM Chesnay Schepler 
>  wrote:
> 
> > I would actually start with the release process today unless anyone
> > objects.
> >
> > On 16.08.2018 08:46, vino yang wrote:
> >> Hi Chesnay,
> >>
> >> +1
> >>  I want to know when you plan to cut the branch.
> >>
> >> Thanks, vino.
> >>
> >> Chesnay Schepler wrote on Thursday, August 16, 2018 at 2:29 PM:
> >>
> >>> Hello everyone,
> >>>
> >>> it has been a little over 2 weeks since 1.5.2 was released, and
> since
> >>> then a number of fixes were added to the release-1.5 that I think
> we
> >>> should push to users.
> >>>
> >>> Notable fixes, (among others) are FLINK-9969 which fixes a memory
> leak
> >>> when using batch, FLINK-10066 that reduces the memory print on the
> JM
> >>> for long-running jobs or FLINK-10070 that reverses a regression
> >>> introduced in 1.5.2 due to which Flink could not be compiled with
> >>> certain maven versions.
> >>>
> >>> I would of course volunteer as Release Manager.
> >>>
> >>>
> >
> >>
> >
>
>
>


Re: Flink 1.7 Development Priorities

2018-08-23 Thread Elias Levy
It would be nice to see the state TTL work finished.  At the moment state
will not expire if a key is not accessed while the job is running, which
means jobs that require state expiration must continue using timers to
expire data.

On Thu, Aug 23, 2018 at 1:12 AM Aljoscha Krettek 
wrote:

> Hi Everyone,
>
> After the recent Flink 1.6 release the people working on Flink at data
> Artisans came together to talk about what we want to work on for Flink 1.7.
> The following is a list of high-level directions that we will be working on
> for the next couple of months. This doesn't mean that other things are not
> important or maybe more important, so please chime in.
>
> That being said, here's the high-level list:
>  - make the Rest API versioned
>  - provide docker-compose based quickstarts for Flink SQL
>  - support writing to S3 in the new streaming file sink
>  - add a new type of join that allows "joining streams with tables"
>  - Scala 2.12 support
>  - improvements to resource scheduling, local recovery
>  - improved support for running Flink in containers, Flink dynamically
> reacting to changes in the container deployment
>  - automatic rescaling policies
>  - initial support for state migration, i.e. changing the
> schema/TypeSerializer of Flink State
>
> This is also an invitation for others to post what they would like to work
> on and also to point out glaring omissions.
>
> Best,
> Aljoscha


Re: [DISCUSS] Breaking the Scala API for Scala 2.12 Support

2018-10-04 Thread Elias Levy
The second alternative, with the addition of methods that take functions
with Scala types, seems the most sensible.  I wonder if there is a need
then to maintain the *J Java parameter methods, or whether users could just
access the functionality by converting the Scala DataStreams to Java via
.javaStream and whatever the equivalent is for DataSets.
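
For readers unfamiliar with the issue, here is a small self-contained Scala toy
(my own illustration, no Flink dependency) of the overload ambiguity described
in the quoted message below:

// Two overloads that, under Scala 2.12, can both accept a lambda because
// Scala lambdas are now SAM-convertible.
trait GroupReduce[T] { def reduce(values: java.lang.Iterable[T], out: StringBuilder): Unit }

object OverloadDemo {
  def reduceGroup(fun: (Iterator[Int], StringBuilder) => Unit): String = {
    val sb = new StringBuilder; fun(Iterator(1, 2, 3), sb); sb.toString
  }
  def reduceGroup(reducer: GroupReduce[Int]): String = {
    val sb = new StringBuilder; reducer.reduce(java.util.Arrays.asList(1, 2, 3), sb); sb.toString
  }

  // Per the description below, a call like this stops compiling on 2.12:
  // with both overloads applicable, the compiler cannot infer the lambda's
  // parameter types.
  // reduceGroup { (it, out) => it.foreach(x => out.append(x)) }

  // Compiles: explicit parameter types disambiguate (option 1 in the thread).
  val explicit: String = reduceGroup { (it: Iterator[Int], out: StringBuilder) =>
    it.foreach(x => out.append(x))
  }
}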

On Thu, Oct 4, 2018 at 8:10 AM Aljoscha Krettek  wrote:

> Hi,
>
> I'm currently working on https://issues.apache.org/jira/browse/FLINK-7811,
> with the goal of adding support for Scala 2.12. There is a bit of a hurdle
> and I have to explain some context first.
>
> With Scala 2.12, lambdas are implemented using the lambda mechanism of
> Java 8, i.e. Scala lambdas are now SAMs (Single Abstract Method). This
> means that the following two method definitions can both take a lambda:
>
> def map[R](mapper: MapFunction[T, R]): DataSet[R]
> def map[R](fun: T => R): DataSet[R]
>
> The Scala compiler gives precedence to the lambda version when you call
> map() with a lambda in simple cases, so it works here. You could still call
> map() with a lambda if the lambda version of the method weren't here
> because they are now considered the same. For Scala 2.11 we need both
> signatures, though, to allow calling with a lambda and with a MapFunction.
>
> The problem is with more complicated method signatures, like:
>
> def reduceGroup[R](fun: (scala.Iterator[T], Collector[R]) => Unit):
> DataSet[R]
>
> def reduceGroup[R](reducer: GroupReduceFunction[T, R]): DataSet[R]
>
> (for reference, GroupReduceFunction is a SAM with void
> reduce(java.lang.Iterable values, Collector out))
>
> These two signatures are not the same but similar enough for the Scala
> 2.12 compiler to "get confused". In Scala 2.11, I could call reduceGroup()
> with a lambda that doesn't have parameter type definitions and things would
> be fine. With Scala 2.12 I can't do that because the compiler can't figure
> out which method to call and requires explicit type definitions on the
> lambda parameters.
>
> I see some solutions for this:
>
> 1. Keep the methods as is, this would force people to always explicitly
> specify parameter types on their lambdas.
>
> 2. Rename the second method to reduceGroupJ() to signal that it takes a
> user function that takes Java-style interfaces (the first parameter is
> java.lang.Iterable while the Scala lambda takes a scala.Iterator). This
> disambiguates the code, users can use lambdas without specifying explicit
> parameter types but breaks the API.
>
> One effect of 2. would be that we can add a reduceGroup() method that
> takes a api.scala.GroupReduceFunction that takes proper Scala types, thus
> it would allow people to implement user functions without having to cast
> the various Iterator/Iterable parameters.
>
> Either way, people would have to adapt their code when moving to Scala
> 2.12 in some way, depending on what style of methods they use.
>
> There is also solution 2.5:
>
> 2.5 Rename the methods only in the Scala 2.12 build of Flink and keep the
> old method names for Scala 2.11. This would require some infrastructure and
> I don't yet know how it can be done in a sane way.
>
> What do you think? I personally would be in favour of 2. but it breaks the
> existing API.
>
> Best,
> Aljoscha
>
>
>
>


Re: Sharing state between subtasks

2018-10-08 Thread Elias Levy
Kafka Streams handles this problem, time alignment, by processing records
from the partitions with the lowest timestamp on a best-effort basis.
See KIP-353 for the details.  The same could be done within the Kafka
source and multiple input stream operators.  I opened FLINK-4558 a while ago
regarding this topic.

On Mon, Oct 8, 2018 at 3:41 PM Jamie Grier  wrote:

> I'd be very curious to hear others' thoughts on this..  I would expect many
> people to have run into similar issues.  I also wonder if anybody has
> already been working on similar issues.  It seems there is room for some
> core Flink changes to address this as well and I'm guessing people have
> already thought about it.
>


Re: Sharing state between subtasks

2018-10-10 Thread Elias Levy
On Wed, Oct 10, 2018 at 8:15 AM Aljoscha Krettek 
wrote:

> I think the two things (shared state and new source interface) are
> somewhat orthogonal. The new source interface itself alone doesn't solve
> the problem, we would still need some mechanism for sharing the event-time
> information between different subtasks. This could be the state sharing
> mechanism. Therefore I would say we should not block one on the other and
> therefore should go ahead with state sharing.
>

Is that really the case?  The reason Thomas gave for the request to share
state among subtasks was to implement stream alignment.  If streams can be
aligned, then the given reason for state sharing disappears.  Not to say
there aren't other situations where state sharing could be useful.  It
would have been handy in a number of our jobs.

Also, it's not clear to me that if sources (and multiple streams operators)
were performing time alignment, you'd need some mechanism for sharing
event-time information between subtasks.  Each source and multiple input
operator can perform its own local alignment and back-pressure can take
care of squelching sources that are advancing too fast.


Re: Sharing state between subtasks

2018-10-10 Thread Elias Levy
On Wed, Oct 10, 2018 at 9:33 AM Fabian Hueske  wrote:

> I think the new source interface would be designed to be able to leverage
> shared state to achieve time alignment.
> I don't think this would be possible without some kind of shared state.
>
> The problem of tasks that are far ahead in time cannot be solved with
> back-pressure.
> That's because a task cannot choose from which source task it accepts
> events and from which doesn't.
> If it blocks an input, all downstream tasks that are connected to the
> operator are affected. This can easily lead to deadlocks.
> Therefore, all operators need to be able to handle events when they arrive.
> If they cannot process them yet because they are too far ahead in time,
> they are put in state.
>

The idea I was suggesting is not for operators to block an input.  Rather,
it is that they selectively choose which input to process the next
message from, based on timestamp, so long as there are buffered
messages waiting to be processed.  That is a best-effort alignment
strategy.  Seems to work relatively well in practice, at least within Kafka
Streams.

E.g. at the moment StreamTwoInputProcessor uses a UnionInputGate for both
its inputs.  Instead, it could keep them separate and selectively consume
from the one that had a buffer available, and if both have buffers
available, from the buffer with the messages with a lower timestamp.
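
As a rough sketch of that selection rule (plain Scala, not the actual
StreamTwoInputProcessor code), assuming each input is kept as its own queue of
timestamped records:

import scala.collection.mutable

final case class Record[T](timestamp: Long, value: T)

final class AligningTwoInputSelector[A, B](
    left: mutable.Queue[Record[A]],
    right: mutable.Queue[Record[B]]) {

  // Prefer the input whose head record has the lower timestamp; if only one
  // input has buffered data, take it and let back-pressure slow the other.
  def nextInput(): Option[Either[Record[A], Record[B]]] =
    (left.headOption, right.headOption) match {
      case (Some(l), Some(r)) =>
        if (l.timestamp <= r.timestamp) Some(Left(left.dequeue()))
        else Some(Right(right.dequeue()))
      case (Some(_), None) => Some(Left(left.dequeue()))
      case (None, Some(_)) => Some(Right(right.dequeue()))
      case (None, None)    => None
    }
}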


Re: Sharing state between subtasks

2018-10-09 Thread Elias Levy
On Tue, Oct 9, 2018 at 1:25 AM Aljoscha Krettek  wrote:

> @Elias Do you know if Kafka Consumers do this alignment across multiple
> consumers or only within one Consumer across the partitions that it reads
> from.
>

The behavior is part of Kafka Streams, not the Kafka consumer.  The
alignment does not occur across Kafka consumers, but that is because Kafka
Streams, unlike Flink, uses a single consumer to fetch records from multiple
sources / topics.  The alignment occurs within the stream task.  Stream tasks
keep queues per topic-partition (which may be from different topics), and
select the next record to be processed by selecting the queue with the lowest
timestamp.

The equivalent in Flink would be for the Kafka connector source to select
the message among partitions with the lowest timestamp to emit next, and
for multiple input stream operators to select the message among inputs with
the lowest timestamp to process.


Re: [DISCUSS] Release Flink 1.5.4 and 1.6.1

2018-09-14 Thread Elias Levy
Any chance we may see a 1.4.3 release with the ZK fixes?

On Fri, Sep 14, 2018 at 8:29 AM Till Rohrmann  wrote:

> Hi everyone,
>
> it has already been a couple of weeks since we released Flink 1.5.3 and
> Flink 1.6.0. In both release branches are important bug fixes from which
> our users can benefit (1.5.4: 23 resolved issues, 1.6.1: 60 resolved
> issues). Therefore, I propose to create the next bug fix release for Flink
> 1.5 and 1.6.
>
> Most notable fixes are: FLINK-10255 (Standby Dispatcher locks submitted
> JobGraphs), FLINK-10193 (Use proper timeout when triggering savepoint),
> FLINK-10314 (ExecutionGraph creation can be blocking) and FLINK-10011(Old
> jobs sometimes get resurrected during HA failover).
>
> I would of course volunteer as release manager and kick off the release
> process once FLINK-10314 has been merged. What do you think?
>
> Cheers,
> Till
>


Re: [DISCUSS] Release Flink 1.5.4 and 1.6.1

2018-09-17 Thread Elias Levy
Probably.  I was hesitant to move to 1.5 without significant testing, as we
run in standalone mode and 1.5 introduced significant changes as a result
of FLIP-6.  But if the community hasn't seen major regressions in 1.5
running in legacy mode, then that may be feasible.

On Mon, Sep 17, 2018 at 1:41 AM Till Rohrmann  wrote:

> Would you be able to upgrade to Flink 1.5.4 or 1.6.1 using Flink's legacy
> mode @Elias? The legacy mode should have the same behaviour as Flink
> 1.4-SNAPSHOT and thus a potential Flink 1.4.3.
>
> I'm a bit hesitant to make another 1.4 release, because in 1.4 we didn't
> have many automated e2e tests. Consequently, the release process would
> require a considerable community effort for testing in order to make sure
> that the release is properly working.
>
> What do you think?
>
> Cheers,
> Till
>
> On Fri, Sep 14, 2018 at 7:10 PM Elias Levy 
> wrote:
>
> > Any chance we may see a 1.4.3 release with the ZK fixes?
> >
> > On Fri, Sep 14, 2018 at 8:29 AM Till Rohrmann 
> > wrote:
> >
> > > Hi everyone,
> > >
> > > it has already been a couple of weeks since we released Flink 1.5.3 and
> > > Flink 1.6.0. In both release branches are important bug fixes from
> which
> > > our users can benefit (1.5.4: 23 resolved issues, 1.6.1: 60 resolved
> > > issues). Therefore, I propose to create the next bug fix release for
> > Flink
> > > 1.5 and 1.6.
> > >
> > > Most notable fixes are: FLINK-10255 (Standby Dispatcher locks submitted
> > > JobGraphs), FLINK-10193 (Use proper timeout when triggering savepoint),
> > > FLINK-10314 (ExecutionGraph creation can be blocking) and
> FLINK-10011(Old
> > > jobs sometimes get resurrected during HA failover).
> > >
> > > I would of course volunteer as release manager and kick off the release
> > > process once FLINK-10314 has been merged. What do you think?
> > >
> > > Cheers,
> > > Till
> > >
> >
>


Re: [Discuss] Semantics of event time for state TTL

2019-04-04 Thread Elias Levy
My 2c:

Timestamp stored with the state value: Event timestamp
Timestamp used to check expiration: Last emitted watermark

That follows the event time processing model used elsewhere in Flink.  E.g.
events are segregated into windows based on their event time, but the
windows do not fire until the watermark advances past the end of the window.
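
Expressed as code, that amounts to something like this (a sketch of the check
only, not Flink's TTL implementation):

case class TtlEntry[V](value: V, lastAccessEventTime: Long)

// Expiration is evaluated against the last emitted watermark, mirroring how
// event-time windows only fire once the watermark passes the window end.
def isExpired[V](entry: TtlEntry[V], currentWatermark: Long, ttlMillis: Long): Boolean =
  currentWatermark - entry.lastAccessEventTime > ttlMillis

// Example: state touched at event time 1000 with a 5000 ms TTL survives a
// watermark of 5500 and expires once the watermark reaches 6001.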


On Thu, Apr 4, 2019 at 7:55 AM Andrey Zagrebin  wrote:

> Hi All,
>
> As you might have already seen there is an effort tracked in FLINK-12005
> [1] to support event time scale for state with time-to-live (TTL) [2].
> While thinking about design, we realised that there can be multiple options
> for semantics of this feature, depending on use case. There is also
> sometimes confusion because of event time out-of-order nature in Flink. I
> am starting this thread to discuss potential use cases of this feature and
> their requirements for interested users and developers. There was already
> discussion thread asking about event time for TTL and it already contains
> some thoughts [3].
>
> There are two semantical cases where we use time for TTL feature at the
> moment. Firstly, we store timestamp of state last access/update. Secondly,
> we use this timestamp and current timestamp to check expiration and garbage
> collect state at some point later.
>
> At the moment, Flink supports *only processing time* for both timestamps:
> state *last access and current timestamp*. It is basically current local
> system unix epoch time.
>
> When it comes to event time scale, we also need to define what Flink should
> use for these two timestamps. Here I will list some options and their
> possible pros for discussion. There might be more depending on use
> case.
>
> *Last access timestamp (stored in backend with the actual state value):*
>
>- *Event timestamp of currently being processed record.* This seems to
>be the simplest option and it allows user-defined timestamps in state
>backend. The problem here might be instability of event time which can
> not
>only increase but also decrease if records come out of order. This can
> lead
>to rewriting the state timestamp to smaller value which is unnatural for
>the notion of time.
>- *Max event timestamp of records seen so far for this record key.* This
>option is similar to the previous one but it tries to fix the notion of
>time to make it always increasing. Maintaining this timestamp has also
>performance implications because the previous timestamp needs to be read
>out to decide whether to rewrite it.
>- *Last emitted watermark*. This is what we usually use for other
>operations to trigger some actions in Flink, like timers and windows
> but it
>can be unrelated to the record which actually triggers the state update.
>
> *Current timestamp to check expiration:*
>
>- *Event timestamp of last processed record.* Again quite simple but
>unpredictable option for out-of-order events. It can potentially lead to
>undesirable expiration of late buffered data in state without control.
>- *Max event timestamp of records seen so far for operator backend.*
> Again
>similar to previous one, more stable but still user does not have too
> much
>control when to expire state.
>- *Last emitted watermark*. Again, this is what we usually use for other
>operations to trigger some actions in Flink, like timers and windows. It
>also gives user some control to decide when state is expired (up to
> which
>point in event time) by emitting certain watermark. It is more flexible
> but
>complicated. If some watermark emitting strategy is already used for
> other
>operations, it might be not optimal for TTL and delay state cleanup.
>- *Current processing time.* This option is quite simple, It would mean
>that user just decides which timestamp to store but it will expire in
> real
>time. For data privacy use case, it might be better because we want
> state
>to be unavailable in particular real moment of time since the associated
>piece of data was created in event time. For long term approximate
> garbage
>collection, it might be not a problem as well. For quick expiration, the
>time skew between event and processing time can lead again to premature
>deletion of late data and user cannot delay it.
>
> We could also make this behaviour configurable. Another option is to make
> time provider pluggable for users. The interface can give users context
> (currently processed record, watermark etc) and ask them which timestamp to
> use. This is more complicated though.
>
> Looking forward for your feedback.
>
> Best,
> Andrey
>
> [1] https://issues.apache.org/jira/browse/FLINK-12005
> [2]
>
> https://docs.google.com/document/d/1SI_WoXAfOd4_NKpGyk4yh3mf59g12pSGNXRtNFi-tgM
> [3]
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/State-TTL-in-Flink-1-6-0-td22509.html
>


Re: [DISCUSS] FLIP-33: Terminate/Suspend Job with Savepoint

2019-03-05 Thread Elias Levy
Apologies for the late reply.

I think this is badly needed, but I fear we are adding complexity by
introducing yet two more stop commands.  We'll have: cancel, stop,
terminate, and suspend.  We basically want to do two things: terminate a
job with prejudice or stop a job safely.

For the former "cancel" is the appropriate term, and should have no need
for a cancel with checkpoint option.  If the job was configured to use
externalized checkpoints and it ran long enough, a checkpoint will be
available for it.

For the later "stop" is the appropriate term, and it means that a job
should process no messages after the checkpoints barrier and that it should
ensure that exactly-once sinks complete their two-phase commits
successfully.  If a savepoint was requested, one should be created.

So in my mind there are two commands, cancel and stop, with appropriate
semantics.  Emitting MAX_WATERMARK before the checkpoint barrier during
stop is merely an optional behavior, like creation of a savepoint.  But if
a specific command for it is desired, then "drain" seems appropriate.

On Tue, Feb 12, 2019 at 9:50 AM Stephan Ewen  wrote:

> Hi Elias!
>
> I remember you brought this missing feature up in the past. Do you think
> the proposed enhancement would work for your use case?
>
> Best,
> Stephan
>
> -- Forwarded message -
> From: Kostas Kloudas 
> Date: Tue, Feb 12, 2019 at 5:28 PM
> Subject: [DISCUSS] FLIP-33: Terminate/Suspend Job with Savepoint
> To: 
>
>
> Hi everyone,
>
>  A commonly used functionality offered by Flink is the
> "cancel-with-savepoint" operation. When applied to the current exactly-once
> sinks, the current implementation of the feature can be problematic, as it
> does not guarantee that side-effects will be committed by Flink to the 3rd
> party storage system.
>
>  This discussion targets fixing this issue and proposes the addition of two
> termination modes, namely:
> 1) SUSPEND, for temporarily stopping the job, e.g. for Flink version
> upgrading in your cluster
> 2) TERMINATE, for terminal shut down which ends the stream and sends
> MAX_WATERMARK, and flushes any state associated with (event time)
> timers
>
> A google doc with the FLIP proposal can be found here:
>
> https://docs.google.com/document/d/1EZf6pJMvqh_HeBCaUOnhLUr9JmkhfPgn6Mre_z6tgp8/edit?usp=sharing
>
> And the page for the FLIP is here:
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103090212
>
>  The implementation sketch is far from complete, but it is worth having a
> discussion on the semantics as soon as possible. The implementation section
> is going to be updated soon.
>
>  Looking forward to the discussion,
>  Kostas
>
> --
>
> Kostas Kloudas | Software Engineer
>
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Data Artisans GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
>


Re: [DISCUSS] Improve Queryable State and introduce a QueryServerProxy component

2019-04-29 Thread Elias Levy
On Fri, Apr 26, 2019 at 8:58 PM vino yang  wrote:

> I agree with your opinion that "*Flink jobs don't sufficiently meet these
> requirements to work as a replacement for a data store.*".  Actually, I
> think it's obviously not Flink's goal.
>

I would not be so sure.  When data Artisans introduced Queryable State in
Flink, one of the use cases was explicitly removing the need for external
key-value stores.  This mirrored Confluent's earlier introduction of
Interactive Queries in Kafka Streams, and they certainly saw querying of
streaming state as a possible alternative to traditional data stores.


Re: [DISCUSS] Improve Queryable State and introduce a QueryServerProxy component

2019-04-26 Thread Elias Levy
On Fri, Apr 26, 2019 at 1:41 AM vino yang  wrote:

> You are right, currently, the queryable state has few users. And I totally
> agree with you, it makes the streaming works more like a DB.
>

Alas, I don't think queryable state will really be used much in production
other than for ad hoc queries or debugging.  Real data stores at scale are
resilient, replicated, and with very low downtime.  In my opinion, Flink
jobs don't sufficiently meet these requirements to work as a replacement
for a data store.  Jobs too frequently fail and restart because of
checkpoint failures, particularly ones with large state.  And when a job
does restart, all too often local restore can't be used (e.g. if you lose
a node).  And since there is no fine-grained job recovery and there are no
hot replicas of the data, all the state will need to be restored from the
DFS, which for something with large state can take a while.  It's a nice
idea, just not realistic in practice.


[jira] [Created] (FLINK-4050) FlinkKafkaProducer API Refactor

2016-06-09 Thread Elias Levy (JIRA)
Elias Levy created FLINK-4050:
-

 Summary: FlinkKafkaProducer API Refactor
 Key: FLINK-4050
 URL: https://issues.apache.org/jira/browse/FLINK-4050
 Project: Flink
  Issue Type: Improvement
  Components: Kafka Connector
Affects Versions: 1.0.3
Reporter: Elias Levy


The FlinkKafkaProducer API seems more difficult to use than it should be.  

The API requires you pass it a SerializationSchema or a 
KeyedSerializationSchema, but the Kafka producer already has a serialization 
API.  Requiring a serializer in the Flink API precludes the use of the Kafka 
serializers.  For instance, they preclude the use of the Confluent 
KafkaAvroSerializer class that makes use of the Confluent Schema Registry.  
Ideally, the serializer would be optional, so as to allow the Kafka producer 
serializers to handle the task.

In addition, the KeyedSerializationSchema conflates message key extraction with 
key serialization.  If the serializer were optional, to allow the Kafka 
producer serializers to take over, you'd still need to extract a key from the 
message.

And given that the key may not be part of the message you want to write to 
Kafka, an upstream step may have to package the key with the message to make 
both available to the sink, for instance in a tuple. That means you also need 
to define a method to extract the message to write to Kafka from the element 
passed into the sink by Flink.  

In summary, extraction of the key and message from the element passed into the
sink should be separated from serialization, and the serialization step should
be optional.
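
As a purely hypothetical sketch of that separation (Scala; none of these
interfaces exist in Flink and the names are made up for illustration):

{code}
// Extraction is decoupled from serialization; serialization is left to the
// Kafka producer's own key.serializer / value.serializer configuration
// (e.g. Confluent's KafkaAvroSerializer).
trait KafkaRecordExtractor[IN, K, V] extends Serializable {
  def extractKey(element: IN): K          // may return null for unkeyed records
  def extractValue(element: IN): V
  def targetTopic(element: IN): String
}

// Example: an upstream operator packaged the key with the payload as a tuple.
class TupleExtractor extends KafkaRecordExtractor[(String, Array[Byte]), String, Array[Byte]] {
  override def extractKey(element: (String, Array[Byte])): String = element._1
  override def extractValue(element: (String, Array[Byte])): Array[Byte] = element._2
  override def targetTopic(element: (String, Array[Byte])): String = "events"
}
{code}

A sink configured this way would hand the extracted key and value to whatever
serializers are named in the producer properties, rather than requiring a
Flink-side SerializationSchema.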



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4027) FlinkKafkaProducer09 sink can lose messages

2016-06-06 Thread Elias Levy (JIRA)
Elias Levy created FLINK-4027:
-

 Summary: FlinkKafkaProducer09 sink can lose messages
 Key: FLINK-4027
 URL: https://issues.apache.org/jira/browse/FLINK-4027
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Affects Versions: 1.0.3
Reporter: Elias Levy
Priority: Critical


The FlinkKafkaProducer09 sink appears to not offer at-least-once guarantees.

The producer is publishing messages asynchronously.  A callback can record 
publishing errors, which will be raised when detected.  But as far as I can 
tell, there is no barrier to wait for async errors from the sink when 
checkpointing or to track the event time of acked messages to inform the 
checkpointing process.

If a checkpoint occurs while there are pending publish requests, and the 
requests return a failure after the checkpoint occurred, those messages will be 
lost as the checkpoint will consider them processed by the sink.
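
A hedged sketch of one way to close the gap (not the actual connector code):
surface callback errors and flush all in-flight requests before a checkpoint is
acknowledged.  It assumes byte-array serializers are configured in the producer
properties.

{code}
import java.util.Properties
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

final class AtLeastOnceKafkaWriter(topic: String, props: Properties) {
  private val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)
  @volatile private var asyncError: Exception = _

  def write(value: Array[Byte]): Unit = {
    checkError()
    producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, value), new Callback {
      override def onCompletion(md: RecordMetadata, e: Exception): Unit =
        if (e != null) asyncError = e
    })
  }

  // Called when the sink snapshots its state for a checkpoint: block until
  // nothing is in flight and fail the checkpoint on any asynchronous error.
  def flushForCheckpoint(): Unit = {
    producer.flush()
    checkError()
  }

  private def checkError(): Unit = {
    val e = asyncError
    if (e != null) throw new RuntimeException("Kafka send failed", e)
  }
}
{code}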




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3984) Event time of stream transformations is undocumented

2016-05-27 Thread Elias Levy (JIRA)
Elias Levy created FLINK-3984:
-

 Summary: Event time of stream transformations is undocumented
 Key: FLINK-3984
 URL: https://issues.apache.org/jira/browse/FLINK-3984
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.0.3
Reporter: Elias Levy


The Event Time, Windowing, and DataStream Transformation documentation sections 
fail to state what event time, if any, the output of transformations has on a 
stream that is configured to use event time and that has timestamp assigners.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3710) ScalaDocs for org.apache.flink.streaming.scala are missing from the web site

2016-04-06 Thread Elias Levy (JIRA)
Elias Levy created FLINK-3710:
-

 Summary: ScalaDocs for org.apache.flink.streaming.scala are 
missing from the web site
 Key: FLINK-3710
 URL: https://issues.apache.org/jira/browse/FLINK-3710
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.0.1
Reporter: Elias Levy


The ScalaDocs only include docs for org.apache.flink.scala and sub-packages.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3692) Develop a Kafka state backend

2016-04-03 Thread Elias Levy (JIRA)
Elias Levy created FLINK-3692:
-

 Summary: Develop a Kafka state backend
 Key: FLINK-3692
 URL: https://issues.apache.org/jira/browse/FLINK-3692
 Project: Flink
  Issue Type: New Feature
  Components: Core
Reporter: Elias Levy


Flink clusters usually consume from a Kafka cluster.  It would simplify operations if 
Flink could store its state checkpoints in Kafka.  This should be possible 
using different topics to write to, partitioning appropriately, and using 
compacted topics.  This would avoid the need to run an HDFS cluster just to 
store Flink checkpoints.

For inspiration you may want to take a look at how Samza checkpoints a task's 
local state to a Kafka topic, and how the newer Kafka consumers checkpoint 
their offsets to Kafka.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4326) Flink start-up scripts should optionally start services on the foreground

2016-08-05 Thread Elias Levy (JIRA)
Elias Levy created FLINK-4326:
-

 Summary: Flink start-up scripts should optionally start services 
on the foreground
 Key: FLINK-4326
 URL: https://issues.apache.org/jira/browse/FLINK-4326
 Project: Flink
  Issue Type: Improvement
  Components: Startup Shell Scripts
Affects Versions: 1.0.3
Reporter: Elias Levy


This has previously been mentioned in the mailing list, but has not been 
addressed.  Flink start-up scripts start the job and task managers in the 
background.  This makes it difficult to integrate Flink with most processes 
supervisory tools and init systems, including Docker.  One can get around this 
via hacking the scripts or manually starting the right classes via Java, but it 
is a brittle solution.

In addition to starting the daemons in the foreground, the start up scripts 
should use exec instead of running the commands, so as to avoid forks.  Many 
supervisory tools assume the PID of the process to be monitored is that of the 
process it first executes, and fork chains make it difficult for the supervisor 
to figure out what process to monitor.  Specifically, jobmanager.sh and 
taskmanager.sh should exec flink-daemon.sh, and flink-daemon.sh should exec 
java.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4558) Add support for synchronizing streams

2016-09-01 Thread Elias Levy (JIRA)
Elias Levy created FLINK-4558:
-

 Summary: Add support for synchronizing streams
 Key: FLINK-4558
 URL: https://issues.apache.org/jira/browse/FLINK-4558
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 1.1.0
Reporter: Elias Levy


As mentioned on the [mailing 
list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/synchronizing-two-streams-td6830.html],
there are use cases that require synchronizing two streams via their times 
and where it is not practical to buffer all messages from one streams while 
waiting for the other to synchronize.  Flink should add functionality to enable 
such use cases.

This could be implemented by modifying TwoInputStreamOperator so that calls to 
processElement1 and processElement2 could return a value indicating that the 
element can't yet be processed, having the framework then pause processing for 
some time, potentially using exponential back off with a hard maximum, and then 
allowing the back pressure system to do its work and pause the stream.

Alternatively, an API could be added to explicitly pause/unpause a stream.

For ease of use either of these mechanism should be used to create a 
SynchronizedTwoInputStreamOperator that end users can utilize by passing a 
configurable time delta to use as a synchronization threshold.
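
A hypothetical sketch of the first alternative (Scala; the types below do not 
exist in Flink and the names are made up for illustration):

{code}
// processElementN may decline an element that is too far ahead of the other
// stream; the framework would retry it later with back-off, letting the
// back-pressure mechanism pause the faster source.
sealed trait ProcessResult
case object Accepted extends ProcessResult
final case class RetryAfter(backoffMillis: Long) extends ProcessResult

abstract class SynchronizedTwoInputStreamOperator[IN1, IN2](maxTimeDelta: Long) {
  protected def emit1(value: IN1): Unit
  protected def emit2(value: IN2): Unit

  def processElement1(value: IN1, timestamp: Long, otherInputTime: Long): ProcessResult =
    if (timestamp - otherInputTime > maxTimeDelta) RetryAfter(100)
    else { emit1(value); Accepted }

  def processElement2(value: IN2, timestamp: Long, otherInputTime: Long): ProcessResult =
    if (timestamp - otherInputTime > maxTimeDelta) RetryAfter(100)
    else { emit2(value); Accepted }
}
{code}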
 




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4497) Add support for Scala tuples and case classes to Cassandra sink

2016-08-25 Thread Elias Levy (JIRA)
Elias Levy created FLINK-4497:
-

 Summary: Add support for Scala tuples and case classes to 
Cassandra sink
 Key: FLINK-4497
 URL: https://issues.apache.org/jira/browse/FLINK-4497
 Project: Flink
  Issue Type: Improvement
  Components: Cassandra Connector
Affects Versions: 1.1.0
Reporter: Elias Levy


The new Cassandra sink only supports streams of Flink Java tuples and Java 
POJOs that have been annotated for use by Datastax Mapper.  The sink should be 
extended to support Scala types and case classes.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4498) Better Cassandra sink documentation

2016-08-25 Thread Elias Levy (JIRA)
Elias Levy created FLINK-4498:
-

 Summary: Better Cassandra sink documentation
 Key: FLINK-4498
 URL: https://issues.apache.org/jira/browse/FLINK-4498
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.1.0
Reporter: Elias Levy


The Cassandra sink documentation is somewhat muddled and could be improved.  
For instance, the fact that it only supports tuples and POJOs that use 
DataStax Mapper annotations is only mentioned in passing, and it is not clear 
that the reference to tuples only applies to Flink Java tuples and not Scala 
tuples.  

The documentation also does not mention that setQuery() is only necessary for 
tuple streams.  It would be good to have an example of a POJO stream with the 
DataStax annotations.

The explanation of the write ahead log could use some cleaning up to clarify 
when it is appropriate to use, ideally with an example.  Maybe this would be 
best as a blog post to expand on the type of non-deterministic streams this 
applies to.

It would also be useful to mention that tuple elements will be mapped to 
Cassandra columns using the Datastax Java driver's default encoders, which are 
somewhat limited (e.g. to write to a blob column the type in the tuple must be 
a java.nio.ByteBuffer and not just a byte[]).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4501) Cassandra sink can lose messages

2016-08-25 Thread Elias Levy (JIRA)
Elias Levy created FLINK-4501:
-

 Summary: Cassandra sink can lose messages
 Key: FLINK-4501
 URL: https://issues.apache.org/jira/browse/FLINK-4501
 Project: Flink
  Issue Type: Bug
  Components: Cassandra Connector
Affects Versions: 1.1.0
Reporter: Elias Levy


The problem is the same as I pointed out with the Kafka producer sink 
(FLINK-4027).  The CassandraTupleSink's send() and CassandraPojoSink's send() 
both send data asynchronously to Cassandra and record whether an error occurs 
via a future callback.  But CassandraSinkBase does not implement Checkpointed, 
so it can't stop a checkpoint from happening even though there are Cassandra 
queries in flight from the checkpoint that may fail.  If they do fail, they 
would subsequently not be replayed when the job recovered, and would thus be 
lost.

In addition, 
CassandraSinkBase's close should check whether there is a pending exception and 
throw it, rather than silently close.  It should also wait for any pending 
async queries to complete and check their status before closing.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4502) Cassandra connector documentation has misleading consistency guarantees

2016-08-25 Thread Elias Levy (JIRA)
Elias Levy created FLINK-4502:
-

 Summary: Cassandra connector documentation has misleading 
consistency guarantees
 Key: FLINK-4502
 URL: https://issues.apache.org/jira/browse/FLINK-4502
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.1.0
Reporter: Elias Levy


The Cassandra connector documentation states that  "enableWriteAheadLog() is an 
optional method, that allows exactly-once processing for non-deterministic 
algorithms."  This claim appears to be false.

From what I gather, the write ahead log feature of the connector works as 
follows:
- The sink is replaced with a stateful operator that writes incoming messages 
to the state backend based on checkpoint they belong in.
- When the operator is notified that a Flink checkpoint has been completed, it, 
for each set of checkpoints older than and including the committed one:
  * reads its messages from the state backend
  * writes them to Cassandra
  * records that it has committed them to Cassandra for the specific checkpoint 
and operator instance
   * and erases them from the state backend.

This process attempts to avoid resubmitting queries to Cassandra that would 
otherwise occur when recovering a job from a checkpoint and having messages 
replayed.

Alas, this does not guarantee exactly once semantics at the sink.  The writes 
to Cassandra that occur when the operator is notified that checkpoint is 
completed are not atomic and they are potentially non-idempotent.  If the job 
dies while writing to Cassandra or before committing the checkpoint via 
committer, queries will be replayed when the job recovers.  Thus the 
documentation appear to be incorrect in stating this provides exactly-once 
semantics.

There also seems to be an issue in GenericWriteAheadSink's 
notifyOfCompletedCheckpoint which may result in incorrect output.  If 
sendValues returns false because a write failed, instead of bailing, it simply 
moves on to the next checkpoint to commit if there is one, keeping the previous 
one around to try again later.  But that can result in newer data being 
overwritten with older data when the previous checkpoint is retried.  Although 
given that CassandraCommitter implements isCheckpointCommitted as checkpointID 
<= this.lastCommittedCheckpointID, it actually means that when it goes back to 
try the uncommitted older checkpoint it will consider it committed, even though 
some of its data may not have been written out, and the data will be discarded.





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4500) Cassandra sink can lose messages

2016-08-25 Thread Elias Levy (JIRA)
Elias Levy created FLINK-4500:
-

 Summary: Cassandra sink can lose messages
 Key: FLINK-4500
 URL: https://issues.apache.org/jira/browse/FLINK-4500
 Project: Flink
  Issue Type: Bug
  Components: Cassandra Connector
Affects Versions: 1.1.0
Reporter: Elias Levy


The problem is the same as I pointed out with the Kafka producer sink 
(FLINK-4027).  The CassandraTupleSink's send() and CassandraPojoSink's send() 
both send data asynchronously to Cassandra and record whether an error occurs 
via a future callback.  But CassandraSinkBase does not implement Checkpointed, 
so it can't stop a checkpoint from happening even though there are Cassandra 
queries in flight from the checkpoint that may fail.  If they do fail, they 
would subsequently not be replayed when the job recovered, and would thus be 
lost.

In addition, 
CassandraSinkBase's close should check whether there is a pending exception and 
throw it, rather than silently close.  It should also wait for any pending 
async queries to complete and check their status before closing.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-6243) Continuous Joins: True Sliding Window Joins

2017-04-01 Thread Elias Levy (JIRA)
Elias Levy created FLINK-6243:
-

 Summary: Continuous Joins:  True Sliding Window Joins
 Key: FLINK-6243
 URL: https://issues.apache.org/jira/browse/FLINK-6243
 Project: Flink
  Issue Type: New Feature
  Components: Streaming
Affects Versions: 1.1.4
Reporter: Elias Levy


Flink defines sliding window joins as the join of elements of two streams that 
share a window of time, where the windows are defined by advancing them forward 
some amount of time that is less than the window time span.  More generally, 
such windows are just overlapping hopping windows. 

Other systems, such as Kafka Streams, support a different notion of sliding 
window joins.  In these systems, two elements of a stream are joined if the 
absolute time difference between them is less than or equal to the time window 
length.

This alternate notion of sliding window joins has some advantages in some 
applications over the current implementation.  

Elements to be joined may both fall within multiple overlapping sliding 
windows, leading them to be joined multiple times, when we only wish them to be 
joined once.

The implementation need not instantiate window objects to keep track of stream 
elements, which becomes problematic in the current implementation if the window 
size is very large and the slide is very small.

It allows for asymmetric time joins.  E.g. join if elements from stream A are 
no more than X time behind and Y time ahead of an element from stream B.

It is currently possible to implement a join with these semantics using 
{{CoProcessFunction}}, but the capability should be a first class feature, such 
as it is in Kafka Streams.

To perform the join, elements of each stream must be buffered for at least the 
window time length.  To allow for large window sizes and high volume of 
elements, the state, possibly optionally, should be buffered such as it can 
spill to disk (e.g. by using RocksDB).

The same stream may be joined multiple times in a complex topology.  As an 
optimization, it may be wise to reuse any element buffer among colocated join 
operators.  Otherwise, there may be write amplification and increased state that 
must be snapshotted.
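
For clarity, the join semantics described above can be summarized with a small 
Scala sketch (plain collections, not a Flink operator; an asymmetric variant 
would use separate before/after bounds):

{code}
case class Stamped[T](key: String, timestamp: Long, value: T)

// Two elements with the same key join when the absolute difference of their
// timestamps is at most `window`, and each pair is produced exactly once.
def slidingWindowJoin[A, B](left: Seq[Stamped[A]],
                            right: Seq[Stamped[B]],
                            window: Long): Seq[(Stamped[A], Stamped[B])] =
  for {
    a <- left
    b <- right
    if a.key == b.key
    if math.abs(a.timestamp - b.timestamp) <= window
  } yield (a, b)
{code}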



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6239) Sharing of State Across Operators

2017-04-01 Thread Elias Levy (JIRA)
Elias Levy created FLINK-6239:
-

 Summary: Sharing of State Across Operators
 Key: FLINK-6239
 URL: https://issues.apache.org/jira/browse/FLINK-6239
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 1.1.4
Reporter: Elias Levy


Currently state cannot be shared across operators.  On a keyed stream, the 
state is implicitly keyed by the operator id, in addition to the stream key.  
This can make it more difficult and inefficient to implement complex 
topologies, where multiple operators may need to access the same state.  It 
would be valuable to be able to access keyed value and map state across operators.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-7364) Log exceptions from user code in streaming jobs

2017-08-03 Thread Elias Levy (JIRA)
Elias Levy created FLINK-7364:
-

 Summary: Log exceptions from user code in streaming jobs
 Key: FLINK-7364
 URL: https://issues.apache.org/jira/browse/FLINK-7364
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.3.1
Reporter: Elias Levy


Currently, if an exception arises in user supplied code within an operator in a 
streaming job, Flink terminates the job, but it fails to record the reason for 
the termination.  The logs do not record that there was an exception at all, 
much less recording the type of exception and where it occurred.  This makes it 
difficult to debug jobs without implementing exception recording code on all 
user supplied operators. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7178) Datadog Metric Reporter Jar is Lacking Dependencies

2017-07-13 Thread Elias Levy (JIRA)
Elias Levy created FLINK-7178:
-

 Summary: Datadog Metric Reporter Jar is Lacking Dependencies
 Key: FLINK-7178
 URL: https://issues.apache.org/jira/browse/FLINK-7178
 Project: Flink
  Issue Type: Bug
  Components: Metrics
Affects Versions: 1.3.1
Reporter: Elias Levy


The Datadog metric reporter has dependencies on {{com.squareup.okhttp3}} and 
{{com.squareup.okio}}.  It appears there was an attempt to Maven Shade plug-in 
to move these classes to {{org.apache.flink.shaded.okhttp3}} and 
{{org.apache.flink.shaded.okio}} during packaging.  Alas, the shaded classes 
are not included in the {{flink-metrics-datadog-1.3.1.jar}} released to Maven 
Central.  Using the Jar results in an error when the Jobmanager or Taskmanager 
starts up because of the missing dependencies. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7286) Flink Dashboard fails to display bytes/records received by sources

2017-07-27 Thread Elias Levy (JIRA)
Elias Levy created FLINK-7286:
-

 Summary: Flink Dashboard fails to display bytes/records received 
by sources
 Key: FLINK-7286
 URL: https://issues.apache.org/jira/browse/FLINK-7286
 Project: Flink
  Issue Type: Bug
  Components: Webfrontend
Affects Versions: 1.3.1
Reporter: Elias Levy


It appears Flink can't measure the number of bytes read or records produced by 
a source (e.g. Kafka source). This is particularly problematic for simple jobs 
where the job pipeline is chained, and in which there are no measurements 
between operators. Thus, in the UI it appears that the job is not consuming any 
data.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-6419) Better support for CEP quantified conditions in PatternSelect.select

2017-04-28 Thread Elias Levy (JIRA)
Elias Levy created FLINK-6419:
-

 Summary: Better support for CEP quantified conditions in 
PatternSelect.select
 Key: FLINK-6419
 URL: https://issues.apache.org/jira/browse/FLINK-6419
 Project: Flink
  Issue Type: Improvement
  Components: CEP
Affects Versions: 1.3.0
Reporter: Elias Levy
Priority: Minor


Flink 1.3 introduces quantifier methods to the API, which allow one to 
declaratively specify how many times a condition must be matched before there 
is a state change.

The pre-existing {{PatternSelect.select}} method does not account for this 
change very well.  The selection function passed to {{select}} receives a 
{{Map[String,T]}} as an argument that permits the function to look up the 
matched events by the condition's name.  

To support the new functionality that permits a condition to match multiple 
elements, when a quantifier is greater than one, the matched events are stored 
in the map by appending the condition's name with an underscore and an index 
value.

While functional, this is less than ideal.  It would be best if conditions with 
a quantifier greater than one returned the matched events in an array and if 
they were accessible via the condition's name, without having to construct keys 
from the condition's name and an index and iterate over the map until no 
more are found. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6420) Cleaner CEP API to specify conditions between events

2017-04-28 Thread Elias Levy (JIRA)
Elias Levy created FLINK-6420:
-

 Summary: Cleaner CEP API to specify conditions between events
 Key: FLINK-6420
 URL: https://issues.apache.org/jira/browse/FLINK-6420
 Project: Flink
  Issue Type: Improvement
  Components: CEP
Affects Versions: 1.3.0
Reporter: Elias Levy
Priority: Minor


Flink 1.3 will introduce so-called iterative conditions, which allow the 
predicate to look up events already matched by conditions in the pattern.  This 
permits specifying conditions between matched events, similar to a conditional 
join between tables in SQL.  Alas, the API could be simplified to specify such 
conditions more declaratively.

At the moment you have to do something like
{code}
Pattern
  .begin[Foo]("first")
    .where( first => first.baz == 1 )
  .followedBy("next")
    .where({ (next, ctx) =>
      val first = ctx.getEventsForPattern("first").head
      first.bar == next.bar && next.boo == "x"
    })
{code}
which is not very clean.  It would be friendlier if you could do something like:
{code}
Pattern
  .begin[Foo]("first")
    .where( first => first.baz == 1 )
  .followedBy("next")
    .relatedTo("first", { (first, next) => first.bar == next.bar })
    .where( next => next.boo == "x" )
{code}
Something along these lines would work well when the condition being tested 
against matches a single event (single quantifier).  

If the condition being tested can accept multiple events (e.g. a times 
quantifier), two other methods could be used, {{relatedToAny}} and 
{{relatedToAll}}, each of which takes a predicate function.  In both cases each 
previously accepted element of the requested condition is evaluated against the 
predicate.  In the former case, if any evaluation returns true the condition is 
satisfied.  In the latter case, all evaluations must return true for the 
condition to be satisfied.




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6472) BoundedOutOfOrdernessTimestampExtractor does not bound out of orderliness

2017-05-06 Thread Elias Levy (JIRA)
Elias Levy created FLINK-6472:
-

 Summary: BoundedOutOfOrdernessTimestampExtractor does not bound 
out of orderliness
 Key: FLINK-6472
 URL: https://issues.apache.org/jira/browse/FLINK-6472
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.3.0
Reporter: Elias Levy


{{BoundedOutOfOrdernessTimestampExtractor}} attempts to emit watermarks that 
lag behind the largest observed timestamp by a configurable time delta.  It 
fails to do so in some circumstances.

The class extends {{AssignerWithPeriodicWatermarks}}, which generates 
watermarks in periodic intervals.  The timer for these intervals is a 
processing-time timer.

In circumstances where there is a rush of events (restarting Flink, unpausing 
an upstream producer, loading events from a file, etc.), many events with 
timestamps much larger than what the configured bound would normally allow will 
be sent downstream without a watermark.  This can have negative effects 
downstream, as operators may be buffering the events waiting for a watermark to 
process them, thus leading to memory growth and possible out-of-memory 
conditions.

It is probably best to have a bounded out-of-orderness extractor that is 
based on the punctuated timestamp extractor, so we can ensure that watermarks 
are generated in a timely fashion in event time, with the addition of a 
processing-time timer to generate a watermark if there has been a lull in 
events, thus also bounding the delay of generating a watermark in processing 
time.
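
A minimal sketch of the event-time-bounded half of that idea, assuming a 
punctuated assigner (the processing-time fallback timer is omitted); the 
extraction function and bound are illustrative:

{code}
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

class EventTimeBoundedAssigner[T](maxOutOfOrderness: Long, extract: T => Long)
    extends AssignerWithPunctuatedWatermarks[T] {

  // Initialized so that the first candidate watermark does not underflow.
  private var currentMax = Long.MinValue + maxOutOfOrderness
  private var lastWatermark = Long.MinValue

  override def extractTimestamp(element: T, previousTimestamp: Long): Long = {
    val ts = extract(element)
    currentMax = math.max(currentMax, ts)
    ts
  }

  // Called after every element: emit a watermark as soon as the bound is
  // exceeded in event time, rather than waiting for a processing-time interval.
  override def checkAndGetNextWatermark(lastElement: T, extractedTimestamp: Long): Watermark = {
    val candidate = currentMax - maxOutOfOrderness
    if (candidate > lastWatermark) {
      lastWatermark = candidate
      new Watermark(candidate)
    } else {
      null
    }
  }
}
{code}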



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-7634) Add option to create a savepoint while canceling a job in the dashboard

2017-09-16 Thread Elias Levy (JIRA)
Elias Levy created FLINK-7634:
-

 Summary: Add option to create a savepoint while canceling a job in 
the dashboard 
 Key: FLINK-7634
 URL: https://issues.apache.org/jira/browse/FLINK-7634
 Project: Flink
  Issue Type: Improvement
  Components: JobManager
Affects Versions: 1.3.2
Reporter: Elias Levy
Priority: Minor


Currently there appears to be no way to trigger a savepoint in the dashboard, 
to cancel a job while taking a savepoint, to list savepoints, or to list 
external checkpoints.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7641) Loss of JobManager in HA mode should not cause jobs to fail

2017-09-18 Thread Elias Levy (JIRA)
Elias Levy created FLINK-7641:
-

 Summary: Loss of JobManager in HA mode should not cause jobs to 
fail
 Key: FLINK-7641
 URL: https://issues.apache.org/jira/browse/FLINK-7641
 Project: Flink
  Issue Type: Improvement
  Components: JobManager
Affects Versions: 1.3.2
Reporter: Elias Levy


Currently, if a standalone cluster of JobManagers is configured in 
high-availability mode and the master JM is lost, the jobs executing in the 
cluster will be restarted.  This is less than ideal.  It would be best if the 
jobs could continue to execute without restarting while one of the spare JMs 
becomes the new master, or in the worst case, if the jobs were paused while the 
JM election takes place.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7640) Dashboard should display information about JobManager cluster in HA mode

2017-09-18 Thread Elias Levy (JIRA)
Elias Levy created FLINK-7640:
-

 Summary: Dashboard should display information about JobManager 
cluster in HA mode
 Key: FLINK-7640
 URL: https://issues.apache.org/jira/browse/FLINK-7640
 Project: Flink
  Issue Type: Improvement
  Components: Webfrontend
Affects Versions: 1.3.2
Reporter: Elias Levy


Currently the dashboard provides no information about the status of a cluster 
of JobManagers configured in high-availability mode.  

The dashboard should display the status and membership of a JM cluster in the 
Overview and Job Manager sections.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7646) Restart failed jobs with configurable parallelism range

2017-09-19 Thread Elias Levy (JIRA)
Elias Levy created FLINK-7646:
-

 Summary: Restart failed jobs with configurable parallelism range
 Key: FLINK-7646
 URL: https://issues.apache.org/jira/browse/FLINK-7646
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API
Affects Versions: 1.3.2
Reporter: Elias Levy


Currently, if a TaskManager fails the whole job is terminated and then, 
depending on the restart policy, may be attempted to be restarted.  If the 
failed TaskManager has not been replaced, and there are no spare task slots in 
the cluster, the job will fail to be restarted.

There are situations where restoring or adding a new TaskManager may take a 
while.  For instance, in AWS an Auto Scaling Group can only be used to manage a 
group of instances in a single availability zone.  If you have a cluster of 
TaskManagers that spans multiple AZs, managed by one ASG per AZ, and an AZ goes 
dark, the other ASGs won't scale automatically to make up for the lost 
TaskManagers.  To resolve the situation the healthy ASGs will need to be 
modified manually or by systems external to AWS.

With that in mind, it would be useful if you could specify a range for the 
parallelism parameter.  Under normal circumstances the job would execute with 
the maximum parallelism of the range.  But if TaskManagers were lost and not 
replaced after some time, the job would accept being executed with some lower 
parallelism within the range.

I understand that this may not be feasible with checkpoints, as savepoints are 
supposed to be the mechanism used to change parallelism of a stateful job.  
Therefore, this proposal may need to wait until the implementation of the 
periodic savepoint feature (FLINK-4511).

This feature would aid the availability of Flink jobs.




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7687) Clarify the master and slaves files are not necessary unless using the cluster start/stop scripts

2017-09-25 Thread Elias Levy (JIRA)
Elias Levy created FLINK-7687:
-

 Summary: Clarify the master and slaves files are not necessary 
unless using the cluster start/stop scripts
 Key: FLINK-7687
 URL: https://issues.apache.org/jira/browse/FLINK-7687
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.3.2
Reporter: Elias Levy
Priority: Minor


It would be helpful if the documentation was clearer on the fact that the 
master/slaves config files are not needed when configured in high-availability 
mode unless you are using the provided scripts to start and shutdown the 
cluster over SSH.  If you are using some other mechanism to manage Flink 
instances (configuration management tools such as Chef or Ansible, or container 
management frameworks like Docker Compose or Kubernetes), these files are 
unnecessary.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7722) MiniCluster does not appear to honor Log4j settings

2017-09-27 Thread Elias Levy (JIRA)
Elias Levy created FLINK-7722:
-

 Summary: MiniCluster does not appear to honor Log4j settings
 Key: FLINK-7722
 URL: https://issues.apache.org/jira/browse/FLINK-7722
 Project: Flink
  Issue Type: Bug
  Components: Local Runtime
Affects Versions: 1.3.2
Reporter: Elias Levy
Priority: Minor


When executing a job from the command line for testing, it will output logs 
like:

{noformat}
Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1337544104] 
with leader session id 59dd0d9c-938e-4e79-a0eb-709c5cf73014.
09/27/2017 13:15:13 Job execution switched to status RUNNING.
09/27/2017 13:15:13 Source: Custom File Source(1/1) switched to SCHEDULED 
09/27/2017 13:15:13 Source: Collect
{noformat}

It will do so even if the log4j.properties file contains:

{code}
log4j.rootLogger=ERROR, stdout
log4j.logger.org.apache.flink=ERROR
log4j.logger.akka=ERROR
log4j.logger.org.apache.kafka=ERROR
log4j.logger.org.apache.hadoop=ERROR
log4j.logger.org.apache.zookeeper=ERROR
{code}

It seems that the MiniCluster does not honor Log4j settings, or at least that 
is my guess.




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7547) o.a.f.s.api.scala.async.AsyncFunction is not declared Serializable

2017-08-28 Thread Elias Levy (JIRA)
Elias Levy created FLINK-7547:
-

 Summary: o.a.f.s.api.scala.async.AsyncFunction is not declared 
Serializable
 Key: FLINK-7547
 URL: https://issues.apache.org/jira/browse/FLINK-7547
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.3.2
Reporter: Elias Levy
Priority: Minor


{{org.apache.flink.streaming.api.scala.async.AsyncFunction}} is not declared 
{{Serializable}}, whereas 
{{org.apache.flink.streaming.api.functions.async.AsyncFunction}} is.  This 
leads to the job not starting, as the async function can't be serialized 
during initialization.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-9272) DataDog API "counter" metric type is deprecated

2018-04-28 Thread Elias Levy (JIRA)
Elias Levy created FLINK-9272:
-

 Summary: DataDog API "counter" metric type is deprecated 
 Key: FLINK-9272
 URL: https://issues.apache.org/jira/browse/FLINK-9272
 Project: Flink
  Issue Type: Improvement
  Components: Metrics
Affects Versions: 1.4.2
Reporter: Elias Levy


It appears to have been replaced by the "count" metric type.

https://docs.datadoghq.com/developers/metrics/



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9403) Documentation continues to refer to removed methods

2018-05-20 Thread Elias Levy (JIRA)
Elias Levy created FLINK-9403:
-

 Summary: Documentation continues to refer to removed methods
 Key: FLINK-9403
 URL: https://issues.apache.org/jira/browse/FLINK-9403
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.6.0
Reporter: Elias Levy


{{org.apache.flink.api.common.ExecutionConfig}} no longer has the 
{{enableTimestamps}}, {{disableTimestamps}}, and {{areTimestampsEnabled}} 
methods.  They were removed in [this 
commit|https://github.com/apache/flink/commit/ceb64248daab04b01977ff02516696e4398d656e].
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9440) Allow cancelation and reset of timers

2018-05-25 Thread Elias Levy (JIRA)
Elias Levy created FLINK-9440:
-

 Summary: Allow cancelation and reset of timers
 Key: FLINK-9440
 URL: https://issues.apache.org/jira/browse/FLINK-9440
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.4.2
Reporter: Elias Levy


Currently the {{TimerService}} allows one to register timers, but it is not 
possible to delete a timer or to reset a timer to a new value.  If one wishes 
to reset a timer, one must also handle the previously inserted timer's 
callbacks and ignore them.

It would be useful if the API allowed one to remove and reset timers.
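
A minimal sketch of the workaround this currently forces, assuming a keyed 
{{ProcessFunction}} and an illustrative {{Event}} type; a {{ValueState}} 
remembers which registered timer is the "live" one so that callbacks from 
superseded timers can be ignored:

{code}
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

case class Event(key: String, ts: Long)

class ResettableTimer extends ProcessFunction[Event, Event] {

  // Tracks the timestamp of the most recently registered ("live") timer per key.
  private lazy val liveTimer: ValueState[java.lang.Long] =
    getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("liveTimer", classOf[java.lang.Long]))

  override def processElement(e: Event,
                              ctx: ProcessFunction[Event, Event]#Context,
                              out: Collector[Event]): Unit = {
    val deadline = e.ts + 60000L            // "reset" by registering a later timer
    ctx.timerService().registerEventTimeTimer(deadline)
    liveTimer.update(deadline)              // earlier timers still fire, but are ignored below
  }

  override def onTimer(timestamp: Long,
                       ctx: ProcessFunction[Event, Event]#OnTimerContext,
                       out: Collector[Event]): Unit = {
    val live = liveTimer.value()
    if (live != null && live.longValue() == timestamp) {
      // only the latest registered timer triggers the real logic
    }
  }
}
{code}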



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9600) Add DataStream transformation variants that pass timestamp to the user function

2018-06-15 Thread Elias Levy (JIRA)
Elias Levy created FLINK-9600:
-

 Summary: Add DataStream transformation variants that pass 
timestamp to the user function
 Key: FLINK-9600
 URL: https://issues.apache.org/jira/browse/FLINK-9600
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 1.5.0
Reporter: Elias Levy


It is often necessary to access the timestamp assigned to records within user 
functions.  At the moment this is only possible from a {{ProcessFunction}}. 
Implementing a {{ProcessFunction}} just to access the timestamp is burdensome, 
so most jobs carry a duplicate of the timestamp within the record.
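
A minimal sketch of that workaround, with illustrative {{Event}} and 
{{Enriched}} types; the timestamp is only reachable through the 
{{ProcessFunction}} context:

{code}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

case class Event(id: String)
case class Enriched(event: Event, timestamp: Long)

class AttachTimestamp extends ProcessFunction[Event, Enriched] {
  override def processElement(e: Event,
                              ctx: ProcessFunction[Event, Enriched]#Context,
                              out: Collector[Enriched]): Unit = {
    val ts = ctx.timestamp()   // record timestamp assigned upstream; null if none
    if (ts != null) out.collect(Enriched(e, ts))
  }
}
{code}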

It would be useful if {{DataStream}} provided transformation methods that 
accepted user functions that could be passed the record's timestamp as an 
additional argument, similar to how there are two variants of {{flatMap}}, one 
with an extra parameter that gives the user function access to the output 
{{Collector}}.

Along similar lines, it may be useful to have variants that pass the record's 
key as an additional parameter.

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9450) Job hangs if S3 access is denied during checkpoints

2018-05-26 Thread Elias Levy (JIRA)
Elias Levy created FLINK-9450:
-

 Summary: Job hangs if S3 access is denied during checkpoints
 Key: FLINK-9450
 URL: https://issues.apache.org/jira/browse/FLINK-9450
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Affects Versions: 1.4.2
Reporter: Elias Levy


We have a streaming job that consumes from and writes to Kafka.  The job is 
configured to checkpoint to S3.  If we deny access to S3 by using iptables on 
the TM host to block all outgoing connections to ports 80 and 443, whether 
using DROP or REJECT, and whether using REJECT with {{--reject-with tcp-reset}} 
or {{--reject-with icmp-port-unreachable}}, the job soon stops publishing to 
Kafka.

This happens whether or not the Kafka sources have 
{{setCommitOffsetsOnCheckpoints}} set to true or false.

The system is configured to use Presto for the S3 file system.  The job has a 
small amount of state, so it is configured to use {{FsStateBackend}} with 
asynchronous snapshots.

If the iptables rules are removed, the job continues to function.

I would expect the job to either fail or continue running if a checkpoint fails.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9495) Implement ResourceManager for Kubernetes

2018-06-01 Thread Elias Levy (JIRA)
Elias Levy created FLINK-9495:
-

 Summary: Implement ResourceManager for Kubernetes
 Key: FLINK-9495
 URL: https://issues.apache.org/jira/browse/FLINK-9495
 Project: Flink
  Issue Type: Improvement
  Components: ResourceManager
Affects Versions: 1.5.0
Reporter: Elias Levy


I noticed there is no issue for developing a Kubernetes specific 
ResourceManager under FLIP-6, so I am creating this issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9682) Add setDescription to execution environment and display it in the UI

2018-06-27 Thread Elias Levy (JIRA)
Elias Levy created FLINK-9682:
-

 Summary: Add setDescription to execution environment and display 
it in the UI
 Key: FLINK-9682
 URL: https://issues.apache.org/jira/browse/FLINK-9682
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API, Webfrontend
Affects Versions: 1.5.0
Reporter: Elias Levy


Currently you can provide a job name to {{execute}} in the execution 
environment.  In an environment where many versions of a job may be executing, 
such as a development or test environment, identifying which running job is of 
a specific version via the UI can be difficult unless the version is embedded 
into the job name given to {{execute}}.  But the job name is used for other 
purposes, such as for namespacing metrics.  Thus, it is not ideal to modify the 
job name, as that could require modifying metric dashboards and monitors each 
time versions change.

I propose a new method be added to the execution environment, 
{{setDescription}}, that would allow a user to pass in an arbitrary description 
that would be displayed in the dashboard, allowing users to distinguish jobs.
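
A hypothetical sketch of the proposed API; {{setDescription}} does not exist 
today, and the {{BuildInfo}} version lookup is illustrative:

{code}
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Shown only in the dashboard; hypothetical method.
env.setDescription(s"my-job build ${BuildInfo.version}")
// The job name stays stable so metric names, dashboards, and monitors do not change.
env.execute("my-job")
{code}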



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-7935) Metrics with user supplied scope variables

2017-10-26 Thread Elias Levy (JIRA)
Elias Levy created FLINK-7935:
-

 Summary: Metrics with user supplied scope variables
 Key: FLINK-7935
 URL: https://issues.apache.org/jira/browse/FLINK-7935
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.3.2
Reporter: Elias Levy


We use DataDog for metrics.  DD and Flink differ somewhat in how they track 
metrics.

Flink names and scopes metrics together, at least by default.  E.g. by default 
the system scope for operator metrics is 
{{<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>}}.  The 
scope variables become part of the metric's full name.

In DD the metric would be named something generic, e.g. 
{{taskmanager.job.operator}}, and they would be distinguished by their tag 
values, e.g. {{tm_id=foo}}, {{job_name=var}}, {{operator_name=baz}}.

Flink allows you to configure the format string for system scopes, so it is 
possible to set the operator scope format to {{taskmanager.job.operator}}.  We 
do this for all scopes:

{code}
metrics.scope.jm: jobmanager
metrics.scope.jm.job: jobmanager.job
metrics.scope.tm: taskmanager
metrics.scope.tm.job: taskmanager.job
metrics.scope.task: taskmanager.job.task
metrics.scope.operator: taskmanager.job.operator
{code}

This seems to work.  The DataDog Flink metrics plugin submits all scope 
variables as tags, even if they are not used within the scope format.  And it 
appears internally this does not lead to metrics conflicting with each other.

We would like to extend this to user-defined metrics, but you cannot define 
variables/scopes when adding a metric group or metric with the user API.  With 
such support, in DD we would have a single metric with a tag taking many 
different values, rather than hundreds of metrics for the one value we want to 
measure across different event types.
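
A hypothetical sketch of the desired user-facing API, mirroring how system 
scope variables become tags; the key/value {{addGroup}} variant is the 
assumption here, and the group and metric names are illustrative:

{code}
// Inside a RichFunction's open(): "event_type" would surface in DD as a tag with
// value eventType, rather than being baked into hundreds of distinct metric names.
val eventsProcessed = getRuntimeContext
  .getMetricGroup
  .addGroup("event_type", eventType)   // hypothetical key/value scope variable
  .counter("events_processed")
{code}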




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8751) Canceling a job results in a InterruptedException in the JM

2018-02-22 Thread Elias Levy (JIRA)
Elias Levy created FLINK-8751:
-

 Summary: Canceling a job results in a InterruptedException in the 
JM
 Key: FLINK-8751
 URL: https://issues.apache.org/jira/browse/FLINK-8751
 Project: Flink
  Issue Type: Bug
  Components: JobManager
Affects Versions: 1.4.1
Reporter: Elias Levy


Canceling a job results in the following exception reported by the JM: 
{code:java}
ERROR org.apache.flink.streaming.runtime.tasks.StreamTask  - Could not shut down timer service
java.lang.InterruptedException
  at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(Unknown Source)
  at java.util.concurrent.ThreadPoolExecutor.awaitTermination(Unknown Source)
  at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.shutdownAndAwaitPending(SystemProcessingTimeService.java:197)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:317)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
  at java.lang.Thread.run(Unknown Source)
{code}
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8752) ClassNotFoundException when using the user code class loader

2018-02-22 Thread Elias Levy (JIRA)
Elias Levy created FLINK-8752:
-

 Summary: ClassNotFoundException when using the user code class 
loader
 Key: FLINK-8752
 URL: https://issues.apache.org/jira/browse/FLINK-8752
 Project: Flink
  Issue Type: Bug
  Components: JobManager
Affects Versions: 1.4.1
Reporter: Elias Levy


Attempting to submit a job results in the job failing while it is being started 
in the JMs with a ClassNotFoundException error: 
{code:java}
java.lang.ClassNotFoundException: com.foo.flink.common.util.TimeAssigner
at java.net.URLClassLoader.findClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Unknown Source)
at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73)
at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
at java.io.ObjectInputStream.readClassDesc(Unknown Source)
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.io.ObjectInputStream.readObject0(Unknown Source)
at java.io.ObjectInputStream.readObject(Unknown Source)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:393)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:380)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:368)
at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.createPartitionStateHolders(AbstractFetcher.java:542)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.<init>(AbstractFetcher.java:167)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.<init>(Kafka09Fetcher.java:89)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.<init>(Kafka010Fetcher.java:62)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010.createFetcher(FlinkKafkaConsumer010.java:203)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:564)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Unknown Source)
{code}

If I drop the job's jar into the lib folder on the JM and set the JM's 
{{classloader.resolve-order}} to {{parent-first}}, the job starts successfully.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8352) Flink UI Reports No Error on Job Submission Failures

2018-01-02 Thread Elias Levy (JIRA)
Elias Levy created FLINK-8352:
-

 Summary: Flink UI Reports No Error on Job Submission Failures
 Key: FLINK-8352
 URL: https://issues.apache.org/jira/browse/FLINK-8352
 Project: Flink
  Issue Type: Bug
  Components: Web Client
Affects Versions: 1.4.0
Reporter: Elias Levy


If you submit a job jar via the web UI and it raises an exception when started, 
the UI will report no error and will continue to show the animated image that 
makes it seem as if it is working.  In addition, no error is printed in the 
logs unless the level is increased to at least DEBUG:

{noformat}
@40005a4c399202b87ebc DEBUG 
org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler - Error while 
handling request.
@40005a4c399202b8868c java.util.concurrent.CompletionException: 
org.apache.flink.client.program.ProgramInvocationException: The program caused 
an error: 
@40005a4c399202b88a74   at 
org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler.lambda$handleJsonRequest$0(JarPlanHandler.java:68)
@40005a4c399202b88e5c   at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
@40005a4c399202b8e44c   at 
java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
@40005a4c399202b8e44c   at java.util.concurrent.FutureTask.run(Unknown 
Source)
@40005a4c399202b8e834   at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown
 Source)
@40005a4c399202b8e834   at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
 Source)
@40005a4c399202b8f3ec   at 
java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
@40005a4c399202b8f7d4   at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
@40005a4c399202b8f7d4   at java.lang.Thread.run(Unknown Source)
@40005a4c399202b8fbbc Caused by: 
org.apache.flink.client.program.ProgramInvocationException: The program caused 
an error: 
@40005a4c399202b90b5c   at 
org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:93)
@40005a4c399202b90f44   at 
org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:334)
@40005a4c399202b90f44   at 
org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:76)
@40005a4c399202b91afc   at 
org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler.lambda$handleJsonRequest$0(JarPlanHandler.java:57)
@40005a4c399202b91afc   ... 8 more
@40005a4c399202b91ee4 Caused by: java.lang.ExceptionInInitializerError
@40005a4c399202b91ee4   at 
com.cisco.sbg.amp.flink.ioc_engine.IocEngine.main(IocEngine.scala)
@40005a4c399202b922cc   at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
@40005a4c399202b92a9c   at 
sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
@40005a4c399202b92a9c   at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
@40005a4c399202b92e84   at java.lang.reflect.Method.invoke(Unknown 
Source)
@40005a4c399202b92e84   at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
@40005a4c399202b9326c   at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
@40005a4c399202b93a3c   at 
org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
@40005a4c399202b949dc   ... 11 more
@40005a4c399202b949dc Caused by: java.io.FileNotFoundException: 
/data/jenkins/jobs/XXX/workspace/target/scala-2.11/scoverage-data/scoverage.measurements.55
 (No such file or directory)
@40005a4c399202b951ac   at java.io.FileOutputStream.open0(Native Method)
@40005a4c399202b951ac   at java.io.FileOutputStream.open(Unknown Source)
@40005a4c399202b9597c   at java.io.FileOutputStream.(Unknown 
Source)
@40005a4c399202b9597c   at java.io.FileWriter.(Unknown Source)
@40005a4c399202b95d64   at 
scoverage.Invoker$$anonfun$1.apply(Invoker.scala:42)
@40005a4c399202b95d64   at 
scoverage.Invoker$$anonfun$1.apply(Invoker.scala:42)
@40005a4c399202b9614c   at 
scala.collection.concurrent.TrieMap.getOrElseUpdate(TrieMap.scala:901)
@40005a4c399202b9614c   at scoverage.Invoker$.invoked(Invoker.scala:42)
@40005a4c399202b9691c   at com.XXX$.(IocEngine.scala:28)
@40005a4c399202b9691c   at com.XXX$.(IocEngine.scala)
{noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8311) Flink needs documentation for network access control

2017-12-22 Thread Elias Levy (JIRA)
Elias Levy created FLINK-8311:
-

 Summary: Flink needs documentation for network access control
 Key: FLINK-8311
 URL: https://issues.apache.org/jira/browse/FLINK-8311
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.4.0
Reporter: Elias Levy


There is a need for better documentation on what connects to what over which 
ports in a Flink cluster to allow users to configure network access control 
rules.

E.g. I was under the impression that in a ZK HA configuration the Job Managers 
were essentially independent and only coordinated via ZK.  But starting 
multiple JMs in HA with the JM RPC port blocked between JMs shows that the 
second JM's Akka subsystem is trying to connect to the leading JM:

{noformat}
INFO  akka.remote.transport.ProtocolStateActor  - No response from remote for outbound association. Associate timed out after [2 ms].
WARN  akka.remote.ReliableDeliverySupervisor    - Association with remote system [akka.tcp://flink@10.210.210.127:6123] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@10.210.210.127:6123]] Caused by: [No response from remote for outbound association. Associate timed out after [2 ms].]
WARN  akka.remote.transport.netty.NettyTransport - Remote connection to [null] failed with org.apache.flink.shaded.akka.org.jboss.netty.channel.ConnectTimeoutException: connection timed out: /10.210.210.127:6123
{noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8358) Hostname used by DataDog metric reporter is not configurable

2018-01-03 Thread Elias Levy (JIRA)
Elias Levy created FLINK-8358:
-

 Summary: Hostname used by DataDog metric reporter is not 
configurable
 Key: FLINK-8358
 URL: https://issues.apache.org/jira/browse/FLINK-8358
 Project: Flink
  Issue Type: Bug
  Components: Metrics
Affects Versions: 1.4.0
Reporter: Elias Levy


The hostname used by the DataDog metric reporter to report metrics is not 
configurable.  This can be problematic if the hostname that Flink uses is 
different from the hostname used by the system's DataDog agent.  

For instance, in our environment we use Chef, and via the DataDog Chef Handler 
certain metadata, such as host roles, is associated with the hostname in 
the DataDog service.  The hostname used to submit this metadata is the name we 
have given the host.  But as Flink picks up the default name given by EC2 to 
the instance, metrics submitted by Flink to DataDog using that hostname are not 
associated with the tags derived from Chef.

In the Job Manager we can avoid this issue by explicitly setting the config 
{{jobmanager.rpc.address}} to the hostname we desire.  I attempted to do the 
same on the Task Manager by setting the {{taskmanager.hostname}} config, but 
DataDog does not seem to pick up that value.

Digging through the code, it seems the DD metric reporter gets the hostname from 
the {{TaskManagerMetricGroup}} host variable, which seems to be set from 
{{taskManagerLocation.getHostname}}.  That in turn seems to be set by calling 
{{this.inetAddress.getCanonicalHostName()}}, which merely performs a reverse 
lookup on the IP address, and then calling {{NetUtils.getHostnameFromFQDN}} on 
the result.  The latter is further problematic because its result is a 
non-fully-qualified hostname.

More generally, there seems to be a need to specify the hostname of a JM or TM 
node that can be reused across Flink components.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-10037) Document details event time behavior in a single location

2018-08-02 Thread Elias Levy (JIRA)
Elias Levy created FLINK-10037:
--

 Summary: Document details event time behavior in a single location
 Key: FLINK-10037
 URL: https://issues.apache.org/jira/browse/FLINK-10037
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.5.2
Reporter: Elias Levy
Assignee: Elias Levy


A description of event time and watermarks, and how they are generated, 
assigned, and handled, is spread across many pages in the documentation.  It 
would be useful to have it all in a single place, including missing 
information such as how Flink assigns timestamps to new records generated by 
operators.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10011) Old job resurrected during HA failover

2018-07-31 Thread Elias Levy (JIRA)
Elias Levy created FLINK-10011:
--

 Summary: Old job resurrected during HA failover
 Key: FLINK-10011
 URL: https://issues.apache.org/jira/browse/FLINK-10011
 Project: Flink
  Issue Type: Bug
  Components: JobManager
Affects Versions: 1.4.2
Reporter: Elias Levy


For the second time we've observed Flink resurrect an old job during JobManager 
high-availability fail over.
h4. Configuration
 * AWS environment
 * Flink 1.4.2 standalone cluster in HA mode
 * 2 JMs, 3 TMs
 * 3 node ZK ensemble
 * 1 job consuming to/from Kafka
 * Checkpoints in S3 using the Presto file system adaptor

h4. Timeline 
 * 15:18:10 JM 2 completes checkpoint 69256.
 * 15:19:10 JM 2 completes checkpoint 69257.
 * 15:19:57 ZK 1 (follower) loses connectivity to the leader as a result of a 
SocketTimeoutException
 * 15:19:57 ZK 1 closes connection to JM 2 (leader)
 * 15:19:57 ZK 2 (leader) reports a network error and closes connection to ZK 1
 * 15:19:57 JM 2 reports it can't read data from ZK
 ** {{Unable to read additional data from server sessionid 0x3003f4a0003, 
likely server has closed socket, closing socket connection and attempting 
reconnect)}}
 ** {{org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn}}
 * 15:19:57 JM 2 ZK Curator changes connection state to SUSPENDED
 ** {{Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
ZooKeeper.}}
 ** {{ZooKeeper connection SUSPENDED. }}{{Changes to the submitted job graphs 
are not monitored (temporarily).}}
 ** {{Connection to ZooKeeper suspended. The contender 
akka.tcp://flink@flink-jm-2:6123/user/jobmanager no longer participates in the 
leader election}}{{ }}
 ** {{Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
ZooKeeper.}}
 * 15:19:57 JM 2 gives up leadership
 ** {{JobManager akka://flink/user/jobmanager#33755521 was revoked leadership.}}

 * 15:19:57 JM 2 changes job 
{color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} status to SUSPENDED
 ** {{Stopping checkpoint coordinator for job 
{color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}}}

 * 15:19:57 TMs start disassociating from JM 2, but JM 2 discards the messages 
because there is no leader
 ** {{Discard message 
LeaderSessionMessage(d29e9f38-a16d-4c87-b34f-5212caab0473,Disconnect(b97363d53ad22aedfebdc8e5ba3c672f,java.lang.Exception:
 TaskManager akka://flink/user/taskmanager is disassociating)) because there is 
currently no valid leader id known.}}

 * 15:19:57 JM 2 connects to ZK 2 and renews its session
 ** {{Opening socket connection to server 
ip-10-210-43-221.ec2.internal/10.210.43.221:2181}}
 ** {{Socket connection established to 
ip-10-210-43-221.ec2.internal/10.210.43.221:2181, initiating session}}
 ** {{Connection to ZooKeeper was reconnected. Leader retrieval can be 
restarted.}}
 ** {{Session establishment complete on server 
ip-10-210-43-221.ec2.internal/10.210.43.221:2181, sessionid = 
0x3003f4a0003, negotiated timeout = 4}}
 ** {{Connection to ZooKeeper was reconnected. Leader election can be 
restarted.}}
 ** {{ZooKeeper connection RECONNECTED. Changes to the submitted job graphs are 
monitored again.}}
 ** {{State change: RECONNECTED}}

 * 15:19:57: JM 1 reports JM 1 has been granted leadership:
 ** {{JobManager akka.tcp://flink@flink-jm-1:6123/user/jobmanager was granted 
leadership with leader session ID Some(ae0a1a17-eccc-40b4-985d-93bc59f5b936).}}

 * 15:19:57 JM 2 reports the job has been suspended
 ** {{org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter Shutting 
down.}}
 ** {{Job 2a4eff355aef849c5ca37dbac04f2ff1 has been suspended.}}

 * 15:19:57 JM 2 reports it has lost leadership:
 ** {{Associated JobManager 
Actor[akka://flink/user/jobmanager#33755521|#33755521] lost leader status}}
 ** {{Received leader address but not running in leader ActorSystem. Cancelling 
registration.}}

 * 15:19:57 TMs register with JM 1

 * 15:20:07 JM 1 Attempts to recover jobs and find there are two jobs:
 ** {{Attempting to recover all jobs.}}
 ** {{There are 2 jobs to recover. Starting the job recovery.}}
 ** {{Attempting to recover job 
{color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}.}}
 ** {{Attempting to recover job 
{color:#d04437}61bca496065cd05e4263070a5e923a05{color}.}}

 * 15:20:08 – 15:32:27 ZK 2 reports a large number of errors of the form:
 ** {{Got user-level KeeperException when processing 
sessionid:0x201d2330001 type:create cxid:0x4211 zxid:0x60009dc70 txntype:-1 
reqpath:n/a Error 
Path:/flink/cluster_a/checkpoint-counter/2a4eff355aef849c5ca37dbac04f2ff1 
Error:KeeperErrorCode = NodeExists for 
/flink/cluster_a/checkpoint-counter/2a4eff355aef849c5ca37dbac04f2ff1}}
 ** {{Got user-level KeeperException when processing 
sessionid:0x201d2330001 type:create cxid:0x4230 zxid:0x60009dc78 txntype:-1 
reqpath:n/a Error 
Path:/flink/cluster_a/checkpoints/2a4eff355aef849c5ca37dbac04f2ff1

[jira] [Created] (FLINK-10098) Programmatically select timer storage backend

2018-08-07 Thread Elias Levy (JIRA)
Elias Levy created FLINK-10098:
--

 Summary: Programmatically select timer storage backend
 Key: FLINK-10098
 URL: https://issues.apache.org/jira/browse/FLINK-10098
 Project: Flink
  Issue Type: Improvement
  Components: State Backends, Checkpointing, Streaming, TaskManager
Affects Versions: 1.6.0, 1.7.0
Reporter: Elias Levy


FLINK-9486 introduced timer storage on the RocksDB state backend.  Right now 
it is only possible to configure RocksDB as the storage for timers by setting 
the {{state.backend.rocksdb.timer-service.factory}} value in the Flink 
configuration file.

As the state storage backend can be programmatically selected by jobs via 
{{env.setStateBackend(...)}}, the timer backend should also be configurable 
programmatically.

Different jobs should be able to store their timers in different storage 
backends.
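
A hypothetical sketch of per-job selection; {{setTimerServiceFactory}} is 
illustrative and does not exist in the current API, which only offers the 
cluster-wide configuration key above:

{code}
// Hypothetical: choose the timer store alongside the programmatic state backend choice.
val backend = new RocksDBStateBackend("s3://bucket/checkpoints", true)
backend.setTimerServiceFactory("rocksdb")   // hypothetical per-backend setter
env.setStateBackend(backend)
{code}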



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10117) REST API for Queryable State

2018-08-09 Thread Elias Levy (JIRA)
Elias Levy created FLINK-10117:
--

 Summary: REST API for Queryable State
 Key: FLINK-10117
 URL: https://issues.apache.org/jira/browse/FLINK-10117
 Project: Flink
  Issue Type: Improvement
  Components: Queryable State, REST
Affects Versions: 1.6.0
Reporter: Elias Levy


At the moment, queryable state requires a JVM-based client that can make use of 
the Java queryable state client API in the flink-queryable-state-client 
artifact.  In addition, the client requires a state descriptor matching the 
queried state, which tightly couples the Flink job and queryable state clients.

I propose that queryable state become accessible via a REST API.  FLINK-7040 
mentions this possibility, but does not specify work towards that goal.

I suggest that to enable queryable state over REST, users define JSON 
serializers via the state descriptors.  

This would allow queryable state clients to be developed in any language, not 
require them to use a Flink client library, and permit them to be loosely 
coupled with the job, as they could generically parse the returned JSON.

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10118) Queryable state MapState entry query

2018-08-09 Thread Elias Levy (JIRA)
Elias Levy created FLINK-10118:
--

 Summary: Queryable state MapState entry query
 Key: FLINK-10118
 URL: https://issues.apache.org/jira/browse/FLINK-10118
 Project: Flink
  Issue Type: Improvement
  Components: Queryable State
Affects Versions: 1.6.0
Reporter: Elias Levy


Queryable state allows querying of keyed MapState, but such a query returns all 
MapState entries for the given key.  In some cases such MapState may include a 
substantial number of entries (in the millions), while the user may only be 
interested in one entry.

I propose we allow queries for MapState to provide one or more map entry keys, 
in addition to the state key, and to only return entries for the given map keys.
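
A minimal sketch of how such state is queried today, assuming a queryable 
MapState named "per-device" keyed by a String with {{MapState[String, Long]}} 
values; every entry for the key comes back even when only one is wanted:

{code}
import org.apache.flink.api.common.JobID
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.queryablestate.client.QueryableStateClient

val client = new QueryableStateClient("proxy-host", 9069)
val descriptor = new MapStateDescriptor[String, java.lang.Long](
  "per-device", classOf[String], classOf[java.lang.Long])

val future = client.getKvState(
  JobID.fromHexString("00000000000000000000000000000000"),  // illustrative job id
  "per-device",
  "device-42",                         // the state key
  BasicTypeInfo.STRING_TYPE_INFO,
  descriptor)

// future.get() yields the full MapState for "device-42"; the proposal would
// additionally accept the map entry keys of interest and return only those.
{code}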



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9731) Kafka source subtask begins to consume from earliest offset

2018-07-03 Thread Elias Levy (JIRA)
Elias Levy created FLINK-9731:
-

 Summary: Kafka source subtask begins to consume from earliest 
offset
 Key: FLINK-9731
 URL: https://issues.apache.org/jira/browse/FLINK-9731
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Affects Versions: 1.4.2
Reporter: Elias Levy


On Jun 30th 2018, at 9:35 am UTC, the Kafka source in subtask 7 in a Flink job 
instance began consuming records from the earliest offsets available in Kafka 
for the partitions assigned to it. Other subtasks did not exhibit this behavior 
and continued operating normally.

Previous to the event the job exhibited no Kafka lag.  The job showed no failed 
checkpoints and the job did not restore or restart.  Flink logs show no 
indication of anything amiss.  There were no errors or Kafka-related messages 
in the Flink logs.

The job is configured with checkpoints at 1 minute intervals.  The Kafka 
connector consumer is configured to start from group offsets if it is not 
started from a savepoint via `setStartFromGroupOffsets()`, and the Kafka 
consumer is configured to fall back to the earliest offset if no group offsets 
are committed, by setting `auto.offset.reset` to `earliest` in the Kafka 
consumer config.

Right before the event a Kafka broker (kafka-broker-b5-int) lost leadership of 
its partitions for around 30 seconds as a result of losing its connection to 
ZooKeeper.

 
{noformat}
[2018-06-30 09:34:54,799] INFO Unable to read additional data from server 
sessionid 0x161305b7bd81a09, likely server has closed socket, closing socket 
connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:54,899] INFO zookeeper state changed (Disconnected) 
(org.I0Itec.zkclient.ZkClient)
[2018-06-30 09:34:55,384] ERROR [ReplicaFetcherThread-3-1002]: Error for 
partition [cloud_ioc_events,32] to broker 
1002:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server 
is not the leader for that topic-partition. (kafka.server.ReplicaFetcherThread)
{noformat}

The broker immediately reconnected to ZK after a few tries:

{noformat}
[2018-06-30 09:34:55,462] INFO Opening socket connection to server 
10.210.48.187/10.210.48.187:2181 (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,462] INFO zookeeper state changed (AuthFailed) 
(org.I0Itec.zkclient.ZkClient)
[2018-06-30 09:34:55,463] INFO Socket connection established to 
10.210.48.187/10.210.48.187:2181, initiating session 
(org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,464] WARN Unable to reconnect to ZooKeeper service, 
session 0x161305b7bd81a09 has expired (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,465] INFO zookeeper state changed (Expired) 
(org.I0Itec.zkclient.ZkClient)
[2018-06-30 09:34:55,465] INFO Initiating client connection, 
connectString=10.210.48.187:2181,10.210.43.200:2181,10.210.16.102:2181/kafka 
sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@5c33f1a9 
(org.apache.zookeeper.ZooKeeper)
[2018-06-30 09:34:55,465] INFO Unable to reconnect to ZooKeeper service, 
session 0x161305b7bd81a09 has expired, closing socket connection 
(org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,466] INFO EventThread shut down for session: 
0x161305b7bd81a09 (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,467] INFO zookeeper state changed (AuthFailed) 
(org.I0Itec.zkclient.ZkClient)
[2018-06-30 09:34:55,468] INFO Opening socket connection to server 
10.210.43.200/10.210.43.200:2181 (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,468] INFO Socket connection established to 
10.210.43.200/10.210.43.200:2181, initiating session 
(org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,471] INFO Session establishment complete on server 
10.210.43.200/10.210.43.200:2181, sessionid = 0x163934fa09d1baa, negotiated 
timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,471] INFO zookeeper state changed (SyncConnected) 
(org.I0Itec.zkclient.ZkClient)
[2018-06-30 09:34:55,472] INFO re-registering broker info in ZK for broker 2005 
(kafka.server.KafkaHealthcheck$SessionExpireListener)
[2018-06-30 09:34:55,472] INFO Creating /brokers/ids/2005 (is it secure? false) 
(kafka.utils.ZKCheckedEphemeral)
[2018-06-30 09:34:55,476] INFO Result of znode creation is: OK 
(kafka.utils.ZKCheckedEphemeral)
[2018-06-30 09:34:55,476] INFO Registered broker 2005 at path /brokers/ids/2005 
with addresses: 
EndPoint(kafka-broker-b5-int,9092,ListenerName(PLAINTEXT),PLAINTEXT),EndPoint(kafka-broker-b5,19092,ListenerName(PUBLIC),SASL_PLAINTEXT)
 (kafka.utils.ZkUtils)
[2018-06-30 09:34:55,476] INFO done re-registering broker 
(kafka.server.KafkaHealthcheck$SessionExpireListener)
[2018-06-30 09:34:55,476] INFO Subscribing to /brokers/topics path to watch for 
new topics (kafka.server.KafkaHealthcheck$SessionExpireListener)
{noformat}

By 9:35:02 partitions had returned to the broker.

It appears

[jira] [Created] (FLINK-8886) Job isolation via scheduling in shared cluster

2018-03-06 Thread Elias Levy (JIRA)
Elias Levy created FLINK-8886:
-

 Summary: Job isolation via scheduling in shared cluster
 Key: FLINK-8886
 URL: https://issues.apache.org/jira/browse/FLINK-8886
 Project: Flink
  Issue Type: Improvement
  Components: Scheduler
Affects Versions: 1.5.0
Reporter: Elias Levy


Flink's TaskManager executes tasks from different jobs within the same JVM as 
threads.  We prefer to isolate different jobs on their own JVMs.  Thus, we must 
use different TMs for different jobs.  As currently the scheduler will allocate 
task slots within a TM to tasks from different jobs, that means we must stand 
up one cluster per job.  This is wasteful, as it requires at least two 
JobManagers per cluster for high-availability, and the JMs have low utilization.

Additionally, different jobs may require different resources.  Some jobs are 
compute heavy.  Some are IO heavy (lots of state in RocksDB).  At the moment 
the scheduler treats all TMs as equivalent, except possibly in their number 
of available task slots.  Thus, one is required to stand up multiple clusters 
if there is a need for different types of TMs.

 

It would be useful if one could specify requirements on job, such that they are 
only scheduled on a subset of TMs.  Properly configured, that would permit 
isolation of jobs in a shared cluster and scheduling of jobs with specific 
resource needs.

 

One possible implementation is to specify a set of tags on the TM config file 
which the TMs used when registering with the JM, and another set of tags 
configured within the job or supplied when submitting the job.  The scheduler 
could then match the tags in the job with the tags in the TMs.  In a 
restrictive mode the scheduler would assign a job task to a TM only if all tags 
match.  In a relaxed mode the scheduler could assign a job task to a TM if 
there is a partial match, while giving preference to a more accurate match.

 

 

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8844) Export job jar file name or job version property via REST API

2018-03-02 Thread Elias Levy (JIRA)
Elias Levy created FLINK-8844:
-

 Summary: Export job jar file name or job version property via REST 
API
 Key: FLINK-8844
 URL: https://issues.apache.org/jira/browse/FLINK-8844
 Project: Flink
  Issue Type: Improvement
  Components: REST
Affects Versions: 1.4.3
Reporter: Elias Levy


To aid automated deployment of jobs, it would be useful if the REST API exposed 
either a running job's jar filename or a version property the job could set, 
similar to how it sets the job name.

As it is now there is no standard mechanism to determine what version of a job 
is running in a cluster.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10617) Restoring job fails because of slot allocation timeout

2018-10-19 Thread Elias Levy (JIRA)
Elias Levy created FLINK-10617:
--

 Summary: Restoring job fails because of slot allocation timeout
 Key: FLINK-10617
 URL: https://issues.apache.org/jira/browse/FLINK-10617
 Project: Flink
  Issue Type: Bug
  Components: ResourceManager, TaskManager
Affects Versions: 1.6.1
Reporter: Elias Levy


The following may be related to FLINK-9932, but I am unsure.  If you believe it 
is, go ahead and close this issue as a duplicate.

While trying to test local state recovery on a job with large state, the job 
failed to be restored because slot allocation timed out.

The job is running on a standalone cluster with 12 nodes and 96 task slots (8 
per node).  The job has parallelism of 96, so it consumes all of the slots, and 
has ~200 GB of state in RocksDB.  

To test local state recovery I decided to kill one of the TMs.  The TM 
immediately restarted and re-registered with the JM.  I confirmed the JM showed 
96 registered task slots.
{noformat}
21:35:44,616 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor   
 - Resolved ResourceManager address, beginning registration
21:35:44,616 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor   
 - Registration at ResourceManager attempt 1 (timeout=100ms)
21:35:44,640 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor   
 - Successful registration at resource manager 
akka.tcp://flink@172.31.18.172:6123/user/resourcemanager under registration id 
302988dea6afbd613bb2f96429b65d18.
21:36:49,667 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor   
 - Receive slot request AllocationID{4274d96a59d370305520876f5b84fb9f} for job 
87c61e8ee64cdbd50f191d39610eb58f from resource manager with leader id 
8e06aa64d5f8961809da38fe7f224cc1.
21:36:49,667 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor   
 - Allocated slot for AllocationID{4274d96a59d370305520876f5b84fb9f}.
21:36:49,667 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService   
 - Add job 87c61e8ee64cdbd50f191d39610eb58f for job leader monitoring.
21:36:49,668 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Starting ZooKeeperLeaderRetrievalService 
/leader/87c61e8ee64cdbd50f191d39610eb58f/job_manager_lock.
21:36:49,671 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService   
 - Try to register at job manager 
akka.tcp://flink@172.31.18.172:6123/user/jobmanager_3 with leader id 
f85f6f9b-7713-4be3-a8f0-8443d91e5e6d.
21:36:49,681 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor   
 - Receive slot request AllocationID{3a64e2c8c5b22adbcfd3ffcd2b49e7f9} for job 
87c61e8ee64cdbd50f191d39610eb58f from resource manager with leader id 
8e06aa64d5f8961809da38fe7f224cc1.
21:36:49,681 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor   
 - Allocated slot for AllocationID{3a64e2c8c5b22adbcfd3ffcd2b49e7f9}.
21:36:49,681 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService   
 - Add job 87c61e8ee64cdbd50f191d39610eb58f for job leader monitoring.
21:36:49,681 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Stopping ZooKeeperLeaderRetrievalService 
/leader/87c61e8ee64cdbd50f191d39610eb58f/job_manager_lock.
21:36:49,681 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Starting ZooKeeperLeaderRetrievalService 
/leader/87c61e8ee64cdbd50f191d39610eb58f/job_manager_lock.
21:36:49,683 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService   
 - Try to register at job manager 
akka.tcp://flink@172.31.18.172:6123/user/jobmanager_3 with leader id 
f85f6f9b-7713-4be3-a8f0-8443d91e5e6d.
21:36:49,687 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService   
 - Resolved JobManager address, beginning registration
21:36:49,687 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService   
 - Resolved JobManager address, beginning registration
21:36:49,687 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor   
 - Receive slot request AllocationID{740caf20a5f7f767864122dc9a7444d9} for job 
87c61e8ee64cdbd50f191d39610eb58f from resource manager with leader id 
8e06aa64d5f8961809da38fe7f224cc1.
21:36:49,688 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService   
 - Registration at JobManager attempt 1 (timeout=100ms)
21:36:49,688 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor   
 - Allocated slot for AllocationID{740caf20a5f7f767864122dc9a7444d9}.
21:36:49,688 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService   
 - Add job 87c61e8ee64cdbd50f191d39610eb58f for job leader monitoring.
21:36:49,688 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Stopping ZooKeeperLeaderRetrievalService 
/leader/87c61e8ee64cdbd50f191d39610eb58f/job_manager_lock.
21:36:49,688 INFO

[jira] [Created] (FLINK-10493) Macro generated CaseClassSerializer considered harmful

2018-10-04 Thread Elias Levy (JIRA)
Elias Levy created FLINK-10493:
--

 Summary: Macro generated CaseClassSerializer considered harmful
 Key: FLINK-10493
 URL: https://issues.apache.org/jira/browse/FLINK-10493
 Project: Flink
  Issue Type: Bug
  Components: Scala API, State Backends, Checkpointing, Type 
Serialization System
Affects Versions: 1.5.4, 1.6.1, 1.6.0, 1.5.3, 1.5.2, 1.5.1, 1.4.2, 1.4.1, 
1.4.0
Reporter: Elias Levy


The Flink Scala API uses implicits and macros to generate {{TypeInformation}} 
and {{TypeSerializer}} objects for types.  In the case of Scala tuples and case 
classes, the macro generates an [anonymous {{CaseClassSerializer}} 
class|https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala#L148-L161].

The Scala compiler generates a name for the anonymous class that depends on the 
position of the macro invocation relative to other anonymous classes in the 
code.  If the code is changed such that the anonymous class's relative 
position changes, even if the overall logic of the code or the type in question 
does not change, the name of the serializer class will change.
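
A minimal sketch of how this arises, not taken from the affected job; the case 
class, operators, and job name are illustrative:

{code}
import org.apache.flink.streaming.api.scala._

case class Reading(id: String, value: Double)

object TestJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Each implicitly materialized TypeInformation below makes the macro emit an
    // anonymous serializer class named something like TestJob$$anon$N.  Inserting
    // or removing an earlier operator shifts the N assigned to the serializer for
    // Reading, so a savepoint written by the old build references a class name
    // that no longer exists in the new build.
    val readings = env.fromElements(Reading("a", 1.0), Reading("b", 2.0))
    readings.keyBy(_.id).sum("value").print()

    env.execute("TestJob")
  }
}
{code}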

That will result in errors, such as the one below, if the job is restored from 
a savepoint: the serializer needed to read the data in the savepoint will no 
longer be found, because its name has changed.

At the very least, there should be a prominent warning in the documentation 
about this issue.  Minor code changes can result in jobs that can't restore 
previous state.  Ideally, the use of anonymous classes should be deprecated if 
possible.

{noformat}
WARN  org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil  - 
Deserialization of serializer errored; replacing with null.
java.io.IOException: Unloadable class for type serializer.
at 
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:384)
at 
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110)
at 
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83)
at 
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:203)
at 
org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoReaderV2.readStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:207)
at 
org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:85)
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:351)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Unknown Source)
Caused by: java.io.InvalidClassException: failed to read class descriptor
at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
at java.io.ObjectInputStream.readClassDesc(Unknown Source)
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.io.ObjectInputStream.readObject0(Unknown Source)
at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
at java.io.ObjectInputStream.readSerialData(Unknown Source)
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.io.ObjectInputStream.readObject0(Unknown Source)
at java.io.ObjectInputStream.readObject(Unknown Source)
at 
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375)
... 14 more
Caused by: java.lang.ClassNotFoundException: 
com.somewhere.TestJob$$anon$13$$anon$3
at java.net.URLClassLoader.findClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass

[jira] [Created] (FLINK-10520) Job save points REST API fails unless parameters are specified

2018-10-09 Thread Elias Levy (JIRA)
Elias Levy created FLINK-10520:
--

 Summary: Job save points REST API fails unless parameters are 
specified
 Key: FLINK-10520
 URL: https://issues.apache.org/jira/browse/FLINK-10520
 Project: Flink
  Issue Type: Bug
  Components: REST
Affects Versions: 1.6.1
Reporter: Elias Levy


The new REST API POST endpoint, {{/jobs/:jobid/savepoints}}, returns an error 
unless the request includes a body with all parameters ({{target-directory}} 
and {{cancel-job}}), even though the 
[documentation|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.html]
 suggests these are optional.

If a POST request with no data is made, the response is a 400 status code with 
the error message "Bad request received."

If the POST request submits an empty JSON object ( {} ), the response is a 400 
status code with the error message "Request did not match expected format 
SavepointTriggerRequestBody."  The same is true if only one of the 
{{target-directory}} or {{cancel-job}} parameters is included.

As the system is configured with a default savepoint location, there shouldn't 
be a need to include the {{target-directory}} parameter in the request.
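
For reference, a body supplying both fields is currently the only form that does 
not trigger the 400, e.g. (the target directory below is a placeholder):

{noformat}
POST /jobs/:jobid/savepoints
Content-Type: application/json

{
  "target-directory": "hdfs:///flink/savepoints",
  "cancel-job": false
}
{noformat}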





[jira] [Created] (FLINK-10483) Can't restore from a savepoint even with Allow Non Restored State enabled

2018-10-02 Thread Elias Levy (JIRA)
Elias Levy created FLINK-10483:
--

 Summary: Can't restore from a savepoint even with Allow Non 
Restored State enabled
 Key: FLINK-10483
 URL: https://issues.apache.org/jira/browse/FLINK-10483
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing, Type Serialization System
Affects Versions: 1.4.2
Reporter: Elias Levy


A trimmed streaming job fails to restore from a savepoint with an "Unloadable 
class for type serializer" error, even though the case class in question has 
been eliminated from the job and Allow Non Restored State is enabled.

We have a job running on a Flink 1.4.2 cluster with two Kafka input streams.  One 
of the streams is processed by an async function, and the output of the async 
function, together with the other original stream, is consumed by a 
CoProcessOperator, which in turn emits Scala case class instances that go into a 
stateful ProcessFunction filter and then into a sink.  I.e.

{code:java}
source 1 --> async function --\
                               |--> co process --> process --> sink
source 2 ---------------------/
{code}

I eliminated most of the DAG, leaving only the source 1 --> async function 
portion of it.  This removed the case class in question from the processing 
graph.  When I try to restore from the savepoint, even with Allow Non Restored 
State selected, the job fails to restore with the error "Deserialization of 
serializer errored".

This is the error being generated:


{noformat}
WARN  org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil  - 
Deserialization of serializer errored; replacing with null.
java.io.IOException: Unloadable class for type serializer.
at 
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:384)
at 
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110)
at 
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83)
at 
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:203)
at 
org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoReaderV2.readStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:207)
at 
org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:85)
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:351)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Unknown Source)
Caused by: java.io.InvalidClassException: failed to read class descriptor
at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
at java.io.ObjectInputStream.readClassDesc(Unknown Source)
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.io.ObjectInputStream.readObject0(Unknown Source)
at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
at java.io.ObjectInputStream.readSerialData(Unknown Source)
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.io.ObjectInputStream.readObject0(Unknown Source)
at java.io.ObjectInputStream.readObject(Unknown Source)
at 
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375)
... 14 more
Caused by: java.lang.ClassNotFoundException: 
com.somewhere.TestJob$$anon$13$$anon$3
at java.net.URLClassLoader.findClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
at java.lang.ClassLoader.loadClass(Unknown Source)
at java.lang.Cla

[jira] [Created] (FLINK-10460) DataDog reporter JsonMappingException

2018-09-28 Thread Elias Levy (JIRA)
Elias Levy created FLINK-10460:
--

 Summary: DataDog reporter JsonMappingException
 Key: FLINK-10460
 URL: https://issues.apache.org/jira/browse/FLINK-10460
 Project: Flink
  Issue Type: Improvement
  Components: Metrics
Affects Versions: 1.4.2
Reporter: Elias Levy


Observed the following error in the TM logs this morning:


{code:java}
WARN  org.apache.flink.metrics.datadog.DatadogHttpReporter  - Failed 
reporting metrics to Datadog.
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException:
 (was java.util.ConcurrentModificationException) (through reference chain: 
org.apache.flink.metrics.datadog.DSeries["series"]->
java.util.ArrayList[88]->org.apache.flink.metrics.datadog.DGauge["points"])
  at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:379)
   at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:339)
  at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer.wrapAndThrow(StdSerializer.java:342)
   at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:686)
   at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:157)
   at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serializeContents(IndexedListSerializer.java:119)
   at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serialize(IndexedListSerializer.java:79)
  at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serialize(IndexedListSerializer.java:18)
  at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:672)
  at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:678)
  at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:157)
   at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:130)
   at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:3631)
   at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:2998)
   at 
org.apache.flink.metrics.datadog.DatadogHttpClient.serialize(DatadogHttpClient.java:90)
   at 
org.apache.flink.metrics.datadog.DatadogHttpClient.send(DatadogHttpClient.java:79)
   at 
org.apache.flink.metrics.datadog.DatadogHttpReporter.report(DatadogHttpReporter.java:143)
  at 
org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run(MetricRegistryImpl.java:417)
   at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
   at java.util.concurrent.FutureTask.runAndReset(Unknown Source)
   at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(Unknown
 Source)
   at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
 Source)
   at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
   at java.lang.Thread.run(Unknown Source)
 Caused by: java.util.ConcurrentModificationException
   at java.util.LinkedHashMap$LinkedHashIterator.nextNode(Unknown Source)
   at java.util.LinkedHashMap$LinkedKeyIterator.next(Unknown Source)
   at java.util.AbstractCollection.addAll(Unknown Source)
   at java.util.HashSet.<init>(Unknown Source)
   at 
org.apache.kafka.common.internals.PartitionStates.partitionSet(PartitionStates.java:65)
   at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedPartitions(SubscriptionState.java:298)
   at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$ConsumerCoordinatorMetrics$1.measure(ConsumerCoordinator.java:906)
   at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61)
   at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
   at 
org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:35)
  at 
org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:26)
  at org.apache.flink.metrics.datadog.DGauge.get

[jira] [Created] (FLINK-10372) There is no API to configure the timer state backend

2018-09-19 Thread Elias Levy (JIRA)
Elias Levy created FLINK-10372:
--

 Summary: There is no API to configure the timer state backend
 Key: FLINK-10372
 URL: https://issues.apache.org/jira/browse/FLINK-10372
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API, State Backends, Checkpointing
Affects Versions: 1.6.0
Reporter: Elias Levy


Flink 1.6.0, via FLINK-9485, introduced the option to store timers in RocksDB 
instead of on the heap.  Alas, this can only be configured via the 
{{state.backend.rocksdb.timer-service.factory}} config file option.  That means 
the choice of state backend to use for timers can't be made on a per-job basis 
on a shared cluster.

There is a need for an API in {{RocksDBStateBackend}} to configure the backend 
per job.
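
Something along the following lines would do, sketched here with a hypothetical 
setter (no such method exists in 1.6.0; the name merely mirrors the config 
option, and the checkpoint path is a placeholder):

{code:scala}
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
val backend = new RocksDBStateBackend("hdfs:///flink/checkpoints", true)

// Hypothetical per-job equivalent of state.backend.rocksdb.timer-service.factory
backend.setTimerServiceFactory("ROCKSDB")

env.setStateBackend(backend)
{code}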





[jira] [Created] (FLINK-10390) DataDog metric reporter leak warning

2018-09-21 Thread Elias Levy (JIRA)
Elias Levy created FLINK-10390:
--

 Summary: DataDog metric reporter leak warning
 Key: FLINK-10390
 URL: https://issues.apache.org/jira/browse/FLINK-10390
 Project: Flink
  Issue Type: Improvement
  Components: Metrics
Affects Versions: 1.6.1
Reporter: Elias Levy


After upgrading to 1.6.1 from 1.4.2 we started observing warnings in the log 
associated with the DataDog metrics reporter:
{quote}Sep 21, 2018 9:43:20 PM 
org.apache.flink.shaded.okhttp3.internal.platform.Platform log WARNING: A 
connection to https://app.datadoghq.com/ was leaked. Did you forget to close a 
response body? To see where this was allocated, set the OkHttpClient logger 
level to FINE: 
Logger.getLogger(OkHttpClient.class.getName()).setLevel(Level.FINE);
{quote}
The metric reporter's okhttp dependency version (3.7.0) has not changed, so 
that does not appear to be the source of the warning.

I believe the issue is the change made in 
[FLINK-8553|https://github.com/apache/flink/commit/ae3d547afe7ec44d37b38222a3ea40d9181e#diff-fc396ba6772815fc05efc1310760cd4b].
  The HTTP calls were made asynchronous.  The previous code called 
{{client.newCall(r).execute().close()}}.  The new code does nothing in the 
callback, even though the [Callback.onResponse 
documentation|https://square.github.io/okhttp/3.x/okhttp/okhttp3/Callback.html#onResponse-okhttp3.Call-okhttp3.Response-]
 states:

bq. Called when the HTTP response was successfully returned by the remote 
server. The callback may proceed to read the response body with Response.body. 
The response is still live until its response body is closed. 
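
A minimal sketch of a fix along those lines, assuming the reporter keeps its 
async {{enqueue}} call ({{client}} and {{request}} stand in for the reporter's 
own objects):

{code:scala}
import java.io.IOException
import okhttp3.{Call, Callback, Response}

client.newCall(request).enqueue(new Callback {
  override def onFailure(call: Call, e: IOException): Unit = {
    // the call failed before a response body was allocated; nothing to release
  }
  override def onResponse(call: Call, response: Response): Unit = {
    // closing the Response releases the connection, matching the old
    // synchronous client.newCall(r).execute().close() behaviour
    response.close()
  }
})
{code}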



 





[jira] [Created] (FLINK-12024) Bump universal Kafka connector to Kafka dependency to 2.2.0

2019-03-26 Thread Elias Levy (JIRA)
Elias Levy created FLINK-12024:
--

 Summary: Bump universal Kafka connector to Kafka dependency to 
2.2.0
 Key: FLINK-12024
 URL: https://issues.apache.org/jira/browse/FLINK-12024
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Affects Versions: 1.7.2
Reporter: Elias Levy


Update the Kafka client dependency to version 2.2.0.





[jira] [Created] (FLINK-11794) Allow compression of row format files created by StreamingFileSink

2019-03-01 Thread Elias Levy (JIRA)
Elias Levy created FLINK-11794:
--

 Summary: Allow compression of row format files created by 
StreamingFileSink
 Key: FLINK-11794
 URL: https://issues.apache.org/jira/browse/FLINK-11794
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / FileSystem
Affects Versions: 1.7.2
Reporter: Elias Levy


Currently, there is no mechanism to compress files created using a 
StreamingFileSink.  This is highly desirable when the output is a text-based row 
format such as JSON.

Possible alternatives are the introduction of a callback that is passed the 
local file before it is uploaded to the DFS, so that it can be compressed; or a 
factory method that returns an OutputStream, such as a GZIPOutputStream, 
wrapping the stream it is passed, which would then be used by the Encoder.
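
What can be done today is limited to compressing inside the Encoder itself, 
which produces one gzip member per element and compresses poorly.  A sketch, 
assuming a string element type, which also illustrates why a sink-level hook is 
needed:

{code:scala}
import java.io.OutputStream
import java.nio.charset.StandardCharsets
import java.util.zip.GZIPOutputStream

import org.apache.flink.api.common.serialization.Encoder

// Workaround sketch: each element becomes its own gzip member, since the Encoder
// only sees the stream per call and must not close it. The result is valid gzip
// but far less effective than compressing the whole part file.
class GzipLineEncoder extends Encoder[String] {
  override def encode(element: String, stream: OutputStream): Unit = {
    val gzip = new GZIPOutputStream(stream)
    gzip.write(element.getBytes(StandardCharsets.UTF_8))
    gzip.write('\n')
    gzip.finish() // flush this member without closing the underlying part-file stream
  }
}
{code}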





[jira] [Created] (FLINK-11520) Triggers should be provided the window state

2019-02-01 Thread Elias Levy (JIRA)
Elias Levy created FLINK-11520:
--

 Summary: Triggers should be provided the window state
 Key: FLINK-11520
 URL: https://issues.apache.org/jira/browse/FLINK-11520
 Project: Flink
  Issue Type: Improvement
Reporter: Elias Levy


Some triggers may require access to the window state to perform their job.  
Consider a window computing a count using an aggregate function.  It may be 
desired to fire the window when the count is 1 and then at the end of the 
window.  The early firing can provide feedback to external systems that a key 
has been observed, while waiting for the final count.

The same problem can be observed in 
org.apache.flink.streaming.api.windowing.triggers.CountTrigger, which must 
maintain an internal count instead of being able to make use of the window 
state.
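
A sketch of what such a trigger has to look like today for the scenario above 
(early fire on the first element, final fire at the end of an event-time 
window); because it cannot read the window's aggregate, it keeps its own flag 
in partitioned state:

{code:scala}
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

class FirstElementAndEndOfWindowTrigger[T] extends Trigger[T, TimeWindow] {
  // Duplicate bookkeeping: the window already counts elements, but the trigger
  // cannot see that state, so it tracks "have we fired early yet" itself.
  private val seenDesc =
    new ValueStateDescriptor[java.lang.Boolean]("seen", classOf[java.lang.Boolean])

  override def onElement(element: T, timestamp: Long, window: TimeWindow,
                         ctx: Trigger.TriggerContext): TriggerResult = {
    ctx.registerEventTimeTimer(window.maxTimestamp())
    val seen = ctx.getPartitionedState(seenDesc)
    if (seen.value() == null) { seen.update(true); TriggerResult.FIRE }
    else TriggerResult.CONTINUE
  }

  override def onEventTime(time: Long, window: TimeWindow,
                           ctx: Trigger.TriggerContext): TriggerResult =
    if (time == window.maxTimestamp()) TriggerResult.FIRE else TriggerResult.CONTINUE

  override def onProcessingTime(time: Long, window: TimeWindow,
                                ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {
    ctx.deleteEventTimeTimer(window.maxTimestamp())
    ctx.getPartitionedState(seenDesc).clear()
  }
}
{code}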





[jira] [Created] (FLINK-11517) Inefficient window state access when using RocksDB state backend

2019-02-01 Thread Elias Levy (JIRA)
Elias Levy created FLINK-11517:
--

 Summary: Inefficient window state access when using RocksDB state 
backend
 Key: FLINK-11517
 URL: https://issues.apache.org/jira/browse/FLINK-11517
 Project: Flink
  Issue Type: Bug
Reporter: Elias Levy


When using an aggregate function on a window with a process function and the 
RocksDB state backend, state access is inefficient.

The WindowOperator calls windowState.add to merge the new element using the 
aggregate function.  The add method of RocksDBAggregatingState will read the 
state, deserialize it, call the aggregate function, serialize the result, and 
write it back out.

If the trigger decides the window must be fired, then, because windowState.add 
does not return the state, the WindowOperator must call windowState.get to fetch 
it and pass it to the window process function, resulting in another read and 
deserialization.

Finally, the state is not passed to the trigger, yet in some cases the trigger 
needs access to it.  That is our case.  Because the state is not available 
there, we must read and deserialize it one more time from within the trigger.

Thus, state must be read and deserialized three times to process a single 
element.  If the state is large, this can be quite costly.

 

Ideally, windowState.add would return the state, so that the WindowOperator can 
pass it to the process function without having to read it again.  Additionally, 
the state should be made available to the trigger, enabling more use cases 
without having to go through the state descriptor again.
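
Sketched as an interface, the proposal amounts to something like this 
(hypothetical, not an existing Flink type):

{code:scala}
// Hypothetical shape of the proposal: add() hands back the merged accumulator so
// the WindowOperator (and, ideally, the Trigger) can reuse it instead of issuing
// another RocksDB read and deserialization.
trait ReturningAppendingState[IN, OUT] {
  def add(value: IN): OUT   // today AppendingState#add returns void
  def get(): OUT
}
{code}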

 


