Re: Checkpointing in StateFun

2022-03-14 Thread Seth Wiesman
There's a passing reference to it in the 1.2 docs but we removed it to
discourage use.

https://nightlies.apache.org/flink/flink-docs-release-1.2/dev/stream/state.html#raw-and-managed-state

On Sat, Mar 12, 2022 at 2:50 AM Christopher Gustafson  wrote:

> Hi,
>
>
> Thanks for the explanation. I had not heard about Raw State before, which
> is perhaps not surprising if it is rarely used, and it doesn't seem to be
> included in the latest versions of the documentation. Is there perhaps
> some older version of the documentation you could point me to where I can
> see how Raw State can be used in Flink?
>
>
> Best Regards,
>
> Christopher Gustafson
> ----------
> *From:* Seth Wiesman 
> *Sent:* 11 March 2022 17:57:21
> *To:* Christopher Gustafson
> *Cc:* user@flink.apache.org
> *Subject:* Re: Checkpointing in StateFun
>
> I assume you are talking about the checkpointing in the feedback package?
>
> StateFun only relies on Flink checkpointing for fault tolerance. All state
> is stored in standard checkpoint / savepoints and can be used to restore
> from failure, upgrade a job, rescale, etc. Just like any other snapshot.
>
> StateFun internally relies on a form of streaming iteration. The feedback
> package contains the code for that iteration which sends records back
> upstream. To ensure exactly-once semantics, in-flight records on the
> feedback edge need to be stored in the snapshot. It does so through Flink's
> raw keyed state abstraction. This is the same thing state backends use
> under the hood to snapshot your keyed state in any rich function.
>
> Raw keyed state has been a stable part of Flink's API forever, but is
> rarely seen in user code. I often describe the StateFun runtime to people
> as the world's most complex datastream application; you will find advanced
> usage patterns most people have not seen, but nothing that isn't well
> supported by Flink.
>
> Seth
>
> On Fri, Mar 11, 2022 at 2:24 AM Christopher Gustafson 
> wrote:
>
>> Hi,
>>
>>
>> I am doing some experiments with StateFun and different Flink state
>> backends. I was looking through the StateFun source code and saw that it
>> does some logic of its own when it comes to checkpointing. My initial
>> understanding was that checkpointing was handled implicitly by Flink. I
>> did not find a lot of documentation about this, so I am asking if you could
>> provide a description of how checkpointing works in StateFun, and how it is
>> different from regular Flink checkpointing?
>>
>>
>> Thanks in advance,
>>
>> Christopher Gustafson
>>
>


Re: Checkpointing in StateFun

2022-03-11 Thread Seth Wiesman
I assume you are talking about the checkpointing in the feedback package?

StateFun only relies on Flink checkpointing for fault tolerance. All state
is stored in standard checkpoint / savepoints and can be used to restore
from failure, upgrade a job, rescale, etc. Just like any other snapshot.

StateFun internally relies on a form of streaming iteration. The feedback
package contains the code for that iteration which sends records back
upstream. To ensure exactly-once semantics, in-flight records on the
feedback edge need to be stored in the snapshot. It does so through Flink's
raw keyed state abstraction. This is the same thing state backends use
under the hood to snapshot your keyed state in any rich function.

Raw keyed state has been a stable part of Flink's API forever, but is rarely
seen in user code. I often describe the StateFun runtime to people as the
world's most complex datastream application; you will find advanced usage
patterns most people have not seen, but nothing that isn't well supported
by Flink.

Seth

On Fri, Mar 11, 2022 at 2:24 AM Christopher Gustafson  wrote:

> Hi,
>
>
> I am doing some experiments with StateFun and different Flink state
> backends. I was looking through the StateFun source code and saw that it
> does some logic of its own when it comes to checkpointing. My initial
> understanding was that checkpointing was handled implicitly by Flink. I
> did not find a lot of documentation about this, so I am asking if you could
> provide a description of how checkpointing works in StateFun, and how it is
> different from regular Flink checkpointing?
>
>
> Thanks in advance,
>
> Christopher Gustafson
>


Re: Possible BUG in 1.15 SQL JSON_OBJECT()

2022-02-25 Thread Seth Wiesman
Thank you for reporting! That is definitely a bug, and I have opened a
ticket to fix it, which you can track here.

https://issues.apache.org/jira/browse/FLINK-26374

Seth

On Thu, Feb 24, 2022 at 4:18 PM Jonathan Weaver 
wrote:

> Using the latest SNAPSHOT BUILD.
>
> If I have a column definition as
>
> .column(
>     "events",
>     DataTypes.ARRAY(
>         DataTypes.ROW(
>             DataTypes.FIELD("status", DataTypes.STRING().notNull()),
>             DataTypes.FIELD("timestamp", DataTypes.STRING().notNull()),
>             DataTypes.FIELD("increment_identifier", DataTypes.STRING().nullable()))))
>
> And a query as
>
> JSON_OBJECT('events' VALUE events) event_json
>
> Will generate JSON correctly ONLY if increment_identifier is NOT NULL but
> will throw a NullPointerException on the first record that has that column
> as null.
>
> Exception is not helpful.
>
> Exception in thread "main"
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> at
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
> at
> java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
> at
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at
> java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
> at
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:259)
> at
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
> at
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
> at
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at
> java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
> at
> org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1389)
> at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
> at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
> at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
> at
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
> at
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
> at
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at
> java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
> at
> org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
> at akka.dispatch.OnComplete.internal(Future.scala:300)
> at akka.dispatch.OnComplete.internal(Future.scala:297)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
> at
> org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
> at
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
> at
> scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
> at
> scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
> at
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
> at
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24)
> at
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
> at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
> at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
> at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
> at
> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
> at
> akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
> at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
> at
> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
> at
> 

Re: Read parquet data from S3 with Flink 1.12

2021-12-21 Thread Seth Wiesman
Hi Alexandre,

You are correct, BatchTableEnvironment does not exist in 1.14 anymore. In
1.15 we will have the State Processor API ported to DataStream for exactly
this reason; it is the last piece needed before officially marking DataSet
as deprecated. As you can understand, this has been a multi-year process and
there have been some rough edges as components are migrated.

The easiest solution is for you to use the 1.12 DataSet <-> Table interop. Any
savepoint you create using Flink 1.12 should be restorable in a 1.14
DataStream application.
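
A minimal sketch of that 1.12 interop, under the assumption that the job uses
the old-planner bridge modules that ship with Flink 1.12 (flink-table-planner
and flink-table-api-java-bridge); the table and field names are illustrative only:

    import static org.apache.flink.table.api.Expressions.$;

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
    import org.apache.flink.types.Row;

    public class TableDataSetInterop {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

            // Some batch input; in practice this could come from a Table API source.
            DataSet<Tuple2<String, Integer>> input =
                    env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2));

            // DataSet -> Table, then arbitrary Table API / SQL transformations ...
            Table table = tEnv.fromDataSet(input, $("name"), $("score"));
            Table filtered = table.filter($("score").isGreater(1));

            // ... and Table -> DataSet, e.g. to feed the DataSet-based State Processor API.
            DataSet<Row> rows = tEnv.toDataSet(filtered, Row.class);
            rows.print();
        }
    }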

I am unsure of the issue with the Hadoop plugin, but if using 1.14 is a
hard requirement, rewriting your input data into another format could also
be a viable stop-gap solution.

Seth

On Mon, Dec 20, 2021 at 8:57 PM Alexandre Montecucco <
alexandre.montecu...@grabtaxi.com> wrote:

> Hello,
>
> I also face the same issue as documented in a previous mail from the
> mailing list [1]
> Basically when using flink-parquet, I get:
>
>>  java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration
>
> I have no idea what I need to do to fix this and could not find anything
> from the doc. I tried importing various hadoop libraries, but it always
> causes yet another issue.
>
> I think this might be the root cause of my problem.
>
> Best,
> Alex
>
> [1] https://lists.apache.org/thread/796m8tww4gqykqm1szb3y5m7t6scgho2
>
> On Mon, Dec 20, 2021 at 4:23 PM Alexandre Montecucco <
> alexandre.montecu...@grabtaxi.com> wrote:
>
>> Hello Piotrek,
>> Thank you for the help.
>> Regarding the S3 issue I have followed the documentation for the plugins.
>> Many of our other apps are using S3 through the Hadoop Fs Flink plugin.
>> Also, in this case, just reading regular plain text file works, I only
>> have an issue when using Parquet.
>>
>> I tried switching to Flink 1.14, however I am stumbling upon other
>> blockers.
>> To give more context, I am trying to build a Flink savepoint for cold
>> start data. So I am using the Flink State Processor API. But:
>>  -  Flink State Processor API is using the DataSet api which is now
>> marked as deprecated (Legacy)
>>  - the doc you shared regarding reading from Parquet uses the DataStream
>> API
>>  - the Flink State Processor API doc [1] states there is interoperability
>> of DataSet and Table API
>>  (but the link is now erroneous), it was last correct in Flink 1.12 [2]
>>
>> Given that we can convert from DataStream to Table API, I was thinking I
>> could then convert from Table to DataSet API (though very cumbersome and
>> unsure if any performance / memory impact).
>> But for the Table to DataSet conversion, the doc is using a 
>> BatchTableEnvironment
>> class which does not seem to exist in Flink 1.14 anymore
>>
>> Any recommendations or anything I might have missed?
>>
>> Thank you.
>>
>> Best,
>> Alex
>>
>>
>> [1] 
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/libs/state_processor_api/#state-processor-api
>>
>> 
>>
>> [2]
>> https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/common.html#integration-with-datastream-and-dataset-api
>> [3]
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/formats/parquet/
>>
>>
>> On Fri, Dec 17, 2021 at 8:53 PM Piotr Nowojski 
>> wrote:
>>
>>> Hi,
>>>
>>> Reading from Parquet files in the DataStream API (which is what I assume
>>> you are doing) is officially supported and documented only since 1.14 [1].
>>> Before that it was only supported for the Table API. As far as I can tell,
>>> the basic classes (`FileSource` and `ParquetColumnarRowInputFormat`) have
>>> already been in the code base since 1.12.x. I don't know how stable it was
>>> and how well it was working. I would suggest upgrading to Flink 1.14.1. As
>>> a last resort you can try using at the very least the latest version of the
>>> 1.12.x branch as documented by the 1.14 version, but I cannot guarantee
>>> that it will work.
>>>
>>> Regarding the S3 issue, have you followed the documentation? [2][3]
>>>
>>> Best,
>>> Piotrek
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/formats/parquet/
>>> [2]
>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins
>>> [3]
>>> https://nightlies.apache.org/flink/flink-docs-release-1.12/deployment/filesystems/s3.html
>>>
>>>
>>> On Fri, 17 Dec 2021 at 10:10, Alexandre Montecucco <
>>> alexandre.montecu...@grabtaxi.com> wrote:
>>>
 Hello everyone,
 I am struggling to read S3 parquet files from S3 with Flink Streaming
 1.12.2
 I had some difficulty simply reading from local parquet files. I
 finally managed that part, though the solution feels 

Re: Re: Will Flink loss some old Keyed State when changing the parallelism

2021-12-20 Thread Seth Wiesman
No. The default max parallelism of 128 will be applied. If you try to
restore above that value, the restore will fail and you can simply restore
at a smaller value.

No data loss.
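
A small sketch of how to avoid the issue up front (assumption: you control the
job's main method; 1024 is just an example ceiling). Setting the max parallelism
explicitly when the job is first started means a later rescale is never limited
by the default ceiling mentioned above:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class MaxParallelismExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setMaxParallelism(1024); // upper bound for all future rescaling of keyed state
            env.setParallelism(16);      // current parallelism; can be raised up to 1024 on restore
            // ... build the pipeline and call env.execute() ...
        }
    }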

On Mon, Dec 20, 2021 at 2:28 AM 杨浩  wrote:

>
> Thanks for your reply. If we don't set the max parallelism, and we change
> the parallelism to a very big number, will the state be lost?
>
>
>
>
>
> At 2021-11-27 01:20:49, "Yun Tang"  wrote:
> >Hi Yang,
> >
> >Flink keeps the max key groups the same no matter how the parallelism changes,
> >and uses this to avoid losing state data [1]
> >
> >[1] https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html
> >
> >
> >Best
> >Yun Tang
> >
> >On 2021/11/26 10:07:29 Nicolaus Weidner wrote:
> >> Hi,
> >>
> >> to rescale, you should take a savepoint, stop the job, then restart from
> >> the savepoint with your new desired parallelism. This way, no data will be
> >> lost.
> >>
> >> Best,
> >> Nico
> >>
> >> On Thu, Nov 25, 2021 at 10:53 AM 杨浩  wrote:
> >>
> >> > Will Flink lose some old Keyed State when changing the parallelism, like
> >> > 2 -> 5, or 5 -> 3?
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >>
>
>
>
>
>


Re: UDF and Broadcast State Pattern

2021-12-15 Thread Seth Wiesman
Hi Krzysztof,

There is a difference in semantics here between yourself and Caizhi. SQL
UDFs can be used statefully - see AggregateFunction and
TableAggregateFunction for examples. You even have access to ListView and
MapView which are backed by ListState and MapState accordingly. These
functions contain aggregates which do participate in checkpointing and are
strongly consistent.

What is not supported are more low level process function type operations
(custom state registration, user access to timers, broadcast state as
you've discovered). There have been some discussions about how to add this
sort of functionality in a SQL compliant manner but nothing concrete.

In the meantime, Flink SQL has strong interop with the DataStream API. You
can always transform a Table into a DataStream, do some low level
processing, and then transform it back into a table to run further SQL.

Seth

On Wed, Dec 15, 2021 at 3:52 AM Krzysztof Chmielewski <
krzysiek.chmielew...@gmail.com> wrote:

> Thank you,
> yes, I was thinking about simply running my own thread in the UDF and
> consuming from some queue, something like that.
> Having some background with the DataStream API, I was hoping that I could
> reuse the same mechanisms (like the Broadcast State Pattern or
> CoProcessFunction) in Flink SQL.
> However, it seems there is quite a noticeable gap between what you can do
> with SQL and what you can control compared to the DataStream API.
>
> Regarding UDFs being stateless: I assume you mean that a UDF does not
> participate in the checkpoint mechanism and I cannot initialize Flink state
> in a UDF, right?
> I'm wondering why it is not possible. It seems like an "obligatory" feature
> for a stateful stream processing platform that supports SQL. At a time when
> there is huge interest in Flink SQL and everyone is talking about it, it is
> surprising that something like state support is not available in SQL UDFs.
>
> Are there any plans, maybe FLIP to change it?
>
> Regards,
> Krzysztof Chmielewski
>
> On Wed, 15 Dec 2021 at 02:36, Caizhi Weng  wrote:
>
>> Hi!
>>
>> Currently you can't use broadcast state in Flink SQL UDF because UDFs are
>> all stateless.
>>
>> However you mentioned your use case that you want to control the logic in
>> UDF with some information. If that is the case, you can just run a thread
>> in your UDF to read that information and change the behavior of the eval
>> method accordingly.
>>
>> Krzysztof Chmielewski  wrote on Wed, Dec 15, 2021 at
>> 05:47:
>>
>>> Hi,
>>> Is there a way to build a UDF [1] for Flink SQL that can be used with
>>> the Broadcast State Pattern [2]?
>>>
>>> I have a use case, where I would like to be able to use broadcast
>>> control stream to change logic in UDF.
>>>
>>> Regards,
>>> Krzysztof Chmielewski
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#user-defined-functions
>>> [2]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/broadcast_state/#the-broadcast-state-pattern
>>>
>>


Re: Sending an Alert to Slack, AWS sns, mattermost

2021-12-14 Thread Seth Wiesman
Sure,

Just implement `RichSinkFunction`. You will initialize your client inside
the open method and then send alerts from invoke.
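
A minimal sketch of that idea (assumptions: Java 11+ for java.net.http, a
Slack-style incoming webhook URL, and alerts already formatted as Strings; the
class and parameter names are illustrative only):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class WebhookAlertSink extends RichSinkFunction<String> {

        private final String webhookUrl;
        private transient HttpClient client; // not serializable, so created in open()

        public WebhookAlertSink(String webhookUrl) {
            this.webhookUrl = webhookUrl;
        }

        @Override
        public void open(Configuration parameters) {
            // Initialize the client once per parallel sink instance.
            client = HttpClient.newHttpClient();
        }

        @Override
        public void invoke(String alert, Context context) throws Exception {
            // Slack incoming webhooks expect a JSON body with a "text" field.
            String payload = "{\"text\": \"" + alert.replace("\"", "\\\"") + "\"}";
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create(webhookUrl))
                    .header("Content-Type", "application/json")
                    .POST(HttpRequest.BodyPublishers.ofString(payload))
                    .build();
            client.send(request, HttpResponse.BodyHandlers.ofString());
        }
    }

Wiring it up is then a one-liner on the alert stream, e.g.
alerts.addSink(new WebhookAlertSink(url)).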

Seth

On Mon, Dec 13, 2021 at 9:17 PM Robert Cullen  wrote:

> Yes, that's the correct use case.  Will this work with the DataStream
> API?  UDFs are for the Table API, correct?  Is there a custom sink that can
> be applied, such as in this Fraud Detection example [1], but where instead
> of sending the alert to a log it sends the message to a webhook?
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/try-flink/datastream/
>
> On Mon, Dec 13, 2021 at 8:25 PM Caizhi Weng  wrote:
>
>> Hi!
>>
>> Could you please elaborate more on your use case? Do you want to check
>> the records in a data stream and if some condition is met then send an
>> alert?
>>
>> If that is the case, you can use a UDF for checking and sending alerts.
>> See [1] for detailed explanation about UDF.
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/functions/udfs/
>>
>>> Robert Cullen  wrote on Tue, Dec 14, 2021 at 04:08:
>>
>>> Hello,
>>>
>>> I'm looking for some guidance on how to send alert notifications from a
>>> DataStream to a Slack channel and possibly other alerting tools (ie. AWS
>>> sns, mattermost)
>>>
>>> --
>>> Robert Cullen
>>> 240-475-4490
>>>
>>
>
> --
> Robert Cullen
> 240-475-4490
>


Re: [DISCUSS] Deprecate MapR FS

2021-12-09 Thread Seth Wiesman
+1

I actually thought we had already dropped this FS. If anyone is still
relying on it in production, the file system abstraction in Flink has been
incredibly stable over the years. They should be able to use the 1.14 MapR
FS with later versions of Flink.

Seth

On Wed, Dec 8, 2021 at 10:03 AM Martijn Visser 
wrote:

> Hi all,
>
> Flink supports multiple file systems [1] which includes MapR FS. MapR as a
> company doesn't exist anymore since 2019, the technology and intellectual
> property has been sold to Hewlett Packard.
>
> I don't think that there's anyone who's using MapR anymore and therefore I
> think it would be good to deprecate this for Flink 1.15 and then remove it
> in Flink 1.16. Removing this from Flink will slightly shrink the codebase
> and CI runtime.
>
> I'm also cross posting this to the User mailing list, in case there's
> still anyone who's using MapR.
>
> Best regards,
>
> Martijn
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/overview/
>


Re: GCS/Object Storage Rate Limiting

2021-12-08 Thread Seth Wiesman
Not sure if you've seen this, but Flink's file systems do support connection
limiting.

https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/common/#connection-limiting
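
For reference, a hedged sketch of what that page describes, as flink-conf.yaml
entries (assumptions: the GCS file system is registered under the gs:// scheme
and these example numbers suit your job; the limits apply per TaskManager
process):

    # Cap the number of concurrent streams Flink may open against gs://
    fs.gs.limit.total: 64
    fs.gs.limit.input: 32
    fs.gs.limit.output: 32
    # Fail stream opening after waiting this long (ms) for a free connection,
    # and close streams that have been inactive for this long (ms).
    fs.gs.limit.timeout: 60000
    fs.gs.limit.stream-timeout: 60000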

Seth

On Wed, Dec 8, 2021 at 12:18 PM Kevin Lam  wrote:

> Hey David,
>
> Thanks for the response. The retry eventually succeeds, but I was
> wondering if there was anything that people in the community have done to
> avoid GCS/S3 rate-limiting issues. The retries do result in it taking
> longer for all the task managers to recover and register.
>
> On Mon, Dec 6, 2021 at 3:42 AM David Morávek  wrote:
>
>> Hi Kevin,
>>
>> Flink comes with two schedulers for streaming:
>> - Default
>> - Adaptive (opt-in)
>>
>> Adaptive is still in an experimental phase and doesn't support local
>> recovery. You're most likely using the first one, so you should be OK.
>>
>> Can you elaborate on this a bit? We aren't changing the parallelism when
>>> restoring.
>>>
>>
>> Splitting / merging of the RocksDB-based operator checkpoint is currently
>> an expensive operation. If the parallelism remains unchanged, you should be
>> OK; the majority of the time for the operator state restore will be spent on
>> downloading the RocksDB snapshot.
>>
>> Our checkpoint is about 900GB, and we have 256 TaskManagers with a
>>> parallelism of 512.
>>>
>>
>> This could definitely generate lot of concurrent requests when restoring
>> the state.
>>
>> Does the restore operation fail, or the retry mechanism is sufficient to
>> work around this?
>>
>> D.
>>
>> On Thu, Dec 2, 2021 at 7:54 PM Kevin Lam  wrote:
>>
>>> HI David,
>>>
>>> Thanks for your response.
>>>
>>> What's the DefaultScheduler you're referring to? Is that available in
>>> Flink 1.13.1 (the version we are using)?
>>>
>>> How large is the state you're restoring from / how many TMs does the job
 consume / what is the parallelism?
>>>
>>>
>>> Our checkpoint is about 900GB, and we have 256 TaskManagers with a
>>> parallelism of 512.
>>>
>>> Also things could get even worse if the parallelism that has been used
 for taking the checkpoint is different from the one you're trying to
 restore with (especially with RocksDB).

>>>
>>> Can you elaborate on this a bit? We aren't changing the parallelism when
>>> restoring.
>>>
>>> On Thu, Dec 2, 2021 at 10:48 AM David Morávek  wrote:
>>>
 Hi Kevin,

 this happens only when the pipeline is started up from a savepoint /
 retained checkpoint, right? Guessing from the "path" you've shared, it seems
 like a RocksDB-based retained checkpoint. In this case all task managers
 need to pull state files from the object storage in order to restore. This
 can indeed be a heavy operation, especially when restoring a large state with
 high parallelism.

 Recovery from failure should be faster (with DefaultScheduler) as we
 can re-use the local files that are already present on TaskManagers.

 How large is the state you're restoring from / how many TMs does the
 job consume / what is the parallelism?

 Also things could get even worse if the parallelism that has been used
 for taking the checkpoint is different from the one you're trying to
 restore with (especially with RocksDB).

 Best,
 D.

 On Thu, Dec 2, 2021 at 4:29 PM Kevin Lam  wrote:

> Hi all,
>
> We're running a large (256 task managers with 4 task slots each) Flink
> Cluster with High Availability enabled, on Kubernetes, and use Google 
> Cloud
> Storage (GCS) as our object storage for the HA metadata. In addition, our
> Flink application writes out to GCS from one of its sinks via streaming
> file sink + GCS connector.
>
> We observed the following types of errors when running our application:
>
> ```
>
> INFO: Encountered status code 429 when sending GET request to URL '
> https://storage.googleapis.com/download/storage/v1/b//o/checkpoints%2F%2Fshared%2F13721c52-18d8-4782-80ab-1ed8a15d9ad5?alt=media=1638448883568946'.
> Delegating to response handler for possible retry. [CONTEXT
> ratelimit_period="10 SECONDS [skipped: 8]" ]
>
> ```
>
> ```
>  INFO: Encountered status code 503 when sending POST request to URL '
> https://storage.googleapis.com/upload/storage/v1/b//o?uploadType=multipart'.
> Delegating to response handler for possible retry.
> ```
>
> They typically happen upon cluster start-up, when all the task
> managers are registering with the jobmanager. We've also seen them occur 
> as
> a result of output from our sink operator as well.
>
> Has anyone else encountered similar issues? Any practices you can
> suggest?
>
> Advice appreciated!
>
> Thanks
>



Re: Dependency injection for TypeSerializer?

2021-11-10 Thread Seth Wiesman
Yes I did, thanks for sending it back :) Copying my previous reply for the
ML:

Hey Thomas,
>
> You are correct that there is no way to inject dynamic information into
> the TypeSerializer configured from the TypeSerializerSnapshot, but that
> should not be a problem for your use case.
>
> The type serializer instantiated from a TypeSerializerSnapshot is only
> used to perform schema migrations. Assuming the schema registry enforces
> all changes are backwards compatible, your snapshot instance can always
> return CompatibleAsIs and its serializer will never be used.
>
> The tradeoff here is that when the schema does change, Flink will not
> eagerly migrate all values in state but instead lazily migrate as state
> values are updated.
>
> Seth
>

Currently the TypeSerializerSnapshot logic is completely deterministic, and
my intuition is that we should not change that. Please let us know if what
I described does not work in practice and we can take it from there.
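
A sketch of what the reply above describes, under assumptions: the wrapped
SchemaRegistrySerializer class, its PLACEHOLDER_ENDPOINT constant, and the idea
of persisting only the type name are hypothetical illustrations, not the actual
code from this thread. The key point is that resolveSchemaCompatibility() always
answers "compatible as is", so the endpoint never needs to live in the snapshot:

    import org.apache.flink.api.common.typeutils.TypeSerializer;
    import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
    import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
    import org.apache.flink.core.memory.DataInputView;
    import org.apache.flink.core.memory.DataOutputView;

    import java.io.IOException;

    public class SchemaRegistrySerializerSnapshot<T> implements TypeSerializerSnapshot<T> {

        private Class<T> type; // the only thing persisted in the snapshot

        public SchemaRegistrySerializerSnapshot() {}              // used on restore
        public SchemaRegistrySerializerSnapshot(Class<T> type) {  // used when snapshotting
            this.type = type;
        }

        @Override
        public int getCurrentVersion() {
            return 1;
        }

        @Override
        public void writeSnapshot(DataOutputView out) throws IOException {
            out.writeUTF(type.getName());
        }

        @SuppressWarnings("unchecked")
        @Override
        public void readSnapshot(int readVersion, DataInputView in, ClassLoader classLoader)
                throws IOException {
            try {
                type = (Class<T>) Class.forName(in.readUTF(), false, classLoader);
            } catch (ClassNotFoundException e) {
                throw new IOException(e);
            }
        }

        @Override
        public TypeSerializer<T> restoreSerializer() {
            // Per the reply above, this serializer only matters for schema migration,
            // which is never requested below; SchemaRegistrySerializer and its
            // PLACEHOLDER_ENDPOINT are hypothetical.
            return new SchemaRegistrySerializer<>(type, SchemaRegistrySerializer.PLACEHOLDER_ENDPOINT);
        }

        @Override
        public TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
                TypeSerializer<T> newSerializer) {
            // The registry enforces backwards-compatible changes, so no migration is ever needed.
            return TypeSerializerSchemaCompatibility.compatibleAsIs();
        }
    }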

Seth

On Wed, Nov 10, 2021 at 3:20 AM Arvid Heise  wrote:

> Hi Thomas,
>
> Could you add a sketch of your preferred solution? From what I gathered,
> you have all the information available in your main (probably misunderstood
> that), so what's keeping you from adding the TypeSerializer as a field to
> your UDF?
>
> On Tue, Nov 9, 2021 at 11:42 AM Krzysztof Chmielewski <
> krzysiek.chmielew...@gmail.com> wrote:
>
>> Hi,
>> In my past project I was able to use Spring as a DI provider for Flink
>> Jobs. It actually saves me a lot of hassle while writing/composing jobs and
>> process functions.
>> I was able to use all Spring's Bean annotations along with properties
>> files managed by Spring as it would be a "normal" spring app. The
>> dependencies that I was injecting via Spring were not
>> serialized/deserialized by Flink which actually was something that I wanted
>> to achieved. In some cases it is very hard or maybe even impossible to make
>> some 3rd party classes serializable.
>>
>> Things to highlight here:
>> 1. I did it only for StreamAPI i think it could work also for TableAPI
>> though.
>> 2.I was loading a Spring context from ProcessFunction::open method.
>> I was able to customize via Job parameters which Spring configuration I
>> want to load.
>> After doing this, all fields annotated with @Autowired were injected.
>> 3, I was using standard @Configuration classes
>>
>> Issues:
>> 1. Since i was using operator::open method to load the context, the
>> context will be loaded few times depends on the number of operators
>> deployed on particular Task Manager. This however could be improved.
>> 2. The important thing here was that all your classes have to be
>> "deployed" on every Task Manager/Job Manager in order to load them through
>> DI.
>> We achieved this by using what is called "Job session" cluster. Where our
>> custom Flink docker image was build in a way that it contains our job jar
>> with all dependencies needed.
>>
>> Because of that, we were not be able to use things like AWS EMR or
>> Kinesis.
>>
>> Cheers,
>> Krzysztof Chmielewski
>>
>> On Tue, 9 Nov 2021 at 06:46, Thomas Weise  wrote:
>>
>>> Hi,
>>>
>>> I was looking into a problem that requires a configurable type
>>> serializer for communication with a schema registry. The service
>>> endpoint can change, so I would not want to make it part of the
>>> serializer snapshot but rather resolve it at graph construction time
>>> (similar to how a Kafka bootstrap URL or JDBC connection URL would not
>>> be embedded into a checkpoint).
>>>
>>> TypeSerializer is instantiated via either TypeInformation or
>>> TypeSerializerSnapshot. While TypeInformation provides access to
>>> ExecutionConfig and therefore ability to access parameters from
>>> GlobalJobParameters that could be provided through the entry point,
>>> restoreSerializer requires the serializer to be constructed from the
>>> snapshot state alone.
>>>
>>> Ideally there would be a dependency injection mechanism for user code.
>>> Discussion in [1] indicated there isn't a direct solution. Has anyone
>>> come across a similar use case and found a way to work around this
>>> limitation? It might be possible to work with a configuration
>>> singleton that initializes from a file in a well known location, but
>>> that depends on the deployment environment and doesn't play nice with
>>> testing.
>>>
>>> Thanks,
>>> Thomas
>>>
>>> [1] https://lists.apache.org/thread/6qbr4b391dcfwxhcvdl066rpv86gpm5o
>>>
>>


Re: to join or not to join, that is the question...

2021-11-08 Thread Seth Wiesman
There is no such restriction on connected streams; either input may modify
the keyed state. Regarding performance, the difference between the two
should be negligible and I would go with the option with the cleanest
semantics. If both streams are the same type *and* you do not care which
input an element was sourced from, use a union. Otherwise, pick a connected
stream.
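
A compact sketch of the two wirings (assumption: both inputs are Strings keyed
by their own value; the functions just echo, purely for illustration):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
    import org.apache.flink.util.Collector;

    public class UnionVsConnect {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<String> a = env.fromElements("k1", "k2");
            DataStream<String> b = env.fromElements("k1", "k3");

            // Union: same type, no distinction between inputs, one keyed state per key.
            a.union(b)
             .keyBy(v -> v)
             .process(new KeyedProcessFunction<String, String, String>() {
                 @Override
                 public void processElement(String value, Context ctx, Collector<String> out) {
                     out.collect("union saw " + value);
                 }
             })
             .print();

            // Connect: inputs stay distinct (types may differ); both processElement1 and
            // processElement2 may read and write the same keyed state for the current key.
            a.connect(b)
             .keyBy(v -> v, v -> v)
             .process(new KeyedCoProcessFunction<String, String, String, String>() {
                 @Override
                 public void processElement1(String value, Context ctx, Collector<String> out) {
                     out.collect("left " + value);
                 }

                 @Override
                 public void processElement2(String value, Context ctx, Collector<String> out) {
                     out.collect("right " + value);
                 }
             })
             .print();

            env.execute("union-vs-connect");
        }
    }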

Seth

On Fri, Nov 5, 2021 at 8:44 AM Marco Villalobos 
wrote:

> In my situation, the streams are of the same type, which means union is an
> option.
>
> However, will creating new stream with union perform more slowly than
> processing connected streams?
>
> I want to use the option that performs better.
>
> The logic on the data is actually very simple.  But both streams will need
> to modify the same keyed state. I thought that there was a restriction in
> connected streams in which only one stream can modify keyed state.
>
> On Fri, Nov 5, 2021 at 6:39 AM Timo Walther  wrote:
>
>> Union can be an option if you want to unify the streams first and then
>> apply a key by on the common stream.
>>
>> Otherwise connect() is the way to go. See an example for joining here:
>>
>>
>> https://github.com/twalthr/flink-api-examples/blob/main/src/main/java/com/ververica/Example_06_DataStream_Join.java
>>
>> Regards,
>> Timo
>>
>>
>> On 05.11.21 14:36, Dario Heinisch wrote:
>> > Union creates a new stream containing all elements of the unioned
>> > streams:
>> >
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/overview/#union
>> >
>> >
>> > On 05.11.21 14:25, Marco Villalobos wrote:
>> >> Can two different streams flow to the same operator (an operator with
>> >> the same name, uid, and implementation) and then share keyed state or
>> >> will that require joining the streams first?
>> >>
>> >>
>> >
>>
>>


Re: IterativeCondition instead of SimpleCondition not matching pattern

2021-11-04 Thread Seth Wiesman
In general I would strongly encourage you to find a way to `key` your
stream, it will make everything much simpler.

On Thu, Nov 4, 2021 at 6:05 PM Seth Wiesman  wrote:

> Why not?
>
> All those classes have a Symbol attribute, why can't you use that to key
> the stream?
>
> On Thu, Nov 4, 2021 at 5:51 PM Isidoros Ioannou 
> wrote:
>
>> Hi Seth,
>>
>> thank you for your answer.
>> In this case you are right and it would solve my problem, but actually my
>> case is a bit more complex; my mistake, I wanted to provide a simple
>> example.
>>
>> The actual case is,
>>
>> I have DataStream< ServerAwareMessage  > inputStream as a source ,
>> Message is just an interface. The events can be of certain subtypes
>>
>> 1) class OrderRequest implements  ServerAwareMessage   {
>>String symbol
>>String orderType
>> }
>>
>> 2) class OrderActivated implements  ServerAwareMessage   {
>>String symbol
>>String orderType
>>long orderId
>> }
>>
>> 3) class DealPerformed implements  ServerAwareMessage   {
>>String symbol
>>String orderType
>> }
>>
>> 4) class OrderFilled implements  ServerAwareMessage   {
>>String symbol
>>String orderType
>>long orderId
>> }
>>
>> And here is the pattern sequence.
>>
>> Pattern.begin(OPEN,
>> AfterMatchSkipStrategy.skipToNext())
>> .where(new SimpleCondition () {
>> @Override
>> public boolean filter(ServerAwareMessage value)
>> throws Exception {
>>  return value instanceof OrderRequest
>>  }  )
>> .followedBy("OrderActivated ")
>> .where(new IterativeCondition() {
>> @Override
>> public boolean filter(ServerAwareMessage value,
>> Context ctx) throws Exception {
>> if(value.getMessage() instanceof  OrderActivated
>> ) {
>> var msg = ( OrderActivated )
>> value.getMessage();
>> var list =
>> StreamSupport.stream(ctx.getEventsForPattern(OPEN).spliterator(), false)
>> .filter(i -> i.getMessage() instanceof
>> OrderRequest  )
>> .collect(Collectors.toList());
>>return  list.stream().allMatch(i -> ((
>> OrderRequest  )i.getMessage()).getSymbol().equals(msg.getSymbol()) &&
>> ((
>> OrderRequest)i.getMessage()).getOrderType().equals(msg.getOrderType()));
>>
>> }
>> return false;
>> }
>> })
>> .followedBy("DealPerformed")
>> .where(new IterativeCondition() {
>> @Override
>> public boolean filter(ServerAwareMessage value,
>> Context ctx) throws Exception {
>> if (value.getMessage() instanceof  DealPerformed
>> ) {
>> var order = ( DealPerformed  )
>> value.getMessage();
>> var list =
>> StreamSupport.stream(ctx.getEventsForPattern(" OrderActivated
>> ").spliterator(), false)
>> .filter(i -> i.getMessage() instanceof
>> OrderPerformed)
>> .collect(Collectors.toList());
>>
>> return list.stream().allMatch(i -> ((
>> OrderActivated   )i.getMessage()).getSymbol().equals(msg.getSymbol()) &&
>> ((  OrderActivated
>> )i.getMessage()).getOrderType().equals(msg.getOrderType()));
>>
>> }
>> return false;
>> }
>> })
>> .followedBy("OrdeFilled")
>> .where(new IterativeCondition() {
>> @Override
>> public boolean filter(ServerAwareMessage value,
>> Context ctx) throws Exception {
>> if (value.getMessage() instanceof  OrderFilled  )
>> {
>> var order = ( OrderFilled  )
>> value.getMessage();
>> var list =
>> StreamSupport.stream(ctx.getEventsForPattern( "DealPerformed"

Re: IterativeCondition instead of SimpleCondition not matching pattern

2021-11-04 Thread Seth Wiesman
Why not?

All those classes have a Symbol attribute, why can't you use that to key
the stream?

On Thu, Nov 4, 2021 at 5:51 PM Isidoros Ioannou  wrote:

> Hi Seth,
>
> thank you for your answer.
> In this case you are right and it would solve my problem, but actually my
> case is a bit more complex; my mistake, I wanted to provide a simple
> example.
>
> The actual case is,
>
> I have DataStream< ServerAwareMessage  > inputStream as a source ,
> Message is just an interface. The events can be of certain subtypes
>
> 1) class OrderRequest implements  ServerAwareMessage   {
>String symbol
>String orderType
> }
>
> 2) class OrderActivated implements  ServerAwareMessage   {
>String symbol
>String orderType
>long orderId
> }
>
> 3) class DealPerformed implements  ServerAwareMessage   {
>String symbol
>String orderType
> }
>
> 4) class OrderFilled implements  ServerAwareMessage   {
>String symbol
>String orderType
>long orderId
> }
>
> And here is the pattern sequence.
>
> Pattern.begin(OPEN,
> AfterMatchSkipStrategy.skipToNext())
> .where(new SimpleCondition () {
> @Override
> public boolean filter(ServerAwareMessage value) throws
> Exception {
>  return value instanceof OrderRequest
>  }  )
> .followedBy("OrderActivated ")
> .where(new IterativeCondition() {
> @Override
> public boolean filter(ServerAwareMessage value,
> Context ctx) throws Exception {
> if(value.getMessage() instanceof  OrderActivated
> ) {
> var msg = ( OrderActivated )
> value.getMessage();
> var list =
> StreamSupport.stream(ctx.getEventsForPattern(OPEN).spliterator(), false)
> .filter(i -> i.getMessage() instanceof
> OrderRequest  )
> .collect(Collectors.toList());
>return  list.stream().allMatch(i -> ((
> OrderRequest  )i.getMessage()).getSymbol().equals(msg.getSymbol()) &&
> ((
> OrderRequest)i.getMessage()).getOrderType().equals(msg.getOrderType()));
>
> }
> return false;
> }
> })
> .followedBy("DealPerformed")
> .where(new IterativeCondition() {
> @Override
> public boolean filter(ServerAwareMessage value,
> Context ctx) throws Exception {
> if (value.getMessage() instanceof  DealPerformed
> ) {
> var order = ( DealPerformed  )
> value.getMessage();
> var list =
> StreamSupport.stream(ctx.getEventsForPattern(" OrderActivated
> ").spliterator(), false)
> .filter(i -> i.getMessage() instanceof
> OrderPerformed)
> .collect(Collectors.toList());
>
> return list.stream().allMatch(i -> ((
> OrderActivated   )i.getMessage()).getSymbol().equals(msg.getSymbol()) &&
> ((  OrderActivated
> )i.getMessage()).getOrderType().equals(msg.getOrderType()));
>
> }
> return false;
> }
> })
> .followedBy("OrdeFilled")
> .where(new IterativeCondition() {
> @Override
> public boolean filter(ServerAwareMessage value,
> Context ctx) throws Exception {
> if (value.getMessage() instanceof  OrderFilled  ) {
> var order = ( OrderFilled  )
> value.getMessage();
> var list =
> StreamSupport.stream(ctx.getEventsForPattern( "DealPerformed"
> ).spliterator(), false)
> .filter(i -> i.getMessage() instanceof
> OrderActivationRequestNewDue)
> .collect(Collectors.toList());
>
> return list.stream().allMatch(i -> ((
> DealPerformed  )i.getMessage()).getSymbol().equals(order.getSymbol()) &&
> (( DealPerformed
> )i.getMessage()).getOrderType().equals(order.getOrderType());
>
> }
> return false;
> }
> })
>
> In this case I cannot group by, unfortunately, so I may receive a
> packet  { OrderRequest(1), OrderActivated (1) , OrderRequest (2),
> DealPerformed(1) , OrderActivated(2), OrderRequest(3), DealPerformed(2),
> OrderFilled(1), OrderFilled(2), OrderActivated(3)} etc.
> For me it is crucial to match all the event sequence (1) (2), etc. and
> there is a case where the sequence of the Messages is incomplete , that
> means that an 

Re: IterativeCondition instead of SimpleCondition not matching pattern

2021-11-04 Thread Seth Wiesman
HI Isidoros,

If you want the pattern to be scoped to symbols, I suggest you use a
`keyBy` in your stream.

Constructing the pattern will now look like this:


KeyedStream<Model, String> keyedInput = inputStream.keyBy(model -> model.getSymbol());

PatternStream<Model> marketOpenPatternStream = CEP.pattern(keyedInput, pattern);


This way each symbol type will be evaluated independently.

Seth

On Thu, Nov 4, 2021 at 4:01 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Thanks for the update, the requirements make sense.
>
> Some follow up questions:
> * What time characteristic are you using? Processing or Event?
> * Can you describe a bit more what you mean by "input like the one I have
> commented bellow"? What is special about the one you have commented?
>
> Best,
> Austin
>
> On Thu, Nov 4, 2021 at 4:09 PM Isidoros Ioannou 
> wrote:
>
>>
>>
>> -- Forwarded message -
>> From: Isidoros Ioannou 
>> Date: Thu, 4 Nov 2021 at 10:01 PM
>> Subject: Re: IterativeCondition instead of SimpleCondition not matching
>> pattern
>> To: Austin Cawley-Edwards 
>>
>>
>> Hi Austin,
>> thank you for your answer and I really appreciate your willingness to
>> help.
>>
>> Actually the desired output is the one below
>>
>> {start=[Model{id=4, text='A', symbol='AU'}], second=[Model{id=5,
>> text='B', symbol='AU'}], third=[Model{id=6, text='C', symbol='AU'}],
>> fourth=[Model{id=8, text='D', symbol='AU'}]} {start=[Model{id=9, text='A',
>> symbol='GB'}], second=[Model{id=10, text='B', symbol='GB'}],
>> third=[Model{id=11, text='C', symbol='GB'}], fourth=[Model{id=12, text='D',
>> symbol='GB'}]}
>> I would like to generate only sequences of Models that have the same
>> symbol. I noticed that if an event does not come as input,
>> like the one I have commented out below, it breaks the whole pattern match
>> and the desired output is never produced.
>>
>> DataStream inputStream = env.fromElements(
>> Model.of(1, "A", "US"),
>> Model.of(2, "B", "US"),
>> Model.of(3, "C", "US"),
>> Model.of(4, "A", "AU"),
>> Model.of(5, "B", "AU"),
>> Model.of(6, "C", "AU"),
>>   //Model.of(7, "D", "US"),
>> Model.of(8, "D", "AU"),
>> Model.of(9, "A", "GB"),
>> Model.of(10, "B", "GB"),
>> Model.of(13, "D", "GB"),
>> Model.of(11, "C", "GB"),
>> Model.of(12, "D", "GB")
>>
>> Kind Regards,
>> Isidoros
>>
>>
>> On Thu, 4 Nov 2021 at 8:40 PM, Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>>> Hi Isidoros,
>>>
>>> Thanks for reaching out to the mailing list. I haven't worked with the
>>> CEP library in a long time but can try to help. I'm having a little trouble
>>> understanding the desired output + rules. Can you mock up the desired
>>> output like you have for the fulfilled pattern sequence?
>>>
>>> Best,
>>> Austin
>>>
>>> On Thu, Nov 4, 2021 at 4:12 AM Isidoros Ioannou 
>>> wrote:
>>>

 I face an issue when try to match some elements in a Pattern sequence.
 Flink 1.11.1 version. Here is my case:

 final StreamExecutionEnvironment env = 
 EnvironmentProvider.getEnvironment();
 DataStream inputStream = env.fromElements(
 Model.of(1, "A", "US"),
 Model.of(2, "B", "US"),
 Model.of(3, "C", "US"),
 Model.of(4, "A", "AU"),
 Model.of(5, "B", "AU"),
 Model.of(6, "C", "AU"),
   //Model.of(7, "D"),
 Model.of(8, "D", "AU"),
 Model.of(9, "A", "GB"),
 Model.of(10, "B", "GB"),
 Model.of(13, "D", "GB"),
 Model.of(11, "C", "GB"),
 Model.of(12, "D", "GB")


 
 ).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps())
 .forceNonParallel();

 Pattern pattern = Pattern.begin("start", 
 AfterMatchSkipStrategy.skipToNext())
 .where(new IterativeCondition() {
 @Override
 public boolean filter(Model value, Context ctx) 
 throws Exception {
 return value.getText().equalsIgnoreCase("A");
 }
 }).followedBy("second")
 .where(new IterativeCondition() {
 @Override
 public boolean filter(Model value, Context ctx) 
 throws Exception {

 return value.getText().equalsIgnoreCase("B");
 }
 }).followedBy("third")
 .where(new IterativeCondition() {
 @Override
 public boolean filter(Model value, Context ctx) 
 throws Exception {

 return value.getText().equalsIgnoreCase("C");
 }
 }).followedBy("fourth")
 .where(new IterativeCondition() {
  

Re: Snapshot method for custom keyed state checkpointing ?

2021-10-12 Thread Seth Wiesman
Hi Marc,

I think you will find this is less efficient than just using keyed state.
Remember state backends are local, reading and writing is extremely cheap.
HashMapStateBackend is just an in-memory data structure and
EmbeddedRocksDBStateBackend only works against local disk. Additionally,
the embedded rocksdb state backend already supports incremental
checkpointing, so when an asynchronous checkpoint does occur you are not
paying transfer cost on slow changing state values.

Seth



On Tue, Oct 12, 2021 at 10:12 AM Marc LEGER  wrote:

> Hello Nicolaus,
>
> Unfortunately, I don't really have control over the custom state solution,
> since it is managed by an existing system which cannot be easily modified.
>
> What I finally did for the "data state" in my CoFlatMapFunction is to use a
> *list-style operator state* that stores the partitioned state for one key per
> element in the list, with a *union redistribution* scheme in case of
> restore/redistribution.
> Not sure if it's really efficient (I need to do more tests), but all
> operators then receive the same whole custom state, from which the
> partitioned state for the assigned keys can be retrieved inside every
> operator parallel task, alongside the other keyed state (control state):
>
>
>
>
> // Control state partitioned by userId (keyed state)
> private ValueState controlState;
>
> // Data state partitioned by userId (operator state)
> private ListState dataState;
>
> To avoid "state explosion", I also added a custom TTL-based cleanup
> mechanism for this operator state to remove elements in the list which are
> not used for some time.
> However, I am still interested in any other better solution if available
> in Flink.
>
> Thank you for your help.
>
> Best Regards,
> Marc
>
>
> Le mar. 12 oct. 2021 à 09:02, Nicolaus Weidner <
> nicolaus.weid...@ververica.com> a écrit :
>
>> Hi Marc,
>>
>> thanks for clarifying, I had misunderstood some parts.
>> Unfortunately, I don't think there is a way to update keyed state (for
>> multiple keys even) outside of a keyed context.
>>
>> I will ask if someone else has an idea, but allow me to ask one
>> counter-question first: Did you actually run tests to verify that using the
>> custom state solution is more efficient than using Flink's keyed state
>> regularly (in the end, you would even have to include the state
>> synchronization in the performance test)? Efficient stateful stream
>> processing is one of the key features of Flink, and you are essentially
>> trying to override a specific piece of it with custom logic.
>>
>> Best regards,
>> Nico
>>
>> On Wed, Oct 6, 2021 at 5:50 PM Marc LEGER  wrote:
>>
>>> Hello Nicolaus,
>>>
>>> Thank you for your quick feedback, sorry if I am not clear enough.
>>> Actually in the documented example, the state which is updated in the
>>> snapshotState method is an operator state and not a keyed state:
>>>
>>> public void initializeState(FunctionInitializationContext context) throws Exception {
>>>   [...]
>>>   countPerPartition = context.getOperatorStateStore().getOperatorState(
>>>       new ListStateDescriptor<>("perPartitionCount", Long.class));
>>>   [...]
>>> }
>>>
>>> public void snapshotState(FunctionSnapshotContext context) throws Exception {
>>>   [...]
>>>   countPerPartition.add(localCount);
>>> }
>>>
>>>
>>> It seems that the method is then only called once per operator parallel
>>> task and not once per key.
>>> On my side I have two keyed states with the same key (e.g., userId) in a
>>> CoFlatMapFunction:
>>>
>>>
>>>
>>>
>>> // Control state partitioned by userId
>>> private ValueState controlState;
>>>
>>> // Data state partitioned by userId coming from the ser/deserialization
>>> // of a custom system having a partitioned state
>>> private ValueState dataState;
>>>
>>> and I would like to do something like that to update dataState in a
>>> keyed context for every key and every checkpoint:
>>>
>>>
>>>
>>> public void snapshotState(FunctionSnapshotContext context) throws Exception {
>>>   dataState.update(customSystem.getSnapshot(context.getKey()));  // Not a keyed context here!
>>> }
>>>
>>> instead of saving dataState in the flatMap2 function for every received
>>> event:
>>>
>>>
>>> public void flatMap1(Control control, Collector out) {
>>>   controlState.update(control);
>>> }
>>>
>>> public void flatMap2(Event event, Collector out) {
>>>   // Perform some event transformations based on controlState
>>>   ProcessedEvent result = customSystem.process(controlState.value(), event);
>>>   // Save internal custom system state after processing: can be costly if high event throughput
>>>   dataState.update(customSystem.getSnapshot(controlState.value().getUserId()));
>>>   // Output the processed event
>>>   out.collect(result);
>>> }
>>>
>>>
>>> So basically, I want to be able to synchronize the partitioned state of
>>> my custom system with the checkpoints done by Flink.
>>>
>>>
>>> Best 

Re: How to add Flink a Flink connector to stateful functions

2021-09-28 Thread Seth Wiesman
I just want to add that the StateFun documentation does cover using custom
Flink connectors[1].

[1]
https://nightlies.apache.org/flink/flink-statefun-docs-release-3.1/docs/modules/io/flink-connectors/#flink-connectors


On Tue, Sep 28, 2021 at 2:52 AM Christian Krudewig (Corporate Development) <
christian.krude...@dpdhl.com> wrote:

> Hello Igal,
>
>
>
> Thanks for replying in detail and also so quickly.
>
>
>
> It’ll take me some time to try it out, thank you!
>
>
>
> Best,
>
>
>
> Christian
>
>
>
>
>
> *From:* Igal Shilman 
> *Sent:* Tuesday, 28 September 2021 08:14
> *To:* Christian Krudewig (Corporate Development) <
> christian.krude...@dpdhl.com>
> *Cc:* i...@ververica.com; ro...@apache.org; user@flink.apache.org
> *Subject:* Re: How to add Flink a Flink connector to stateful functions
>
>
>
> Hello Christian,
>
>
>
> I'm happy to hear that you are trying out StateFun and like the toolset!
>
>
>
> Currently StateFun supports only Kafka/Kinesis egresses "out of the box",
> simply because so far folks haven't requested anything else. I can create a
> JIRA issue for that and we'll see how the community responds.
>
>
>
> Meanwhile, exposing existing Flink connectors as Sinks, is also possible
> using the link you provided.
>
> You can see for example our e2e test does it [1]
>
>
>
> The way it works is:
>
> 1. You indeed need to create a Java application that depends on the
> specific Flink connector that you are using.
>
>
>
> 2. The application needs to contain a StatefulFunctionModule that binds
> this Egress.
>
>
>
> 3. Then you create a JAR and you can start statefun using the official
> Docker image: apache/flink-statefun by mounting your module into the
> modules/ path, for example:
>
> /opt/statefun/modules/my_module/
>
> Alternatively you can create your own Docker image that derives from
> StateFun but only adds that jar into the modules directory. [2]
>
>
>
> I hope that it helps,
>
> Igal
>
>
>
> [1]
>
>
> https://github.com/apache/flink-statefun/blob/master/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/DriverModule.java#L40
>
>
>
> [2]
>
>
> https://github.com/apache/flink-statefun/blob/master/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/test/resources/Dockerfile#L20
>
>
>
> On Tue 28. Sep 2021 at 07:40, Christian Krudewig (Corporate Development) <
> christian.krude...@dpdhl.com> wrote:
>
> Hello Roman,
>
> Well, if that's the way to do it, I can manage to maintain a fork of the
> statefun repo with these tiny changes. But first my question is if that is
> the way it should be done? Or if there is another way to activate these
> connectors.
>
> Best,
>
> Christian
>
> -Original Message-
> From: Roman Khachatryan 
> Sent: Tuesday, 28 September 2021 00:31
> To: Christian Krudewig (Corporate Development) <
> christian.krude...@dpdhl.com>; Igal Shilman 
> Cc: user@flink.apache.org
> Subject: Re: How to add Flink a Flink connector to stateful functions
>
> Hi,
>
> > Does that mean that I need to build the stateful functions java
> application and afterwards the docker image?
> Yes, you have to rebuild the application after updating the pom, as well
> as its docker image.
>
> Is your concern related to synchronizing local docker images with the
> official repo?
> If so, wouldn't using a specific statefun image version solve this issue?
>
> Regards,
> Roman
>
> On Mon, Sep 27, 2021 at 9:29 PM Christian Krudewig (Corporate
> Development)  wrote:
> >
> > Hello everyone,
> >
> >
> >
> > Currently I’m busy setting up a pipeline with Stateful Functions using a
> deployment of the standard docker image “apache/flink-statefun” to
> kubernetes. It has been going smoothly so far and I love the whole toolset.
> But now I want to add Egress modules for 

Re: Built-in functions to manipulate MULTISET type

2021-09-20 Thread Seth Wiesman
The type strategy can be generic over the input and output types, so you
can write something generic that says: given a multiset of some type T, this
function returns an array of the same type T. This is the exact same logic
built-in functions use and is just as expressive as anything Flink could
provide.

Seth

On Mon, Sep 20, 2021 at 1:26 AM Kai Fu  wrote:

> Hi Seth,
>
> This is really helpful and inspiring, thank you for the information.
>
> On Sun, Sep 19, 2021 at 11:06 PM Seth Wiesman  wrote:
>
>> Hi,
>>
>> I agree it would be great to see these functions built-in, but you do not
>> need to write a UDF for each type. You can overload a UDF's type inference
>> and have the same capabilities as built-in functions, which means
>> supporting generics.
>>
>>
>> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/functions/LastDatedValueFunction.java
>>
>> On Sat, Sep 18, 2021 at 7:42 AM Yuval Itzchakov 
>> wrote:
>>
>>> Hi Jing,
>>>
>>> I recall there is already an open ticket for built-in aggregate functions
>>>
>>> On Sat, Sep 18, 2021, 15:08 JING ZHANG  wrote:
>>>
>>>> Hi Yuval,
>>>> You could open a JIRA to track this if you think some functions should
>>>> be added as built-in functions in Flink.
>>>>
>>>> Best,
>>>> JING ZHANG
>>>>
>>>> Yuval Itzchakov  wrote on Sat, Sep 18, 2021 at 3:33 PM:
>>>>
>>>>> The problem with defining a UDF is that you have to create one
>>>>> overload per key type in the MULTISET. It would be very convenient to have
>>>>> functions like Snowflake's ARRAY_AGG.
>>>>>
>>>>> On Sat, Sep 18, 2021, 05:43 JING ZHANG  wrote:
>>>>>
>>>>>> Hi Kai,
>>>>>> AFAIK, there is no built-in function to extract the keys in MULTISET
>>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/types/>
>>>>>>  to
>>>>>> be an ARRAY. Defining a UDF is a good solution.
>>>>>>
>>>>>> Best,
>>>>>> JING ZHANG
>>>>>>
>>>>>> Kai Fu  wrote on Sat, Sep 18, 2021 at 7:35 AM:
>>>>>>
>>>>>>> Hi team,
>>>>>>>
>>>>>>> We want to know if there is any built-in function to extract the
>>>>>>> keys in MULTISET
>>>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/types/>
>>>>>>> to be an ARRAY. There is no such function as far as we can find, except 
>>>>>>> to
>>>>>>> define a simple wrapper UDF for that, please advise.
>>>>>>>
>>>>>>> --
>>>>>>> *Best wishes,*
>>>>>>> *- Kai*
>>>>>>>
>>>>>>
>
> --
> *Best wishes,*
> *- Kai*
>


Re: Built-in functions to manipulate MULTISET type

2021-09-19 Thread Seth Wiesman
Hi,

I agree it would be great to see these functions built-in, but you do not
need to write a UDF for each type. You can overload a UDF's type inference
and have the same capabilities as built-in functions, which means
supporting generics.

https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/functions/LastDatedValueFunction.java

On Sat, Sep 18, 2021 at 7:42 AM Yuval Itzchakov  wrote:

> Hi Jing,
>
> I recall there is already an open ticket for built-in aggregate functions
>
> On Sat, Sep 18, 2021, 15:08 JING ZHANG  wrote:
>
>> Hi Yuval,
>> You could open a JIRA to track this if you think some functions should be
>> added as built-in functions in Flink.
>>
>> Best,
>> JING ZHANG
>>
>> Yuval Itzchakov  于2021年9月18日周六 下午3:33写道:
>>
>>> The problem with defining a UDF is that you have to create one overload
>>> per key type in the MULTISET. It would be very convenient to have functions
>>> like Snowflakes ARRAY_AGG.
>>>
>>> On Sat, Sep 18, 2021, 05:43 JING ZHANG  wrote:
>>>
 Hi Kai,
 AFAIK, there is no built-in function to extract the keys in MULTISET
 
  to
 be an ARRAY. Defining a UDF is a good solution.

 Best,
 JING ZHANG

 Kai Fu  于2021年9月18日周六 上午7:35写道:

> Hi team,
>
> We want to know if there is any built-in function to extract the keys
> in MULTISET
> 
> to be an ARRAY. There is no such function as far as we can find, except to
> define a simple wrapper UDF for that, please advise.
>
> --
> *Best wishes,*
> *- Kai*
>



Re: CEP library support in Python

2021-09-15 Thread Seth Wiesman
Honestly, I don't think you need CEP or MATCH_RECOGNIZE for that use case.
It can be solved with a simple keyed process function that tracks the state
of each job id: output a 1 when a job completes and a -1 if it is canceled
after having completed, then sum those outputs downstream. You can use a
simple timer to clear the state for a job after 6 months have passed.
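
A rough sketch of what that could look like (untested; the Tuple2<jobId, status>
input, the status strings, and the 180-day retention are assumptions):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Input is (jobId, status), keyed by jobId. Emits +1 / -1 deltas to be summed downstream.
public class JobCompletionCounter
    extends KeyedProcessFunction<String, Tuple2<String, String>, Integer> {

  private static final long SIX_MONTHS_MS = 180L * 24 * 60 * 60 * 1000;

  private transient ValueState<String> lastStatus;
  private transient ValueState<Long> cleanupTime;

  @Override
  public void open(Configuration parameters) {
    lastStatus = getRuntimeContext().getState(
        new ValueStateDescriptor<>("last-status", String.class));
    cleanupTime = getRuntimeContext().getState(
        new ValueStateDescriptor<>("cleanup-time", Long.class));
  }

  @Override
  public void processElement(Tuple2<String, String> event, Context ctx, Collector<Integer> out)
      throws Exception {
    String status = event.f1;
    String previous = lastStatus.value();

    if ("COMPLETED".equals(status)) {
      out.collect(1); // job finished successfully
    } else if ("CANCELLED".equals(status)
        && ("COMPLETED".equals(previous) || "CLOSED".equals(previous))) {
      out.collect(-1); // retract a job that had already closed / completed
    }
    // a cancel after STARTED or a progress update changes nothing

    lastStatus.update(status);

    // (re)schedule state cleanup roughly 6 months after the last event for this job
    long cleanupAt = ctx.timerService().currentProcessingTime() + SIX_MONTHS_MS;
    cleanupTime.update(cleanupAt);
    ctx.timerService().registerProcessingTimeTimer(cleanupAt);
  }

  @Override
  public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out) throws Exception {
    Long scheduled = cleanupTime.value();
    if (scheduled != null && timestamp >= scheduled) { // only the latest timer clears state
      lastStatus.clear();
      cleanupTime.clear();
    }
  }
}

Key the input stream by job id, run this function, and sum the emitted 1/-1
values downstream over whatever dimension you want the count for.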

Seth

On Wed, Sep 15, 2021 at 12:34 PM Pedro Silva  wrote:

> Hello,
>
> As anyone used streaming sql pattern matching as shown in this email
> thread to count certain transitions on a stream?
> Is it feasible?
>
> Thank you,
> Pedro Silva
>
> On 13 Sep 2021, at 11:16, Pedro Silva  wrote:
>
> 
> Hello Seth,
>
> Thank you very much for your reply. I've taken a look at MATCH_RECOGNIZE
> but I have the following doubt. Can I implement a state machine that detects
> patterns with multiple end states?
> To give you a concrete example:
>
> I'm trying to count the number of *Jobs* that have been *cancelled* and
> *completed*. The state machine associated with this Job concept is as
> follows:
> Started -> On-Going (Multiple Progress messages) -> Closed -> Completed
>     \              \                                   \
>      \--------------\-----------------------------------\----> Cancelled
>
> At any point the Job can be cancelled from the previous state.
> This cancel message can take anywhere from 1-2 weeks to be received.
> The duration of this state machine (Job lifecycle) is roughly 6 months.
>
> How can I keep a count of the number of Jobs that have been completed but
> not cancelled, such that when a cancel appears on a previously completed or
> closed job I decrease my counter, but when a cancel appears after a started
> or progress state the counter is neither incremented nor decremented?
>
> I hope this example was clear.
>
> Thank you for your time!
> Pedro Silva
>
>
> Em sex., 10 de set. de 2021 às 20:18, Seth Wiesman 
> escreveu:
>
>> Hi Pedro,
>>
>> The DataStream CEP library is not available in Python but you can use
>> `MATCH_RECOGNIZE` in the Table API from Python, which is implemented on top
>> of the CEP library.
>>
>>
>> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/match_recognize/
>>
>>
>> Seth
>>
>> On Fri, Sep 10, 2021 at 11:34 AM Pedro Silva 
>> wrote:
>>
>>> Hello,
>>>
>>> Is Flink's CEP library
>>> <https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/libs/cep/> 
>>> available
>>> in python? From the documentation I see no references so I'm guessing the
>>> answer is no but wanted some confirmation from the community or developers.
>>>
>>> Are there plans to support this library in python or alternatively,
>>> another library altogether that can be used in python?
>>>
>>> Thank you and have a nice weekend,
>>> Pedro Silva
>>>
>>


Re: Flink Kafka Partition Discovery Never Forgets Deleted Partitions

2021-09-15 Thread Seth Wiesman
I just want to chime in that if you really do need to drop a partition,
Flink already supports a solution.

If you manually stop the job with a savepoint and restart it with a new UID
on the source operator, along with passing the --allowNonRestoredState flag
to the client, the source will disregard existing state and start fresh.
You can then use the start position configuration to determine where to
begin when restarting on the existing partitions.


https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/production_ready/#set-uuids-for-all-operators
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/cli/#starting-a-job-from-a-savepoint
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#starting-offset
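
For illustration, the only code change is giving the source a new uid before
resubmitting from the savepoint (a sketch; topic, properties, and uid values
are placeholders):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class ResubmitWithNewSourceUid {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "broker:9092");
    props.setProperty("group.id", "my-group");

    FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
    consumer.setStartFromGroupOffsets(); // start position for the partitions that still exist

    env.addSource(consumer)
        .uid("kafka-source-v2") // changed from e.g. "kafka-source-v1" so its old state is dropped
        .name("kafka-source")
        .print();

    env.execute("my-job");
  }
}

Then submit with `flink run -s <savepointPath> --allowNonRestoredState ...` so
the state registered under the old source uid is simply ignored.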

On Wed, Sep 15, 2021 at 6:09 AM David Morávek  wrote:

> I'll try to be more direct with the answer as you already have the context
> on what the issue is.
>
> When this happens we basically have these options:
>
> 1) We can throw an exception (with good wording, so the user knows what's
> going on) and fail the job. This forces the user to take immediate action
> and fix the issue.
> 2) We can log a warning and keeping the job running. Most users won't
> notice this unless they are using tools such as Sentry / StackDriver with
> automatic alerting. In most cases this will hide a real problem, that could
> be really hard to discover / repair in later stages.
>
> So in other words, Flink doesn't forget this, in order to "proactively
> guard the user against some serious troubles".
>
> Can you please elaborate little bit more about the use case and why it
> needs to be implemented the way it is? Maybe there could be an alternative
> solution to this.
>
> Best,
> D.
>
>
> On Tue, Sep 14, 2021 at 7:25 PM Constantinos Papadopoulos <
> cpa...@gmail.com> wrote:
>
>> Thanks David. What you are saying makes sense. But, I keep hearing I
>> shouldn't delete the topic externally, and I keep asking why doesn't Flink
>> forget about the topic IF it has in fact been deleted externally (for
>> whatever reason).
>>
>> I think I will drop this now.
>>
>> On Tue, Sep 14, 2021 at 5:50 PM David Morávek  wrote:
>>
>>> We are basically describing the same thing with Fabian, just a different
>>> wording.
>>>
>>> The problem is that if you delete the topic externally, you're making an
>>> assumption that the downstream processor (Flink in this case) has already
>>> consumed and RELIABLY processed all of the data from that topic (which may
>>> not be true). This would effectively lead to AT_MOST_ONCE delivery
>>> guarantees (in other words, we are OK with losing data), which is a
>>> trade-off that _in_my_opinion_ we shouldn't make here.
>>>
>>> Best,
>>> D.
>>>
>>> On Tue, Sep 14, 2021 at 4:37 PM Constantinos Papadopoulos <
>>> cpa...@gmail.com> wrote:
>>>
 Hi all,

 Thank you for the replies, they are much appreciated.

 I'm sure I'm missing something obvious here, so bear with me...

 Fabian, regarding:

 "Flink will try to recover from the previous checkpoint which is
 invalid by now because the partition is not available anymore."

 The above would happen because the partition is not available anymore
 in Kafka (right?), and not because Flink's partition discoverer has removed
 it from its cache (i.e. even if Flink leaves it there, the topic doesn't
 exist in Kafka anymore, so that's the source of the problem in the scenario
 you outlined). In other words, what would be the *extra* harm from Flink
 cleaning up the partition from its cache after it knows that the partition
 is gone - this is the part I still don't understand.

 David, similarly:

 "actual topic deletion would need to be performed by Flink (not by the
 3rd party system as suggested in the original question)"

 The situation is that the topic has, for better or worse, already been
 deleted. So my question is one of cleanup, i.e. how is it useful for Flink
 to continue remembering the partition of an already-deleted topic? (the
 checkpoint is invalid regardless, right?)



 On Tue, Sep 14, 2021 at 5:20 PM Jan Lukavský  wrote:

> On 9/14/21 3:57 PM, David Morávek wrote:
>
> Hi Jan,
>
> Notion of completeness is just one part of the problem. The second
> part is that once you remove the Kafka topic, you are no longer able to
> replay the data in case of failure.
>
> So you basically need a following workflow to ensure correctness:
>
> 1) Wait until there are no more elements in the topic (this can be
> done by checking watermark for that partition as you're suggesting)
> 2) Take a checkpoint N
> 3) Delete the topic (this effectively makes all the checkpoints < N
> invalid)
>
> Agree.
>
>
> If you switch order of 2) and 3) you have no way to recover from
> failure.
>
> Also 

Re: CEP library support in Python

2021-09-10 Thread Seth Wiesman
Hi Pedro,

The DataStream CEP library is not available in Python but you can use
`MATCH_RECOGNIZE` in the Table API from Python, which is implemented on top
of the CEP library.

https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/match_recognize/


Seth

On Fri, Sep 10, 2021 at 11:34 AM Pedro Silva  wrote:

> Hello,
>
> Is Flink's CEP library
>  
> available
> in python? From the documentation I see no references so I'm guessing the
> answer is no but wanted some confirmation from the community or developers.
>
> Are there plans to support this library in python or alternatively,
> another library altogether that can be used in python?
>
> Thank you and have a nice weekend,
> Pedro Silva
>


Re: State processor API very slow reading a keyed state with RocksDB

2021-09-09 Thread Seth Wiesman
Hi David,

I was also able to reproduce the behavior, but got significant performance
improvements by reducing the number of slots on each TM to 1.

My suspicion, as Piotr alluded to, has to do with the different runtime
execution of DataSet over DataStream. In particular, Flink's DataStream
operators are aware of the resource requirements of the state backend and
include RocksDB in their internal memory configuration. In the state
processor api, the underlying input format is a black box.

Another thing to know is that when running multiple RocksDB instances
within the same JVM, you are actually running a single native process with
multiple logical instances. I _think_ we are seeing contention amongst the
logical RocksDB instances.

Even with one slot, it is not as fast as I would like and will need to
continue investigating. If my suspicion for the slowness is correct, we
will need to migrate to the new Source API and improve this as part of
DataStream integration. This migration is something we'd like to do
regardless, but I don't have a timeline to share.

*Aside: Why is writing still relatively fast? *

Even with these factors accounted for, I do still expect writing to be
faster than reading. This is due both to how RocksDB's internal data
structures work and to some peculiarities of how the state processor
API has to perform reads.

1. RocksDB internally uses a data structure called a log structured merge
tree (or LSM). This means writes are always implemented as appends, so
there is no seek required. Additionally, writes go into an in-memory data
structure called a MemTable that is flushed to disk asynchronously.
Reads, on the other hand, may find multiple entries for a given key, so
RocksDB needs to search for the most recent value and potentially read from
disk. This may be alleviated by enabling bloom filters but does have memory
costs.

2. RocksDB is a key value store, so Flink represents each registered state
(ValueState, ListState, etc) as its own column family (table). A key only
exists in a table if it has a non-null value. This means not all keys exist
in all column families for a given operator. The state-proc-api wants to
make it appear as if each operator is composed of a single table with
multiple columns. To do this, we perform a full table scan on one column
family and then do point lookups of that key on the others. However, we
still need to find the keys that may only exist in other tables. The trick
we perform is to delete keys from rocksDB after each read, so we can do
full table scans on all column families but never see any duplicates. This
means the reader is performing multiple reads and writes on every call to
`readKey` and is more expensive than it may appear.

Seth


On Thu, Sep 9, 2021 at 1:48 AM Piotr Nowojski  wrote:

> Hi David,
>
> I can confirm that I'm able to reproduce this behaviour. I've tried
> profiling/flame graphs and I was not able to make much sense out of those
> results. There are no IO/Memory bottlenecks that I could notice, it looks
> indeed like the Job is stuck inside RocksDB itself. This might be an issue
> with for example memory configuration. Streaming jobs and State Processor
> API are running in very different environments as the latter one is using
> DataSet API under the hood, so maybe that can explain this? However I'm no
> expert in neither DataSet API nor the RocksDB, so it's hard for me to make
> progress here.
>
> Maybe someone else can help here?
>
> Piotrek
>
>
> śr., 8 wrz 2021 o 14:45 David Causse  napisał(a):
>
>> Hi,
>>
>> I'm investigating why a job we use to inspect a flink state is a lot
>> slower than the bootstrap job used to generate it.
>>
>> I use RocksdbDB with a simple keyed value state mapping a string key to a
>> long value. Generating the bootstrap state from a CSV file with 100M
>> entries takes a couple minutes over 12 slots spread over 3 TM (4Gb
>> allowed). But another job that does the opposite (converts this state into
>> a CSV file) takes several hours. I would have expected these two job
>> runtimes to be in the same ballpark.
>>
>> I wrote a simple test case[1] to reproduce the problem. This program has
>> 3 jobs:
>> - CreateState: generate a keyed state (string->long) using the state
>> processor api
>> - StreamJob: reads all the keys using a StreamingExecutionEnvironment
>> - ReadState: reads all the keys using the state processor api
>>
>> Running with 30M keys and (12 slots/3TM with 4Gb each) CreateState &
>> StreamJob are done in less than a minute.
>> ReadState is much slower (> 30minutes) on my system. The RocksDB state
>> appears to be restored relatively quickly but after that the slots are
>> performing at very different speeds. Some slots finish quickly but some
>> others struggle to advance.
>> Looking at the thread dumps I always see threads in
>> org.rocksdb.RocksDB.get:
>>
>> "DataSource (at readKeyedState(ExistingSavepoint.java:314)
>> (org.apache.flink.state.api.input.KeyedStateInputFormat)) 

Re: State Processor API with EmbeddedRocksDBStateBackend

2021-08-11 Thread Seth Wiesman
Hi Xianwen,

Looks like the State Processor API needs to be updated for the new state
backend factory stack. For now, just use RocksDBStateBackend and it will
work as intended.

I've opened a ticket: https://issues.apache.org/jira/browse/FLINK-23728


Seth

On Wed, Aug 11, 2021 at 2:08 AM xianwen jin  wrote:

> Hi Yun,
>
> Here's the stack trace
>
> java.util.concurrent.ExecutionException: java.io.IOException: Size of the
> state is larger than the maximum permitted memory-backed state.
> Size=100091190 , maxSize=5242880 . Consider using a different state
> backend, like the File System State backend.
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> ~[?:1.8.0_292]
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> ~[?:1.8.0_292]
> at
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:636)
> ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
> at
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:54)
> ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
> at
> org.apache.flink.state.api.output.SnapshotUtils.snapshot(SnapshotUtils.java:67)
> ~[?:?]
> at
> org.apache.flink.state.api.output.operators.KeyedStateBootstrapOperator.endInput(KeyedStateBootstrapOperator.java:90)
> ~[?:?]
> at
> org.apache.flink.state.api.output.BoundedStreamTask.processInput(BoundedStreamTask.java:107)
> ~[?:?]
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
> ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
> ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
> ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
> ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
> ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
> at
> org.apache.flink.state.api.output.BoundedOneInputStreamTaskRunner.mapPartition(BoundedOneInputStreamTaskRunner.java:80)
> ~[?:?]
> at
> org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:113)
> ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
> at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:519)
> ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
> at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:360)
> ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
> ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
> ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
> Caused by: java.io.IOException: Size of the state is larger than the
> maximum permitted memory-backed state. Size=100091190 , maxSize=5242880 .
> Consider using a different state backend, like the File System State
> backend.
> at
> org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory.checkSize(MemCheckpointStreamFactory.java:61)
> ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
> at
> org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory$MemoryCheckpointOutputStream.closeAndGetBytes(MemCheckpointStreamFactory.java:141)
> ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
> at
> org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory$MemoryCheckpointOutputStream.closeAndGetHandle(MemCheckpointStreamFactory.java:121)
> ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
> at
> org.apache.flink.runtime.state.CheckpointStreamWithResultProvider$PrimaryStreamOnly.closeAndFinalizeCheckpointStreamResult(CheckpointStreamWithResultProvider.java:75)
> ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
> at
> org.apache.flink.runtime.state.FullSnapshotAsyncWriter.get(FullSnapshotAsyncWriter.java:87)
> ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
> at
> org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:91)
> ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
> at
> org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:88)
> ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
> at
> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:78)
> ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
> at 

Re: How would Flink job react to change of partitions in Kafka topic?

2021-06-23 Thread Seth Wiesman
It will just work as long as you enable partition discovery.

https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#partition-discovery
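
For example, with the legacy FlinkKafkaConsumer it is a consumer property
(a sketch; broker, group, topic, and interval are placeholders):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;

public class PartitionDiscoveryExample {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "broker:9092");
    props.setProperty("group.id", "my-group");
    // check for newly added partitions every 30 seconds (discovery is off by default)
    props.setProperty(
        FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "30000");

    FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);

    env.addSource(consumer).print();
    env.execute("partition-discovery-example");
  }
}

With the newer KafkaSource the equivalent knob is the source property
"partition.discovery.interval.ms" set via setProperty on the builder.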

On Tue, Jun 22, 2021 at 1:32 PM Thomas Wang  wrote:

> Hi,
>
> I'm wondering if anyone has changed the number of partitions of a source
> Kafka topic.
>
> Let's say I have a Flink job read from a Kafka topic which used to have 32
> partitions. If I change the number of partitions of that topic to 64, can
> the Flink job still guarantee the exactly-once semantics?
>
> Thanks.
>
> Thomas
>


Re: Reading Flink states from svaepoint uning State Processor API

2021-06-01 Thread Seth Wiesman
Hi Min,

The only requirement is that your state descriptors be configured
identically to those used in your DataStream API. So if you registered
custom TypeInformation / serializer in your streaming job you will need
those here as well. I would also look at the ExecutionConfig on your
DataStream app as that can dictate how your serializers are configured.
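
For example, a rough sketch with the 1.10 (DataSet-based) state processor API.
"my-uid", "my-state", the paths, and MyAvroRecord (your Avro-generated class)
are placeholders, and the backend passed to load() should match the one the
savepoint was written with:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class ReadAvroState {

  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    ExistingSavepoint savepoint = Savepoint.load(
        env,
        "s3://bucket/savepoints/savepoint-xxxx",
        new RocksDBStateBackend("s3://bucket/checkpoints")); // same backend as the original job

    DataSet<MyAvroRecord> state = savepoint.readKeyedState("my-uid", new Reader());
    state.print();
  }

  static class Reader extends KeyedStateReaderFunction<String, MyAvroRecord> {

    private transient ValueState<MyAvroRecord> value;

    @Override
    public void open(Configuration parameters) {
      // must be configured exactly like in the streaming job:
      // same state name and the same (Avro) type information / serializer
      value = getRuntimeContext().getState(
          new ValueStateDescriptor<>("my-state", new AvroTypeInfo<>(MyAvroRecord.class)));
    }

    @Override
    public void readKey(String key, Context ctx, Collector<MyAvroRecord> out) throws Exception {
      out.collect(value.value());
    }
  }
}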

Seth

On Tue, Jun 1, 2021 at 10:24 AM Till Rohrmann  wrote:

> Hi Min,
>
> Usually, you should be able to provide type information and thereby a
> serializer via the StateDescriptors which you create to access the state.
> If this is not working, then you need to give us a bit more context to
> understand what's not working.
>
> I am also pulling in Seth who is the original author of the state
> processor API.
>
> Cheers,
> Till
>
> On Mon, May 31, 2021 at 4:00 PM Tan, Min  wrote:
>
>> Hi,
>>
>>
>>
>> I am using Flink 1.10.1 and try to read the flink states from a savepoint
>> using Flink state processor API.
>>
>> It works well when state types are the normal Java type or Java POJOs.
>>
>>
>>
>> When Avro generated Java classes are used as the state type, it does not
>> read any states anymore.
>>
>>
>>
>> Are any additional custom serializers required in this situation?
>>
>>
>>
>> Regards,
>>
>> Min
>>
>>
>>
>>
>>
>


Re: [Statefun] Exception occurs during function chaining / Async function

2021-03-01 Thread Seth Wiesman
Hi Le,

I believe the issue is the bounded source[1]. Stateful Functions only
supports unbounded inputs.

Additionally, you can remove all the `synchronized` blocks from your code;
statefun handles all synchronization for you.

Seth

[1]
https://gist.github.com/flint-stone/cbc60f2d41507fdf33507ba998696134#file-example-java-L249-L251

On Fri, Feb 26, 2021 at 8:28 PM Le Xu  wrote:

> Hello!
>
> I'm getting an exception running a modified version of datastream/statefun
> example. (See exception details that follow.) The example was adapted from
> the original datastream example provided in statefun repo. I was trying to
> play with the example by chaining two functions (with the 1st function
> simulating async IO function invocation). However I'm getting an exception
> saying that the mailbox is closed before the operation finishes. I'm adding
> the source code of the example here
> 
> (Please ignore the printing statement and variable usage not making any
> sense since I'm debugging). The example works without chaining the second
> function or without using AsyncIO in any of the functions. Any ideas why
> this happens? Any suggestions would be appreciated.
>
>
> Thanks!
>
> Le
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error:
> org.apache.flink.client.program.ProgramInvocationException: Job failed
> (JobID: 885ad02322104a6e36b011fadc704d2a)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
> at
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.client.program.ProgramInvocationException: Job failed
> (JobID: 885ad02322104a6e36b011fadc704d2a)
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> at
> org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:116)
> at
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1700)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1682)
> at
> org.apache.flink.statefun.examples.datastream.Example.main(Example.java:142)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> ... 8 more
> Caused by: org.apache.flink.client.program.ProgramInvocationException: Job
> failed (JobID: 885ad02322104a6e36b011fadc704d2a)
> at
> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:116)
> at
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
> at
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
> at
> org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:602)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
> at
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:309)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at
> 

Re: [DISCUSS] Deprecation and removal of the legacy SQL planner

2021-02-26 Thread Seth Wiesman
Strong +1

Having two planners is confusing to users and the diverging semantics make
it difficult to provide useful learning material. It is time to rip the
bandage off.

Seth

On Fri, Feb 26, 2021 at 12:54 AM Kurt Young  wrote:

>  change.>
>
> Hi Timo,
>
> First of all I want to thank you for introducing this planner design back
> in 1.9, this is a great work
> that allows lots of blink features to be merged to Flink in a reasonably
> short time. It greatly
> accelerates the evolution speed of Table & SQL.
>
> Everything comes with a cost, as you said, right now we are facing the
> overhead of maintaining
> two planners and it causes bugs and also increases imbalance between these
> two planners. As
> a developer and also for the good of all Table & SQL users, I also think
> it's better for us to be more
> focused on a single planner.
>
> Your proposed roadmap looks good to me, +1 from my side and thanks
> again for all your efforts!
>
> Best,
> Kurt
>
>
> On Thu, Feb 25, 2021 at 5:01 PM Timo Walther  wrote:
>
> > Hi everyone,
> >
> > since Flink 1.9 we have supported two SQL planners. Most of the original
> > plan of FLIP-32 [1] has been implemented. The Blink code merge has been
> > completed and many additional features have been added exclusively to
> > the new planner. The new planner is now in a much better shape than the
> > legacy one.
> >
> > In order to avoid user confusion, reduce duplicate code, and improve
> > maintainability and testing times of the Flink project as a whole we
> > would like to propose the following steps to complete FLIP-32:
> >
> > In Flink 1.13:
> > - Deprecate the `flink-table-planner` module
> > - Deprecate `BatchTableEnvironment` for both Java, Scala, and Python
> >
> > In Flink 1.14:
> > - Drop `flink-table-planner` early
> > - Drop many deprecated interfaces and API on demand
> > - Rename `flink-table-planner-blink` to `flink-table-planner`
> > - Rename `flink-table-runtime-blink` to `flink-table-runtime`
> > - Remove references of "Blink" in the code base
> >
> > This will have an impact on users that still use DataSet API together
> > with Table API. With this change we will not support converting between
> > DataSet API and Table API anymore. We hope to compensate the missing
> > functionality in the new unified TableEnvironment and/or the batch mode
> > in DataStream API during 1.14 and 1.15. For this, we are looking for
> > further feedback which features are required in Table API/DataStream API
> > to have a smooth migration path.
> >
> > Looking forward to your feedback.
> >
> > Regards,
> > Timo
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-32%3A+Restructure+flink-table+for+future+contributions
> >
>


Re: [Statefun] Dynamic behavior

2021-02-23 Thread Seth Wiesman
I don't think there is anything statefun specific here and I would follow
Igal's advice.

Let's say you have a state value called `Behavior` that describes the
behavior of an instance. There is a default behavior but any given instance
may have a customized behavior. What I would do is the following.

Create a state in the TransactionManager called `behavior` that stores the
instance's customized behavior if it exists. When a transaction comes in,
read the behavior state. If it exists (is not None in the case of Python)
then use that. If not, then fall back to the default instance.

The default instance can be provided one of several ways depending on the
specifics of your use case:

1) hard-coded in the function.
2) dynamically loaded via a background thread as a global. so long as that
default is immutable this is safe
3) dynamically loaded via the function instance on first use. stateful
functions have strong support for making async requests so you could simply
query the behavior for that instance on first use from a 3rd party service.
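
As a rough sketch with the embedded Java SDK (the remote SDKs expose the same
idea through their own state APIs; the behavior is simplified to a String here
and the message types are placeholders):

import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.StatefulFunction;
import org.apache.flink.statefun.sdk.annotations.Persisted;
import org.apache.flink.statefun.sdk.state.PersistedValue;

public class TransactionManagerFn implements StatefulFunction {

  private static final String DEFAULT_BEHAVIOR = "default";

  @Persisted
  private final PersistedValue<String> behavior =
      PersistedValue.of("behavior", String.class);

  @Override
  public void invoke(Context context, Object input) {
    if (input instanceof String) {
      // control message: customize the behavior of this logical instance
      behavior.set((String) input);
      return;
    }
    // transaction: use the customized behavior if present, otherwise the default
    String effective = behavior.getOrDefault(DEFAULT_BEHAVIOR);
    // ... apply `effective` to the incoming transaction ...
  }
}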

Seth


On Tue, Feb 23, 2021 at 10:55 AM Miguel Araújo  wrote:

> Hi Seth,
>
> Thanks for your comment. I've seen that repository in the past and it was
> really helpful to "validate" that this was the way to go.
> I think my question is not being addressed there though: how could one add
> dynamic behavior to your TransactionManager? In this case, state that is
> available to all TransactionManager instances when they receive a message
> of type Transaction for the first time.
>
> Seth Wiesman  escreveu no dia terça, 23/02/2021 à(s)
> 16:02:
>
>> Hey Miguel,
>>
>> What you are describing is exactly what is implemented in this repo. The
>> TransactionManager function acts as an orchestrator to work with the other
>> functions. The repo is structured as an exercise but the full solution
>> exists on the branch `advanced-solution`.
>>
>> https://github.com/ververica/flink-statefun-workshop
>>
>> On Tue, Feb 23, 2021 at 8:34 AM Miguel Araújo 
>> wrote:
>>
>>> Another possibility I am considering is handling this in Flink using a
>>> broadcast and adding all the information needed to the event itself. I'm a
>>> little concerned about the amount of data that will be serialized and sent
>>> on every request though, as I'll need to include information about all
>>> available remote functions, for instance.
>>>
>>> Miguel Araújo  escreveu no dia terça, 23/02/2021
>>> à(s) 09:14:
>>>
>>>> Hi Gordon, Igal,
>>>>
>>>> Thanks for your replies.
>>>> PubSub would be a good addition, I have a few scenarios where that
>>>> would be useful.
>>>>
>>>> However, after reading your answers I realized that your proposed
>>>> solutions (which address the most obvious interpretation of my question) do
>>>> not necessarily solve my problem. I should have just stated what it was,
>>>> instead of trying to propose a solution by discussing broadcast...
>>>>
>>>> I'm trying to implement an "orchestrator" function which, given an
>>>> event, will trigger multiple remote function calls, aggregate their results
>>>> and eventually call yet more functions (based on a provided dependency
>>>> graph). Hence, this orchestrator function has state per event_id and each
>>>> function instance is short-lived (a couple seconds at most, ideally
>>>> sub-second). The question then is not about how to modify a long-running
>>>> function instance (which PubSub would enable), but rather how to have the
>>>> dependency graph available to new functions.
>>>>
>>>> Given this, Igal's answer seems promising because we have the
>>>> FunctionProvider instantiating a local variable and passing it down on
>>>> every instantiation. I'm assuming there is one FunctionProvider per
>>>> TaskManager. Is there an easy way to have the FunctionProvider receiving
>>>> data coming from a Flink DataStream, or receiving StateFun messages?
>>>> Otherwise, I could have it subscribe to a Kafka topic directly.
>>>>
>>>> I really appreciate your help.
>>>>
>>>> Miguel
>>>>
>>>> Igal Shilman  escreveu no dia segunda, 22/02/2021
>>>> à(s) 12:09:
>>>>
>>>>> Hi Miguel,
>>>>>
>>>>> I think that there are a couple of ways to achieve this, and it really
>>>>> depends on your specific use case, and the trade-offs
>>>>> that you are willing to accept.
>>>>>

Re: [Statefun] Dynamic behavior

2021-02-23 Thread Seth Wiesman
Hey Miguel,

What you are describing is exactly what is implemented in this repo. The
TransactionManager function acts as an orchestrator to work with the other
functions. The repo is structured as an exercise but the full solution
exists on the branch `advanced-solution`.

https://github.com/ververica/flink-statefun-workshop

On Tue, Feb 23, 2021 at 8:34 AM Miguel Araújo  wrote:

> Another possibility I am considering is handling this in Flink using a
> broadcast and adding all the information needed to the event itself. I'm a
> little concerned about the amount of data that will be serialized and sent
> on every request though, as I'll need to include information about all
> available remote functions, for instance.
>
> Miguel Araújo  escreveu no dia terça, 23/02/2021
> à(s) 09:14:
>
>> Hi Gordon, Igal,
>>
>> Thanks for your replies.
>> PubSub would be a good addition, I have a few scenarios where that would
>> be useful.
>>
>> However, after reading your answers I realized that your proposed
>> solutions (which address the most obvious interpretation of my question) do
>> not necessarily solve my problem. I should have just stated what it was,
>> instead of trying to propose a solution by discussing broadcast...
>>
>> I'm trying to implement an "orchestrator" function which, given an event,
>> will trigger multiple remote function calls, aggregate their results and
>> eventually call yet more functions (based on a provided dependency graph).
>> Hence, this orchestrator function has state per event_id and each function
>> instance is short-lived (a couple seconds at most, ideally sub-second). The
>> question then is not about how to modify a long-running function instance
>> (which PubSub would enable), but rather how to have the dependency graph
>> available to new functions.
>>
>> Given this, Igal's answer seems promising because we have the
>> FunctionProvider instantiating a local variable and passing it down on
>> every instantiation. I'm assuming there is one FunctionProvider per
>> TaskManager. Is there an easy way to have the FunctionProvider receiving
>> data coming from a Flink DataStream, or receiving StateFun messages?
>> Otherwise, I could have it subscribe to a Kafka topic directly.
>>
>> I really appreciate your help.
>>
>> Miguel
>>
>> Igal Shilman  escreveu no dia segunda, 22/02/2021
>> à(s) 12:09:
>>
>>> Hi Miguel,
>>>
>>> I think that there are a couple of ways to achieve this, and it really
>>> depends on your specific use case, and the trade-offs
>>> that you are willing to accept.
>>>
>>> For example, one way to approach this:
>>> - Suppose you have an external service somewhere that returns a
>>> representation of the logic to be interpreted by
>>> your function at runtime (I think that is the scenario you are
>>> describing)
>>> - Then, you can write a background task (a thread) that periodically
>>> queries that service, and keeps in memory the latest version.
>>> - You can initialize this background task in your FunctionProvider
>>> implementation, or even in your StatefulModule if you wish.
>>> - Then, make sure that your dynamic stateful function has an access to
>>> the latest value fetched by your client (for example via a shared reference
>>> like a j.u.c.AtomicReference)
>>> - Then on receive, you can simply get that reference and re-apply your
>>> rules.
>>>
>>> Take a look at [1] for example (it is not exactly the same, but I
>>> believe that it is close enough)
>>>
>>> [1]
>>> https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-async-example/src/main/java/org/apache/flink/statefun/examples/async/Module.java
>>>
>>> Good luck,
>>> Igal.
>>>
>>>
>>> On Mon, Feb 22, 2021 at 4:32 AM Tzu-Li (Gordon) Tai 
>>> wrote:
>>>
 Hi,

 FWIW, there is this JIRA that is tracking a pubsub / broadcast
 messaging primitive in StateFun:
 https://issues.apache.org/jira/browse/FLINK-16319

 This is probably what you are looking for. And I do agree, in the case
 that the control stream (which updates the application logic) is high
 volume, redeploying functions may not work well.

 I don't think there really is a "recommended" way of doing the
 "broadcast control stream, join with main stream" pattern with StateFun at
 the moment, at least without FLINK-16319.
 On the other hand, it could be possible to use stateful functions to
 implement a pub-sub model in user space for the time being. I've actually
 left some ideas for implementing that in the comments of FLINK-16319.

 Cheers,
 Gordon


 On Mon, Feb 22, 2021 at 6:38 AM Miguel Araújo 
 wrote:

> Hi everyone,
>
> What is the recommended way of achieving the equivalent of a broadcast
> in Flink when using Stateful Functions?
>
> For instance, assume we are implementing something similar to Flink's
> demo fraud detection
> 

Re: Flink SQL OVER window

2021-01-29 Thread Seth Wiesman
You need to use TUMBLE_ROWTIME to extract a time attribute from a window;
TUMBLE_START and TUMBLE_END just return plain timestamps. In your view,
select TUMBLE_ROWTIME(event_time, INTERVAL '1' MINUTE) AS t_start instead of
TUMBLE_START(...), and the OVER window can then order by t_start.

https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#selecting-group-window-start-and-end-timestamps

Seth

On Fri, Jan 29, 2021 at 9:14 AM Patrick Angeles 
wrote:

> Forgot to mention, I am using Flink 1.12.
>
> On Fri, Jan 29, 2021 at 10:11 AM Patrick Angeles 
> wrote:
>
>> Fairly new to Flink here so this might be a newbie mistake, but here's
>> the problem. I created the following table and view:
>>
>>
>>> CREATE TABLE test (
>>>
>>> event_time TIMESTAMP(3),
>>>
>>> symbol STRING,
>>>
>>> price  DOUBLE,
>>>
>>> WATERMARK FOR event_time AS event_time - INTERVAL '1' MINUTE
>>>
>>> ) WITH (
>>>
>>> ...
>>>
>>> ) ;
>>>
>>>
>>>
>>> CREATE VIEW test_view AS (
>>>
>>> SELECT
>>>
>>> symbol,
>>>
>>> TUMBLE_START (event_time, INTERVAL '1' MINUTE) AS t_start,
>>>
>>> AVG (price) AS avg_price,
>>>
>>> MIN (price) AS min_price,
>>>
>>> MAX (price) AS max_price
>>>
>>> FROM
>>>
>>> test
>>>
>>> WHERE event_time IS NOT NULL
>>>
>>> GROUP BY
>>>
>>> TUMBLE(event_time, INTERVAL '1' MINUTE), symbol
>>>
>>> ) ;
>>>
>>
>> Describe shows:
>>
>> Flink SQL> describe test ;
>>>
>>>
>>> +++--+-+++
>>>
>>> |   name |   type | null | key | extras |
>>> watermark |
>>>
>>>
>>> +++--+-+++
>>>
>>> | event_time | TIMESTAMP(3) *ROWTIME* | true | ||
>>> `event_time` - INTERVAL '1' MINUTE |
>>>
>>> | symbol | STRING | true | ||
>>>   |
>>>
>>> |  price | DOUBLE | true | ||
>>>   |
>>>
>>>
>>> +++--+-+++
>>>
>>> 3 rows in set
>>>
>>>
>>> Flink SQL> describe test_view ;
>>>
>>> +---++--+-++---+
>>>
>>> |  name |   type | null | key | extras | watermark |
>>>
>>> +---++--+-++---+
>>>
>>> |symbol | STRING | true | ||   |
>>>
>>> |   t_start | TIMESTAMP(3) *ROWTIME* | true | ||   |
>>>
>>> | avg_price | DOUBLE | true | ||   |
>>>
>>> | min_price | DOUBLE | true | ||   |
>>>
>>> | max_price | DOUBLE | true | ||   |
>>>
>>> +---++--+-++---+
>>>
>>> 5 rows in set
>>>
>>
>> When I run a query over the view, I get the following error:
>>
>> Flink SQL> SELECT
>>>
>>> > symbol,
>>>
>>> > t_start,
>>>
>>> > avg_price,
>>>
>>> > min_price,
>>>
>>> > max_price,
>>>
>>> > FIRST_VALUE (avg_price) OVER x AS prev_avg_price
>>>
>>> > FROM test_view
>>>
>>> > WINDOW x AS (
>>>
>>> > PARTITION BY symbol
>>>
>>> > ORDER BY t_start
>>>
>>> > ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
>>>
>>> > ) ;
>>>
>>> >
>>>
>>> *[ERROR] Could not execute SQL statement. Reason:*
>>>
>>> *org.apache.flink.table.api.TableException: OVER windows' ordering in
>>> stream mode must be defined on a time attribute.*
>>>
>>
>> Right now, to get around this, I need to materialize the results of the test
>> table into a new table that matches the view. Seems that this ought to be
>> doable doing everything in one job instead of the intermediate
>> materialization step. Am I missing something?
>>
>> Thanks in advance.
>>
>> Patrick
>>
>>
>>
>>
>


Re: Annotating AggregateFunction accumulator type with @DataTypeHint

2021-01-26 Thread Seth Wiesman
Yes, the FunctionHint annotation has an accumulator field. There is an
example in its JavaDoc.
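
For example, something along these lines (a sketch in Java; the abstract Json
type from your function is replaced with a concrete String element just to
keep it compilable, and the RAW hint is bridged to the accumulator's class):

import java.util.ArrayList;

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.AggregateFunction;

// The accumulator is declared RAW (generic serializer) and bridged to its concrete class.
@FunctionHint(
    output = @DataTypeHint("ARRAY<STRING>"),
    accumulator = @DataTypeHint(value = "RAW", bridgedTo = ArrayList.class))
public class CollectStrings extends AggregateFunction<String[], ArrayList<String>> {

  @Override
  public ArrayList<String> createAccumulator() {
    return new ArrayList<>();
  }

  public void accumulate(ArrayList<String> acc, String value) {
    if (value != null) {
      acc.add(value);
    }
  }

  @Override
  public String[] getValue(ArrayList<String> acc) {
    return acc.toArray(new String[0]);
  }
}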

Seth

On Tue, Jan 26, 2021 at 6:39 AM Yuval Itzchakov  wrote:

> Hi, I have an aggregate function of the form:
>
> class Foo extends AggregateFunction[Array[Json], util.List[Json]]
>
> I want to treat the accumulator as a "RAW" type, since Json is an abstract
> class and this fails at runtime.
>
> Is there any way to annotate the AggregateFunction accumulator type? All
> the examples in the docs refer to Scalar functions.
> --
> Best Regards,
> Yuval Itzchakov.
>


Re: question about timers

2021-01-20 Thread Seth Wiesman
Yes,

Processing time timers that should have fired will fire immediately in
order.

Event time timers are never *late*; they will just fire when the watermark
advances.

Seth

On Tue, Jan 19, 2021 at 3:58 PM Marco Villalobos 
wrote:

> If there are timers that have been checkpointed (we use rocksdb), and the
> system goes down, and then the system goes back up after the timers should
> have fired, do those timers that were skipped still fire, even though we
> are past that time?
>
> example:
>
> for example, if the current time is 1:00 p.m.  And the timer is supposed
> to fire at 1:15 p.m.
> and the system crashes at 1:00 p.m., but is brought back up at 1:20 p.m.
>
> Does the timer still fire?
>


Re: Remote Stateful Function Scalability

2020-10-20 Thread Seth Wiesman
As a note, I wrote that concepts section before remote functions were
implemented. I've made a note to myself to go through and update it.

Seth

On Sat, Oct 17, 2020 at 9:29 PM Tzu-Li (Gordon) Tai 
wrote:

> Hi Elias,
>
> On Sun, Oct 18, 2020 at 6:16 AM Elias Levy 
> wrote:
>
>> After reading the Stateful Functions documentation, I am left wondering
>> how remote stateful functions scale.
>>
>> The documentation mentions that the use of remote functions allows the
>> state and compute tiers to scale independently. But the documentation seems
>> to imply that only a single instance of a function type can execute at a
>> time per worker ("*When an application starts, each parallel worker of
>> the framework will create one physical object per function type. This
>> object will be used to execute all logical instances of that type that are
>> run by that particular worker.*") That would seem to tie and limit the
>> parallelism of the compute layer to that of the storage layer even when
>> using remote functions.
>>
>
> Your observation is correct only for embedded functions, not for remote
> functions.
> For remote functions, in the StateFun workers each physical object per
> function type acts as an asynchronous invocation dispatcher to the type's
> remote function service.
>
> Just to quickly brief what the dispatcher does:
> The dispatcher *only ensures sequential invocation per logical address*
> (function type + logical instance ID / key).
> Invocations for different logical addresses (different types / different
> keys) can happen concurrently.
>
> If an invocation request for a logical address is in-flight, and other
> messages targeted for that address arrive, they are buffered in a backlog
> (state) until the pending request completes.
> Upon completion, the backlog is flushed and all buffered messages are sent
> to the remote function as a single batch invocation request.
> Backpressure is applied once the backlog size reaches a threshold.
>
> All in all, in vanilla Flink-land terms, this works similarly to Flink's
> AsyncIO without the stream order preserved.
>
> So, to conclude by answering your specific questions:
>
>
>>
>> Can a worker execute multiple concurrent remote stateful functions of
>> different types?
>>
>
> Yes.
>
>
>>
>> Can a worker execute multiple concurrent remote stateful functions of the
>> same type with different keys?
>>
>
> Yes.
>
>
>>
>> If a worker can execute multiple concurrent remote stateful functions of
>> the same type with different keys, does it ensure their output is ordered
>> like its inputs?
>>
>
> No, currently StateFun handles outgoing messages (i.e. messages going to
> other functions / egresses) only based on the order that the concurrent
> invocation requests complete.
> However, I believe that it should be possible to support an ordered mode
> here at the cost of extra latency (early completes need to be buffered,
> checkpoint overhead etc.).
>
> Hope this helps clarify some things!
>
> Cheers,
> Gordon
>


Re: [DISCUSS] Remove flink-connector-filesystem module.

2020-10-16 Thread Seth Wiesman
+1 It has been deprecated for some time and the StreamingFileSink has
stabilized with a large number of formats and features.

Plus, the bucketing sink only implements a small number of stable
interfaces[1]. I would expect users to continue to use the bucketing sink
from the 1.11 release with future versions for some time.

Seth

https://github.com/apache/flink/blob/2ff3b771cbb091e1f43686dd8e176cea6d435501/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L170-L172

On Thu, Oct 15, 2020 at 2:57 PM Kostas Kloudas  wrote:

> @Arvid Heise I also do not remember exactly what were all the
> problems. The fact that we added some more bulk formats to the
> streaming file sink definitely reduced the non-supported features. In
> addition, the latest discussion I found on the topic was [1] and the
> conclusion of that discussion seems to be to remove it.
>
> Currently, I cannot find any obvious reason why keeping the
> BucketingSink, apart from the fact that we do not have a migration
> plan unfortunately. This is why I posted this to dev@ and user@.
>
> Cheers,
> Kostas
>
> [1]
> https://lists.apache.org/thread.html/r799be74658bc7e169238cc8c1e479e961a9e85ccea19089290940ff0%40%3Cdev.flink.apache.org%3E
>
> On Wed, Oct 14, 2020 at 8:03 AM Arvid Heise  wrote:
> >
> > I remember this conversation popping up a few times already and I'm in
> > general a big fan of removing BucketingSink.
> >
> > However, until now there were a few features lacking in StreamingFileSink
> > that are present in BucketingSink and that are being actively used (I
> can't
> > exactly remember them now, but I can look it up if everyone else is also
> > suffering from bad memory). Did we manage to add them in the meantime? If
> > not, then it feels rushed to remove it at this point.
> >
> > On Tue, Oct 13, 2020 at 2:33 PM Kostas Kloudas 
> wrote:
> >
> > > @Chesnay Schepler  Off the top of my head, I cannot find an easy way
> > > to migrate from the BucketingSink to the StreamingFileSink. It may be
> > > possible but it will require some effort because the logic would be
> > > "read the old state, commit it, and start fresh with the
> > > StreamingFileSink."
> > >
> > > On Tue, Oct 13, 2020 at 2:09 PM Aljoscha Krettek 
> > > wrote:
> > > >
> > > > On 13.10.20 14:01, David Anderson wrote:
> > > > > I thought this was waiting on FLIP-46 -- Graceful Shutdown
> Handling --
> > > and
> > > > > in fact, the StreamingFileSink is mentioned in that FLIP as a
> > > motivating
> > > > > use case.
> > > >
> > > > Ah yes, I see FLIP-147 as a more general replacement for FLIP-46.
> Thanks
> > > > for the reminder, we should close FLIP-46 now with an explanatory
> > > > message to avoid confusion.
> > >
> >
> >
> > --
> >
> > Arvid Heise | Senior Java Developer
> >
> > 
> >
> > Follow us @VervericaData
> >
> > --
> >
> > Join Flink Forward  - The Apache Flink
> > Conference
> >
> > Stream Processing | Event Driven | Real Time
> >
> > --
> >
> > Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
> >
> > --
> > Ververica GmbH
> > Registered at Amtsgericht Charlottenburg: HRB 158244 B
> > Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> > (Toni) Cheng
>


[Announce] Flink Forward Global Is Just Around the Corner

2020-10-09 Thread Seth Wiesman
Hi Everyone,

Flink Forward Global is just around the corner - October 19th to the 22nd
-  and our amazing line up of keynotes has been announced.


   - Peter Chalif from Citi will explore how Apache Flink has been
   fundamentally changing Big Data in the financial services industry
   - Ricky Saltzer from Epic Games will share exactly how Flink helps his
   team solve one of the data engineering problems an online video game faces
   at Fortnite scale
   - Renu Jiang from LinkedIn will explain how their team is handling
   extremely large stateful jobs, simplifying the APIs with SQL, and exploring
   new directions of unifying batch and stream processing
   - Xiaowei Jiang from Alibaba will explore how the combination of Hybrid
   Serving / Analytical Processing and Flink can support various use cases
   such as real-time data warehouse, real-time machine learning and more
   - One final Keynote for Ververica will be announced next week.

As a reminder, the event is virtual and free to attend[1]. There are also a
limited number of paid training slots available. Looking forward to seeing
everyone virtually soon!

https://www.flink-forward.org/

Seth Wiesman
- Committer Apache Flink
- Program Chair Flink Forward Virtual 2020


Re: [DISCUSS] Drop Scala 2.11

2020-09-10 Thread Seth Wiesman
@glen

Yes, we would absolutely migrate statefun. StateFun can be compiled with
Scala 2.12 today; I'm not sure why it's not cross-released.

@aljoscha :)

@mathieu It's on the roadmap but it's non-trivial and I'm not aware of
anyone actively working on it.

On Thu, Sep 10, 2020 at 10:09 AM Matthieu Bonneviot
 wrote:

> That makes sense.
> We are using 2.12 for our production
> Also, for flink scala 2.12 support, it is in fact limited to scala 2.12.7.
> It is binary incompatible with 2.12 versions above that (
> https://issues.apache.org/jira/browse/FLINK-12461 )
> That would be great to at least move to a more recent 2.12 version, and
> ideally to 2.13.
>
> Is there any scala support plan available?
>
> Matthieu
>
>
> On Thu, Sep 10, 2020 at 5:00 PM Aljoscha Krettek 
> wrote:
>
> > Yes! I would be in favour of this since it's blocking us from upgrading
> > certain dependencies.
> >
> > I would also be in favour of dropping Scala completely but that's a
> > different story.
> >
> > Aljoscha
> >
> > On 10.09.20 16:51, Seth Wiesman wrote:
> > > Hi Everyone,
> > >
> > > Think of this as a pre-flip, but what does everyone think about
> dropping
> > > Scala 2.11 support from Flink.
> > >
> > > The last patch release was in 2017 and in that time the scala community
> > has
> > > released 2.13 and is working towards a 3.0 release. Apache Kafka and
> > Spark
> > > have both dropped 2.11 support in recent versions. In fact, Flink's
> > > universal Kafka connector is stuck on 2.4 because that is the last
> > version
> > > with scala 2.11 support.
> > >
> > > What are people's thoughts on dropping Scala 2.11? How many are still
> > using
> > > it in production?
> > >
> > > Seth
> > >
> >
> >
>
> --
> Matthieu Bonneviot
> Senior RD Engineer, DataDome
> M +33 7 68 29 79 34  <+33+7+68+29+79+34>
> E matthieu.bonnev...@datadome.co  
> W www.datadome.co
>


[DISCUSS] Drop Scala 2.11

2020-09-10 Thread Seth Wiesman
Hi Everyone,

Think of this as a pre-flip, but what does everyone think about dropping
Scala 2.11 support from Flink?

The last patch release was in 2017 and in that time the scala community has
released 2.13 and is working towards a 3.0 release. Apache Kafka and Spark
have both dropped 2.11 support in recent versions. In fact, Flink's
universal Kafka connector is stuck on 2.4 because that is the last version
with scala 2.11 support.

What are people's thoughts on dropping Scala 2.11? How many are still using
it in production?

Seth


Re: [DISCUSS] Deprecate and remove UnionList OperatorState

2020-09-09 Thread Seth Wiesman
Generally +1

The one use case of union state I've seen in production (outside of sources
and sinks) is as a "poor man's" broadcast state. This was obviously before
that feature was added, which is now a few years ago, so I
don't know if those pipelines still exist. FWIW, if they do the state
processor api can provide a migration path as it supports rewriting union
state as broadcast state.

Seth

On Wed, Sep 9, 2020 at 10:21 AM Arvid Heise  wrote:

> +1 to getting rid of non-keyed state as-is in general and of union state
> in particular. I had a hard time wrapping my head around the semantics of
> non-keyed state when designing the rescaling of unaligned checkpoints.
>
> The only plausible use cases are legacy sources and sinks. Both should
> also be reworked and deprecated.
>
> My main question is how to represent state in these two cases. For sources,
> state should probably be bound to splits. In that regard, split (id) may
> act as a key. More generally, there should probably be a concept that
> supersedes keys and includes splits.
>
> For sinks, I can see two cases:
> - Either we are in a keyed context, then state should be bound to the key.
> - Or we are in a non-keyed context, then state might be bound to the split
> (?) in case of a source->sink chaining.
> - Maybe it should also be a new(?) concept like output partition.
>
> It's not clear to me if there are more cases and if we can always find a
> good way to bind state to some sort of key, especially for arbitrary
> communication patterns (which we may need to replace as well potentially).
>
> On Wed, Sep 9, 2020 at 4:09 PM Aljoscha Krettek 
> wrote:
>
> > Hi Devs,
> >
> > @Users: I'm cc'ing the user ML to see if there are any users that are
> > relying on this feature. Please comment here if that is the case.
> >
> > I'd like to discuss the deprecation and eventual removal of UnionList
> > Operator State, aka Operator State with Union Redistribution. If you
> > don't know what I'm talking about you can take a look in the
> > documentation: [1]. It's not documented thoroughly because it started
> > out as mostly an internal feature.
> >
> > The immediate main reason for removing this is also mentioned in the
> > documentation: "Do not use this feature if your list may have high
> > cardinality. Checkpoint metadata will store an offset to each list
> > entry, which could lead to RPC framesize or out-of-memory errors." The
> > insidious part of this limitation is that you will only notice that
> > there is a problem when it is too late. Checkpointing will still work
> > and a program can continue when the state size is too big. The system
> > will only fail when trying to restore from a snapshot that has union
> > state that is too big. This could be fixed by working around that issue
> > but I think there are more long-term issues with this type of state.
> >
> > I think we need to deprecate and remove API for state that is not tied
> > to a key. Keyed state is easy to reason about, the system can
> > re-partition state and also re-partition records and therefore scale the
> > system in and out. Operator state, on the other hand is not tied to a
> > key but an operator. This is a more "physical" concept, if you will,
> > that potentially ties business logic closer to the underlying runtime
> > execution model, which in turn means fewer degrees of freedom for the
> > framework, that is Flink. This is future work, though; we should start
> > by deprecating union list state because it is potentially the most
> > dangerous type of state.
> >
> > We currently use this state type internally in at least the
> > StreamingFileSink, FlinkKafkaConsumer, and FlinkKafkaProducer. However,
> > we're in the process of hopefully getting rid of it there with our work
> > on sources and sinks. Before we fully remove it, we should of course
> > signal this to users by deprecating it.
> >
> > What do you think?
> >
> > Best,
> > Aljoscha
> >
>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


[Announce] Flink Forward Global Program is now Live

2020-08-13 Thread Seth Wiesman
Hi Everyone



*The Flink Forward Global 2020 program is now online*, with 2 full days
of exciting Apache Flink content curated by our program committee[1]! Join
us on October 21-22 to learn more about the newest technology updates and
hear use cases from Intel, Razorpay, Workday, Microsoft, and other
industry-leading companies.



Here are some program highlights:

   - Fault Tolerance 2.0 for Apache Flink
   - Building an End-to-End Analytics Pipeline with PyFlink
   - Building Unified Streaming Platform at Uber
   - Flink powers Houzz Realtime Pipeline and Applications
   - Tale of Stateful Stream to Stream Processing


The conference is free to attend, and tickets are online[2].




We are also offering two days of hands-on training sessions on October
19-20[3]:

   - 2-part Apache Flink Developer Training
   - 2-part SQL Developer Training
   - Stateful Functions Training
   - Runtime & Operations Training
   - Introduction to Tuning & Troubleshooting
   - Advanced Tuning & Troubleshooting


We look forward to connecting with the global Flink community in just 2
months!


[1] https://www.flink-forward.org/global-2020/conference-program

[2]
https://www.eventbrite.com/e/flink-forward-global-virtual-2020-tickets-113775477516

[3] https://www.flink-forward.org/global-2020/training-program


Seth Wiesman

Flink Forward Global Program Committee Chair

Committer Apache Flink


Re: Please help, I need to bootstrap keyed state into a stream

2020-08-12 Thread Seth Wiesman
Just to summarize the conversation so far:

The State Processor API reads data from a 3rd party system - such as JDBC
in this example - and generates a savepoint file that is written out to
some DFS. This savepoint can then be used when starting a Flink streaming
application. It is a two-step process: creating the savepoint in one job
and then starting a streaming application from that savepoint in another.

These jobs do not have to be a single application, and in general, I
recommend they be developed as two separate jobs. The reason is that
bootstrapping state is a one-time process while your streaming application
runs forever. It will simplify your development and operations in the long
term if you do not mix concerns.

Concerning the NullPointerException:

The max parallelism must be at least 128. I've opened a ticket to track and
resolve this issue.
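
For reference, a rough sketch of what the bootstrap job could look like.
The Account type, its fields, the uid, the paths, and the
loadAccountsFromJdbc() helper are all hypothetical placeholders:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// step 1: read the reference data as a batch DataSet (JDBC, files, ...)
DataSet<Account> accounts = env.fromCollection(loadAccountsFromJdbc());

BootstrapTransformation<Account> transformation = OperatorTransformation
    .bootstrapWith(accounts)
    .keyBy(new KeySelector<Account, String>() {
        @Override
        public String getKey(Account account) {
            return account.id;   // id is assumed to be a String
        }
    })
    .transform(new KeyedStateBootstrapFunction<String, Account>() {

        private ValueState<Account> state;

        @Override
        public void open(Configuration parameters) {
            state = getRuntimeContext()
                .getState(new ValueStateDescriptor<>("account", Account.class));
        }

        @Override
        public void processElement(Account value, Context ctx) throws Exception {
            state.update(value);
        }
    });

Savepoint
    .create(new MemoryStateBackend(), 128)          // max parallelism of at least 128, see above
    .withOperator("account-uid", transformation)    // must match the uid used by the streaming job
    .write("hdfs://path/to/bootstrap-savepoint");

env.execute("bootstrap account state");

// step 2: start the streaming job from that savepoint, e.g.
//   flink run -s hdfs://path/to/bootstrap-savepoint ...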

Seth

On Mon, Aug 10, 2020 at 6:38 PM Marco Villalobos 
wrote:

> I think there is a bug in Flink when running locally without a cluster.
>
> My code worked in a cluster, but failed when run locally.
>
> My code does not save null values in Map State.
>
> > On Aug 9, 2020, at 11:27 PM, Tzu-Li Tai  wrote:
> >
> > Hi,
> >
> > For the NullPointerException, what seems to be happening is that you are
> > setting NULL values in your MapState, which is not allowed by the API.
> >
> > Otherwise, the code that you showed for bootstrapping state seems to be
> > fine.
> >
> >> I have yet to find a working example that shows how to do both
> >> (bootstrapping state and start a streaming application with that state)
> >
> > Not entirely sure what you mean here by "doing both".
> > The savepoint written using the State Processor API (what you are doing
> in
> > the bootstrap() method) is a savepoint that may be restored from as you
> > would with a typical Flink streaming job restore.
> > So, usually the bootstrapping part happens as a batch "offline" job,
> while
> > you keep your streaming job as a separate job. What are you trying to
> > achieve with having both written within the same job?
> >
> > Cheers,
> > Gordon
> >
> >
> >
> > --
> > Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>


Re: [DISCUSS] FLIP-133: Rework PyFlink Documentation

2020-08-10 Thread Seth Wiesman
I think this sounds good. +1

On Wed, Aug 5, 2020 at 8:37 PM jincheng sun 
wrote:

> Hi David, thank you for sharing the problems with the current document;
> I agree with you, as I have also gotten the same feedback from Chinese
> users. I am often contacted by users asking questions such as whether
> PyFlink supports "Java UDF" or whether PyFlink supports "xxxConnector".
> The root cause of these problems is that our existing documents are
> written for Java users (text and API are mixed). Since Python support was
> only added in 1.9, much of the documentation is not friendly to Python
> users. They don't want to look for Python content in unfamiliar Java
> documents. Just yesterday, there were complaints from Chinese users about
> where to find all the documentation entries for the Python API. So a
> centralized entry point and a clear document structure are an urgent
> demand of Python users. The original intention of this FLIP is to do our
> best to solve these user pain points.
>
> Hi Xingbo and Wei, thank you for sharing PySpark's status on documentation
> optimization. You're right: PySpark already has a lot of Python user
> groups, and they have also found that the Python user community is an
> important audience for multilingual support. Centralizing and unifying the
> Python documentation will reduce the learning cost for Python users, and a
> good document structure and content will also reduce the Q&A burden on the
> community. It's a once-and-for-all job.
>
> Hi Seth, I wonder if your concerns have been resolved through the previous
> discussion?
>
> Anyway, the principle of the FLIP is that the Python documentation should
> only include Python-specific content, instead of making a copy of the Java
> content. And it would be great to have you join in the improvement of
> PyFlink (both PRs and reviewing PRs).
>
> Best,
> Jincheng
>
>
> Wei Zhong  于2020年8月5日周三 下午5:46写道:
>
>> Hi Xingbo,
>>
>> Thanks for your information.
>>
>> I think PySpark's documentation redesign deserves our attention.
>> It seems that the Spark community has also begun to treat the user
>> experience of the Python documentation more seriously. We can continue to
>> pay attention to the discussion and progress of the redesign in the Spark
>> community. It is so similar to our work that there should be some ideas
>> worth borrowing for us.
>>
>> Best,
>> Wei
>>
>>
>> 在 2020年8月5日,15:02,Xingbo Huang  写道:
>>
>> Hi,
>>
>> I found that the Spark community has also been working on redesigning the
>> PySpark documentation[1] recently. Maybe we can compare the differences
>> between our document structure and theirs.
>>
>> [1] https://issues.apache.org/jira/browse/SPARK-31851
>>
>> http://apache-spark-developers-list.1001551.n3.nabble.com/Need-some-help-and-contributions-in-PySpark-API-documentation-td29972.html
>>
>> Best,
>> Xingbo
>>
>> David Anderson  于2020年8月5日周三 上午3:17写道:
>>
>>> I'm delighted to see energy going into improving the documentation.
>>>
>>> With the current documentation, I get a lot of questions that I believe
>>> reflect two fundamental problems with what we currently provide:
>>>
>>> (1) We have a lot of contextual information in our heads about how Flink
>>> works, and we are able to use that knowledge to make reasonable inferences
>>> about how things (probably) work in cases we aren't so familiar with. For
>>> example, I get a lot of questions of the form "If I use  will
>>> I still have exactly once guarantees?" The answer is always yes, but they
>>> continue to have doubts because we have failed to clearly communicate this
>>> fundamental, underlying principle.
>>>
>>> This specific example about fault tolerance applies across all of the
>>> Flink docs, but the general idea can also be applied to the Table/SQL and
>>> PyFlink docs. The guiding principles underlying these APIs should be
>>> written down in one easy-to-find place.
>>>
>>> (2) The other kind of question I get a lot is "Can I do  with ?"
>>> E.g., "Can I use the JDBC table sink from PyFlink?" These questions can be
>>> very difficult to answer because it is frequently the case that one has to
>>> reason about why a given feature doesn't seem to appear in the
>>> documentation. It could be that I'm looking in the wrong place, or it could
>>> be that someone forgot to document something, or it could be that it can in
>>> fact be done by applying a general mechanism in a specific way that I
>>> haven't thought of -- as in this case, where one can use a JDBC sink from
>>> Python if one thinks to use DDL.
>>>
>>> So I think it would be helpful to be explicit about both what is, and
>>> what is not, supported in PyFlink. And to have some very clear organizing
>>> principles in the documentation so that users can quickly learn where to
>>> look for specific facts.
>>>
>>> Regards,
>>> David
>>>
>>>
>>> On Tue, Aug 4, 2020 at 1:01 PM jincheng sun 
>>> wrote:
>>>
 Hi Seth and David,

 I'm very happy to have your reply and suggestions. I would like to
 share my thoughts here:


Re: [Announce] Flink Forward Global Lineup Released

2020-08-03 Thread Seth Wiesman
+ link

https://www.flink-forward.org/global-2020/speakers

On Mon, Aug 3, 2020 at 11:25 AM Seth Wiesman  wrote:

> Hi everyone!
>
> I'm very excited to announce that the speaker lineup for Flink Forward
> Global has been released. This is a fully online conference on October
> 21-22 and tickets are free. The lineup includes 45+ speakers from the
> companies like of Spotify, AWS, Netflix, Houzz, Workday hear their Flink
> experiences and community talks to discuss the latest technology
> developments.
>
> Additionally, there are 6 half-day workshops (paid) to learn how to
> develop, deploy, operate, and troubleshoot your Flink applications, as well
> as other topics, such as Stateful Functions and Flink SQL. Seats for
> training are limited.
>
> Thank you to everyone who submitted a talk along with our amazing Program
> Committee who helped put this lineup together.
>
> Best,
>
> Seth Wiesman
> - Program Committee Chair - Flink Forward Global
> - Committer Apache Flink
>


[Announce] Flink Forward Global Lineup Released

2020-08-03 Thread Seth Wiesman
Hi everyone!

I'm very excited to announce that the speaker lineup for Flink Forward
Global has been released. This is a fully online conference on October
21-22 and tickets are free. The lineup includes 45+ speakers from the
companies like of Spotify, AWS, Netflix, Houzz, Workday hear their Flink
experiences and community talks to discuss the latest technology
developments.

Additionally, there are 6 half-day workshops (paid) to learn how to
develop, deploy, operate, and troubleshoot your Flink applications, as well
as other topics, such as Stateful Functions and Flink SQL. Seats for
training are limited.

Thank you to everyone who submitted a talk along with our amazing Program
Committee who helped put this lineup together.

Best,

Seth Wiesman
- Program Committee Chair - Flink Forward Global
- Committer Apache Flink


Re: [DISCUSS] FLIP-133: Rework PyFlink Documentation

2020-08-03 Thread Seth Wiesman
Hi Jincheng,

I'm very excited to see the enthusiasm for documentation work, but I am
concerned about the community's long-term ability to maintain this
contribution. In particular, I'm concerned that this proposal duplicates a
lot of content that will quickly get out of sync. So far the community does
not have a great track record of maintaining documentation after its
initial contribution.

Specifically, I do not believe the following items need to be copied:

DataTypes
Built-in functions
Connectors
SQL
Catalogs
Configurations

Another issue is that this proposal feels like it is documenting PyFlink
separately from the rest of the project. Things like the cookbook and
tutorial should be under the Try Flink section of the documentation.

Seth


On Mon, Aug 3, 2020 at 1:08 AM jincheng sun 
wrote:

> Would be great if you could join the contribution of PyFlink
> documentation @Marta !
> Thanks for all of the positive feedback. I will start a formal vote
> later...
>
> Best,
> Jincheng
>
>
> Shuiqiang Chen  于2020年8月3日周一 上午9:56写道:
>
> > Hi jincheng,
> >
> > Thanks for the discussion. +1 for the FLIP.
> >
> > A well-organized documentation will greatly improve the efficiency and
> > experience for developers.
> >
> > Best,
> > Shuiqiang
> >
> > Hequn Cheng  于2020年8月1日周六 上午8:42写道:
> >
> >> Hi Jincheng,
> >>
> >> Thanks a lot for raising the discussion. +1 for the FLIP.
> >>
> >> I think this will bring big benefits for the PyFlink users. Currently,
> >> the Python TableAPI document is hidden deeply under the TableAPI tab
> >> which makes it quite unreadable. Also, the PyFlink documentation is
> mixed
> >> with Java/Scala documentation. It is hard for users to have an overview
> of
> >> all the PyFlink documents. As more and more functionalities are added
> into
> >> PyFlink, I think it's time for us to refactor the document.
> >>
> >> Best,
> >> Hequn
> >>
> >>
> >> On Fri, Jul 31, 2020 at 3:43 PM Marta Paes Moreira  >
> >> wrote:
> >>
> >>> Hi, Jincheng!
> >>>
> >>> Thanks for creating this detailed FLIP, it will make a big difference
> in
> >>> the experience of Python developers using Flink. I'm interested in
> >>> contributing to this work, so I'll reach out to you offline!
> >>>
> >>> Also, thanks for sharing some information on the adoption of PyFlink,
> >>> it's
> >>> great to see that there are already production users.
> >>>
> >>> Marta
> >>>
> >>> On Fri, Jul 31, 2020 at 5:35 AM Xingbo Huang 
> wrote:
> >>>
> >>> > Hi Jincheng,
> >>> >
> >>> > Thanks a lot for bringing up this discussion and the proposal.
> >>> >
> >>> > Big +1 for improving the structure of PyFlink doc.
> >>> >
> >>> > It will be very friendly to give PyFlink users a unified entrance to
> >>> learn
> >>> > PyFlink documents.
> >>> >
> >>> > Best,
> >>> > Xingbo
> >>> >
> >>> > Dian Fu  于2020年7月31日周五 上午11:00写道:
> >>> >
> >>> >> Hi Jincheng,
> >>> >>
> >>> >> Thanks a lot for bringing up this discussion and the proposal. +1 to
> >>> >> improve the Python API doc.
> >>> >>
> >>> >> I have received a lot of feedback from PyFlink beginners about
> >>> >> the PyFlink doc, e.g. the materials are too few, the Python doc is
> >>> >> mixed with the Java doc, and it's not easy to find the docs they
> >>> >> want.
> >>> >>
> >>> >> I think it would greatly improve the user experience if we can have
> >>> one
> >>> >> place which includes most knowledges PyFlink users should know.
> >>> >>
> >>> >> Regards,
> >>> >> Dian
> >>> >>
> >>> >> 在 2020年7月31日,上午10:14,jincheng sun  写道:
> >>> >>
> >>> >> Hi folks,
> >>> >>
> >>> >> Since the release of Flink 1.11, the number of PyFlink users has
> >>> >> continued to grow. As far as I know, many companies have used
> >>> >> PyFlink for data analysis and operations/maintenance monitoring,
> >>> >> and it has been put into production (such as 聚美优品[1] (Jumei),
> >>> >> 浙江墨芷[2] (Mozhi), etc.). According to the feedback we received,
> >>> >> the current documentation is not very friendly to PyFlink users.
> >>> >> There are two shortcomings:
> >>> >>
> >>> >> - Python-related content is mixed into the Java/Scala documentation,
> >>> >> which makes it difficult to read for users who only focus on PyFlink.
> >>> >> - There is already a "Python Table API" section in the Table API
> >>> >> documentation to hold PyFlink documents, but the number of articles
> >>> >> is small and the content is fragmented. It is difficult for beginners
> >>> >> to learn from it.
> >>> >>
> >>> >> In addition, FLIP-130 introduced the Python DataStream API. Many
> >>> >> documents will be added for those new APIs. In order to increase the
> >>> >> readability and maintainability of the PyFlink documentation, Wei
> >>> >> Zhong and I have discussed offline and would like to rework it via
> >>> >> this FLIP.
> >>> >>
> >>> >> We will rework the document around the following three objectives:
> >>> >>
> >>> >> - Add a separate section for Python API under the "Application
> >>> >> Development" section.
> >>> >> - 


Re: Flink state reconciliation

2020-07-30 Thread Seth Wiesman
That is doable via the State Processor API, though Arvid's idea does sound
simpler :)

You could read the state of the operator holding the rules, change the data
as necessary, and then rewrite it out as a new savepoint from which to
start the job.
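
For the reading/export side, a rough sketch could look like the following;
the uid, state name, and Rule type are placeholders for whatever the job
actually uses:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ExistingSavepoint savepoint =
    Savepoint.load(env, "hdfs://path/to/savepoint", new MemoryStateBackend());

DataSet<Rule> rules = savepoint.readKeyedState("rules-uid",
    new KeyedStateReaderFunction<String, Rule>() {

        private ValueState<Rule> state;

        @Override
        public void open(Configuration parameters) {
            state = getRuntimeContext()
                .getState(new ValueStateDescriptor<>("rule", Rule.class));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<Rule> out) throws Exception {
            out.collect(state.value());
        }
    });

// dump for offline comparison, or join directly against an export of the master DBMS
rules.writeAsText("hdfs://path/to/rules-dump");
env.execute("export rule state");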


On Thu, Jul 30, 2020 at 5:24 AM Arvid Heise  wrote:

> Another idea: since your handling on Flink is idempotent, would it make
> sense to also periodically send the whole rule set anew?
>
> Going further, depending on the number of rules, their size, and the
> update frequency, would it be possible to always transfer the complete
> rule set and just discard the old state on update (or do the
> reconciliation in Flink)?
>
> On Wed, Jul 29, 2020 at 2:49 PM Александр Сергеенко <
> aleksandr.sergee...@gmail.com> wrote:
>
>> Hi Kostas
>>
>> Thanks for a possible help!
>>
>> пт, 24 июл. 2020 г., 19:08 Kostas Kloudas :
>>
>>> Hi Alex,
>>>
>>> Maybe Seth (cc'ed) may have an opinion on this.
>>>
>>> Cheers,
>>> Kostas
>>>
>>> On Thu, Jul 23, 2020 at 12:08 PM Александр Сергеенко
>>>  wrote:
>>> >
>>> > Hi,
>>> >
>>> > We use so-called "control stream" pattern to deliver settings to the
>>> Flink job using Apache Kafka topics. The settings are in fact an unlimited
>>> stream of events originating from the master DBMS, which acts as a single
>>> point of truth concerning the rules list.
>>> >
>>> > It may seem odd, since Flink guarantees "exactly once" delivery
>>> semantics, while the service that provides the rule-publishing mechanism
>>> to Kafka is written using Akka Streams and guarantees "at least once"
>>> semantics, and the rule handling inside the Flink job is implemented in
>>> an idempotent manner; but we have to manage cases where we need to
>>> execute a reconciliation between the current rules stored in the master
>>> DBMS and the existing Flink state.
>>> >
>>> > We've looked at Flink's tooling and found out that the State
>>> Processor API can possibly solve our problem, so we basically have to
>>> implement a periodic process which unloads the state to some external
>>> file (.csv) and then executes a comparison between that set and the
>>> information in the master system.
>>> > Basically, it looks like the lambda architecture approach, while
>>> Flink is supposed to implement the kappa architecture, in which case
>>> our reconciliation problem looks a bit far-fetched.
>>> >
>>> > Are there any best practices or some patterns addressing such
>>> scenarios in Flink?
>>> >
>>> > Great thanks for any possible assistance and ideas.
>>> >
>>> > -
>>> > Alex Sergeenko
>>> >
>>>
>>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


Re: [DISCUSS] FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and deprecate the DataSet API)

2020-07-30 Thread Seth Wiesman
+1 It's time to drop DataSet

Flavio, those issues are expected. This FLIP isn't just to drop DataSet but
to also add the necessary enhancements to DataStream such that it works
well on bounded input.

On Thu, Jul 30, 2020 at 8:49 AM Flavio Pompermaier 
wrote:

> Just to contribute to the discussion, when we tried to do the migration we
> faced some problems that could make migration quite difficult.
> 1 - It's difficult to test because of
> https://issues.apache.org/jira/browse/FLINK-18647
> 2 - missing mapPartition
> 3 - missing   DataSet runOperation(CustomUnaryOperation
> operation)
>
> On Thu, Jul 30, 2020 at 12:40 PM Arvid Heise  wrote:
>
> > +1 for getting rid of the DataSet API. Is DataStream#iterate already
> > superseding DataSet iterations or would that also need to be accounted
> for?
> >
> > In general, all surviving APIs should also offer a smooth experience for
> > switching back and forth.
> >
> > On Thu, Jul 30, 2020 at 9:39 AM Márton Balassi  >
> > wrote:
> >
> > > Hi All,
> > >
> > > Thanks for the write up and starting the discussion. I am in favor of
> > > unifying the APIs the way described in the FLIP and deprecating the
> > DataSet
> > > API. I am looking forward to the detailed discussion of the changes
> > > necessary.
> > >
> > > Best,
> > > Marton
> > >
> > > On Wed, Jul 29, 2020 at 12:46 PM Aljoscha Krettek  >
> > > wrote:
> > >
> > >> Hi Everyone,
> > >>
> > >> my colleagues (in cc) and I would like to propose this FLIP for
> > >> discussion. In short, we want to reduce the number of APIs that we
> have
> > >> by deprecating the DataSet API. This is a big step for Flink, that's
> why
> > >> I'm also cross-posting this to the User Mailing List.
> > >>
> > >> FLIP-131: http://s.apache.org/FLIP-131
> > >>
> > >> I'm posting the introduction of the FLIP below but please refer to the
> > >> document linked above for the full details:
> > >>
> > >> --
> > >> Flink provides three main SDKs/APIs for writing Dataflow Programs:
> Table
> > >> API/SQL, the DataStream API, and the DataSet API. We believe that this
> > >> is one API too many and propose to deprecate the DataSet API in favor
> of
> > >> the Table API/SQL and the DataStream API. Of course, this is easier
> said
> > >> than done, so in the following, we will outline why we think that
> having
> > >> too many APIs is detrimental to the project and community. We will
> then
> > >> describe how we can enhance the Table API/SQL and the DataStream API
> to
> > >> subsume the DataSet API's functionality.
> > >>
> > >> In this FLIP, we will not describe all the technical details of how
> the
> > >> Table API/SQL and DataStream will be enhanced. The goal is to achieve
> > >> consensus on the idea of deprecating the DataSet API. There will have
> to
> > >> be follow-up FLIPs that describe the necessary changes for the APIs
> that
> > >> we maintain.
> > >> --
> > >>
> > >> Please let us know if you have any concerns or comments. Also, please
> > >> keep discussion to this ML thread instead of commenting in the Wiki so
> > >> that we can have a consistent view of the discussion.
> > >>
> > >> Best,
> > >> Aljoscha
> > >>
> > >
> >
> > --
> >
> > Arvid Heise | Senior Java Developer
> >
> > 
> >
> > Follow us @VervericaData
> >
> > --
> >
> > Join Flink Forward  - The Apache Flink
> > Conference
> >
> > Stream Processing | Event Driven | Real Time
> >
> > --
> >
> > Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
> >
> > --
> > Ververica GmbH
> > Registered at Amtsgericht Charlottenburg: HRB 158244 B
> > Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> > (Toni) Cheng
>


Re: Is Flink HIPAA certified

2020-07-01 Thread Seth Wiesman
Hi Prasanna,

There are Flink use cases in the US healthcare space; unfortunately, I do
not have any public references that I am able to provide.

Some important Flink features that are relevant when working in a field
that requires compliance:


   - SSL:
   https://ci.apache.org/projects/flink/flink-docs-stable/ops/security-ssl.html
   - Kerberos Authentication:
   https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/security-kerberos.html
   - State TTL (a small configuration sketch follows below):
   https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl
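
To illustrate the State TTL point above, retention is configured per state
descriptor; a minimal sketch (the 30-day retention and the descriptor name
are just example values):

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(30))    // example retention period
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .cleanupFullSnapshot()
    .build();

ValueStateDescriptor<String> descriptor =
    new ValueStateDescriptor<>("patient-record", String.class);   // example descriptor
descriptor.enableTimeToLive(ttlConfig);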

Seth

On Wed, Jul 1, 2020 at 1:18 AM Prasanna kumar 
wrote:

> Thanks Marta,
>
> Prasanna.
>
> On Wed 1 Jul, 2020, 11:35 Marta Paes Moreira,  wrote:
>
>> Hi, Prasanna.
>>
>> We're not aware of any Flink users in the US healthcare space (as far as
>> I know).
>>
>> I'm looping in Ryan from AWS, as he might be able to tell you more about
>> how you can become HIPAA-compliant with Flink [1].
>>
>> Marta
>>
>> [1]
>> https://docs.aws.amazon.com/kinesisanalytics/latest/java/akda-java-compliance.html
>>
>> On Sat, Jun 27, 2020 at 9:41 AM Prasanna kumar <
>> prasannakumarram...@gmail.com> wrote:
>>
>>> Hi Community ,
>>>
>>> Could anyone let me know if Flink is used in US healthcare tech space ?
>>>
>>> Thanks,
>>> Prasanna.
>>>
>>


Re: Dynamic partitioner for Flink based on incoming load

2020-06-24 Thread Seth Wiesman
You can achieve this in Flink 1.10 using the StreamingFileSink.

I’d also like to note that Flink 1.11 (which is currently going through
release testing and should be available imminently) has support for exactly
this functionality in the Table API.

https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/filesystem.html
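
For the 1.10 StreamingFileSink route, the "group by topic" part can be done
with a custom BucketAssigner. This sketch assumes each record carries its
originating topic in a field (the Event type and its topic field are made
up); it covers only the per-topic grouping, not volume-aware bin packing:

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

StreamingFileSink<Event> sink = StreamingFileSink
    .forRowFormat(new Path("gs://my-bucket/output"), new SimpleStringEncoder<Event>())
    .withBucketAssigner(new BucketAssigner<Event, String>() {
        @Override
        public String getBucketId(Event element, Context context) {
            // one bucket (directory) per source topic; a time component could be appended
            return element.topic;
        }

        @Override
        public SimpleVersionedSerializer<String> getSerializer() {
            return SimpleVersionedStringSerializer.INSTANCE;
        }
    })
    .withRollingPolicy(DefaultRollingPolicy.builder().build())
    .build();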


On Wed, Jun 24, 2020 at 1:53 PM Alexander Filipchik 
wrote:

> Hello!
>
> We are working an a Flink Streaming job that reads data from multiple
> Kafka topics and writes them to DFS. We are using StreamingFileSink with
> custom implementation for GCS FS and it generates a lot of files as streams
> are partitioned among multiple JMs. In the ideal case we should have at
> most 1 file per kafka topic per interval. We also have some heavy topics
> and some pretty light ones, so the solution should also be smart to utilize
> resources efficiently.
>
> I was thinking we can partition based on how much data is ingested in the
> last minute or so to make sure: messages from the same topic are routed to
> the same (or minimal number of ) file if there are enough resources to do
> so. Think bin packing.
>
> Is it a good idea? Is there a built in way to achieve it? If not, is there
> a way to push state into the partitioner (or even kafka client to
> repartition in the source)? I was thinking that I can run a side stream
> that will calculate data volumes and then broadcast it into the main
> stream, so partitioner can make a decision, but it feels a bit complex.
>
> Another way is to modify kafka client to track messages per topics and
> make decision at that layer.
>
> Am I on the right path?
>
> Thank you
>


Re: RichAggregationFunction

2020-06-24 Thread Seth Wiesman
Hi Steven,

AggregationFunctions (along with Reduce and other "pre-aggregation"
functions) are not allowed to be Rich.

In general, if you need to go outside the predefined bounds of what the
window operator provides, I'd encourage you to take a look at a
KeyedProcessFunction.
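
As a rough sketch of that direction: a KeyedProcessFunction that emulates
the session gap with timers and exposes custom metrics for how long entries
are held. Every name here (Event, Result, the gap, the metric names) is a
placeholder, and it assumes event-time timestamps are assigned upstream:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class SessionHoldTimeTracker extends KeyedProcessFunction<String, Event, Result> {

    private static final long GAP_MILLIS = 10 * 60 * 1000;   // example session gap

    private transient Counter closedSessions;
    private transient volatile long lastHoldMillis;           // exposed via a gauge
    private transient ValueState<Long> firstSeen;
    private transient ValueState<Long> lastSeen;

    @Override
    public void open(Configuration parameters) {
        closedSessions = getRuntimeContext().getMetricGroup().counter("closedSessions");
        getRuntimeContext().getMetricGroup().gauge("lastSessionHoldMillis", new Gauge<Long>() {
            @Override
            public Long getValue() {
                return lastHoldMillis;
            }
        });
        firstSeen = getRuntimeContext().getState(new ValueStateDescriptor<>("firstSeen", Long.class));
        lastSeen = getRuntimeContext().getState(new ValueStateDescriptor<>("lastSeen", Long.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Result> out) throws Exception {
        long ts = ctx.timestamp();   // assumes timestamps are assigned
        if (firstSeen.value() == null) {
            firstSeen.update(ts);
        }
        lastSeen.update(ts);
        // push the "session end" out by the gap on every element
        ctx.timerService().registerEventTimeTimer(ts + GAP_MILLIS);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Result> out) throws Exception {
        Long last = lastSeen.value();
        if (last != null && timestamp == last + GAP_MILLIS) {   // only the latest timer closes the session
            lastHoldMillis = timestamp - firstSeen.value();
            closedSessions.inc();
            // emit the aggregate accumulated for this session here
            firstSeen.clear();
            lastSeen.clear();
        }
    }
}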

Seth

On Wed, Jun 24, 2020 at 3:07 PM Arvid Heise  wrote:

> Hi Steven,
>
> could you please provide more information. Which Flink version are you
> using? Why isn't RichAggregationFunction working for you?
>
> In general, you always have the option to use a custom window assigner and
> delegate most of the calls to some provided implementation. Then you modify
> the behavior as you'd see it fit.
>
> On Tue, Jun 23, 2020 at 11:40 PM Steven Nelson 
> wrote:
>
>> I am trying to add some custom metrics to my window (because the window
>> is causing a lot of backpressure). However I can't seem to use a
>> RichAggregationFunction instead of an AggregationFunction. I am trying to
>> see how long things get held in our EventTimeSessionWindows.withGap window.
>> Is there another option for doing this?
>>
>> -Steve
>>
>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
> 
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


Re: [Announce] Flink Forward Call for Proposals Extended

2020-06-24 Thread Seth Wiesman
As a reminder, the CfP for Flink Forward is open until this Sunday, June
28th.

If you've never spoken at a conference before and are thinking about
submitting, our amazing event manager Laura just wrote an article on dev.to
about why virtual conferences are the best way to get started.

[1]
https://dev.to/anavasiliuk/5-reasons-why-you-should-consider-presenting-at-flink-forward-global-virtual-2020-4jk

Seth

On Fri, Jun 19, 2020 at 10:07 AM Israel Ekpo  wrote:

> Thanks Seth for sharing this.
>
> I am looking forward to the event.
>
> On Fri, Jun 19, 2020 at 10:54 AM Seth Wiesman  wrote:
>
>> Hi Everyone!
>>
>>
>> The Call for Presentations for Flink Forward has been extended until *Sunday,
>> June 28, 11:59 pm PST*. We know that tech conferences are not a priority
>> for everyone at this moment, so we wanted to ensure everyone has time to
>> work on their ideas.
>>
>>
>> As a reminder, Flink Forward Global Conference 2020 will take place
>> virtually, October 19-21.
>>
>>
>> If you would like to reserve your seat
>> <https://www.flink-forward.org/global-2020> for the conference &
>> training, pre-registration is also open. You can secure a spot on our
>> Apache Flink training program and make sure you are registered for the
>> conference days too. More information on the training schedule and pricing
>> will follow soon. The conference itself will be free to attend!
>>
>>
>> Conference tracks include:
>>
>>- Use Cases
>>- Operations
>>- Technology Deep Dive
>>- Ecosystem
>>- Community and Industry Impact
>>
>>
>> You can find more detailed track descriptions and the form to submit a
>>
>> proposal on the Flink Forward website at:
>>
>>
>> https://www.flink-forward.org/global-2020/call-for-presentations
>>
>>
>> Best,
>>
>>
>> Seth
>>
>> PC Chair - Flink Forward Global 2020
>>
>>
>>
>>
>>
>>


[Announce] Flink Forward Call for Proposals Extended

2020-06-19 Thread Seth Wiesman
Hi Everyone!


The Call for Presentations for Flink Forward has been extended until *Sunday,
June 28, 11:59 pm PST*. We know that tech conferences are not a priority
for everyone at this moment, so we wanted to ensure everyone has time to
work on their ideas.


As a reminder, Flink Forward Global Conference 2020 will take place
virtually, October 19-21.


If you would like to reserve your seat
 for the conference & training,
pre-registration is also open. You can secure a spot on our Apache Flink
training program and make sure you are registered for the conference days
too. More information on the training schedule and pricing will follow
soon. The conference itself will be free to attend!


Conference tracks include:

   - Use Cases
   - Operations
   - Technology Deep Dive
   - Ecosystem
   - Community and Industry Impact


You can find more detailed track descriptions and the form to submit a

proposal on the Flink Forward website at:


https://www.flink-forward.org/global-2020/call-for-presentations


Best,


Seth

PC Chair - Flink Forward Global 2020


Re: Stateful functions Harness

2020-05-27 Thread Seth Wiesman
Hi Boris,

Example usage of flink sources and sink is available in the
documentation[1].

[1]
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.0/io-module/flink-connectors.html

On Wed, May 27, 2020 at 1:08 PM Boris Lublinsky <
boris.lublin...@lightbend.com> wrote:

> That's not exactly the usage question that I am asking.
> When I am writing an IO module I have to write an Ingress and Egress spec.
> You have an example for Kafka, which looks like
>
> def getIngressSpec: IngressSpec[GreetRequest] =
>   KafkaIngressBuilder.forIdentifier(GREETING_INGRESS_ID)
> .withKafkaAddress(kafkaAddress)
> .withTopic("names")
> .withDeserializer(classOf[GreetKafkaDeserializer])
> .withProperty(ConsumerConfig.GROUP_ID_CONFIG, "greetings")
> .build
>
> def getEgressSpec: EgressSpec[GreetResponse] =
>   KafkaEgressBuilder.forIdentifier(GREETING_EGRESS_ID)
> .withKafkaAddress(kafkaAddress)
> .withSerializer(classOf[GreetKafkaSerializer])
> .build
>
> How is it going to look if I am using SourceSinkModule?
> Do I just specify stream names? Something else?
>
>
>
>
>
> On May 27, 2020, at 11:29 AM, Tzu-Li (Gordon) Tai 
> wrote:
>
>
>
> On Thu, May 28, 2020, 12:19 AM Boris Lublinsky <
> boris.lublin...@lightbend.com> wrote:
>
>> I think I figured this out.
>> The project seems to be missing
>>
>> resources
>> 
>> /META-INF
>> 
>> /services directory, which should contain services
>>
>
> Yes, the functions / ingresses / egresses etc. are not discoverable if
> the service file isn't present in the classpath.
>
> For the examples, if you are running them straight from the repo, they
> should all have that service file defined and therefore be readily runnable.
>
> If you are creating your own application project, you'll have to add that
> yourself.
>
>
>> Another question:
>> I see org.apache.flink.statefun.flink.io.datastream.SourceSinkModule
>>
>> Class, which I think allows to use existing data streams as
>> ingress/egress.
>>
>> Are there any examples of its usage
>>
>
> On the Harness class, there is a withFlinkSourceFunction method in which
> you can directly add a Flink source function as the ingress.
>
> If you want to use that directly in a normal application (not just
> execution in IDE with the Harness), you can define your ingesses/egresses
> by binding SourceFunctionSpec / SinkFunctionSpec.
> Please see how they are being used in the Harness class for examples.
>
> Gordon
>
>
>>
>> On May 27, 2020, at 11:10 AM, Tzu-Li (Gordon) Tai 
>> wrote:
>>
>> Hi,
>>
>> The example is working fine on my side (also using IntelliJ).
>> This could most likely be a problem with your project setup in the IDE,
>> where the classpath isn't setup correctly.
>>
>> What do you see when you right click on the
>> statefun-flink-harness-example directory (in the IDE) --> Open Module
>> Settings, and then under the "Sources" / "Dependencies" tab?
>> Usually this should all be automatically setup correctly when importing
>> the project.
>>
>> Gordon
>>
>> On Wed, May 27, 2020 at 11:46 PM Boris Lublinsky <
>> boris.lublin...@lightbend.com> wrote:
>>
>>> The project
>>> https://github.com/apache/flink-statefun/tree/release-2.0/statefun-examples/statefun-flink-harness-example
>>> Does not work in Intellij.
>>>
>>> The problem is that when running in Intellij, method public static Modules
>>> loadFromClassPath() {
>>> Does not pick up classes, which are local in Intellij
>>>
>>> Any work arounds?
>>>
>>>
>>>
>>>
>>> On May 22, 2020, at 12:03 AM, Tzu-Li (Gordon) Tai 
>>> wrote:
>>>
>>> Hi,
>>>
>>> Sorry, I need to correct my comment on using the Kafka ingress / egress
>>> with the Harness.
>>>
>>> That is actually doable, by adding an extra dependency to
>>> `statefun-flink-distribution` in your Harness program.
>>> That pulls in all the other required dependencies required by the Kafka
>>> ingress / egress, such as the source / sink providers and Flink Kafka
>>> connectors.
>>>
>>> Cheers,
>>> Gordon
>>>
>>> On Fri, May 22, 2020 at 12:04 PM Tzu-Li (Gordon) Tai <
>>> tzuli...@apache.org> wrote:
>>>
 Are you getting an exception from running the Harness?
 The Harness should already have the required configurations, such as
 the parent first classloading config.

 Otherwise, if you would like to add your own configuration, use the
 `withConfiguration` method on the `Harness` class.

 On Fri, May 22, 2020 at 7:19 AM Boris Lublinsky <
 boris.lublin...@lightbend.com> wrote:

> Also, where do I put flint-conf.yaml in Idea to add additional
> required config parameter:
>
> classloader.parent-first-patterns.additional: 
> org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
>
>
>
> On May 21, 2020, at 12:22 PM, 

[Announce] Flink Forward Global 2020 - Call for Proposals

2020-05-14 Thread Seth Wiesman
Hi Everyone!


After a successful Virtual Flink Forward in April, we have decided to
present our October edition in the same way. In these uncertain times, we
are conscious of everyone's health and safety and want to make sure our
events are accessible for everyone.


Flink Forward Global Conference 2020 will take place virtually, October
19-21, and the Call for Presentations and pre-registration is now open!


Flink Forward Global will include a virtual training day, followed by two
days of keynotes and breakout sessions. The conference will be free to
attend, and there will be a limited number of paid spots available for the
training.


Conference tracks include:

   - Use Cases
   - Operations
   - Technology Deep Dive
   - Ecosystem
   - Community and Industry Impact


You can find more detailed track descriptions and the form to submit a

proposal on the Flink Forward website at:


https://www.flink-forward.org/global-2020/call-for-presentations


The deadline for submissions is June 19th, 2020. If you have any questions
please feel free to reach out to me here or directly.


Best,


Seth

PC Chair - Flink Forward Global 2020

Twitter: @sjwiesman


Re: Assertion failed: (last_ref), function ~ColumnFamilySet, file db/column_family.cc, line 1238

2020-05-08 Thread Seth Wiesman
Gordon is correct. Additionally, if you are using Flink 1.10 you may be
running into a known bug that has been resolved in 1.10.1, which will be
released soon.

Seth

https://issues.apache.org/jira/browse/FLINK-16313


On Fri, May 8, 2020 at 5:19 AM Tzu-Li (Gordon) Tai 
wrote:

> Hi,
>
> The last time I saw this error, it was caused by a mismatch between the
> flink-state-processor-api version being used and the other core Flink
> dependencies. Could you confirm that?
>
> Also, are you seeing this assertion error consistently, or only
> occasionally?
> cc'ing Seth, maybe he has other clues on the cause.
>
> Cheers,
> Gordon
>
> On Fri, May 8, 2020 at 3:06 PM luisfaamaral 
> wrote:
>
>> No one? :)
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>


Re: How to list timers registered in timer service?

2020-05-04 Thread Seth Wiesman
Hi Lasse,

In the State Processor API, KeyedStateReaderFunction#readKey has a
parameter called `Context` which you can use to read the registered
event-time and processing-time timers for a key.
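
A compact sketch, assuming the operator has uid "my-uid" and Long keys
(both placeholders), that `savepoint` was obtained via Savepoint.load(...),
and that KeyAndTimers is a simple POJO you define to hold the output:

import java.util.Set;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

DataSet<KeyAndTimers> timers = savepoint.readKeyedState("my-uid",
    new KeyedStateReaderFunction<Long, KeyAndTimers>() {
        @Override
        public void readKey(Long key, Context ctx, Collector<KeyAndTimers> out) {
            // both sets come straight from the timer service snapshot
            Set<Long> eventTimeTimers = ctx.registeredEventTimeTimers();
            Set<Long> processingTimeTimers = ctx.registeredProcessingTimeTimers();
            out.collect(new KeyAndTimers(key, eventTimeTimers, processingTimeTimers));
        }
    });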

Best,

Seth

On Fri, May 1, 2020 at 2:57 AM Lasse Nedergaard <
lassenedergaardfl...@gmail.com> wrote:

> Hi.
>
> I have a case where I want to have one timer for each key. I can see the
> state for this operator slowly growing, and since my key space is more or
> less constant, I want to investigate what is happening.
>
> I looked at the State Processor API but I can't see any functionality to
> read the timers registered on the timer service, and the timer service
> doesn't have any functionality to list registered timers.
> Any idea how to read the currently registered timers?
>
> Med venlig hilsen / Best regards
> Lasse Nedergaard
>
>


Re: Change to StreamingFileSink in Flink 1.10

2020-04-21 Thread Seth Wiesman
Hi All,

There is a bug in the builder that prevents it from compiling in Scala due
to differences in type inference between Java and Scala[1]. It has already
been resolved for 1.10.1 and 1.11. In the meantime, just go ahead and use
casts or construct the object in a Java class.
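
For example, a small Java helper along these lines (class and method names
made up) can live next to the Scala code and be called from it until the
fixed version is out:

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

public final class SinkFactory {

    private SinkFactory() {}

    // built in Java, where the builder's type inference behaves as expected
    public static StreamingFileSink<String> stringSink(String path) {
        return StreamingFileSink
            .forRowFormat(new Path(path), new SimpleStringEncoder<String>())
            .withRollingPolicy(DefaultRollingPolicy.builder().build())
            .withBucketAssigner(new DateTimeBucketAssigner<>())
            .build();
    }
}

// from Scala: val sink = SinkFactory.stringSink(path)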

Seth

[1] https://issues.apache.org/jira/browse/FLINK-16684

On Tue, Apr 21, 2020 at 7:33 AM Leonard Xu  wrote:

> Hi, Averell
>
> I found you're using Scala, so I reproduced your case locally with Scala
> 2.11.12 and Flink 1.10.0, and it works too.
> From your picture it's weird that the line `.withBucketAssigner(new
> DateTimeBucketAssigner)` is inferred as `Any`;
> it should be `RowFormatBuilder`, otherwise you cannot call the `#build`
> function on an `Any` object.
> You can debug this line to see what happened.
>
>
> Best,
> Leonard
>
>
>
> > 在 2020年4月21日,17:47,Averell  写道:
> >
> > Hello Leonard, Sivaprasanna,
> >
> > But my code was working fine with Flink v1.8.
> > I also tried with a simple String DataStream, and got the same error.
> > StreamingFileSink
> >  .forRowFormat(new Path(path), new SimpleStringEncoder[String]())
> >  .withRollingPolicy(DefaultRollingPolicy.builder().build())
> >  .withBucketAssigner(new DateTimeBucketAssigner)
> >  .build()
> > (screenshot below)
> > <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/FlinkError.png>
>
> >
> > It's weird. At first I thought it's something wrong with IntelliJ, but I
> got
> > the same error when running mvn from commandline.
> >
> >
> >
> >
> >
> >
> > --
> > Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>


Re: Cannot register for Flink Forward Conference

2020-04-20 Thread Seth Wiesman
Hi Eleanore,

There was a misconfiguration on the website if you try again everything
should work.

Seth

On Mon, Apr 20, 2020 at 1:39 PM Eleanore Jin  wrote:

> Hi community,
>
> My colleagues tried to register for the Flink forward conference:
> https://www.bigmarker.com/series/flink-forward-virtual-confer1/series_summit?__hssc=37051071.2.1587407738905&__hstc=37051071.db5b837955b42a71990541edc07d7beb.1587407738904.1587407738904.1587407738904.1=3382611e-f2c7-4bc8-a1f8-3ab5d3322f61%7Cbf3a04dd-df45-4ccf-83b0-8046dc8ab7d2
>
> But they saw the following message, and if they click continue, it goes
> back to the page. I just wonder does the website still allow registration?
> If so, can someone please help with this?
>
> Thanks a lot!
> Eleanore
>
> [image: image.png]
>


Re: TypeInformation composition ?

2020-04-11 Thread Seth Wiesman
If the type information for T is stored in a member variable called myTypeInfo 
you can do something like this.

import org.apache.flink.api.common.typeinfo.Types;

Types.TUPLE(Types.LONG, myTypeInfo);
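
That composed TypeInformation can then back a single list state holding the
element together with its timestamp; a sketch, assuming this runs inside a
rich function where getRuntimeContext() is available and the descriptor
name is made up:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;

ListStateDescriptor<Tuple2<Long, T>> descriptor =
    new ListStateDescriptor<>("timestampedElements", Types.TUPLE(Types.LONG, myTypeInfo));

ListState<Tuple2<Long, T>> state = getRuntimeContext().getListState(descriptor);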

Seth 

> On Apr 11, 2020, at 11:06 AM, Laurent Exsteens  
> wrote:
> 
> 
> Hello,
> 
> I have a generic ProcessFunction using list state, for which I receive the
> type information as a constructor parameter (since it is not possible to
> create the type information in the class due to type erasure).
>
> I now need to keep not only the data, but also the timestamp at which it
> appears. But I can't find a way to create a list state of Tuple2
> (where T is the generic type, for which I receive the type information as
> a parameter).
>
> I would like to create that state descriptor from only that information
> (in order to keep the interface of the class simple).
>
> Right now I manage by creating 2 states: one for T and one for Long.
>
> But I wonder if I could create that composite state, for example by
> composing the type informations, but I didn't find a way to do that.
>
> Any other idea is also welcome.
> 
> Thanks in advance.
> 
> Regards,
> 
> Laurent.
> 
> ♻ Be green, keep it on the screen


Re: [PROPOSAL] Contribute training materials to Apache Flink

2020-04-09 Thread Seth Wiesman
Hi David,

+1 to add to the project.

I agree that flink.apache.org and flink playgrounds respectively are the
best places to host this content.

On Thu, Apr 9, 2020 at 2:56 PM Niels Basjes  wrote:

> Hi,
>
> Sounds like a very nice thing to have as part of the project ecosystem.
>
> Niels
>
> On Thu, Apr 9, 2020 at 8:10 PM David Anderson  wrote:
>
>> Dear Flink Community!
>>
>> For some time now Ververica has been hosting some freely available Apache
>> Flink training materials at https://training.ververica.com. This includes
>> tutorial content covering the core concepts of the DataStream API, and
>> hands-on exercises that accompany those explanations.
>>
>> Website: https://training.ververica.com
>> Website repo: https://github.com/dataArtisans/flink-training
>> Exercises: repo: https://github.com/ververica/flink-training-exercises
>>
>> We would like to contribute this training content to Apache Flink. By
>> doing
>> so, we hope to make it even easier for folks to get started with Flink.
>> Especially during this time when so many are working from home, we'd like
>> to get this self-paced training course out where more people will see it.
>>
>> If the community wants these training materials, then this also raises the
>> question of where to put them. We are thinking it would be best to
>> integrate the website content into flink.apache.org, and to add the
>> exercises to flink-playgrounds -- but these points can be discussed
>> separately once we've established that the community wants this content.
>>
>> Looking forward to hearing what you think!
>>
>> Best regards,
>> David
>>
>
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>


Re: ListState with millions of elements

2020-04-08 Thread Seth Wiesman
There is a limitation in RocksDB's JNI bridge that will cause applications
to fail if list state exceeds 2GB. I am not aware of anyone working on this
issue.

Seth.

[1] https://github.com/facebook/rocksdb/issues/2383
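
For anyone who hits that limit, a commonly suggested workaround is to spread the list 
across MapState entries so that no single RocksDB value has to hold the whole list. A 
minimal sketch (class and state names are made up for illustration):

    import org.apache.flink.api.common.state.{MapState, MapStateDescriptor, ValueState, ValueStateDescriptor}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    import org.apache.flink.util.Collector

    // Each element lives under its own map key instead of inside one huge list value.
    class ChunkedListFunction extends KeyedProcessFunction[String, String, String] {

      @transient private var elements: MapState[java.lang.Long, String] = _
      @transient private var size: ValueState[java.lang.Long] = _

      override def open(parameters: Configuration): Unit = {
        elements = getRuntimeContext.getMapState(
          new MapStateDescriptor[java.lang.Long, String](
            "elements", classOf[java.lang.Long], classOf[String]))
        size = getRuntimeContext.getState(
          new ValueStateDescriptor[java.lang.Long]("size", classOf[java.lang.Long]))
      }

      override def processElement(
          value: String,
          ctx: KeyedProcessFunction[String, String, String]#Context,
          out: Collector[String]): Unit = {
        val n: Long = if (size.value() == null) 0L else size.value()
        // Each map entry is its own RocksDB key/value, so no single value approaches 2GB.
        elements.put(n, value)
        size.update(n + 1)
      }
    }

Reads then go through entries() or individual lookups instead of materializing the
whole list as one value.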

On Wed, Apr 8, 2020 at 12:02 PM Aaron Levin  wrote:

> Hello friendly Flink community!
>
> I'm curious if anyone has operational experience with jobs that store
> ListState where occasionally, due to skew, some small number of lists
> stored in ListState (stored in RocksDB) will have millions of elements.
> Here are the stats:
>
> * millions of keys
> * p95 size of list in ListState is ~2.
> * some small number of keys (less than 100) may have lists whose size is
> on the order of tens of thousands and up to millions.
> * state is stored in RocksDB
>
> Are there any known issues or limitations with storing or fetching that
> much list state out of RocksDB? I realize fetching from RocksDB and
> deserializing will be costly when hitting a key with a list of a million
> elements, but is there anything else we should consider?
>
> Thanks!
>
> Best,
>
> Aaron Levin
>


Re: Creating singleton objects per task manager

2020-04-07 Thread Seth Wiesman
Hi Kristoff,

You are correct, that was a typo :)
At most one instance per slot. 

Seth  

> On Apr 7, 2020, at 9:41 AM, KristoffSC  wrote:
> 
> Hi Seth,
> I would like to piggyback on this question :)
> 
> You wrote:
> "I would strongly encourage you to create one instance of your object per
> ProcessFunction, inside of open. That would be one instance per slot which
> is not equal to the parallelism of your operator."
> 
> Especially the second part "That would be one instance per slot which is not
> equal to the parallelism of your operator"
> 
> For my understanding, the number of processFunction instances is equal to the
> parallelism level of this operator. Parallel instances are not deployed on
> the same task slot, therefore if you create objects in the open() method then
> you will have as many objects as there are processFunction instances, which
> in my understanding is equal to the parallelism level for this operator.
> 
> Thanks,
> 
> 
> 


Re: State Processor API with Beam

2020-04-06 Thread Seth Wiesman
Hi Stephen,

You will need to implement a custom operator and use the `transform`
method. It's not just that you need to specify the namespace type; you
will also need to look into the Beam internals to see how it stores data in
Flink state, how it translates between Beam serializers and Flink
serializers, etc.

Seth

On Mon, Apr 6, 2020 at 1:02 PM Stephen Patel  wrote:

> I've got an apache beam pipeline running on flink (1.9.1).
>
> I've been attempting to read a RocksDB savepoint taken from this
> beam-on-flink pipeline, using the state processor api, however it seems to
> have some incompatibilities around namespaces.  Beam for instance uses a
> String namespace, while the KeyedStateInputFormat uses the VoidNamespace.
> This manifests as an exception:
>
> Caused by: org.apache.flink.util.StateMigrationException: The new namespace 
> serializer must be compatible.
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:524)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:482)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:643)
>   at 
> org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
>   at 
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
>   at 
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
>   at 
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
>   at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
>   at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116)
>
> Is there any way to let the namespace type (and value) be specified by the
> user?
>


Re: Creating singleton objects per task manager

2020-04-06 Thread Seth Wiesman
Hi Salva,

One TaskManager == One JVM. There is nothing Flink specific here, you can
just create a singleton how you would in any other JVM application. But be
careful, if your singleton does any sort of locking/coordination it will
quickly become the bottleneck in your application. I would strongly
encourage you to create one instance of your object per ProcessFunction,
inside of open. That would be one instance per slot which is not equal to
the parallelism of your operator. This would not create an object per key
if that is your concern.
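
To make that concrete, a minimal sketch under the assumption that the shared object is 
some hypothetical Manager class (not from the original question):

    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.ProcessFunction
    import org.apache.flink.util.Collector

    // Hypothetical resource; replace with whatever actually needs to be shared.
    class Manager {
      def handle(value: String): String = value
    }

    object Manager {
      // Plain JVM singleton: at most one instance per task manager (per JVM), lazily created.
      lazy val instance: Manager = new Manager
    }

    class MyProcessFunction extends ProcessFunction[String, String] {

      // One instance per parallel ProcessFunction instance, created in open(), never serialized.
      @transient private var manager: Manager = _

      override def open(parameters: Configuration): Unit = {
        manager = new Manager          // per-instance, as recommended above
        // manager = Manager.instance  // or: share a single instance per task manager JVM
      }

      override def processElement(
          value: String,
          ctx: ProcessFunction[String, String]#Context,
          out: Collector[String]): Unit = {
        out.collect(manager.handle(value))
      }
    }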

On Mon, Apr 6, 2020 at 5:26 AM Salva Alcántara 
wrote:

> I need to create a singleton (manager) object to be used within all the
> parallel instances of my UDF operator (a `ProcessFunction`). What is the
> proper way of creating such a singleton object per task manager?
>
>
>
>


Re: PartitionNotFoundException when restarting from checkpoint

2018-03-14 Thread Seth Wiesman
Unfortunately the stack trace was swallowed by the Java timer in the 
LocalInputChannel [1]; the real error is forwarded out to the main thread, but I 
couldn't figure out how to see that in my logs.

However, I believe I am close to having a reproducible example. Run a 1.4 
DataStream job sinking to Kafka 0.11 and cancel it with a savepoint. If you then 
shut down the Kafka daemon on a single broker but keep the REST proxy up, you 
should see this error when you resume.

[1] 
https://github.com/apache/flink/blob/fa024726bb801fc71cec5cc303cac1d4a03f555e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java#L151

Seth Wiesman



From: Fabian Hueske <fhue...@gmail.com>
Date: Tuesday, March 13, 2018 at 8:02 PM
To: Seth Wiesman <swies...@mediamath.com>, Stefan Richter 
<s.rich...@data-artisans.com>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: PartitionNotFoundException when restarting from checkpoint

Hi Seth,
Thanks for sharing how you resolved the problem!
The problem might have been related to Flink's key groups which are used to 
assign key ranges to tasks.
Not sure why this would be related to ZooKeeper being in a bad state. Maybe 
Stefan (in CC) has an idea about the cause.
Also, it would be helpful if you could share the stacktrace of the exception 
(in case you still have it).
Best, Fabian

2018-03-13 14:35 GMT+01:00 Seth Wiesman <swies...@mediamath.com>:
It turns out the issue was due to our ZooKeeper installation being in a bad 
state. I am not clear enough on Flink's networking internals to explain how 
this manifested as a partition not found exception, but hopefully this can 
serve as a starting point for others who run into the same issue.

Seth Wiesman



From: Seth Wiesman <swies...@mediamath.com>
Date: Friday, March 9, 2018 at 11:53 AM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: PartitionNotFoundException when restarting from checkpoint

Hi,

We are running Flink 1.4.0 with a YARN deployment on EC2 instances, RocksDB, 
and incremental checkpointing. Last night a job failed and became stuck in a 
restart cycle with a PartitionNotFoundException. We tried restarting from the 
checkpoint on a fresh Flink session with no luck. Looking through the logs we can 
see that the specified partition is never registered with the ResultPartitionManager.

My questions are:

1)  Are partitions a part of state or are they ephemeral to the job?

2)  If they are not part of state, where would the task managers be getting 
that partition id to begin with?

3)  Right now we are logging everything under 
org.apache.flink.runtime.io.network; is there anywhere else to look?

Thank you,

Seth Wiesman





Re: PartitionNotFoundException when restarting from checkpoint

2018-03-13 Thread Seth Wiesman
It turns out the issue was due to our ZooKeeper installation being in a bad 
state. I am not clear enough on Flink's networking internals to explain how 
this manifested as a partition not found exception, but hopefully this can 
serve as a starting point for others who run into the same issue.

Seth Wiesman



From: Seth Wiesman <swies...@mediamath.com>
Date: Friday, March 9, 2018 at 11:53 AM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: PartitionNotFoundException when restarting from checkpoint

Hi,

We are running Flink 1.4.0 with a YARN deployment on EC2 instances, RocksDB, 
and incremental checkpointing. Last night a job failed and became stuck in a 
restart cycle with a PartitionNotFoundException. We tried restarting from the 
checkpoint on a fresh Flink session with no luck. Looking through the logs we can 
see that the specified partition is never registered with the ResultPartitionManager.

My questions are:

1)  Are partitions a part of state or are they ephemeral to the job?

2)  If they are not part of state, where would the task managers be getting 
that partition id to begin with?

3)  Right now we are logging everything under 
org.apache.flink.runtime.io.network; is there anywhere else to look?

Thank you,

Seth Wiesman




PartitionNotFoundException when restarting from checkpoint

2018-03-09 Thread Seth Wiesman
Hi,

We are running Flink 1.4.0 with a YARN deployment on EC2 instances, RocksDB, 
and incremental checkpointing. Last night a job failed and became stuck in a 
restart cycle with a PartitionNotFoundException. We tried restarting from the 
checkpoint on a fresh Flink session with no luck. Looking through the logs we can 
see that the specified partition is never registered with the ResultPartitionManager.

My questions are:

1)   Are partitions a part of state or are they ephemeral to the job?

2)   If they are not part of state, where would the task managers be 
getting that partition id to begin with?

3)   Right now we are logging everything under 
org.apache.flink.runtime.io.network; is there anywhere else to look?

Thank you,

Seth Wiesman




Re: 'Custom' mapping function on keyed WindowedStream

2018-02-26 Thread Seth Wiesman
I had to solve a similar problem: we use a process function with RocksDB and 
map state for the sub keys. So while we hit RocksDB on every element, only the 
specified sub keys are ever read from disk. 
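
As a rough sketch of that pattern (the Event type and all names are invented for 
illustration; the idea is simply a keyed process function with one MapState entry 
per sub key):

    import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    import org.apache.flink.util.Collector

    // Hypothetical event carrying a main key and a sub key.
    case class Event(key: String, subKey: String)

    class SubKeyCounts extends KeyedProcessFunction[String, Event, (String, String, Long)] {

      @transient private var counts: MapState[String, java.lang.Long] = _

      override def open(parameters: Configuration): Unit = {
        counts = getRuntimeContext.getMapState(
          new MapStateDescriptor[String, java.lang.Long](
            "subKeyCounts", classOf[String], classOf[java.lang.Long]))
      }

      override def processElement(
          event: Event,
          ctx: KeyedProcessFunction[String, Event, (String, String, Long)]#Context,
          out: Collector[(String, String, Long)]): Unit = {
        // Only the sub key touched by this element is read from and written to RocksDB.
        val boxed = counts.get(event.subKey)
        val updated = (if (boxed == null) 0L else boxed.longValue()) + 1
        counts.put(event.subKey, updated)
        out.collect((event.key, event.subKey, updated))
      }
    }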

Seth Wiesman
 

On 2/26/18, 6:32 AM, "Marchant, Hayden " <hayden.march...@citi.com> wrote:

I would like to create a custom aggregator function for a windowed 
KeyedStream which I have complete control over - i.e. instead of implementing 
an AggregatorFunction, I would like to control the lifecycle of the flink state 
by implementing the CheckpointedFunction interface, though I still want this 
state to be per-key, per-window. 

I am not sure which function I should be calling on the WindowedStream in 
order to invoke this custom functionality. I see from the documentation that 
CheckpointedFunction is for non-keyed state - which I guess eliminates this 
option.

A little background - I have logic that needs to hold a very large state in 
the operator - lots of counts by sub-key. Since only a sub-set of these 
aggregations are updated, I was interesting in trying out incremental 
checkpointing in RocksDB. However, I don't want to be hitting RocksDB I/O on 
every update of state since we need very low latency, and instead wanted to 
hold the state in Java Heap and then update the Flink state on checkpoint - i.e 
something like CheckpointedFunction.
My assumption is that any update I make to RocksDB backed state will hit 
the local disk - if this is wrong then I'll be happy

What other options do I have?

Thanks,
Hayden Marchant





Re: CaseClassTypeInfo fails to deserialize in flink 1.4 with Parent First Class Loading

2018-01-12 Thread Seth Wiesman
Here is the stack trace: 

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot 
instantiate user function.
at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:235)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:95)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: cannot assign instance of 
scala.collection.immutable.$colon$colon to field 
org.apache.flink.api.scala.typeutils.CaseClassTypeInfo.fieldNames of type 
scala.collection.Seq in instance of 
com.mediamath.reporting.PerformanceJob$$anon$3
at 
java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
at 
java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2293)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:433)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
at 
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:220)
... 4 more


Seth Wiesman

On 1/12/18, 9:12 AM, "Tzu-Li (Gordon) Tai" <tzuli...@apache.org> wrote:

Hi Seth,

Thanks a lot for the report!

I think your observation is expected behaviour, if there really is a binary
incompatible change between Scala minor releases.
And yes, the type information macro in the Scala API is very sensitive to
the exact Scala version used. I had in the past also observed generated case
class serializers by the macro to be incompatible across different Scala
minor releases.

Just curious, what exactly is the deserialization failure you observed when
using parent-first classloading?
Perhaps we should properly document these surprises somewhere in the
documentation ...

Cheers,
Gordon









CaseClassTypeInfo fails to deserialize in flink 1.4 with Parent First Class Loading

2018-01-11 Thread Seth Wiesman
This is less of a question and more of a PSA.

It looks like there is some sort of binary incompatible change in the Scala 
standard library class `scala.collection.immutable.::` between point releases 
of Scala 2.11. CaseClassTypeInfo generated by the type information macro will 
fail to deserialize in user code with parent-first class loading if the 
application is not compiled with 2.11.12. The following will work with 
child-first class loading but fail with parent-first.


case class CustomClass(a: Int, b: Float)

class CustomMapFunction[T >: Null : TypeInformation] extends 
MapFunction[String, T] {
  override def map(value: String) = {
val typeInfo = implicitly[TypeInformation[T]]

// custom deserialization here
null
  }
}


env
  .fromCollection(Iterator[String](""))
  .map(new CustomMapFunction[CustomClass])
  .print()
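
For anyone who runs into this later: the class loading order is configurable, so the 
behaviour above can be toggled via flink-conf.yaml (child-first is the default in 1.4):

    # flink-conf.yaml
    classloader.resolve-order: child-first   # default; parent-first reproduces the failure described above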



Seth Wiesman




Re: Watermark in broadcast

2017-12-13 Thread Seth Wiesman
Quick follow-up question: is there some way to notify a TimestampAssigner that 
it is consuming from an idle source?

Seth Wiesman




From: Seth Wiesman <swies...@mediamath.com>
Date: Wednesday, December 13, 2017 at 12:04 PM
To: Timo Walther <twal...@apache.org>, "user@flink.apache.org" 
<user@flink.apache.org>
Subject: Re: Watermark in broadcast

Hi Timo,

I think you are correct. This stream is consumed from Kafka and the number of 
partitions is much less than the parallelism of the program, so there would be 
many partitions that never forward watermarks greater than Long.MIN_VALUE.

Thank you for the quick response.

Seth Wiesman




From: Timo Walther <twal...@apache.org>
Date: Wednesday, December 13, 2017 at 11:46 AM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Watermark in broadcast

Hi Seth,

are you sure that all partitions of the broadcasted stream send a watermark? 
processWatermark is only called if a minimum watermark arrived from all 
partitions.

Regards,
Timo

Am 12/13/17 um 5:10 PM schrieb Seth Wiesman:
Hi,

How are watermarks propagated during a broadcast partition? I have a 
TwoInputStreamTransformation that takes a broadcast stream as one of its 
inputs. Both streams are assigned timestamps and watermarks before being 
connected however I only ever see watermarks from my non-broadcast stream. Is 
this expected behavior? Currently I have overridden processWatermark1 to 
unconditionally call processWatermark but that does not seem like an ideal 
solution.

Thank you,
Seth Wiesman







Re: Watermark in broadcast

2017-12-13 Thread Seth Wiesman
Hi Timo,

I think you are correct. This stream is consumed from Kafka and the number of 
partitions is much less than the parallelism of the program, so there would be 
many partitions that never forward watermarks greater than Long.MIN_VALUE.

Thank you for the quick response.

Seth Wiesman




From: Timo Walther <twal...@apache.org>
Date: Wednesday, December 13, 2017 at 11:46 AM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Watermark in broadcast

Hi Seth,

are you sure that all partitions of the broadcasted stream send a watermark? 
processWatermark is only called if a minimum watermark arrived from all 
partitions.

Regards,
Timo

Am 12/13/17 um 5:10 PM schrieb Seth Wiesman:
Hi,

How are watermarks propagated during a broadcast partition? I have a 
TwoInputStreamTransformation that takes a broadcast stream as one of its 
inputs. Both streams are assigned timestamps and watermarks before being 
connected however I only ever see watermarks from my non-broadcast stream. Is 
this expected behavior? Currently I have overridden processWatermark1 to 
unconditionally call processWatermark but that does not seem like an ideal 
solution.

Thank you,
Seth Wiesman







Watermark in broadcast

2017-12-13 Thread Seth Wiesman
Hi,

How are watermarks propagated during a broadcast partition? I have a 
TwoInputStreamTransformation that takes a broadcast stream as one of its 
inputs. Both streams are assigned timestamps and watermarks before being 
connected however I only ever see watermarks from my non-broadcast stream. Is 
this expected behavior? Currently I have overridden processWatermark1 to 
unconditionally call processWatermark but that does not seem like an ideal 
solution.
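
For context, a rough sketch of that workaround, written against the internal two-input 
operator API of that era (treat the base class and exact method names as assumptions; 
they may differ between Flink versions, and this is not a recommended long-term fix):

    import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, TwoInputStreamOperator}
    import org.apache.flink.streaming.api.watermark.Watermark

    abstract class BroadcastTolerantOperator[IN1, IN2, OUT]
        extends AbstractStreamOperator[OUT]
        with TwoInputStreamOperator[IN1, IN2, OUT] {

      // Input 1 is the primary stream; forward its watermarks unconditionally
      // instead of waiting for the minimum across both inputs.
      override def processWatermark1(mark: Watermark): Unit = {
        processWatermark(mark)
      }

      // Input 2 is the broadcast side; ignore its watermarks entirely.
      override def processWatermark2(mark: Watermark): Unit = ()
    }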

Thank you,
Seth Wiesman





Re: DataStream to Table Api idioms

2017-11-06 Thread Seth Wiesman
Not a problem, thanks for the quick feedback.

https://issues.apache.org/jira/browse/FLINK-7999

Seth Wiesman

From: Fabian Hueske <fhue...@gmail.com>
Date: Monday, November 6, 2017 at 9:14 AM
To: Seth Wiesman <swies...@mediamath.com>
Cc: user <user@flink.apache.org>
Subject: Re: DataStream to Table Api idioms

Hi Seth,

I think the Table API is not there yet to address you use case.

1. Allowed lateness cannot be configured but it is on the list of features that 
we plan to add in the future.
2. Custom triggers are not supported. We are planning to add an option to 
support your use case (early firing and updates).
3. The window joins that will be released with 1.4 require constant boundaries 
(left.time > right.time - X and left.time < right.time + Y).
Variable join window boundaries have not been considered yet and would be quite 
tricky to implement. Would you mind opening a JIRA issue for this feature?

Best, Fabian
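
For reference, the constant-boundary join Fabian describes ended up looking roughly like 
this in the SQL support of later releases (table names, columns, and the tableEnv handle 
are invented; check the docs of your version for the exact syntax):

    // Sketch only: assumes `impressions` and `campaigns` are registered tables
    // with event-time attributes imp_time and camp_time.
    val joined = tableEnv.sqlQuery(
      """
        |SELECT i.campaign_id, i.imp_time, c.budget
        |FROM impressions AS i, campaigns AS c
        |WHERE i.campaign_id = c.id
        |  AND i.imp_time BETWEEN c.camp_time - INTERVAL '1' HOUR
        |                     AND c.camp_time + INTERVAL '1' HOUR
      """.stripMargin)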

2017-11-06 14:56 GMT+01:00 Seth Wiesman <swies...@mediamath.com>:
Hi,

I am experimenting with rewriting some of my datastream projects with the table 
api and I had some questions on how to express certain idioms. I am using 
1.4-SNAPSHOT.


1)   Can I express allowed lateness?

2)   Can I use a custom trigger? More specifically, I have a 24hr window 
but would like to receive partial results say every hour.

3)   Do window join time intervals have to be constant, or can they depend 
on row attributes? I am running campaigns that have start and end dates, and so 
I would like my join window to be that interval.

Thank you,

Seth Wiesman




DataStream to Table Api idioms

2017-11-06 Thread Seth Wiesman
Hi,

I am experimenting with rewriting some of my datastream projects with the table 
api and I had some questions on how to express certain idioms. I am using 
1.4-SNAPSHOT.


1)   Can I express allowed lateness?

2)   Can I use a custom trigger? More specifically, I have a 24hr window 
but would like to receive partial results say every hour.

3)   Do window join time intervals have to be constant, or can they depend 
on row attributes? I am running campaigns that have start and end dates, and so 
I would like my join window to be that interval.

Thank you,

Seth Wiesman



Re: serialization error when using multiple metrics counters

2017-10-09 Thread Seth Wiesman
When a Scala class contains a single lazy val, it is implemented using a boolean flag 
to track whether the field has been evaluated. When a class contains multiple lazy 
vals, it is implemented as a bitmask shared amongst the variables. This can 
lead to inconsistencies as to whether serialization forces evaluation of the 
field; in general, lazy vals should always be marked @transient for expected 
behavior.

Seth
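
A sketch of the pattern Kostas describes below, with the counters kept transient and 
created in open() (the counter names come from the original question; the parsing 
logic is a placeholder):

    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.metrics.Counter

    class ParsingMapper extends RichMapFunction[String, String] {

      // Counters are not serializable, so keep them transient and create them in open().
      @transient private var successCounter: Counter = _
      @transient private var failedCounter: Counter = _

      override def open(parameters: Configuration): Unit = {
        val group = getRuntimeContext.getMetricGroup
        successCounter = group.counter("successfulParse")
        failedCounter = group.counter("failedParse")
      }

      override def map(value: String): String = {
        // Placeholder logic: count non-empty lines as successes.
        if (value.nonEmpty) successCounter.inc() else failedCounter.inc()
        value
      }
    }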

From: Stephan Ewen 
Date: Monday, October 9, 2017 at 2:44 PM
To: Kostas Kloudas 
Cc: Colin Williams , user 

Subject: Re: serialization error when using multiple metrics counters

Interesting, is there a quirk in Scala where using multiple lazy variables 
possibly results in eager initialization of some?

On Mon, Oct 9, 2017 at 4:37 PM, Kostas Kloudas 
> wrote:
Hi Colin,

Are you initializing your counters from within the open() method of your rich 
function?
In other words, are you calling

counter = getRuntimeContext.getMetricGroup.counter(“my counter”)

from within the open().

The counter interface is not serializable. So if you instantiate the counters 
outside the open(), when Flink tries to ship your code to the cluster, it 
cannot, and you get the exception.

You can have a look at the docs for an example:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html

Thanks,
Kostas

On Oct 7, 2017, at 11:34 PM, Colin Williams 
> 
wrote:

I've created a RichMapFunction in scala with multiple counters like:

   lazy val successCounter = 
getRuntimeContext.getMetricGroup.counter("successfulParse")
   lazy val failedCounter = 
getRuntimeContext.getMetricGroup.counter("failedParse")
   lazy val errorCounter = 
getRuntimeContext.getMetricGroup.counter("errorParse")

which I increment in the map function. While testing I noticed that I have no 
issues with using a single counter. However with multiple counters I get a 
serialization error using more than one counter.

Does anyone know how I can use multiple counters from my RichMapFunction, or 
what I'm doing wrong?

[info]   
org.apache.flink.api.common.InvalidProgramException:
 The implementation of the RichMapFunction is not serializable. The object 
probably contains or references non serializable fields.
[info]   at 
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
[info]   at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
[info]   at 
org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
[info]   at 
org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:527)
[info]   at 
org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:581)
[info]   at 
ParsedResultUnwrapperTest$$anonfun$2.apply(ParsedResultUnwrapperTest.scala:27)
[info]   at 
ParsedResultUnwrapperTest$$anonfun$2.apply(ParsedResultUnwrapperTest.scala:23)
[info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   ...
[info]   Cause: java.io.NotSerializableException: 
org.apache.flink.metrics.SimpleCounter
[info]   at 
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
[info]   at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
[info]   at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
[info]   at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
[info]   at 
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
[info]   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
[info]   at 
org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:315)
[info]   at 
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
[info]   at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
[info]   at 
org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
[info]   ...
[info] - ParseResultUnwrapper.errorCounter.getCount should return 1L for a 
Error -> ParseResult[LineProtocol] *** FAILED ***
[info]   
org.apache.flink.api.common.InvalidProgramException:
 The implementation of the RichMapFunction is not serializable. The object 
probably contains or references non serializable fields.
[info]   at 
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
[info]   at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)

Re: [POLL] Who still uses Java 7 with Flink ?

2017-07-13 Thread Seth Wiesman
+1 for dropping java 7 

On 7/13/17, 4:59 AM, "Konstantin Knauf"  wrote:

+1 for dropping Java 7

On 13.07.2017 10:11, Niels Basjes wrote:
> +1 For dropping java 1.7
> 
> On 13 Jul 2017 04:11, "Jark Wu"  wrote:
> 
>> +1 for dropping Java 7
>>
>> 2017-07-13 9:34 GMT+08:00 ☼ R Nair (रविशंकर नायर) <
>> ravishankar.n...@gmail.com>:
>>
>>> +1 for dropping Java 1.7.
>>>
>>> On Wed, Jul 12, 2017 at 9:10 PM, Kurt Young  wrote:
>>>
 +1 for droppint Java 7, we have been using Java 8 for more than one 
year
 in Alibaba and everything work fine.

 Best,
 Kurt

 On Thu, Jul 13, 2017 at 2:53 AM, Bowen Li 
 wrote:

> +1 for dropping Java 7
>
> On Wed, Jul 12, 2017 at 9:04 AM, Gyula Fóra 
> wrote:
>
>> +1 for dropping 1.7 from me as well.
>>
>> Gyula
>>
>> On Wed, Jul 12, 2017, 17:53 Ted Yu  wrote:
>>
>>> +1 on dropping support for Java 1.7
>>>
>>>  Original message 
>>> From: Robert Metzger 
>>> Date: 7/12/17 8:36 AM (GMT-08:00)
>>> To: d...@flink.apache.org
>>> Cc: user 
>>> Subject: Re: [POLL] Who still uses Java 7 with Flink ?
>>>
>>> +1 to drop Java 7 support
>>>
>>> I believe that we can move to Java 8 for the argument you've stated.
>>> ElasticSearch 5, Spark 2.2  require Java 8 already, Hadoop 3.0.0
> will
>>> require it as well.
>>>
>>> On Wed, Jul 12, 2017 at 4:02 PM, Driesprong, Fokko
> >>
>>> wrote:
>>>
 Hi,

 I would be in favor of dropping Java 7 as we don't use it in our hadoop
 infra (and we are a bank). Also, Spark 2.2 has been released today, which
 doesn't support Java 7 anymore, and Flink should not lag behind :-)

 Cheers, Fokko

 2017-07-12 15:56 GMT+02:00 Stephan Ewen :

> Bumping this thread again.
>
> There are several strong points for dropping Java 7 support,
> apart
>> from
 the
> fact that it is not maintained
>
>   - We could really use the Java 8 default methods feature in
 interfaces to
> evolve the API without breaking backwards compatibility
>
>   - Easier build setup for Scala 2.12 (which requires Java 8),
> no need
 to
> manage the tricky combinations of Java / Scala versions
>
>   - Ability to use vanilla Akka (rather than Flakka) which
> requires
 Java 8.
> - Fewer problems for users that use Akka in the Flink
> applications
> - Flakka currently does not support Scala 2.12
> - Newer Akka versions shade protobuf, which is important
>
> I think these together make a pretty good case for bumping the
>> required
> Java version to Java 8.
>
> It would just help both Flink users (dependency management, Scala
 versions)
> and developers (build simplification) a lot.
> Unless we see users stepping forward and making a case that it
> will be
> impossible for them to upgrade to Java 8, I suggest to go
> forward with
> this.
>
> Best,
> Stephan
>
>
>
> On Thu, Jun 8, 2017 at 9:36 PM, Haohui Mai 
>> wrote:
>
>> +1
>>
>> There are several high impacts security vulnerabilities in JDK
> 7 and
 will
>> not be addressed.
>>
>> As a result we completely moved away from JDK 7.
>>
>> +1 on separating the tasks of supporting Scala 2.12 and JDK 8
> in two
> steps.
>>
>>
>> On Thu, Jun 8, 2017 at 9:53 AM Greg Hogan 
>> wrote:
>>
>>> Is this not two different issues?
>>> - adding builds for Scala 2.12
>>> - upgrading to Java version 1.8
>>>
>>> It may be time to switch, but I haven’t seen anything in
>> FLINK-5005
> which

OperatorState partioning when recovering from failure

2017-05-04 Thread Seth Wiesman
I am curious about how operator state is repartitioned to subtasks when a job 
is resumed from a checkpoint or savepoint. The reason is that I am having 
issues with the ContinuousFileReaderOperator when recovering from a failure.

I consume most of my data from files off S3. I have a custom file monitor that 
understands how to walk my directory structure and outputs 
TimestampedFileSplits downstream in chronological order to the stock 
ContinuousFileReaderOperator. The reader consumes those splits and stores them 
a priority queue based on their last modified time ensuring that files are read 
in chronological order which is exactly what I want. The problem is when 
recovering, the unread splits being partitioned out to each of the subtasks 
seem to be heavily skewed in terms of last modified time.

While each task may have a similar number of files, I find that one or two will 
have a disproportionate number of old files. This in turn holds back my 
watermark (sometimes for several hours depending on the number of unread 
splits), which keeps timers from firing, windows from purging, etc.

I was hoping there was some way I could add a custom partitioner to ensure 
that splits are uniformly distributed in a temporal manner, or that someone had 
other ideas of how I could mitigate the problem.

Thank you,

Seth Wiesman



Re: Checkpointing with RocksDB as statebackend

2017-02-27 Thread Seth Wiesman
Vinay,

The bucketing sink performs rename operations during the checkpoint, and if it 
tries to rename a file that is not yet consistent, that causes a FileNotFound 
exception which fails the checkpoint.

Stephan,

Currently my AWS fork contains some very specific assumptions about the 
pipeline that will in general only hold for my pipeline. This is because there 
were still some open questions that I had about how to solve consistency 
issues in the general case. I will comment on the Jira issue with more specifics.

Seth Wiesman

From: vinay patil <vinay18.pa...@gmail.com>
Reply-To: "user@flink.apache.org" <user@flink.apache.org>
Date: Monday, February 27, 2017 at 1:05 PM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Checkpointing with RocksDB as statebackend

Hi Seth,
Thank you for your suggestion.
But if the issue is only related to S3, then why does this happen when I 
replace the S3 sink with HDFS as well (for checkpointing I am using HDFS only)?
Stephan,
Another issue I see is that when I set env.setBufferTimeout(-1) and keep the 
checkpoint interval at 10 minutes, I have observed that nothing gets written to 
the sink (tried with S3 as well as HDFS); at least I was expecting pending files 
here.
This issue gets worse when checkpointing is disabled, as nothing is written.


Regards,
Vinay Patil

On Mon, Feb 27, 2017 at 10:55 PM, Stephan Ewen [via Apache Flink User Mailing 
List archive.] <[hidden 
email]> wrote:
Hi Seth!

Wow, that is an awesome approach.

We have actually seen these issues as well and we are looking to eventually 
implement our own S3 file system (and circumvent Hadoop's S3 connector that 
Flink currently relies on): https://issues.apache.org/jira/browse/FLINK-5706

Do you think your patch would be a good starting point for that and would you 
be willing to share it?

The Amazon AWS SDK for Java is Apache 2 licensed, so that is possible to fork 
officially, if necessary...

Greetings,
Stephan



On Mon, Feb 27, 2017 at 5:15 PM, Seth Wiesman <[hidden email]> wrote:
Just wanted to throw in my 2cts.

I’ve been running pipelines with similar state size using rocksdb which 
externalize to S3 and bucket to S3. I was getting stalls like this and ended up 
tracing the problem to S3 and the bucketing sink. The solution was two fold:


1)   I forked hadoop-aws and have it treat flink as a source of truth. Emr 
uses a dynamodb table to determine if S3 is inconsistent. Instead I say that if 
flink believes that a file exists on S3 and we don’t see it then I am going to 
trust that flink is in a consistent state and S3 is not. In this case, various 
operations will perform a back off and retry up to a certain number of times.


2)   The bucketing sink performs multiple renames over the lifetime of a 
file, occurring when a checkpoint starts and then again on notification after 
it completes. Due to S3’s consistency guarantees the second rename of file can 
never be assured to work and will eventually fail either during or after a 
checkpoint. Because there is no upper bound on the time it will take for a file 
on S3 to become consistent, retries cannot solve this specific problem as it 
could take upwards of many minutes to rename which would stall the entire 
pipeline. The only viable solution I could find was to write a custom sink 
which understands S3. Each writer will write file locally and then copy it to 
S3 on checkpoint. By only interacting with S3 once per file it can circumvent 
consistency issues all together.

Hope this helps,

Seth Wiesman

From: vinay patil <[hidden email]>
Reply-To: "[hidden email]" <[hidden email]>
Date: Saturday, February 25, 2017 at 10:50 AM
To: "[hidden email]" <[hidden email]>
Subject: Re: Checkpointing with RocksDB as statebackend

Hi Stephan,
Just to avoid the confusion here, I am using S3 sink for writing the data, and 
using HDFS for storing checkpoints.
There are 2 core nodes (HDFS) and two task nodes on EMR

I replaced s3 sink with HDFS for writing data in my last test.
Let's say the checkpoint interval is 5 minutes, now within 5minutes of run the 
state size grows to 30GB ,  after checkpointing the 30GB state that is 
maintained in rocksDB has to be copied to HDFS, right ?  is this causing the 
pipeline to stall ?

Regards,
Vinay Patil

On Sat, Feb 25, 2017 at 12:22 AM, Vinay Patil <[hidden email]> wrote:
Hi Stephan,
To verify if S3 is making the pipeline stall, I have replaced the S3 sink with 
HDFS and kept the minimum pause between checkpoints at 5 minutes; still I see the 
same issue with checkpoints getting failed.
If I k

Re: Checkpointing with RocksDB as statebackend

2017-02-27 Thread Seth Wiesman
Just wanted to throw in my 2cts.

I’ve been running pipelines with similar state size using rocksdb which 
externalize to S3 and bucket to S3. I was getting stalls like this and ended up 
tracing the problem to S3 and the bucketing sink. The solution was two fold:


1)   I forked hadoop-aws and have it treat Flink as a source of truth. EMR 
uses a DynamoDB table to determine if S3 is inconsistent. Instead, I say that if 
Flink believes that a file exists on S3 and we don't see it, then I am going to 
trust that Flink is in a consistent state and S3 is not. In this case, various 
operations will perform a backoff and retry up to a certain number of times.


2)   The bucketing sink performs multiple renames over the lifetime of a 
file, occurring when a checkpoint starts and then again on notification after 
it completes. Due to S3's consistency guarantees, the second rename of a file can 
never be assured to work and will eventually fail either during or after a 
checkpoint. Because there is no upper bound on the time it will take for a file 
on S3 to become consistent, retries cannot solve this specific problem, as it 
could take upwards of many minutes to rename, which would stall the entire 
pipeline. The only viable solution I could find was to write a custom sink 
which understands S3. Each writer will write a file locally and then copy it to 
S3 on checkpoint. By only interacting with S3 once per file, it can circumvent 
consistency issues altogether.

Hope this helps,

Seth Wiesman
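
A greatly simplified sketch of such a sink, just to show the shape of the idea. It has 
no failure recovery and no exactly-once guarantees; bucket, prefix, and the local path 
are placeholders, and it assumes the AWS SDK v1 client:

    import java.io.{BufferedWriter, File, FileWriter}
    import java.util.UUID

    import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}
    import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

    class LocalThenS3Sink(bucket: String, prefix: String)
        extends RichSinkFunction[String] with CheckpointedFunction {

      @transient private var s3: AmazonS3 = _
      @transient private var currentFile: File = _
      @transient private var writer: BufferedWriter = _

      override def initializeState(context: FunctionInitializationContext): Unit = {
        s3 = AmazonS3ClientBuilder.defaultClient()
        openNewFile()
      }

      override def invoke(value: String): Unit = {
        writer.write(value)
        writer.newLine()
      }

      override def snapshotState(context: FunctionSnapshotContext): Unit = {
        // Finish the local file and push it to S3 once per checkpoint. A production
        // sink needs recovery handling on top of this; at best this is at-least-once.
        writer.close()
        s3.putObject(bucket, s"$prefix/${currentFile.getName}", currentFile)
        currentFile.delete()
        openNewFile()
      }

      private def openNewFile(): Unit = {
        currentFile = new File(s"/tmp/${UUID.randomUUID()}.part")
        writer = new BufferedWriter(new FileWriter(currentFile))
      }
    }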

From: vinay patil <vinay18.pa...@gmail.com>
Reply-To: "user@flink.apache.org" <user@flink.apache.org>
Date: Saturday, February 25, 2017 at 10:50 AM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Checkpointing with RocksDB as statebackend

Hi Stephan,
Just to avoid the confusion here, I am using S3 sink for writing the data, and 
using HDFS for storing checkpoints.
There are 2 core nodes (HDFS) and two task nodes on EMR

I replaced s3 sink with HDFS for writing data in my last test.
Let's say the checkpoint interval is 5 minutes, now within 5minutes of run the 
state size grows to 30GB ,  after checkpointing the 30GB state that is 
maintained in rocksDB has to be copied to HDFS, right ?  is this causing the 
pipeline to stall ?

Regards,
Vinay Patil

On Sat, Feb 25, 2017 at 12:22 AM, Vinay Patil <[hidden 
email]> wrote:
Hi Stephan,
To verify if S3 is making teh pipeline stall, I have replaced the S3 sink with 
HDFS and kept minimum pause between checkpoints to 5minutes, still I see the 
same issue with checkpoints getting failed.
If I keep the  pause time to 20 seconds, all checkpoints are completed , 
however there is a hit in overall throughput.



Regards,
Vinay Patil

On Fri, Feb 24, 2017 at 10:09 PM, Stephan Ewen [via Apache Flink User Mailing 
List archive.] <[hidden 
email]> wrote:
Flink's state backends currently do a good number of "make sure this exists" 
operations on the file systems. Through Hadoop's S3 filesystem, that translates 
to S3 bucket list operations, where there is a limit on how many operations may 
happen per time interval. After that, S3 blocks.

It seems that operations that are totally cheap on HDFS are hellishly expensive 
(and limited) on S3. It may be that you are affected by that.

We are gradually trying to improve the behavior there and be more S3 aware.

Both 1.3-SNAPSHOT and 1.2-SNAPSHOT already contain improvements there.

Best,
Stephan


On Fri, Feb 24, 2017 at 4:42 PM, vinay patil <[hidden email]> wrote:

Hi Stephan,

So do you mean that S3 is causing the stall , as I have mentioned in my 
previous mail, I could not see any progress for 16minutes as checkpoints were 
getting failed continuously.

On Feb 24, 2017 8:30 PM, "Stephan Ewen [via Apache Flink User Mailing List 
archive.]" <[hidden email]> wrote:
Hi Vinay!

True, the operator state (like Kafka) is currently not asynchronously 
checkpointed.

While it is rather small state, we have seen before that on S3 it can cause 
trouble, because S3 frequently stalls uploads of even data amounts as low as 
kilobytes due to its throttling policies.

That would be a super important fix to add!

Best,
Stephan


On Fri, Feb 24, 2017 at 2:58 PM, vinay patil <[hidden email]> wrote:
Hi,

I have attached a snapshot for reference:
As you can see all the 3 checkpoints failed; for checkpoint IDs 2 and 3 it
is stuck at the Kafka source after 50%
(The data sent till now by Kafka source 1 is 65GB and sent by source 2 is
15GB )

Within 10minutes 15M records were processed, and for the next 16minutes the
pipeline is stuck , I don't see any progress beyond 15M because of
checkpoints getting failed consistently.

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n11882

Re: List State in RichWindowFunction leads to RocksDb memory leak

2017-02-24 Thread Seth Wiesman
Also while I’ve got you, is it possible to get the job id from the runtime 
context?

Seth Wiesman

From: Seth Wiesman <swies...@mediamath.com>
Reply-To: "user@flink.apache.org" <user@flink.apache.org>
Date: Friday, February 24, 2017 at 2:51 PM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: List State in RichWindowFunction leads to RocksDb memory leak

Appreciate you getting back to me.

ProcessWindowFunction does look interesting and expect that it will be what I 
move to in the future. However, even if it did currently have the functionality 
that I need today I don’t think I would be comfortable moving to a snapshot 
version so soon after migrating to 1.2.

With the count window: I was actually using a time window with a count trigger 
(stream.timeWindow().allowedLateness().trigger(Count.of(1))). The issue 
appeared to have less to do with state size expanding and more to do with 
checkpoint buffers being blocked somewhere along the pipeline. I decided to 
move away from this idea shortly after sending my last email so I don’t have 
any real insight into what was wrong.

I understand not wanting to break things for people who expect state to be 
global and do not expect to see any api’s change ☺.

The solution I ended up settling on was copying the window operator and giving 
the window function access to the trigger context; luckily it was a fairly 
trivial change to make. With that I am able to keep everything scoped to the 
correct namespace and clean everything up when the window is discarded. Is the 
plan for context in ProcessWindowFunction eventually have access to scoped 
partitioned state or just timing? There are several things I have coming down 
the pipeline that require coordination between window evaluations.

Thank you again for all the help.

Seth Wiesman


From: Aljoscha Krettek <aljos...@apache.org>
Reply-To: "user@flink.apache.org" <user@flink.apache.org>
Date: Friday, February 24, 2017 at 12:09 PM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: List State in RichWindowFunction leads to RocksDb memory leak

Hi Seth,
yes, this is a thorny problem but I actually see one additional possible 
solution (that will, however, break other possible use cases).

First, regarding your solution 1):
We are working on adding this for ProcessWindowFunction: 
https://issues.apache.org/jira/browse/FLINK-4953. ProcessWindowFunction is a 
more powerful interface that allows querying more context about a window 
firing. This will replace the current WindowFunction in the future. 
Unfortunately this doesn't help you with your current situation.

About 2), do you have any idea why the state is getting so big? Do you see the 
state of the second (count) window operator growing very large? The problem 
with count windows is that they never get garbage collected if you don't reach 
the count required by a Trigger. If you have an evolving key space this means 
that your state will possibly grow forever.

The third solution that I can think of is to make state of a window function 
implicitly scoped to both the key and window. Right now, state is "global" 
across time and only scoped to a key. If we also scoped to the window we could 
keep track of all state created for a window and then garbage collect that once 
the window expires. This, however, will break things for people that rely on 
this state being global. I'll bring this up on the dev mailing list to see what 
people think about it? Are you also following that one? So that you could chime 
in.

I'm afraid I don't have a good solution for you before Flink 1.3 come out, 
other than writing your own custom operator or copying the WindowOperator.

What do you think?

Best,
Aljoscha
On Thu, 23 Feb 2017 at 16:12 Seth Wiesman 
<swies...@mediamath.com<mailto:swies...@mediamath.com>> wrote:
I am working on a program that uses a complex window and have run into some 
issues. It is a 1 hour window with 7 days allowed lateness including a custom 
trigger that gives us intermediate results every 5 minutes of processing time 
until the end of 7 days event time when a final fire is triggered and the 
window is purged. The window functions are an incremental reduce function as 
well as a RichWindowFunction which performs some final computation before 
outputting each result. I am building up a collection of objects so each time 
the RichWindowFunction is run I want to take a diff with the previous set to 
only output elements that have changed.

Example:

//In reality I am working with more complex objects than ints.
class CustomRichWindowFunction extends RichWindowFunction[Collection[Int], Int, 
Key, TimeWindow] {
@transient var state: ListState[Int]= _

override def open(parameters: Configuration): Unit = {
val info = new ListStateDescriptor(“previous”, 
createTypeInformation[Int])
 

Re: List State in RichWindowFunction leads to RocksDb memory leak

2017-02-24 Thread Seth Wiesman
Appreciate you getting back to me.

ProcessWindowFunction does look interesting, and I expect that it will be what I 
move to in the future. However, even if it did currently have the functionality 
that I need today, I don't think I would be comfortable moving to a snapshot 
version so soon after migrating to 1.2.

With the count window: I was actually using a time window with a count trigger 
(stream.timeWindow().allowedLateness().trigger(Count.of(1))). The issue 
appeared to have less to do with state size expanding and more to do with 
checkpoint buffers being blocked somewhere along the pipeline. I decided to 
move away from this idea shortly after sending my last email so I don’t have 
any real insight into what was wrong.

I understand not wanting to break things for people who expect state to be 
global and do not expect to see any api’s change ☺.

The solution I ended up settling on was copying the window operator and giving 
the window function access to the trigger context; luckily it was a fairly 
trivial change to make. With that I am able to keep everything scoped to the 
correct namespace and clean everything up when the window is discarded. Is the 
plan for the context in ProcessWindowFunction to eventually have access to scoped 
partitioned state, or just timing? There are several things I have coming down 
the pipeline that require coordination between window evaluations.

Thank you again for all the help.

Seth Wiesman


From: Aljoscha Krettek <aljos...@apache.org>
Reply-To: "user@flink.apache.org" <user@flink.apache.org>
Date: Friday, February 24, 2017 at 12:09 PM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: List State in RichWindowFunction leads to RocksDb memory leak

Hi Seth,
yes, this is a thorny problem but I actually see one additional possible 
solution (that will, however, break other possible use cases).

First, regarding your solution 1):
We are working on adding this for ProcessWindowFunction: 
https://issues.apache.org/jira/browse/FLINK-4953. ProcessWindowFunction is a 
more powerful interface that allows querying more context about a window 
firing. This will replace the current WindowFunction in the future. 
Unfortunately this doesn't help you with your current situation.

About 2), do you have any idea why the state is getting so big? Do you see the 
state of the second (count) window operator growing very large? The problem 
with count windows is that they never get garbage collected if you don't reach 
the count required by a Trigger. If you have an evolving key space this means 
that your state will possibly grow forever.

The third solution that I can think of is to make state of a window function 
implicitly scoped to both the key and window. Right now, state is "global" 
across time and only scoped to a key. If we also scoped to the window we could 
keep track of all state created for a window and then garbage collect that once 
the window expires. This, however, will break things for people that rely on 
this state being global. I'll bring this up on the dev mailing list to see what 
people think about it? Are you also following that one? So that you could chime 
in.

I'm afraid I don't have a good solution for you before Flink 1.3 come out, 
other than writing your own custom operator or copying the WindowOperator.

What do you think?

Best,
Aljoscha
On Thu, 23 Feb 2017 at 16:12 Seth Wiesman 
<swies...@mediamath.com<mailto:swies...@mediamath.com>> wrote:
I am working on a program that uses a complex window and have run into some 
issues. It is a 1 hour window with 7 days allowed lateness including a custom 
trigger that gives us intermediate results every 5 minutes of processing time 
until the end of 7 days event time when a final fire is triggered and the 
window is purged. The window functions are an incremental reduce function as 
well as a RichWindowFunction which performs some final computation before 
outputting each result. I am building up a collection of objects so each time 
the RichWindowFunction is run I want to take a diff with the previous set to 
only output elements that have changed.

Example:

//In reality I am working with more complex objects than ints.
class CustomRichWindowFunction extends RichWindowFunction[Collection[Int], Int, 
Key, TimeWindow] {
@transient var state: ListState[Int]= _

override def open(parameters: Configuration): Unit = {
val info = new ListStateDescriptor(“previous”, 
createTypeInformation[Int])
state = getRuntimeContext.getListState(info)
}

override def apply(key: Key, window: TimeWindow, input: 
Iterable[Collection[Int]], out: Collector[Int]): Unit = {
val current = input.iterator.next
val previous = state.get().iterator.asScala.toSet
state.clear()

for (elem <- current) {
if (!previous.contains(elem)) {
  

List State in RichWindowFunction leads to RocksDb memory leak

2017-02-23 Thread Seth Wiesman
I am working on a program that uses a complex window and have run into some 
issues. It is a 1 hour window with 7 days allowed lateness including a custom 
trigger that gives us intermediate results every 5 minutes of processing time 
until the end of 7 days event time when a final fire is triggered and the 
window is purged. The window functions are an incremental reduce function as 
well as a RichWindowFunction which performs some final computation before 
outputting each result. I am building up a collection of objects so each time 
the RichWindowFunction is run I want to take a diff with the previous set to 
only output elements that have changed.

Example:

import scala.collection.JavaConverters._

//In reality I am working with more complex objects than ints.
class CustomRichWindowFunction extends RichWindowFunction[Collection[Int], Int, Key, TimeWindow] {
  @transient var state: ListState[Int] = _

  override def open(parameters: Configuration): Unit = {
    val info = new ListStateDescriptor("previous", createTypeInformation[Int])
    state = getRuntimeContext.getListState(info)
  }

  override def apply(key: Key, window: TimeWindow, input: Iterable[Collection[Int]],
                     out: Collector[Int]): Unit = {
    val current = input.iterator.next
    val previous = state.get().iterator.asScala.toSet
    state.clear() // reset the stored "previous" elements before re-adding the current ones

    for (elem <- current) {
      if (!previous.contains(elem)) {
        out.collect(elem)
      }

      state.add(elem) //store for the next run
    }
  }
}

The issue with this is that it causes a memory leak with RocksDB. When the 
WindowOperator executes 
clearAllState<https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L527>
 at the end of the window's lifetime, it does not clear the ListState or any 
other type of custom partitioned state that may have been created. This causes 
my state size to grow indefinitely. It appears to me that a RichWindowFunction 
should have a clear method, similar to triggers, for cleaning up state when the 
window is destroyed.

Barring that I can envision two ways of solving this problem but have come 
short of successfully implementing them.


1)   If I had access to the watermark from within apply, I could use it in 
conjunction with the TimeWindow passed in to tell whether my final 
EventTimeTimer had gone off, allowing me to manually clear the state:

ie: if (watermark < window.getEnd + Time.days(7).toMilliseconds) {
      state.add(elem) // the window is not finished yet, so it is safe to store state
    }


2)   Pass my elements into a second window with a count trigger of 1 and a 
custom evictor that always keeps the two most recent elements, and then do my 
diff there (a sketch of such an evictor follows below).

Semantically this seems to work but in practice it causes my checkpoint times 
to grow 10x and I seem to fail every 5th-7th checkpoint.
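
A minimal sketch of the kind of evictor described in option 2, assuming the 
Flink 1.2 Evictor interface and that the window buffer iterates in arrival 
order; the class name is illustrative only.

import java.lang.{Iterable => JIterable}

import org.apache.flink.streaming.api.windowing.evictors.Evictor
import org.apache.flink.streaming.api.windowing.windows.Window
import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue

// Evicts everything except the two most recently added elements before the
// window function runs, so it only ever sees the previous and current values.
class KeepLastTwoEvictor[T, W <: Window] extends Evictor[T, W] {

  override def evictBefore(elements: JIterable[TimestampedValue[T]], size: Int,
                           window: W, ctx: Evictor.EvictorContext): Unit = {
    var toEvict = size - 2
    val it = elements.iterator()
    while (toEvict > 0 && it.hasNext) {
      it.next()
      it.remove() // removing through the iterator evicts the element from the buffer
      toEvict -= 1
    }
  }

  override def evictAfter(elements: JIterable[TimestampedValue[T]], size: Int,
                          window: W, ctx: Evictor.EvictorContext): Unit = {
    // no eviction after the window function has run
  }
}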

I am curious if anyone here has any ideas of what I might be able to do to 
solve this problem.

Thank you,

Seth Wiesman


Re: state size in relation to cluster size and processing speed

2016-12-23 Thread Seth Wiesman
Watermarks are generated with a periodic watermark assigner, based on a 
timestamp field within the records. We are processing log data from an S3 
bucket; logs are always processed in chronological order using a custom 
ContinuousFileMonitoringFunction but the standard ContinuousFileReaderOperator. 
Certainly with a larger cluster the splits would be processed more quickly, and 
as such the watermark would advance at a quicker pace. Why do you think a more 
quickly advancing watermark would affect state size in this case?
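
For illustration only, a periodic assigner of this kind might look roughly 
like the following; LogRecord, its eventTime field, and the 10 second bound are 
hypothetical stand-ins, not the actual job code.

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

case class LogRecord(eventTime: Long, line: String)

// Extracts the event time from each record and emits a watermark that trails
// the largest timestamp seen so far by a fixed bound.
class LogWatermarkAssigner extends AssignerWithPeriodicWatermarks[LogRecord] {

  private val maxOutOfOrderness = 10 * 1000L // 10 seconds, an assumed bound
  private var currentMaxTimestamp = Long.MinValue + maxOutOfOrderness

  override def extractTimestamp(element: LogRecord, previousElementTimestamp: Long): Long = {
    currentMaxTimestamp = math.max(currentMaxTimestamp, element.eventTime)
    element.eventTime
  }

  // Called periodically by Flink to advance the watermark.
  override def getCurrentWatermark(): Watermark =
    new Watermark(currentMaxTimestamp - maxOutOfOrderness)
}

It would be attached with something like 
stream.assignTimestampsAndWatermarks(new LogWatermarkAssigner).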

Seth Wiesman

From: Aljoscha Krettek <aljos...@apache.org>
Reply-To: "user@flink.apache.org" <user@flink.apache.org>
Date: Friday, December 23, 2016 at 1:43 PM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: state size in relation to cluster size and processing speed

Hi,
how are you generating your watermarks? Could it be that they advance faster 
when the job is processing more data?

Cheers,
Aljoscha

On Fri, 16 Dec 2016 at 21:01 Seth Wiesman <swies...@mediamath.com> wrote:
Hi,

I’ve noticed something peculiar about the relationship between state size and 
cluster size and was wondering if anyone here knows the reason. I am running a 
job with 1 hour tumbling event time windows that have an allowed lateness of 7 
days. When I run on a 20-node cluster with FsState I can process approximately 
1.5 days’ worth of data in an hour, with the most recent checkpoint being 
~20 GB. If I run the same job with the same configuration on a 40-node cluster 
I can process 2 days’ worth of data in 20 minutes (expected), but the state 
size is only ~8 GB. Because the allowed lateness is 7 days, no windows should 
have been purged yet, and I would expect the larger cluster, which has 
processed more data, to have the larger state. Is there some reason why a 
slower running job or a smaller cluster would require more state?

This is more of a curiosity than an issue. Thanks in advance for any insights 
you may have.

Seth Wiesman


state size in relation to cluster size and processing speed

2016-12-16 Thread Seth Wiesman
Hi,

I’ve noticed something peculiar about the relationship between state size and 
cluster size and was wondering if anyone here knows the reason. I am running a 
job with 1 hour tumbling event time windows that have an allowed lateness of 7 
days. When I run on a 20-node cluster with FsState I can process approximately 
1.5 days’ worth of data in an hour, with the most recent checkpoint being 
~20 GB. If I run the same job with the same configuration on a 40-node cluster 
I can process 2 days’ worth of data in 20 minutes (expected), but the state 
size is only ~8 GB. Because the allowed lateness is 7 days, no windows should 
have been purged yet, and I would expect the larger cluster, which has 
processed more data, to have the larger state. Is there some reason why a 
slower running job or a smaller cluster would require more state?
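
For reference, a self-contained sketch of the windowing setup described above 
(1 hour tumbling event time windows with 7 days of allowed lateness); the 
source, types, and reduce function are stand-ins, not the real job.

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object StateSizeSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Stand-in source: (key, event timestamp) pairs with ascending timestamps.
    val events: DataStream[(String, Long)] = env
      .fromElements(("a", 1L), ("b", 2L), ("a", 3L))
      .assignAscendingTimestamps(_._2)

    events
      .keyBy(_._1)
      .window(TumblingEventTimeWindows.of(Time.hours(1)))
      .allowedLateness(Time.days(7)) // windows live 7 more days after the watermark passes their end
      .reduce((a, b) => (a._1, a._2 + b._2))
      .print()

    env.execute("state size sketch")
  }
}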

This is more of a curiosity than an issue. Thanks in advance for any insights 
you may have.

Seth Wiesman


Custom Window Assigner With Lateness

2016-11-08 Thread Seth Wiesman
Is it possible in a custom window assigner to determine whether an element has 
arrived after the watermark has passed? I want a standard event time tumbling 
window but custom logic for late data. From what I can tell there is no way 
from within the WindowAssigner interface to determine whether an element 
arrived after the watermark. Is this currently possible to do in Flink?
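
For context, the WindowAssigner's context only exposes processing time, while 
a Trigger can see the current watermark through its context. The following is a 
rough sketch, not a complete answer, of how late elements could be detected in 
a custom trigger; all names and the late-firing behaviour are illustrative 
assumptions.

import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

// Sketch: a trigger that branches on whether an element arrived behind the watermark.
class LatenessAwareTrigger extends Trigger[Any, TimeWindow] {

  override def onElement(element: Any, timestamp: Long, window: TimeWindow,
                         ctx: Trigger.TriggerContext): TriggerResult = {
    if (timestamp <= ctx.getCurrentWatermark()) {
      // The element is late: custom late-data handling would go here.
      TriggerResult.FIRE
    } else {
      ctx.registerEventTimeTimer(window.maxTimestamp())
      TriggerResult.CONTINUE
    }
  }

  override def onEventTime(time: Long, window: TimeWindow,
                           ctx: Trigger.TriggerContext): TriggerResult =
    if (time == window.maxTimestamp()) TriggerResult.FIRE else TriggerResult.CONTINUE

  override def onProcessingTime(time: Long, window: TimeWindow,
                                ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit =
    ctx.deleteEventTimeTimer(window.maxTimestamp())
}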

Thank you,

Seth Wiesman


ValueState in RichCoFlatMap, possible 1.2-SNAPSHOT regression

2016-10-20 Thread Seth Wiesman
    val pipeline1 = env.generateSequence(0, 1000)

    val pipeline2 = env.fromElements("even", "odd")

    pipeline1.connect(pipeline2)
      .keyBy(
        elem ⇒ elem % 2 == 0,
        elem ⇒ elem == "even"
      )
      .flatMap(FlatMapper)
      .print()

    env.execute("Example")
  }

}

but this results in this precondition 
failing<https://github.com/apache/flink/blob/6f0faf9bb35e7cac3a38ed792cdabd6400fc4c79/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java#L88>
 on updates.
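
The beginning of this message, including the FlatMapper definition, was 
truncated in the archive. A hypothetical reconstruction of what such a 
RichCoFlatMapFunction with keyed ValueState might look like (names and logic 
are assumptions, not the original code):

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical reconstruction: keeps one piece of keyed ValueState that is read
// on the first input and updated from the second input.
object FlatMapper extends RichCoFlatMapFunction[Long, String, String] {

  @transient private var label: ValueState[String] = _

  override def open(parameters: Configuration): Unit = {
    val descriptor = new ValueStateDescriptor("label", createTypeInformation[String])
    label = getRuntimeContext.getState(descriptor)
  }

  override def flatMap1(value: Long, out: Collector[String]): Unit = {
    // May be null before flatMap2 has seen this key.
    val current = Option(label.value()).getOrElse("unknown")
    out.collect(s"$value is $current")
  }

  override def flatMap2(value: String, out: Collector[String]): Unit = {
    label.update(value) // a ValueState update like this is where the reported precondition check fails
  }
}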

Seth Wiesman