Remote Stateful Function Scalability

2020-10-17 Thread Elias Levy
After reading the Stateful Functions documentation, I am left wondering how
remote stateful functions scale.

The documentation mentions that the use of remote functions allows the
state and compute tiers to scale independently. But the documentation seems
to imply that only a single instance of a function type can execute at a
time per worker ("*When an application starts, each parallel worker of the
framework will create one physical object per function type. This object
will be used to execute all logical instances of that type that are run by
that particular worker.*") That would seem to tie and limit the parallelism
of the compute layer to that of the storage layer even when using remote
functions.

Can a worker execute multiple concurrent remote stateful functions of
different types?

Can a worker execute multiple concurrent remote stateful functions of the
same type with different keys?

If a worker can execute multiple concurrent remote stateful functions of
the same type with different keys, does it ensure their output is ordered
like its inputs?


Re: Kafka Schema registry

2019-09-12 Thread Elias Levy
Just for a Kafka source:

https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#the-deserializationschema


   - There is also a version of this schema available that can look up the
   writer’s schema (the schema which was used to write the record) in the
   Confluent Schema Registry. Using this deserialization schema, records
   will be read with the schema retrieved from the Schema Registry and
   transformed to a statically provided reader schema (either through
   ConfluentRegistryAvroDeserializationSchema.forGeneric(...) or
   ConfluentRegistryAvroDeserializationSchema.forSpecific(...)).
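
For reference, a rough sketch of wiring that into a Kafka source, assuming the
universal Kafka connector and a generic reader schema; the topic name, schema,
registry URL, and broker address below are illustrative, not from the docs:

```scala
import java.util.Properties

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

// Reader schema: records are decoded with the writer's schema fetched from the
// registry and then converted to this schema.
val readerSchema: Schema = new Schema.Parser().parse(
  """{"type":"record","name":"Event","fields":[{"name":"id","type":"string"}]}""")

val deserializer =
  ConfluentRegistryAvroDeserializationSchema.forGeneric(readerSchema, "http://schema-registry:8081")

val props = new Properties()
props.setProperty("bootstrap.servers", "broker:9092")
props.setProperty("group.id", "example-group")

val source = new FlinkKafkaConsumer[GenericRecord]("events", deserializer, props)
```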


On Wed, Sep 11, 2019 at 1:48 PM Lasse Nedergaard 
wrote:

> Hi.
> Do Flink have out of the Box Support for Kafka Schema registry for both
> sources and sinks?
> If not, does anyone knows about a implementation we can build on so we can
> help make it general available in a future release.
>
> Med venlig hilsen / Best regards
> Lasse Nedergaard
>
>


Re: Scylla connector

2019-08-13 Thread Elias Levy
Scylla is protocol compatible with Cassandra, so you can just use the
Cassandra connector. Scylla has extended the Go gocql package to make it
shard aware, but such an extension does not exist for the Cassandra Java
driver.  That just means that the driver will send requests to any shard on
a node, rather than to the specific shard that will process the request,
resulting in slightly higher load.
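
For example, a minimal sketch of pointing the standard Cassandra sink at a
Scylla node from a job's main method; the host, keyspace, table, and stream
type are illustrative assumptions:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.cassandra.CassandraSink

// A stream of (id, value) pairs to be written to Scylla.
val results: DataStream[(String, Long)] = ???

CassandraSink
  .addSink(results)
  .setHost("scylla-node-1")  // any Scylla node; the driver discovers the rest of the cluster
  .setQuery("INSERT INTO my_keyspace.my_table (id, value) VALUES (?, ?);")
  .build()
```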

On Sun, Aug 11, 2019 at 11:06 PM Lian Jiang  wrote:

> Hi,
>
> i am new to Flink. Is there scylla connector equivalent to the cassandra
> connector:
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/connectors/cassandra.html?
> Or can Flink use Scylla as a sink via the cassandra connector? Thanks.
>


Re: Does Flink Kafka connector has max_pending_offsets concept?

2019-06-05 Thread Elias Levy
There is no such concept in Flink.  Flink tracks offsets in its
checkpoints.  It can optionally commit offsets to Kafka, but that is only
for reporting purposes.  If you wish to lower the number of records that
get reprocessed in the case of a restart, then you must lower the
checkpoint interval.
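
For illustration, a minimal sketch from the job's main method; the ten-second
interval is just an example value, pick one that matches how much
reprocessing you can tolerate:

```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment

// A shorter interval bounds how far the job can fall behind the last completed
// checkpoint, and therefore how many records get reprocessed after a restart.
env.enableCheckpointing(10 * 1000L)  // checkpoint every 10 seconds
```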

On Tue, Jun 4, 2019 at 10:47 AM wang xuchen  wrote:

>
> Hi Flink users,
>
> When # of Kafka consumers  = # of partitions, and I use
> setParallelism(>1), something like this
>
> 'messageSteam.rebalance().map(lamba).setParallelism(3).print()'
>
> How do I tune # of outstanding uncommitted offset? Something similar to
>
> https://storm.apache.org/releases/1.1.2/storm-kafka-client.html in Storm.
>
> Thanks
> Ben
>


Re: Flink vs KStreams

2019-05-21 Thread Elias Levy
My 2c:

KStreams:

Pros:
* Streaming as a library:  No need to submit your job to a cluster.  Easy
to scale up/down the job by adding or removing workers.
* Streaming durability: State is durably stored in Kafka topics in a
streaming fashion. Durability is amortized across the job's lifetime.
* No dependencies other than Kafka:  If you are already a Kafka shop, then
there are no additional dependencies.

Cons:
* Intra-job traffic flows through Kafka:  This increases the load on your
Kafka cluster.
* State is stored in Kafka:  This also increases the load on your Kafka
cluster, particularly as the state topics are compacted.
* Tight Kafka coupling:  A drawback if you are not a Kafka shop or if some
other message broker would serve you better.
* Parallelism is limited by the number of topic partitions: This is a side
effect of the last two issues.

Flink:

Pros:
* Intra-job traffic flows directly between workers.
* More mature.
* Higher-level constructs: SQL, CEP, etc.

Cons:
* Requires a cluster to submit a job to:  You can't just take a jar and run
it.  You need either a standalone or YARN cluster to submit the job to.
This makes initial deployment and job deployment more complicated.  There
is some work to alleviate this (e.g. embedding the job with Flink in a
container), but it's still nowhere near as easy as KStreams.
* Checkpointed durability:  State is durably stored in a distributed
filesystem in a checkpointed fashion.  If you have a lot of state,
checkpointing will intermittently drop the performance of the job and
create very spiky network traffic.


On Mon, May 20, 2019 at 3:30 PM Peter Groesbeck 
wrote:

> Hi folks,
>
> I'm hoping to get some deeper clarification on which framework, Flink or
> KStreams, to use in a given scenario. I've read over the following blog
> article which I think sets a great baseline understanding of the
> differences between those frameworks but I would like to get some outside
> opinions:
>
> https://www.confluent.io/blog/apache-flink-apache-kafka-streams-comparison-guideline-users/
>
> My understanding of this article is that KStreams works well as an
> embedded library in a microservice, API layer, or as a standalone
> application at a company with centralized deployment infrastructure, such
> as a shared Kubernetes cluster.
>
> In this case, there is discussion around deploying KStreams as a
> standalone application stack backed by EC2 or ECS, and whether or not Flink
> is better suited to serve as the data transformation layer. We already do
> run Flink applications on EMR.
>
> The point against Flink is that it requires a cluster whereas KStreams
> does not, and can be deployed on ECS or EC2. We do not have a centralized
> deployment team, and will still have to maintain either the CNF
> Stack/AutoScaling Group or EMR Cluster ourselves.
>
> What are some of the advantages of using Flink over KStreams standalone?
>
> The Job management UI is one that comes to mind, and another are some of
> the more advanced API options such as CEP. But I would really love to hear
> the opinions of people who are familiar with both. In what scenarios would
> you choose one over the other? Is it advisable or even preferable to
> attempt to deploy KStreams as it's own stack and avoid the complexity of
> maintaining a cluster?
>
> Thanks,
> Peter
>


Re: [Discuss] Semantics of event time for state TTL

2019-04-08 Thread Elias Levy
> To sum up:
>> last-access-timestamp: event-time of event
>> expiration-check-time: processing-time
>>
>> What do you think?
>>
>> Aljoscha
>>
>> > On 6. Apr 2019, at 01:30, Konstantin Knauf 
>> wrote:
>> >
>> > Hi Andrey,
>> >
>> > I agree with Elias. This would be the most natural behavior. I wouldn't
>> add
>> > additional slightly different notions of time to Flink.
>> >
>> > As I can also see a use case for the combination
>> >
>> > * Timestamp stored: Event timestamp
>> > * Timestamp to check expiration: Processing Time
>> >
>> > we could (maybe in a second step) add the possibility to mix and match
>> time
>> > characteristics for both aspects.
>> >
>> > Cheers,
>> >
>> > Konstantin
>> >
>> > On Thu, Apr 4, 2019 at 7:59 PM Elias Levy 
>> > wrote:
>> >
>> >> My 2c:
>> >>
>> >> Timestamp stored with the state value: Event timestamp
>> >> Timestamp used to check expiration: Last emitted watermark
>> >>
>> >> That follows the event time processing model used elsewhere is Flink.
>> >> E.g. events are segregated into windows based on their event time, but
>> the
>> >> windows do not fire until the watermark advances past the end of the
>> window.
>> >>
>> >>
>> >> On Thu, Apr 4, 2019 at 7:55 AM Andrey Zagrebin 
>> >> wrote:
>> >>
>> >>> Hi All,
>> >>>
>> >>> As you might have already seen there is an effort tracked in
>> FLINK-12005
>> >>> [1] to support event time scale for state with time-to-live (TTL) [2].
>> >>> While thinking about design, we realised that there can be multiple
>> >>> options
>> >>> for semantics of this feature, depending on use case. There is also
>> >>> sometimes confusion because of event time out-of-order nature in
>> Flink. I
>> >>> am starting this thread to discuss potential use cases of this
>> feature and
>> >>> their requirements for interested users and developers. There was
>> already
>> >>> discussion thread asking about event time for TTL and it already
>> contains
>> >>> some thoughts [3].
>> >>>
>> >>> There are two semantical cases where we use time for TTL feature at
>> the
>> >>> moment. Firstly, we store timestamp of state last access/update.
>> Secondly,
>> >>> we use this timestamp and current timestamp to check expiration and
>> >>> garbage
>> >>> collect state at some point later.
>> >>>
>> >>> At the moment, Flink supports *only processing time* for both
>> timestamps:
>> >>> state *last access and current timestamp*. It is basically current
>> local
>> >>> system unix epoch time.
>> >>>
>> >>> When it comes to event time scale, we also need to define what Flink
>> >>> should
>> >>> use for these two timestamps. Here I will list some options and their
>> >>> possible pros for discussion. There might be more depending on
>> use
>> >>> case.
>> >>>
>> >>> *Last access timestamp (stored in backend with the actual state
>> value):*
>> >>>
>> >>>   - *Event timestamp of currently being processed record.* This seems
>> to
>> >>>   be the simplest option and it allows user-defined timestamps in
>> state
>> >>>   backend. The problem here might be instability of event time which
>> can
>> >>> not
>> >>>   only increase but also decrease if records come out of order. This
>> can
>> >>> lead
>> >>>   to rewriting the state timestamp to smaller value which is unnatural
>> >>> for
>> >>>   the notion of time.
>> >>>   - *Max event timestamp of records seen so far for this record key.*
>> >>> This
>> >>>   option is similar to the previous one but it tries to fix the
>> notion of
>> >>>   time to make it always increasing. Maintaining this timestamp has
>> also
>> >>>   performance implications because the previous timestamp needs to be
>> >>> read
>> >>>   out to decide whether to rewrite it.
>> >>>   - *Last emitted watermark*. This is w

Re: [Discuss] Semantics of event time for state TTL

2019-04-04 Thread Elias Levy
My 2c:

Timestamp stored with the state value: Event timestamp
Timestamp used to check expiration: Last emitted watermark

That follows the event time processing model used elsewhere is Flink.  E.g.
events are segregated into windows based on their event time, but the
windows do not fire until the watermark advances past the end of the window.
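
For reference, this is roughly how the existing processing-time TTL is
configured today; the event-time variants under discussion would presumably
hang off the same builder.  The TTL duration and state descriptor below are
just illustrative:

```scala
import org.apache.flink.api.common.state.{StateTtlConfig, ValueStateDescriptor}
import org.apache.flink.api.common.time.Time

// Processing-time TTL as currently supported: the stored timestamp is updated
// on create/write, and expired state is never returned.
val ttlConfig = StateTtlConfig
  .newBuilder(Time.days(7))
  .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
  .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  .build()

val descriptor = new ValueStateDescriptor[String]("last-value", classOf[String])
descriptor.enableTimeToLive(ttlConfig)
```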


On Thu, Apr 4, 2019 at 7:55 AM Andrey Zagrebin  wrote:

> Hi All,
>
> As you might have already seen there is an effort tracked in FLINK-12005
> [1] to support event time scale for state with time-to-live (TTL) [2].
> While thinking about design, we realised that there can be multiple options
> for semantics of this feature, depending on use case. There is also
> sometimes confusion because of event time out-of-order nature in Flink. I
> am starting this thread to discuss potential use cases of this feature and
> their requirements for interested users and developers. There was already
> discussion thread asking about event time for TTL and it already contains
> some thoughts [3].
>
> There are two semantical cases where we use time for TTL feature at the
> moment. Firstly, we store timestamp of state last access/update. Secondly,
> we use this timestamp and current timestamp to check expiration and garbage
> collect state at some point later.
>
> At the moment, Flink supports *only processing time* for both timestamps:
> state *last access and current timestamp*. It is basically current local
> system unix epoch time.
>
> When it comes to event time scale, we also need to define what Flink should
> use for these two timestamps. Here I will list some options and their
> possible pros for discussion. There might be more depending on use
> case.
>
> *Last access timestamp (stored in backend with the actual state value):*
>
>- *Event timestamp of currently being processed record.* This seems to
>be the simplest option and it allows user-defined timestamps in state
>backend. The problem here might be instability of event time which can
> not
>only increase but also decrease if records come out of order. This can
> lead
>to rewriting the state timestamp to smaller value which is unnatural for
>the notion of time.
>- *Max event timestamp of records seen so far for this record key.* This
>option is similar to the previous one but it tries to fix the notion of
>time to make it always increasing. Maintaining this timestamp has also
>performance implications because the previous timestamp needs to be read
>out to decide whether to rewrite it.
>- *Last emitted watermark*. This is what we usually use for other
>operations to trigger some actions in Flink, like timers and windows
> but it
>can be unrelated to the record which actually triggers the state update.
>
> *Current timestamp to check expiration:*
>
>- *Event timestamp of last processed record.* Again quite simple but
>unpredictable option for out-of-order events. It can potentially lead to
>undesirable expiration of late buffered data in state without control.
>- *Max event timestamp of records seen so far for operator backend.*
> Again
>similar to previous one, more stable but still user does not have too
> much
>control when to expire state.
>- *Last emitted watermark*. Again, this is what we usually use for other
>operations to trigger some actions in Flink, like timers and windows. It
>also gives user some control to decide when state is expired (up to
> which
>point in event time) by emitting certain watermark. It is more flexible
> but
>complicated. If some watermark emitting strategy is already used for
> other
>operations, it might be not optimal for TTL and delay state cleanup.
>- *Current processing time.* This option is quite simple, It would mean
>that user just decides which timestamp to store but it will expire in
> real
>time. For data privacy use case, it might be better because we want
> state
>to be unavailable in particular real moment of time since the associated
>piece of data was created in event time. For long term approximate
> garbage
>collection, it might be not a problem as well. For quick expiration, the
>time skew between event and processing time can lead again to premature
>deletion of late data and user cannot delay it.
>
> We could also make this behaviour configurable. Another option is to make
> time provider pluggable for users. The interface can give users context
> (currently processed record, watermark etc) and ask them which timestamp to
> use. This is more complicated though.
>
> Looking forward for your feedback.
>
> Best,
> Andrey
>
> [1] https://issues.apache.org/jira/browse/FLINK-12005
> [2]
>
> https://docs.google.com/document/d/1SI_WoXAfOd4_NKpGyk4yh3mf59g12pSGNXRtNFi-tgM
> [3]
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/State-TTL-in-Flink-1-6-0-td22509.html
>


Re: What should I take care if I enable object reuse

2019-03-15 Thread Elias Levy
That's certainly the safe thing to do, but if you do not mutate the object,
a copy is not strictly necessary.
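
A rough sketch of Kurt's first scenario, as a fragment of a job; the Event
type, buffer size, and operator are illustrative assumptions:

```scala
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector

import scala.collection.mutable.ArrayBuffer

case class Event(id: String, value: Long)

// Scenario 1 above: the element is retained past the flatMap() call, so it is
// copied explicitly before being buffered.
class BufferingFlatMap extends RichFlatMapFunction[Event, Event] {
  private val buffer = ArrayBuffer.empty[Event]

  override def flatMap(event: Event, out: Collector[Event]): Unit = {
    buffer += event.copy()
    if (buffer.size >= 100) {
      buffer.foreach(out.collect)
      buffer.clear()
    }
  }
}

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.enableObjectReuse()  // Flink may now reuse object instances between operators
```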



On Thu, Mar 14, 2019 at 9:19 PM Kurt Young  wrote:

> Keep one thing in mind: if you want the element remains legal after the
> function call ends (maybe map(), flatmap(), depends on what you are using),
> you should copy the elements.
> Typical scenarios includes:
> 1. Save the elements into some collection like array, list, map for later
> usage, you should copy it explicitly.
> 2. Pass the element into some async calls, you should copy it.
>
> Best,
> Kurt
>
>
> On Fri, Mar 15, 2019 at 8:45 AM yinhua.dai 
> wrote:
>
>> Hi Elias,
>>
>> Thanks.
>> Would it be good enough as long as we use always use different object when
>> call the Collector.collect() method in the operator?
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>


Re: using updating shared data

2019-01-06 Thread Elias Levy
That is not fully correct.  While in practice it may not matter, ignoring
the timestamp of control messages may result in non-deterministic behavior,
as during a restart the control message may be processed in a different
order in relation to the other stream.  So the output of multiple runs may
differ depending on the relative order in which the messages in the streams
are processed.

On Sun, Jan 6, 2019 at 12:36 AM Avi Levi  wrote:

> Sounds like a good idea. because in the control stream the time doesn't
> really matters. Thanks !!!
>
> On Fri, Jan 4, 2019 at 11:13 AM David Anderson 
> wrote:
>
>> Another solution to the watermarking issue is to write an
>> AssignerWithPeriodicWatermarks for the control stream that always returns
>> Watermark.MAX_WATERMARK as the current watermark. This produces watermarks
>> for the control stream that will effectively be ignored.
>>
>> On Thu, Jan 3, 2019 at 9:18 PM Avi Levi  wrote:
>>
>>> Thanks for the tip Elias!
>>>
>>> On Wed, Jan 2, 2019 at 9:44 PM Elias Levy 
>>> wrote:
>>>
>>>> One thing you must be careful of, is that if you are using event time
>>>> processing, assuming that the control stream will only receive messages
>>>> sporadically, is that event time will stop moving forward in the operator
>>>> joining the streams while the control stream is idle.  You can get around
>>>> this by using a periodic watermark extractor one the control stream that
>>>> bounds the event time delay to processing time or by defining your own low
>>>> level operator that ignores watermarks from the control stream.
>>>>
>>>> On Wed, Jan 2, 2019 at 8:42 AM Avi Levi 
>>>> wrote:
>>>>
>>>>> Thanks Till I will defiantly going to check it. just to make sure that
>>>>> I got you correctly. you are suggesting the the list that I want to
>>>>> broadcast will be broadcasted via control stream and it will be than be
>>>>> kept in the relevant operator state correct ? and updates (CRUD) on that
>>>>> list will be preformed via the control stream. correct ?
>>>>> BR
>>>>> Avi
>>>>>
>>>>> On Wed, Jan 2, 2019 at 4:28 PM Till Rohrmann 
>>>>> wrote:
>>>>>
>>>>>> Hi Avi,
>>>>>>
>>>>>> you could use Flink's broadcast state pattern [1]. You would need to
>>>>>> use the DataStream API but it allows you to have two streams (input and
>>>>>> control stream) where the control stream is broadcasted to all sub tasks.
>>>>>> So by ingesting messages into the control stream you can send model 
>>>>>> updates
>>>>>> to all sub tasks.
>>>>>>
>>>>>> [1]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Tue, Jan 1, 2019 at 6:49 PM miki haiat  wrote:
>>>>>>
>>>>>>> Im trying to understand  your  use case.
>>>>>>> What is the source  of the data ? FS ,KAFKA else ?
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Jan 1, 2019 at 6:29 PM Avi Levi 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>> I have a list (couple of thousands text lines) that I need to use
>>>>>>>> in my map function. I read this article about broadcasting
>>>>>>>> variables
>>>>>>>> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/#broadcast-variables>
>>>>>>>>  or
>>>>>>>> using distributed cache
>>>>>>>> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/#distributed-cache>
>>>>>>>> however I need to update this list from time to time, and if I 
>>>>>>>> understood
>>>>>>>> correctly it is not possible on broadcast or cache without restarting 
>>>>>>>> the
>>>>>>>> job. Is there idiomatic way to achieve this? A db seems to be an 
>>>>>>>> overkill
>>>>>>>> for that and I do want to be cheap on io/network calls as much as 
>>>>>>>> possible.
>>>>>>>>
>>>>>>>> Cheers
>>>>>>>> Avi
>>>>>>>>
>>>>>>>>


Re: using updating shared data

2019-01-02 Thread Elias Levy
One thing you must be careful of, if you are using event time processing
and the control stream only receives messages sporadically, is that event
time will stop moving forward in the operator joining the streams while the
control stream is idle.  You can get around this by using a periodic
watermark extractor on the control stream that bounds the event time delay
to processing time, or by defining your own low-level operator that ignores
watermarks from the control stream.
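
A sketch of the first approach: a periodic assigner for the control stream
whose watermark simply tracks processing time, so the idle control stream
cannot hold back the joined operator.  The class name and the ControlMessage
type in the usage line are illustrative assumptions:

```scala
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

// Control messages are treated as happening "now"; the emitted watermark
// advances with processing time even when no control messages arrive.
class ControlStreamWatermarks[T] extends AssignerWithPeriodicWatermarks[T] {

  override def extractTimestamp(element: T, previousElementTimestamp: Long): Long =
    System.currentTimeMillis()

  override def getCurrentWatermark(): Watermark =
    new Watermark(System.currentTimeMillis())
}

// Usage: controlStream.assignTimestampsAndWatermarks(new ControlStreamWatermarks[ControlMessage])
```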

On Wed, Jan 2, 2019 at 8:42 AM Avi Levi  wrote:

> Thanks Till I will defiantly going to check it. just to make sure that I
> got you correctly. you are suggesting the the list that I want to broadcast
> will be broadcasted via control stream and it will be than be kept in the
> relevant operator state correct ? and updates (CRUD) on that list will be
> preformed via the control stream. correct ?
> BR
> Avi
>
> On Wed, Jan 2, 2019 at 4:28 PM Till Rohrmann  wrote:
>
>> Hi Avi,
>>
>> you could use Flink's broadcast state pattern [1]. You would need to use
>> the DataStream API but it allows you to have two streams (input and control
>> stream) where the control stream is broadcasted to all sub tasks. So by
>> ingesting messages into the control stream you can send model updates to
>> all sub tasks.
>>
>> [1]
>> 
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
>> 
>>
>> Cheers,
>> Till
>>
>> On Tue, Jan 1, 2019 at 6:49 PM miki haiat  wrote:
>>
>>> Im trying to understand  your  use case.
>>> What is the source  of the data ? FS ,KAFKA else ?
>>>
>>>
>>> On Tue, Jan 1, 2019 at 6:29 PM Avi Levi  wrote:
>>>
 Hi,
 I have a list (couple of thousands text lines) that I need to use in my
 map function. I read this article about broadcasting variables
 
  or
 using distributed cache
 
 however I need to update this list from time to time, and if I understood
 correctly it is not possible on broadcast or cache without restarting the
 job. Is there idiomatic way to achieve this? A db seems to be an overkill
 for that and I do want to be cheap on io/network calls as much as possible.

 Cheers
 Avi




Re: Live configuration change

2018-11-06 Thread Elias Levy
Also note that there is a pending PR to allow the Cassandra sink to apply
backpressure, so that the cluster does not get overwhelmed.

On Tue, Nov 6, 2018 at 12:46 PM Ning Shi  wrote:

> > for rate limiting, would quota at Kafka brokers help?
>
> Thanks, Steven. This looks very promising. I'll try it out.
>
> --
> Ning
>


Re: RocksDB checkpointing dir per TM

2018-10-26 Thread Elias Levy
There is also state.backend.rocksdb.localdir.  Oddly, I can find the
documentation for it in the 1.5 docs, but not in the 1.6 docs.  The option
is still in master, and it is used.
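
If it helps, a sketch of setting the local RocksDB working directories
programmatically from the job's main method instead of via the config key;
the checkpoint URI and NVME paths are illustrative assumptions:

```scala
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Checkpoints go to the distributed store; the local working files live on the
// NVME mounts. Multiple paths spread the RocksDB instances across them.
val backend = new RocksDBStateBackend("s3://bucket/flink/checkpoints", true)
backend.setDbStoragePaths("/mnt/nvme1/rocksdb", "/mnt/nvme2/rocksdb")

env.setStateBackend(backend)
```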

On Fri, Oct 26, 2018 at 3:01 AM Andrey Zagrebin 
wrote:

> Hi Taher,
>
> TMs keep state locally while running, in this case RocksDB files already
> belong to TM.
> You can point it to the same NVME disk location on each node, relevant
> Flink options here are:
> - io.tmp.dirs
> - taskmanager.state.local.root-dirs
> This data is transient and has temporary nature. It does not survive a job
> failure.
>
> The checkpoint is a logical snapshot of the operator state for all
> involved TMs,
> so it belongs to the job and usually uploaded to a distributed file system
> available on all TMs.
> The location is set in Flink option ‘state.checkpoints.dir'.
> This way job can restore from it with different set of TMs.
>
> Best,
> Andrey
>
> > On 26 Oct 2018, at 08:29, Taher Koitawala 
> wrote:
> >
> > Hi All,
> >   Our current cluster configuration uses one HDD which is mainly
> for root and an other NVME disk per node, [1]we want make sure all TMs
> write their own RocksDB files to the NVME disk only, how do we do that?
> >
> > [2] Is it also possible to specify multiple directories per TMs so that
> we have an even spread when the RocksDB files are written?
> >
> > Thanks,
> > Taher Koitawala
>
>


Re: Watermark on keyed stream

2018-10-10 Thread Elias Levy
You are correct that watermarks are not tracked per key.  You are dealing
with events with a high degree of delay variability.  That is usually not a
good match for event time processing as implemented in Flink.

You could use event time processing and configure a very large allowed
lateness on your windows (days in your case), but that would significantly increase
the amount of state you must track.  That may be acceptable depending on
your message volume, scale of deployment, and state and timer storage
backend (RocksDB).
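
A sketch of what the large-lateness approach would look like; the
SensorReading type, window size, and three-day bound are illustrative
assumptions:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

case class SensorReading(sensorId: String, timestamp: Long, value: Double)

val readings: DataStream[SensorReading] = ???

readings
  .keyBy(_.sensorId)
  .window(TumblingEventTimeWindows.of(Time.minutes(10)))
  // Window state is retained for three days past the watermark, so data
  // buffered upstream for days still updates its window instead of being dropped.
  .allowedLateness(Time.days(3))
  .reduce((a, b) => if (a.value >= b.value) a else b)
```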


On Wed, Oct 10, 2018 at 11:12 AM Nick Triller 
wrote:

> Hi everyone,
>
>
>
> it seems Flink only supports global watermarks currently which is a
> problem for my use case.
>
> Many sensors send data which might be buffered for days in upstream
> systems before arriving at the Flink job.
>
> The job keys the stream by sensor. If other sensors send values in the
> meantime, the global watermark is advanced
>
> and buffered data that arrives late is dropped.
>
>
>
> How could the issue be solved? I guess it would be possible to calculate
> the watermark manually and add it to a wrapper object,
>
> but I am not sure how to correctly implement windowing (tumbling window)
> then.
>
>
>
> Thank you in advance for any ideas.
>
>
>
> Regards,
>
> Nick
>


Re: Kafka Per-Partition Watermarks

2018-10-04 Thread Elias Levy
Does your job perform a keyBy or broadcast that would result in data from
different partitions being distributed among tasks?  If so, then that would
be the cause.

On Thu, Oct 4, 2018 at 12:58 PM Andrew Kowpak 
wrote:

> Hi all,
>
> I apologize if this has been discussed to death in the past, but, I'm
> finding myself very confused, and google is not proving helpful.
>
> Based on the documentation, I understand that if there are idle partitions
> in a kafka stream, watermarks will not advance for the entire application.
> I was hoping that by setting parallelism = the number of partitions that I
> would be able to work around the issue, but, this didn't work.  I'm totally
> willing to accept the fact that if I have idle partitions, my windowed
> partitions won't work, however, I would really like to understand why
> setting the parallelism didn't work.  If someone can explain, or perhaps
> point me to documentation or code, it would be very much appreciated.
>
> Thanks.
>
> --
> *Andrew Kowpak P.Eng* *Sr. Software Engineer*
> (519)  489 2688 | SSIMWAVE Inc.
> 402-140 Columbia Street West, Waterloo ON
>


Re: Deserialization of serializer errored

2018-10-02 Thread Elias Levy
I am wondering if the issue here is that the createTuple2TypeInformation
implicit creates an anonymous class
<https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala#L97-L110>,
which results in a non-stable class name if the code is refactored, leading
to the class no longer being found by that name in a new version of the job.

On Tue, Oct 2, 2018 at 4:55 PM Elias Levy 
wrote:

> To add to the mystery, I extracted the class file mentioned in the
> exceptions (TestJob$$anon$13$$anon$3) from the job jar that created the
> savepoint and disassembled it to determine what serializer it is.  The
> serializer actually has nothing to do with the case class that was
> initially modified and then completely removed.  Rather, it was the
> serializer
> org/apache/flink/api/scala/typeutils/CaseClassSerializer;>;
>
> That's the serializer generated by Flink for source 1 data stream type
> (DataStream[ (String, LazyObject) ]), which is consumed by the async
> function.  AFAIK there is no reason for there to be any error with that
> serializer.
>
> Thoughts?
>
>
>
> On Tue, Oct 2, 2018 at 12:41 AM Fabian Hueske  wrote:
>
>> Hi Elias,
>>
>> I am not familiar with the recovery code, but Flink might read (some of )
>> the savepoint data even though it is not needed and loaded into operators.
>> That would explain why you see an exception when the case class is
>> modified or completely removed.
>>
>> Maybe Stefan or Gordon can help here.
>>
>> Best, Fabian
>>
>> Am Di., 2. Okt. 2018 um 01:10 Uhr schrieb Elias Levy <
>> fearsome.lucid...@gmail.com>:
>>
>>> Any of the Flink folks seen this before?
>>>
>>> On Fri, Sep 28, 2018 at 5:23 PM Elias Levy 
>>> wrote:
>>>
>>>> I am experiencing a rather odd error.  We have a job running on a Flink
>>>> 1.4.2 cluster with two Kafka input streams, one of the streams is processed
>>>> by an async function, and the output of the async function and the other
>>>> original stream are consumed by a CoProcessOperator, that intern emits
>>>> Scala case class instances, that go into a stateful ProcessFunction filter,
>>>> and then into a sink.  I.e.
>>>>
>>>> source 1 -> async function --\
>>>>|---> co process -->
>>>> process --> sink
>>>> source 2 --/
>>>>
>>>> A field was added to output case class and the job would no longer
>>>> start up from a save point.  I assumed this was a result of a serializer
>>>> incompatibility.  I verified this by reversing the addition of the field
>>>> and the job could then restore from the previous savepoint.  So far it
>>>> makes sense.
>>>>
>>>> Then I decided to leave the new field in the case class, but eliminated
>>>> most of the DAG, leaving only the source 1 --> async function portion of
>>>> it.  The case class is emitted by the co process.  So this removed the
>>>> modified case class from the processing graph.  When I try to restore from
>>>> the savepoint, even if Allow Non Restored State is selected, the job fails
>>>> to restore with the error "Deserialization of serializer erroed".
>>>>
>>>> So then I decided to completely eliminate the modified case class.  I
>>>> removed all trace of it from the job, again only leaving the source 1 ->
>>>> async function.  I tried to restore this job, with no traces of the case
>>>> class, and still the job failed with the "Deserialization of serializer
>>>> erroed" even when Allow Non Restored State is selected.
>>>>
>>>> Anyone seen anything like this?
>>>>
>>>> This is the error being generated:
>>>>
>>>> WARN
>>>>  org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil  -
>>>> Deserialization of serializer errored; replacing with null.
>>>> java.io.IOException: Unloadable class for type serializer.
>>>> at
>>>> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:384)
>>>> at
>>>> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110)
>

Re: Deserialization of serializer errored

2018-10-02 Thread Elias Levy
To add to the mystery, I extracted the class file mentioned in the
exceptions (TestJob$$anon$13$$anon$3) from the job jar that created the
savepoint and disassembled it to determine what serializer it is.  The
serializer actually has nothing to do with the case class that was
initially modified and then completely removed.  Rather, it was the
serializer org.apache.flink.api.scala.typeutils.CaseClassSerializer.

That's the serializer generated by Flink for source 1 data stream type
(DataStream[ (String, LazyObject) ]), which is consumed by the async
function.  AFAIK there is no reason for there to be any error with that
serializer.

Thoughts?



On Tue, Oct 2, 2018 at 12:41 AM Fabian Hueske  wrote:

> Hi Elias,
>
> I am not familiar with the recovery code, but Flink might read (some of )
> the savepoint data even though it is not needed and loaded into operators.
> That would explain why you see an exception when the case class is
> modified or completely removed.
>
> Maybe Stefan or Gordon can help here.
>
> Best, Fabian
>
> Am Di., 2. Okt. 2018 um 01:10 Uhr schrieb Elias Levy <
> fearsome.lucid...@gmail.com>:
>
>> Any of the Flink folks seen this before?
>>
>> On Fri, Sep 28, 2018 at 5:23 PM Elias Levy 
>> wrote:
>>
>>> I am experiencing a rather odd error.  We have a job running on a Flink
>>> 1.4.2 cluster with two Kafka input streams, one of the streams is processed
>>> by an async function, and the output of the async function and the other
>>> original stream are consumed by a CoProcessOperator, that intern emits
>>> Scala case class instances, that go into a stateful ProcessFunction filter,
>>> and then into a sink.  I.e.
>>>
>>> source 1 -> async function --\
>>>|---> co process -->
>>> process --> sink
>>> source 2 --/
>>>
>>> A field was added to output case class and the job would no longer start
>>> up from a save point.  I assumed this was a result of a serializer
>>> incompatibility.  I verified this by reversing the addition of the field
>>> and the job could then restore from the previous savepoint.  So far it
>>> makes sense.
>>>
>>> Then I decided to leave the new field in the case class, but eliminated
>>> most of the DAG, leaving only the source 1 --> async function portion of
>>> it.  The case class is emitted by the co process.  So this removed the
>>> modified case class from the processing graph.  When I try to restore from
>>> the savepoint, even if Allow Non Restored State is selected, the job fails
>>> to restore with the error "Deserialization of serializer erroed".
>>>
>>> So then I decided to completely eliminate the modified case class.  I
>>> removed all trace of it from the job, again only leaving the source 1 ->
>>> async function.  I tried to restore this job, with no traces of the case
>>> class, and still the job failed with the "Deserialization of serializer
>>> erroed" even when Allow Non Restored State is selected.
>>>
>>> Anyone seen anything like this?
>>>
>>> This is the error being generated:
>>>
>>> WARN
>>>  org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil  -
>>> Deserialization of serializer errored; replacing with null.
>>> java.io.IOException: Unloadable class for type serializer.
>>> at
>>> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:384)
>>> at
>>> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110)
>>> at
>>> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83)
>>> at
>>> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:203)
>>> at
>>> org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoReaderV2.readStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:207)
>>> at
>>> org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:85)
>>> at
>>> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:351)
>>> at
>>> org.apache.flink.streaming.ru

Scala case class state evolution

2018-10-02 Thread Elias Levy
Currently it is impossible to evolve a Scala case class by adding a new
field to it that is stored as managed state using the default Flink
serializer, and restore the job from a savepoint created using the
previous version of the class, correct?


Re: Deserialization of serializer errored

2018-10-01 Thread Elias Levy
Any of the Flink folks seen this before?

On Fri, Sep 28, 2018 at 5:23 PM Elias Levy 
wrote:

> I am experiencing a rather odd error.  We have a job running on a Flink
> 1.4.2 cluster with two Kafka input streams, one of the streams is processed
> by an async function, and the output of the async function and the other
> original stream are consumed by a CoProcessOperator, that intern emits
> Scala case class instances, that go into a stateful ProcessFunction filter,
> and then into a sink.  I.e.
>
> source 1 -> async function --\
>|---> co process -->
> process --> sink
> source 2 --/
>
> A field was added to output case class and the job would no longer start
> up from a save point.  I assumed this was a result of a serializer
> incompatibility.  I verified this by reversing the addition of the field
> and the job could then restore from the previous savepoint.  So far it
> makes sense.
>
> Then I decided to leave the new field in the case class, but eliminated
> most of the DAG, leaving only the source 1 --> async function portion of
> it.  The case class is emitted by the co process.  So this removed the
> modified case class from the processing graph.  When I try to restore from
> the savepoint, even if Allow Non Restored State is selected, the job fails
> to restore with the error "Deserialization of serializer erroed".
>
> So then I decided to completely eliminate the modified case class.  I
> removed all trace of it from the job, again only leaving the source 1 ->
> async function.  I tried to restore this job, with no traces of the case
> class, and still the job failed with the "Deserialization of serializer
> erroed" even when Allow Non Restored State is selected.
>
> Anyone seen anything like this?
>
> This is the error being generated:
>
> WARN
>  org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil  -
> Deserialization of serializer errored; replacing with null.
> java.io.IOException: Unloadable class for type serializer.
> at
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:384)
> at
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110)
> at
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83)
> at
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:203)
> at
> org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoReaderV2.readStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:207)
> at
> org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:85)
> at
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:351)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Unknown Source)
> Caused by: java.io.InvalidClassException: failed to read class descriptor
> at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
> at java.io.ObjectInputStream.readClassDesc(Unknown Source)
> at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
> at java.io.ObjectInputStream.readObject0(Unknown Source)
> at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
> at java.io.ObjectInputStream.readSerialData(Unknown Source)
> at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
> at java.io.ObjectInputStream.readObject0(Unknown Source)
> at java.io.ObjectInputStream.readObject(Unknown Source)
> at
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375)
> ... 14 more
> Caused by: java.lang.ClassNotFoundException:
> com.somewh

Deserialization of serializer errored

2018-09-28 Thread Elias Levy
I am experiencing a rather odd error.  We have a job running on a Flink
1.4.2 cluster with two Kafka input streams, one of the streams is processed
by an async function, and the output of the async function and the other
original stream are consumed by a CoProcessOperator, which in turn emits
Scala case class instances that go into a stateful ProcessFunction filter,
and then into a sink.  I.e.

source 1 -> async function --\
                               |---> co process --> process --> sink
source 2 ---------------------/

A field was added to the output case class and the job would no longer
start up from a savepoint.  I assumed this was a result of a serializer
incompatibility.  I verified this by reversing the addition of the field
and the job could then restore from the previous savepoint.  So far it
makes sense.

Then I decided to leave the new field in the case class, but eliminated
most of the DAG, leaving only the source 1 --> async function portion of
it.  The case class is emitted by the co process.  So this removed the
modified case class from the processing graph.  When I try to restore from
the savepoint, even if Allow Non Restored State is selected, the job fails
to restore with the error "Deserialization of serializer errored".

So then I decided to completely eliminate the modified case class.  I
removed all trace of it from the job, again only leaving the source 1 ->
async function.  I tried to restore this job, with no traces of the case
class, and still the job failed with the "Deserialization of serializer
erroed" even when Allow Non Restored State is selected.

Anyone seen anything like this?

This is the error being generated:

WARN  org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil
 - Deserialization of serializer errored; replacing with null.
java.io.IOException: Unloadable class for type serializer.
at
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:384)
at
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110)
at
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83)
at
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:203)
at
org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoReaderV2.readStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:207)
at
org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:85)
at
org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:351)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Unknown Source)
Caused by: java.io.InvalidClassException: failed to read class descriptor
at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
at java.io.ObjectInputStream.readClassDesc(Unknown Source)
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.io.ObjectInputStream.readObject0(Unknown Source)
at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
at java.io.ObjectInputStream.readSerialData(Unknown Source)
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.io.ObjectInputStream.readObject0(Unknown Source)
at java.io.ObjectInputStream.readObject(Unknown Source)
at
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375)
... 14 more
Caused by: java.lang.ClassNotFoundException:
com.somewhere.TestJob$$anon$13$$anon$3
at java.net.URLClassLoader.findClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
at java.lang.ClassLoader.loadClass(Unknown Source)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Unknown Source)
at
org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73)
at

Queryable state and state TTL

2018-08-28 Thread Elias Levy
Is there a reason queryable state can't work with state TTL?  Trying to use
both at the same time leads to an "IllegalArgumentException: Queryable state
is currently not supported with TTL".


Why don't operations on KeyedStream return KeyedStream?

2018-08-28 Thread Elias Levy
Operators on a KeyedStream don't return a new KeyedStream.  Is there a
reason for this?  You need to perform `keyBy` again to get a KeyedStream.
Presumably if you key by the same value there won't be any shuffled data,
but the key may no longer be available within the stream record.


Re: Flink Rebalance

2018-08-09 Thread Elias Levy
What do you consider a lot of latency?  The rebalance will require
serializing / deserializing the data as it gets distributed.  Depending on
the complexity of your records and the efficiency of your serializers, that
could have a significant impact on your performance.

On Thu, Aug 9, 2018 at 2:14 PM antonio saldivar  wrote:

> Hello
>
> Does anyone know why when I add "rebalance()" to my .map steps is adding a
> lot of latency rather than not having rebalance.
>
>
> I have kafka partitions in my topic 44 and 44 flink task manager
>
> execution plan looks like this when I add rebalance but it is adding a lot
> of latency
>
> kafka-src -> rebalance -> step1 -> rebalance ->step2->rebalance ->
> kafka-sink
>
> Thank you
> regards
>
>


Flink Forwards 2018 videos

2018-08-05 Thread Elias Levy
It appears the Flink Forwards 2018 videos are FUBAR.  The data entry form
refuses to show them regardless of what you enter in it.


Re: Old job resurrected during HA failover

2018-08-03 Thread Elias Levy
Till,

Thoughts?

On Wed, Aug 1, 2018 at 7:34 PM vino yang  wrote:

> Your analysis is correct, yes, in theory the old jobgraph should be
> deleted, but Flink currently uses the method of locking and asynchronously
> deleting Path, so that it can not give you the acknowledgment of deleting,
> so this is a risk point.
>
> cc Till, there have been users who have encountered this problem before. I
> personally think that asynchronous deletion may be risky, which may cause
> JM to be revived by the cancel job after the failover.
>


Re: Behavior of time based operators

2018-08-02 Thread Elias Levy
See the section on Operators here
https://docs.google.com/document/d/1b5d-hTdJQsPH3YD0zTB4ZqodinZVHFomKvt41FfUPMc/edit?usp=sharing

On Thu, Aug 2, 2018 at 3:42 PM Harshvardhan Agrawal <
harshvardhan.ag...@gmail.com> wrote:

> Hello,
>
> I have recently started reading Stream Processing with Apache Flink by
> Fabian and Vasiliki. In Chapter 3 of the book there is a statement that
> says: None of the functions expose an API to set time stamps of emitted
> records, manipulate the event-time clock of a task, or emit watermarks.
> Instead, time-based DataStream operator tasks internally set the time
> stamps of emitted records to ensure that they are properly aligned with the
> emitted watermarks. For instance, a time-window operator task attached the
> end time of a window as time stamp to all records emitted by the window
> computation before it emits the watermark with the time stamp that
> triggered the computation of the window.
>
> Does this mean that time stamps in the records are overwritten by these
> time-based operators when using Event Time?
> --
> Regards,
> Harshvardhan
>


Re: Description of Flink event time processing

2018-08-02 Thread Elias Levy
Fabian,

https://github.com/apache/flink/pull/6481

I added a page, but did not remove or edit any existing page.  Let me know
what you'd like to see trimmed.


On Thu, Aug 2, 2018 at 8:44 AM Fabian Hueske  wrote:

> Hi Elias,
>
> Thanks for the update!
> I think the document can be added to the docs now.
>
> It has some overlap with the Event Time Overview page.
> IMO, it should not replace the overview page but rather be a new page.
> Maybe, we can make the overview a bit slimmer and point to the more
> detailed discussion of your document.
>
> Elias, do you want to put your document into Markdown and open a PR for
> the documentation?
>
>


Re: Old job resurrected during HA failover

2018-08-01 Thread Elias Levy
I can see in the logs that JM 1 (10.210.22.167), the one that became
leader after failover, thinks it deleted
the 2a4eff355aef849c5ca37dbac04f2ff1 job from ZK when it was canceled:

July 30th 2018, 15:32:27.231 Trying to cancel job with ID
2a4eff355aef849c5ca37dbac04f2ff1.
July 30th 2018, 15:32:27.232 Job Some Job
(2a4eff355aef849c5ca37dbac04f2ff1) switched from state RESTARTING to
CANCELED.
July 30th 2018, 15:32:27.232 Stopping checkpoint coordinator for job
2a4eff355aef849c5ca37dbac04f2ff1
July 30th 2018, 15:32:27.239 Removed job graph
2a4eff355aef849c5ca37dbac04f2ff1 from ZooKeeper.
July 30th 2018, 15:32:27.245 Removing
/flink/cluster_1/checkpoints/2a4eff355aef849c5ca37dbac04f2ff1 from ZooKeeper
July 30th 2018, 15:32:27.251 Removing
/checkpoint-counter/2a4eff355aef849c5ca37dbac04f2ff1 from ZooKeeper

Both /flink/cluster_1/checkpoints/2a4eff355aef849c5ca37dbac04f2ff1
and /flink/cluster_1/checkpoint-counter/2a4eff355aef849c5ca37dbac04f2ff1 no
longer exist, but for some reason the job graph is still there.

Looking at the ZK logs I find the problem:

July 30th 2018, 15:32:27.241 Got user-level KeeperException when processing
sessionid:0x201d2330001 type:delete cxid:0x434c zxid:0x60009dd94
txntype:-1 reqpath:n/a Error
Path:/flink/cluster_1/jobgraphs/2a4eff355aef849c5ca37dbac04f2ff1
Error:KeeperErrorCode = Directory not empty for
/flink/cluster_1/jobgraphs/2a4eff355aef849c5ca37dbac04f2ff1

Looking in ZK, we see:

[zk: localhost:2181(CONNECTED) 0] ls
/flink/cluster_1/jobgraphs/2a4eff355aef849c5ca37dbac04f2ff1
[d833418c-891a-4b5e-b983-080be803275c]

From the comments in ZooKeeperStateHandleStore.java I gather that this
child node is used as a deletion lock.  Looking at the contents of this
ephemeral lock node:

[zk: localhost:2181(CONNECTED) 16] get
/flink/cluster_1/jobgraphs/2a4eff355aef849c5ca37dbac04f2ff1/d833418c-891a-4b5e-b983-080be803275c
*10.210.42.62*
cZxid = 0x60002ffa7
ctime = Tue Jun 12 20:01:26 UTC 2018
mZxid = 0x60002ffa7
mtime = Tue Jun 12 20:01:26 UTC 2018
pZxid = 0x60002ffa7
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x3003f4a0003
dataLength = 12
numChildren = 0

and compared to the ephemeral node lock of the currently running job:

[zk: localhost:2181(CONNECTED) 17] get
/flink/cluster_1/jobgraphs/d77948df92813a68ea6dfd6783f40e7e/596a4add-9f5c-4113-99ec-9c942fe91172
*10.210.22.167*
cZxid = 0x60009df4b
ctime = Mon Jul 30 23:01:04 UTC 2018
mZxid = 0x60009df4b
mtime = Mon Jul 30 23:01:04 UTC 2018
pZxid = 0x60009df4b
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x201d2330001
dataLength = 13
numChildren = 0

Assuming the content of the nodes represents the owner, it seems the job
graph for the old canceled job, 2a4eff355aef849c5ca37dbac04f2ff1, is locked
by the previous JM leader, JM 2 (10.210.42.62), while the running job is
locked by the current JM leader, JM 1 (10.210.22.167).

Somehow the previous leader, JM 2, did not give up the lock when leadership
failed over to JM 1.

Shouldn't something call ZooKeeperStateHandleStore.releaseAll during HA
failover to release the locks on the graphs?


On Wed, Aug 1, 2018 at 9:49 AM Elias Levy 
wrote:

> Thanks for the reply.  Looking in ZK I see:
>
> [zk: localhost:2181(CONNECTED) 5] ls /flink/cluster_1/jobgraphs
> [d77948df92813a68ea6dfd6783f40e7e, 2a4eff355aef849c5ca37dbac04f2ff1]
>
> Again we see HA state for job 2a4eff355aef849c5ca37dbac04f2ff1, even
> though that job is no longer running (it was canceled while it was in a
> loop attempting to restart, but failing because of a lack of cluster slots).
>
> Any idea why that may be the case?
>
>>


Re: Old job resurrected during HA failover

2018-08-01 Thread Elias Levy
Vino,

Thanks for the reply.  Looking in ZK I see:

[zk: localhost:2181(CONNECTED) 5] ls /flink/cluster_1/jobgraphs
[d77948df92813a68ea6dfd6783f40e7e, 2a4eff355aef849c5ca37dbac04f2ff1]

Again we see HA state for job 2a4eff355aef849c5ca37dbac04f2ff1, even though
that job is no longer running (it was canceled while it was in a loop
attempting to restart, but failing because of a lack of cluster slots).

Any idea why that may be the case?


On Wed, Aug 1, 2018 at 8:38 AM vino yang  wrote:

> If a job is explicitly canceled, its jobgraph node on ZK will be deleted.
> However, it is worth noting here that Flink enables a background thread to
> asynchronously delete the jobGraph node,
> so there may be cases where it cannot be deleted.
> On the other hand, the jobgraph node on ZK is the only basis for the JM
> leader to restore the job.
> There may be an unexpected recovery or an old job resurrection.
>


Old job resurrected during HA failover

2018-08-01 Thread Elias Levy
For the second time in as many months we've had an old job resurrected
during HA failover in a 1.4.2 standalone cluster.  Failover was initiated
when the leading JM lost its connection to ZK.  I opened FLINK-10011 with
the details.

We are using S3 with the Presto adapter as our distributed store.  After we
cleaned up the cluster by shutting down the two jobs started after failover
and starting a new job from the last known good checkpoint from the single
job running in the cluster before failover, the HA recovery directory looks
as follows:

s3cmd ls s3://bucket/flink/cluster_1/recovery/
 DIR s3://bucket/flink/cluster_1/recovery/some_job/
2018-07-31 17:33 35553
s3://bucket/flink/cluster_1/recovery/completedCheckpoint12e06bef01c5
2018-07-31 17:34 35553
s3://bucket/flink/cluster_1/recovery/completedCheckpoint187e0d2ae7cb
2018-07-31 17:32 35553
s3://bucket/flink/cluster_1/recovery/completedCheckpoint22fc8ca46f02
2018-06-12 20:01 284626
s3://bucket/flink/cluster_1/recovery/submittedJobGraph7f627a661cec
2018-07-30 23:01 285257
s3://bucket/flink/cluster_1/recovery/submittedJobGraphf3767780c00c

submittedJobGraph7f627a661cec appears to be job
2a4eff355aef849c5ca37dbac04f2ff1, the long running job that failed during
the ZK failover

submittedJobGraphf3767780c00c appears to be job
d77948df92813a68ea6dfd6783f40e7e, the job we started restoring from a
checkpoint after shutting down the duplicate jobs

Should submittedJobGraph7f627a661cec exist in the recovery directory if
2a4eff355aef849c5ca37dbac04f2ff1 is no longer running?


Re: Counting elements that appear "behind" the watermark

2018-07-31 Thread Elias Levy
Correct.  Context gives you access to the element timestamp
.
But it also gives you access to the current watermark via timerService

->
currentWatermark

.

On Tue, Jul 31, 2018 at 7:45 AM Julio Biason  wrote:

> Thanks for the tips. Unfortunately, it seems `Context` only have
> information from the element being processed (
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java#L91)
> and the RuntimeContext doesn't have access to any watermark information (
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java#L57
> ).
>
>


Re: Counting elements that appear "behind" the watermark

2018-07-30 Thread Elias Levy
You can create a ProcessFunction.  That gives you access to
getRuntimeContext to register metrics, to the element timestamp, and the
current watermark.  Keep in mind that operators first process a record and
then process any watermark that was the result of that record, so that when
you get the current watermark from within the processElement method, the
watermark generated from that element won't be the current watermark.
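
A minimal sketch of that approach, assuming timestamps and watermarks are
assigned upstream of this operator; the class, metric, and type names are
illustrative rather than taken from the original job:

import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.Counter
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

// Passes elements through unchanged while counting those whose timestamp is
// already behind the operator's current watermark.
class LateElementCounter[T] extends ProcessFunction[T, T] {
  @transient private var lateElements: Counter = _

  override def open(parameters: Configuration): Unit = {
    lateElements = getRuntimeContext.getMetricGroup.counter("lateElements")
  }

  override def processElement(value: T,
                              ctx: ProcessFunction[T, T]#Context,
                              out: Collector[T]): Unit = {
    val ts = ctx.timestamp() // may be null if no timestamp was assigned
    // Note: the watermark seen here does not yet reflect any watermark that
    // this element itself may trigger downstream.
    if (ts != null && ts.longValue() < ctx.timerService().currentWatermark()) {
      lateElements.inc()
    }
    out.collect(value)
  }
}

// Usage (with the Scala DataStream API imports in scope):
//   stream.assignTimestampsAndWatermarks(...).process(new LateElementCounter[MyEvent])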

On Mon, Jul 30, 2018 at 10:33 AM Julio Biason 
wrote:

> Hello,
>
> Our current watermark model is "some time behind the most recent seen
> element" (very close to what the docs have in "Periodic Watermark"
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamps_watermarks.html#with-periodic-watermarks).
> It fits our current processing model.
>
> The thing is, we want to extract information about elements appearing
> behind the watermark, to give some insight when we need to update the
> amount of time behind the most seen element we need. The problem is, I
> can't create any metrics inside the AssignerWithPeriodicWatermarks 'cause
> it has no `getRuntime()` to attach the metric.
>
> Is there any way we can count those (a ProcessFunction before the
> .assignTimestampsAndWatermarks(), maybe)?
>
> --
> *Julio Biason*, Sofware Engineer
> *AZION*  |  Deliver. Accelerate. Protect.
> Office: +55 51 3083 8101   |  Mobile: +55 51
> *99907 0554*
>


Re: Description of Flink event time processing

2018-07-30 Thread Elias Levy
Fabian,

You have any time to review the changes?

On Thu, Jul 19, 2018 at 2:19 AM Fabian Hueske  wrote:

> Hi Elias,
>
> Thanks for the update!
> I'll try to have another look soon.
>
> Best, Fabian
>
> 2018-07-11 1:30 GMT+02:00 Elias Levy :
>
>> Thanks for all the comments.  I've updated the document to account for
>> the feedback.  Please take a look.
>>
>> On Fri, Jul 6, 2018 at 2:33 PM Elias Levy 
>> wrote:
>>
>>> Apologies.  Comments are now enabled.
>>>
>>> On Thu, Jul 5, 2018 at 6:09 PM Rong Rong  wrote:
>>>
>>>> Hi Elias,
>>>>
>>>> Thanks for putting together the document. This is actually a very good,
>>>> well-rounded document.
>>>> I think you did not to enable access for comments for the link. Would
>>>> you mind enabling comments for the google doc?
>>>>
>>>> Thanks,
>>>> Rong
>>>>
>>>>
>>>> On Thu, Jul 5, 2018 at 8:39 AM Fabian Hueske  wrote:
>>>>
>>>>> Hi Elias,
>>>>>
>>>>> Thanks for the great document!
>>>>> I made a pass over it and left a few comments.
>>>>>
>>>>> I think we should definitely add this to the documentation.
>>>>>
>>>>> Thanks,
>>>>> Fabian
>>>>>
>>>>> 2018-07-04 10:30 GMT+02:00 Fabian Hueske :
>>>>>
>>>>>> Hi Elias,
>>>>>>
>>>>>> I agree, the docs lack a coherent discussion of event time features.
>>>>>> Thank you for this write up!
>>>>>> I just skimmed your document and will provide more detailed feedback
>>>>>> later.
>>>>>>
>>>>>> It would be great to add such a page to the documentation.
>>>>>>
>>>>>> Best, Fabian
>>>>>>
>>>>>> 2018-07-03 3:07 GMT+02:00 Elias Levy :
>>>>>>
>>>>>>> The documentation of how Flink handles event time and watermarks is
>>>>>>> spread across several places.  I've been wanting a single location that
>>>>>>> summarizes the subject, and as none was available, I wrote one up.
>>>>>>>
>>>>>>> You can find it here:
>>>>>>> https://docs.google.com/document/d/1b5d-hTdJQsPH3YD0zTB4ZqodinZVHFomKvt41FfUPMc/edit?usp=sharing
>>>>>>>
>>>>>>> I'd appreciate feedback, particularly about the correctness of the
>>>>>>> described behavior.
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>


Re: Implement Joins with Lookup Data

2018-07-24 Thread Elias Levy
Alas, this suffers from the bootstrap problem.  At the moment Flink does not
allow you to pause a source (the positions), so you can't fully consume and
preload the accounts or products to perform the join before the positions
start flowing.  Additionally, Flink SQL does not support materializing an
upsert table for the accounts or products to perform the join, so you have to
develop your own KeyedProcessFunction, maintain the state, and perform the
join on your own if you only want to join against the latest value for each
key.
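
A rough sketch of that hand-rolled join, using a CoProcessFunction over two
connected keyed streams; the Position/Product types, field names, and the lack
of any bootstrap handling are illustrative assumptions, not a reflection of
the original pipeline:

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.util.Collector

case class Position(productId: String, quantity: Long)
case class Product(id: String, name: String)

// Keeps the latest reference value per key and joins each position against it.
class LatestValueJoin
    extends CoProcessFunction[Position, Product, (Position, Product)] {

  @transient private var latest: ValueState[Product] = _

  override def open(parameters: Configuration): Unit = {
    latest = getRuntimeContext.getState(
      new ValueStateDescriptor[Product]("latest-product", classOf[Product]))
  }

  override def processElement1(pos: Position,
      ctx: CoProcessFunction[Position, Product, (Position, Product)]#Context,
      out: Collector[(Position, Product)]): Unit = {
    val product = latest.value()
    if (product != null) out.collect((pos, product))
    // else: reference data not seen yet -- the bootstrap problem noted above.
  }

  override def processElement2(product: Product,
      ctx: CoProcessFunction[Position, Product, (Position, Product)]#Context,
      out: Collector[(Position, Product)]): Unit = {
    latest.update(product)
  }
}

// Usage: positions.keyBy(_.productId)
//          .connect(products.keyBy(_.id))
//          .process(new LatestValueJoin)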

On Tue, Jul 24, 2018 at 7:27 AM Till Rohrmann  wrote:

> Yes, using Kafka which you initialize with the initial values and then
> feed changes to the Kafka topic from which you consume could be a solution.
>
> On Tue, Jul 24, 2018 at 3:58 PM Harshvardhan Agrawal <
> harshvardhan.ag...@gmail.com> wrote:
>
>> Hi Till,
>>
>> How would we do the initial hydration of the Product and Account data
>> since it’s currently in a relational DB? Do we have to copy over data to
>> Kafka and then use them?
>>
>> Regards,
>> Harsh
>>
>> On Tue, Jul 24, 2018 at 09:22 Till Rohrmann  wrote:
>>
>>> Hi Harshvardhan,
>>>
>>> I agree with Ankit that this problem could actually be solved quite
>>> elegantly with Flink's state. If you can ingest the product/account
>>> information changes as a stream, you can keep the latest version of it in
>>> Flink state by using a co-map function [1, 2]. One input of the co-map
>>> function would be the product/account update stream which updates the
>>> respective entries in Flink's state and the other input stream is the one
>>> to be enriched. When receiving input from this stream one would lookup the
>>> latest information contained in the operator's state and join it with the
>>> incoming event.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.html
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Jul 24, 2018 at 2:15 PM Harshvardhan Agrawal <
>>> harshvardhan.ag...@gmail.com> wrote:
>>>
 Hi,

 Thanks for your responses.

 There is no fixed interval for the data being updated. It’s more like:
 whenever you onboard a new product, or a mandate changes, the reference
 data changes.

 It’s not just the enrichment we are doing here. Once we have enriched
 the data we will be performing a bunch of aggregations using the enriched
 data.

 Which approach would you recommend?

 Regards,
 Harshvardhan

 On Tue, Jul 24, 2018 at 04:04 Jain, Ankit  wrote:

> How often is the product db updated? Based on that you can store
> product metadata as state in Flink, maybe setup the state on cluster
> startup and then update daily etc.
>
>
>
> Also, just based on this feature, flink doesn’t seem to add a lot of
> value on top of Kafka. As Jorn said below, you can very well store all the
> events in an external store and then periodically run a cron to enrich
> later since your processing doesn’t seem to require absolute real time.
>
>
>
> Thanks
>
> Ankit
>
>
>
> *From: *Jörn Franke 
> *Date: *Monday, July 23, 2018 at 10:10 PM
> *To: *Harshvardhan Agrawal 
> *Cc: *
> *Subject: *Re: Implement Joins with Lookup Data
>
>
>
> For the first one (lookup of single entries) you could use a NoSQL db
> (eg key value store) - a relational database will not scale.
>
>
>
> Depending on when you need to do the enrichment you could also first
> store the data and enrich it later as part of a batch process.
>
>
> On 24. Jul 2018, at 05:25, Harshvardhan Agrawal <
> harshvardhan.ag...@gmail.com> wrote:
>
> Hi,
>
>
>
> We are using Flink for financial data enrichment and aggregations. We
> have Positions data that we are currently receiving from Kafka. We want to
> enrich that data with reference data like Product and Account information
> that is present in a relational database. From my understanding of Flink so
> far I think there are two ways to achieve this. Here are two ways to do it:
>
>
>
> 1) First Approach:
>
> a) Get positions from Kafka and key by product key.
>
> b) Perform lookup from the database for each key and then obtain
> Tuple2
>
>
>
> 2) Second Approach:
>
> a) Get positions from Kafka and key by product key.
>
> b) Window the keyed stream into say 15 seconds each.
>
> c) For each window get the unique product keys and perform a single
> lookup.
>
> d) Somehow join Positions and Products
>
>
>
> In the first approach we will be making a lot of calls to the DB and
> the solution is very chatty. Its 

Re: event time and late events - documentation

2018-07-16 Thread Elias Levy
Tovi,

The document here

should answer your question.  If it doesn't, please let me know.

On Mon, Jul 16, 2018 at 5:17 AM Sofer, Tovi  wrote:

> Hi group,
>
> Can someone please elaborate on the comment at the end of section
> “Debugging Windows & Event Time”?
>
> Didn’t understand it meaning.
>
>
> https://ci.apache.org/projects/flink/flink-docs-master/monitoring/debugging_event_time.html
>
> *“Handling Event Time Stragglers*
>
> *Approach 1: Watermark stays late (indicated completeness), windows fire
> early*
>
> *Approach 2: Watermark heuristic with maximum lateness, windows accept
> late data”*
>
>
>
> Thanks,
>
> Tovi
>


Re: StateMigrationException when switching from TypeInformation.of to createTypeInformation

2018-07-14 Thread Elias Levy
Apologies for the delay.  I've been traveling.

On Mon, Jul 9, 2018 at 8:44 AM Till Rohrmann  wrote:

> could you check whether the `TypeInformation` returned by
> `TypeInformation.of(new TypeHint[ConfigState]() {}))` and
> `createTypeInformation[ConfigState]` return the same `TypeInformation`
> subtype? The problem is that the former goes through the Java TypeExtractor
> whereas the latter goes through the Scala `TypeUtils#createTypeInfo` where
> the resulting `TypeInformation` is created via Scala macros. It must be the
> case that the Scala `TypeUtils` generate a different `TypeInformation`
> (e.g. Java generating a GenericTypeInfo whereas Scala generates a
> TraversableTypeInfo).
>

TypeInformation.of returns a GenericTypeInfo and toString reports it as
GenericType.

createTypeInformation returns an anonymous class but toString reports it as
interface scala.collection.mutable.Map[scala.Tuple2(_1: String, _2:
scala.Tuple2(_1: GenericType, _2:
byte[]))].

Looks like you are correct about the Java version using GenericTypeInfo.  I
suppose the only way around this if we wanted to move over to
createTypeInformation
is to release a job that supports both types and upgrade the state from one
to the other, then drop support for the older state.  Yes?

> It would also be helpful if you could share the definition of `ConfigState`
> in order to test it ourselves.
>

ConfigState is defined as type ConfigState =
mutable.Map[String,ConfigStateValue] and ConfigStateValue is defined as type
ConfigStateValue = (LazyObject,Array[Byte]).  LazyObject is from the
Doubledutch LazyJSON  package.
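
For reference, a small sketch that reproduces the comparison above; LazyObject
is stubbed out here so the snippet stands alone, which changes the exact type
printed but not the GenericTypeInfo-vs-Scala-macro difference being discussed:

import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.api.scala._
import scala.collection.mutable

object TypeInfoCheck {
  type LazyObject = AnyRef // stand-in for the LazyJSON class used in the real job
  type ConfigStateValue = (LazyObject, Array[Byte])
  type ConfigState = mutable.Map[String, ConfigStateValue]

  def main(args: Array[String]): Unit = {
    // Java type extractor path: falls back to a Kryo-backed GenericTypeInfo.
    val javaInfo = TypeInformation.of(new TypeHint[ConfigState]() {})
    // Scala macro path: builds a Map/Tuple2 type information tree.
    val scalaInfo = createTypeInformation[ConfigState]

    println(javaInfo.getClass.getSimpleName) // GenericTypeInfo
    println(scalaInfo)                       // the Scala Map/Tuple2 type information tree
  }
}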


Re: Description of Flink event time processing

2018-07-10 Thread Elias Levy
Thanks for all the comments.  I've updated the document to account for the
feedback.  Please take a look.

On Fri, Jul 6, 2018 at 2:33 PM Elias Levy 
wrote:

> Apologies.  Comments are now enabled.
>
> On Thu, Jul 5, 2018 at 6:09 PM Rong Rong  wrote:
>
>> Hi Elias,
>>
>> Thanks for putting together the document. This is actually a very good,
>> well-rounded document.
>> I think you did not to enable access for comments for the link. Would you
>> mind enabling comments for the google doc?
>>
>> Thanks,
>> Rong
>>
>>
>> On Thu, Jul 5, 2018 at 8:39 AM Fabian Hueske  wrote:
>>
>>> Hi Elias,
>>>
>>> Thanks for the great document!
>>> I made a pass over it and left a few comments.
>>>
>>> I think we should definitely add this to the documentation.
>>>
>>> Thanks,
>>> Fabian
>>>
>>> 2018-07-04 10:30 GMT+02:00 Fabian Hueske :
>>>
>>>> Hi Elias,
>>>>
>>>> I agree, the docs lack a coherent discussion of event time features.
>>>> Thank you for this write up!
>>>> I just skimmed your document and will provide more detailed feedback
>>>> later.
>>>>
>>>> It would be great to add such a page to the documentation.
>>>>
>>>> Best, Fabian
>>>>
>>>> 2018-07-03 3:07 GMT+02:00 Elias Levy :
>>>>
>>>>> The documentation of how Flink handles event time and watermarks is
>>>>> spread across several places.  I've been wanting a single location that
>>>>> summarizes the subject, and as none was available, I wrote one up.
>>>>>
>>>>> You can find it here:
>>>>> https://docs.google.com/document/d/1b5d-hTdJQsPH3YD0zTB4ZqodinZVHFomKvt41FfUPMc/edit?usp=sharing
>>>>>
>>>>> I'd appreciate feedback, particularly about the correctness of the
>>>>> described behavior.
>>>>>
>>>>
>>>>
>>>


Re: Description of Flink event time processing

2018-07-06 Thread Elias Levy
Apologies.  Comments are now enabled.

On Thu, Jul 5, 2018 at 6:09 PM Rong Rong  wrote:

> Hi Elias,
>
> Thanks for putting together the document. This is actually a very good,
> well-rounded document.
> I think you did not to enable access for comments for the link. Would you
> mind enabling comments for the google doc?
>
> Thanks,
> Rong
>
>
> On Thu, Jul 5, 2018 at 8:39 AM Fabian Hueske  wrote:
>
>> Hi Elias,
>>
>> Thanks for the great document!
>> I made a pass over it and left a few comments.
>>
>> I think we should definitely add this to the documentation.
>>
>> Thanks,
>> Fabian
>>
>> 2018-07-04 10:30 GMT+02:00 Fabian Hueske :
>>
>>> Hi Elias,
>>>
>>> I agree, the docs lack a coherent discussion of event time features.
>>> Thank you for this write up!
>>> I just skimmed your document and will provide more detailed feedback
>>> later.
>>>
>>> It would be great to add such a page to the documentation.
>>>
>>> Best, Fabian
>>>
>>> 2018-07-03 3:07 GMT+02:00 Elias Levy :
>>>
>>>> The documentation of how Flink handles event time and watermarks is
>>>> spread across several places.  I've been wanting a single location that
>>>> summarizes the subject, and as none was available, I wrote one up.
>>>>
>>>> You can find it here:
>>>> https://docs.google.com/document/d/1b5d-hTdJQsPH3YD0zTB4ZqodinZVHFomKvt41FfUPMc/edit?usp=sharing
>>>>
>>>> I'd appreciate feedback, particularly about the correctness of the
>>>> described behavior.
>>>>
>>>
>>>
>>


StateMigrationException when switching from TypeInformation.of to createTypeInformation

2018-07-06 Thread Elias Levy
During some refactoring we changed a job using managed state from:

ListStateDescriptor("config", TypeInformation.of(new
TypeHint[ConfigState]() {}))

to

ListStateDescriptor("config", createTypeInformation[ConfigState])

After this change, Flink refused to start the new job from a savepoint or
checkpoint, raising StateMigrationException instead.

Why is Flink raising this error?  Both TypeInformation.of and
createTypeInformation return TypeInformation[ConfigState], so why does it
think the state type has changed?


Description of Flink event time processing

2018-07-02 Thread Elias Levy
The documentation of how Flink handles event time and watermarks is spread
across several places.  I've been wanting a single location that summarizes
the subject, and as none was available, I wrote one up.

You can find it here:
https://docs.google.com/document/d/1b5d-hTdJQsPH3YD0zTB4ZqodinZVHFomKvt41FfUPMc/edit?usp=sharing

I'd appreciate feedback, particularly about the correctness of the
described behavior.


Re: String Interning

2018-06-28 Thread Elias Levy
Am I the only one that feels the config should be renamed or the docs on it
expanded?  Turning on object reuse doesn't really reuse objects, not in the
sense that an object can be reused for different values / messages /
records.  Instead, it stops Flink from making copies of a record, by
serializing and deserializing it, when passing it to the next operator.
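
For anyone searching for the switch being discussed, a minimal sketch of
enabling it (Scala API; the batch and Java environments expose the same
ExecutionConfig method):

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// With object reuse enabled Flink stops copying records between chained
// operators, so user functions must not cache or mutate input objects after
// emitting them.
env.getConfig.enableObjectReuse()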

On Tue, Jun 26, 2018 at 1:26 AM Stefan Richter 
wrote:

> Hi,
>
> you can enable object reuse via the execution config [1]: „By default,
> objects are not reused in Flink. Enabling the object reuse mode will
> instruct the runtime to reuse user objects for better performance. Keep in
> mind that this can lead to bugs when the user-code function of an operation
> is not aware of this behavior.“.
>
> Best,
> Stefan
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/execution_configuration.html
>
> Am 22.06.2018 um 20:09 schrieb Martin, Nick :
>
> I have a job where I read data from Kafka, do some processing on it, and
> write it to a database. When I read data out of Kafka, I put it into an
> object that has a String field based on the Kafka message key. The possible
> values for the message key are tightly constrained so there are fewer than
> 100 possible unique key values. Profiling of the Flink job shows millions
> of in flight stream elements, with an equal number of Strings, but I know
> all the strings are duplicates of a small number of unique values.  So it’s
> an ideal usecase for String interning. I’ve tried to use interning in the
> constructors for the message elements, but I suspect that I need to do
> something to preserve the interning when Flink serializes/deserializes
> objects when passing them between operators. What’s the best way to
> accomplish that?
>
>
>
>
>
>
>


high-availability.storageDir clean up?

2018-06-25 Thread Elias Levy
I noticed in one of our cluster that they are relatively old
submittedJobGraph* and completedCheckpoint* files.  I was wondering at what
point it is save to clean some of these up.


Re: Cluster resurrects old job

2018-06-20 Thread Elias Levy
Alas, that error appears to be a red herring.  The admin mistyped the cancel
command, leading to the error, but immediately corrected it, resulting in the
job being canceled next.  So it seems unrelated to the job coming back to
life later on.

On Wed, Jun 20, 2018 at 10:04 AM Elias Levy 
wrote:

> The source of the issue may be this error that occurred when the job was
> being canceled on June 5:
>
> June 5th 2018, 14:59:59.430 Failure during cancellation of job
> c59dd3133b1182ce2c05a5e2603a0646 with savepoint.
> java.io.IOException: Failed to create savepoint directory at
> --checkpoint-dir
> at
> org.apache.flink.runtime.checkpoint.savepoint.SavepointStore.createSavepointDirectory(SavepointStore.java:106)
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.triggerSavepoint(CheckpointCoordinator.java:376)
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:577)
> at
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> at
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
> at
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> On Wed, Jun 20, 2018 at 9:31 AM Elias Levy 
> wrote:
>
>> We had an unusual situation last night.  One of our Flink clusters
>> experienced some connectivity issues, which led to the single job
>> running on the cluster failing and then being restored.
>>
>> And then something odd happened.  The cluster decided to also restore an
>> old version of the job.  One we were running a month ago.  That job was
>> canceled on June 5 with a savepoint:
>>
>> June 5th 2018, 15:00:43.865 Trying to cancel job
>> c59dd3133b1182ce2c05a5e2603a0646 with savepoint to
>> s3://bucket/flink/foo/savepoints
>> June 5th 2018, 15:00:44.438 Savepoint stored in
>> s3://bucket/flink/foo/savepoints/savepoint-c59dd3-f748765c67df. Now
>> cancelling c59dd3133b1182ce2c05a5e2603a0646.
>> June 5th 2018, 15:00:44.438 Job IOC Engine
>> (c59dd3133b1182ce2c05a5e2603a0646) switched from state RUNNING to
>> CANCELLING.
>> June 5th 2018, 15:00:44.495 Job IOC Engine
>> (c59dd3133b1182ce2c05a5e2603a0646) switched from state CANCELLING to
>> CANCELED.
>> June 5th 2018, 15:00:44.507 Removed job graph
>> c59dd3133b1182ce2c05a5e2603a0646 from ZooKeeper.
>> June 5th 2018, 15:00:44.508 Removing
>> /flink/foo/checkpoints/c59dd3133b1182ce2c05a5e2603a0646 from ZooKeeper
>> June 5th 2018, 15:00:44.732 Job c59dd3133b1182ce2c05a5e2603a0646 has
>> been archived at
>> s3://bucket/flink/foo/archive/c59dd3133b1182ce2c05a5e2603a0646.
>>
>> But then yesterday:
>>
>> June 19th 2018, 17:55:31.917 Attempting to recover job
>> c59dd3133b1182ce2c05a5e2603a0646.
>> June 19th 2018, 17:55:32.155 Recovered
>> SubmittedJobGraph(c59dd3133b1182ce2c05a5e2603a0646, JobInfo(clients:
>> Set((Actor[akka.tcp://fl...@ip-10-201-11-121.eu-west-1.compute.internal:42823/temp/$c],DETACHED)),
>> start: 1524514537697)).
>> June 19th 2018, 17:55:32.157 Submitting job
>> c59dd3133b1182ce2c05a5e2603a0646 (Some Job) (Recovery).
>> June 19th 2018, 17:55:32.157 Using restart strategy
>> FixedDelayRestartStrategy(maxNumberRestartAttempts=2147483647,
>> delayBetweenRestartAttempts=3) for c59dd3133b1182ce2c05a5e2603a0646.
>> June 19th 2018, 17:55:32.157 Submitting recovered job
>> c59dd3133b1182ce2c05a5e2603a0646.
>> June 19th 2018, 17:55:32.158 Running initialization on master for job
>> Some Job (c59dd3133b1182ce2c05a5e2603a0646)

Re: Cluster resurrects old job

2018-06-20 Thread Elias Levy
The source of the issue may be this error that occurred when the job was
being canceled on June 5:

June 5th 2018, 14:59:59.430 Failure during cancellation of job
c59dd3133b1182ce2c05a5e2603a0646 with savepoint.
java.io.IOException: Failed to create savepoint directory at
--checkpoint-dir
at
org.apache.flink.runtime.checkpoint.savepoint.SavepointStore.createSavepointDirectory(SavepointStore.java:106)
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.triggerSavepoint(CheckpointCoordinator.java:376)
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:577)
at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at
org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

On Wed, Jun 20, 2018 at 9:31 AM Elias Levy 
wrote:

> We had an unusual situation last night.  One of our Flink clusters
> experienced some connectivity issues, which led to the single job
> running on the cluster failing and then being restored.
>
> And then something odd happened.  The cluster decided to also restore an
> old version of the job.  One we were running a month ago.  That job was
> canceled on June 5 with a savepoint:
>
> June 5th 2018, 15:00:43.865 Trying to cancel job
> c59dd3133b1182ce2c05a5e2603a0646 with savepoint to
> s3://bucket/flink/foo/savepoints
> June 5th 2018, 15:00:44.438 Savepoint stored in
> s3://bucket/flink/foo/savepoints/savepoint-c59dd3-f748765c67df. Now
> cancelling c59dd3133b1182ce2c05a5e2603a0646.
> June 5th 2018, 15:00:44.438 Job IOC Engine
> (c59dd3133b1182ce2c05a5e2603a0646) switched from state RUNNING to
> CANCELLING.
> June 5th 2018, 15:00:44.495 Job IOC Engine
> (c59dd3133b1182ce2c05a5e2603a0646) switched from state CANCELLING to
> CANCELED.
> June 5th 2018, 15:00:44.507 Removed job graph
> c59dd3133b1182ce2c05a5e2603a0646 from ZooKeeper.
> June 5th 2018, 15:00:44.508 Removing
> /flink/foo/checkpoints/c59dd3133b1182ce2c05a5e2603a0646 from ZooKeeper
> June 5th 2018, 15:00:44.732 Job c59dd3133b1182ce2c05a5e2603a0646 has been
> archived at s3://bucket/flink/foo/archive/c59dd3133b1182ce2c05a5e2603a0646.
>
> But then yesterday:
>
> June 19th 2018, 17:55:31.917 Attempting to recover job
> c59dd3133b1182ce2c05a5e2603a0646.
> June 19th 2018, 17:55:32.155 Recovered
> SubmittedJobGraph(c59dd3133b1182ce2c05a5e2603a0646, JobInfo(clients:
> Set((Actor[akka.tcp://fl...@ip-10-201-11-121.eu-west-1.compute.internal:42823/temp/$c],DETACHED)),
> start: 1524514537697)).
> June 19th 2018, 17:55:32.157 Submitting job
> c59dd3133b1182ce2c05a5e2603a0646 (Some Job) (Recovery).
> June 19th 2018, 17:55:32.157 Using restart strategy
> FixedDelayRestartStrategy(maxNumberRestartAttempts=2147483647,
> delayBetweenRestartAttempts=3) for c59dd3133b1182ce2c05a5e2603a0646.
> June 19th 2018, 17:55:32.157 Submitting recovered job
> c59dd3133b1182ce2c05a5e2603a0646.
> June 19th 2018, 17:55:32.158 Running initialization on master for job
> Some Job (c59dd3133b1182ce2c05a5e2603a0646).
> June 19th 2018, 17:55:32.165 Initialized in
> '/checkpoints/c59dd3133b1182ce2c05a5e2603a0646'.
> June 19th 2018, 17:55:32.170 Job Some Job
> (c59dd3133b1182ce2c05a5e2603a0646) switched from state CREATED to RUNNING.
> June 19th 2018, 17:55:32.170 Scheduling job
> c59dd3133b1182ce2c05a5e2603a0646 (Some Job).
>
> Anyone seen anything like this?  Any ideas what the cause may have been?
>
> I am guessing that the state in ZK or S3 may have been somewhat corrupted
> when the job was previously shutdown, and that when the cluster encountered
> networking problems yesterday
> that lead to the cancel and restore of the currently running job, the

Cluster resurrects old job

2018-06-20 Thread Elias Levy
We had an unusual situation last night.  One of our Flink clusters
experienced some connectivity issues, which led to the single job
running on the cluster failing and then being restored.

And then something odd happened.  The cluster decided to also restore an
old version of the job.  One we were running a month ago.  That job was
canceled on June 5 with a savepoint:

June 5th 2018, 15:00:43.865 Trying to cancel job
c59dd3133b1182ce2c05a5e2603a0646 with savepoint to
s3://bucket/flink/foo/savepoints
June 5th 2018, 15:00:44.438 Savepoint stored in
s3://bucket/flink/foo/savepoints/savepoint-c59dd3-f748765c67df. Now
cancelling c59dd3133b1182ce2c05a5e2603a0646.
June 5th 2018, 15:00:44.438 Job IOC Engine
(c59dd3133b1182ce2c05a5e2603a0646) switched from state RUNNING to
CANCELLING.
June 5th 2018, 15:00:44.495 Job IOC Engine
(c59dd3133b1182ce2c05a5e2603a0646) switched from state CANCELLING to
CANCELED.
June 5th 2018, 15:00:44.507 Removed job graph
c59dd3133b1182ce2c05a5e2603a0646 from ZooKeeper.
June 5th 2018, 15:00:44.508 Removing
/flink/foo/checkpoints/c59dd3133b1182ce2c05a5e2603a0646 from ZooKeeper
June 5th 2018, 15:00:44.732 Job c59dd3133b1182ce2c05a5e2603a0646 has been
archived at s3://bucket/flink/foo/archive/c59dd3133b1182ce2c05a5e2603a0646.

But then yesterday:

June 19th 2018, 17:55:31.917 Attempting to recover job
c59dd3133b1182ce2c05a5e2603a0646.
June 19th 2018, 17:55:32.155 Recovered
SubmittedJobGraph(c59dd3133b1182ce2c05a5e2603a0646, JobInfo(clients:
Set((Actor[akka.tcp://fl...@ip-10-201-11-121.eu-west-1.compute.internal:42823/temp/$c],DETACHED)),
start: 1524514537697)).
June 19th 2018, 17:55:32.157 Submitting job
c59dd3133b1182ce2c05a5e2603a0646 (Some Job) (Recovery).
June 19th 2018, 17:55:32.157 Using restart strategy
FixedDelayRestartStrategy(maxNumberRestartAttempts=2147483647,
delayBetweenRestartAttempts=3) for c59dd3133b1182ce2c05a5e2603a0646.
June 19th 2018, 17:55:32.157 Submitting recovered job
c59dd3133b1182ce2c05a5e2603a0646.
June 19th 2018, 17:55:32.158 Running initialization on master for job Some
Job (c59dd3133b1182ce2c05a5e2603a0646).
June 19th 2018, 17:55:32.165 Initialized in
'/checkpoints/c59dd3133b1182ce2c05a5e2603a0646'.
June 19th 2018, 17:55:32.170 Job Some Job
(c59dd3133b1182ce2c05a5e2603a0646) switched from state CREATED to RUNNING.
June 19th 2018, 17:55:32.170 Scheduling job
c59dd3133b1182ce2c05a5e2603a0646 (Some Job).

Anyone seen anything like this?  Any ideas what the cause may have been?

I am guessing that the state in ZK or S3 may have been somewhat corrupted
when the job was previously shut down, and that when the cluster encountered
networking problems yesterday that led to the cancel and restore of the
currently running job, the restore logic scanned ZK or S3 looking for jobs to
restore, came across the old job with bad state, and decided to bring it back
to life.

Any way to scan ZooKeeper or S3 for such jobs?
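
One way to check, mirroring the listings that appear elsewhere in this
archive (the ZooKeeper root and bucket paths depend on the cluster's HA
configuration and are illustrative here):

# job graphs Flink HA still considers recoverable
zkCli.sh -server zookeeper:2181
[zk: zookeeper:2181(CONNECTED) 0] ls /flink/cluster_1/jobgraphs

# submitted job graphs and completed checkpoints persisted in the HA storage directory
s3cmd ls s3://bucket/flink/cluster_1/recovery/

Any entry that does not correspond to a currently running job is a candidate
for being resurrected on the next failover.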


Re: [DISCUSS] Flink 1.6 features

2018-06-16 Thread Elias Levy
One more, since we have to deal with it often:

- Idling sources (Kafka in particular) and proper watermark propagation:
FLINK-5018 / FLINK-5479

On Fri, Jun 8, 2018 at 2:58 PM, Elias Levy 
wrote:

> Since wishes are free:
>
> - Standalone cluster job isolation: https://issues.
> apache.org/jira/browse/FLINK-8886
> - Proper sliding window joins (not overlapping hopping window joins):
> https://issues.apache.org/jira/browse/FLINK-6243
> - Sharing state across operators: https://issues.
> apache.org/jira/browse/FLINK-6239
> - Synchronizing streams: https://issues.apache.org/jira/browse/FLINK-4558
>
> Seconded:
> - Atomic cancel-with-savepoint: https://issues.apache.org/jira/
> browse/FLINK-7634
> - Support dynamically changing CEP patterns : https://issues.apache.org/
> jira/browse/FLINK-7129
>
>
> On Fri, Jun 8, 2018 at 1:31 PM, Stephan Ewen  wrote:
>
>> Hi all!
>>
>> Thanks for the discussion and good input. Many suggestions fit well with
>> the proposal above.
>>
>> Please bear in mind that with a time-based release model, we would
>> release whatever is mature by end of July.
>> The good thing is we could schedule the next release not too far after
>> that, so that the features that did not quite make it will not be delayed
>> too long.
>> In some sense, you could read this as as "*what to do first*" list,
>> rather than "*this goes in, other things stay out"*.
>>
>> Some thoughts on some of the suggestions
>>
>> *Kubernetes integration:* An opaque integration with Kubernetes should
>> be supported through the "as a library" mode. For a deeper integration, I
>> know that some committers have experimented with some PoC code. I would let
>> Till add some thoughts, he has worked the most on the deployment parts
>> recently.
>>
>> *Per partition watermarks with idleness:* Good point, could one
>> implement that on the current interface, with a periodic watermark
>> extractor?
>>
>> *Atomic cancel-with-savepoint:* Agreed, this is important. Making this
>> work with all sources needs a bit more work. We should have this in the
>> roadmap.
>>
>> *Elastic Bloomfilters:* This seems like an interesting new feature - the
>> above suggested feature set was more about addressing some longer standing
>> issues/requests. However, nothing should prevent contributors to work on
>> that.
>>
>> Best,
>> Stephan
>>
>>
>> On Wed, Jun 6, 2018 at 6:23 AM, Yan Zhou [FDS Science] > > wrote:
>>
>>> +1 on https://issues.apache.org/jira/browse/FLINK-5479
>>> [FLINK-5479] Per-partition watermarks in ...
>>> <https://issues.apache.org/jira/browse/FLINK-5479>
>>> issues.apache.org
>>> Reported in ML: http://apache-flink-user-maili
>>> ng-list-archive.2336050.n4.nabble.com/Kafka-topic-partition-
>>> skewness-causes-watermark-not-being-emitted-td11008.html It's normally
>>> not a common case to have Kafka partitions not producing any data, but
>>> it'll probably be good to handle this as well. I ...
>>>
>>> --
>>> *From:* Rico Bergmann 
>>> *Sent:* Tuesday, June 5, 2018 9:12:00 PM
>>> *To:* Hao Sun
>>> *Cc:* d...@flink.apache.org; user
>>> *Subject:* Re: [DISCUSS] Flink 1.6 features
>>>
>>> +1 on K8s integration
>>>
>>>
>>>
>>> Am 06.06.2018 um 00:01 schrieb Hao Sun :
>>>
>>> adding my vote to K8S Job mode, maybe it is this?
>>> > Smoothen the integration in Container environment, like "Flink as a
>>> Library", and easier integration with Kubernetes services and other proxies.
>>>
>>>
>>>
>>> On Mon, Jun 4, 2018 at 11:01 PM Ben Yan 
>>> wrote:
>>>
>>> Hi Stephan,
>>>
>>> Will  [ https://issues.apache.org/jira/browse/FLINK-5479 ]
>>> (Per-partition watermarks in FlinkKafkaConsumer should consider idle
>>> partitions) be included in 1.6? As we are seeing more users with this
>>> issue on the mailing lists.
>>>
>>> Thanks.
>>> Ben
>>>
>>> 2018-06-05 5:29 GMT+08:00 Che Lui Shum :
>>>
>>> Hi Stephan,
>>>
>>> Will FLINK-7129 (Support dynamically changing CEP patterns) be included
>>> in 1.6? There were discussions about possibly including it in 1.6:
>>> http://mail-archives.apache.org/mod_mbox/flink-user/201803.m
>>> box/%3cCAMq=OU7gru2O9JtoWXn1Lc1F7NKcxAyN6A3e58kxctb4b508RQ@m
>>> ail.gmail.com%3e
>>>

Re: [DISCUSS] Flink 1.6 features

2018-06-08 Thread Elias Levy
Since wishes are free:

- Standalone cluster job isolation:
https://issues.apache.org/jira/browse/FLINK-8886
- Proper sliding window joins (not overlapping hopping window joins):
https://issues.apache.org/jira/browse/FLINK-6243
- Sharing state across operators:
https://issues.apache.org/jira/browse/FLINK-6239
- Synchronizing streams: https://issues.apache.org/jira/browse/FLINK-4558

Seconded:
- Atomic cancel-with-savepoint:
https://issues.apache.org/jira/browse/FLINK-7634
- Support dynamically changing CEP patterns :
https://issues.apache.org/jira/browse/FLINK-7129


On Fri, Jun 8, 2018 at 1:31 PM, Stephan Ewen  wrote:

> Hi all!
>
> Thanks for the discussion and good input. Many suggestions fit well with
> the proposal above.
>
> Please bear in mind that with a time-based release model, we would release
> whatever is mature by end of July.
> The good thing is we could schedule the next release not too far after
> that, so that the features that did not quite make it will not be delayed
> too long.
> In some sense, you could read this as as "*what to do first*" list,
> rather than "*this goes in, other things stay out"*.
>
> Some thoughts on some of the suggestions
>
> *Kubernetes integration:* An opaque integration with Kubernetes should be
> supported through the "as a library" mode. For a deeper integration, I know
> that some committers have experimented with some PoC code. I would let Till
> add some thoughts, he has worked the most on the deployment parts recently.
>
> *Per partition watermarks with idleness:* Good point, could one implement
> that on the current interface, with a periodic watermark extractor?
>
> *Atomic cancel-with-savepoint:* Agreed, this is important. Making this
> work with all sources needs a bit more work. We should have this in the
> roadmap.
>
> *Elastic Bloomfilters:* This seems like an interesting new feature - the
> above suggested feature set was more about addressing some longer standing
> issues/requests. However, nothing should prevent contributors to work on
> that.
>
> Best,
> Stephan
>
>
> On Wed, Jun 6, 2018 at 6:23 AM, Yan Zhou [FDS Science] 
> wrote:
>
>> +1 on https://issues.apache.org/jira/browse/FLINK-5479
>> [FLINK-5479] Per-partition watermarks in ...
>> 
>> issues.apache.org
>> Reported in ML: http://apache-flink-user-maili
>> ng-list-archive.2336050.n4.nabble.com/Kafka-topic-partition-
>> skewness-causes-watermark-not-being-emitted-td11008.html It's normally
>> not a common case to have Kafka partitions not producing any data, but
>> it'll probably be good to handle this as well. I ...
>>
>> --
>> *From:* Rico Bergmann 
>> *Sent:* Tuesday, June 5, 2018 9:12:00 PM
>> *To:* Hao Sun
>> *Cc:* d...@flink.apache.org; user
>> *Subject:* Re: [DISCUSS] Flink 1.6 features
>>
>> +1 on K8s integration
>>
>>
>>
>> Am 06.06.2018 um 00:01 schrieb Hao Sun :
>>
>> adding my vote to K8S Job mode, maybe it is this?
>> > Smoothen the integration in Container environment, like "Flink as a
>> Library", and easier integration with Kubernetes services and other proxies.
>>
>>
>>
>> On Mon, Jun 4, 2018 at 11:01 PM Ben Yan 
>> wrote:
>>
>> Hi Stephan,
>>
>> Will  [ https://issues.apache.org/jira/browse/FLINK-5479 ]
>> (Per-partition watermarks in FlinkKafkaConsumer should consider idle
>> partitions) be included in 1.6? As we are seeing more users with this
>> issue on the mailing lists.
>>
>> Thanks.
>> Ben
>>
>> 2018-06-05 5:29 GMT+08:00 Che Lui Shum :
>>
>> Hi Stephan,
>>
>> Will FLINK-7129 (Support dynamically changing CEP patterns) be included
>> in 1.6? There were discussions about possibly including it in 1.6:
>> http://mail-archives.apache.org/mod_mbox/flink-user/201803.m
>> box/%3cCAMq=OU7gru2O9JtoWXn1Lc1F7NKcxAyN6A3e58kxctb4b508RQ@m
>> ail.gmail.com%3e
>>
>> Thanks,
>> Shirley Shum
>>
>>
>> From: Stephan Ewen 
>> To: d...@flink.apache.org, user 
>> Date: 06/04/2018 02:21 AM
>> Subject: [DISCUSS] Flink 1.6 features
>> --
>>
>>
>>
>> Hi Flink Community!
>>
>> The release of Apache Flink 1.5 has happened (yay!) - so it is a good
>> time to start talking about what to do for release 1.6.
>>
>> *== Suggested release timeline ==*
>>
>> I would propose to release around *end of July* (that is 8-9 weeks from
>> now).
>>
>> The rational behind that: There was a lot of effort in release testing
>> automation (end-to-end tests, scripted stress tests) as part of release
>> 1.5. You may have noticed the big set of new modules under
>> "flink-end-to-end-tests" in the Flink repository. It delayed the 1.5
>> release a bit, and needs to continue as part of the coming release cycle,
>> but should help make 

Re: Odd job failure

2018-05-28 Thread Elias Levy
On Mon, May 28, 2018 at 1:48 AM, Piotr Nowojski 
wrote:

> Most likely suspect is the standard java problem of some dependency
> convergence issue. Please check if you are not pulling in multiple Kafka
> versions into your class path. Especially your job shouldn’t pull any Kafka
> library except of the one that comes from flnk-connector-kafka-0.11 (which
> is 0.11.0.2).
>

Alas, that is not the case.  The job correctly includes kafka-clients:
0.11.0.2:

[warn] Found version conflict(s) in library dependencies; some are
suspected to be binary incompatible:
[warn]
[warn]  * org.apache.kafka:kafka-clients:0.11.0.2 is selected over
{0.10.2.1, 0.9.0.1}
[warn]  +- org.apache.flink:flink-connector-kafka-0.11_2.11:1.4.2
(depends on 0.11.0.2)
[warn]  +- org.apache.flink:flink-connector-kafka-0.9_2.11:1.4.2
(depends on 0.10.2.1)
[warn]  +- org.apache.flink:flink-connector-kafka-0.10_2.11:1.4.2
(depends on 0.10.2.1)
[warn]




> Please also consider upgrading your cluster at least to Kafka 0.11.0.2.
> Kafka 0.11.0.0 was pretty unstable release, and we do not support it. Our
> connector depend on Kafka 0.11.0.2 client and while I don’t assume that
> there is some incompatibility between 0.11.0.0 cluster and 0.11.0.2 client,
> it definitely wouldn’t hurt to upgrade the cluster.
>

Thanks for the tip.  That said, this error should be unrelated to the
version of the cluster.


Re: Odd job failure

2018-05-26 Thread Elias Levy
Piotr & Stephan,

Thanks for the replies.  Apologies for the late response.  I've been
traveling for the past month.

We've not observed this issue (spilling) again, but it is good to know that
1.5 will use back-pressure based alignment.  I think for now we'll leave
task.checkpoint.alignment.max-size as is and work towards moving to 1.5
once we confirm it is stable.
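
For reference, the cap Stephan mentions is a flink-conf.yaml entry on 1.4.x;
the value below is illustrative (in bytes), and the default of -1 leaves
alignment spilling unbounded:

# Abort (skip) a checkpoint once its barrier alignment has buffered more than
# this many bytes, instead of spilling further to disk.
task.checkpoint.alignment.max-size: 134217728

The trade-off is occasional skipped checkpoints in exchange for bounded disk
usage during alignment.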

As for the java.lang.NoClassDefFoundError: org/apache/kafka/clients/
NetworkClient$1 error.  We see that one constantly when jobs are
canceled/restarted/upgraded.  We are using the flink-connector-kafka-0.11
connector against a 0.11.0.0 cluster.  The error indicates to me that the
Kafka threads are not being fully shutdown and they are trying to reload
the NetworkClient class but failing, maybe because the code is no longer
accessible via the class loader or some other reason.

It looks like others are observing the same error.  Alexander Smirnov
reported it here on the list last month as well.


On Thu, May 3, 2018 at 1:22 AM, Stephan Ewen <se...@apache.org> wrote:

> Hi Elias!
>
> Concerning the spilling of alignment data to disk:
>
>   - In 1.4.x , you can set an upper limit via "
> task.checkpoint.alignment.max-size ". See [1].
>   - In 1.5.x, the default is a back-pressure based alignment, which does
> not spill any more.
>
> Best,
> Stephan
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> master/ops/config.html#task-checkpoint-alignment-max-size
>
> On Wed, May 2, 2018 at 1:37 PM, Piotr Nowojski <pi...@data-artisans.com>
> wrote:
>
>> Hi,
>>
>> It might be some Kafka issue.
>>
>> From what you described your reasoning seems sound. For some reason TM3
>> fails and is unable to restart and process any data, thus forcing spilling
>> on checkpoint barriers on TM1 and TM2.
>>
>> I don’t know the reason behind java.lang.NoClassDefFoundError:
>> org/apache/kafka/clients/NetworkClient$1 errors, but it doesn’t seem to
>> be important in this case.
>>
>> 1. What Kafka version are you using? Have you looked for any known Kafka
>> issues with those symptoms?
>> 2. Maybe the easiest thing will be to reformat/reinstall/recreate TM3 AWS
>> image? It might be some system issue.
>>
>> Piotrek
>>
>> On 28 Apr 2018, at 01:54, Elias Levy <fearsome.lucid...@gmail.com> wrote:
>>
>> We had a job on a Flink 1.4.2 cluster with three TMs experience an odd
>> failure the other day.  It seems that it started as some sort of network
>> event.
>>
>> It began with the 3rd TM starting to warn every 30 seconds about socket
>> timeouts while sending metrics to DataDog.  This lasted for the whole
>> outage.
>>
>> Twelve minutes later, all TMs reported at nearly the same time that they
>> had marked the Kafka coordinator as dead ("Marking the coordinator XXX (id:
>> 2147482640 rack: null) dead for group ZZZ").  The job terminated and the
>> system attempted to recover it.  Then things got into a weird state.
>>
>> The following repeated six or seven times over a period of about 40
>> minutes:
>>
>>1. TM attempts to restart the job, but only the first and second TMs
>>show signs of doing so.
>>2. The disk begins to fill up on TMs 1 and 2.
>>3. TMs 1 & 2 both report java.lang.NoClassDefFoundError:
>>org/apache/kafka/clients/NetworkClient$1 errors.  These were
>>mentioned on this list earlier this month.  It is unclear if the are 
>> benign.
>>4. The job dies when the disks finally fills up on 1 and 2.
>>
>>
>> Looking at the backtrace logged when the disk fills up, I gather that
>> Flink is buffering data coming from Kafka into one of my operators as a
>> result of a barrier.  The job has a two input operator, with one input the
>> primary data, and a secondary input for control commands.  It would appear
>> that for whatever reason the barrier for the control stream is not making
>> it to the operator, thus leading to the buffering and full disks.  Maybe
>> Flink scheduled the operator source of the control stream on the 3rd TM
>> which seems like it was not scheduling tasks?
>>
>> Finally the JM records that it 13 late messages for already expired
>> checkpoints (could they be from the 3rd TM?), the job is restored one more
>> time and it works.  All TMs report nearly at the same time that they can
>> now find the Kafka coordinator.
>>
>>
>> Seems like the 3rd TM has some connectivity issue, but then all TMs seems
>> to have a problem communicating with the Kafka coordinator at the same time
>> and recovered at the same time.  The TMs are hosted in AW

Re: Multiple stream operator watermark handling

2018-05-24 Thread Elias Levy
On Thu, May 24, 2018 at 9:20 AM, Elias Levy <fearsome.lucid...@gmail.com>
wrote:

> On Thu, May 24, 2018 at 7:26 AM, Piotr Nowojski <pi...@data-artisans.com>
> wrote:
>
>> From top of my head I can imagine two solutions:
>>
>> 1. Override the default behaviour of the operator via for example
>> org.apache.flink.streaming.api.datastream.ConnectedStreams#transform
>>
>
> That seems the safer, but more complicated path.
>

As we had already implemented the business logic in
a RichCoFlatMapFunction, I ended up extending CoStreamFlatMap:

class SingleWatermarkCoFlatMap[IN1,IN2,OUT](flatMapper:
CoFlatMapFunction[IN1,IN2,OUT]) extends CoStreamFlatMap(flatMapper)  {

  // Pass through the watermarks from the first stream
  override def processWatermark1(mark: Watermark): Unit =
processWatermark(mark)

  // Ignore watermarks from the second stream
  override def processWatermark2(mark: Watermark): Unit = {}
}


Then it was easy to replace:

stream1
  .connect(stream2)
  .flatMap(new BusinessCoFlatMapFunction(params))
  .name("Operator")
  .uid("op")

with:

stream1
  .connect(stream2)
  .transform("Operator",
    new SingleWatermarkCoFlatMap[X,Y,Z](new BusinessCoFlatMapFunction(params)))
  .uid("op")


Re: Multiple stream operator watermark handling

2018-05-24 Thread Elias Levy
On Thu, May 24, 2018 at 7:26 AM, Piotr Nowojski 
wrote:

> From top of my head I can imagine two solutions:
>
> 1. Override the default behaviour of the operator via for example
> org.apache.flink.streaming.api.datastream.ConnectedStreams#transform
>

That seems the safer, but more complicated path.


2. Can you set control stream’s watermark to Watermark#MAX_WATERMARK or
> maybe Watermark#MAX_WATERMARK - 1 ?
>

That seems simpler, but potentially perilous if at some point in the future
there were some use for control stream watermarks.  Also, would it work if
there are no messages in the control stream?  Wouldn't that mean no
watermark would be emitted, even if they were hardcoded to Long.MAX_VALUE?
In which case, the operator default for the stream would be used, which
would still be Long.MIN_VALUE.
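
For completeness, a sketch of what option 2 could look like; ControlMessage
stands in for whatever the control stream carries.  Because a periodic
assigner's getCurrentWatermark() is polled on the auto-watermark interval
(200 ms by default once event time is enabled) rather than per element, the
MAX_WATERMARK would be emitted even if the control stream never carries a
message, which at least addresses the "no messages" concern for the periodic
case:

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

case class ControlMessage(payload: String) // placeholder type

// Always report the maximum watermark so the two-input operator's minimum is
// governed solely by the primary stream.
class MaxWatermarkAssigner extends AssignerWithPeriodicWatermarks[ControlMessage] {
  override def getCurrentWatermark(): Watermark = Watermark.MAX_WATERMARK

  // Control element timestamps are irrelevant for this use case.
  override def extractTimestamp(element: ControlMessage, previousTimestamp: Long): Long =
    previousTimestamp
}

// controlStream.assignTimestampsAndWatermarks(new MaxWatermarkAssigner)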


BTW, this reminds me of an issue I've mentioned previously, the
documentation is lacking on a description of how watermarks are processed
by operators.  E.g. when does a window emit watermarks?  what watermarks
does it emit?  That seems like a rather large omission, as one of the main
features of Flink is event time processing, which puts watermarks almost on
equal footing to data and data operations.  Just as the docs describe how
data is process, merged, etc, the same should be true for watermarks.


Multiple stream operator watermark handling

2018-05-24 Thread Elias Levy
Is there a mechanism for a multiple stream operator to ignore watermarks from
one of the streams?

The use case is a multiple stream operator that consumes a primary stream
and a secondary control stream.  The control stream may only receive
messages on rare occasions, and possibly never.  The default behavior of the
operator is to only emit the lowest of the last watermark received from
each input stream.  That means that event time fails to advance if there
are no control messages.

I also notice that FLIP-17, the Side Input proposal, does not address this
issue, either in the Wiki or in the Google Docs.

Assuming there is no currently prescribed way to handle this, are folks
taking care of this by introducing a new Assigner after the multiple input
operator to generate watermarks?


Low Watermark: No Watermark

2018-05-20 Thread Elias Levy
Any reason the web UI would show No Watermark for the Low Watermark of a
job?  The job is using a punctuated timestamp assigner with watermarks, and
as far as I can tell it is generating watermarks properly.


Odd job failure

2018-04-27 Thread Elias Levy
We had a job on a Flink 1.4.2 cluster with three TMs experience an odd
failure the other day.  It seems that it started as some sort of network
event.

It began with the 3rd TM starting to warn every 30 seconds about socket
timeouts while sending metrics to DataDog.  This lasted for the whole
outage.

Twelve minutes later, all TMs reported at nearly the same time that they
had marked the Kafka coordinator as dead ("Marking the coordinator XXX (id:
2147482640 rack: null) dead for group ZZZ").  The job terminated and the
system attempted to recover it.  Then things got into a weird state.

The following repeated six or seven times over a period of about 40
minutes:

   1. TM attempts to restart the job, but only the first and second TMs
   show signs of doing so.
   2. The disk begins to fill up on TMs 1 and 2.
   3. TMs 1 & 2 both report java.lang.NoClassDefFoundError:
   org/apache/kafka/clients/NetworkClient$1 errors.  These were mentioned on
   this list earlier this month.  It is unclear if they are benign.
   4. The job dies when the disks finally fill up on 1 and 2.


Looking at the backtrace logged when the disk fills up, I gather that Flink
is buffering data coming from Kafka into one of my operators as a result of
a barrier.  The job has a two input operator, with one input the primary
data, and a secondary input for control commands.  It would appear that for
whatever reason the barrier for the control stream is not making it to the
operator, thus leading to the buffering and full disks.  Maybe Flink
scheduled the operator source of the control stream on the 3rd TM which
seems like it was not scheduling tasks?

Finally the JM records that it received 13 late messages for already expired
checkpoints (could they be from the 3rd TM?), the job is restored one more
time and it works.  All TMs report nearly at the same time that they can
now find the Kafka coordinator.


Seems like the 3rd TM had some connectivity issue, but then all TMs seemed
to have a problem communicating with the Kafka coordinator at the same time
and recovered at the same time.  The TMs are hosted in AWS across AZs, so
all of them having connectivity issues at the same time is suspect.  The
Kafka node in question was up and other clients in our infrastructure seemed
to be able to communicate with it without trouble.  Also, the Flink job
itself seemed to be talking to the Kafka cluster while restarting as it was
spilling data to disk coming from Kafka.  And the JM did not report any
reduction on available task slots, which would indicate connectivity issues
between the JM and the 3rd TM.  Yet, the logs in the 3rd TM do not show any
record of trying to restore the job during the intermediate attempts.

What do folks make of it?


And a question for Flink devs, is there some reason why Flink does not stop
spilling messages to disk when the disk is going to fill up?  Seems like
there should be a configurable limit to how much data can be spilled before
back-pressure is applied to slow down or stop the source.


Re: Tracking deserialization errors

2018-04-18 Thread Elias Levy
Either proposal would work.  In the latter case, at a minimum we'd need a
way to identify the source within the metric.  The basic error metric would
then allow us to go into the logs to determine the cause of the error, as
we already record the message causing trouble in the log.


On Mon, Apr 16, 2018 at 4:42 AM, Fabian Hueske  wrote:

> Thanks for starting the discussion Elias.
>
> I see two ways to address this issue.
>
> 1) Add an interface that a deserialization schema can implement to
> register metrics. Each source would need to check for the interface and
> call it to setup metrics.
> 2) Check for null returns in the source functions and increment a
> respective counter.
>
> In both cases, we need to touch the source connectors.
>
> I see that passing information such as topic name, partition, and offset
> are important debugging information. However, I don't think that metrics
> would be good to capture them.
> In that case, log files might be a better approach.
>
> I'm not sure to what extend the source functions (Kafka, Kinesis) support
> such error tracking.
> Adding Gordon to the thread who knows the internals of the connectors.
>
>


Tracking deserialization errors

2018-04-06 Thread Elias Levy
I was wondering how folks are tracking deserialization errors.
The AbstractDeserializationSchema interface provides no mechanism for the
deserializer to instantiate a metric counter, and "deserialize" must return
null instead of raising an exception if you want your job to continue
functioning when a record fails to deserialize.  But that means such errors
are invisible.

Thoughts?
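
For context, a rough sketch of the "return null and log" pattern described
above; Event and parseEvent are placeholders, and the schema's package was
org.apache.flink.streaming.util.serialization in older Flink releases:

import org.apache.flink.api.common.serialization.AbstractDeserializationSchema
import org.slf4j.LoggerFactory
import scala.util.{Failure, Success, Try}

case class Event(raw: String) // placeholder record type

class TolerantEventSchema extends AbstractDeserializationSchema[Event] {
  @transient private lazy val log = LoggerFactory.getLogger(classOf[TolerantEventSchema])

  // Placeholder parser; a real one (JSON, Avro, ...) may throw on bad input.
  private def parseEvent(bytes: Array[Byte]): Event = Event(new String(bytes, "UTF-8"))

  override def deserialize(message: Array[Byte]): Event =
    Try(parseEvent(message)) match {
      case Success(event) => event
      case Failure(e) =>
        // Returning null keeps the job running, but the failure is only
        // visible in the logs -- there is no hook here to bump a metric.
        log.warn(s"Failed to deserialize record: ${new String(message, "UTF-8")}", e)
        null
    }
}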


Re: NoClassDefFoundError of a Avro class after cancel then resubmit the same job

2018-02-22 Thread Elias Levy
Something seems to be off with the user code class loader.  The only way I
can get my job to start is if I drop the job jar into the lib folder on the JM
and configure the JM's classloader.resolve-order to parent-first.

Suggestions?

On Thu, Feb 22, 2018 at 12:52 PM, Elias Levy <fearsome.lucid...@gmail.com>
wrote:

> I am currently suffering through similar issues.
>
> Had a job running happily, but when the cluster tried to restart it, it
> would not find the JSON serializer in it. The job kept trying to restart in
> a loop.
>
> Just today I was running a job I built locally.  The job ran fine.  I
> added two commits and rebuilt the jar.  Now the job dies when it tries to
> start claiming it can't find the time assigner class.  I've unzipped the
> job jar, both locally and in the TM blob directory and have confirmed the
> class is in it.
>
> This is the backtrace:
>
> java.lang.ClassNotFoundException: com.foo.flink.common.util.TimeAssigner
>   at java.net.URLClassLoader.findClass(Unknown Source)
>   at java.lang.ClassLoader.loadClass(Unknown Source)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
>   at java.lang.ClassLoader.loadClass(Unknown Source)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Unknown Source)
>   at 
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73)
>   at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
>   at java.io.ObjectInputStream.readClassDesc(Unknown Source)
>   at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>   at java.io.ObjectInputStream.readObject0(Unknown Source)
>   at java.io.ObjectInputStream.readObject(Unknown Source)
>   at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:393)
>   at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:380)
>   at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:368)
>   at 
> org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.createPartitionStateHolders(AbstractFetcher.java:542)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.<init>(AbstractFetcher.java:167)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.<init>(Kafka09Fetcher.java:89)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.<init>(Kafka010Fetcher.java:62)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010.createFetcher(FlinkKafkaConsumer010.java:203)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:564)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>   at java.lang.Thread.run(Unknown Source)
>
>
> On Tue, Jan 23, 2018 at 7:51 AM, Stephan Ewen <se...@apache.org> wrote:
>
>> Hi!
>>
>> We changed a few things between 1.3 and 1.4 concerning Avro. One of the
>> main things is that Avro is no longer part of the core Flink class library,
>> but needs to be packaged into your application jar file.
>>
>> The class loading / caching issues of 1.3 with respect to Avro should
>> disappear in Flink 1.4, because Avro classes and caches are scoped to the
>> job classloaders, so the caches do not go across different jobs, or even
>> different operators.
>>
>>
>> *Please check: Make sure you have Avro as a dependency in your jar file
>> (in scope "compile").*
>>
>> Hope that solves the issue.
>>
>> Stephan
>>
>>
>> On Mon, Jan 22, 2018 at 2:31 PM, Edward <egb...@hotmail.com> wrote:
>>
>>> Yes, we've seen this issue as well, though it usually takes many more
>>> resubmits before the error pops up. Interestingly, of the 7 jobs we run
>>> (all
>>> of which use different Avro schemas), we only see this issue on 1 of
>>> them.
>>> Once the NoClassDefFoundError crops up though, it is necessary to
>>> recreate
>>> the task managers.
>>>
There's a whole page on the Flink documentation on debugging classloading,
and Avro is mentioned several times on that page.

Re: NoClassDefFoundError of a Avro class after cancel then resubmit the same job

2018-02-22 Thread Elias Levy
I am currently suffering through similar issues.

Had a job running happily, but when the cluster tried to restart it, it
would not find the JSON serializer in it. The job kept trying to restart in
a loop.

Just today I was running a job I built locally.  The job ran fine.  I added
two commits and rebuilt the jar.  Now the job dies when it tries to start
claiming it can't find the time assigner class.  I've unzipped the job jar,
both locally and in the TM blob directory and have confirmed the class is
in it.

This is the backtrace:

java.lang.ClassNotFoundException: com.foo.flink.common.util.TimeAssigner
at java.net.URLClassLoader.findClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Unknown Source)
at 
org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73)
at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
at java.io.ObjectInputStream.readClassDesc(Unknown Source)
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.io.ObjectInputStream.readObject0(Unknown Source)
at java.io.ObjectInputStream.readObject(Unknown Source)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:393)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:380)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:368)
at 
org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.createPartitionStateHolders(AbstractFetcher.java:542)
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.<init>(AbstractFetcher.java:167)
at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.<init>(Kafka09Fetcher.java:89)
at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.<init>(Kafka010Fetcher.java:62)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010.createFetcher(FlinkKafkaConsumer010.java:203)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:564)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Unknown Source)


On Tue, Jan 23, 2018 at 7:51 AM, Stephan Ewen  wrote:

> Hi!
>
> We changed a few things between 1.3 and 1.4 concerning Avro. One of the
> main things is that Avro is no longer part of the core Flink class library,
> but needs to be packaged into your application jar file.
>
> The class loading / caching issues of 1.3 with respect to Avro should
> disappear in Flink 1.4, because Avro classes and caches are scoped to the
> job classloaders, so the caches do not go across different jobs, or even
> different operators.
>
>
> *Please check: Make sure you have Avro as a dependency in your jar file
> (in scope "compile").*
>
> Hope that solves the issue.
>
> Stephan
>
>
> On Mon, Jan 22, 2018 at 2:31 PM, Edward  wrote:
>
>> Yes, we've seen this issue as well, though it usually takes many more
>> resubmits before the error pops up. Interestingly, of the 7 jobs we run
>> (all
>> of which use different Avro schemas), we only see this issue on 1 of them.
>> Once the NoClassDefFoundError crops up though, it is necessary to recreate
>> the task managers.
>>
>> There's a whole page on the Flink documentation on debugging classloading,
>> and Avro is mentioned several times on that page:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/
>> monitoring/debugging_classloading.html
>>
>> It seems that (in 1.3 at least) each submitted job has its own
>> classloader,
>> and its own instance of the Avro class definitions. However, the Avro
>> class
>> cache will keep references to the Avro classes from classloaders for the
>> previous cancelled jobs. That said, we haven't been able to find a
>> solution
>> to this error yet. Flink 1.4 would be worth a try because the of the
>> changes
>> to the default classloading behaviour (child-first is the new default, not
>> parent-first).
>>
>>
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.
>> nabble.com/
>>
>
>


Re: SQL materialized upsert tables

2018-02-21 Thread Elias Levy
On Wed, Feb 21, 2018 at 3:24 AM, Fabian Hueske  wrote:

> Hi Elias,
>
> Flink does not have built-in support for upsert stream -> table
> conversions, yet. However, the community is working on that (see FLINK-8545
> [1]).
> With a workaround, you can also solve the issue with what Flink supports
> so far.
>

Fabian,

Thanks for the reply.  Great to see some progress in this area.  If we
could implement this job in Flink rather than Kafka Streams it would mean
one less technology to support and to train our developers on, which is
always a plus.



> The approach with the MAX(tstamp) query was good idea, but the query needs
> another join predicate on time.
>
> tableEnv.sqlQuery("""
> SELECT a.tstamp, a.item, a.score, a.source
> FROM Telemetry a
>   INNER JOIN (
> SELECT MAX(tstamp) AS maxT, item, source
> FROM Telemetry
> GROUP BY item, source
>   ) b ON a.item = b.item AND a.source = b.source AND a.tstamp = maxT
> """)
>
> Otherwise, the table will have multiple records for each combination of
> item and score as you noticed.
>
> HOWEVER, you might not want to use the query above because it will
> accumulate all records from Telemetry in state and never clean them up.
> The reason for this is that the query planner is not smart enough yet to
> infer that old records will never be joined (this is implied by the join
> condition on time).
>

Thanks for the correction.  But, yes, the indefinite accumulation is a deal
breaker for using this approach.


A better solution is to use a custom user-defined aggregation function [2]
> (LAST_VAL) that returns the value with associated max timestamp.
>
> SELECT item, source, MAX(tstamp), LAST_VAL(score, tstamp)
> FROM Telemetry
> GROUP BY item, source
>
> LAST_VAL would have an accumulator that stores a score and its associated
> timestamp.
> When a new (score, timestamp) pair is accumulated, the UDAGG compares the
> timestamps and only updates the accumulator if the new timestamp is larger.
>

I'll give this approach a try.
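
For anyone following along, a rough sketch of what I understand the UDAGG to
look like (types and names are illustrative, untested):

import org.apache.flink.table.functions.AggregateFunction

// Accumulator: the latest score seen so far and its timestamp.
class LastValAcc {
  var score: Int = 0
  var tstamp: Long = Long.MinValue
}

// Returns the score associated with the largest timestamp accumulated so far.
class LastVal extends AggregateFunction[Int, LastValAcc] {
  override def createAccumulator(): LastValAcc = new LastValAcc

  override def getValue(acc: LastValAcc): Int = acc.score

  def accumulate(acc: LastValAcc, score: Int, tstamp: Long): Unit = {
    // Only keep the value if it carries a newer timestamp.
    if (tstamp > acc.tstamp) {
      acc.tstamp = tstamp
      acc.score = score
    }
  }
}

// tableEnv.registerFunction("LAST_VAL", new LastVal)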


Btw. I'm not sure if KStreams only updates the KTable if the update has a
> higher timestamp or just takes the last received record.
> That might be an issue with out-of-order data. I would check the behavior
> if you expect data with out-of-order timestamps.
>

I believe you are correct.  KS will attempt to consume records from across
partitions by aligning their timestamps, but it won't reorder
records within a partition, which can be problematic if you can't guarantee
ordered records within a partition.  While I talked about KTables, in
reality the job I wrote is a combination of the KS Stream DSL and Operator
API, to get around some of these issues.

The upsert stream table conversion that we are working on will support
> event time (max timestamp) or processing time (last value) upserts.
>

Excellent.


Best, Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK-8545
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/
> dev/table/udfs.html#aggregation-functions
>


Re: SQL materialized upsert tables

2018-02-20 Thread Elias Levy
[ Adding the list back in, as this clarifies my question ]

On Tue, Feb 20, 2018 at 3:42 PM, Darshan Singh 
wrote:

> I am no expert in Flink but I will try my best. The issue you mentioned will
> exist in all streaming systems, even with Kafka KTables; I use them a lot for
> similar sorts of requirements.
> 
> In Kafka you have a KTable on Telemetry with 3 records and join it with, say,
> Scores, which could be a KTable or KStream. When you start your streaming query
> as mentioned above it will give just 1 row as expected. However, if a new value
> for the same key with a timestamp greater than the previous max is added to
> Telemetry, it will output the new value as well, and that is the main idea of
> streaming anyway: you want to see the changed value. So once you have started
> streaming you will get whatever is the outcome of your
>

Darshan,

Thanks for the reply.  I've already implemented this job using Kafka
Streams, so I am aware of how KTables behave.  It would have helped if I
had included some sample data in my post, so here it is.  If you have this
data coming into Telemetry:

ts, item, score, source
0, item1, 1, source1
1, item1, 1, source1
2, item1, 1, source1

And this comes into Scores:

ts, item, score
3, item1, 3

Flink will output 3 records from the queries I mentioned:

(3, item1, 3, source1)
(3, item1, 3, source1)
(3, item1, 3, source1)


In contrast, if you run the query in Kafka Streams, configuring Telemetry as
a KTable keyed by (item, source), the output will be a single record.  In
Telemetry, the record for key (item1, source1) at time 1 will overwrite the
record at time 0, and the record at time 2 will overwrite the one at time
1.  By the time the record at time 3 comes in via Scores, it will be joined
only with the record from time 2 in Telemetry.

Yes, it is possible for the Kafka Streams query to output multiple records
if the records from the different streams are not time aligned, as Kafka
Streams only makes a best effort at aligning the streams. But in the
common case the output will be a single record.


> I think in Flink you can do the same: from your telemetry stream/table you
> can create the LatestTelemetry table using similar SQL (I am sure it should
> give you the latest timestamp's data) as you did with the RDBMS and then join
> with the scores table. You should get similar results to a KTable or any other
> streaming system.
>

Not sure if you missed it, but I actually defined the LatestTelemetry table
in Flink using that query and joined against it.  The
output was the same three records.


SQL materialized upsert tables

2018-02-20 Thread Elias Levy
I noticed that has been significant work on the SQL / Table subsystem and
decided to evaluate it for one of our use cases.  The use case requires the
joining of two streams, which can be considered a stream of table upserts.
Critically, when joining the streams, we only want to join against the
latest value per key in one of the tables/streams.

Simply performing a join between the stream/tables is not sufficient, as it
will generate results from records other than the latest one.  E.g. if you
have two stream/tables with schema:

Telemetry [
  tstamp: Long
  item: String
  score: Int
  source: String
]

Scores [
  tstamp: Long
  item: String
  score: Int
]

tableEnv.sqlQuery("""
SELECT s.tstamp, s.item, s.score, t.source
FROM Telemetry t INNER JOIN Scores s ON s.item = t.item
WHERE s.score <> t.score AND s.tstamp >= t.tstamp
""")

If the stream receives 3 records from the telemetry stream for the same
source and then a record that matches the item from the score stream that
updates the score, it will generate three output records, even though we
only want the latest record from the source to be considered.

If this were a regular database we could do the following to only get the
latest records with the telemetry table:

tableEnv.sqlQuery("""
SELECT a.tstamp, a.item, a.score, a.source
FROM Telemetry a
  INNER JOIN (
SELECT MAX(tstamp), item, source
FROM Telemetry
GROUP BY item, source
  ) b ON a.item = b.item AND a.source = b.source
""")

and then execute the previous query against this LatestTelemetry table
instead of Telemetry.  But that does not work.  The query executed within
Flink, but still outputs multiple records, regardless of the order the
records come into the source streams.

I am wondering if there is a way to accomplish this within Flink's
SQL/Table abstractions.

Kafka Streams has the concept of a KTable, where records are considered
upserts and update previous records that have the same key.  Thus, when you
join against a KTable, you only join against the latest record for a given
key, rather than all previous records for the key.

Thoughts?


Flink network access control documentation

2017-12-22 Thread Elias Levy
There is a need for better documentation on what connects to what over
which ports in a Flink cluster to allow users to configure network access
control rules.

I was under the impression that in a ZK HA configuration the Job Managers
were essentially independent and only coordinated via ZK.  But starting
multiple JMs in HA with the JM RPC port blocked between JMs shows that the
second JM's Akka subsystem is trying to connect to the leading JM:

INFO  akka.remote.transport.ProtocolStateActor  - No
response from remote for outbound association. Associate timed out after
[2 ms].
WARN  akka.remote.ReliableDeliverySupervisor-
Association with remote system [akka.tcp://flink@10.210.210.127:6123] has
failed, address is now gated for [5000] ms. Reason: [Association failed
with [akka.tcp://flink@10.210.210.127:6123]] Caused by: [No response from
remote for outbound association. Associate timed out after [2 ms].]
WARN  akka.remote.transport.netty.NettyTransport-
Remote connection to [null] failed with
org.apache.flink.shaded.akka.org.jboss.netty.channel.ConnectTimeoutException:
connection timed out: /10.210.210.127:6123


Re: Flink flick cancel vs stop

2017-12-13 Thread Elias Levy
I am re-upping this thread now that FlinkKafkaProducer011 is out.  The new
producer, when used with the exactly once semantics, has the rather
troublesome behavior that it will fall back to at-most-once, rather than
at-least-once, if the job is down for longer than the Kafka broker's
transaction.max.timeout.ms setting.

In situations that require extended maintenance downtime, this behavior is
nearly certain to lead to message loss, as canceling a job while taking a
savepoint will not wait for the Kafka transactions to be committed and is
not atomic.
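
The obvious mitigation, raising the transaction timeout on both the brokers
(transaction.max.timeout.ms) and the producer so that it covers the longest
expected downtime, looks roughly like this (values illustrative, untested):

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper

val props = new Properties()
props.setProperty("bootstrap.servers", "broker:9092")
// Must not exceed transaction.max.timeout.ms on the brokers.
props.setProperty("transaction.timeout.ms", (4 * 60 * 60 * 1000).toString) // 4h

val producer = new FlinkKafkaProducer011[String](
  "output-topic",
  new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),
  props,
  FlinkKafkaProducer011.Semantic.EXACTLY_ONCE)

// stream.addSink(producer)

But that only buys time; it does not make cancel-with-savepoint atomic.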

So it seems like there is a need for an atomic stop or cancel with
savepoint that waits for transactional sinks to commit and then immediately
stops any further message processing.


On Tue, Oct 24, 2017 at 4:46 AM, Piotr Nowojski <pi...@data-artisans.com>
wrote:

> I would propose implementations of NewSource to be not
> blocking/asynchronous. For example something like
>
> public abstract Future getCurrent();
>
> Which would allow us to perform some certain actions while there are no
> data available to process (for example flush output buffers). Something
> like this came up recently when we were discussing possible future changes
> in the network stack. It wouldn’t complicate API by a lot, since default
> implementation could just:
>
> public Future getCurrent() {
>   return completedFuture(getCurrentBlocking());
> }
>
> Another thing to consider is maybe we would like to leave the door open
> for fetching records in some batches from the source’s input buffers?
> Source function (like Kafka) have some internal buffers and it would be
> more efficient to read all/deserialise all data present in the input buffer
> at once, instead of paying synchronisation/calling virtual method/etc costs
> once per each record.
>
> Piotrek
>
> On 22 Sep 2017, at 11:13, Aljoscha Krettek <aljos...@apache.org> wrote:
>
> @Eron Yes, that would be the difference in characterisation. I think
> technically all sources could be transformed by that by pushing data into a
> (blocking) queue and having the "getElement()" method pull from that.
>
> On 15. Sep 2017, at 20:17, Elias Levy <fearsome.lucid...@gmail.com> wrote:
>
> On Fri, Sep 15, 2017 at 10:02 AM, Eron Wright <eronwri...@gmail.com>
> wrote:
>
>> Aljoscha, would it be correct to characterize your idea as a 'pull'
>> source rather than the current 'push'?  It would be interesting to look at
>> the existing connectors to see how hard it would be to reverse their
>> orientation.   e.g. the source might require a buffer pool.
>>
>
> The Kafka client works that way.  As does the QueueingConsumer used by the
> RabbitMQ source.  The Kinesis and NiFi sources also seems to poll. Those
> are all the bundled sources.
>
>
>
>


FlinkKafkaProducer011 and Flink 1.4.0 Kafka docs

2017-12-13 Thread Elias Levy
Looks like the Flink Kafka connector page, in the Producer section, is
missing a section for the new FlinkKafkaProducer011 producer.  Given
that the new producer no longer has a static writeToKafkaWithTimestamps
method, it would be good to add a section that specifies that you must now
use DataStream.addSink.
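
For example, something along these lines (broker and topic names illustrative):

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.fromElements("a", "b", "c")

// The 0.11 producer is attached like any other sink.
stream.addSink(new FlinkKafkaProducer011[String](
  "broker:9092",     // bootstrap servers
  "output-topic",    // target topic
  new SimpleStringSchema()))

env.execute()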


Incremental checkpointing documentation

2017-11-02 Thread Elias Levy
There doesn't appear to be much in the way of documentation for incremental
checkpointing other than how to turn it on.  That leaves a lot of questions
unanswered.
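
(For reference, turning it on amounts to the boolean flag on the RocksDB
backend; a minimal sketch, with the checkpoint URI as a placeholder:)

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// second argument enables incremental checkpoints
env.setStateBackend(new RocksDBStateBackend("s3://bucket/checkpoints", true))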

What is the interaction of incremental checkpointing and external
checkpoints?

Any interaction with the state.checkpoints.num-retained config?

Does incremental checkpointing require any maintenance?

Any interaction with savepoints?

Does it perform better against certain "file systems"?  E.g. is S3 not
recommended for it?  How about EFS?


Re: Empty directories left over from checkpointing

2017-10-17 Thread Elias Levy
Stephan,

Thanks for taking care of this.  We'll give it a try once 1.4 drops.

On Sat, Oct 14, 2017 at 1:25 PM, Stephan Ewen <se...@apache.org> wrote:

> Some updates on this:
>
> Aside from reworking how the S3 directory handling is done, we also looked
> into supporting S3 differently than we currently do. Currently support goes
> strictly through Hadoop's S3 file systems, which we need to change, because
> we want it to be possible to use Flink without Hadoop dependencies.
>
> In the next release, we will have S3 file systems without Hadoop
> dependency:
>
>   - One implementation wraps and shades a newer version of s3a. For
> compatibility with current behavior.
>
>   - The second is interesting for this directory problem: It uses Presto's
> S3 support, which is a bit different from Hadoop's s3n and s3a. It does not
> create empty directory marker files, hence it is not trying to make S3 look
> as much like a file system as s3a and s3n are, but that is actually an
> advantage for checkpointing. With that implementation, the issue mentioned
> here should not exist.
>
> Caveat: The new file systems and their aggressive shading need to be
> tested at scale still, but we are happy to take any feedback on this.
>
> Merged as of https://github.com/apache/flink/commit/
> 991af3652479f85f732cbbade46bed7df1c5d819
>
> You can use them by simply dropping the respective JARs from "/opt" into
> "/lib" and using the file system scheme "s3://".
> The configuration is as in Hadoop/Presto, but you can drop the config keys
> into the Flink configuration - they will be forwarded to the Hadoop
> configuration.
>
> Hope that this makes the S3 use a lot easier and more fun...
>
>
> On Wed, Sep 20, 2017 at 2:49 PM, Stefan Richter <
> s.rich...@data-artisans.com> wrote:
>
>> Hi,
>>
>> We recently removed some cleanup code, because it involved checking some
>> store meta data to check when we can delete a directory. For certain stores
>> (like S3), requesting this meta data whenever we delete a file was so
>> expensive that it could bring down the job because removing state could not
>> be processed fast enough. We have a temporary fix in place now, so that
>> jobs at large scale can still run reliably on stores like S3. Currently,
>> this comes at the cost of not cleaning up directories but we are clearly
>> planning to introduce a different mechanism for directory cleanup in the
>> future that is not as fine grained as doing meta data queries per file
>> delete. In the meantime, unfortunately the best way is to cleanup empty
>> directories with some external tool.
>>
>> Best,
>> Stefan
>>
>> Am 20.09.2017 um 01:23 schrieb Hao Sun <ha...@zendesk.com>:
>>
>> Thanks Elias! Seems like there is no better answer than "do not care
>> about them now", or delete with a background job.
>>
>> On Tue, Sep 19, 2017 at 4:11 PM Elias Levy <fearsome.lucid...@gmail.com>
>> wrote:
>>
>>> There are a couple of related JIRAs:
>>>
>>> https://issues.apache.org/jira/browse/FLINK-7587
>>> https://issues.apache.org/jira/browse/FLINK-7266
>>>
>>>
>>> On Tue, Sep 19, 2017 at 12:20 PM, Hao Sun <ha...@zendesk.com> wrote:
>>>
>>>> Hi, I am using RocksDB and S3 as storage backend for my checkpoints.
>>>> Can flink delete these empty directories automatically? Or I need a
>>>> background job to do the deletion?
>>>>
>>>> I know this has been discussed before, but I could not get a concrete
>>>> answer for it yet. Thanks
>>>>
>>>> 
>>>>
>>>
>>>
>>
>


Re: high-availability.jobmanager.port vs jobmanager.rpc.port

2017-09-26 Thread Elias Levy
I presume then that the Job Managers and Task Managers are performing
service discovery via Zookeeper in HA mode, rather than from the config
file or the masters file.  Yes?

On Mon, Sep 25, 2017 at 11:14 PM, Till Rohrmann <trohrm...@apache.org>
wrote:

> Because a single port could easily lead to clashes if there is another
> JobManager running on the same machine with the same port (e.g. due to
> standby JobManagers).
>
> Cheers,
> Till
>
> On Sep 26, 2017 03:20, "Elias Levy" <fearsome.lucid...@gmail.com> wrote:
>
>> Why a range instead of just a single port in HA mode?
>>
>> On Mon, Sep 25, 2017 at 1:49 PM, Till Rohrmann <trohrm...@apache.org>
>> wrote:
>>
>>> Yes, with Flip-6 it will most likely look like how Stephan described it.
>>> We need the explicit port in standalone mode so that TMs can connect to the
>>> JM. In the other deployment scenarios, the port can be randomly picked
>>> unless you want to specify a port range, e.g. for firewall configuration
>>> purposes.
>>>
>>


FLIP-6: Job specific Docker images status?

2017-09-25 Thread Elias Levy
I was wondering what is the status of support for job specific Docker
images, meaning images that combine the job jars with the job manager, do
not require job submission, and automatically execute the job when there
are enough task managers registered with the job manager to satisfy the
job's parallelism.

I could not tell as the FLINK-4319 FLIP-6 umbrella issue does not seem to
have an entry for it.


Re: high-availability.jobmanager.port vs jobmanager.rpc.port

2017-09-25 Thread Elias Levy
Why a range instead of just a single port in HA mode?

On Mon, Sep 25, 2017 at 1:49 PM, Till Rohrmann  wrote:

> Yes, with Flip-6 it will most likely look like how Stephan described it.
> We need the explicit port in standalone mode so that TMs can connect to the
> JM. In the other deployment scenarios, the port can be randomly picked
> unless you want to specify a port range, e.g. for firewall configuration
> purposes.
>


high-availability.jobmanager.port vs jobmanager.rpc.port

2017-09-23 Thread Elias Levy
I am wondering why, in HA mode, there is a need for a separate config parameter
to set the JM RPC port (high-availability.jobmanager.port) and why this
parameter accepts a range, unlike jobmanager.rpc.port.
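
For context, the two keys side by side, in flink-conf.yaml form (values
illustrative):

jobmanager.rpc.port: 6123
high-availability.jobmanager.port: 50000-50025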


History Server

2017-09-23 Thread Elias Levy
I am curious, why is the History Server a separate process and Web UI
instead of being part of the Web Dashboard within the Job Manager?


Re: Noisy org.apache.flink.configuration.GlobalConfiguration

2017-09-19 Thread Elias Levy
Till,

Using 1.3.2 and like Ufuk mentioned, using S3 for checkpointing.
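
A possible stopgap, assuming the stock log4j.properties layout, would be to
silence that logger:

log4j.logger.org.apache.flink.configuration.GlobalConfiguration=WARN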

On Tue, Sep 19, 2017 at 4:28 AM, Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Elias,
>
> which version of Flink and which state backend are you running? I tried to
> reproduce it and wasn't successful so far.
>
> We recently changed a bit how we load the GlobalConfiguration in
> combination with dynamic properties [1]. Maybe this has affected what
> you've reported as well.
>
> [1] https://issues.apache.org/jira/browse/FLINK-7269
>
> Cheers,
> Till
>
> On Tue, Sep 19, 2017 at 2:44 AM, Elias Levy <fearsome.lucid...@gmail.com>
> wrote:
>
>> Is there a particular reason that GlobalConfiguration is so noisy?
>>
>> The task manager log is full of "Loading configuration property" messages
>> from GlobalConfiguration each time there is a checkpoint.  Why isn't the
>> configuration read once?
>>
>
>


Re: Flink flick cancel vs stop

2017-09-15 Thread Elias Levy
On Fri, Sep 15, 2017 at 10:02 AM, Eron Wright  wrote:

> Aljoscha, would it be correct to characterize your idea as a 'pull' source
> rather than the current 'push'?  It would be interesting to look at the
> existing connectors to see how hard it would be to reverse their
> orientation.   e.g. the source might require a buffer pool.
>

The Kafka client works that way.  As does the QueueingConsumer used by the
RabbitMQ source.  The Kinesis and NiFi sources also seems to poll. Those
are all the bundled sources.


Re: Assigning operators to slots

2017-09-13 Thread Elias Levy
The execution within the IDE is most likely not loading the flink-conf.yaml
file to read the configuration.  When run from the IDE you get a
LocalStreamEnvironment, which starts a LocalFlinkMiniCluster.
LocalStreamEnvironment is created by
StreamExecutionEnvironment.createLocalEnvironment without passing it any
configuration.  So none
of StreamExecutionEnvironment, LocalStreamEnvironment,
and LocalFlinkMiniCluster try to read the config file.

This makes it difficult to test certain Flink features from within the IDE,
as some configuration properties can't be set programmatically.  For
instance, you can't configure the external checkpoint URL in code.  It can
only be set in the config file.  That means you can't run a job that turns
on external checkpoints from within the IDE.

Ideally one of these components would try to load the config file when
executing locally.  You could then point it to the config file via
the FLINK_CONF_DIR environment variable.
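
For the task-slot case specifically, a workaround sketch (untested) is to hand
a Configuration to the local environment yourself, using the Java entry point
since the Scala wrapper may not expose this overload:

import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}

val conf = new Configuration()
conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2)
val env = JavaEnv.createLocalEnvironment(1, conf)  // parallelism 1, 2 slots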


On Fri, Sep 8, 2017 at 8:47 AM, AndreaKinn  wrote:

> UPDATE:
>
> I'm trying to implement the version with one node and two task slots on my
> laptop. I have also in configured flink-conf.yaml the key:
>
> taskmanager.numberOfTaskSlots: 2
>
> but when I execute my program in the IDE:
>
> /org.apache.flink.runtime.jobmanager.scheduler.
> NoResourceAvailableException:
> Not enough free slots available to run the job. You can decrease the
> operator parallelism or increase the number of slots per TaskManager in the
> configuration. /
>
> parallelism is set 1.
>
> Which could be the problem?
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: Flink flick cancel vs stop

2017-09-13 Thread Elias Levy
Anyone?

On Mon, Sep 11, 2017 at 6:17 PM, Elias Levy <fearsome.lucid...@gmail.com>
wrote:

> I was wondering about the status of the flink stop command.  At first
> blush it would seem to be the preferable way to shut down a Flink job, but it
> depends on StoppableFunction being implemented by sources and I notice that
> the Kafka source does not seem to implement it.  In addition, the command
> does not support -s / --withSavepoint like cancel does.
>
> Is stop deprecated?
>


Flink flick cancel vs stop

2017-09-11 Thread Elias Levy
I was wondering about the status of the flink stop command.  At first blush
it would seem to be the preferable way to shut down a Flink job, but it depends
on StoppableFunction being implemented by sources and I notice that the
Kafka source does not seem to implement it.  In addition, the command does
not support -s / --withSavepoint like cancel does.

Is stop deprecated?


FLIP-17: Side Inputs

2017-09-10 Thread Elias Levy
A bit late to this discussion, but I wanted to reiterate something that
others also said. Side input readiness, and blocking until the side input is
ready, is an important feature.  This is especially true when the side input
is used as a configuration stream.  You don't want the main stream to be
processed until at least the minimal required configuration is loaded.

I'll also note that Kafka Streams has struggled with the same problem
(KAFKA-4113), but it has
the advantage that KS prioritizes consumption from sources based on
timestamp.  So with KS if your KTable config records have an earlier
timestamp than the stream records they are joined with, they will be
consumed first (although it does so on a best effort basis).


Re: can flink do streaming from data sources other than Kafka?

2017-09-07 Thread Elias Levy
If you want to ensure you see all changes to a Cassandra table, you need to
make use of the Change Data Capture feature.  For
that, you'll need code running on the Cassandra nodes to read the commit
log segments from the Cassandra CDC directory.  Given that you need to read
those files on the Cassandra nodes, and that Cassandra will stop accepting
writes to the tables if the CDC directory fills up beyond a configurable
size as a result of segments not being read, you probably don't want to
implement the segment reader within Flink.

The folks at DataMountaineer have developed a Kafka Connect connector for
Cassandra's CDC.  That
would allow you to stream Cassandra table changes out of Cassandra and into
Kafka, where they can be consumed by Flink.  The connector is part of their
stream-reactor repo.



On Thu, Sep 7, 2017 at 1:34 AM, kant kodali  wrote:

> Yes I can indeed create them but I wonder if that is even possible? I
> haven't seen any framework doing this as of today. Flink has something
> called AsyncDataStream? and I wonder if this can be leveraged to create a
> Stream out of Cassandra source?
>
> Thanks!
>
> On Thu, Sep 7, 2017 at 1:16 AM, Tzu-Li (Gordon) Tai 
> wrote:
>
>> Ah, I see. I’m not aware of any existing work / JIRAs on streaming
>> sources for Cassandra or HBase, only sinks.
>> If you are interested in one, could you open JIRAs for them?
>>
>>
>> On 7 September 2017 at 4:11:05 PM, kant kodali (kanth...@gmail.com)
>> wrote:
>>
>> Hi Gordon,
>>
>> Thanks for the response, I did go over the links for sources and sinks
>> prior to posting my question. Maybe, I didn't get my question across
>> correctly so let me rephrase it. Can I get data out of data stores like
>> Cassandra, Hbase in a streaming manner? coz, currently more or less all the
>> sources are of message queue family.
>>
>> Thanks,
>> Kant
>>
>> On Thu, Sep 7, 2017 at 1:04 AM, Tzu-Li (Gordon) Tai 
>> wrote:
>>
>>> Hi!
>>>
>>>
>>> I am wondering if Flink can do streaming from data sources other than
>>> Kafka. For example can Flink do streaming from a database like Cassandra,
>>> HBase, MongoDb to sinks like says Elastic search or Kafka.
>>>
>>>
>>> Yes, Flink currently supports various connectors for different sources
>>> and sinks. For an overview you can check out this documentation [1]
>>> Apache Bahir [2] also maintains some Flink connectors and is released
>>> separately.
>>>
>>> Also for out of core stateful streaming. Is RocksDB the only option?
>>>
>>> Currently, RocksDB is the only option for out-of-core state. There was
>>> some previous discussion for a Cassandra state backend, though [3].
>>>
>>> - Gordon
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-
>>> 1.3/dev/connectors/index.html
>>> [2] http://bahir.apache.org/
>>> [3] https://issues.apache.org/jira/browse/FLINK-4266
>>>
>>> On 7 September 2017 at 2:58:38 PM, kant kodali (kanth...@gmail.com)
>>> wrote:
>>>
>>> Hi All,
>>>
>>> I am wondering if Flink can do streaming from data sources other than
>>> Kafka. For example can Flink do streaming from a database like Cassandra,
>>> HBase, MongoDb to sinks like says Elastic search or Kafka.
>>>
>>> Also for out of core stateful streaming. Is RocksDB the only option? Can
>>> I use some other key value store that has SQL interface (since RocksDB
>>> doesn't)?
>>>
>>> Thanks,
>>> kant
>>>
>>>
>>
>


Re: Kafka consumer are too fast for some partitions in "flatMap" like jobs

2017-08-30 Thread Elias Levy
On Wed, Aug 30, 2017 at 11:50 AM, Oleksandr Baliev <
aleksanderba...@gmail.com> wrote:

>
> So the main question is how to synchronize data reading between kafka
> partitions when data is sequential per partitions, but late for some of
> them and we care about that data is not thrown away and will be fully
> processed for some time range (window) later? :) It's more about manually
> handling consumption on Kafka Fetch level and FlinkKafka* is high level for
> that, isn't it?
>


At some point you have to give up on late data and drop it if you are
performing some window computation.  That said, that could be a long time,
allowing for very out of order data. Presumably most data won't be late,
and you want to output preliminary results to have timely data.  In that
case you want to implement a window trigger that fires early at regular
intervals (without purging) whenever it has received new events since its last
firing, and that purges the data once the allowed lateness passes.

For instance, see this EventTimeTriggerWithEarlyAndLateFiring in Java or this
simplified EarlyFiringEventTimeTrigger in Scala.
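
A stripped-down sketch of the idea (untested; it omits the "only fire if new
events arrived since the last firing" refinement the linked triggers add):

import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

// Fires every `interval` ms of processing time without purging, fires again
// when the watermark reaches the window end, and leaves the final purge to
// the allowed lateness mechanism.
class PeriodicEarlyFiringTrigger[T](interval: Long) extends Trigger[T, TimeWindow] {

  override def onElement(element: T, timestamp: Long, window: TimeWindow,
                         ctx: Trigger.TriggerContext): TriggerResult = {
    ctx.registerEventTimeTimer(window.maxTimestamp)
    // Align early firings to interval boundaries so repeated registrations
    // collapse onto the same timer.
    val next = (ctx.getCurrentProcessingTime / interval + 1) * interval
    ctx.registerProcessingTimeTimer(next)
    TriggerResult.CONTINUE
  }

  override def onProcessingTime(time: Long, window: TimeWindow,
                                ctx: Trigger.TriggerContext): TriggerResult = {
    ctx.registerProcessingTimeTimer(time + interval)
    TriggerResult.FIRE
  }

  override def onEventTime(time: Long, window: TimeWindow,
                           ctx: Trigger.TriggerContext): TriggerResult =
    if (time == window.maxTimestamp) TriggerResult.FIRE else TriggerResult.CONTINUE

  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit =
    ctx.deleteEventTimeTimer(window.maxTimestamp)
}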


Re: metrics for Flink sinks

2017-08-30 Thread Elias Levy
Not an exact match, but I am guessing it is related to FLINK-7286, which I
reported.  Feel free to modify that issue to cover the root cause.

On Wed, Aug 30, 2017 at 8:32 AM, Martin Eden 
wrote:

> Thanks Chesnay,
>
> Just for completeness, are there any relevant tickets for the discussion
> that one can follow, upvote, contribute to?
>
> M
>
> On Tue, Aug 29, 2017 at 8:57 PM, Chesnay Schepler 
> wrote:
>
>> Hello,
>>
>> 1. Because no one found time to fix it. In contrast to the remaining
>> byte/record metrics, input metrics for sources / output metrics for sinks
>> have to be implemented for every single implementation with their
>> respective semantics. In contrast, the output metrics are gathered in the
>> intersection between operators, independent of the actual operator
>> implementation. Furthermore, this requires system metrics (i.e. metrics
>> that Flink itself creates) to be exposed (and be mutable!) to user-defined
>> functions, which is something I *generally* wanted to avoid, but it
>> appears to be a big enough pain point to make an exception here.
>>
>> 2. Due to the above it is currently not possible without modifications of
>> the code to know how many reads/writes were made.
>>
>> 3. Do you mean aggregated metrics? The web UI allows the aggregation of
>> record/byte metrics on the task level. Beyond that we defer aggregation to
>> actual time-series databases that specialize in these things.
>>
>>
>> On 28.08.2017 19:08, Martin Eden wrote:
>>
>> Hi all,
>>
>> Just 3 quick questions both related to Flink metrics, especially around
>> sinks:
>>
>> 1. In the Flink UI Sources always have 0 input records / bytes and Sinks
>> always have 0 output records / bytes? Why is it like that?
>>
>> 2. What is the best practice for instrumenting off the shelf Flink sinks?
>>
>> Currently the only metrics available are num records/bytes in and out at
>> the operator and task scope. For the task scope there are extra buffer
>> metrics. However the output metrics are always zero (see question 1). How
>> can one know the actual number of successful writes done by an off the
>> shelf Flink sink? Or the latency of the write operation?
>>
>> 3. Is it possible to configure Flink to get global job metrics for all
>> subtasks of an operator? Or are there any best practices around that?
>>
>> Thanks,
>> M
>>
>>
>>
>


Re: Kafka consumer are too fast for some partitions in "flatMap" like jobs

2017-08-29 Thread Elias Levy
How many partitions does the output topic have?  If it has the same number
of partitions as the input topic (30), have you considered simply using a
custom partitioner for the Kafka sink that uses the input partition number
as the output partition number?  If the input messages are ordered per
input partition, that would guarantee their order in the output partitions.
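
A rough sketch of that idea, assuming each record already carries its source
partition (e.g. attached via a KeyedDeserializationSchema on the consumer side;
the record type and field name are illustrative):

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner

case class Enriched(sourcePartition: Int, payload: String)

// Sends each record to the output partition matching its input partition.
class MirrorPartitioner extends FlinkKafkaPartitioner[Enriched] {
  override def partition(record: Enriched, key: Array[Byte], value: Array[Byte],
                         targetTopic: String, partitions: Array[Int]): Int =
    partitions(record.sourcePartition % partitions.length)
}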

On Tue, Aug 29, 2017 at 1:54 AM, Oleksandr Baliev  wrote:

> Hello,
>
> There is one Flink job which consumes from Kafka topic (TOPIC_IN), simply
> flatMap / map data and push to another Kafka topic (TOPIC_OUT).
> TOPIC_IN has around 30 partitions, data is more or less sequential per
> partition and the job has parallelism 30. So in theory there should be 1:1
> mapping between consumer and partition.
>
> But it is common to see a big lag in offsets for some partitions. That
> should mean that some consumers are slower than others (i.e. some
> network issue for a particular broker host or anything else). So data in
> TOPIC_OUT partitions is distributed but not sequential at all.
>
> So when another Flink job consumes from TOPIC_OUT and uses
> BoundedOutOfOrdernessTimestampExtractor to generate watermarks, due to
> difference in data timestamps, there can be a lot of late data. Maybe
> something is missing of course in this setup or there is a better approach
> for such flatMap / map jobs.
>
> Setting big WindowedStream#allowedLateness or giving more time for
> BoundedOutOfOrdernessTimestampExtractor will increase memory consumption
> and probably will cause another issues and anyway there can be late data
> which is not good for later windows.
>
> One of the solutions is to have some shared place to synchronize the lowest
> timestamp between consumers and somehow slow down consumption (Thread
> sleep, wait, while loop with condition...).
>
> 0. Is there any good approach to handle such "Kafka <-  flatMap / map ->
> Kafka" tasks? so data in TOPIC_OUT will be sequential as in TOPIC_IN.
>
> 1. As far as I see it should be a common problem to have some slow consumers
> for a big Kafka topic with a lot of partitions, isn't it? How do Flink/Kafka
> handle it?
>
> 2. Does somebody know, is there any mechanism in Flink - Kafka,
> (backpressure?), which can tell from a child operator (some process function
> for example) to specific fast consumers to slow down a bit? Is something
> like callback possible in Flink, don't think so, but..?
>
> 3. Or is there in Flink already anything which can help to synchronize
> minimum timestamps between consumers and?
>
> 4. Is there any good approach to slow down consumption in Kafka consumer?
> There should be some problems between session timeout and poll I think or
> something related to that, but maybe there is already some good solution
> for that :)
>
> Will be glad if somebody can give some hints for any of the questions,
>
> Best,
> Sasha
>


Re: Global State and Scaling

2017-08-21 Thread Elias Levy
Looks like Gerard asked something along similar lines
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Managed-operator-state-treating-state-of-all-parallel-operators-as-the-same-td14102.html>
just last month and that there is a JIRA
<https://issues.apache.org/jira/browse/FLINK-4940> for official support for
broadcast state.  Looks like the ugly hack is the way to go for now.
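
For the record, the hack looks roughly like this (sketch only; the config type,
field names, and operator are illustrative):

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

case class Config(serial: Long, value: String)

// Keeps the latest config by serial number; the state is snapshotted with
// union redistribution so every instance can pick the newest copy on restore,
// whether we scaled in or out.
class LatestConfigFn extends RichFlatMapFunction[Config, Config] with CheckpointedFunction {

  @transient private var state: ListState[Config] = _
  private var current: Option[Config] = None

  override def flatMap(in: Config, out: Collector[Config]): Unit =
    if (current.forall(_.serial < in.serial)) {
      current = Some(in)
      out.collect(in)
    }

  override def snapshotState(ctx: FunctionSnapshotContext): Unit = {
    state.clear()
    current.foreach(state.add)
  }

  override def initializeState(ctx: FunctionInitializationContext): Unit = {
    val desc = new ListStateDescriptor[Config]("config", classOf[Config])
    state = ctx.getOperatorStateStore.getUnionListState(desc)
    if (ctx.isRestored) {
      // Every instance receives the union of all entries; keep the newest.
      current = state.get().asScala
        .reduceOption((a, b) => if (a.serial >= b.serial) a else b)
    }
  }
}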


On Mon, Aug 21, 2017 at 1:23 PM, Elias Levy <fearsome.lucid...@gmail.com>
wrote:

> I am implementing a control stream.  The stream communicates a global
> configuration value for the whole job.  It uses DataStream.broadcast() to
> communicate this to all parallel operator instances.  I would like to save
> this value in state so that it can be recovered when the job
> restarts/recovers.  The control stream is not keyed, so the only option is
> Operator state.
>
> I could implement this using the ListCheckpointed interface, returning
> Collections.singletonList(configValue) from snapshotState.  It is clear
> what I'd need to do in restoreState in the case of scale in.  If I include
> a serial number in the config, and it receives multiple values on restore,
> it can keep the config value with the largest serial number, indicating the
> latest config.
>
> Alas, it is not clear what should happen on scale out, as some operator
> instances will receive empty lists.
>
> It seems the other alternative is to use CheckpointedFunction, along with
> union redistribution via getUnionListState, and then have each operator
> instance select from the union list the config with the latest serial
> number, of which there should be multiple copies.  But this seems like an
> ugly hack.
>
>
> In addition, the documentation is unclear on the relationship and effect,
> if any, of the maximum parallelism Flink job parameter on operator state,
> whereas it is much clearer in this regard as it relates to keyed state via
> key groups.
>
>
> How are folks handling this use case, i.e. storing and restoring global
> config values via Flink state?
>
>


Global State and Scaling

2017-08-21 Thread Elias Levy
I am implementing a control stream.  The stream communicates a global
configuration value for the whole job.  It uses DataStream.broadcast() to
communicate this to all parallel operator instances.  I would like to save
this value in state so that it can be recovered when the job
restarts/recovers.  The control stream is not keyed, so the only option is
Operator state.

I could implement this using the ListCheckpointed interface, returning
Collections.singletonList(configValue) from snapshotState.  It is clear
what I'd need to do in restoreState in the case of scale in.  If I include
a serial number in the config, and it receives multiple values on restore,
it can keep the config value with the largest serial number, indicating the
latest config.

Alas, it is not clear what should happen on scale out, as some operator
instances will receive empty lists.

It seems the other alternative is to use CheckpointedFunction, along with
union redistribution via getUnionListState, and then have each operator
instance select from the union list the config with the latest serial
number, of which there should be multiple copies.  But this seems like an
ugly hack.


In addition, the documentation is unclear on the relationship and effect,
if any, of the maximum parallelism Flink job parameter on operator state,
whereas it is much clearer in this regard as it relates to keyed state via
key groups.


How are folks handling this use case, i.e. storing and restoring global
config values via Flink state?


Re: Prioritize DataStream

2017-08-21 Thread Elias Levy
Flink folks,

A response to the question below?

On Sat, Aug 19, 2017 at 11:02 AM, Elias Levy <fearsome.lucid...@gmail.com>
wrote:

> I believe the answer to this question is "no", but I figure I might as
> well ask.  Is there a way to prioritize a stream?
>
> The use case is prioritization of a control stream.  This is mostly needed
> on start-up, where a job might start consuming from the data stream before
> consuming from the control stream.
>
>
>


Prioritize DataStream

2017-08-19 Thread Elias Levy
I believe the answer to this question is "no", but I figure I might as well
ask.  Is there a way to prioritize a stream?

The use case is prioritization of a control stream.  This is mostly needed
on start-up, where a job might start consuming from the data stream before
consuming from the control stream.


Re: CEP memory requirements

2017-05-04 Thread Elias Levy
Looking at the code I gather that 1.2 does not clear the per key NFA state
even if there is no state to keep, whereas this appears fixed in the master
branch. Yes?

On Thu, May 4, 2017 at 11:25 AM, Elias Levy <fearsome.lucid...@gmail.com>
wrote:

> I am observing odd memory behavior with the CEP library and I am wondering
> if it is expected.
>
> If I write a simple local streaming Flink job that reads from a 65MB
> compressed file of JSON objects, one per line, parses the JSON, performs a
> filter operation, and then a keyBy, heap usage is stable, staying below
> 250MB throughout per VisualVM.
>
> But if I create a CEP pattern that matches nothing
> (Pattern.begin[T]("foo").where( _ => false )) and match it against the
> stream produced by the last keyBy (CEP.pattern(stream, pattern).select),
> then memory balloons until the program terminates, steadily growing until
> 3GB.
>
> The VisualVM memory profiler appears unable to account for that used heap
> space.  If I add the Live Bytes column I'd get only between 200-100 MB.
>
> Any idea what is going on?
>
> Flink 1.2.  Java 8.
>
>


CEP memory requirements

2017-05-04 Thread Elias Levy
I am observing odd memory behavior with the CEP library and I am wondering
if it is expected.

If I write a simple local streaming Flink job that reads from a 65MB
compressed file of JSON objects, one per line, parses the JSON, performs a
filter operation, and then a keyBy, heap usage is stable, staying below
250MB throughout per VisualVM.

But if I create a CEP pattern that matches nothing
(Pattern.begin[T]("foo").where( _ => false )) and match it against the
stream produced by the last keyBy (CEP.pattern(stream, pattern).select),
then memory balloons until the program terminates, steadily growing until
3GB.

The VisualVM memory profiler appears unable to account for that used heap
space.  If I add the Live Bytes column I'd get only between 100-200 MB.

Any idea what is going on?

Flink 1.2.  Java 8.


Re: RocksDB error with flink 1.2.0

2017-05-02 Thread Elias Levy
Any reason they can't share a single RocksDB state backend instance?


On Fri, Apr 28, 2017 at 8:44 AM, Aljoscha Krettek 
wrote:

> The problem here is that this will try to open 300 RocksDB instances on
> each of the TMs (depending on how the parallelism is spread between the
> machines this could be more or less). As the exception says, this will open
> too many files because each RocksDB instance has a directory with several
> files in it.
>
> One possible solution would be to increase the limit on open files but I
> don’t think that opening 300 RocksDB instances on one machine is a good
> idea for any size of machine. I think with this many patterns you could
> start thinking about writing the pattern matching yourself and multiplexing
> the several patterns in one stateful function or operator.
>
> @Stefan, what do you think about having this many Rocks instances?
>


Re: CEP join across events

2017-04-27 Thread Elias Levy
It would be useful if there were a cleaner syntax for specifying
relationships between matched events, as in an SQL join, particularly for
conditions with a quantifier of one.

At the moment you have to do something like

Pattern
  .begin[Foo]("first")
    .where( first => first.baz == 1 )
  .followedBy("next")
    .where({ (next, ctx) =>
      val first = ctx.getEventsForPattern("first").iterator.next
      first.bar == next.bar && next.boo == "x"
    })

which is not very clean.  It would be friendlier if you could do something
like:

Pattern
  .begin[Foo]("first")
.where( first => first.baz == 1 )
  .followedBy("next")
.relatedTo("first", { (first, next) => first.bar == next.bar })
    .where( next => next.boo == "x" )



On Thu, Apr 27, 2017 at 1:21 AM, Kostas Kloudas  wrote:

> Glad that this is not a blocker for you and
> you are right that we should clarify it better in the documentation.
>


Re: CEP join across events

2017-04-26 Thread Elias Levy
You are correct.  Apologies for the confusion.  Given
that ctx.getEventsForPattern returns an iterator instead of a value and
that the example in the documentation deals with summing multiple matches,
I got the impression that the call would return all previous matches
instead of one at a time for each branch.

I suppose it returns an iterator to support patterns where the event has
some associated quantifier, like times(), zeroOrMore(), or oneOrMore(), yes?

Might be helpful to clarify this and point out that the iterator will
contain a single value for the common case of a match with a quantifier of
one, which is the default.


On Wed, Apr 26, 2017 at 2:15 AM, Kostas Kloudas <k.klou...@data-artisans.com
> wrote:

> Hi Elias,
>
> If I understand correctly your use case, you want for an input:
>
> event_1 = (type=1, value_a=K, value_b=X)
> event_2 = (type=2, value_a=K, value_b=X)
> event_3 = (type=1, value_a=K, value_b=Y)
>
> to get a match:
>
> event_1, event_2
>
> and discard event_3, right?
>
> In this case, Dawid is correct and from a first look at your code, it
> should work.
> If not, what is the output that you get?
>
> Kostas
>
>
> On Apr 26, 2017, at 8:39 AM, Dawid Wysakowicz <wysakowicz.da...@gmail.com>
> wrote:
>
> Hi Elias,
>
> You can do it with 1.3 and IterativeConditions. Method
> ctx.getEventsForPattern("foo") returns only those events that were matched
> in "foo" pattern in that particular branch.
> I mean that for a sequence like (type =1, value_b = X); (type=1,
> value_b=Y); (type=2, value_b=X) both events of type = 1 create a seperate
> pattern branch and the event with type = 2 will be checked for a match
> twice for both of those branches.
>
> Regards,
> Dawid
>
> 2017-04-26 7:48 GMT+02:00 Elias Levy <fearsome.lucid...@gmail.com>:
>
>> There doesn't appear to be a way to join events across conditions using
>> the CEP library.
>>
>> Consider events of the form (type, value_a, value_b) on a stream keyed by
>> the value_a field.
>>
>> Under 1.2 you can create a pattern such that, for a given value_a, as specified
>> by the stream key, there is a match if an event of type 1 is followed by an
>> event of type 2 (e.g. begin("foo").where(_.type==1).
>> followedBy("bar").where(_.type==2).  But this will return a match
>> regardless of whether value_b in the first event matches value_b in the
>> second event.
>>
>> 1.3 snapshot introduces iterative conditions, but this is insufficient.
>> In 1.3 you can do:
>>
>> begin("foo").where(_.type==1).followedBy("bar").where(
>> (v, ctx) => {
>>v.type == 2 &&
>>ctx.getEventsForPattern("foo").asScala.exists(prev =>
>> prev.value_b == v.value_b)
>> })
>>
>> This will accept the current event if any previous event had a value_b
>> that matches the current event. But the matches will include all previous
>> events, even those that did not match the current event's value_b, instead
>> of only matching the previous events where value_b equals the current one's.
>>
>> Is there a way to only output the match where the previous event matches the
>> current event's value_b (e.g. foo == (type=1, value_a=K, value_b=X) and bar
>> == (type=2, value_a=K, value_b=X))?
>>
>>
>>
>
>


Re: Re-keying / sub-keying a stream without repartitioning

2017-04-26 Thread Elias Levy
On Wed, Apr 26, 2017 at 5:11 AM, Aljoscha Krettek 
wrote:

> I did spend some time thinking about this and we had the idea for a while
> now to add an operation like “keyByWithoutPartitioning()” (name not final
> ;-) that would allow the user to tell the system that we don’t have to do a
> reshuffle. This would work if the key-type (and keys) would stay exactly
> the same.
>
> I think it wouldn’t work for your case because the key type changes and
> elements for key (A, B) would normally be reshuffled to different instances
> than with key (A), i.e. (1, 1) does not belong to the same key-group as
> (1). Would you agree that this happens in your case?
>

It happens if I use keyBy().  But there is no need for it to happen, which
is why I was asking about rekeying without repartitioning.  The stream is
already partitioned by A, so all elements of a new stream keyed by (A,B)
are already being processed by the local task.  Reshuffling as a result of
rekeying would have no benefit and would double the network traffic.  It is
why I suggested subKey(B) may be a good way to clearly indicate that the new
key just sub-partitions the existing key partition without requiring
reshuffling.

Why would you not be able to use a different key type with
keyByWithoutRepartitioning()?


CEP join across events

2017-04-25 Thread Elias Levy
There doesn't appear to be a way to join events across conditions using the
CEP library.

Consider events of the form (type, value_a, value_b) on a stream keyed by
the value_a field.

Under 1.2 you can create a pattern such that, for a given value_a, as specified
by the stream key, there is a match if an event of type 1 is followed by an
event of type 2 (e.g.
begin("foo").where(_.type==1).followedBy("bar").where(_.type==2).  But this
will return a match regardless of whether value_b in the first event
matches value_b in the second event.

1.3 snapshot introduces iterative conditions, but this is insufficient.  In
1.3 you can do:

begin("foo").where(_.type==1).followedBy("bar").where(
(v, ctx) => {
   v.type == 2 &&
   ctx.getEventsForPattern("foo").asScala.exists(prev => prev.value_b
== v.value_b)
})

This will accept the current event if any previous event had a value_b
that matches the current event. But the matches will include all previous
events, even those that did not match the current event's value_b, instead
of only matching the previous events where value_b equals the current one's.

Is there a way to only output the match where the previous event matches the
current event's value_b (e.g. foo == (type=1, value_a=K, value_b=X) and bar
== (type=2, value_a=K, value_b=X))?


Re: Re-keying / sub-keying a stream without repartitioning

2017-04-25 Thread Elias Levy
Anyone?

On Fri, Apr 21, 2017 at 10:15 PM, Elias Levy <fearsome.lucid...@gmail.com>
wrote:

> This is something that has come up before on the list, but in a different
> context.  I have a need to rekey a stream but would prefer the stream to
> not be repartitioned.  There is no gain to repartitioning, as the new
> partition key is a composite of the stream key, going from a key of A to a
> key of (A, B), so all values for the resulting streams are already being
> rerouted to the same node and repartitioning them to other nodes would
> simply generate unnecessary network traffic and serde overhead.
>
> Unlike previous use cases, I am not trying to perform aggregate
> operations.  Instead I am executing CEP patterns.  Some patterns apply to
> the stream keyed by A and some to the stream keyed by (A,B).
>
> The API does not appear to have an obvious solution to this situation.
> keyBy() will repartition, and there isn't something like subKey() to
> subpartition a stream without repartitioning (e.g. keyBy(A).subKey(B)).
>
> I suppose I could accomplish it by using partitionCustom(), ignoring the
> second element in the key, and delegating to the default partitioner
> passing it only the first element, thus resulting in no change of task
> assignment.
>
> Thoughts?
>


Re-keying / sub-keying a stream without repartitioning

2017-04-21 Thread Elias Levy
This is something that has come up before on the list, but in a different
context.  I have a need to rekey a stream but would prefer the stream to
not be repartitioned.  There is no gain to repartitioning, as the new
partition key is a composite of the stream key, going from a key of A to a
key of (A, B), so all values for the resulting streams are already being
rerouted to the same node and repartitioning them to other nodes would
simply generate unnecessary network traffic and serde overhead.

Unlike previous use cases, I am not trying to perform aggregate
operations.  Instead I am executing CEP patterns.  Some patterns apply to
the stream keyed by A and some to the stream keyed by (A,B).

The API does not appear to have an obvious solution to this situation.
keyBy() will repartition, and there isn't something like subKey() to
subpartition a stream without repartitioning (e.g. keyBy(A).subKey(B)).

I suppose I could accomplish it by using partitionCustom(), ignoring the
second element in the key, and delegating to the default partitioner
passing it only the first element, thus resulting in no change of task
assignment.

Thoughts?
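
A rough sketch of the partitionCustom() idea above.  The Element type, its
field names, and the hash-based partitioner are hypothetical; a plain hash on
A only approximates Flink's internal key-group assignment for keyBy(A), and
partitionCustom() returns a plain DataStream rather than a KeyedStream, so
this only illustrates the routing part of the idea.

import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.streaming.api.scala._

// Hypothetical element carrying the two key parts discussed in this thread.
case class Element(a: String, b: String, payload: String)

// Partition only on A, so elements keyed by (A, B) stay on the worker that
// already handles key A.  This is a plain hash scheme, not Flink's exact
// key-group assignment.
class FirstKeyPartPartitioner extends Partitioner[String] {
  override def partition(key: String, numPartitions: Int): Int =
    Math.floorMod(key.hashCode, numPartitions)
}

def rekeyWithoutShuffle(stream: DataStream[Element]): DataStream[Element] =
  stream.partitionCustom(new FirstKeyPartPartitioner, (e: Element) => e.a)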


Do Flink DataStreams use combiners?

2016-08-11 Thread Elias Levy
I am wondering if Flink makes use of combiners to pre-reduce a keyed and
windowed stream before shuffling the data among workers.

I.e. will it use a combiner in something like:

stream.flatMap {...}
  .assignTimestampsAndWatermarks(...)
  .keyBy(...)
  .timeWindow(...)
  .trigger(...)
  .sum("cnt")

or will it shuffle the keyed input before the sum reduction?

If it does make use of combiners, it would be useful to point this out in
the documentation, particularly if it only applies to certain types of
reducers, folds, etc.


Array values in keyBy

2016-06-10 Thread Elias Levy
It would be useful if the documentation warned what type of equality it
expected of values used as keys in keyBy.  I just got bitten in the ass by
converting a field from a string to a byte array.  All of a sudden the
windows were no longer aggregating.  So it seems Flink is not doing a deep
compare of arrays when comparing keys.
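
A sketch of one way around this, with hypothetical record and field names:
since a raw Array[Byte] relies on identity-based equals()/hashCode(), key on a
wrapper that compares array contents (or on any content-based value, such as a
String built from the bytes) instead.

import java.util.{Arrays => JArrays}
import org.apache.flink.streaming.api.scala._

// Hypothetical record whose key field was changed from String to Array[Byte].
case class Record(id: Array[Byte], value: Long)

// Key wrapper that compares the array *contents*, unlike a raw Array[Byte].
class BytesKey(val bytes: Array[Byte]) extends Serializable {
  override def equals(other: Any): Boolean = other match {
    case that: BytesKey => JArrays.equals(bytes, that.bytes)
    case _              => false
  }
  override def hashCode(): Int = JArrays.hashCode(bytes)
}

def keyedByContents(stream: DataStream[Record]) =
  stream.keyBy(r => new BytesKey(r.id))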


Re: FlinkKafkaProducer API

2016-06-09 Thread Elias Levy
On Thu, Jun 9, 2016 at 5:16 AM, Fabian Hueske  wrote:

> thanks for your feedback. I think those are good observations and
> suggestions to improve the Kafka producers.
> The best place to discuss such improvements is the dev mailing list.
>
> Would you like to repost your mail there or open JIRAs where the discussion
> about these changes can continue?


I opened FLINK-4050.  Since the JIRAs are posted to the dev list, I won't
cross post.

Cheers,
Elias


FlinkKafkaProducer API

2016-06-08 Thread Elias Levy
The FlinkKafkaProducer API seems more difficult to use than it should be.

The API requires you pass it a SerializationSchema or a
KeyedSerializationSchema, but the Kafka producer already has a
serialization API.  Requiring a serializer in the Flink API precludes the
use of the Kafka serializers.  For instance, it precludes the use of the
Confluent KafkaAvroSerializer class that makes use of the Confluent Schema
Registry.  Ideally, the serializer would be optional, so as to allow the
Kafka producer serializers to handle the task.

In addition, the KeyedSerializationSchema conflates message key extraction
with key serialization.  If the serializer were optional, to allow the
Kafka producer serializers to take over, you'd still need to extract a key
from the message.

And given that the key may not be part of the message you want to write to
Kafka, an upstream step may have to package the key with the message to
make both available to the sink, for instance in a tuple. That means you
also need to define a method to extract the message to write to Kafka from
the element passed into the sink by Flink.

In summary, extracting the key and the message from the element passed into
the sink should be kept separate from serialization, and the serialization
step should be optional.
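
To illustrate the conflation being described, here is a minimal sketch of a
KeyedSerializationSchema for a hypothetical element type: the same class has
to pick the key and value out of the element and turn them into bytes, which
is what rules out delegating serialization to the Kafka producer's own
serializers.

import java.nio.charset.StandardCharsets.UTF_8
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema

// Hypothetical element: the key travels alongside the payload because it may
// not be recoverable from the payload itself.
case class OutRecord(key: String, payload: String)

class OutRecordSchema extends KeyedSerializationSchema[OutRecord] {
  // Key extraction and key serialization happen in the same method.
  override def serializeKey(element: OutRecord): Array[Byte] =
    element.key.getBytes(UTF_8)

  // Likewise for the message value.
  override def serializeValue(element: OutRecord): Array[Byte] =
    element.payload.getBytes(UTF_8)

  // Returning null tells the producer to use its configured default topic.
  override def getTargetTopic(element: OutRecord): String = null
}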


Re: Kafka producer sink message loss?

2016-06-07 Thread Elias Levy
On Tue, Jun 7, 2016 at 4:52 AM, Stephan Ewen  wrote:

> The concern you raised about the sink being synchronous is exactly what my
> last suggestion should address:
>
> The internal state backends can return a handle that can do the sync in a
> background thread. The sink would continue processing messages, and the
> checkpoint would only be acknowledged after the background sync did
> complete.
> We should allow user code to return such a handle as well.
>

Sorry.  Apparently I hadn't had enough coffee and completely missed the
last paragraph of your response.  The async solution you propose seems
ideal.

What message ordering guarantees are you worried about?

I don't think you can do much about guaranteeing message ordering within
Kafka in case of failure, since you'll replay some messages.  And there is
no guarantee at all if you write to a Kafka topic with multiple partitions
from multiple sinks using a message key distinct from the key you used in a
keyBy in Flink: you'll be writing from multiple sink instances in parallel,
in what is essentially a shuffle.  It would seem the only ordering
guarantee is when a sink writes to a Kafka topic using a message key that
is the same as the key used in a keyBy in Flink, and even that will be
violated during a failure and replay by the sink.


Re: Event processing time with lateness

2016-06-06 Thread Elias Levy
On Fri, Jun 3, 2016 at 6:47 AM, Kostas Kloudas 
wrote:

> To see a relatively more complex example of such a trigger and how to
> implement it,
> you can have a look at this implementation:
> https://github.com/dataArtisans/beam_comp/blob/master/src/main/java/com/dataartisans/beam_comparison/customTriggers/EventTimeTriggerWithEarlyAndLateFiring.java
>

I've modified this trigger so that firings are suppressed unless there are
new events between timers.  This can significantly reduce the number of
output events, which could mean far fewer writes to a downstream data store.
See https://gist.github.com/eliaslevy/ec840444607b9a5dd5aa3eb2cdd77932.

Also, I find the accumulating behavior somewhat unintuitive, as when it is
disabled it only purges when the time window ends.  When discarding is in
effect, it seems more natural for purging to occur at each firing, whether
early, at the window's event-time end, or late.  Otherwise, you may end up
with output events of different semantics.  E.g. with the current behavior,
if you are implementing a counter, early firings will result in partial
counts until the window end; after that, late firings will give you partial
counts of the delta from the window-end count.  It would be more consistent
to either generate partial counts at all firings or deltas at all firings,
so that the output of the operator can be processed the same way downstream.
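
A condensed sketch of the suppression idea (not the gist linked above; the
trigger and state names are made up, and only the event-time path is shown):
keep a per-window flag in partitioned state, set it in onElement, and fire on
a timer only if the flag is set, clearing it afterwards.

import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

class SuppressingEventTimeTrigger extends Trigger[AnyRef, TimeWindow] {
  private val sawNewEvent = new ValueStateDescriptor[java.lang.Boolean](
    "saw-new-event", classOf[java.lang.Boolean], false)

  override def onElement(element: AnyRef, timestamp: Long, window: TimeWindow,
                         ctx: Trigger.TriggerContext): TriggerResult = {
    // Remember that something arrived since the last firing.
    ctx.getPartitionedState(sawNewEvent).update(true)
    ctx.registerEventTimeTimer(window.maxTimestamp)
    TriggerResult.CONTINUE
  }

  override def onEventTime(time: Long, window: TimeWindow,
                           ctx: Trigger.TriggerContext): TriggerResult = {
    val flag = ctx.getPartitionedState(sawNewEvent)
    if (flag.value()) {
      flag.clear()
      TriggerResult.FIRE        // fire only if new events arrived since the last timer
    } else {
      TriggerResult.CONTINUE    // suppress the firing: nothing new to report
    }
  }

  override def onProcessingTime(time: Long, window: TimeWindow,
                                ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {
    ctx.getPartitionedState(sawNewEvent).clear()
    ctx.deleteEventTimeTimer(window.maxTimestamp)
  }
}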


Re: Kafka producer sink message loss?

2016-06-06 Thread Elias Levy
On Sun, Jun 5, 2016 at 3:16 PM, Stephan Ewen  wrote:

> You raised a good point. Fortunately, there should be a simple way to fix
> this.
>
> The Kafka Sink Function should implement the "Checkpointed" interface. It
> will get a call to the "snapshotState()" method whenever a checkpoint
> happens. Inside that call, it should then sync on the callbacks, and only
> return once all have completed. It may return null (no need to store
> anything in the checkpoint).
>
> While the "Checkpointed" method has not returned, the checkpoint will not
> complete. That way, there will be a "synchronization" point per checkpoint.
>
> We can even improve this further in the future: The checkpoint method can
> return an async state handle. While the async state handle completes its
> "wait for callbacks" in the background (and only acks the checkpoint after
> that has completed), the sink function can continue processing.
>
> What do you think?
>

I opened FLINK-4027  to
track the issue.

That seems like an acceptable solution.  Presumably an exception can be
raised in snapshotState() if there is a Kafka publishing error when calling
flush() on the Kafka producer, which will cause the checkpoint to fail.

I do wonder what sort of performance penalty using flush() will incur, as
it is a synchronous call.  I assume no other messages can be processed by
the sink while inside snapshotState().  In theory a sink could continue
processing messages, so long as it kept track of the pending messages that
were sent before the barrier and only returned from the snapshotState() call
once none of those pre-barrier messages were still pending.
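
A minimal sketch of the approach discussed in this thread, assuming the
Checkpointed interface of that era and the Kafka producer's flush().  The sink
class, its element type, and the error handling are illustrative, not the
actual FlinkKafkaProducer implementation.

import java.util.Properties
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.checkpoint.Checkpointed
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

// Flushes the producer on every checkpoint so that no record acknowledged by
// the checkpoint is still sitting in the producer's buffers.
class FlushingKafkaSink(topic: String, props: Properties)
  extends RichSinkFunction[(String, String)] with Checkpointed[java.io.Serializable] {

  @transient private var producer: KafkaProducer[String, String] = _
  @volatile private var asyncError: Throwable = _

  override def open(parameters: Configuration): Unit =
    producer = new KafkaProducer[String, String](props)

  override def invoke(value: (String, String)): Unit =
    producer.send(new ProducerRecord(topic, value._1, value._2), new Callback {
      override def onCompletion(md: RecordMetadata, e: Exception): Unit =
        if (e != null) asyncError = e     // remember async publish failures
    })

  override def snapshotState(checkpointId: Long, timestamp: Long): java.io.Serializable = {
    producer.flush()                      // block until all pending sends complete
    if (asyncError != null)
      throw new RuntimeException("Kafka publish failed", asyncError)
    null                                  // nothing needs to be stored in the checkpoint
  }

  override def restoreState(state: java.io.Serializable): Unit = ()

  override def close(): Unit = if (producer != null) producer.close()
}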


  1   2   >