Flink Kafka connector consume from a single kafka partition

2020-02-18 Thread hemant singh
Hello Flink Users,

I have a use case where I am processing metrics from different types of
sources (one source will have multiple devices), and for aggregations as well
as for building alerts the order of messages is important. To maintain
customer data segregation I plan to have a single topic for each customer,
with each source streaming data to one Kafka partition.
To maintain ordering I am planning to push the data for a single source type
to a single partition. Then I can create a keyed stream so that for each
device-id I have a single stream with ordered data for that device-id.
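
(For illustration, roughly what that keying step could look like — a sketch
only; the Metric POJO, its getDeviceId() accessor and the consumer variable
are assumptions, not actual job code:)

// One FlinkKafkaConsumer per customer topic (placeholder variable).
DataStream<Metric> metrics = env.addSource(customerTopicConsumer);

// Key by device-id: all events of one device end up in the same parallel
// subtask, and within a Kafka partition their order is preserved.
KeyedStream<Metric, String> perDevice = metrics.keyBy(Metric::getDeviceId);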

However, with the Flink Kafka consumer I don't see a way to read from a
specific partition, so a single Flink consumer reads from multiple Kafka
partitions. Even if I create a keyed stream on source type (and then write to
a partition for further processing, like a keyed stream on device-id), I
think ordering will not be maintained per source type.

The only other option I feel I am left with is to have a single partition per
topic so that Flink can subscribe to the topic and ordering is maintained.
The challenge then is too many topics (as I have this configuration for
multiple customers), which is not advisable for a Kafka cluster.

Can anyone shed some light on how to handle this use case?

Thanks,
Hemant


Re: Parallelize Kafka Deserialization of a single partition?

2020-02-18 Thread Till Rohrmann
Then my statement must be wrong. Let me double check this. Yesterday when
checking the usage of the objectReuse field, I could only see it in the
batch operators. I'll get back to you.

Cheers,
Till

On Wed, Feb 19, 2020, 07:05 Jin Yi  wrote:

> Hi Till,
> I just read your comment:
> Currently, enabling object reuse via ExecutionConfig.enableObjectReuse()
> only affects the DataSet API. DataStream programs will always do defensive
> copies. There is a FLIP to improve this behaviour [1].
>
> I have an application written in Apache Beam, with Flink as the runner. The
> pipeline is configured in streaming mode, and I see a performance difference
> between enabling and disabling object reuse. When running in debug mode I
> noticed that with objectReuse set to true there is no
> serialization/deserialization happening between operators, whereas when it is
> set to false, serialization and deserialization happen between each pair of
> operators. Do you have any idea why this is happening?
>
> MyOptions options = PipelineOptionsFactory.as(MyOptions.class);
>
> options.setStreaming(true);
>
> options.setObjectReuse(true);
>
>
> Thanks a lot!
>
> Eleanore
>
>
> On Tue, Feb 18, 2020 at 6:05 AM Till Rohrmann 
> wrote:
>
>> Hi Theo,
>>
>> the KafkaDeserializationSchema does not allow returning asynchronous
>> results. Hence, Flink will always wait until
>> KafkaDeserializationSchema.deserialize returns the parsed value.
>> Consequently, the only way I can think of to offload the complex parsing
>> logic would be to do it in a downstream operator where you could use
>> AsyncI/O to run the parsing logic in a thread pool, for example.
>>
>> Alternatively, you could think about a simple program which transforms
>> your input events into another format where it is easier to extract the
>> timestamp from. This comes, however, at the cost of another Kafka topic.
>>
>> Currently, enabling object reuse via ExecutionConfig.enableObjectReuse()
>> only affects the DataSet API. DataStream programs will always do defensive
>> copies. There is a FLIP to improve this behaviour [1].
>>
>> [1]
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982
>>
>> Cheers,
>> Till
>>
>> On Mon, Feb 17, 2020 at 1:14 PM Theo Diefenthal <
>> theo.diefent...@scoop-software.de> wrote:
>>
>>> Hi,
>>>
>>> As for most pipelines, our Flink pipeline starts with parsing source
>>> Kafka events into POJOs. We perform this step within a
>>> KafkaDeserializationSchema so that we properly extract the event time
>>> timestamp for the downstream timestamp assigner.
>>>
>>> It turned out that parsing is currently the most CPU-intensive task in our
>>> pipeline and thus CPU bounds the number of elements we can ingest per
>>> second. Further splitting up the partitions will be hard, as we need to
>>> maintain the exact order of events per partition, and it would also require
>>> quite some architectural changes for the producers and the Flink job.
>>>
>>> Now I had the idea to put the parsing task into ordered Async I/O. But
>>> AsyncIO can only be plugged into an existing stream, not into the
>>> deserialization schema, as far as I can see. So the best idea I currently
>>> have is to keep parsing in the DeserializationSchema as minimal as possible,
>>> extract only the event timestamp, and do the full parsing downstream in
>>> Async I/O. This, however, seems a bit tedious, especially as we have to deal
>>> with multiple input formats and would need to develop two parsers for each
>>> heavy-load format: a timestamp-only parser and a full parser.
>>>
>>> Do you know if it is somehow possible to parallelize / async-I/O the
>>> parsing within the KafkaDeserializationSchema? I don't have state access
>>> there and I don't have a "collector" object, so one input element needs to
>>> produce exactly one output element.
>>>
>>> Another question: my parsing produces Java POJO objects via "new", which
>>> are sent downstream (reusePOJO setting set) and will finally be garbage
>>> collected once they reach the sink. Is there some mechanism in Flink so
>>> that I could reuse the "old" POJOs from the sink in the source? All tasks
>>> are chained, so theoretically that could be possible?
>>>
>>> Best regards
>>> Theo
>>>
>>
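
(A rough, illustrative sketch of the Async I/O offloading discussed above —
RawEvent, ParsedEvent and Parser.parse() are assumed placeholder types, and
the pool size, timeout and capacity are arbitrary; error handling is omitted:)

// Source does only minimal, timestamp-only deserialization.
DataStream<RawEvent> raw = env.addSource(kafkaSource);

DataStream<ParsedEvent> parsed = AsyncDataStream.orderedWait(
    raw,
    new RichAsyncFunction<RawEvent, ParsedEvent>() {
        private transient ExecutorService pool;

        @Override
        public void open(Configuration parameters) {
            pool = Executors.newFixedThreadPool(4);
        }

        @Override
        public void asyncInvoke(RawEvent input, ResultFuture<ParsedEvent> resultFuture) {
            // Heavy parsing runs in the thread pool; output order is
            // preserved by orderedWait.
            pool.submit(() -> resultFuture.complete(
                Collections.singleton(Parser.parse(input))));
        }

        @Override
        public void close() {
            pool.shutdown();
        }
    },
    30, TimeUnit.SECONDS,  // per-element timeout
    100);                  // max in-flight elements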


Re: Parallelize Kafka Deserialization of a single partition?

2020-02-18 Thread Jin Yi
Hi Till,
I just read your comment:
Currently, enabling object reuse via ExecutionConfig.enableObjectReuse()
only affects the DataSet API. DataStream programs will always do defensive
copies. There is a FLIP to improve this behaviour [1].

I have an application written in Apache Beam, with Flink as the runner. The
pipeline is configured in streaming mode, and I see a performance difference
between enabling and disabling object reuse. When running in debug mode I
noticed that with objectReuse set to true there is no
serialization/deserialization happening between operators, whereas when it is
set to false, serialization and deserialization happen between each pair of
operators. Do you have any idea why this is happening?

MyOptions options = PipelineOptionsFactory.as(MyOptions.class);

options.setStreaming(true);

options.setObjectReuse(true);
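
(For reference, a sketch of the corresponding Flink-level switch the quoted
comment is about — shown only for illustration, not taken from the Beam
pipeline above:)

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// The switch discussed above; the thread is about whether it affects the
// DataStream API or only the DataSet API.
env.getConfig().enableObjectReuse();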


Thanks a lot!

Eleanore


On Tue, Feb 18, 2020 at 6:05 AM Till Rohrmann  wrote:

> Hi Theo,
>
> the KafkaDeserializationSchema does not allow returning asynchronous
> results. Hence, Flink will always wait until
> KafkaDeserializationSchema.deserialize returns the parsed value.
> Consequently, the only way I can think of to offload the complex parsing
> logic would be to do it in a downstream operator where you could use
> AsyncI/O to run the parsing logic in a thread pool, for example.
>
> Alternatively, you could think about a simple program which transforms
> your input events into another format where it is easier to extract the
> timestamp from. This comes, however, at the cost of another Kafka topic.
>
> Currently, enabling object reuse via ExecutionConfig.enableObjectReuse()
> only affects the DataSet API. DataStream programs will always do defensive
> copies. There is a FLIP to improve this behaviour [1].
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982
>
> Cheers,
> Till
>
> On Mon, Feb 17, 2020 at 1:14 PM Theo Diefenthal <
> theo.diefent...@scoop-software.de> wrote:
>
>> Hi,
>>
>> As for most pipelines, our Flink pipeline starts with parsing source Kafka
>> events into POJOs. We perform this step within a KafkaDeserializationSchema
>> so that we properly extract the event time timestamp for the downstream
>> timestamp assigner.
>>
>> It turned out that parsing is currently the most CPU-intensive task in our
>> pipeline and thus CPU bounds the number of elements we can ingest per
>> second. Further splitting up the partitions will be hard, as we need to
>> maintain the exact order of events per partition, and it would also require
>> quite some architectural changes for the producers and the Flink job.
>>
>> Now I had the idea to put the parsing task into ordered Async I/O. But
>> AsyncIO can only be plugged into an existing stream, not into the
>> deserialization schema, as far as I can see. So the best idea I currently
>> have is to keep parsing in the DeserializationSchema as minimal as possible,
>> extract only the event timestamp, and do the full parsing downstream in
>> Async I/O. This, however, seems a bit tedious, especially as we have to deal
>> with multiple input formats and would need to develop two parsers for each
>> heavy-load format: a timestamp-only parser and a full parser.
>>
>> Do you know if it is somehow possible to parallelize / async-I/O the
>> parsing within the KafkaDeserializationSchema? I don't have state access
>> there and I don't have a "collector" object, so one input element needs to
>> produce exactly one output element.
>>
>> Another question: my parsing produces Java POJO objects via "new", which
>> are sent downstream (reusePOJO setting set) and will finally be garbage
>> collected once they reach the sink. Is there some mechanism in Flink so
>> that I could reuse the "old" POJOs from the sink in the source? All tasks
>> are chained, so theoretically that could be possible?
>>
>> Best regards
>> Theo
>>
>


Re: Persisting inactive state outside Flink

2020-02-18 Thread Akshay Aggarwal
Thanks Till.

Going with your suggestion, I'll run some benchmarks to figure out how the
lookups behave with increasing number of keys, and checkpoints with
increasing state size. I'll take a decision based on the results, and maybe
reach out to you if I need more information.

Thanks a lot,
Akshay

On Tue, Feb 18, 2020 at 11:02 PM Till Rohrmann  wrote:

> Hmm, with this size you will need an aggregated disk capacity of 11 TB
> (for the 1.2 Bn devices). If most of the entries are permanently dormant,
> then this is not ideal. On the other hand, they would occupy the same space
> on your Hbase cluster.
>
> Concerning your questions about RocksDB:
> 1. When using full checkpoints, then the "time to checkpoint" and the
> "time to recovery" will increase with the size of the state in the general
> case. Moreover, it will mostly be I/O bound w.r.t. your persistent storage.
> If you enable local recovery and don't suffer a machine loss, then the
> recovery should be almost instantaneous. If you activate incremental
> checkpoints, then the "time to checkpoint" depends on your access pattern.
> If the access pattern stays more or less the same, then the checkpoint time
> should stay constant. The "time to recovery" might be a bit worse compared
> to full checkpoints because you might have to download uncompacted sst
> files.
> 2. I think RocksDB's performance should slightly decrease (but I haven't
> run the numbers). Given that you have more keys, the lookups should become
> slightly more expensive. However, I would expect that this should not
> really matter given that RocksDB uses some proper indexes. The bigger
> difference will probably be whether you are accessing data which is still
> kept in the write buffer (in memory) or whether you need to access one of
> the sst files. Also here, the more keys you have, the more sst files you
> potentially need to touch. I would recommend running some benchmarks to see
> for yourself how it behaves with your workload.
> 3. You can use Flink's state processor API [1] to access Flink state. The
> only thing you need to do is to take a savepoint of your job and then feed
> the savepoint to the state processor API in order to access it.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>
> Cheers,
> Till
>
> On Tue, Feb 18, 2020 at 5:57 PM Akshay Aggarwal <
> akshay.aggar...@flipkart.com> wrote:
>
>> Thanks Till, I really appreciate your response.
>>
>> We are in fact considering RocksDB as our state backend. The scale we are
>> looking at is 1.2 Bn new devices every year, with a growth of ~30% YoY, the
>> state per device is not expected to grow beyond a few 10s of KBs though. The
>> peak ingestion rates are around 100k events per second. Another
>> consideration here is that many devices will go dormant forever, and it
>> seems pointless to keep that in state.
>>
>> I have a few concerns because of which I wasn't completely convinced of
>> using RocksDB (only) -
>> 1. Will the "time to checkpoint" and "time to recovery"  keep increasing
>> with the size of the state?
>> 2. Will there be a slowdown in RocksDB operations as the number of keys
>> increases over time?
>> 3. If we go to production with just RocksDB and no external state
>> persistence, is there a way for us to migrate to lazy loading if we hit
>> scale issues?
>>
>> The iterative solution with AsyncIO seems complex but feasible, it
>> certainly needs more thought to handle edge cases. Also, our use case can
>> manage an occasional glitch that comes with at-least once processing since
>> the output will be used for analytical purposes. Thanks for the suggestion.
>>
>> Cheers!
>> Akshay
>>
>>
>> On Tue, Feb 18, 2020 at 4:06 PM Till Rohrmann 
>> wrote:
>>
>>> Hi Akshay,
>>>
>>> there is no easy out-of-the-box implementation for what you are asking.
>>>
>>> Before drafting a potential solution I wanted to ask whether using the
>>> RocksDB state backend could already solve your problem. With this state
>>> backend Flink is able to spill state data to disk. Would this work for your
>>> use case or do you expect the device data per node to grow so big that it
>>> no longer fits onto disk?
>>>
>>> If using the RocksDB state backend does not work for you and you really
>>> need to offload state data to an external storage system from where you can
>>> load it lazily, it becomes significantly more complicated. One approach I
>>> could think of is the following: You have a primary operator (process)
>>> which is responsible for processing the incoming events and keeps the state
>>> of the non-dormant devices. Once a device becomes dormant, you could send
>>> the data to a secondary operator (offload+fetching) which uses AsyncIO to
>>> offload the state to HBase, for example. If the process operator
>>> encounters an event from a dormant device, it would need to ask the
>>> secondary operator to load it (via sending a fetch event downstream). The
>>> secondary operator would again use AsyncIO to load the requested data.

Re: State Processor API Keyed State

2020-02-18 Thread Tzu-Li (Gordon) Tai
There might be a possible workaround for this, for now:

Basically, the trick is to explicitly tell the State Processor API to use a
specified type information to access the keyed state.
You can do that with the `ExistingSavepoint#readKeyedState(String uid,
KeyedStateReaderFunction function, TypeInformation keyTypeInfo,
TypeInformation outTypeInfo)`.
This would allow the State Processor API to bypass the Java type
information extraction process (which is not compatible with how it is done
in Scala DataStream right now, hence the StateMigrationException you are
getting).

What you'd have to do, is in your pipeline job, explicitly generate the
serializer / type information using either the Scala DataStream macro
`createTypeInformation` or just use a custom serializer.
Then, specify to use that serializer / type info when reading keyed state
with the State Processor API.
Simply put: you'll be specifying explicitly what serializer to use for the
keys, and tell the State Processor API to also use that serializer to
access state.
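
(To make that concrete, a rough fragment of what this could look like — MyKey,
MyOutput, MyKeyReader, the operator uid, the paths and the helper producing
the key type information are all placeholders, and error handling is omitted:)

ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
ExistingSavepoint savepoint = Savepoint.load(
    bEnv, "file:///savepoints/savepoint-xyz",
    new RocksDBStateBackend("file:///checkpoints"));

// Key type info generated the same way the Scala job created it, e.g. via
// createTypeInformation[MyKey] on the Scala side, and passed in explicitly.
TypeInformation<MyKey> keyTypeInfo = MyTypeInfos.keyTypeInfo();
TypeInformation<MyOutput> outTypeInfo = TypeInformation.of(MyOutput.class);

DataSet<MyOutput> keys = savepoint.readKeyedState(
    "my-operator-uid", new MyKeyReader(), keyTypeInfo, outTypeInfo);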

This is not nice, but should work for now. Would be interesting to hear how
that works out for you.
As mentioned above, eventually a possible ideal solution is that type
information extraction should be converged for the Java / Scala DataStream
APIs.

Cheers,
Gordon

On Wed, Feb 19, 2020 at 10:20 AM Tzu-Li (Gordon) Tai 
wrote:

> Hi,
>
> Just to clarify -
> I quickly went through the README of the project, and saw this:
> "This error is seen after trying to read from a savepoint that was created
> using the same case class as a key."
>
> So, if I understood correctly, you were attempting to use the State
> Processor API to access a savepoint that was written with a Scala
> DataStream job, correct?
>
> If that's the case, I'm afraid this would not work as of now. See [1] for
> a similar scenario that others had also bumped into.
> TL;DR is - the State Processor API currently is not guaranteed to work for
> snapshots that are written with Scala DataStream jobs.
>
> For now, I'll add a big warning about this to the docs.
> But in general, it seems like we might want to consider bumping up the
> priority for enabling this, as quite a few users are using the Scala
> DataStream API for their jobs.
>
> Just as a side comment: this repo looks like a very interesting project!
>
> Cheers,
> Gordon
>
> [1] https://issues.apache.org/jira/browse/FLINK-15719
>
> On Wed, Feb 19, 2020 at 7:03 AM Mark Niehe  wrote:
>
>> Hey all,
>>
>> I've run into an issue with the State Processor API. To highlight the
>> issues I've been having, I've created a reference repository that will
>> demonstrate the issue (repository:
>> https://github.com/segmentio/flink-state-management).
>>
>> The current implementation of the pipeline has left us with keyed state
>> that we no longer need, and we don't have references to some of the old keys.
>> My plan was to:
>> 1. create a savepoint
>> 2. read the keys from each operator (using State Processor API)
>> 3. filter out all the keys that are no longer used
>> 4. bootstrap a new savepoint that contains the filtered state
>>
>> I managed to get this working using a sample pipeline and a very basic
>> key (a string), but when I switched the key to be something more complex (a
>> case class of two strings), I started seeing this exception:
>> Caused by: org.apache.flink.util.StateMigrationException: The new key
>> serializer must be compatible.
>> at
>> org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:194)
>> at
>> org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:170)
>> at
>> org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:157)
>> at
>> org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:141)
>> at
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:270)
>> ... 13 more
>>
>> Has anyone come across this before and figured out a fix? Any help you
>> can give would be greatly appreciated!
>>
>> Thanks,
>> --
>> Mark Niehe · Software Engineer · Integrations · Blog · We're Hiring!
>>
>


Re: State Processor API Keyed State

2020-02-18 Thread Tzu-Li (Gordon) Tai
Hi,

Just to clarify -
I quickly went through the README of the project, and saw this:
"This error is seen after trying to read from a savepoint that was created
using the same case class as a key."

So, if I understood correctly, you were attempting to use the State
Processor API to access a savepoint that was written with a Scala
DataStream job, correct?

If that's the case, I'm afraid this would not work as of now. See [1] for a
similar scenario that others had also bumped into.
TL;DR is - the State Processor API currently is not guaranteed to work for
snapshots that are written with Scala DataStream jobs.

For now, I'll add a big warning about this to the docs.
But in general, it seems like we might want to consider bumping up the
priority for enabling this, as quite a few users are using the Scala
DataStream API for their jobs.

Just as a side comment: this repo looks like a very interesting project!

Cheers,
Gordon

[1] https://issues.apache.org/jira/browse/FLINK-15719

On Wed, Feb 19, 2020 at 7:03 AM Mark Niehe  wrote:

> Hey all,
>
> I've run into an issue with the State Processor API. To highlight the
> issues I've been having, I've created a reference repository that will
> demonstrate the issue (repository:
> https://github.com/segmentio/flink-state-management).
>
> The current implementation of the pipeline has left us with keyed state
> that we no longer need, and we don't have references to some of the old keys.
> My plan was to:
> 1. create a savepoint
> 2. read the keys from each operator (using State Processor API)
> 3. filter out all the keys that are no longer used
> 4. bootstrap a new savepoint that contains the filtered state
>
> I managed to get this working using a sample pipeline and a very basic key
> (a string), but when I switched the key to be something more complex (a
> case class of two strings), I started seeing this exception:
> Caused by: org.apache.flink.util.StateMigrationException: The new key
> serializer must be compatible.
> at
> org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:194)
> at
> org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:170)
> at
> org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:157)
> at
> org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:141)
> at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:270)
> ... 13 more
>
> Has anyone come across this before and figured out a fix? Any help you can
> give would be greatly appreciated!
>
> Thanks,
> --
> Mark Niehe · Software Engineer · Integrations · Blog · We're Hiring!
>


[Question] how to have additional logging in Apache Beam KafkaWriter

2020-02-18 Thread Jin Yi
Hi there,

I am using Apache Beam (v2.16) in my application, and the Runner is
Flink(1.8). I use KafkaIO connector to consume from source topics and
publish to sink topics.

Here is the class that Apache Beam provides for publishing messages.
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java

Due to a requirement, I need to log at INFO level every message that has
been published (regardless of whether it succeeded or failed).

So essentially, in this class, I need the logging shown below to be added.
Are there any suggestions for how to do this?

private class SendCallback implements Callback {
  @Override
  public void onCompletion(RecordMetadata metadata, Exception exception) {
    if (exception == null) {
      // someId and topic are placeholders for fields captured from the record
      LOG.info("PublishToKafkaTopic. Published someId={} to topic={}", someId, topic);
    } else {
      LOG.error("PublishToKafkaTopic. Error publishing someId={} to topic={}", someId, topic, exception);
    }
  }
}

Thanks a lot!

Eleanore


Re: CSV StreamingFileSink

2020-02-18 Thread Austin Cawley-Edwards
Following up on this -- does anyone know if it's possible to stream
individual files to a directory using the StreamingFileSink? For instance,
if I want all records that come in during a certain day to be
partitioned into daily directories:

2020-02-18/
   large-file-1.txt
   large-file-2.txt
2020-02-19/
   large-file-3.txt

Or is there another way to accomplish this?
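
(For reference, a sketch of one way to get the daily directory layout above,
using a row-format sink with a date-based bucket assigner; records is an
assumed DataStream<String> of already-encoded CSV lines and the path is a
placeholder:)

StreamingFileSink<String> sink = StreamingFileSink
    .forRowFormat(new Path("s3://my-bucket/output"),
                  new SimpleStringEncoder<String>("UTF-8"))
    .withBucketAssigner(new DateTimeBucketAssigner<String>("yyyy-MM-dd"))
    .build();

records.addSink(sink);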

Thanks!
Austin

On Tue, Feb 18, 2020 at 5:33 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hey all,
>
> Has anyone had success using the StreamingFileSink[1] to write CSV files?
> And if so, what about compressed (gzipped, ideally) files? Which libraries
> did you use?
>
>
> Best,
> Austin
>
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
>


State Processor API Keyed State

2020-02-18 Thread Mark Niehe
Hey all,

I've run into an issue with the State Processor API. To highlight the
issues I've been having, I've created a reference repository that will
demonstrate the issue (repository:
https://github.com/segmentio/flink-state-management).

The current implementation of the pipeline has left us with keyed state
that we no longer need, and we don't have references to some of the old keys.
My plan was to:
1. create a savepoint
2. read the keys from each operator (using State Processor API)
3. filter out all the keys that are no longer used
4. bootstrap a new savepoint that contains the filtered state

I managed to get this working using a sample pipeline and a very basic key
(a string), but when I switched the key to be something more complex (a
case class of two strings), I started seeing this exception:
Caused by: org.apache.flink.util.StateMigrationException: The new key
serializer must be compatible.
at
org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:194)
at
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:170)
at
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:157)
at
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:141)
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:270)
... 13 more

Has anyone come across this before and figured out a fix? Any help you can
give would be greatly appreciated!

Thanks,
-- 
Mark Niehe · Software Engineer · Integrations · Blog · We're Hiring!


CSV StreamingFileSink

2020-02-18 Thread Austin Cawley-Edwards
Hey all,

Has anyone had success using the StreamingFileSink[1] to write CSV files?
And if so, what about compressed (gzipped, ideally) files? Which libraries
did you use?


Best,
Austin


[1]:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html


Link to Flink on K8S Webinar

2020-02-18 Thread Aizhamal Nurmamat kyzy
Hi folks,

Recently Aniket Mokashi and Dagang Wei hosted a webinar on how to use the
flink k8s operator they have developed. The operator also supports working
with Beam.

If you think that this may be helpful to you, you may access the recording
and slides via this link:
https://www.cncf.io/webinars/operating-os-flink-beam-runtime-kubernetes/

Thanks,
Aizhamal


Updating ValueState not working in hosted Kinesis

2020-02-18 Thread Chris Stevens
Hi there,

I'm trying to update state in one of my applications hosted in Kinesis Data
Analytics. The state is declared as

private transient ValueState sensorState;

and updated with sensorState.update(sensor);

I get this error:

An error occurred: org.apache.flink.util.FlinkRuntimeException: Error while
adding data to RocksDB
at
org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:108)
at
org.apache.flink.runtime.state.ttl.TtlValueState.update(TtlValueState.java:50)
at
sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction.join(FrameMotionPathsToTelemetryJoinFunction.java:97)
at
sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction.join(FrameMotionPathsToTelemetryJoinFunction.java:48)
at
org.apache.flink.streaming.api.datastream.JoinedStreams$JoinCoGroupFunction.coGroup(JoinedStreams.java:460)
at
org.apache.flink.streaming.api.datastream.CoGroupedStreams$CoGroupWindowFunction.apply(CoGroupedStreams.java:777)
at
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
at
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
at
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:546)
at
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:454)
at
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:255)
at
org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:775)
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
at
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
at
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:308)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:714)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.esotericsoftware.kryo.KryoException:
java.lang.IllegalArgumentException: Unable to create serializer
"com.esotericsoftware.kryo.serializers.FieldSerializer" for class:
org.apache.logging.log4j.core.layout.AbstractCsvLayout
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classloader (java.security.ProtectionDomain)
cachedPDs (javax.security.auth.SubjectDomainCombiner)
combiner (java.security.AccessControlContext)
acc (sun.security.ssl.SSLSocketImpl)
connection (org.postgresql.core.PGStream)
pgStream (org.postgresql.core.v3.QueryExecutorImpl)
transferModeRegistry (org.postgresql.core.v3.SimpleQuery)
commitQuery (org.postgresql.jdbc.PgConnection)
connection (org.postgresql.jdbc.PgResultSet)
val$rs
(sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction$4)
at
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
at
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
at
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:88)
at
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
at
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
at
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577)
at
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:68)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
at
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
at

Re: Persisting inactive state outside Flink

2020-02-18 Thread Till Rohrmann
Hmm, with this size you will need an aggregated disk capacity of 11 TB (for
the 1.2 Bn devices). If most of the entries are permanently dormant, then
this is not ideal. On the other hand, they would occupy the same space on
your HBase cluster.

Concerning your questions about RocksDB:
1. When using full checkpoints, the "time to checkpoint" and the "time to
recovery" will increase with the size of the state in the general case.
Moreover, it will mostly be I/O bound w.r.t. your persistent storage. If
you enable local recovery and don't suffer a machine loss, then the
recovery should be almost instantaneous. If you activate incremental
checkpoints, then the "time to checkpoint" depends on your access pattern.
If the access pattern stays more or less the same, then the checkpoint time
should stay constant. The "time to recovery" might be a bit worse compared
to full checkpoints because you might have to download uncompacted SST
files. (See the configuration sketch after the link below.)
2. I think RocksDB's performance should slightly decrease (but I haven't
run the numbers). Given that you have more keys, the lookups should become
slightly more expensive. However, I would expect that this should not
really matter, given that RocksDB uses proper indexes. The bigger
difference will probably be whether you are accessing data that is still
kept in the write buffer (in memory) or whether you need to access one of
the SST files. Also here, the more keys you have, the more SST files you
potentially need to touch. I would recommend running some benchmarks to see
for yourself how it behaves with your workload.
3. You can use Flink's state processor API [1] to access Flink state. The
only thing you need to do is to take a savepoint of your job and then feed
the savepoint to the state processor API in order to access it.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
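
(For illustration, a rough sketch of the checkpointing settings point 1 is
about — the checkpoint URI and interval are placeholders, and error handling
is omitted; local recovery itself is enabled via the cluster option
state.backend.local-recovery: true in flink-conf.yaml:)

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// RocksDB backend with incremental checkpoints enabled (second argument).
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
env.enableCheckpointing(60_000); // checkpoint every 60 seconds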

Cheers,
Till

On Tue, Feb 18, 2020 at 5:57 PM Akshay Aggarwal <
akshay.aggar...@flipkart.com> wrote:

> Thanks Till, I really appreciate your response.
>
> We are in fact considering RocksDB as our state backend. The scale we are
> looking at is 1.2 Bn new devices every year, with a growth of ~30% YoY, the
> state per device is not expected to grow beyond a few 10s of KBs though. The
> peak ingestion rates are around 100k events per second. Another
> consideration here is that many devices will go dormant forever, and it
> seems pointless to keep that in state.
>
> I have a few concerns because of which I wasn't completely convinced of
> using RocksDB (only) -
> 1. Will the "time to checkpoint" and "time to recovery"  keep increasing
> with the size of the state?
> 2. Will there be a slowdown in RocksDB operations as the number of keys
> increases over time?
> 3. If we go to production with just RocksDB and no external state
> persistence, is there a way for us to migrate to lazy loading if we hit
> scale issues?
>
> The iterative solution with AsyncIO seems complex but feasible, it
> certainly needs more thought to handle edge cases. Also, our use case can
> manage an occasional glitch that comes with at-least once processing since
> the output will be used for analytical purposes. Thanks for the suggestion.
>
> Cheers!
> Akshay
>
>
> On Tue, Feb 18, 2020 at 4:06 PM Till Rohrmann 
> wrote:
>
>> Hi Akshay,
>>
>> there is no easy out-of-the-box implementation for what you are asking.
>>
>> Before drafting a potential solution I wanted to ask whether using the
>> RocksDB state backend could already solve your problem. With this state
>> backend Flink is able to spill state data to disk. Would this work for your
>> use case or do you expect the device data per node to grow so big that it
>> no longer fits onto disk?
>>
>> If using the RocksDB state backend does not work for you and you really
>> need to offload state data to an external storage system from where you can
>> load it lazily, it becomes significantly more complicated. One approach I
>> could think of is the following: You have a primary operator (process)
>> which is responsible for processing the incoming events and keeps the state
>> of the non-dormant devices. Once a device becomes dormant, you could send
>> the data to a secondary operator (offload+fetching) which uses AsyncIO to
>> offload the state to HBase, for example. If the process operator
>> encounters an event from a dormant device, it would need to ask the
>> secondary operator to load it (via sending a fetch event downstream). The
>> secondary operator would again use AsyncIO to load the requested data. Once
>> it retrieves the data, you would need to send the data back to the primary
>> operator via a feedback edge (iteration edge).
>>
>> The problem with this approach is that Flink does not give you
>> exactly-once processing guarantees when using iterations at the moment. The
>> community is working on changing this, though.
>>
>> Cheers,
>> Till
>>
>> On Fri, Feb 14, 2020 at 6:05 PM Akshay Aggarwal <
>> 

Re: Can Connected Components run on a streaming dataset using iterate delta?

2020-02-18 Thread Yun Gao
Hi Kant,

As far as I know, the current example connected components implementation
based on the DataSet API cannot be extended to streaming data or incremental
batch directly.

From the algorithm's perspective, if the graph only ever adds edges and never
removes them, the connected components should be updatable incrementally when
the graph changes: when some edges are added, a new search is started from the
sources of the added edges to propagate their component IDs. This triggers a
new pass of updates over the downstream vertices, and the updates continue
until no vertex's component ID changes anymore. However, if there are also
edge removals, I think the incremental computation cannot be achieved as
easily.

To implement the above logic on Flink, I currently see two possible methods:
1) Use the DataSet API and DataSet iterations: maintain the graph structure
and the latest computation result in external storage, and whenever there are
enough changes to the graph, submit a new DataSet job to recompute the result.
The job should load, for each vertex, its edges, its latest component ID and
whether it is the source of a newly added edge, and then run the incremental
computation described above.
2) Flink also provides a DataStream iteration API [1] that enables iterating
on unbounded data. In this case the graph modifications would be modeled as a
data stream, and operators inside the iteration would maintain the graph
structure and the current component IDs; whenever there are enough changes, a
new pass of the computation starts (a rough skeleton of this is sketched
below).
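
(Very rough skeleton of the DataStream iteration mentioned in 2) — EdgeEvent,
ComponentUpdate, toInitialUpdate() and PropagateComponentIdFunction are
placeholder names, and the feedback timeout is arbitrary:)

DataStream<EdgeEvent> edges = env.addSource(edgeSource);

IterativeStream<ComponentUpdate> iteration =
    edges.map(e -> toInitialUpdate(e)).iterate(5000); // feedback wait in ms

// Keeps the graph and current component ids in keyed state and emits
// further updates while component ids keep changing.
DataStream<ComponentUpdate> propagated =
    iteration.keyBy(ComponentUpdate::getVertexId)
             .process(new PropagateComponentIdFunction());

// Updates that changed something are fed back into the loop; stable
// results leave the iteration.
iteration.closeWith(propagated.filter(u -> u.isChanged()));
DataStream<ComponentUpdate> result = propagated.filter(u -> !u.isChanged());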

Best,
 Yun

[1] Flink DataStream iteration, 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/datastream_api.html#iterations


--
From:kant kodali 
Send Time:2020 Feb. 18 (Tue.) 15:11
To:user 
Subject:Can Connected Components run on a streaming dataset using iterate delta?

Hi All,

I am wondering if connected components can run on streaming data, or, say, an
incremental batch?

I see that with delta iteration not all vertices need to participate at every
iteration, which is great, but in my case the graph is evolving over time; in
other words, new edges are getting added over time. If so, does the algorithm
need to run on the entire graph, or can it simply run on the new batch of
edges?

Finally, what is the best way to compute connected components on graphs
evolving over time? Should I use streaming, batch, or some custom incremental
approach? Also, the documentation takes maxIterations as an input. How can I
come up with a good number for maxIterations? And once I do, is the algorithm
guaranteed to converge?


Thanks,
Kant




Re: Persisting inactive state outside Flink

2020-02-18 Thread Akshay Aggarwal
Thanks Till, I really appreciate your response.

We are in fact considering RocksDB as our state backend. The scale we are
looking at is 1.2 Bn new devices every year, with ~30% YoY growth; the state
per device is not expected to grow beyond a few tens of KBs, though. The
peak ingestion rates are around 100k events per second. Another
consideration here is that many devices will go dormant forever, and it
seems pointless to keep them in state.

I have a few concerns because of which I wasn't completely convinced of using
RocksDB (only) -
1. Will the "time to checkpoint" and "time to recovery" keep increasing
with the size of the state?
2. Will there be a slowdown in RocksDB operations as the number of keys
increases over time?
3. If we go to production with just RocksDB and no external state
persistence, is there a way for us to migrate to lazy loading if we hit
scale issues?

The iterative solution with AsyncIO seems complex but feasible; it
certainly needs more thought to handle edge cases. Also, our use case can
tolerate an occasional glitch that comes with at-least-once processing, since
the output will be used for analytical purposes. Thanks for the suggestion.

Cheers!
Akshay


On Tue, Feb 18, 2020 at 4:06 PM Till Rohrmann  wrote:

> Hi Akshay,
>
> there is no easy out-of-the-box implementation for what you are asking.
>
> Before drafting a potential solution I wanted to ask whether using the
> RocksDB state backend could already solve your problem. With this state
> backend Flink is able to spill state data to disk. Would this work for your
> use case or do you expect the device data per node to grow so big that it
> no longer fits onto disk?
>
> If using the RocksDB state backend does not work for you and you really
> need to offload state data to an external storage system from where you can
> load it lazily, it becomes significantly more complicated. One approach I
> could think of is the following: You have a primary operator (process)
> which is responsible for processing the incoming events and keeps the state
> of the non-dormant devices. Once a device becomes dormant, you could send
> the data to a secondary operator (offload+fetching) which uses AsyncIO to
> offload the state to HBase, for example. If the process operator
> encounters an event from a dormant device, it would need to ask the
> secondary operator to load it (via sending a fetch event downstream). The
> secondary operator would again use AsyncIO to load the requested data. Once
> it retrieves the data, you would need to send the data back to the primary
> operator via a feedback edge (iteration edge).
>
> The problem with this approach is that Flink does not give you
> exactly-once processing guarantees when using iterations at the moment. The
> community is working on changing this, though.
>
> Cheers,
> Till
>
> On Fri, Feb 14, 2020 at 6:05 PM Akshay Aggarwal <
> akshay.aggar...@flipkart.com> wrote:
>
>> Hi,
>>
>> We have a use case where we have to persist some state information about
>> a device forever. Each new event will fetch the keyed state and update it.
>> And this has to be applied in-order of events.
>>
>> The problem is that the number of devices (keys) will keep growing
>> infinitely. Usually a device comes online, stays active for a while
>> (generates new events) and then goes into dormant mode. Is there a way we
>> can persist the state outside of Flink (say HBase) when the device goes
>> dormant and later fetch when it's activated?
>>
>> I know we can do this in process function using timers. But here I'll
>> have to make a synchronous call to the external store every time a new
>> device comes live, or when an active device goes dormant, which will stall
>> the task and become a scalability bottleneck. Using AsyncIO also doesn't
>> seem to be an option.
>>
>> Is there a way to achieve this without hacking into Flink code?
>>
>> Thanks,
>> Akshay Aggarwal
>>
>>
>> *-*
>>
>> *This email and any files transmitted with it are confidential and
>> intended solely for the use of the individual or entity to whom they are
>> addressed. If you have received this email in error, please notify the
>> system manager. This message contains confidential information and is
>> intended only for the individual named. If you are not the named addressee,
>> you should not disseminate, distribute or copy this email. Please notify
>> the sender immediately by email if you have received this email by mistake
>> and delete this email from your system. If you are not the intended
>> recipient, you are notified that disclosing, copying, distributing or
>> taking any action in reliance on the contents of this information is
>> strictly prohibited.*
>>
>>
>>
>> *Any views or opinions presented in this email are solely those of the
>> author and do not necessarily represent those of the organization. Any
>> information on shares, debentures or 

Identifying Flink Operators of the Latency Metric

2020-02-18 Thread Morgan Geldenhuys

Hi All,

I have set up monitoring for Flink (1.9.2) via Prometheus and am 
interested in viewing the end-to-end latency at the sink operators for 
the 95th percentile. I have enabled latency markers at the operator level 
and can see the results; one of the entries looks as follows:


flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency{app="flink",component="taskmanager",host="flink_taskmanager_6bdc8fc49_kr4bs",instance="10.244.18.2:",job="kubernetes-pods",job_id="96d32d8e380dc267bd69403fd7e20adf",job_name="Traffic",kubernetes_namespace="default",kubernetes_pod_name="flink-taskmanager-6bdc8fc49-kr4bs",operator_id="2e32dc82f03b1df764824a4773219c97",operator_subtask_index="7",pod_template_hash="6bdc8fc49",quantile="0.95",source_id="cbc357ccb763df2852fee8c4fc7d55f2",tm_id="7fb02c0ed734ed1815fa51373457434f"}

That is great, however... I am unable to determine which of the 
operators is the sink operator I'm looking for based solely on the 
operator_id. Is there a way of determining this?


Regards,
M.


Re: Not able to consume kafka massages in flink-1.10.0 version

2020-02-18 Thread Stephan Ewen
This looks like a Kafka version mismatch.

Please check that you have the right Flink connector and no other
Kafka dependencies in the classpath.

On Tue, Feb 18, 2020 at 10:46 AM Avinash Tripathy <
avinash.tripa...@stellapps.com> wrote:

> Hi,
>
> I am getting this error message.
>
> [image: flink-kafka-consumer-error.png]
>
> Flink version: 1.10.0
> Kafka version: kafka_2.12-2.1.0
>
> Thanks,
> Avinash
>


Re: Failed to transfer file from TaskExecutor : Vanilla Flink Cluster

2020-02-18 Thread Robert Metzger
Hey Milind,

can you additionally also set

metrics.internal.query-service.port

to the range?
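
(For illustration, the flink-conf.yaml port settings under discussion would
then look roughly like the following — the range chosen for the metrics query
service here is an assumption, not taken from the thread:)

taskmanager.rpc.port: 50100-50200
blob.server.port: 50201-50300
metrics.internal.query-service.port: 50301-50400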


Best,
Robert


On Fri, Feb 7, 2020 at 8:35 PM Milind Vaidya  wrote:

> I tried setting that option but it did not work.
>
> 2020-02-07 19:28:45,999 INFO
>  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
> Registering TaskManager with ResourceID 32fb9e7dcc9d41917bce38a2d5bb0093
> (akka.tcp://flink@ip-1:34718/user/taskmanager_0) at ResourceManager
> 2020-02-07 19:28:46,425 INFO
>  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
> Registering TaskManager with ResourceID 8c402a9c039d3c33466631510c48b552
> (akka.tcp://flink@ip-2:37120/user/taskmanager_0) at ResourceManager
>
> I have setting as follows
>
> taskmanager.rpc.port : 50100-50200
> blob.server.port : 50201-50300
>
> So how do I control the port for the TaskManager? In spite of the above
> settings, the task managers are being scheduled at ports 34718 and 37120.
>
> Thanks,
> Milind
>
>
>
> On Thu, Feb 6, 2020 at 5:25 PM Yang Wang  wrote:
>
>> Maybe you forget to limit the blob server port(blob.server.port) to the
>> range.
>>
>>
>> Best,
>> Yang
>>
>> Milind Vaidya  wrote on Fri, Feb 7, 2020 at 7:03 AM:
>>
>>> I figured out that it was a problem with the ports. 39493/34094 were not
>>> accessible. So to get this working I opened all the ports 0-65535 for the
>>> security group.
>>>
>>> How do I control that if I want to open only certain range of ports ?
>>>
>>> Is "taskmanager.rpc.port" the right parameter to set ? I did try and
>>> set this to certain port range, but did not work.
>>>
>>> Thanks
>>> Milind
>>>
>>> On Wed, Feb 5, 2020 at 11:22 AM Milind Vaidya  wrote:
>>>



 The cluster is set up on AWS with 1 job manager and 2 task managers.
 They all belong to the same security group, with ports 6123, 8081, and
 50100-50200 having access granted.

 Job manager config is as follows :

 FLINK_PLUGINS_DIR   :
 /usr/local/flink-1.9.1/plugins
 io.tmp.dirs :   /tmp/flink
 jobmanager.execution.failover-strategy  :   region
 jobmanager.heap.size:   1024m
 jobmanager.rpc.address  :   10.0.16.10
 jobmanager.rpc.port :   6123
 jobstore.cache-size :   52428800
 jobstore.expiration-time:   3600
 parallelism.default :   4
 slot.idle.timeout   :   5
 slot.request.timeout:   30
 task.cancellation.interval  :   3
 task.cancellation.timeout   :   18
 task.cancellation.timers.timeout:   7500
 taskmanager.exit-on-fatal-akka-error:   false
 taskmanager.heap.size   :   1024m
 taskmanager.network.bind-policy :   "ip"
 taskmanager.numberOfTaskSlots   :   2
 taskmanager.registration.initial-backoff:   500ms
 taskmanager.registration.timeout:   5min
 taskmanager.rpc.port:   50100-50200
 web.tmpdir  :
 /tmp/flink-web-74cce811-17c0-411e-9d11-6d91edd2e9b0



 I have summarised the details in a Stack Overflow question, where
 it is easier to lay out the various details.

 https://stackoverflow.com/questions/60082479/flink-1-9-standalone-cluster-failed-to-transfer-file-from-taskexecutor-id

 On Wed, Feb 5, 2020 at 2:25 AM Robert Metzger 
 wrote:

> Hi,
>
> I don't think this is a bug. It looks like the machines can not talk
> to each other. Can you validate that all the machines can talk to each
> other on the ports used by Flink (6123, 8081, ...)?
> If that doesn't help:
> - How is the network set up?
> - Are you running physical machines / VMs / containers?
> - Is there a firewall involved?
>
> Best,
> Robert
>
>
> On Fri, Jan 31, 2020 at 7:25 PM Milind Vaidya 
> wrote:
>
>> Hi
>>
>> I am trying to build a cluster for flink with 1 master and 2 workers.
>> The program is working fine locally. The messages are read from Kafka
>> and just printed on STDOUT.
>>
>> The cluster is successfully created and UI is also shows all config.
>> But the job fails to execute on the cluster.
>>
>> Here are few exceptions I see in the log files
>>
>> File : flink-root-standalonesession
>>
>> 2020-01-29 19:55:00,348 INFO
>>  akka.remote.transport.ProtocolStateActor  - No
>> response from remote for outbound association. Associate timed out after
>> [2 ms].
>> 2020-01-29 19:55:00,350 INFO
>>  akka.remote.transport.ProtocolStateActor  - No
>> response from remote for outbound association. Associate timed out after
>> [2 ms].
>> 2020-01-29 19:55:00,350 WARN  

Re: Using retained checkpoints as savepoints

2020-02-18 Thread Stephan Ewen
Maybe one small addition:
  - for the heap state backend, there is no difference at all between the
format and behavior of retained checkpoints (after the job is canceled) and
savepoints. Same format and features.
  - For RocksDB incremental checkpoints, we do in fact support re-scaling,
and I think we should commit to doing that always in the future. But we
wanted to keep it open to not support state migration, for the reasons
mentioned by Aljoscha.

Supporting re-scaling on checkpoints is important for the upcoming work on
(reactive) auto-scaling, which means we need to commit to supporting this.
Which also means we can update the docs to say that.

Best,
Stephan



On Tue, Feb 18, 2020 at 1:06 PM Aljoscha Krettek 
wrote:

> Hi,
>
> the reason why we are quite conservative when it comes to stating
> properties of checkpoints is that we don't want to prevent ourselves
> from implementing possibly optimized checkpoint formats that would not
> support these features.
>
> You're right that currently checkpoints support most of the features of
> savepoints because they did not diverge far in their formats (or not at
> all).
>
> AFAIK, this is not written down anywhere so it would be good to discuss
> if we want to give those guarantees (which ties our hands a bit more) or
> keep it as is but properly document it.
>
> Best,
> Aljoscha
>
> On 30.01.20 01:58, Ken Krugler wrote:
> > Hi all,
> >
> > Currently
> https://ci.apache.org/projects/flink/flink-docs-master/ops/state/checkpoints.html#difference-to-savepoints
> <
> https://ci.apache.org/projects/flink/flink-docs-master/ops/state/checkpoints.html#difference-to-savepoints>
> says checkpoints…
> >
> > "do not support Flink specific features like rescaling"
> >
> > But I believe they do, and really must if you can use them like a
> savepoint. Should that sentence be changed, or removed?
> >
> > Also this page doesn’t talk about state migration, which is another
> aspect of restarting a (modified) workflow from a retained checkpoint…will
> that work?
> >
> > This sentence about checkpoints on
> https://ci.apache.org/projects/flink/flink-docs-master/ops/state/savepoints.html#what-is-a-savepoint-how-is-a-savepoint-different-from-a-checkpoint
> <
> https://ci.apache.org/projects/flink/flink-docs-master/ops/state/savepoints.html#what-is-a-savepoint-how-is-a-savepoint-different-from-a-checkpoint>
> implies not:
> >
> > "Optimizations towards those goals can exploit certain properties, e.g.
> that the job code doesn’t change between the execution attempts"
> >
> > Thanks,
> >
> > — Ken
> >
> > --
> > Ken Krugler
> > http://www.scaleunlimited.com
> > custom big data solutions & training
> > Hadoop, Cascading, Cassandra & Solr
> >
> >
>


Re: job history server

2020-02-18 Thread Richard Moorhead
2020-02-18 09:44:45,227 ERROR
org.apache.flink.runtime.webmonitor.history.HistoryServerArchiveFetcher  -
Failure while fetching/processing job archive for job
eaf0639027aca1624adaa100bdf1332e.
java.nio.file.FileSystemException:
/dev/shm/flink-history-server/jobs/eaf0639027aca1624adaa100bdf1332e/vertices/062e4d80ed1d4bdafd24e46
2245c5926/subtasks/86/attempts/0.json: No space left on device

and there it is:

42103b5b-5410-d2d8-6a0b-21757e4a0fbc ~
0 % df -iH
Filesystem   Inodes IUsed IFree IUse% Mounted on
/dev/mapper/vg00-rootlv00
   132k   13k  119k   10% /
tmpfs  508k  465k   43k   92% /dev/shm

Thanks for the tip.

On Mon, Feb 17, 2020 at 8:08 PM Richard Moorhead 
wrote:

> I did not know that.
>
> I have since wiped the directory. I will post when I see this error again.
>
> On Mon, Feb 17, 2020 at 8:03 PM Benchao Li  wrote:
>
>> `df -H` only gives the sizes, not inodes information. Could you also show
>> us the result of `df -iH`?
>>
>> Richard Moorhead  wrote on Tue, Feb 18, 2020 at 9:40 AM:
>>
>>> Yes, I did. I mentioned it last but I should have been clearer:
>>>
>>> 22526:~/ $ df -H
>>>
>>>
>>>  [18:15:20]
>>> FilesystemSize  Used Avail Use% Mounted on
>>> /dev/mapper/vg00-rootlv00
>>>   2.1G  777M  1.2G  41% /
>>> tmpfs 2.1G  753M  1.4G  37% /dev/shm
>>>
>>> On Mon, Feb 17, 2020 at 7:13 PM Benchao Li  wrote:
>>>
 Hi Richard,

 Have you checked that inodes of the disk partition were full or not?

 Richard Moorhead <richard.moorh...@gmail.com> wrote on Tue, Feb 18, 2020 at 8:16 AM:

> I see the following exception often:
>
> 2020-02-17 18:13:26,796 ERROR
> org.apache.flink.runtime.webmonitor.history.HistoryServerArchiveFetcher  -
> Failure while fetching/processing job archive for job
> eaf0639027aca1624adaa100bdf1332e.
> java.nio.file.FileSystemException:
> /dev/shm/flink-history-server/jobs/eaf0639027aca1624adaa100bdf1332e/vertices/6ab&3ed37d1a5e48f2786b832033f074/subtasks/86/attempts:
> No space left on device
> at
> sun.nio.fs.UnixException.translateToIOException(UnixException.java:91)
> at
> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
> at
> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
> at
> sun.nio.fs.UnixFileSystemProvider.createDirectory(UnixFileSystemProvider.java:384)
> at java.nio.file.Files.createDirectory(Files.java:674)
> at
> java.nio.file.Files.createAndCheckIsDirectory(Files.java:781)
> at java.nio.file.Files.createDirectories(Files.java:767)
> at
> org.apache.flink.runtime.webmonitor.history.HistoryServerArchiveFetcher$JobArchiveFetcherTask.run(HistoryServerArchiveFetcher.java:186)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at
> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
>
>
> Unfortunately the partition listed does not appear to be full or
> anywhere near full?
>
> Is there a workaround to this?
>
>

 --

 Benchao Li
 School of Electronics Engineering and Computer Science, Peking University
 Tel:+86-15650713730
 Email: libenc...@gmail.com; libenc...@pku.edu.cn


>>
>> --
>>
>> Benchao Li
>> School of Electronics Engineering and Computer Science, Peking University
>> Tel:+86-15650713730
>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>
>>


Re: [ANNOUNCE] Apache Flink Python API(PyFlink) 1.9.2 released

2020-02-18 Thread Till Rohrmann
Thanks for updating the 1.9.2 release wrt Flink's Python API Jincheng!

Cheers,
Till

On Thu, Feb 13, 2020 at 12:25 PM Hequn Cheng  wrote:

> Thanks a lot for the release, Jincheng!
> Also thanks to everyone that make this release possible!
>
> Best,
> Hequn
>
> On Thu, Feb 13, 2020 at 2:18 PM Dian Fu  wrote:
>
> > Thanks for the great work, Jincheng.
> >
> > Regards,
> > Dian
> >
> > 在 2020年2月13日,下午1:32,jincheng sun  写道:
> >
> > Hi everyone,
> >
> > The Apache Flink community is very happy to announce the release of
> Apache
> > Flink Python API(PyFlink) 1.9.2, which is the first release to PyPI for
> the
> > Apache Flink Python API 1.9 series.
> >
> > Apache Flink® is an open-source stream processing framework for
> > distributed, high-performing, always-available, and accurate data
> streaming
> > applications.
> >
> > The release is available for download at:
> >
> > https://pypi.org/project/apache-flink/1.9.2/#files
> >
> > Or installed using pip command:
> >
> > pip install apache-flink==1.9.2
> >
> > We would like to thank all contributors of the Apache Flink community who
> > helped to verify this release and made this release possible!
> >
> > Best,
> > Jincheng
> >
> >
> >
>


Re: Process stream multiple time with different KeyBy

2020-02-18 Thread Till Rohrmann
Hi Sébastien,

there is always the possibility to reuse a stream. Given a
DataStream input, you can do the following:

KeyedStream a = input.keyBy(x -> f(x));
KeyedStream b = input.keyBy(x -> g(x));

This gives you two differently partitioned streams a and b.

If you want to evaluate every event against the full set of rules, then you
could take a look at Flink Broadcast State Pattern [1]. It allows you to
broadcast a stream of rules to all operators of a keyed input stream.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
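
To make that concrete, a minimal sketch of the broadcast state pattern described above. Rule, Event, Alert, the matches()/getId() methods and the key selector f are placeholders for illustration, not from the original thread; f is assumed to return a String key:

MapStateDescriptor<String, Rule> ruleStateDescriptor =
        new MapStateDescriptor<>("rules", Types.STRING, Types.POJO(Rule.class));

// ruleStream carries the (changing) set of rules, input carries the events
BroadcastStream<Rule> broadcastRules = ruleStream.broadcast(ruleStateDescriptor);

input
    .keyBy(x -> f(x))
    .connect(broadcastRules)
    .process(new KeyedBroadcastProcessFunction<String, Event, Rule, Alert>() {

        @Override
        public void processElement(Event event, ReadOnlyContext ctx, Collector<Alert> out) throws Exception {
            // evaluate the event against every rule currently in the broadcast state
            for (Map.Entry<String, Rule> rule :
                    ctx.getBroadcastState(ruleStateDescriptor).immutableEntries()) {
                if (rule.getValue().matches(event)) {
                    out.collect(new Alert(rule.getKey(), event));
                }
            }
        }

        @Override
        public void processBroadcastElement(Rule rule, Context ctx, Collector<Alert> out) throws Exception {
            // every parallel instance receives the rule and stores it under its id
            ctx.getBroadcastState(ruleStateDescriptor).put(rule.getId(), rule);
        }
    });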

Cheers,
Till

On Mon, Feb 17, 2020 at 11:10 PM theo.diefent...@scoop-software.de <
theo.diefent...@scoop-software.de> wrote:

> Hi Sebastian,
> I'd also highly recommend a recent Flink blog post to you where exactly
> this question was answered in quote some detail :
> https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html
> Best regardsTheo
>  Ursprüngliche Nachricht 
> Von: Eduardo Winpenny Tejedor 
> Datum: Mo., 17. Feb. 2020, 21:07
> An: Lehuede sebastien 
> Cc: user 
> Betreff: Re: Process stream multiple time with different KeyBy
>
>
> Hi Sebastien,
>
> Without being entirely sure of what's your use case/end goal I'll tell
> you (some of) the options Flink provides you for defining a flow.
>
> If your use case is to apply the same rule to each of your "swimlanes"
> of data (one with category=foo AND subcategory=bar, another with
> category=foo and another with category=bar) you can do this by
> implementing your own org.apache.flink.api.java.functions.KeySelector
> function for the keyBy function. You'll just need to return a
> different key for each of your rules and the data will separate to the
> appropriate "swimlane".
>
> If your use case is to apply different rules to each swimlane then you
> can write a ProcessFunction that redirects elements to different *side
> outputs*. You can then apply different operations to each side output.
>
> Your application could get tricky to evolve IF the number of swimlanes
> or the operators are meant to change over time, you'd have to be
> careful how the existing state fits into your new flows.
>
> Regards,
> Eduardo
>
> On Mon, Feb 17, 2020 at 7:06 PM Lehuede sebastien 
> wrote:
> >
> > Hi all,
> >
> > I'm currently working on a Flink Application where I match events
> against a set of rules. At the beginning I wanted to dynamically create
> streams following the category of events (Event are JSON formatted and I've
> a field like "category":"foo" in each event) but I'm stuck by the
> impossibility to create streams at runtime.
> >
> > So, one of the solution for me is to create a single Kafka topic and
> then use the "KeyBy" operator to match events with "category":"foo" against
> rules also containing "category":"foo" in rule specification.
> >
> > Now I have some cases where events and rules have one category and one
> subcategory. At this point I'm not sure about the "KeyBy" operator behavior.
> >
> > Example :
> >
> > Events have : "category":"foo" AND "subcategory":"bar"
> > Rule1 specification has : "category":"foo" AND "subcategory":"bar"
> > Rule2 specification has : "category':"foo"
> > Rule3 specification has : "category":"bar"
> >
> > In this case, my events need to be match against Rule1, Rule2 and Rule3.
> >
> > If I'm right, if I apply a multiple key "KeyBy()" with "category" and
> "subcategory" fields and then apply two single key "KeyBy()" with
> "category" field, my events will be consumed by the first "KeyBy()"
> operator and no events will be streamed in the operators after ?
> >
> > Is there any way to process the same stream one time for multi key
> KeyBy() and another time for single key KeyBy() ?
> >
> > Thanks !
> > Sébastien.
>


Re: Parallelize Kafka Deserialization of a single partition?

2020-02-18 Thread Till Rohrmann
Hi Theo,

the KafkaDeserializationSchema does not allow to return asynchronous
results. Hence, Flink will always wait until
KafkaDeserializationSchema.deserialize returns the parsed value.
Consequently, the only way I can think of to offload the complex parsing
logic would be to do it in a downstream operator where you could use
AsyncI/O to run the parsing logic in a thread pool, for example.
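
To make that concrete, a rough sketch of such a downstream Async I/O parsing step. MyPojo and parse() are placeholders, and the pool size, timeout and capacity values are arbitrary; rawStream is assumed to be the output of a minimal KafkaDeserializationSchema that only extracts the timestamp and forwards the raw bytes:

DataStream<MyPojo> parsed = AsyncDataStream.orderedWait(
    rawStream,
    new RichAsyncFunction<byte[], MyPojo>() {
        private transient ExecutorService pool;

        @Override
        public void open(Configuration parameters) {
            pool = Executors.newFixedThreadPool(4);
        }

        @Override
        public void asyncInvoke(byte[] record, ResultFuture<MyPojo> resultFuture) {
            CompletableFuture
                .supplyAsync(() -> parse(record), pool)   // the expensive parsing happens off the main task thread
                .whenComplete((pojo, error) -> {
                    if (error != null) {
                        resultFuture.completeExceptionally(error);
                    } else {
                        resultFuture.complete(Collections.singleton(pojo));
                    }
                });
        }

        @Override
        public void close() {
            pool.shutdown();
        }
    },
    30, TimeUnit.SECONDS,   // timeout per element
    100);                   // max in-flight elements

orderedWait emits the results in input order, which matters here because the per-partition ordering of the events must be preserved.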

Alternatively, you could think about a simple program which transforms your
input events into another format where it is easier to extract the
timestamp from. This comes, however, at the cost of another Kafka topic.

Currently, enabling object reuse via ExecutionConfig.enableObjectReuse()
only affects the DataSet API. DataStream programs will always do defensive
copies. There is a FLIP to improve this behaviour [1].

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982

Cheers,
Till

On Mon, Feb 17, 2020 at 1:14 PM Theo Diefenthal <
theo.diefent...@scoop-software.de> wrote:

> Hi,
>
> As for most pipelines, our flink pipeline start with parsing source kafka
> events into POJOs. We perform this step within a KafkaDeserizationSchema so
> that we properly extract the event itme timestamp for the downstream
> Timestamp-Assigner.
>
> Now it turned out that parsing is currently the most CPU intensive task in
> our pipeline and thus CPU bounds the number of elements we can ingest per
> second. Further splitting up the partitions will be hard as we need to
> maintain the exact order of events per partition and would also required
> quite some architectural changes for producers and the flink job.
>
> Now I had the idea to put the parsing task into ordered Async-IO. But
> AsyncIO can only be plugged in into an existing Stream, not into the
> deserialization schema, as far as I see. So the best idea I currently have
> is to keep parsing in the DeserializationSchema as minimal as possible to
> extract the Event timestamp and do the full parsing downstream in Async IO.
> This however, seems to be a bit tedious, especially as we have to deal with
> multiple input formats and would need to develop two parsers for the heavy
> load once: a timestamp only and a full parser.
>
> Do you know if it is somehow possible to parallelize / async IO the
> parsing within the KafkaDeserializationSchema? I don't have state access in
> there and I don't have a "collector" object in there so that one element as
> input needs to produce exactly one output element.
>
> Another question: My parsing produces Java POJO objects via "new", which
> are sent downstream (reusePOJO setting set) and finally will be garbage
> collected once they reached the sink. Is there some mechanism in Flink so
> that I could reuse "old" sinked POJOs in the source? All tasks are chained
> so that theoretically, that could be possible?
>
> Best regards
> Theo
>


Re: NoClassDefFoundError when an exception is thrown inside invoke in a Custom Sink

2020-02-18 Thread David Magalhães
Thanks for the feedback, Arvid. Currently it isn't an issue, but I will look
back into it in the future.

On Tue, Feb 18, 2020 at 1:51 PM Arvid Heise  wrote:

> Hi David,
>
> sorry for replying late. I was caught up on other incidents.
>
> I double-checked all the information that you provided and conclude that
> you completely bypass our filesystems and plugins.
>
> What you are using is AvroParquetWriter, which brings in the hadoop
> dependencies, including raw hadoop s3. It becomes obvious since the Path
> you are using is not coming from Flink namespace.
> The class issues that come from that are hard to debug. You are
> effectively bundling another hadoop, so if you also have a specific Hadoop
> version on your cluster (e.g. on EMR), then there can be ambiguities and
> the seen error happens.
>
> What I'd recommend you do is a completely different approach. Assuming you
> just want exponential backoff for all s3 write accesses, you could wrap the
> S3AFileSystem and create your own s3 plugin. That would work with any given
> format for future cases.
>
> If you want to stick to your approach, you should use
> org.apache.flink.formats.parquet.ParquetWriterFactory, which uses your
> mentioned StreamOutputFile.
>
> Best,
>
> Arvid
>
> On Thu, Feb 13, 2020 at 12:04 AM David Magalhães 
> wrote:
>
>> Hi Arvid, I use a docker image. Here is the Dockerfile:
>>
>> FROM flink:1.9.1-scala_2.12
>>
>> RUN mkdir /opt/flink/plugins/flink-s3-fs-hadoop
>> RUN cp /opt/flink/opt/flink-s3-fs-hadoop-1.9.1.jar
>> /opt/flink/plugins/flink-s3-fs-hadoop/
>>
>> Please let me know if you need more information.
>>
>> On Wed, Feb 12, 2020 at 9:15 PM Arvid Heise  wrote:
>>
>>> Hi David,
>>>
>>> can you double-check the folder structure of your plugin? It should
>>> reside in its own subfolder. Here is an example.
>>>
>>> flink-dist
>>> ├── conf
>>> ├── lib
>>> ...
>>> └── plugins
>>> └── s3
>>> └── flink-s3-fs-hadoop.jar
>>>
>>> I will investigate your error deeply in the next few days but I'd like
>>> to have a final confirmation about the folder structure.
>>>
>>>
>>> On Wed, Feb 12, 2020 at 8:56 PM David Magalhães 
>>> wrote:
>>>
 Hi Robert, I couldn't find any previous mention before the
 NoClassDefFoundError.
 Here is the full log [1] if you want to look for something more
 specific.

 [1] https://www.dropbox.com/s/l8tba6vft08flke/joda.out?dl=0

 On Wed, Feb 12, 2020 at 12:45 PM Robert Metzger 
 wrote:

> According to this answer [1] the first exception "mentioning"
> org/joda/time/format/DateTimeParserBucket should be a different one.
> Can you go through the logs to make sure it is really a
> ClassNotFoundException, and not a ExceptionInInitializerError or something
> else?
>
> [1]https://stackoverflow.com/a/5756989/568695
>
> On Wed, Feb 12, 2020 at 12:36 PM David Magalhães <
> speeddra...@gmail.com> wrote:
>
>> Hi Arvid,
>>
>> I'm using flink-s3-fs-hadoop-1.9.1.jar in plugins folder. Like I said
>> previously, this works normally until an exception is throw inside the
>> sink. It will try to recover again, but sometimes doesn't recover giving
>> this error.
>>
>> To write to S3 I use *AvroParquetWriter* with the following code:
>>
>> val writer = AvroParquetWriter
>>  .builder[GenericRecord](new Path(finalFilePath))
>>
>> *Path* is from *org.apache.hadoop.fs*, the other option is to use* 
>> org.apache.flink.formats.parquet.StreamOutputFile
>> *which will use flink S3 plugin, right ? Not sure how can I convert
>> from Path to StreamOuputFile. I know that when I've used 
>> StreamingFileSink,
>> I used StreamOuputFile.
>>
>> On Wed, Feb 12, 2020 at 10:03 AM Arvid Heise 
>> wrote:
>>
>>> Hi David,
>>>
>>> upon closer reviewing your stacktrace, it seems like you are trying
>>> to access S3 without our S3 plugin. That's in general not recommended at
>>> all.
>>>
>>> Best,
>>>
>>> Arvid
>>>
>>> On Tue, Feb 11, 2020 at 11:06 AM Arvid Heise 
>>> wrote:
>>>
 Hi David,

 this seems to be a bug in our s3 plugin. The joda dependency should
 be bundled there.

 Are you using s3 as a plugin by any chance? Which flink version are
 you using?

 If you are using s3 as a plugin, you could put joda in your plugin
 folder like this

 flink-dist
 ├── conf
 ├── lib
 ...
 └── plugins
 └── s3
 ├── joda.jar
 └── flink-s3-fs-hadoop.jar

 If flink-s3-fs-hadoop.jar is in lib, you could try adding joda into
 that.

 Adding joda to your user code will unfortunately not work.

 Best,

 Arvid

 On Thu, Feb 6, 2020 at 11:16 PM David Magalhães <
 

Re: NoClassDefFoundError when an exception is thrown inside invoke in a Custom Sink

2020-02-18 Thread Arvid Heise
Hi David,

sorry for replying late. I was caught up on other incidents.

I double-checked all the information that you provided and conclude that
you completely bypass our filesystems and plugins.

What you are using is AvroParquetWriter, which brings in the hadoop
dependencies, including raw hadoop s3. It becomes obvious since the Path
you are using is not coming from Flink namespace.
The class issues that come from that are hard to debug. You are effectively
bundling another hadoop, so if you also have a specific Hadoop version on
your cluster (e.g. on EMR), then there can be ambiguities and the seen
error happens.

What I'd recommend you do is a completely different approach. Assuming you
just want exponential backoff for all s3 write accesses, you could wrap the
S3AFileSystem and create your own s3 plugin. That would work with any given
format for future cases.

If you want to stick to your approach, you should use
org.apache.flink.formats.parquet.ParquetWriterFactory, which uses your
mentioned StreamOutputFile.
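
For illustration, a rough sketch of the ParquetWriterFactory route via the StreamingFileSink; the schema, recordStream and the bucket path are placeholders, and ParquetAvroWriters.forGenericRecord builds a ParquetWriterFactory internally:

StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(
                new org.apache.flink.core.fs.Path("s3://my-bucket/output"),   // Flink Path, not org.apache.hadoop.fs.Path
                ParquetAvroWriters.forGenericRecord(schema))
        .build();

recordStream.addSink(sink);

Because the Path comes from org.apache.flink.core.fs, the actual S3 access then goes through the flink-s3-fs-hadoop plugin instead of a bundled Hadoop client.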

Best,

Arvid

On Thu, Feb 13, 2020 at 12:04 AM David Magalhães 
wrote:

> Hi Arvid, I use a docker image. Here is the Dockerfile:
>
> FROM flink:1.9.1-scala_2.12
>
> RUN mkdir /opt/flink/plugins/flink-s3-fs-hadoop
> RUN cp /opt/flink/opt/flink-s3-fs-hadoop-1.9.1.jar
> /opt/flink/plugins/flink-s3-fs-hadoop/
>
> Please let me know if you need more information.
>
> On Wed, Feb 12, 2020 at 9:15 PM Arvid Heise  wrote:
>
>> Hi David,
>>
>> can you double-check the folder structure of your plugin? It should
>> reside in its own subfolder. Here is an example.
>>
>> flink-dist
>> ├── conf
>> ├── lib
>> ...
>> └── plugins
>> └── s3
>> └── flink-s3-fs-hadoop.jar
>>
>> I will investigate your error deeply in the next few days but I'd like to
>> have a final confirmation about the folder structure.
>>
>>
>> On Wed, Feb 12, 2020 at 8:56 PM David Magalhães 
>> wrote:
>>
>>> Hi Robert, I couldn't find any previous mention before the
>>> NoClassDefFoundError.
>>> Here is the full log [1] if you want to look for something more specific.
>>>
>>> [1] https://www.dropbox.com/s/l8tba6vft08flke/joda.out?dl=0
>>>
>>> On Wed, Feb 12, 2020 at 12:45 PM Robert Metzger 
>>> wrote:
>>>
 According to this answer [1] the first exception "mentioning"
 org/joda/time/format/DateTimeParserBucket should be a different one.
 Can you go through the logs to make sure it is really a
 ClassNotFoundException, and not a ExceptionInInitializerError or something
 else?

 [1]https://stackoverflow.com/a/5756989/568695

 On Wed, Feb 12, 2020 at 12:36 PM David Magalhães 
 wrote:

> Hi Arvid,
>
> I'm using flink-s3-fs-hadoop-1.9.1.jar in plugins folder. Like I said
> previously, this works normally until an exception is throw inside the
> sink. It will try to recover again, but sometimes doesn't recover giving
> this error.
>
> To write to S3 I use *AvroParquetWriter* with the following code:
>
> val writer = AvroParquetWriter
>  .builder[GenericRecord](new Path(finalFilePath))
>
> *Path* is from *org.apache.hadoop.fs*, the other option is to use* 
> org.apache.flink.formats.parquet.StreamOutputFile
> *which will use flink S3 plugin, right ? Not sure how can I convert
> from Path to StreamOuputFile. I know that when I've used 
> StreamingFileSink,
> I used StreamOuputFile.
>
> On Wed, Feb 12, 2020 at 10:03 AM Arvid Heise 
> wrote:
>
>> Hi David,
>>
>> upon closer reviewing your stacktrace, it seems like you are trying
>> to access S3 without our S3 plugin. That's in general not recommended at
>> all.
>>
>> Best,
>>
>> Arvid
>>
>> On Tue, Feb 11, 2020 at 11:06 AM Arvid Heise 
>> wrote:
>>
>>> Hi David,
>>>
>>> this seems to be a bug in our s3 plugin. The joda dependency should
>>> be bundled there.
>>>
>>> Are you using s3 as a plugin by any chance? Which flink version are
>>> you using?
>>>
>>> If you are using s3 as a plugin, you could put joda in your plugin
>>> folder like this
>>>
>>> flink-dist
>>> ├── conf
>>> ├── lib
>>> ...
>>> └── plugins
>>> └── s3
>>> ├── joda.jar
>>> └── flink-s3-fs-hadoop.jar
>>>
>>> If flink-s3-fs-hadoop.jar is in lib, you could try adding joda into
>>> that.
>>>
>>> Adding joda to your user code will unfortunately not work.
>>>
>>> Best,
>>>
>>> Arvid
>>>
>>> On Thu, Feb 6, 2020 at 11:16 PM David Magalhães <
>>> speeddra...@gmail.com> wrote:
>>>
 Hi Andrey, thanks for your reply.

 The class is on the jar created with `*sbt assembly*` that is
 submitted to Flink to start a Job.

 unzip -l target/jar/myapp-0.0.1-SNAPSHOT.jar | grep
 DateTimeParserBucket
  1649  05-27-2016 10:24
 

Re: Emit message at start and end of event time session window

2020-02-18 Thread Till Rohrmann
Hi Manas,

you can implement something like this with a bit of trigger magic. What you
need to do is to define your own trigger implementation which keeps state
to remember whether it has triggered the "started window" message or not.
In the stateful window function you would need to do something similar. The
first call could trigger the output of "window started" and any subsequent
call will trigger the evaluation of the window. It would have been a bit
easier if the trigger and the window process function could share their
internal state. Unfortunately, this is not possible at the moment.

I've drafted a potential solution which you can find here [1].

[1] https://gist.github.com/tillrohrmann/5251f6d62e256b60947eea7b553519ef
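
For illustration, a rough skeleton of such a trigger; Event, getTimestamp() and MIN_WINDOW_SIZE are placeholders, and the linked gist is the more complete solution. Since event-time session windows merge, the trigger also needs canMerge()/onMerge():

public class StartEndTrigger extends Trigger<Event, TimeWindow> {

    private static final long MIN_WINDOW_SIZE = 10_000L;   // placeholder: 10 seconds

    private final ValueStateDescriptor<Boolean> startedDesc =
            new ValueStateDescriptor<>("window-started", Types.BOOLEAN);

    @Override
    public TriggerResult onElement(Event element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.registerEventTimeTimer(window.maxTimestamp());
        ValueState<Boolean> started = ctx.getPartitionedState(startedDesc);
        if (started.value() == null
                && element.getTimestamp() - window.getStart() > MIN_WINDOW_SIZE) {
            started.update(true);
            return TriggerResult.FIRE;   // first firing -> window function emits "window started"
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        // the watermark passed the end of the session -> window function emits "window ended"
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        ctx.registerEventTimeTimer(window.maxTimestamp());
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(startedDesc).clear();
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}

The stateful window function would then use its own per-window state to decide whether a given firing should produce the "started" or the "ended" message, as described above.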

Cheers,
Till

On Mon, Feb 17, 2020 at 8:09 AM Manas Kale  wrote:

> Hi,
> I want to achieve the following using event time session windows:
>
>1. When the window.getStart() and last event timestamp in the window
>is greater than MIN_WINDOW_SIZE milliseconds, I want to emit a message
>"Window started @ timestamp".
>2. When the session window ends, i.e. the watermark passes
>lasteventTimestamp + inactivityPeriod, I want to emit a message "Window
>ended @ timestamp".
>
>  It is guaranteed that all events are on time and no lateness is allowed.
> I am having difficulty implementing both 1 and 2 simultaneously.
> I am able to implement point 1 using a custom trigger, which checks if
> (lastEventTimestamp - window.getStart()) > MIN_WINDOW_SIZE and triggers a
> customProcessWindowFunction().
> However, with this architecture I can't detect the end of the window.
>
> Is my approach correct or is there a completely different method to
> achieve this?
>
> Thanks,
> Manas Kale
>
>
>
>


Side Outputs from RichAsyncFunction

2020-02-18 Thread KristoffSC
Hi all,
Is there a way to emit a side output from the RichAsyncFunction operator, like
it is possible with ProcessFunctions via ctx.output(outputTag, value)? At first
glance I don't see a way to do it.

In my use case RichAsyncFunction is used to call REST services and I would
like to handle REST error codes and exceptions by emitting special Events as
a SideOutput.

Thanks,
Krzysztof



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
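
As a side note, not an authoritative answer from the thread: the AsyncFunction API indeed exposes no side-output support, so one common workaround is to wrap failures into the regular output type and split them off right after the async operator. RestResult, MyRestAsyncFunction and isError() are placeholders for illustration:

DataStream<RestResult> results =
    AsyncDataStream.unorderedWait(input, new MyRestAsyncFunction(), 30, TimeUnit.SECONDS, 100);

final OutputTag<RestResult> errorTag = new OutputTag<RestResult>("rest-errors") {};

SingleOutputStreamOperator<RestResult> ok = results
    .process(new ProcessFunction<RestResult, RestResult>() {
        @Override
        public void processElement(RestResult value, Context ctx, Collector<RestResult> out) {
            if (value.isError()) {
                ctx.output(errorTag, value);   // REST error codes / exceptions go to the side output
            } else {
                out.collect(value);
            }
        }
    });

DataStream<RestResult> errors = ok.getSideOutput(errorTag);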


Re: about registering completion function for worker shutdown

2020-02-18 Thread Dominique De Vito
Hi Robert,

Thanks for your hint / reply / help.

So far I have not tested your way (may be next), but tried another one:

* use mapPartitions
-- at the beginning, get a KafkaProducer
-- the KafkaProducerFactory class I use is lazy and caches the first
instances created; so, there is reuse.

* register a JVM hook for closing KafkaProducer.
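
For illustration, a rough sketch of that pattern; KafkaProducerFactory, MyRecord, its accessors and the topic name are placeholders, and input is assumed to be the DataSet being written. As Robert notes in the quoted reply below, overriding close() on the rich function is usually a simpler place to close the producer than a JVM shutdown hook:

input.mapPartition(new RichMapPartitionFunction<MyRecord, MyRecord>() {

    private transient KafkaProducer<String, String> producer;

    @Override
    public void open(Configuration parameters) {
        // the factory lazily creates and caches one producer per JVM
        producer = KafkaProducerFactory.getOrCreate();
        Runtime.getRuntime().addShutdownHook(new Thread(producer::close));
    }

    @Override
    public void mapPartition(Iterable<MyRecord> values, Collector<MyRecord> out) {
        for (MyRecord value : values) {
            producer.send(new ProducerRecord<>("my-topic", value.getKey(), value.getPayload()));
            out.collect(value);
        }
    }

    @Override
    public void close() {
        producer.flush();   // alternatively close the producer here instead of in the JVM hook
    }
});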

So far I have met some perf issue, but I don't know yet it's due to my
pattern, or something else.

Anyway, thanks.

Regards,
Dominique


Le ven. 31 janv. 2020 à 14:20, Robert Metzger  a
écrit :

> Hi,
>
> Flink's ProcessFunction has a close() method, which is executed on
> shutdown of the workers. (You could also use any of the Rich* functions for
> that purpose).
> If you add a ProcessFunction with the same parallelism before the
> KafkaSink, it'll be executed on the same machines as the Kafka producer.
>
> Afaik, the close() call should not take forever, as the system might
> interrupt your thread if it doesn't finish closing on time (30s is the
> default for "cluster.services.shutdown-timeout")
>
> Best,
> Robert
>
>
> On Tue, Jan 21, 2020 at 10:02 AM Dominique De Vito 
> wrote:
>
>> Hi,
>>
>> For a Flink batch job, some value are writing to Kafka through a Producer.
>>
>> I want to register a hook for closing (at the end) the Kafka producer a
>> worker is using hook to be executed, of course, on worker side.
>>
>> Is there a way to do so ?
>>
>> Thanks.
>>
>> Regards,
>> Dominique
>>
>>
>>
>>


Re: Using retained checkpoints as savepoints

2020-02-18 Thread Aljoscha Krettek

Hi,

the reason why we are quite conservative when it comes to stating 
properties of checkpoints is that we don't want to prevent ourselves 
from implementing possibly optimized checkpoint formats that would not 
support these features.


You're right that currently checkpoints support most of the features of 
savepoints because they did not diverge far in their formats (or not at 
all).


AFAIK, this is not written down anywhere so it would be good to discuss 
if we want to give those guarantees (which ties our hands a bit more) or 
keep it as is but properly document it.


Best,
Aljoscha

On 30.01.20 01:58, Ken Krugler wrote:

Hi all,

Currently
https://ci.apache.org/projects/flink/flink-docs-master/ops/state/checkpoints.html#difference-to-savepoints
says checkpoints…

"do not support Flink specific features like rescaling"

But I believe they do, and really must if you can use them like a savepoint. 
Should that sentence be changed, or removed?

Also this page doesn’t talk about state migration, which is another aspect of 
restarting a (modified) workflow from a retained checkpoint…will that work?

This sentence about checkpoints on
https://ci.apache.org/projects/flink/flink-docs-master/ops/state/savepoints.html#what-is-a-savepoint-how-is-a-savepoint-different-from-a-checkpoint
implies not:

"Optimizations towards those goals can exploit certain properties, e.g. that the job 
code doesn’t change between the execution attempts"

Thanks,

— Ken

--
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr




Re: Persisting inactive state outside Flink

2020-02-18 Thread Till Rohrmann
Hi Akshay,

there is no easy out-of-the-box implementation for what you are asking.

Before drafting a potential solution I wanted to ask whether using the
RocksDB state backend could already solve your problem. With this state
backend Flink is able to spill state data to disk. Would this work for your
use case or do you expect the device data per node to grow so big that it
no longer fits onto disk?
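
For reference, a minimal sketch of switching to the RocksDB state backend; the checkpoint URI is a placeholder and flink-statebackend-rocksdb must be on the classpath:

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // the second argument enables incremental checkpoints, which keeps
    // checkpoint sizes manageable when the keyed state grows large
    env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));

    // ... keyed process function holding the per-device state goes here ...

    env.execute("device-state-job");
}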

If using the RocksDB state backend does not work for you and you really
need to offload state data to an external storage system from where you can
load it lazily, it becomes significantly more complicated. One approach I
could think of is the following: You have a primary operator (process)
which is responsible for processing the incoming events and keeps the state
of the non-dormant devices. Once a device becomes dormant, you could send
the data to a secondary operator (offload+fetching) which uses AsyncIO to
offload the state to HBase, for example. If the process operator
encounters an event from a dormant device, it would need to ask the
secondary operator to load it (via sending a fetch event downstream). The
secondary operator would again use AsyncIO to load the requested data. Once
it retrieves the data, you would need to send the data back to the primary
operator via a feedback edge (iteration edge).

The problem with this approach is that Flink does not give you exactly-once
processing guarantees when using iterations at the moment. The community is
working on changing this, though.

Cheers,
Till

On Fri, Feb 14, 2020 at 6:05 PM Akshay Aggarwal <
akshay.aggar...@flipkart.com> wrote:

> Hi,
>
> We have a use case where we have to persist some state information about a
> device forever. Each new event will fetch the keyed state and update it.
> And this has to be applied in-order of events.
>
> The problem is that the number of devices (keys) will keep growing
> infinitely. Usually a device comes online, stays active for a while
> (generates new events) and then goes into dormant mode. Is there a way we
> can persist the state outside of Flink (say HBase) when the device goes
> dormant and later fetch when it's activated?
>
> I know we can do this in process function using timers. But here I'll have
> to make a synchronous call to the external store every time a new device
> comes live, or when an active device goes dormant, which will stall the
> task and become a scalability bottleneck. Using AsyncIO also doesn't seem
> to be an option.
>
> Is there a way to achieve this without hacking into Flink code?
>
> Thanks,
> Akshay Aggarwal
>
>
> *-*
>
> *This email and any files transmitted with it are confidential and
> intended solely for the use of the individual or entity to whom they are
> addressed. If you have received this email in error, please notify the
> system manager. This message contains confidential information and is
> intended only for the individual named. If you are not the named addressee,
> you should not disseminate, distribute or copy this email. Please notify
> the sender immediately by email if you have received this email by mistake
> and delete this email from your system. If you are not the intended
> recipient, you are notified that disclosing, copying, distributing or
> taking any action in reliance on the contents of this information is
> strictly prohibited.*
>
>
>
> *Any views or opinions presented in this email are solely those of the
> author and do not necessarily represent those of the organization. Any
> information on shares, debentures or similar instruments, recommended
> product pricing, valuations and the like are for information purposes only.
> It is not meant to be an instruction or recommendation, as the case may be,
> to buy or to sell securities, products, services nor an offer to buy or
> sell securities, products or services unless specifically stated to be so
> on behalf of the Flipkart group. Employees of the Flipkart group of
> companies are expressly required not to make defamatory statements and not
> to infringe or authorise any infringement of copyright or any other legal
> right by email communications. Any such communication is contrary to
> organizational policy and outside the scope of the employment of the
> individual concerned. The organization will not accept any liability in
> respect of such communication, and the employee responsible will be
> personally liable for any damages or other liability arising.*
>
>
>
> *Our organization accepts no liability for the content of this email, or
> for the consequences of any actions taken on the basis of the information *
> provided,* unless that information is subsequently confirmed in writing.
> If you are not the intended recipient, you are notified that disclosing,
> copying, distributing or taking any action in reliance on the contents of
> this information is strictly prohibited.*
>
>
> 

Not able to consume kafka messages in flink-1.10.0 version

2020-02-18 Thread Avinash Tripathy
Hi,

I am getting this error message.

[image: flink-kafka-consumer-error.png]

Flink version: 1.10.0
Kafka version: kafka_2.12-2.1.0

Thanks,
Avinash


How Do i Serialize a class using default kryo and protobuf in scala

2020-02-18 Thread ApoorvK
I have some case classes which have primitives as well as nested class objects.
If I add any more variables to a class, the savepoint does not restore. I read
that if I register a Kryo serializer for those classes using Google protobuf, I
will be able to serialize them from state. Can anyone please share an example
in Scala for the same.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
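
For what it's worth, a minimal sketch of registering a protobuf-backed Kryo serializer, shown here in Java (the Scala call on env.getConfig is analogous); MyProtoMessage is a placeholder for a protobuf-generated class, and the com.twitter:chill-protobuf dependency must be added:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().registerTypeWithKryoSerializer(
        MyProtoMessage.class,                                  // the generated protobuf class (placeholder)
        com.twitter.chill.protobuf.ProtobufSerializer.class);  // serializes it via its protobuf byte representation

Note that this registration applies to protobuf-generated classes, so the state in question would need to be represented as protobuf messages rather than plain case classes.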


Re: Flink's Either type information

2020-02-18 Thread Yun Gao
Hi Jacopo,

Could you also provide how the KeyedBroadcastProcessFunction is created when
constructing the DataStream API? For example, are you using something like

new KeyedBroadcastProcessFunction() {
    // Function implementation
}

or something else?

Best,
Yun



--
From:jacopo.gobbi 
Send Time:2020 Feb. 17 (Mon.) 18:31
To:user 
Subject:Flink's Either type information

Hi all,
How can an Either value be returned by a KeyedBroadcastProcessFunction?
We keep getting "InvalidTypesException: Type extraction is not possible on 
Either type as it does not contain information about the 'left' type." when 
doing: out.collect(Either.Right(myObject));

Thanks,

Jacopo Gobbi




Re: [Flink 1.10] How do I use LocalCollectionOutputFormat now that writeUsingOutputFormat is deprecated?

2020-02-18 Thread Niels Basjes
Hi Gordon,

Thanks. This works for me.

I find it strange that when I do this it works (I made the differences bold)

List result = new ArrayList<>(5);

DataStreamUtils.collect(resultDataStream).forEachRemaining(result::add);

*resultDataStream.print();*

environment.execute();


however, this does not work

List result = new ArrayList<>(5);

DataStreamUtils.collect(resultDataStream).forEachRemaining(result::add);

environment.execute();


and this also does not work

*resultDataStream.print();*

List result = new ArrayList<>(5);

DataStreamUtils.collect(resultDataStream).forEachRemaining(result::add);

environment.execute();


In both these cases it fails with


java.lang.IllegalStateException: *No operators defined in streaming
topology. Cannot execute.*

at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1792)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1783)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1768)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1602)
at
nl.basjes.parse.useragent.flink.TestUserAgentAnalysisMapperInline.testInlineDefinitionDataStream(TestUserAgentAnalysisMapperInline.java:144)



Did I do something wrong?
Is this a bug in the DataStreamUtils ?

Niels Basjes



On Mon, Feb 17, 2020 at 8:56 AM Tzu-Li Tai  wrote:

> Hi,
>
> To collect the elements of a DataStream (usually only meant for testing
> purposes), you can take a look at `DataStreamUtils#collect(DataStream)`.
>
> Cheers,
> Gordon
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


-- 
Best regards / Met vriendelijke groeten,

Niels Basjes