Re: TTL in pyflink does not seem to work

2024-03-09 Thread David Anderson
My guess is that this only fails when pyflink is used with the heap state
backend, in which case one possible workaround is to use the RocksDB state
backend instead. Another workaround would be to rely on timers in the
process function, and clear the state yourself.

David
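
For readers looking for a concrete starting point, here is a minimal, untested PyFlink sketch of the timer-based workaround described above: instead of state TTL, a processing-time timer is registered when the value is written and the state is cleared in on_timer. The class name and the one-second expiry are illustrative only.

```python
from pyflink.common import Types
from pyflink.datastream import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor


class ExpiringState(KeyedProcessFunction):
    """Keeps a per-key value and clears it roughly 1 second after it is written."""

    def open(self, runtime_context: RuntimeContext):
        self.state = runtime_context.get_state(
            ValueStateDescriptor("my_state", Types.STRING()))

    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        if self.state.value() is None:
            self.state.update(str(value))
            # Schedule a cleanup timer instead of relying on state TTL.
            expiry = ctx.timer_service().current_processing_time() + 1000
            ctx.timer_service().register_processing_time_timer(expiry)
        yield value

    def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext'):
        # Manually expire the state for the current key.
        self.state.clear()
```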

On Fri, Mar 8, 2024 at 12:29 AM lorenzo.affetti.ververica.com via user <
user@flink.apache.org> wrote:

> Hello Ivan!
>
> Could you please create a JIRA issue out of this?
> That seems the proper place to discuss this.
>
> It seems a bug as the two versions of the code you posted look identical,
> and the behavior should be consistent.
> On Mar 7, 2024 at 13:09 +0100, Ivan Petrarka ,
> wrote:
>
> Note that the Java code prints `State: Null`, `State: Null`, as I was
> expecting, unlike the pyflink code.
> On Mar 7, 2024 at 15:59 +0400, Ivan Petrarka ,
> wrote:
>
> Hi! I’ve created a basic pyflink pipeline with ttl and it does not seem to
> work. I have reproduced the exact same code in Java and it works!
>
> Is this a pyflink bug? If so - how can I report it? If not - what can I
> try to do?
>
> Flink: 1.18.0
> image: flink:1.18.0-scala_2.12-java11
>
> Code to reproduce. I expect this code to print `None` all the time, but it
> prints `None` only for the first element and the stored state value afterwards.
>
> ```python
> import time
> from datetime import datetime
>
> from pyflink.common import Time, Types
> from pyflink.datastream import KeyedProcessFunction, RuntimeContext, StreamExecutionEnvironment, TimeCharacteristic
> from pyflink.datastream.state import StateTtlConfig, ValueStateDescriptor
>
>
> class Processor(KeyedProcessFunction):
>     def open(self, runtime_context: RuntimeContext):
>         state_descriptor = ValueStateDescriptor(
>             name="my_state",
>             value_type_info=Types.STRING(),
>         )
>
>         state_descriptor.enable_time_to_live(
>             ttl_config=StateTtlConfig.new_builder(Time.seconds(1))
>             .cleanup_incrementally(cleanup_size=10, run_cleanup_for_every_record=True)
>             .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite)
>             .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>             .build()
>         )
>
>         self.state = runtime_context.get_state(state_descriptor)
>
>     def process_element(self, value: int, ctx: KeyedProcessFunction.Context):
>         current_state = self.state.value()
>
>         print(datetime.now(), current_state)
>
>         if current_state is None:
>             self.state.update(str(datetime.now()))
>
>         time.sleep(1.5)
>
>
> if __name__ == "__main__":
>     # - Init environment
>     environment = StreamExecutionEnvironment.get_execution_environment().set_parallelism(1)
>
>     # - Setup pipeline
>     (
>         environment.set_parallelism(1)
>         .from_collection(
>             collection=list(range(10)),
>         )
>         .key_by(lambda value: 0)
>         .process(Processor())
>     )
>
>     # - Execute pipeline
>     environment.execute("ttl_test")
> ```
>
> ```java
> import org.apache.flink.api.common.state.StateTtlConfig;
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.common.time.Time;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
> import org.apache.flink.util.Collector;
>
> import java.io.IOException;
> import java.time.LocalDateTime;
>
> public class GameHistoryProcessor extends KeyedProcessFunction<String, String, String> {
>
>     private transient ValueState<String> state;
>
>     @Override
>     public void open(Configuration parameters) {
>         var stateTtlConfig = StateTtlConfig
>                 .newBuilder(Time.seconds(1))
>                 //.cleanupFullSnapshot()
>                 .cleanupIncrementally(10, true)
>                 .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>                 .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>                 .build();
>
>         var stateDescriptor = new ValueStateDescriptor<>("state", String.class);
>         stateDescriptor.enableTimeToLive(stateTtlConfig);
>
>         state = getRuntimeContext().getState(stateDescriptor);
>     }
>
>     @Override
>     public void processElement(String event, Context context, Collector<String> collector)
>             throws IOException, InterruptedException {
>         var currentState = state.value();
>         System.out.println("State: " + currentState);
>
>         if (currentState == null) {
>             state.update(LocalDateTime.now().toString());
>         }
>
>         Thread.sleep(1500);
>     }
> }
> ```
>
>


Re: The fault tolerance and recovery mechanism in batch mode within Apache Flink.

2024-02-16 Thread David Anderson
With streaming execution, the entire pipeline is always running, which is
necessary so that results can be continuously produced. But with batch
execution, the job graph can be segmented into separate pipelined stages
that can be executed sequentially, each running to completion before the
next begins. See [1] for more details on how the scheduler is organized.

Each stage writes its results to disk, to be read as input by the
subsequent stage. During recovery, the job could be restarted from the very
beginning, but if the job has completed some intermediate stages, and if
the results of those stages are still available on disk, then it isn't
necessary to re-execute those stages which were successfully completed.

David

[1]
https://flink.apache.org/2020/12/02/improvements-in-task-scheduling-for-batch-workloads-in-apache-flink-1.12/
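
For reference, BATCH execution is selected explicitly on the execution environment (or via the execution.runtime-mode option); a minimal PyFlink sketch, shown here only to make the mode being discussed concrete:

```python
from pyflink.datastream import RuntimeExecutionMode, StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# With bounded inputs, BATCH mode lets the scheduler run the job stage by
# stage and materialize intermediate results, enabling the stage-level
# recovery described above.
env.set_runtime_mode(RuntimeExecutionMode.BATCH)
```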

On Fri, Feb 16, 2024 at 4:29 AM Вова Фролов 
wrote:

> Hi everyone,
>
> I am currently exploring the fault tolerance and recovery mechanism in
> batch mode within Apache Flink.
>
> If I terminate the task manager process while the job is running, the job
> restarts from the point of failure. However, at some point, the job
> restarts from the very beginning.
>
> The documentation mentions that the checkpointing and state backend do not
> work in batch mode.
>
> How does recovery after a failure occur in BATCH mode?
>
> According to the documentation: “In BATCH runtime mode, Flink will attempt
> to return to previous processing steps for which intermediate results are
> still available. Potentially, only those tasks that fail (or their
> predecessors in the graph) will have to be restarted.”
>
>
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/execution_mode/
>
>
>
> I would appreciate any information regarding this matter.
>
> Kind regards,
>
> Vladimir
>


Re: Request to provide sample codes on Data stream using flink, Spring Boot & kafka

2024-02-08 Thread David Anderson
For a collection of several complete sample applications using Flink with
Kafka, see https://github.com/confluentinc/flink-cookbook.

And I agree with Marco -- in fact, I would go farther, and say that using
Spring Boot with Flink is an anti-pattern.

David
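
The cookbook linked above has complete sample applications; purely for orientation, here is a minimal PyFlink sketch of reading a Kafka topic with the DataStream API. The broker address, topic, and group id are placeholders, and the Kafka connector jar must be on the classpath.

```python
from pyflink.common import WatermarkStrategy
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaOffsetsInitializer, KafkaSource

env = StreamExecutionEnvironment.get_execution_environment()

# Build a KafkaSource that reads string values from a single topic.
source = KafkaSource.builder() \
    .set_bootstrap_servers("localhost:9092") \
    .set_topics("input-topic") \
    .set_group_id("flink-demo") \
    .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
    .set_value_only_deserializer(SimpleStringSchema()) \
    .build()

stream = env.from_source(source, WatermarkStrategy.no_watermarks(), "kafka-source")
stream.print()

env.execute("kafka_demo")
```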

On Wed, Feb 7, 2024 at 4:37 PM Marco Villalobos 
wrote:

> Hi Nida,
>
> You can find sample code for using Kafka here:
> https://kafka.apache.org/documentation/
> You can find sample code for using Flink here:
> https://nightlies.apache.org/flink/flink-docs-stable/
> You can find sample code for using Flink with Kafka here:
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/kafka/
>
> You can find sample code for using Spring Boot here:
> https://docs.spring.io/spring-boot/docs/3.2.2/reference/htmlsingle/
> You can find sample code for using Spring Boot with Kafka here:
> https://docs.spring.io/spring-boot/docs/3.2.2/reference/htmlsingle/#messaging.kafka
>
> As far as sample code for using Spring Boot with Apache Flink in the same
> process goes, you won't find it, because the two technologies solve different
> problems. Apache Flink is a stream-processing framework; code is submitted to
> a Flink cluster.
>
> Spring Boot is a micro-services, IoC, integration, and application framework
> for building stand-alone applications (it doesn't run on a cluster).
>
> You don't need Spring Boot in an Apache Flink application, and there is no
> way to use Apache Flink within a Spring Boot application.
>
> But maybe you can elaborate on why you think it is necessary to use Spring
> Boot with Apache Flink?
>
>
> Why would you need Spring Boot for a Flink job?
>
> > On Feb 6, 2024, at 3:22 AM, Fidea Lidea  wrote:
> >
> > Hi Team,
> >
> > I request you to provide sample code on data streaming using Flink,
> > Kafka and Spring Boot.
> >
> > Awaiting your response.
> >
> > Thanks & Regards
> > Nida Shaikh
>
>


Re: Re: Case for a Broadcast join + hint in Flink Streaming SQL

2024-02-02 Thread David Anderson
I've seen enough demand for a streaming broadcast join in the community to
justify a FLIP -- I think it's a good idea, and look forward to the
discussion.

David
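
For context, the DataStream-level broadcast state pattern that comes up later in the thread looks roughly like the sketch below (PyFlink, with invented names and sample data); it enriches a large keyed stream with a small broadcast stream, which is a workaround available today rather than the SQL-level broadcast join being proposed.

```python
from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import KeyedBroadcastProcessFunction
from pyflink.datastream.state import MapStateDescriptor

# Broadcast state holding the small "dimension" table, keyed by join key.
DIM_STATE = MapStateDescriptor("dim_table", Types.STRING(), Types.STRING())


class BroadcastEnrich(KeyedBroadcastProcessFunction):

    def process_element(self, value, ctx):
        # value: (join_key, payload) from the large keyed stream.
        dim = ctx.get_broadcast_state(DIM_STATE)  # read-only view here
        yield value[0], value[1], dim.get(value[0])

    def process_broadcast_element(self, value, ctx):
        # value: (join_key, dim_payload) from the small broadcast stream.
        ctx.get_broadcast_state(DIM_STATE).put(value[0], value[1])


env = StreamExecutionEnvironment.get_execution_environment()
large = env.from_collection([("k1", "order-1"), ("k2", "order-2")])
small = env.from_collection([("k1", "location-A")])

result = (
    large.key_by(lambda e: e[0])
    .connect(small.broadcast(DIM_STATE))
    .process(BroadcastEnrich())
)
result.print()
env.execute("broadcast_enrich_sketch")
```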

On Fri, Feb 2, 2024 at 6:55 AM Feng Jin  wrote:

> +1 a FLIP for this topic.
>
>
> Best,
> Feng
>
> On Fri, Feb 2, 2024 at 10:26 PM Martijn Visser 
> wrote:
>
>> Hi,
>>
>> I would definitely expect a FLIP on this topic before moving to
>> implementation.
>>
>> Best regards,
>>
>> Martijn
>>
>> On Fri, Feb 2, 2024 at 12:47 PM Xuyang  wrote:
>>
>>> Hi, Prabhjot.
>>>
>>> IIUC, the main reasons why the community has previously considered
>>> supporting join hints only in batch mode are as follows:
>>> 1. In batch mode, multiple join type algorithms were implemented quite
>>> early on, and
>>> 2. Stream processing represents a long-running scenario, and it is quite
>>> difficult to determine whether a small table will become a large table
>>> after a long period of operation.
>>>
>>> However, as you mentioned, join hints do indeed have their significance
>>> in streaming. If you want to support the implementation of "join hints +
>>> broadcast join" in streaming, the changes I can currently think of include:
>>> 1. At the optimizer level, changing the exchange on the small table side to
>>> broadcast instead of hash (InputProperty#BROADCAST).
>>> 2. Unknown changes required at the table runtime level.
>>>
>>> You can also discuss it within the community through JIRA, FLIP, or the
>>> dev mailing list.
>>>
>>>
>>> --
>>> Best!
>>> Xuyang
>>>
>>>
>>> At 2024-02-02 00:46:01, "Prabhjot Bharaj via user" <
>>> user@flink.apache.org> wrote:
>>>
>>> Hi Feng,
>>>
>>> Thanks for your prompt response.
>>> If we were to solve this in Flink, my higher level viewpoint is:
>>>
>>> 1. First, implement a broadcast join in Flink Streaming SQL that works
>>> across the Table API (e.g. via `left.join(right, ...,
>>> join_type="broadcast")`)
>>> 2. Then, support a Broadcast hint that would utilize this new join based
>>> on the hint type
>>>
>>> What do you think about this ?
>>> Would you have some pointers on how/where to start on the first part ?
>>> (I'm thinking we'd have to extend the Broadcast state pattern for this
>>> purpose)
>>>
>>> Thanks,
>>> Prabhjot
>>>
>>> On Thu, Feb 1, 2024 at 11:40 AM Feng Jin  wrote:
>>>
 Hi Prabhjot

 I think this is a reasonable scenario. If a regular join between a large
 table and a very small table is done without broadcasting, it can easily
 cause data skew.
 We have also encountered similar problems. Currently, we can only copy the
 small table multiple times using UNION ALL and append random values to
 alleviate the data skew.


 Best,
 Feng

 On Fri, Feb 2, 2024 at 12:24 AM Prabhjot Bharaj via user <
 user@flink.apache.org> wrote:

> Hello folks,
>
>
> We have a use case where we have a few stream-stream joins, requiring
> us to join a very large table with a much smaller table, essentially
> enriching the large table with a permutation on the smaller table 
> (Consider
> deriving all orders/sessions for a new location). Given the nature of the
> dataset, if we use a typical join that uses Hash distribution to co-locate
> the records for each join key, we end up with a very skewed join (a few
> task slots getting all of the work, as against a good distribution).
>
>
> We’ve internally implemented a Salting based solution where we salt
> the smaller table and join it with the larger table. While this works in
> the POC stage, we’d like to leverage flink as much as possible to do such 
> a
> join.
>
>
> By the nature of the problem, a broadcast join seems theoretically
> helpful. We’ve done an exploration on query hints supported in Flink,
> starting with this FLIP
> 
> and this FLIP
> 
> .
>
>
> Currently, the Optimizer doesn't consider the Broadcast hint in the
> `Exchange` step of the join, when creating the physical plan (Possibly
> because the hint would require the stream-stream join to also support
> Broadcast join with SQL)
>
>
> Notice that the Query AST (Abstract Syntax Tree) has the broadcast
> hint parsed from the query:
>
>
> ```sql
>
> ...
>
> ...
>
> joinType=[inner], joinHints=[[[BROADCAST inheritPath:[0]
> options:[gpla)
>
> ...
>
> ```
>
>
> However, the Flink optimizer ignores the hint and still represents the
> join as a regular `hash` join in the `Exchange` step:
>
>
> ```sql
>
> ...
>
> ...
>
> :- Exchange(distribution=[hash[shop_id, join_key]])
>
> ...
>
> 

Re: Redis as a State Backend

2024-01-31 Thread David Anderson
When it comes to decoupling the state store from Flink, I suggest taking a
look at FlinkNDB, which is an experimental state backend for Flink that
puts the state into an external distributed database. There's a Flink
Forward talk [1] and a master's thesis [2] available.

[1] https://www.youtube.com/watch?v=ZWq_TzsXssM
[2] http://www.diva-portal.org/smash/get/diva2:1536373/FULLTEXT01.pdf

On Wed, Jan 31, 2024 at 12:30 AM Chirag Dewan via user <
user@flink.apache.org> wrote:

> Thanks Zakelly and Junrui.
>
> I was actually exploring RocksDB as a state backend and I thought maybe
> Redis could offer more features as a state backend, for example state
> sharing between operators, geo-redundancy of state, partitioning, etc. I understand
> these are not native use cases for Flink, but maybe something that can be
> considered in future. Maybe even as an off the shelf state backend
> framework which allows embedding any other cache as a state backend.
>
> The links you shared are useful and will really help me. Really appreciate
> it.
>
> Thanks
>
> On Tuesday, 30 January, 2024 at 01:43:14 pm IST, Zakelly Lan <
> zakelly@gmail.com> wrote:
>
>
> And I found some previous discussion, FYI:
> 1. https://issues.apache.org/jira/browse/FLINK-3035
> 2. https://www.mail-archive.com/dev@flink.apache.org/msg10666.html
>
> Hope this helps.
>
> Best,
> Zakelly
>
> On Tue, Jan 30, 2024 at 4:08 PM Zakelly Lan  wrote:
>
> Hi Chirag
>
> That's an interesting idea. IIUC, storing key-values can be implemented
> simply for Redis, but supporting checkpointing and recovery is relatively
> challenging. Flink's checkpoints should be consistent across all stateful
> operators at the same time. For an *embedded* and *file-based* key-value
> store like RocksDB, this is easier to implement by asynchronously uploading
> the files for a specific point in time.
>
> Moreover, if you want to store your state basically in memory, then why not
> use the HashMapStateBackend? It saves the overhead of serialization and
> deserialization and may achieve better performance compared with Redis, I
> guess.
>
>
> Best,
> Zakelly
>
> On Tue, Jan 30, 2024 at 2:15 PM Chirag Dewan via user <
> user@flink.apache.org> wrote:
>
> Hi,
>
> I was looking at the FLIP-254: Redis Streams Connector and I was
> wondering if Flink ever considered Redis as a state backend? And if yes,
> why was it discarded compared to RocksDB?
>
> If someone can point me towards any deep dives on why RocksDB is a better
> fit as a state backend, it would be helpful.
>
> Thanks,
> Chirag
>
>


Re: Continuous Reading of File using FileSource does not process the existing files in version 1.17

2024-01-08 Thread David Anderson
While the readFile method would monitor changes to existing files, it would
completely re-ingest each changed file after every change. This behavior
wasn't very user friendly.

David

On Fri, Jan 5, 2024 at 2:22 AM Martijn Visser 
wrote:

> Hi Prasanna,
>
> I think this is as expected. There is no support for monitoring
> changes to existing files.
>
> Best regards,
>
> Martijn
>
> On Fri, Jan 5, 2024 at 10:22 AM Prasanna kumar
>  wrote:
> >
> > Hi Flink Community,
> >
> >
> > I hope this email finds you well. I am currently in the process of
> migrating my Flink application from version 1.12.7 to 1.17.2 and have
> encountered a behavior issue with the FileSource while reading data from an
> S3 bucket.
> >
> >  In the previous version (1.12.7), I was utilizing the readFile method
> with the TextInputFormat to continuously monitor the S3 bucket for any
> updates or new files added at a specified time interval. The code snippet
> for this was as follows:
> >
> >
> >
> > streamExecutionEnvironment
> >     .readFile(new TextInputFormat(new Path("s3://my-s3-path")),
> >               "s3://my-s3-path",
> >               FileProcessingMode.PROCESS_CONTINUOUSLY,
> >               1)
> >     .setParallelism(1);
> >
> >
> >
> > Now, after migrating to Flink 1.17.2, I have switched to using the
> FileSource for continuous monitoring. The code snippet for this is as
> follows:
> >
> > FileSource<String> fileSource = FileSource
> >     .forRecordStreamFormat(new TextLineInputFormat(), new Path("s3://my-s3-path"))
> >     .monitorContinuously(Duration.ofMillis(1))
> >     .build();
> >
> > streamExecutionEnvironment
> >     .fromSource(fileSource, WatermarkStrategy.noWatermarks(), "filesource")
> >     .uid("filesource")
> >     .setParallelism(1);
> >
> > While this setup successfully detects new files added to the S3 bucket,
> > it seems to miss changes made to existing files. I am unsure if this is
> > expected behavior in Flink 1.17.2 or if there is a configuration detail I
> > might be overlooking.
> >
> >  Any guidance or suggestions on resolving this issue would be greatly
> appreciated.
> >
> > Thanks,
> > Prasanna
>


Re: [PyFlink] Collect multiple elements in CoProcessFunction

2023-11-18 Thread David Anderson
Hi, Alex!

Yes, in PyFlink the various flatmap and process functions are implemented
as generator functions, so they use yield to emit results.

David
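
A tiny sketch to illustrate (the class and field names are made up):

```python
from pyflink.datastream import CoProcessFunction


class MergeStreams(CoProcessFunction):
    """Emits zero or more records per input element by yielding them."""

    def process_element1(self, value, ctx: 'CoProcessFunction.Context'):
        # Emit two records for each element of the first stream.
        yield ("left", value)
        yield ("left-copy", value)

    def process_element2(self, value, ctx: 'CoProcessFunction.Context'):
        # Emitting nothing is also fine: simply don't yield.
        if value is not None:
            yield ("right", value)
```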

On Tue, Nov 7, 2023 at 1:16 PM Alexander Fedulov <
alexander.fedu...@gmail.com> wrote:

> Java ProcessFunction API defines a clear way to collect data via the
> Collector object.
>
> PyFlink documentation also refers to the Collector [1] , but it is not
> being passed to the function and is also nowhere to be found in the pyflink
> source code.
> How can multiple elements be collected? Is "yield" the designated way to
> achieve this?
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/api/python/pyflink.datastream.html#pyflink.datastream.CoProcessFunction.process_element1
>
> Best,
> Alex
>


Re: [DISCUSS] Change the default restart-strategy to exponential-delay

2023-11-17 Thread David Anderson
Rui,

I don't have any direct experience with this topic, but given the
motivation you shared, the proposal makes sense to me. Given that the new
default feels more complex than the current behavior, if we decide to do
this I think it will be important to include the rationale you've shared in
the documentation.

David

On Wed, Nov 15, 2023 at 10:17 PM Rui Fan <1996fan...@gmail.com> wrote:

> Hi dear flink users and devs:
>
> FLIP-364[1] intends to make some improvements to restart-strategy
> and discuss updating some of the default values of exponential-delay,
> and whether exponential-delay can be used as the default restart-strategy.
> After discussing at dev mail list[2], we hope to collect more feedback
> from Flink users.
>
> # Why does the default restart-strategy need to be updated?
>
> If checkpointing is enabled, the default value is fixed-delay with
> Integer.MAX_VALUE restart attempts and '1 s' delay[3]. It means
> the job will restart infinitely with high frequency when a job
> continues to fail.
>
> When the Kafka cluster fails, a large number of flink jobs will be
> restarted frequently. After the kafka cluster is recovered, a large
> number of high-frequency restarts of flink jobs may cause the
> kafka cluster to avalanche again.
>
> Considering the exponential-delay as the default strategy with
> a couple of reasons:
>
> - The exponential-delay can reduce the restart frequency when
>   a job continues to fail.
> - It can restart a job quickly when a job fails occasionally.
> - The restart-strategy.exponential-delay.jitter-factor can avoid
>   restarting multiple jobs at the same time. It’s useful to prevent
>   avalanches.
>
> # What are the current default values[4] of exponential-delay?
>
> restart-strategy.exponential-delay.initial-backoff : 1s
> restart-strategy.exponential-delay.backoff-multiplier : 2.0
> restart-strategy.exponential-delay.jitter-factor : 0.1
> restart-strategy.exponential-delay.max-backoff : 5 min
> restart-strategy.exponential-delay.reset-backoff-threshold : 1h
>
> backoff-multiplier=2 means that the delay time of each restart
> will be doubled. The delay times are:
> 1s, 2s, 4s, 8s, 16s, 32s, 64s, 128s, 256s, 300s, 300s, etc.
>
> The delay time increases rapidly, which will affect the recovery
> time for flink jobs.
>
> # Option improvements
>
> We think a backoff-multiplier between 1 and 2 is more sensible,
> such as:
>
> restart-strategy.exponential-delay.backoff-multiplier : 1.2
> restart-strategy.exponential-delay.max-backoff : 1 min
>
> After updating, the delay times are:
>
> 1s, 1.2s, 1.44s, 1.728s, 2.073s, 2.488s, 2.985s, 3.583s, 4.299s,
> 5.159s, 6.191s, 7.430s, 8.916s, 10.699s, 12.839s, 15.407s, 18.488s,
> 22.186s, 26.623s, 31.948s, 38.337s, etc
>
> They achieve the following goals:
> - When restarts are infrequent in a short period of time, flink can
>   quickly restart the job. (For example: the retry delay time when
>   restarting 5 times is 2.073s)
> - When restarting frequently in a short period of time, flink can
>   slightly reduce the restart frequency to prevent avalanches.
>   (For example: the retry delay time when retrying 10 times is 5.1 s,
>   and the retry delay time when retrying 20 times is 38s, which is not very
> large.)
>
> As @Mingliang Liu mentioned on the dev mailing list: one-size-fits-all
> default values do not exist. So our goal is for the default values
> to be suitable for most jobs.
>
> Looking forward to your thoughts and feedback, thanks~
>
> [1] https://cwiki.apache.org/confluence/x/uJqzDw
> [2] https://lists.apache.org/thread/5cgrft73kgkzkgjozf9zfk0w2oj7rjym
> [3]
>
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/config/#restart-strategy-type
> [4]
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/task_failure_recovery/#exponential-delay-restart-strategy
>
> Best,
> Rui
>


Re: Flink custom parallel data source

2023-11-03 Thread David Anderson
> As you suggested a message broker below, how would that be feasible in this
case?

To my mind, the idea would be to use something like a socket source for
Kafka Connect. This would give you a simple, reliable way to get the data
stored into a replayable data store. You'd then be able to start, stop, and
redeploy the Flink app without worrying about data loss because the data
reception and storage would be decoupled from the data processing.

David

On Tue, Oct 31, 2023 at 7:50 PM Kamal Mittal via user 
wrote:
>
> Thanks for sharing views.
>
>
>
> Our client only supports TCP-stream-based traffic, which is in a proprietary
format that we need to decode. The system accepting this traffic is Flink
based, and that’s why we tried all of this with a custom data source.
>
>
>
> As you suggested a message broker below, how would that be feasible in this
case?
>
>
>
> From: Alexander Fedulov 
> Sent: 01 November 2023 01:54 AM
> To: Kamal Mittal 
> Cc: user@flink.apache.org
> Subject: Re: Flink custom parallel data source
>
>
>
> Flink natively supports a pull-based model for sources, where the source
operators request data from the external system when they are ready to
process it.  Implementing a TCP server socket operator essentially creates
a push-based source, which could lead to backpressure problems if the data
ingestion rate exceeds the processing rate. You also lose any delivery
guarantees because Flink's fault tolerance model relies on having
replayable sources.
>
> Is using a message broker not feasible in your case?
>
> Best,
>
> Alexander Fedulov
>
>
>
> On Tue, 31 Oct 2023 at 13:08, Kamal Mittal 
wrote:
>
> Hello,
>
>
>
> We are writing a custom TCP server socket source function in which a TCP
server socket listener will accept connections and read data.
>
> Single custom server socket source function: ServerSocket serversocket =
new ServerSocket();
>
> Then, using a thread pool, accept multiple connections in separate threads:
new Runnable() -> serversocket.accept();
>
> So a client socket will be accepted and handed to a separate thread to read
data from the TCP stream.
>
> Rgds,
>
> Kamal
>
> From: Alexander Fedulov 
> Sent: 31 October 2023 04:03 PM
> To: Kamal Mittal 
> Cc: user@flink.apache.org
> Subject: Re: Flink custom parallel data source
>
>
>
> Please note that SourceFunction API is deprecated and is due to be
removed, possibly in the next major version of Flink.
>
> Ideally you should not be manually spawning threads in your Flink
applications. Typically you would only perform data fetching in the sources
and do processing in the subsequent operators which you can scale
independently from the source parallelism. Can you describe what you are
trying to achieve?
>
>
>
> Best,
>
> Alexander Fedulov
>
>
>
> On Tue, 31 Oct 2023 at 07:22, Kamal Mittal via user 
wrote:
>
> Hello Community,
>
>
>
> I need to have a custom parallel data source (Flink
ParallelSourceFunction) for fetching data based on some custom logic. In
this source function, we open multiple threads via a Java thread pool to
distribute the work further.
>
>
>
> These threads share the Flink-provided ‘SourceContext’ and collect records
via the source_context.collect() method.
>
>
>
> Is it ok to share source context in separate threads and get data?
>
>
>
> Is there any issue for downstream operators due to above design?
>
>
>
> Rgds,
>
> Kamal

>


Re: Bloom Filter for Rocksdb

2023-10-29 Thread David Anderson
I believe bloom filters are off by default because they add overhead and
aren't always helpful. I.e., in workloads that are write heavy and have few
reads, bloom filters aren't worth the overhead.

David
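
For reference, the option mentioned below can be enabled either in flink-conf.yaml or, as in this minimal PyFlink sketch, on the configuration used to build the environment:

```python
from pyflink.common import Configuration
from pyflink.datastream import StreamExecutionEnvironment

config = Configuration()
# Off by default; turns on RocksDB bloom filters to speed up point lookups.
config.set_string("state.backend.rocksdb.use-bloom-filter", "true")

env = StreamExecutionEnvironment.get_execution_environment(config)
```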

On Fri, Oct 20, 2023 at 11:31 AM Mate Czagany  wrote:

> Hi,
>
> There have been no reports about setting this configuration causing any
> issues. I would guess it's off by default because it can increase the
> memory usage by an unpredictable amount.
>
> I would say feel free to enable it, from what you've said I also think
> that this would improve the performance of your jobs. But make sure to
> configure your jobs so that they will be able to accommodate the potential
> memory footprint growth. Also please read the following resources to know
> more about RocksDBs bloom filter:
> https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter
> https://rocksdb.org/blog/2014/09/12/new-bloom-filter-format.html
>
> Regards,
> Mate
>
>
>> Kenan Kılıçtepe  wrote (on Fri, Oct 20, 2023, at 15:50):
>
>> Can someone tell me the exact performance effect of enabling the bloom filter?
>> Could enabling it cause some unpredictable performance problems?
>>
>> I read what it is and how it works and it makes sense but  I also asked
>> myself why the default value of state.backend.rocksdb.use-bloom-filter is
>> false.
>>
>> We have a 5 servers flink cluster, processing real time IoT data coming
>> from 5 million devices and for a lot of jobs, we keep different states for
>> each device.
>>
>> Sometimes we have performance issues and when I check the flamegraph on
>> the test server I always see rocksdb.get() is the blocker. I just want to
>> increase rocksdb performance.
>>
>> Thanks
>>
>>


Re: Order of Execution in KeyedBroadcastProcessFunction

2023-09-10 Thread David Anderson
In Flink, all user functions, including KeyedBroadcastProcessFunction,
are (effectively) single threaded, so the processBroadcastElement
method will run to completion before any further messages are
processed in the processElement method. (I said "effectively" because
in the case of processing time timers, Flink takes care to synchronize
the onTimer callback for you.)

On Thu, Sep 7, 2023 at 6:32 AM Anil K  wrote:
>
> Hi,
>
> I am new to flink. I am trying to write a job that updates the Keyed State 
> when a Broadcast Message is received in KeyedBroadcastProcessFunction.
> I was wondering will the ctx.applyToKeyedState in the processBroadCastElement 
> will get completed before further messages are processed in the 
> processElement function?
> Can someone explain or point me to some docs on how the synchronisation of 
> Broadcasted and non Broadcasted messages works in 
> KeyedBroadcastProcessFunction?
>
> Kind regards
> Anil


Re: Job graph

2023-09-01 Thread David Anderson
This may or may not help, but you can get the execution plan from
inside the client, by doing something like this (I printed the plan to
stderr):

...
System.err.println(env.getExecutionPlan());
env.execute("my job");

The result is a JSON-encoded representation of the job graph, which
for the simple example I just tried it with, produced this output:

{
  "nodes" : [ {
"id" : 1,
"type" : "Source: Custom Source",
"pact" : "Data Source",
"contents" : "Source: Custom Source",
"parallelism" : 10
  }, {
"id" : 3,
"type" : "Sink: Writer",
"pact" : "Operator",
"contents" : "Sink: Writer",
"parallelism" : 10,
"predecessors" : [ {
  "id" : 1,
  "ship_strategy" : "FORWARD",
  "side" : "second"
} ]
  }, {
"id" : 5,
"type" : "Sink: Committer",
"pact" : "Operator",
"contents" : "Sink: Committer",
"parallelism" : 10,
"predecessors" : [ {
  "id" : 3,
  "ship_strategy" : "FORWARD",
  "side" : "second"
} ]
  } ]
}

On Wed, Aug 30, 2023 at 7:01 AM Nikolaos Paraskakis
 wrote:
>
> Hello folks,
>
> I am trying to get the job graph of a running flink job. I want to use flink 
> libraries. For now, I have the RestClusterClient and the job IDs. Tell me 
> please how to get the job graph.
>
> Thank you.


Re: Blue green deployment with Flink Apache Operator

2023-09-01 Thread David Anderson
Back in 2020, there was a Flink Forward talk [1] about how Lyft was
doing blue green deployments. Earlier (all the way back in 2017)
Drivetribe described [2] how they were doing so as well.

David

[1] https://www.youtube.com/watch?v=Hyt3YrtKQAM
[2] https://www.ververica.com/blog/drivetribe-cqrs-apache-flink

On Thu, Aug 31, 2023 at 1:21 AM Nicolas Fraison via user
 wrote:
>
> Our intent is definitely to start with an in-house Blue/Green
> operator, and once we reach some level of confidence we will open a FLIP
> to discuss it.
>
> Nicolas
>
> On Thu, Aug 31, 2023 at 10:12 AM Gyula Fóra  wrote:
>>
>> The main concern as we discussed in previous mailing list threads before is 
>> the general applicability of such solution:
>>
>>  - Many production jobs cannot really afford running in parallel (starting 
>> the second job while the first one is running), due to data 
>> consistency/duplications reasons
>>  - Exactly once sinks do not really support this
>>
>> So I think we should start with this maybe as an independent effort / 
>> external library and if we see that it works we could discuss it in a FLIP.
>>
>> What do you think?
>> Gyula
>>
>> On Thu, Aug 31, 2023 at 9:23 AM Nicolas Fraison 
>>  wrote:
>>>
>>> Thanks Gyula for your feedback.
>>>
>>> We were also thinking of relying on such a solution, creating a dedicated 
>>> crd/operator to manage this BlueGreenFlinkDeployment.
>>> Good to hear that it could be incorporated later in the operator.
>>>
>>> Will let you know once we will have something to share with you.
>>>
>>> Nicolas
>>>
>>> On Wed, Aug 30, 2023 at 4:28 PM Gyula Fóra  wrote:

 Hey!

 I don't know if anyone has implemented this or not but one way to approach 
 this problem (and this may not be the right way, just an idea :) ) is to 
 add a new Custom Resource type that sits on top of the FlinkDeployment / 
 FlinkSessionJob resources and add a small controller for this.

 This new custom resource, BlueGreenDeployment, would be somewhat similar 
 to how a Replicaset vs Pod works in Kubernetes. It would create a new 
 FlinkDeployment and would delete the old one once the new reached a 
 healthy running state.

 Adding a new CR allows us to not overcomplicate the existing 
 resource/controller loop but simply leverage it. If you prototype 
 something along these lines, please feel free to share and then we can 
 discuss if we want to incorporate something like this in the operator repo 
 in the future :)

 Cheers,
 Gyula

 On Wed, Aug 30, 2023 at 1:21 PM Nicolas Fraison via user 
  wrote:
>
> Hi,
>
> From https://issues.apache.org/jira/browse/FLINK-29199 it seems that 
> support for blue green deployment will not be supported or will not 
> happen soon.
>
> I'd like to know if some of you have built a custom mechanism on top of 
> this operator to support the blue green deployment and if you would have 
> any advice on implementing this?
>
> --
>
> Nicolas Fraison (he/him)


Re: Streaming join performance

2023-08-08 Thread David Anderson
This join optimization sounds promising, but I'm wondering why Flink
SQL isn't taking advantage of the N-Ary Stream Operator introduced in
FLIP-92 [1][2] to implement a n-way join in a single operator. Is
there something that makes this impossible/impractical?

[1] https://cwiki.apache.org/confluence/x/o4uvC
[2] https://issues.apache.org/jira/browse/FLINK-15688

On Sat, Aug 5, 2023 at 3:54 AM shuai xu  wrote:
>
> Hi, we are also paying attention to this issue and have completed the
> validation of the minibatch join optimization, including the intermediate
> message folding you mentioned. We plan to officially release it in Flink
> 1.19. This optimization could significantly improve the performance of join
> operations, and we are looking forward to the arrival of Flink 1.19 to help
> solve your problem.
>
> On 2023/08/04 08:21:51 Сыроватский Артем Иванович wrote:
> > Hello, Flink community!
> >
> > I have some important use case for me, which shows extremely bad 
> > performance:
> >
> >   *   Streaming application
> >   *   sql table api
> >   *   10 normal joins (state should be kept forever)
> >
> > The join rules are simple: I have 10 tables which share the same primary key,
> > and I want to build the result table by joining the 10 pieces.
> >
> > But Flink joins sequentionally, so i have a chain with 10 joins.
> >
> > What happens if i generate update message for first table in chain:
> >
> >
> >   *   first join operator will produce 2 records: delete+insert
> >   *   second operator will double incoming messages. result=2*2=4 messages
> >   *   ...
> >   *   last operator will produce 2**10=1024 messages.
> >
> > Performance becomes extremely slow and resources are wasted.
> >
> > I've made some simple compaction operator, which compacts records after 
> > join:
> >
> >
> >   *   join operator after receiving delete message, generates 2 messages
> >   *   after receiving insert message, generate 2 more messges
> >   *   but two of the four are compacted. So operator receives 2 
> > messages->sends 2 messages
> >
> > I wonder if this approach is right? Why is it not implemented in Flink yet?
> >
> > And I have a problem with how to implement it on the cluster, because I
> > have changed some Flink sources, which are not pluggable.
> >
> > I have modified StreamExecJoin class and added this code as a proof of 
> > concept:
> >
> >
> > final OneInputTransformation<RowData, RowData> compactTransform =
> >         ExecNodeUtil.createOneInputTransformation(
> >                 transform,
> >                 "compact join results",
> >                 "description",
> >                 new ProcessOperator<>(new CompactStreamOperator(equaliser)),
> >                 InternalTypeInfo.of(returnType),
> >                 leftTransform.getParallelism()
> >         );
> >
> > return compactTransform;
> >
> > Transform operator:
> > @Override
> >
> > public void processElement(
> >         RowData value,
> >         ProcessFunction.Context ctx,
> >         Collector<RowData> collector) throws Exception {
> >
> >     counter++;
> >
> >     boolean compacted = false;
> >     if (value.getRowKind() == RowKind.DELETE) {
> >         value.setRowKind(RowKind.INSERT);
> >         for (int i = buffer.size() - 1; i >= 0; i--) {
> >             RowData x = buffer.get(i);
> >             if (x.getRowKind() == RowKind.INSERT && recordEqualiser.equals(x, value)) {
> >                 buffer.remove(i);
> >                 compacted = true;
> >                 break;
> >             }
> >         }
> >         value.setRowKind(RowKind.DELETE);
> >     }
> >
> >     if (!compacted) {
> >         buffer.add(value);
> >     }
> >
> >     if (counter >= 10) {
> >         buffer.forEach(collector::collect);
> >         buffer.clear();
> >         counter = 0;
> >     }
> > }
> >
> > Regards,
> > Artem
> >
> >
> >


Re: Investigating use of Custom Trigger to smooth CPU usage

2023-08-03 Thread David Anderson
There's already a built-in concept of WindowStagger that provides an
interface for accomplishing this.

It's not as well integrated (or documented) as it might be, but the
basic mechanism exists. To use it, I believe you would do something
like this:

assigner = TumblingEventTimeWindows.of(Time.seconds(5), Time.seconds(0),
WindowStagger.RANDOM);

foo.keyBy(...)
  .window(assigner)
  ...

The different stagger strategies are documented in [1].

[1] 
https://nightlies.apache.org/flink/flink-docs-stable/api/java/index.html?org/apache/flink/streaming/api/windowing/assigners/WindowStagger.html

On Wed, Aug 2, 2023 at 7:13 PM xiangyu feng  wrote:
>
> Hi Tucker,
>
> Can you describe more about your running job and how the trigger timer is 
> configured? Also it would be better if you can attach a FlameGraph to show 
> the CPU usage when the timer is triggered.
>
> Best,
> Xiangyu
>
> Tucker Harvey via user  于2023年8月1日周二 05:51写道:
>>
>> Hello Flink community! My team is trying to optimize CPU usage on a running 
>> job, and we're exploring the option of offsetting the trigger time for 
>> smoother CPU patterns. Since adjusting the window will compromise job 
>> correctness, we plan to pursue a custom trigger implementation. We were 
>> curious if the community had any thoughts or insights on this issue.
>>
>>


Re: High Start-Delay And Aligned Checkpointing Causing Timeout.

2023-06-02 Thread David Anderson
I'm not 100% certain what "alignment duration" is measuring exactly in
the context of unaligned checkpoints -- however, even with unaligned
checkpointing each operator still has to wait until all of the
barriers are present in the operator's input queues. It doesn't have
to wait for the barriers to be aligned, but they do have to all be
present in order for the operator to know which in-flight messages to
include in the checkpoint.

David
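
For reference, unaligned checkpointing is switched on via the checkpoint configuration; a minimal PyFlink sketch (the interval is illustrative):

```python
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.enable_checkpointing(60_000)  # checkpoint every 60 seconds
# Let barriers overtake buffered in-flight records so that backpressure
# delays the checkpoint less; the in-flight data is then stored in the
# checkpoint instead.
env.get_checkpoint_config().enable_unaligned_checkpoints()
```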

On Fri, Jun 2, 2023 at 12:38 PM Pritam Agarwala
 wrote:
>
> Hi All,
>
>
> I have enabled checkpointing in production. In peak hours, checkpointing is
> failing due to timeout. When checking at the sub-task level, I could see it is
> taking some time for alignment and there is also a start delay. I think the
> start delay will be due to back-pressure.
> As a first step, I have enabled unaligned checkpointing in my test env.
> But I could still see it taking time for alignment. Is this expected
> behaviour?
>
> Screenshot for your reference:
> [screenshot: test environment]
> [screenshot: prod environment]
>
> Thanks & Regards,
> Pritam
>


Re: Watermarks lagging behind events that generate them

2023-03-16 Thread David Anderson
Watermarks are not included in checkpoints or savepoints.

See [1] for some head-scratchingly-complicated info about restarts,
watermarks, and unaligned checkpoints.

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/state/checkpointing_under_backpressure/#interplay-with-watermarks

On Wed, Mar 15, 2023 at 2:54 AM Shammon FY  wrote:
>
> Hi Alexis
>
> Currently I think checkpoints and savepoints will not save watermarks. I think
> how to deal with watermarks at checkpoint/savepoint time is a good question; we
> can discuss this on the dev mailing list.
>
> Best,
> Shammon FY
>
>
> On Wed, Mar 15, 2023 at 4:22 PM Alexis Sarda-Espinosa 
>  wrote:
>>
>> Hi Shammon, thanks for the info. I was hoping the savepoint would include 
>> the watermark, but I'm not sure that would make sense in every scenario.
>>
>> Regards,
>> Alexis.
>>
>> Am Di., 14. März 2023 um 12:59 Uhr schrieb Shammon FY :
>>>
>>> Hi Alexis
>>>
>>> In some watermark generators such as BoundedOutOfOrderTimestamps, the 
>>> timestamp of watermark will be reset to Long.MIN_VALUE if the subtask is 
>>> restarted and no event from source is processed.
>>>
>>> Best,
>>> Shammon FY
>>>
>>> On Tue, Mar 14, 2023 at 4:58 PM Alexis Sarda-Espinosa 
>>>  wrote:
>>>>
>>>> Hi David, thanks for the answer. One follow-up question: will the 
>>>> watermark be reset to Long.MIN_VALUE every time I restart a job with 
>>>> savepoint?
>>>>
>>>> Am Di., 14. März 2023 um 05:08 Uhr schrieb David Anderson 
>>>> :
>>>>>
>>>>> Watermarks always follow the corresponding event(s). I'm not sure why
>>>>> they were designed that way, but that is how they are implemented.
>>>>> Windows maintain this contract by emitting all of their results before
>>>>> forwarding the watermark that triggered the results.
>>>>>
>>>>> David
>>>>>
>>>>> On Mon, Mar 13, 2023 at 5:28 PM Shammon FY  wrote:
>>>>> >
>>>>> > Hi Alexis
>>>>> >
>>>>> > Do you use both event-time watermark generator and TimerService for 
>>>>> > processing time in your job? Maybe you can try using event-time 
>>>>> > watermark first.
>>>>> >
>>>>> > Best,
>>>>> > Shammon.FY
>>>>> >
>>>>> > On Sat, Mar 11, 2023 at 7:47 AM Alexis Sarda-Espinosa 
>>>>> >  wrote:
>>>>> >>
>>>>> >> Hello,
>>>>> >>
>>>>> >> I recently ran into a weird issue with a streaming job in Flink 
>>>>> >> 1.16.1. One of my functions (KeyedProcessFunction) has been using 
>>>>> >> processing time timers. I now want to execute the same job based on a 
>>>>> >> historical data dump, so I had to adjust the logic to use event time 
>>>>> >> timers in that case (and did not use BATCH execution mode). Since my 
>>>>> >> data has a timestamp field, I implemented a custom WatermarkGenerator 
>>>>> >> that always emits a watermark with that timestamp in the onEvent 
>>>>> >> callback, and does nothing in the onPeriodicEmit callback.
>>>>> >>
>>>>> >> My problem is that, sometimes, the very first time my function calls 
>>>>> >> TimerService.currentWatermark, the value is Long.MIN_VALUE, which 
>>>>> >> causes some false triggers when the first watermark actually arrives.
>>>>> >>
>>>>> >> I would have expected that, if WatermarkGenerator.onEvent emits a 
>>>>> >> watermark, it would be sent before the corresponding event, but maybe 
>>>>> >> this is not always the case?
>>>>> >>
>>>>> >> In case it's relevant, a brief overview of my job's topology:
>>>>> >>
>>>>> >> Source1 -> Broadcast
>>>>> >>
>>>>> >> Source2 ->
>>>>> >>   keyBy ->
>>>>> >>   connect(Broadcast) ->
>>>>> >>   process ->
>>>>> >>   filter ->
>>>>> >>   assignTimestampsAndWatermarks -> // newly added for historical data
>>>>> >>   keyBy ->
>>>>> >>   process // function that uses timers
>>>>> >>
>>>>> >> Regards,
>>>>> >> Alexis.


Re: is there any detrimental side-effect if i set the max parallelism as 32768

2023-03-13 Thread David Anderson
I believe there is some noticeable overhead if you are using the
heap-based state backend, but with RocksDB I think the difference is
negligible.

David
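
For reference, the setting under discussion is configured like this in a PyFlink job (32768 is the value proposed below):

```python
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# Upper bound on the number of key groups; changing it later breaks
# keyed-state compatibility, so it has to be chosen up front.
env.set_max_parallelism(32768)
```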

On Tue, Mar 7, 2023 at 11:10 PM Hangxiang Yu  wrote:
>
> Hi, Tony.
> "be detrimental to performance" means that some extra space overhead of the 
> field of the key-group may influence performance.
> As we know, Flink will write the key group as the prefix of the key to speed 
> up rescaling.
> So the format will be like: key group | key len | key | ..
> You could check the relationship between max parallelism and bytes of key 
> group as below:
> --
> max parallelism   bytes of key group
>             128                    1
>           32768                    2
> --
> So I think the cost will be very small if the real key length >> 2 bytes.
>
> On Wed, Mar 8, 2023 at 1:06 PM Tony Wei  wrote:
>>
>> Hi experts,
>>
>>> Setting the maximum parallelism to a very large value can be detrimental to 
>>> performance because some state backends have to keep internal data 
>>> structures that scale with the number of key-groups (which are the internal 
>>> implementation mechanism for rescalable state).
>>>
>>> Changing the maximum parallelism explicitly when recovery from original job 
>>> will lead to state incompatibility.
>>
>>
>> I read the section above from Flink official document [1], and I'm wondering 
>> what the detail is regarding to the side-effect.
>>
>> Suppose that I have a Flink SQL job with large state, large parallelism and 
>> using RocksDB as my state backend.
>> I would like to set the max parallelism to 32768, so that I don't need to
>> worry about whether the max parallelism is divisible by the parallelism
>> whenever I want to scale my job,
>> because the number of key groups will not differ too much between
>> subtasks.
>>
>> I'm wondering if this is a good practice, because based on the official 
>> document it is not recommended actually.
>> If possible, I would like to know the detail about this side-effect. Which 
>> state backend will have this issue? and Why?
>> Please give me an advice. Thanks in advance.
>>
>> Best regards,
>> Tony Wei
>>
>> [1] 
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#setting-the-maximum-parallelism
>
>
>
> --
> Best,
> Hangxiang.


Re: Watermarks lagging behind events that generate them

2023-03-13 Thread David Anderson
Watermarks always follow the corresponding event(s). I'm not sure why
they were designed that way, but that is how they are implemented.
Windows maintain this contract by emitting all of their results before
forwarding the watermark that triggered the results.

David

On Mon, Mar 13, 2023 at 5:28 PM Shammon FY  wrote:
>
> Hi Alexis
>
> Do you use both event-time watermark generator and TimerService for 
> processing time in your job? Maybe you can try using event-time watermark 
> first.
>
> Best,
> Shammon.FY
>
> On Sat, Mar 11, 2023 at 7:47 AM Alexis Sarda-Espinosa 
>  wrote:
>>
>> Hello,
>>
>> I recently ran into a weird issue with a streaming job in Flink 1.16.1. One 
>> of my functions (KeyedProcessFunction) has been using processing time 
>> timers. I now want to execute the same job based on a historical data dump, 
>> so I had to adjust the logic to use event time timers in that case (and did 
>> not use BATCH execution mode). Since my data has a timestamp field, I 
>> implemented a custom WatermarkGenerator that always emits a watermark with 
>> that timestamp in the onEvent callback, and does nothing in the 
>> onPeriodicEmit callback.
>>
>> My problem is that, sometimes, the very first time my function calls 
>> TimerService.currentWatermark, the value is Long.MIN_VALUE, which causes 
>> some false triggers when the first watermark actually arrives.
>>
>> I would have expected that, if WatermarkGenerator.onEvent emits a watermark, 
>> it would be sent before the corresponding event, but maybe this is not 
>> always the case?
>>
>> In case it's relevant, a brief overview of my job's topology:
>>
>> Source1 -> Broadcast
>>
>> Source2 ->
>>   keyBy ->
>>   connect(Broadcast) ->
>>   process ->
>>   filter ->
>>   assignTimestampsAndWatermarks -> // newly added for historical data
>>   keyBy ->
>>   process // function that uses timers
>>
>> Regards,
>> Alexis.


Re: Reusing the same OutputTag in multiple ProcessFunctions

2023-02-14 Thread David Anderson
I can't respond to the python-specific aspects of this situation, but
I don't believe you need to use the same OutputTag instance. It should
be enough that the various tag instances involved all have the same
String id. (That's why the id exists.)

David
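
To illustrate the id-based matching described above, here is a minimal PyFlink sketch with two separate OutputTag instances that share the id "errors" (names and types are arbitrary, and whether this sidesteps the pickling error is untested):

```python
from pyflink.common import Types
from pyflink.datastream import OutputTag, ProcessFunction

# Two distinct instances with the same id: records routed through either one
# should end up in the same side output.
ERRORS_A = OutputTag("errors", Types.STRING())
ERRORS_B = OutputTag("errors", Types.STRING())


class StepA(ProcessFunction):
    def process_element(self, value, ctx: 'ProcessFunction.Context'):
        yield value
        yield ERRORS_A, "step-a saw {}".format(value)


class StepB(ProcessFunction):
    def process_element(self, value, ctx: 'ProcessFunction.Context'):
        yield value
        yield ERRORS_B, "step-b saw {}".format(value)
```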

On Tue, Feb 14, 2023 at 11:51 AM Andrew Otto  wrote:
>
> Hi,
>
> I'm attempting to implement a generic error handling ProcessFunction in 
> pyflink.  Given a user provided function, I want to invoke that function for 
> each element in the DataStream, catch any errors thrown by the function, 
> convert those errors into events, and then emit those event errors to a 
> different DataStream sink.
>
> I'm trying to do this by reusing the same OutputTag in each of my 
> ProcessFunctions.
> However, this does not work, I believe because I am using the same 
> error_output_tag in two different functions, which causes it to have a 
> reference(?)  to _thread.Rlock, which causes the ProcessFunction instance to 
> be un-pickleable.
>
> Here's a standalone example of the failure using the canonical word_count 
> example.
>
> My question is.
> 1. Does Flink support re-use of the same OutputTag instance in multiple 
> ProcessFunctions?
> 2. If so, is my problem pyflink / python / pickle specific?
>
> Thanks!
> -Andrew Otto
>  Wikimedia Foundation
>
>


Re: Non-temporal watermarks

2023-02-03 Thread David Anderson
DataStream time windows and Flink SQL make assumptions about the timestamps
and watermarks being milliseconds since the epoch. But the underlying
machinery does not. So if you limit yourself to process functions (for
example), then nothing will assign any semantics to the time values.

David

On Thu, Feb 2, 2023 at 2:43 AM James Sandys-Lumsdaine 
wrote:

> I can describe a use that has been successful for me. We have a Flink
> workflow that calculates reports over many days and have it currently set
> up to recompute the last 10 days or so when recovering this "deep history"
> from our databases and then switches over to live flow to process all
> subsequent update events. I wrote this before the days of the HyrbidSource
> so it is literally a JDBC data source that queries state for the last 10
> days and that stream is merged with a "live" stream from a db poller or
> Kafka stream.
>
> In answer to your question, during recovery I have all state for the old
> business days sent with a timestamp of that business date e.g. new
> DateTime(2023, 1, 15, 0, 0, 0, UTC).getMillis() for any data associated
> with the 15th Jan 2023. Once the data source has emitted all the state
> for that date, it then emits a watermark with exactly the same timestamp as
> it is communicating downstream that all the data has been sent for that
> date. Then moves onto the next date emitting that state.
>
> When my system starts up it records the current datetime and treats all
> data retrieved before that timestamp as being recovered state, and all data
> receieved from the live pollers/Kafka to be after that cut-off point. The
> live sources emit objects timestamped with the current time and
> periodically emit a watermark to make forward progress. I'm simplifying
> here but you get the point.
>
> This pattern is useful for me because my keyed process functions are able
> to register timers to process all the data for an historic date at once -
> it won't need to fire on each message received or try to compute with
> missing data, but instead runs once all the data has been received for a
> date from all the sources. (The time is only triggered when the watermark
> is reached and that required all sources to have reached at least that
> point in the recovery). Once we have reached the startup datetime watermark
> the system seamlessly flips into live processing mode. The watermarks still
> trigger my timers but now we are processing the last ~1 minute of batched
> data.
>
> So logically the meaning of a timestamp and watermark in my system always
> represents a forward moving moment in time - it is just that it means an
> historic date for data during recovery from the databases and then a
> current timestamp when the system is processing live data.
>
> Hope that gives you some ideas and help.
>
> James.
> --
> *From:* Gen Luo 
> *Sent:* 02 February 2023 09:52
> *To:* Jan Lukavský 
> *Cc:* user@flink.apache.org 
> *Subject:* Re: Non-temporal watermarks
>
> Hi,
>
> This is an interesting topic. I suppose the watermark is defined based on
> the event time since it's mainly used, or designed, for the event time
> processing. Flink provides the event time processing mechanism because it's
> widely needed. Every event has its event time and we usually need to group
> or order by the event time. On the other hand, this also means that we can
> process events from different sources as the event time is naturally of the
> same scale.
>
> However, just as you say, technically speaking the event timestamp can be
> replaced with any other meaningful number (or event a comparable), and the
> (event time) watermark should change accordingly. If we promise this field
> and its watermark of all sources are of the same scale, we can process the
> data/event from the sources together with it just like the event time. As
> the event time processing and event time timer service doesn't rely on the
> actual time point or duration, I suppose this can be implemented by
> defining it as the event time, if it contains only positive numbers.
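
As an illustration of that idea (not code from this thread), driving the event-time machinery with an arbitrary monotonically increasing long, such as a sequence number, could look roughly like the sketch below; the record type and field names are made up.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

public class SequenceWatermarks {

    // A record whose "sequence" field is not wall-clock time at all, just a
    // non-negative, monotonically non-decreasing long.
    public static class SequenceRecord {
        public long sequence;
        public String payload;
    }

    // Treat the sequence number as if it were an event-time timestamp; timers
    // and watermarks then operate in the sequence domain.
    public static WatermarkStrategy<SequenceRecord> strategy() {
        return WatermarkStrategy
                .<SequenceRecord>forMonotonousTimestamps()
                .withTimestampAssigner((record, previousTimestamp) -> record.sequence);
    }
}
```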
>
>
> On Thu, Feb 2, 2023 at 5:18 PM Jan Lukavský  wrote:
>
> Hi,
>
> I will not speak about details related to Flink specifically, the
> concept of watermarks is more abstract, so I'll leave implementation
> details aside.
>
> Speaking generally, yes, there is a set of requirements that must be met
> in order to be able to generate a system that uses watermarks.
>
> The primary question is what are watermarks used for? The answer is - we
> need watermarks to be able to define a partially stable order of
> _events_. Event is an immutable piece of data that can be _observed_
> (i.e. processed) with various consumer-dependent delays (two consumers
> of the event can see the event at different processing times), or a
> specific (local) timestamp. Generally an event tells us that something,
> somewhere happened at given local timestamp.
>
> Watermarks create markers in processing time of each 

Re: Failing to build Flink 1.9 using Scala 2.12

2022-12-24 Thread David Anderson
Flink only officially supports Scala 2.12 up to 2.12.7 -- you are running
into the binary compatibility check, intended to keep you from unknowingly
running into problems. You can disable japicmp, and everything will
hopefully work:

mvn clean install -DskipTests -Djapicmp.skip -Dscala-2.12
-Dscala.version=2.12.16

See https://issues.apache.org/jira/browse/FLINK-12461 for more info.

Regards,
David

On Fri, Dec 23, 2022 at 6:55 AM Milind Vaidya  wrote:

> Hi
>
> First of all, I do understand that I am using a very old version. But as
> of now the team can not help it. We need to move to Scala 2.12 first and
> then we will move forward towards the latest version of Flink.
>
> I have added following things to main pom.xml
>
>  2.11.12
> 2.11
>
> Under Scala-2.11
>
> removed following lines
>
>!scala-2.12
> 
>
> Added
>false
>
>
> Under Scala-2.12
>
> Added
>
> false
>
> Command :
>
> mvn clean install -DskipTests -Dscala-2.12 -Dscala.version=2.12.16
>
> Error :
>
> Failed to execute goal
> com.github.siom79.japicmp:japicmp-maven-plugin:0.11.0:cmp (default) on
> project flink-metrics-core: Execution default of goal
> com.github.siom79.japicmp:japicmp-maven-plugin:0.11.0:cmp failed: A
> required class was missing while executing
> com.github.siom79.japicmp:japicmp-maven-plugin:0.11.0:cmp:
> javax/xml/bind/JAXBException
>
>
> Is there any set of instructions to follow while compiling a flink version
> for Scala 2.12 ?
>
>
>


Re: Support for higher-than-millisecond resolution event-time timestamps

2022-11-25 Thread David Anderson
When it comes to event time processing and watermarks, I believe that if
you stick to the lower level APIs, then the milliseconds assumption is
indeed arbitrary, but at higher levels that assumption is baked in.

In other words, that rules out using Flink SQL, or things
like TumblingEventTimeWindows.of(Time.milliseconds(10)). It might not be
difficult to build something to work around those assumptions, but I
haven't given it much thought. But if you stick to KeyedProcessFunction, it
should be fine.
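
A minimal sketch of what that could look like, assuming you pick microseconds as the unit and stay away from the higher-level windowing helpers; the event type and the 10-microsecond delay are invented for the example.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class MicrosecondTimeSketch {

    // Placeholder event carrying a timestamp in *microseconds* since the epoch.
    public static class Event {
        public String key;
        public long micros;
    }

    // The low-level timer and watermark machinery only compares longs, so it
    // does not care about the unit, as long as timestamps and watermarks
    // agree everywhere.
    public static WatermarkStrategy<Event> microsecondStrategy() {
        return WatermarkStrategy
                .<Event>forMonotonousTimestamps()
                .withTimestampAssigner((event, previous) -> event.micros);
    }

    public static class TenMicrosLater extends KeyedProcessFunction<String, Event, String> {
        @Override
        public void processElement(Event event, Context ctx, Collector<String> out) {
            // "+ 10" means 10 microseconds here, because that is the unit we chose.
            ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 10);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
            out.collect("timer fired at " + timestamp + " micros for key " + ctx.getCurrentKey());
        }
    }
}
```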

Best,
David

On Fri, Nov 25, 2022 at 5:32 AM Salva Alcántara 
wrote:

> As mentioned in the docs
> 
> :
>
> > Attention: Both timestamps and watermarks are specified as milliseconds
> since the Java epoch of 1970-01-01T00:00:00Z.
>
> Are there any plans for supporting higher time resolutions?
>
> Also, internally, Flink uses the `long` type for the timestamps, so maybe
> the milliseconds assumption is arbitrary and things would actually work
> just fine for higher resolutions provided that they fit into the long type
> (???). I found this SO post:
>
>
> https://stackoverflow.com/questions/54402759/streaming-data-processing-and-nano-second-time-resolution
>
> which touches upon this but it's a bit old already and there seems to be
> no clear answer in the end. So maybe we could touch base on it.
>
> Regards,
>
> Salva
>


Re: question about Async IO

2022-11-04 Thread David Anderson
Yes, that will work as you expect. So long as you don't put another shuffle
or rebalance in between, the keyed partitioning that's already in place
will carry through the async i/o operator, and beyond. In most cases you
can even use reinterpretAsKeyedStream on the output (so long as you haven't
done something to emit records that break the contract it expects).
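
A hedged sketch of the wiring being discussed, with invented names and a trivial stand-in for the real async client call: the keyed stream feeds the async operator directly, with no shuffle or parallelism change in between.

```java
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class KeyedAsyncExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KeyedStream<String, String> keyed =
                env.fromElements("a:1", "b:2", "a:3")
                        .keyBy(value -> value.split(":")[0]);

        // Same parallelism, no keyBy/rebalance in between: elements with the
        // same key keep going to the same async operator subtask.
        DataStream<String> enriched =
                AsyncDataStream.orderedWait(keyed, new Enricher(), 5, TimeUnit.SECONDS, 100);

        enriched.print();
        env.execute("keyed async i/o sketch");
    }

    // A trivial async function standing in for a real async client call.
    public static class Enricher extends RichAsyncFunction<String, String> {
        @Override
        public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
            CompletableFuture
                    .supplyAsync(() -> input + " enriched")
                    .thenAccept(r -> resultFuture.complete(Collections.singleton(r)));
        }
    }
}
```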

David

On Wed, Nov 2, 2022 at 4:19 PM Galen Warren  wrote:

> Thanks, that makes sense and matches my understanding of how it works.
>
> In my case, I don't actually need access to keyed *state*; I just want to
> make sure that all elements with the same key are routed to the same
> instance of the async function. (Without going into too much detail, the
> reason for this is that I want to invoke async operations for the same key
> in sequence, i.e. not have two in-flight async operations for the same key
> at the same time. I can accomplish this with a local map of in-flight async
> operations, in the function, so long as all inputs with the same keys get
> routed to the same instance of the function.)
>
> So far as I can tell, if I create a keyed stream and then use it as an
> input to an async function, the keys will be distributed across the async
> function instances the way I want, even if keyed state is inaccessible.
> Anyway, that's what I'm looking to confirm.
>
> On Wed, Nov 2, 2022 at 5:14 AM Filip Karnicki 
> wrote:
>
>> Hi Galen
>>
>> I was thinking about the same thing recently and reached a point where I
>> see that async io does not have access to the keyed state because:
>>
>> "* State related apis in
>> [[org.apache.flink.api.common.functions.RuntimeContext]] are not supported
>>  * yet because the key may get changed while accessing states in the
>> working thread."
>>
>> I don't think that the key can change at any random time here, because of
>>
>> "A common confusion that we want to explicitly point out here is that the
>> AsyncFunction is not called in a multi-threaded fashion. There exists only
>> one instance of the AsyncFunction and it is called sequentially for each
>> record in the respective partition of the stream"
>> From:
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/
>>
>> So if the RichAsyncFunctionRuntimeContext had access to a
>> KeyedStateBackend and since it's basically a facade on top of
>> RuntimeContext. we could (maybe) change the method signature for something
>> like getState to include the key, and run
>> keyedStateBackend.setCurrentKey(key) before continuing with anything else.
>>
>>
>> Anyone - please stop me if I'm talking nonsense
>>
>>
>> On Fri, 14 Oct 2022 at 21:36, Krzysztof Chmielewski <
>> krzysiek.chmielew...@gmail.com> wrote:
>>
>>> Hi Galen,
>>> i will tell from my experience as a Flink user and developer of Flink
>>> jobs.
>>>
>>>
>>>
>>> *"if the input to an AsyncFunction is a keyed stream, can I assume that
>>> all input elements with the same key will be handled by the same instance
>>> of the async operator"*
>>> From what I know (and someone can correct me if I'm wrong) this is
>>> possible. However you have to make sure that there is no Re-balance or
>>> re-shuffle between those operators. For example operators after first
>>> .keyBy(..) call must have same parallelism level.
>>>
>>> Regarding:
>>> " I have a situation where I would like to enforce that async operations
>>> associated with a particular key happen sequentially,"
>>>
>>> This is also possible as far as I know. In fact I was implementing
>>> streaming pipeline with similar requirements like
>>> *"maintaining order of events withing keyBy group across multiple
>>> operators including Async operators". *
>>> We achieved that with same thing -> making sure that all operators in
>>> entire pipeline except Source and Sink had exact same parallelism level.
>>> Additional thing to remember here is that if you call .keyBy(...) again
>>> but with different key extractor, then original order might not be
>>> preserved since keyBy will execute re-shuffle/re-balance.
>>>
>>> We were also using reinterpretAsKeyedStream feature [1] after async
>>> operators to avoid calling ".keyBy" multiple times in the pipeline. Calling
>>> .keyBy always has negative impact on performance.
>>> With reinterpretAsKeyedStream we were able to use keyed operators with
>>> access to keyed state after Async operators.
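
For reference, using that experimental feature might look roughly like this sketch; the key extractor is invented and must match whatever was used for the original keyBy.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;

public class ReinterpretSketch {
    // After the async operator (same parallelism, no shuffle), re-declare the
    // stream as keyed without paying for another keyBy.
    public static KeyedStream<String, String> rekey(DataStream<String> afterAsync) {
        return DataStreamUtils.reinterpretAsKeyedStream(
                afterAsync,
                value -> value.split(":")[0]); // must match the original key extractor
    }
}
```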
>>>
>>> Hope that helped.
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/experimental/
>>>
>>> Regards,
>>> Krzysztof Chmielewski
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Fri, Oct 14, 2022 at 7:11 PM Galen Warren 
>>> wrote:
>>>
 I have a question about Flink's Async IO support: Async I/O | Apache
 Flink
 
 .

 I understand that access to state is not supported in an AsyncFunction.
 However, if the input to an AsyncFunction is a keyed 

Re: Can temporal table functions only be registered using the table API?

2022-10-06 Thread David Anderson
I was wrong about this. The AS OF style processing join has been disabled
at a higher level,
in 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTemporalJoin#createJoinOperator

David

On Thu, Oct 6, 2022 at 9:59 AM David Anderson  wrote:

> Salva,
>
> Have you tried doing an AS OF style processing time temporal join? I know
> the documentation leads one to believe this isn't supported, but I think it
> actually works. I'm basing this on this comment [1] in the code for
> the TemporalProcessTimeJoinOperator:
>
> The operator to temporal join a stream on processing time.
>
> For temporal TableFunction join (LATERAL
>> TemporalTableFunction(o.proctime)) and temporal table join (FOR SYSTEM_TIME
>> AS OF), they can reuse same processing-time operator implementation, the
>> differences between them are: (1) The temporal TableFunction join only
>> supports single column in primary key but temporal table join supports
>> arbitrary columns in primary key. (2) The temporal TableFunction join only
>> supports inner join, temporal table join supports both inner join and left
>> outer join.
>
>
> [1]
> https://github.com/apache/flink/blob/release-1.15/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperator.java#L38
>
> Regards,
> David
>
> On Wed, Oct 5, 2022 at 6:39 AM Salva Alcántara 
> wrote:
>
>> I've found more examples here:
>>
>> https://www.ververica.com/blog/a-journey-to-beating-flinks-sql-performance
>>
>> where a fact table is enriched using several dimension tables, but again
>> the temporal table functions are registered using Table API like so:
>>
>> ```java
>> tEnv.registerFunction(
>> "dimension_table1",
>> tEnv.from("dim_table1").createTemporalTableFunction("r_proctime",
>> "id"));```
>>
>> It's not exactly the same application, since this example covers a lookup
>> join, but the SQL query is also relying on the LATERAL TABLE + temporal
>> table functions:
>>
>> ```
>> SELECT
>> D1.col1 AS A,
>> D1.col2 AS B,
>> FROM
>> fact_table,
>> LATERAL TABLE (dimension_table1(f_proctime)) AS D1,
>> WHERE
>> fact_table.dim1 = D1.id
>> ```
>>
>> In particular, this produces a job which is equivalent to
>>
>> ```
>>   private abstract static class AbstractFactDimTableJoin<IN1, OUT>
>>   extends CoProcessFunction<IN1, Dimension, OUT> {
>> private static final long serialVersionUID = 1L;
>>
>> protected transient ValueState<Dimension> dimState;
>>
>> @Override
>> public void processElement1(IN1 value, Context ctx, Collector<OUT>
>> out) throws Exception {
>>   Dimension dim = dimState.value();
>>   if (dim == null) {
>> return;
>>   }
>>   out.collect(join(value, dim));
>> }
>>
>> abstract OUT join(IN1 value, Dimension dim);
>>
>> @Override
>> public void processElement2(Dimension value, Context ctx,
>> Collector<OUT> out) throws Exception {
>>   dimState.update(value);
>> }
>>
>> @Override
>> public void open(Configuration parameters) throws Exception {
>>   super.open(parameters);
>> ValueStateDescriptor<Dimension> dimStateDesc =
>>   new ValueStateDescriptor<>("dimstate", Dimension.class);
>>   this.dimState = getRuntimeContext().getState(dimStateDesc);
>> }
>>   }
>> ```
>>
>> I'm basically interested in rewriting these types of DIY joins (based on
>> CoProcessFunction or CoFlatMapFunction) from DataStream to pure SQL if
>> possible, otherwise I would like to know which limitations there are.
>>
>> Regards,
>>
>> Salva
>>
>> On Tue, Oct 4, 2022 at 9:09 PM Salva Alcántara 
>> wrote:
>>
>>> By looking at the docs for older versions of Flink, e.g.,
>>>
>>>
>>> https://nightlies.apache.org/flink/flink-docs-release-1.8/dev/table/streaming/joins.html
>>>
>>> it seems that it's possible to rewrite this query
>>>
>>> ```
>>> SELECT
>>>   o.amount * r.rate AS amount
>>> FROM
>>>   Orders AS o,
>>>   LATERAL TABLE (Rates(o.rowtime)) AS r
>>> WHERE r.currency = o.currency
>>> ```
>>>
>>> as
>>>
>>> ```
>>> SELECT
>>>   SUM(o.amount * r.rate) AS amount
>>> FROM Orders AS o,
>>>   RatesHistory AS r
>>> WHERE r.currency = o.

Re: Can temporal table functions only be registered using the table API?

2022-10-06 Thread David Anderson
As for your original question, the documentation states that a temporal
table function can only be registered via the Table API, and I believe this
is true.

David

On Thu, Oct 6, 2022 at 9:59 AM David Anderson  wrote:

> Salva,
>
> Have you tried doing an AS OF style processing time temporal join? I know
> the documentation leads one to believe this isn't supported, but I think it
> actually works. I'm basing this on this comment [1] in the code for
> the TemporalProcessTimeJoinOperator:
>
> The operator to temporal join a stream on processing time.
>
> For temporal TableFunction join (LATERAL
>> TemporalTableFunction(o.proctime)) and temporal table join (FOR SYSTEM_TIME
>> AS OF), they can reuse same processing-time operator implementation, the
>> differences between them are: (1) The temporal TableFunction join only
>> supports single column in primary key but temporal table join supports
>> arbitrary columns in primary key. (2) The temporal TableFunction join only
>> supports inner join, temporal table join supports both inner join and left
>> outer join.
>
>
> [1]
> https://github.com/apache/flink/blob/release-1.15/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperator.java#L38
>
> Regards,
> David
>
> On Wed, Oct 5, 2022 at 6:39 AM Salva Alcántara 
> wrote:
>
>> I've found more examples here:
>>
>> https://www.ververica.com/blog/a-journey-to-beating-flinks-sql-performance
>>
>> where a fact table is enriched using several dimension tables, but again
>> the temporal table functions are registered using Table API like so:
>>
>> ```java
>> tEnv.registerFunction(
>> "dimension_table1",
>> tEnv.from("dim_table1").createTemporalTableFunction("r_proctime",
>> "id"));```
>>
>> It's not exactly the same application, since this example covers a lookup
>> join, but the SQL query is also relying on the LATERAL TABLE + temporal
>> table functions:
>>
>> ```
>> SELECT
>> D1.col1 AS A,
>> D1.col2 AS B,
>> FROM
>> fact_table,
>> LATERAL TABLE (dimension_table1(f_proctime)) AS D1,
>> WHERE
>> fact_table.dim1 = D1.id
>> ```
>>
>> In particular, this produces a job which is equivalent to
>>
>> ```
>>   private abstract static class AbstractFactDimTableJoin<IN1, OUT>
>>   extends CoProcessFunction<IN1, Dimension, OUT> {
>> private static final long serialVersionUID = 1L;
>>
>> protected transient ValueState<Dimension> dimState;
>>
>> @Override
>> public void processElement1(IN1 value, Context ctx, Collector<OUT>
>> out) throws Exception {
>>   Dimension dim = dimState.value();
>>   if (dim == null) {
>> return;
>>   }
>>   out.collect(join(value, dim));
>> }
>>
>> abstract OUT join(IN1 value, Dimension dim);
>>
>> @Override
>> public void processElement2(Dimension value, Context ctx,
>> Collector<OUT> out) throws Exception {
>>   dimState.update(value);
>> }
>>
>> @Override
>> public void open(Configuration parameters) throws Exception {
>>   super.open(parameters);
>> ValueStateDescriptor<Dimension> dimStateDesc =
>>   new ValueStateDescriptor<>("dimstate", Dimension.class);
>>   this.dimState = getRuntimeContext().getState(dimStateDesc);
>> }
>>   }
>> ```
>>
>> I'm basically interested in rewriting these types of DIY joins (based on
>> CoProcessFunction or CoFlatMapFunction) from DataStream to pure SQL if
>> possible, otherwise I would like to know which limitations there are.
>>
>> Regards,
>>
>> Salva
>>
>> On Tue, Oct 4, 2022 at 9:09 PM Salva Alcántara 
>> wrote:
>>
>>> By looking at the docs for older versions of Flink, e.g.,
>>>
>>>
>>> https://nightlies.apache.org/flink/flink-docs-release-1.8/dev/table/streaming/joins.html
>>>
>>> it seems that it's possible to rewrite this query
>>>
>>> ```
>>> SELECT
>>>   o.amount * r.rate AS amount
>>> FROM
>>>   Orders AS o,
>>>   LATERAL TABLE (Rates(o.rowtime)) AS r
>>> WHERE r.currency = o.currency
>>> ```
>>>
>>> as
>>>
>>> ```
>>> SELECT
>>>   SUM(o.amount * r.rate) AS amount
>>> FROM Orders AS o,
>>>   RatesHistory AS r
>>> WHERE r.currency = o.currency
>>> AND r.rowtime =

Re: Can temporal table functions only be registered using the table API?

2022-10-06 Thread David Anderson
Salva,

Have you tried doing an AS OF style processing time temporal join? I know
the documentation leads one to believe this isn't supported, but I think it
actually works. I'm basing this on this comment [1] in the code for
the TemporalProcessTimeJoinOperator:

The operator to temporal join a stream on processing time.

For temporal TableFunction join (LATERAL TemporalTableFunction(o.proctime))
> and temporal table join (FOR SYSTEM_TIME AS OF), they can reuse same
> processing-time operator implementation, the differences between them are:
> (1) The temporal TableFunction join only supports single column in primary
> key but temporal table join supports arbitrary columns in primary key. (2)
> The temporal TableFunction join only supports inner join, temporal table
> join supports both inner join and left outer join.


[1]
https://github.com/apache/flink/blob/release-1.15/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperator.java#L38

Regards,
David

On Wed, Oct 5, 2022 at 6:39 AM Salva Alcántara 
wrote:

> I've found more examples here:
>
> https://www.ververica.com/blog/a-journey-to-beating-flinks-sql-performance
>
> where a fact table is enriched using several dimension tables, but again
> the temporal table functions are registered using Table API like so:
>
> ```java
> tEnv.registerFunction(
> "dimension_table1",
> tEnv.from("dim_table1").createTemporalTableFunction("r_proctime",
> "id"));```
>
> It's not exactly the same application, since this example covers a lookup
> join, but the SQL query is also relying on the LATERAL TABLE + temporal
> table functions:
>
> ```
> SELECT
> D1.col1 AS A,
> D1.col2 AS B,
> FROM
> fact_table,
> LATERAL TABLE (dimension_table1(f_proctime)) AS D1,
> WHERE
> fact_table.dim1 = D1.id
> ```
>
> In particular, this produces a job which is equivalent to
>
> ```
>   private abstract static class AbstractFactDimTableJoin<IN1, OUT>
>   extends CoProcessFunction<IN1, Dimension, OUT> {
> private static final long serialVersionUID = 1L;
>
> protected transient ValueState<Dimension> dimState;
>
> @Override
> public void processElement1(IN1 value, Context ctx, Collector<OUT>
> out) throws Exception {
>   Dimension dim = dimState.value();
>   if (dim == null) {
> return;
>   }
>   out.collect(join(value, dim));
> }
>
> abstract OUT join(IN1 value, Dimension dim);
>
> @Override
> public void processElement2(Dimension value, Context ctx,
> Collector<OUT> out) throws Exception {
>   dimState.update(value);
> }
>
> @Override
> public void open(Configuration parameters) throws Exception {
>   super.open(parameters);
> ValueStateDescriptor<Dimension> dimStateDesc =
>   new ValueStateDescriptor<>("dimstate", Dimension.class);
>   this.dimState = getRuntimeContext().getState(dimStateDesc);
> }
>   }
> ```
>
> I'm basically interested in rewriting these types of DIY joins (based on
> CoProcessFunction or CoFlatMapFunction) from DataStream to pure SQL if
> possible, otherwise I would like to know which limitations there are.
>
> Regards,
>
> Salva
>
> On Tue, Oct 4, 2022 at 9:09 PM Salva Alcántara 
> wrote:
>
>> By looking at the docs for older versions of Flink, e.g.,
>>
>>
>> https://nightlies.apache.org/flink/flink-docs-release-1.8/dev/table/streaming/joins.html
>>
>> it seems that it's possible to rewrite this query
>>
>> ```
>> SELECT
>>   o.amount * r.rate AS amount
>> FROM
>>   Orders AS o,
>>   LATERAL TABLE (Rates(o.rowtime)) AS r
>> WHERE r.currency = o.currency
>> ```
>>
>> as
>>
>> ```
>> SELECT
>>   SUM(o.amount * r.rate) AS amount
>> FROM Orders AS o,
>>   RatesHistory AS r
>> WHERE r.currency = o.currency
>> AND r.rowtime = (
>>   SELECT MAX(rowtime)
>>   FROM RatesHistory AS r2
>>   WHERE r2.currency = o.currency
>>   AND r2.rowtime <= o.rowtime);
>> ```
>>
>> This would be a way to accomplish this task in SQL without using a
>> temporal table function.
>>
>> Would this rewrite be equivalent in terms of the final generated job?
>> Obviously I very much prefer the LATERAL TABLE query but this requires
>> using a temporal table function which can only be registered using the
>> Table API (apparently).
>>
>> Regards,
>>
>> Salva
>>
>> On Tue, Oct 4, 2022 at 8:39 PM Salva Alcántara 
>> wrote:
>>
>>> It doesn't seem the case with processing time unless I'm mistaken:
>>>
>>>
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#processing-time-temporal-join
>>>
>>> This case seems to require a different syntax based on LATERAL TABLE and
>>> a temporal table function (FOR SYSTEM_TIME is not supported). From the docs
>>> too, it seems that temporal table functions can only be registered via the
>>> table API. Am I missing/misunderstanding something?
>>>
>>> Salva
>>>
>>> On Tue, Oct 4, 2022, 19:26 Martijn Visser 
>>> wrote:
>>>
 Hi Salva,

 The examples for temporal table 

Re: [DISCUSS] FLIP-265 Deprecate and remove Scala API support

2022-10-05 Thread David Anderson
I want to clarify one point here, which is that modifying jobs written in
Scala to use Flink's Java API does not require porting them to Java. I can
readily understand why folks using Scala might rather use Java 17 than Java
11, but sticking to Scala will remain an option even if Flink's Scala API
goes away.

For more on this, see [1] and some of the examples it points to, such as
those in [2].

[1] https://flink.apache.org/2022/02/22/scala-free.html
[2] https://github.com/sjwiesman/flink-scala-3

On Tue, Oct 4, 2022 at 6:16 PM Clayton Wohl  wrote:

> +1
>
> At my employer, we maintain several Flink jobs in Scala. We've been
> writing newer jobs in Java, and we'd be fine with porting our Scala jobs
> over to the Java API.
>
> I'd like to request Java 17 support. Specifically, Java records is a
> feature our Flink code would use a lot of and make the Java syntax much
> nicer.
>


Re: Loading broadcast state on BroadcastProcessFunction instantiation or open method

2022-09-27 Thread David Anderson
Logically it would make sense to be able to initialize BroadcastState in
the open method of a BroadcastProcessFunction, but in practice I don't
believe it can be done -- because the necessary Context isn't made
available.

Perhaps you could use the State Processor API to bootstrap some state into
the broadcast state.

David

On Mon, Sep 26, 2022 at 6:07 PM alfredo.vasquez.spglobal.com via user <
user@flink.apache.org> wrote:

> Hello community.
>
>
>
> Currently we have a BroadcastProcessFunction implementation that is
> storing the broadcast state using a MapStateDescriptor.
>
> I have a use case that needs to load the BroadcastState to perform some
> operation before receiving any processElement or processBroadcastElement
> message.
>
>
>
> Is there a way to load the BroadcastState on BroadcastProcessFunction
>  instantiation, overriding open(Configuration parameters) method or by
> overriding some other callback function?
>
>
>
> Kind regards,
>
> --
>
>


Re: A question about restoring state with an additional variable with kryo

2022-09-18 Thread David Anderson
Vishal,

If you decide you can't live with dropping that state, [1] is a complete
example showing how to migrate from Kryo by using the state processor API.

David

[1]
https://www.docs.immerok.cloud/docs/cookbook/migrating-state-away-from-kryo/


On Fri, Sep 16, 2022 at 8:32 AM Vishal Santoshi 
wrote:

> Thank you for the clarification. I thought so too,
>
> Unfortunately my state is generics-based, and those types are definitely not
> treated as a POJO, though they have all the constructs (no-arg constructor,
> getters/setters etc.). I will likely take an at-least-once hit by
>
> changing the uid of that specific operator and restarting with allow
> non-restored state... This will ignore state that cannot be restored (for
> the previous uid), construct state for the new uid, and not affect other
> operators (including the Kafka consumer operators). I can live with it, I
> think.
>
> On Fri, Sep 16, 2022 at 2:55 AM Schwalbe Matthias <
> matthias.schwa...@viseca.ch> wrote:
>
>> Hi Vishal,
>>
>>
>>
>> Good news and bad news :
>>
>>
>>
>>- Bad: Kryo serializer cannot be used for schema evolution, see [1]
>>- Good: not all is lost here,
>>   - If you happen to have state that you cannot afford to lose, you
>>   can transcode it by means of the savepoint API [2],
>>   - However, this takes quite some effort
>>- In general, if you ever plan to migrate/extend your schemas, choose
>>a data type that supports schema migration [1],
>>- In your case, PoJo types would be the closest to your original
>>implementation
>>- You can disable Kryo in configuration to avoid this situation in
>>the future, by the way,
>>- Kryo serializer is quite slow compared to the other options and I
>>believe it is only there as a (emergency) fallback solution: [3]
>>
>>
>>
>> Feel free to ask for clarification 
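
Point 5 above, disabling the Kryo fallback, might look like this minimal sketch (assuming the DataStream API; there is also a configuration option for the same thing):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DisableKryo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Fail fast at job submission if any type would fall back to Kryo,
        // instead of silently serializing it with Kryo.
        env.getConfig().disableGenericTypes();
    }
}
```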
>>
>>
>>
>> Thias
>>
>>
>>
>>
>>
>>
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/schema_evolution/#kryo-cannot-be-used-for-schema-evolution
>>
>> [2]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/libs/state_processor_api/
>>
>> [3]
>> https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
>>
>>
>>
>>
>>
>>
>>
>> *From:* Vishal Santoshi 
>> *Sent:* Friday, September 16, 2022 1:17 AM
>> *To:* user 
>> *Subject:* Re: A question about restoring state with an additional
>> variable with kryo
>>
>>
>>
>>
>>
>>
>> The exception thrown is as follows. I realize that it is trying to read
>> the long value. How do I signal to kryo that it is OK and that the object
>> can have a default value
>>
>>
>>
>> Caused by: java.io.EOFException: No more bytes left.
>>
>> at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput
>> .require(NoFetchingInput.java:80)
>>
>> at com.esotericsoftware.kryo.io.Input.readVarLong(Input.java:690)
>>
>> at com.esotericsoftware.kryo.io.Input.readLong(Input.java:685)
>>
>> at com.esotericsoftware.kryo.serializers.
>> DefaultSerializers$LongSerializer.read(DefaultSerializers.java:133)
>>
>> at com.esotericsoftware.kryo.serializers.
>> DefaultSerializers$LongSerializer.read(DefaultSerializers.java:123)
>>
>> at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)
>>
>> at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField
>> .java:113)
>>
>> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(
>> FieldSerializer.java:528)
>>
>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>
>> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
>> .deserialize(KryoSerializer.java:354)
>>
>> at org.apache.flink.api.common.typeutils.CompositeSerializer
>> .deserialize(CompositeSerializer.java:156)
>>
>> at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(
>> RocksDBValueState.java:89)
>>
>>
>>
>> On Thu, Sep 15, 2022 at 7:10 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>> << How do I make sure that when reconstituting the state, kryo does not
>> complain? It tries to map the previous state to the new definition of Class
>> A and complains that it cannot read the value for `String b`.
>>
>>
>>
>> >> How do I make sure that when reconstituting the state, kryo does not
>> complain? It tries to map the previous state to the new definition of Class
>> A and complains that it cannot read the value for `long b`.
>>
>>
>>
>> Sorry a typo
>>
>>
>>
>>
>>
>> On Thu, Sep 15, 2022 at 7:04 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>> I have state in rocksDB that represents say
>>
>>
>>
>> class A {
>>
>>   String a
>>
>> }
>>
>>
>>
>> I now change my class and add another variable
>>
>>
>>
>>
>> Class A {
>>
>>   String a;
>>
>>   long b = 0;
>>
>> }
>>
>>
>>
>> How do I make sure that when reconstituting the state, kryo does not
>> complain? It tries to map 

Re: Mixed up session aggregations for same key

2022-09-07 Thread David Anderson
The way that Flink handles session windows is that every new event is
initially assigned to its own session window, and then overlapping sessions
are merged. I imagine this is why you are seeing so many calls
to createAccumulator.

This implementation choice is deeply embedded in the code; I don't think it
can be avoided.

If you can afford to wait until a session ends to emit the session start
event, then you will only be reporting once for each session. Another
solution might be to implement your own windowing using a process function
-- but if you are using event time logic, and if the events can be
processed out of order, I suspect it would be difficult to do much better.
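
A minimal sketch of the "wait until the session ends" variant, with an invented event shape and gap, emitting exactly one summary record per session:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class SessionSummaryJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Tuple2<sessionId, eventTimestampMillis>
        env.fromElements(
                        Tuple2.of("ABC", 0L), Tuple2.of("ABC", 1_000L), Tuple2.of("ABC", 5_000L))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner((e, ts) -> e.f1))
                .keyBy(e -> e.f0)
                .window(EventTimeSessionWindows.withGap(Time.seconds(4)))
                .aggregate(new Counter(), new Summary())
                .print();

        env.execute("session summary sketch");
    }

    // Counts events in the session; merging overlapping sessions adds the counts.
    public static class Counter implements AggregateFunction<Tuple2<String, Long>, Long, Long> {
        public Long createAccumulator() { return 0L; }
        public Long add(Tuple2<String, Long> value, Long acc) { return acc + 1; }
        public Long getResult(Long acc) { return acc; }
        public Long merge(Long a, Long b) { return a + b; }
    }

    // Emits one record per session (key, session start, session end), only
    // once the session has ended.
    public static class Summary
            extends ProcessWindowFunction<Long, Tuple3<String, Long, Long>, String, TimeWindow> {
        @Override
        public void process(String key, Context ctx, Iterable<Long> counts,
                            Collector<Tuple3<String, Long, Long>> out) {
            out.collect(Tuple3.of(key, ctx.window().getStart(), ctx.window().getEnd()));
        }
    }
}
```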

David

On Mon, Sep 5, 2022 at 9:45 PM Kristinn Danielsson via user <
user@flink.apache.org> wrote:

> Hi,
>
>
>
> I'm trying to migrate a KafkaStreams application to Flink (using
> DataStream API).
>
> The application consumes a high traffic (millions of events per second)
> Kafka
>
> topic and collects events into sessions keyed by id. To reduce the load on
>
> subsequent processing steps I want to output one event on session start
> and one
>
> event on session end. So, I set up a pipeline which keys the stream by id,
>
> aggregates the events over a event time session window with a gap of 4
> seconds.
>
> I also implemented a custom trigger to trigger when the first event
>
> arrives in a window.
>
>
>
> When I run this pipeline I somtimes observe that I get multiple calls to
> the
>
> aggregator's "createAccumulator" method for a given session id, and
> therefore I
>
> also get duplicate session start and session end events for the session id.
>
> So it looks to me that the Flink is collecting the events into multiple
> sessions
>
> even if they have the same session id.
>
>
>
> Examples:
>
>
>
> Input events:
>
> Event timestamp Id
>
> 2022-09-06 08:00:00 ABC
>
> 2022-09-06 08:00:01 ABC
>
> 2022-09-06 08:00:02 ABC
>
> 2022-09-06 08:00:03 ABC
>
> 2022-09-06 08:00:04 ABC
>
> 2022-09-06 08:00:05 ABC
>
>
>
> Problem 1:
>
> Output events:
>
> Event time  Id  Type
>
> 2022-09-06 08:00:00 ABC Start
>
> 2022-09-06 08:00:03 ABC End
>
> 2022-09-06 08:00:04 ABC Start
>
> 2022-09-06 08:00:05 ABC End
>
> Problem 2:
>
> Output events:
>
> Event time  Id  Type
>
> 2022-09-06 08:00:00 ABC Start
>
> 2022-09-06 08:00:03 ABC Start
>
> 2022-09-06 08:00:04 ABC End
>
> 2022-09-06 08:00:05 ABC End
>
>
>
> Expected output:
>
> Event time  Id  Type
>
> 2022-09-06 08:00:00 ABC Start
>
> 2022-09-06 08:00:05 ABC End
>
>
>
>
>
> Is this expected behaviour? How can I avoid getting duplicate session
> windows?
>
>
>
> Thanks for your help
>
> Kristinn
>


Re: StreamingFileSink question

2022-08-31 Thread David Anderson
If I remember correctly, there's a fix for this in Flink 1.14 (but the
feature is disabled by default in 1.14, and enabled by default in 1.15).
(I'm thinking
that execution.checkpointing.checkpoints-after-tasks-finish.enabled [1]
takes care of this.)

With Flink 1.13 I believe you'll have to handle this yourself somehow.

Regards,
David

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#execution-checkpointing-checkpoints-after-tasks-finish-enabled
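
For reference, opting in on Flink 1.14 might look like this sketch (assuming the ExecutionCheckpointingOptions constant; in 1.15 the feature is on by default):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnableFinalCheckpoints {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Flink 1.14: opt in to checkpoints after some tasks have finished.
        conf.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.enableCheckpointing(10_000);
    }
}
```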

On Wed, Aug 31, 2022 at 6:26 AM David Clutter 
wrote:

> I am using Flink 1.13.1 on AWS EMR 6.4.  I have an existing application
> using DataStream API that I would like to modify to write output to S3.  I
> am testing the StreamingFileSink with a bounded input.  I have enabled
> checkpointing.
>
> A couple questions:
> 1) When the program finishes, all the files remain .inprogress.  Is that
> "Important Note 2" in the documentation
> ?
> Is there a solution to this other than renaming the files myself?  Renaming
> the files in S3 could be costly I think.
>
> 2) If I use a deprecated method such as DataStream.writeAsText() is that
> guaranteed to write *all* the records from the stream, as long as the job
> does not fail?  I understand checkpointing will not be effective here.
>
> Thanks,
> David
>


Re: Why this example does not save anything to file?

2022-08-31 Thread David Anderson
 That's Flink fault-tolerance mechanism, see
> https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/fault-tolerance/checkpointing/
>
>
> On Mon, Aug 1, 2022 at 4:37 PM  wrote:
>
> What's that?
>
>
>
> Sent: Monday, August 01, 2022 at 2:49 PM
> From: "Martijn Visser"  martijnvis...@apache.org][mailto:martijnvis...@apache.org[mailto:
> martijnvis...@apache.org]]>
> To: pod...@gmx.com[mailto:pod...@gmx.com][mailto:pod...@gmx.com[mailto:
> pod...@gmx.com]]
> Cc: user@flink.apache.org[mailto:user@flink.apache.org][mailto:
> user@flink.apache.org[mailto:user@flink.apache.org]]
> Subject: Re: Why this example does not save anything to file?
>
> Do you have checkpointing enabled?
>
>
> On Sat, Jul 30, 2022 at 5:31 PM  wrote:
>
> Thanks David but there's no problem with that (probably ";" is default
> separator).
> I can read the file and insert into "Table1" (I said that in my mail).
> Problem is to save to CSV.
>
>
>
> Sent: Saturday, July 30, 2022 at 3:33 PM
> From: "David Anderson" mailto:dander...@apache.org
> ][mailto:dander...@apache.org[mailto:dander...@apache.org]]>
> To: pod...@gmx.com[mailto:pod...@gmx.com][mailto:pod...@gmx.com[mailto:
> pod...@gmx.com]]
> Cc: "user" mailto:user@flink.apache.org][mailto:
> user@flink.apache.org[mailto:user@flink.apache.org]]>
> Subject: Re: Why this example does not save anything to file?
>
> You need to add
>
> 'csv.field-delimiter'=';'
>
> to the definition of Table1 so that the input from test4.txt can be
> correctly parsed:
>  tEnv.executeSql("CREATE TABLE Table1 (column_name1 STRING,
> column_name2 DOUBLE) WITH ('connector.type' = 'filesystem',
> 'connector.path' = 'file:///C:/temp/test4.txt', 'format.type' =
> 'csv', 'csv.field-delimiter'=';')");
>
> Cheers,
> David
>
> On Fri, Jul 29, 2022 at 4:15 PM  wrote:
>
> Hi,
>
> you mean adding:
>
> " 'csv.field-delimiter'=';', "
>
> like:
>
> tEnv.executeSql("CREATE TABLE fs_table ("
> + "column_nameA STRING, "
> + "column_nameB DOUBLE "
> + ") WITH ( "
> + "'connector'='filesystem', "
> + "'path'='file:///C:/temp/test5.txt', "
> + "'format'='csv', "
> + " 'csv.field-delimiter'=';', "
> + " 'sink.partition-commit.delay'='1 s', "
> + " 'sink.partition-commit.policy.kind'='success-file'"
> + ")");
>
> tEnv.executeSql("INSERT INTO fs_table SELECT column_name1,
> column_name2 from Table1");
>
> I did. Nothing new - still does not work.
>
>
>
>
> Sent: Tuesday, July 26, 2022 at 4:00 PM
> From: "Gil De Grove"  gil.degr...@euranova.eu][mailto:gil.degr...@euranova.eu[mailto:
> gil.degr...@euranova.eu]]>
> To: "Weihua Hu" mailto:huweihua@gmail.com
> ][mailto:huweihua@gmail.com[mailto:huweihua@gmail.com]]>
> Cc: pod...@gmx.com[mailto:pod...@gmx.com][mailto:pod...@gmx.com[mailto:
> pod...@gmx.com]], "user"  user@flink.apache.org][mailto:user@flink.apache.org[mailto:
> user@flink.apache.org]]>
> Subject: Re: Why this example does not save anything to file?
>
> Hello,
>
> I may be really wrong with this, but from what I get in the source file,
> you are using a semicolon to separate the values.
> This probably means that you should set the csv.field-delimiter to `;` to
> make your example work properly.
>
> Have you tried with that configuration in your create table csv c

Re: How Flink knows that CREATE TABLE is sometimes about creating table, sometimes about creating file?

2022-08-29 Thread David Anderson
The role of CREATE TABLE is to provide the necessary metadata for the table
-- the location of the data, its format, etc. Executing CREATE TABLE
creates an entry in the catalog, but otherwise doesn't do anything.

In a case like this one

CREATE TABLE Table1 (column_name1 STRING, column_name2 DOUBLE) WITH
('connector' = 'filesystem', 'path' = 'file:///C:/temp/test4.txt', 'format'
= 'csv', 'csv.field-delimiter' = ';');

Flink doesn't need to know in advance if you will be reading from or
writing to Table1 (or both). The CREATE TABLE DDL statement includes enough
information to handle either case.

David

On Mon, Aug 29, 2022 at 8:15 AM  wrote:

>
> To create table from file:
>
> "CREATE TABLE Table1 (column_name1 STRING, column_name2 DOUBLE) WITH
> ('connector' = 'filesystem', 'path' = 'file:///C:/temp/test4.txt', 'format'
> = 'csv', 'csv.field-delimiter' = ';')");
>
> To create file:
>
> "CREATE TABLE Table1 (column_name1 STRING, column_name2 DOUBLE) WITH (
> 'connector'='filesystem', 'path'='file:///C:/temp/test',  'format' = 'csv',
> 'csv.field-delimiter'= ';')");
>
> Till I add 'sink.partition-commit.delay' or something - syntax is same.
>
>


Re: Overwriting watermarks in DataStream

2022-08-21 Thread David Anderson
If you have two watermark strategies in your job, the
downstream TimestampsAndWatermarksOperator will absorb incoming watermarks
and not forward them downstream, but it will have no effect upstream.

The only exception to this is that watermarks equal to Long.MAX_VALUE are
forwarded downstream, since they are used to signal the end of input.

David

[1]
https://github.com/apache/flink/blob/release-1.15/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java#L120
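
As an illustration of assigning a second watermark strategy downstream (types and bounds invented for the sketch):

```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;

public class ReassignWatermarks {

    // Placeholder event type for the sketch.
    public static class MyEvent {
        public long timestampMillis;
    }

    // Everything emitted by the sorting step gets fresh timestamps/watermarks
    // from here on; watermarks arriving from upstream are absorbed by this
    // operator and not forwarded (except Long.MAX_VALUE).
    public static DataStream<MyEvent> rewatermark(DataStream<MyEvent> sorted) {
        return sorted.assignTimestampsAndWatermarks(
                WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((e, ts) -> e.timestampMillis));
    }
}
```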

On Thu, Aug 18, 2022 at 8:45 AM Peter Schrott  wrote:

> Hi there,
>
> While still struggling with events and watermarks out of order after
> sorting with a buffer process function (compare [1]) I tired to solve the
> issue by assigning a new watermark after the mentioned sorting function.
>
> The Flink docs [2] are not very certain about the impact of assigning
> additional watermarks downstream: "If the original stream had timestamps
> and/or watermarks already, the timestamp assigner overwrites them.”
>
> Does it overwrite the watermark from the point in the stream where its
> assigned or entirely also upstream?
>
> Thanks in advance
> Peter
>
> [1] https://lists.apache.org/thread/wwvpg2qk5v3lb5pxhn4hhkt0xkygg9f3
> [2]
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/event-time/generating_watermarks/#using-watermark-strategies
>
>


Re: flink sink kafka exactly once plz help me

2022-08-17 Thread David Anderson
You can keep the same transaction ID if you are restarting the job as a
continuation of what was running before. You need distinct IDs for
different jobs that will be running against the same kafka brokers. I think
of the transaction ID as an application identifier.

See [1] for a complete list of what needs to be done to achieve
exactly-once with Kafka.

[1]
https://www.docs.immerok.cloud/docs/cookbook/exactly-once-with-apache-kafka-and-apache-flink/
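
A hedged sketch of what is described above, with a stable application-level prefix instead of System.currentTimeMillis(); broker, topic, and prefix names are invented, and the transaction timeout shown is just a typical companion setting:

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

public class ExactlyOnceSinkSketch {
    public static KafkaSink<String> build() {
        Properties producerConfig = new Properties();
        // Should comfortably exceed the expected checkpoint duration and must
        // not exceed the broker's transaction.max.timeout.ms.
        producerConfig.setProperty("transaction.timeout.ms", "900000");

        return KafkaSink.<String>builder()
                .setBootstrapServers("broker:9092")
                .setKafkaProducerConfig(producerConfig)
                // Stable per-application prefix: keep it the same when restarting
                // this job, use a different one for other jobs on the same brokers.
                .setTransactionalIdPrefix("my-cleanup-job")
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("output-topic")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .build();
    }
}
```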

On Wed, Aug 17, 2022 at 1:02 AM kcz <573693...@qq.com> wrote:

> flink-1.14.4
> kafka-2.4.0
> I have a small question about setTransactionalIdPrefix: when I start the
> next job, it cannot use the last transaction ID and needs to automatically
> generate a new one. I just tested restarting from a checkpoint, and a new
> transaction ID was also generated; this will not lead to data loss, right? I
> would like to ask for guidance, since I don't know whether I can guarantee
> no data loss by producing a new transaction ID through
> System.currentTimeMillis() in production. Please help me, I need to use
> this function in a production environment, but I don't know how to write it
> correctly. The job pushes cleaned-up data from Kafka consumption back to Kafka.
>
> KafkaSink<String> sink = KafkaSink.<String>builder()
> .setBootstrapServers(BOOTSTRAP_SERVERS)
> .setKafkaProducerConfig(kafkaProducerConfig)
> 
> .setTransactionalIdPrefix(JOB_NAME+System.currentTimeMillis()+"transactional.id")
> .setRecordSerializer(KafkaRecordSerializationSchema.builder()
> .setTopic(SINK_TOPICS)
> .setValueSerializationSchema(new SimpleStringSchema())
> .build())
> .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
> .build();
>
>


Re: Problem with KafkaSource and watermark idleness

2022-08-15 Thread David Anderson
Yan, I've created https://issues.apache.org/jira/browse/FLINK-28975 to
track this.

Regards,
David

On Sun, Aug 14, 2022 at 6:38 PM Yan Shen  wrote:

> Thanks David,
>
> I am working on a flink datastream job that does a temporal join of two
> kafka topics based on watermarks. The problem was quite obvious when I
> enabled idleness and data flowed through much faster with different results
> even though the topics were not idle.
>
> Regards.
>
> On Mon, Aug 15, 2022 at 12:12 AM David Anderson 
> wrote:
>
>> Although I'm not very familiar with the design of the code involved, I
>> also looked at the code, and I'm inclined to agree with you that this is a
>> bug. Please do raise an issue.
>>
>> I'm wondering how you noticed this. I was thinking about how to write a
>> failing test, and I'm wondering if this has some impact that is easily
>> observed. (My first thought was "How can something this basic be broken?"
>> but then I realized that the impact is fairly subtle.)
>>
>> David
>>
>> On Sat, Aug 13, 2022 at 11:46 PM Yan Shen  wrote:
>>
>>> Hi all,
>>>
>>> After examining the source code further, I am quite sure 
>>> org.apache.flink.api.common.eventtime.WatermarksWithIdleness
>>> does not work with FLIP-27 sources.
>>>
>>> In org.apache.flink.streaming.api.operators.SourceOperator, there are
>>> separate instances of WatermarksWithIdleness created for each split
>>> output and the main output. There is multiplexing of watermarks between
>>> split outputs but no multiplexing between split output and main output.
>>>
>>> For a source such as org.apache.flink.connector.kafka.source.KafkaSource,
>>> there is only output from splits and no output from main. Hence the main
>>> output will (after an initial timeout) be marked as idle.
>>>
>>> The implementation of WatermarksWithIdleness is such that once an
>>> output is idle, it will periodically re-mark the output as idle. Since
>>> there is no multiplexing between split outputs and main output, the idle
>>> marks coming from main output will repeatedly set the output to idle even
>>> though there are events from the splits. Result is that the entire source
>>> is repeatedly marked as idle.
>>>
>>> I currently worked around this by implementing my own WatermarksWithIdleness
>>> which will only mark the output as idle once (until it becomes active then
>>> idle again) instead of repeatedly.
>>>
>>> I will try to raise an issue on this unless somebody can point out where
>>> I went wrong with this.
>>>
>>> Thanks.
>>>
>>> On Wed, Aug 10, 2022 at 1:26 PM Yan Shen  wrote:
>>>
>>>> Hi,
>>>>
>>>> I am using a org.apache.flink.connector.kafka.source.KafkaSource with a
>>>> watermark strategy like this:
>>>>
>>>>
>>>> WatermarkStrategy.forMonotonousTimestamps().withIdleness(Duration.ofSeconds(10))
>>>>
>>>> I noticed that after a short while all the partitions seem to be marked
>>>> as idle even though there are messages coming in.
>>>>
>>>> I made a copy of the class WatermarksWithIdleness and added some
>>>> logging to trace what is happening.
>>>>
>>>> It seems there are 2 copies of this WatermarkGenerator created per
>>>> partition. One during SourceOperator.initializeMainOutput and another
>>>> during SourceOperator.createOutputForSplits.
>>>>
>>>> When there are new messages, only the one created during
>>>> SourceOperator.createOutputForSplits has activity.
>>>>
>>>> It seems the one created during SourceOperator.initializeMainOutput
>>>> will eventually output an idle mark as it has no activity and this causes
>>>> the entire partition to be marked as idle.
>>>>
>>>> Is my understanding correct? If so, is this a feature or bug?
>>>>
>>>> Thanks.
>>>>
>>>


Re: Problem with KafkaSource and watermark idleness

2022-08-14 Thread David Anderson
Although I'm not very familiar with the design of the code involved, I also
looked at the code, and I'm inclined to agree with you that this is a bug.
Please do raise an issue.

I'm wondering how you noticed this. I was thinking about how to write a
failing test, and I'm wondering if this has some impact that is easily
observed. (My first thought was "How can something this basic be broken?"
but then I realized that the impact is fairly subtle.)

David

On Sat, Aug 13, 2022 at 11:46 PM Yan Shen  wrote:

> Hi all,
>
> After examining the source code further, I am quite sure 
> org.apache.flink.api.common.eventtime.WatermarksWithIdleness
> does not work with FLIP-27 sources.
>
> In org.apache.flink.streaming.api.operators.SourceOperator, there are
> separate instances of WatermarksWithIdleness created for each split
> output and the main output. There is multiplexing of watermarks between
> split outputs but no multiplexing between split output and main output.
>
> For a source such as org.apache.flink.connector.kafka.source.KafkaSource, 
> there
> is only output from splits and no output from main. Hence the main output
> will (after an initial timeout) be marked as idle.
>
> The implementation of WatermarksWithIdleness is such that once an output
> is idle, it will periodically re-mark the output as idle. Since there is no
> multiplexing between split outputs and main output, the idle marks coming
> from main output will repeatedly set the output to idle even though there
> are events from the splits. Result is that the entire source is repeatedly
> marked as idle.
>
> I currently worked around this by implementing my own WatermarksWithIdleness
> which will only mark the output as idle once (until it becomes active then
> idle again) instead of repeatedly.
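
A rough sketch of that kind of workaround (not the actual code from this thread): a wrapper generator that emits the idle marker only once per idle period.

```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

// Marks the output idle only once after the idle timeout elapses, instead of
// repeatedly, so a permanently-empty output cannot keep re-idling the source.
public class MarkIdleOnce<T> implements WatermarkGenerator<T> {

    private final WatermarkGenerator<T> wrapped;
    private final long maxIdleMillis;

    private long lastActivity = System.currentTimeMillis();
    private boolean idleMarkSent = false;

    public MarkIdleOnce(WatermarkGenerator<T> wrapped, Duration maxIdle) {
        this.wrapped = wrapped;
        this.maxIdleMillis = maxIdle.toMillis();
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        lastActivity = System.currentTimeMillis();
        idleMarkSent = false; // active again
        wrapped.onEvent(event, eventTimestamp, output);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (System.currentTimeMillis() - lastActivity > maxIdleMillis) {
            if (!idleMarkSent) {
                output.markIdle();
                idleMarkSent = true;
            }
        } else {
            wrapped.onPeriodicEmit(output);
        }
    }
}
```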
>
> I will try to raise an issue on this unless somebody can point out where I
> went wrong with this.
>
> Thanks.
>
> On Wed, Aug 10, 2022 at 1:26 PM Yan Shen  wrote:
>
>> Hi,
>>
>> I am using a org.apache.flink.connector.kafka.source.KafkaSource with a
>> watermark strategy like this:
>>
>>
>> WatermarkStrategy.forMonotonousTimestamps().withIdleness(Duration.ofSeconds(10))
>>
>> I noticed that after a short while all the partitions seem to be marked
>> as idle even though there are messages coming in.
>>
>> I made a copy of the class WatermarksWithIdleness and added some logging
>> to trace what is happening.
>>
>> It seems there are 2 copies of this WatermarkGenerator created per
>> partition. One during SourceOperator.initializeMainOutput and another
>> during SourceOperator.createOutputForSplits.
>>
>> When there are new messages, only the one created during
>> SourceOperator.createOutputForSplits has activity.
>>
>> It seems the one created during SourceOperator.initializeMainOutput will
>> eventually output an idle mark as it has no activity and this causes the
>> entire partition to be marked as idle.
>>
>> Is my understanding correct? If so, is this a feature or bug?
>>
>> Thanks.
>>
>


Re: RichFunctions, streaming, and configuration (it's always empty)

2022-08-07 Thread David Anderson
The configuration parameter passed to the open method is a legacy holdover
that has been retained to avoid breaking a public API, but is no longer
used.

Your options are to either get the global job parameters from the execution
context as described in [1], or to pass the configuration to a constructor
for your RichFunction (as described in [2]). Defining a constructor that
takes the configuration as a parameter is usually the preferred approach.
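
A minimal sketch of the constructor approach, with the global-job-parameters alternative noted in a comment; the endpoint field and client placeholder are invented for the example:

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Pass what the function needs through its constructor; plain fields must be
// serializable, anything heavyweight is built lazily in open().
public class ClientMapper extends RichMapFunction<String, String> {

    private final String endpoint;    // plain config value, serialized with the function
    private transient Object client;  // built in open(), not serialized

    public ClientMapper(String endpoint) {
        this.endpoint = endpoint;
    }

    @Override
    public void open(Configuration ignored) {
        // Alternative [1]: global job parameters registered with
        // env.getConfig().setGlobalJobParameters(...) are reachable here via
        // getRuntimeContext().getExecutionConfig().getGlobalJobParameters().
        this.client = "network client for " + endpoint; // stand-in for a real client
    }

    @Override
    public String map(String value) {
        return value + " via " + endpoint;
    }
}
```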

Cheers,
David

[1] https://stackoverflow.com/a/70273620/2000823
[2] https://stackoverflow.com/a/66909203/2000823

On Sat, Aug 6, 2022 at 12:52 PM Ben Edwards  wrote:

> A comment
> 
> on this issue seems to suggest that the configuration parameter is legacy
> and I should getting the configuration off the runtime context, but there
> is no way to inspect the configuration from the runtime context, unless we
> are talking about the global job parameters on the execution context. Is
> that the "right" way to pass configuration?
>
> On Sat, Aug 6, 2022 at 6:04 PM Ben Edwards  wrote:
>
>> I am new to flink and trying to write some unit tests for a RichFunction.
>> This function wants to find configuration passed in via the open method in
>> order to set up a network client. I am using a stream harness for my test,
>> customised with my own MockEnvironment + Configuration. To my surprise, the
>> configuration is always empty. So I did some reading and debugging and came
>> across this:
>>
>>
>> https://github.com/apache/flink/blob/62786320eb555e36fe9fb82168fe97855dc54056/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java#L100
>>
>> Given that open is defined with no ability to pass in configuration in
>> the super class, it seems like there is no code path that ever injects
>> anything other than the empty configuration, which seems to render the
>> configuration completely useless. I am sure I have missed some other
>> important part of the api and would love some insight as to how to get my
>> configuration pushed down into my function. I note that the DataSet api has
>> withParameters, but afaict there is no such api for DataStream.
>>
>> I haven't yet gone to production, but I am now worried that my entire
>> plan for configuration passing is completely suspect, and would love to
>> hear otherwise.
>>
>> Ben
>>
>


Re: Why this example does not save anything to file?

2022-07-30 Thread David Anderson
You need to add

'csv.field-delimiter'=';'

to the definition of Table1 so that the input from test4.txt can be
correctly parsed:

tEnv.executeSql("CREATE TABLE Table1 (column_name1 STRING,
column_name2 DOUBLE) WITH ('connector.type' = 'filesystem',
'connector.path' = 'file:///C:/temp/test4.txt', 'format.type' = 'csv',
'csv.field-delimiter'=';')");

Cheers,
David

On Fri, Jul 29, 2022 at 4:15 PM  wrote:

> Hi,
>
> you mean adding:
>
> " 'csv.field-delimiter'=';', "
>
> like:
>
> tEnv.executeSql("CREATE TABLE fs_table ("
> + "column_nameA STRING, "
> + "column_nameB DOUBLE "
> + ") WITH ( "
> + "'connector'='filesystem', "
> + "'path'='file:///C:/temp/test5.txt', "
> + "'format'='csv', "
> + " 'csv.field-delimiter'=';', "
> + " 'sink.partition-commit.delay'='1 s', "
> + " 'sink.partition-commit.policy.kind'='success-file'"
> + ")");
>
> tEnv.executeSql("INSERT INTO fs_table SELECT column_name1,
> column_name2 from Table1");
>
> I did. Nothing new - still does not work.
>
>
>
> *Sent:* Tuesday, July 26, 2022 at 4:00 PM
> *From:* "Gil De Grove" 
> *To:* "Weihua Hu" 
> *Cc:* pod...@gmx.com, "user" 
> *Subject:* Re: Why this example does not save anything to file?
> Hello,
>
> I may be really wrong with this, but from what I get in the source file,
> you are using a semicolon to separate the values.
> This probably means that you should set the csv.field-delimiter to `;` to
> make your example work properly.
>
> Have you tried with that configuration in your create table csv connector
> option?
>
> Regards,
> Gil
>
> On Tue, 26 Jul 2022 at 15:40, Weihua Hu  wrote:
>
>> Hi,
>>
>> Can you see any exception logs?
>> Where is this code running? is it a standalone cluster with one
>> TaskManager?
>>
>>
>> Best,
>> Weihua
>>
>> On Tue, Jul 26, 2022 at 4:18 AM  wrote:
>>
>>> If I get it correctly this is the way how I can save to CSV:
>>>
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/#full-example
>>>
>>> So my code is (read from file, save to file):
>>>
>>>
>>> *package flinkCSV;*
>>>
>>> *import org.apache.flink.table.api.EnvironmentSettings; import
>>> org.apache.flink.table.api.TableEnvironment;*
>>> *public class flinkCSV {*
>>> *public static void main(String[] args) throws Exception {*
>>>
>>>
>>>
>>>
>>>
>>>
>>> * //register and create table
>>>  EnvironmentSettings settings = EnvironmentSettings
>>> .newInstance() //.inStreamingMode()
>>> .inBatchMode() .build();*
>>> *final TableEnvironment tEnv =
>>> TableEnvironment.create(settings);*
>>>
>>>
>>>
>>>
>>>
>>>
>>> * tEnv.executeSql("CREATE TABLE Table1 (column_name1
>>> STRING, column_name2 DOUBLE) WITH ('connector.type' = 'filesystem',
>>> 'connector.path' = 'file:///C:/temp/test4.txt', 'format.type' = 'csv')");
>>>  tEnv.sqlQuery("SELECT COUNT(*) AS Table1_result FROM
>>> Table1") .execute() .print(); *
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> *tEnv.executeSql("CREATE TABLE fs_table (" + "
>>>  column_nameA STRING, " + "column_nameB DOUBLE "
>>>  + ") WITH ( \n" + "
>>>  'connector'='filesystem', " + "
>>>  'path'='file:///C:/temp/test5.txt', " + "
>>>  'format'='csv', " + "  'sink.partition-commit.delay'='1
>>> s', " + "
>>> 'sink.partition-commit.policy.kind'='success-file'" + "
>>>  )");  tEnv.executeSql("INSERT INTO fs_table SELECT
>>> column_name1, column_name2 from Table1");
>>> tEnv.sqlQuery("SELECT COUNT(*) AS fs_table_result FROM fs_table")
>>> .execute() .print();   } }*
>>>
>>> Source file (test4.txt) is:
>>>
>>> aa; 23
>>> bb; 657.9
>>> cc; 55
>>>
>>> test5.txt is not created, select from fs_table gives null
>>>
>>>
>>


Re: Did the semantics of Kafka's earliest offset change with the new source API?

2022-07-15 Thread David Anderson
What did change was the default starting position when not starting from a
checkpoint. With FlinkKafkaConsumer, it starts from the committed offsets
by default. With KafkaSource, it starts from the earliest offset.

David

On Fri, Jul 15, 2022 at 5:57 AM Chesnay Schepler  wrote:

> I'm not sure about the previous behavior, but at the very least according
> to the documentation the behavior is identical.
>
> 1.12:
> https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
>
> setStartFromEarliest() / setStartFromLatest(): Start from the earliest /
> latest record. Under these modes, committed offsets in Kafka will be
> ignored and not used as starting positions.
>
> On 13/07/2022 18:53, Alexis Sarda-Espinosa wrote:
>
> Hello,
>
> I have a job running with Flink 1.15.0 that consumes from Kafka with the
> new KafkaSource API, setting a group ID explicitly and specifying
> OffsetsInitializer.earliest() as a starting offset. Today I restarted the
> job ignoring both savepoint and checkpoint, and the consumer started
> reading from the first available message in the broker (from 24 hours ago),
> i.e. it completely ignored the offsets that were committed to Kafka. If I
> use OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)
> instead, the problem seems to go away.
>
> With the previous FlinkKafkaConsumer, using earliest didn't cause any such
> issues. Was this changed in the aforementioned way on purpose?
>
> Regards,
> Alexis.
>
>
>


Re: Data is lost in the ListState

2022-07-11 Thread David Anderson
This is, in fact, the expected behavior. Let me explain why:

In order for Flink to provide exactly-once guarantees, the input sources
must be able to rewind and then replay any events since the last checkpoint.

In the scenario you shared, the last checkpoint was checkpoint 2, which
occurred before x2 was processed. The x3 input caused a failure, and the
state from checkpoint 2 was restored. This state contained only [a, b, c].

Since sockets don't rewind and replay their input, the x2 has been lost --
it wasn't checkpointed, nor did you repeat it after the failure.

If the source had been something that supported rewind and replay, like
kafka, the events since the offsets stored in the checkpoint would have
been automatically re-processed and nothing would have been lost.
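
As a rough sketch of that difference (the checkpoint interval is arbitrary, and
kafkaSource stands for any replayable source, e.g. a KafkaSource built as in the
Kafka thread elsewhere in this archive):

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10_000L); // checkpoint every 10 seconds

// The source's offsets are stored in each checkpoint, so after a failure the
// events since the last checkpoint are re-read instead of being lost.
DataStream<String> events =
        env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source");
```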

Hope that was helpful,
David


[ANNOUNCE] Apache Flink 1.15.1 released

2022-07-07 Thread David Anderson
The Apache Flink community is very happy to announce the release of Apache
Flink 1.15.1, which is the first bugfix release for the Apache Flink 1.15
series.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements
for this bugfix release:

https://flink.apache.org/news/2022/07/06/release-1.15.1.html

The full release notes are available in Jira:


https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351546

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
David Anderson


Re: Fink 15: InvalidProgramException: Table program cannot be compiled. This is a bug

2022-06-09 Thread David Anderson
Sorry; I guess I jumped to conclusions and got it wrong. Let's dig in
deeper.

This sounds similar to what was reported in this thread [1] where someone
else ran into problems with Janino after upgrading from 1.14 to 1.15. Could
this be another instance of the same issue (i.e., Flink ends up using the
wrong version of Janino)?

[1] https://lists.apache.org/thread/9tw165cgpdqz4ron76b1ckmwm9hy4qfd

On Thu, Jun 9, 2022 at 8:18 PM Benenson, Michael <
mikhail_benen...@intuit.com> wrote:

> Hi, David
>
>
>
> I have tried CREATE TABLE … without proc_time, but it does not help, I see
> the same exception.
>
>
> And this is a new issue for Flink 1.15.0, for Flink 1.14.3 it works fine,
> even with both event_time & proc_time in CREATE TABLE statement.
>
> Now I use
>
> CREATE OR REPLACE TABLE input (
> event_header ROW(topic_name STRING),
> `timestamp` STRING NOT NULL,
> event_time AS TO_TIMESTAMP(fix_instant(`timestamp`),
> 'yyyy-MM-dd''T''HH:mm:ss.SSS''Z'''),
> properties ROW(company_id STRING NOT NULL, scope_area STRING, action STRING)
> NOT NULL,
> WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
> ) WITH (
>'connector' = 'kafka',
>'topic' = 'mb-1644796800-qbo',
>'properties.bootstrap.servers' = 'localhost:9092',
>'format' = 'json',
>'scan.startup.mode' = 'latest-offset',
>'json.ignore-parse-errors' = 'true',
>'json.fail-on-missing-field' = 'false'
> );
>
> And got the same exception
>
>
> java.lang.RuntimeException: Could not instantiate generated class
> 'WatermarkGenerator$0'
>
> at
> org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:74)
> ~[flink-table-runtime-1.15.0.jar:1.15.0]
> …
> Caused by: org.apache.flink.util.FlinkRuntimeException:
> org.apache.flink.api.common.InvalidProgramException: Table program cannot
> be compiled. This is a bug. Please file an issue.
>
> at
> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:94)
> ~[flink-table-runtime-1.15.0.jar:1.15.0]
>
> …
>
> Caused by: org.codehaus.commons.compiler.CompileException: Line 30, Column
> 75: Cannot determine simple type name "org"
>
> at
> org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12211)
> ~[flink-table-runtime-1.15.0.jar:1.15.0]
>
>
>
> *From: *David Anderson 
> *Date: *Thursday, June 9, 2022 at 5:34 AM
> *To: *Benenson, Michael 
> *Cc: *user@flink.apache.org , Deshpande, Omkar <
> omkar_deshpa...@intuit.com>, Waghulde, Suraj 
> *Subject: *Re: Fink 15: InvalidProgramException: Table program cannot be
> compiled. This is a bug
>
> This email is from an external sender.
>
>
>
> A Table can have at most one time attribute. In your Table the proc_time
> column is a processing time attribute, and when you define a watermark on
> the event_time column then that column becomes an event-time attribute.
>
>
>
> If you want to combine event time and processing time, you can use
> the PROCTIME() function in your queries without having a processing time
> attribute as one of the columns in the table.
>
>
>
> Best,
>
> David
>
>
>
> On Wed, Jun 8, 2022 at 9:46 PM Benenson, Michael <
> mikhail_benen...@intuit.com> wrote:
>
> Hi, folks
>
>
>
> *Short description*:
>
>
>
> I use Flink 1.15.0 sql-client and a Java User Defined Function in CREATE
> TABLE … statement to get Timestamp.
>
> It works OK, if I do not use Timestamp in Watermark, but if used in
> Watermark it causes
>
> java.lang.RuntimeException: Could not instantiate generated class
> 'WatermarkGenerator$0'
>
> …
>
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table
> program cannot be compiled. This is a bug. Please file an issue.
>
> at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(
> CompileUtils.java:107)
>
> …
>
> Caused by: org.codehaus.commons.compiler.CompileException: Line 30, Column
> 75: Cannot determine simple type name "org"
>
> at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:
> 12211)
>
>
>
>
>
> *Details*:
>
> java -version
>
> openjdk version "11.0.14.1" 2022-02-08 LTS
>
> OpenJDK Runtime Environment Corretto-11.0.14.10.1 (build 11.0.14.1+10-LTS)
>
> OpenJDK 64-Bit Server VM Corretto-11.0.14.10.1 (build 11.0.14.1+10-LTS,
> mixed mode)
>
>
>
> Flink 1.15.0, flink-1.15.0/bin/sql-client.sh
>
>
>
> SET 'sql-client.execution.result-mode' = 'tableau';
>
> SET 'table.exec.sink.not-null-enforcer' = 'drop';
>
>
>
> CREATE TEMPORARY FUNCTION default_catalog.default_database.fix_ins

Re: Fink 15: InvalidProgramException: Table program cannot be compiled. This is a bug

2022-06-09 Thread David Anderson
A Table can have at most one time attribute. In your Table the proc_time
column is a processing time attribute, and when you define a watermark on
the event_time column then that column becomes an event-time attribute.

If you want to combine event time and processing time, you can use
the PROCTIME() function in your queries without having a processing time
attribute as one of the columns in the table.
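
For illustration, a sketch of that approach against the table from the message
below (column names are taken from it; this assumes the proc_time column is
dropped from the CREATE TABLE, and tEnv is the TableEnvironment):

```java
// Compute processing time on the fly in the query instead of declaring a
// proc_time column in the table definition. The same SELECT also works in
// the SQL client.
tEnv.sqlQuery(
        "SELECT `timestamp`, event_time, PROCTIME() AS proc_time, "
      + "       properties.company_id AS company "
      + "FROM input LIMIT 10")
    .execute()
    .print();
```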

Best,
David

On Wed, Jun 8, 2022 at 9:46 PM Benenson, Michael <
mikhail_benen...@intuit.com> wrote:

> Hi, folks
>
>
>
> *Short description*:
>
>
>
> I use Flink 1.15.0 sql-client and a Java User Defined Function in CREATE
> TABLE … statement to get Timestamp.
>
> It works OK, if I do not use Timestamp in Watermark, but if used in
> Watermark it causes
>
> java.lang.RuntimeException: Could not instantiate generated class
> 'WatermarkGenerator$0'
>
> …
>
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table
> program cannot be compiled. This is a bug. Please file an issue.
>
> at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(
> CompileUtils.java:107)
>
> …
>
> Caused by: org.codehaus.commons.compiler.CompileException: Line 30, Column
> 75: Cannot determine simple type name "org"
>
> at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:
> 12211)
>
>
>
>
>
> *Details*:
>
> java -version
>
> openjdk version "11.0.14.1" 2022-02-08 LTS
>
> OpenJDK Runtime Environment Corretto-11.0.14.10.1 (build 11.0.14.1+10-LTS)
>
> OpenJDK 64-Bit Server VM Corretto-11.0.14.10.1 (build 11.0.14.1+10-LTS,
> mixed mode)
>
>
>
> Flink 1.15.0, flink-1.15.0/bin/sql-client.sh
>
>
>
> SET 'sql-client.execution.result-mode' = 'tableau';
>
> SET 'table.exec.sink.not-null-enforcer' = 'drop';
>
>
>
> CREATE TEMPORARY FUNCTION default_catalog.default_database.fix_instant
>
> AS 'com.intuit.data.strmprocess.udf.FixInstant' LANGUAGE JAVA;
>
>
>
> CREATE OR REPLACE TABLE input (
>
>   event_header ROW(topic_name STRING),
>
>   `timestamp` STRING NOT NULL,
>
>   event_time AS TO_TIMESTAMP(fix_instant(`timestamp`),
> 'yyyy-MM-dd''T''HH:mm:ss.SSS''Z'''),
>
>   properties ROW(company_id STRING NOT NULL, scope_area STRING,
> action STRING) NOT NULL,
>
>   WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND,
>
>   proc_time AS PROCTIME()
>
> ) WITH (
>
> 'connector' = 'kafka',
>
> 'topic' = 'mb-1644796800-qbo',
>
> 'properties.bootstrap.servers' = 'localhost:9092',
>
> 'format' = 'json',
>
> 'scan.startup.mode' = 'latest-offset',
>
> 'json.ignore-parse-errors' = 'true',
>
> 'json.fail-on-missing-field' = 'false'
>
> );
>
>
>
> SELECT `timestamp`, event_time, event_header.topic_name AS topic,
> properties.company_id as company FROM input
>
>LIMIT 10
>
> ;
>
>
>
> Works fine, if I comment WATERMARK FOR event_time …
> Causes an error, if WATERMARK FOR event_time is used:
>
> 2022-06-08 12:16:32
>
> java.lang.RuntimeException: Could not instantiate generated class
> 'WatermarkGenerator$0'
>
> at org.apache.flink.table.runtime.generated.GeneratedClass
> .newInstance(GeneratedClass.java:74)
>
> at org.apache.flink.table.runtime.generated.
> GeneratedWatermarkGeneratorSupplier.createWatermarkGenerator(
> GeneratedWatermarkGeneratorSupplier.java:62)
>
> at org.apache.flink.streaming.api.operators.source.
> ProgressiveTimestampsAndWatermarks.createMainOutput(
> ProgressiveTimestampsAndWatermarks.java:104)
>
> at org.apache.flink.streaming.api.operators.SourceOperator
> .initializeMainOutput(SourceOperator.java:426)
>
> at org.apache.flink.streaming.api.operators.SourceOperator
> .emitNextNotReading(SourceOperator.java:402)
>
> at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(
> SourceOperator.java:387)
>
> at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput
> .emitNext(StreamTaskSourceInput.java:68)
>
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
> .processInput(StreamOneInputProcessor.java:65)
>
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
> StreamTask.java:519)
>
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
> .runMailboxLoop(MailboxProcessor.java:203)
>
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(
> StreamTask.java:804)
>
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
> StreamTask.java:753)
>
> at org.apache.flink.runtime.taskmanager.Task
> .runWithSystemExitMonitoring(Task.java:948)
>
> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task
> .java:927)
>
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>
> at java.base/java.lang.Thread.run(Thread.java:829)
>
> Caused by: org.apache.flink.util.FlinkRuntimeException:
> 

Re: Recover watermark from savepoint

2022-06-09 Thread David Anderson
Sweta,

Flink does not include watermarks in savepoints, nor are they included in
aligned checkpoints. For what it's worth, I believe that with unaligned
checkpoints in-flight watermarks are included in checkpoints, but I don't
believe that would solve the problem, since the watermark strategy's state
is still lost during a restart.

I can't think of any way to guarantee that all possibly late events will be
deterministically identified as late. The commonly used
bounded-out-of-orderness watermark strategy doesn't guarantee this either,
even without a restart (because watermarks are delayed by the auto
watermark interval, rather than being produced at every conceivable
opportunity).

If this is a strong requirement, you could decide not to rely on watermarks
for dropping late events, and implement the logic yourself in a process
function.
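
A minimal sketch of that idea (the event type, its timestamp accessor, and the
lateness bound are all assumptions): track the largest timestamp seen per key in
checkpointed state and filter against it, so the threshold survives a restart
instead of being reset the way a watermark is.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class LateEventFilter extends KeyedProcessFunction<String, Event, Event> {

    private static final long ALLOWED_LATENESS_MS = 30_000L; // illustrative bound

    private transient ValueState<Long> maxTimestamp;

    @Override
    public void open(Configuration parameters) {
        maxTimestamp = getRuntimeContext().getState(
                new ValueStateDescriptor<>("max-timestamp", Types.LONG));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Event> out) throws Exception {
        Long max = maxTimestamp.value();        // restored from the checkpoint/savepoint
        long eventTime = event.getTimestamp();  // Event#getTimestamp() is assumed

        if (max == null || eventTime > max) {
            maxTimestamp.update(eventTime);
        }
        if (max == null || eventTime >= max - ALLOWED_LATENESS_MS) {
            out.collect(event);                 // on time, or within the allowed bound
        }
        // otherwise drop the event, or send it to a side output instead
    }
}
```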

Best,
David

On Wed, Jun 8, 2022 at 6:10 PM Sweta Kalakuntla 
wrote:

> Hi,
>
> I want to understand if flink saves a watermark during savepoint and if
> not, how do we achieve this?
>
> We are seeing an issue where on recovery, the job processes some late
> events which should have been discarded if the job were to be running
> without any downtime.
>
> Thank you,
> Sweta
>


Re: Request for Review: FLINK-27507 and FLINK-27509

2022-05-23 Thread David Anderson
I've taken care of this.

David

On Sun, May 22, 2022 at 4:12 AM Shubham Bansal 
wrote:

> Hi Everyone,
>
> I am not sure who to reach out for the reviews of these changesets, so I
> am putting this on the mailing list here.
>
> I have raised the review for
> FLINK-27507 - https://github.com/apache/flink-playgrounds/pull/30
> FLINK-27509 - https://github.com/apache/flink-playgrounds/pull/29
>
> I would appreciate it if somebody can review these change sets as I want
> to make the similar changes for 1.15 version after that.
> Let me know if you need more information regarding this PR, I would be
> happy to connect with you and explain the changes.
>
> Thanks,
> Shubham Bansal
>


Re: Checkpointing - Job Manager S3 Head Requests are 404s

2022-05-20 Thread David Anderson
One more thing to be aware of: the Presto S3 implementation has issues too.
See FLINK-24392 [1]. This means that there's no ideal solution, and in some
cases it is preferable to use Hadoop, perhaps in combination with
increasing the value of state.storage.fs.memory-threshold [2] in order to
decrease the number of data files written with each checkpoint.

[1] https://issues.apache.org/jira/browse/FLINK-24392
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#state-storage-fs-memory-threshold

On Thu, May 19, 2022 at 8:45 PM Aeden Jameson 
wrote:

> Great, that all makes sense to me. Thanks again.
>
> On Thu, May 19, 2022 at 11:42 AM David Anderson 
> wrote:
> >
> > Sure, happy to try to help.
> >
> > What's happening with the hadoop filesystem is that before it writes
> each key it checks to see if the "parent directory" exists by checking for
> a key with the prefix up to the last "/", and if that key isn't found it
> then creates empty marker files to cause that parent directory to exist.
> These existence checks are S3 HEAD requests. None of this is helpful in the
> case of Flink checkpointing.
> >
> > And yes, while Presto avoids this unnecessary overhead, entropy
> injection remains an important tool for improving scalability. Where you'll
> run into quotas and rate limits depends, of course, on factors like the
> number of stateful tasks in your job(s), the parallelism, and the
> checkpointing interval.
> >
> > On Thu, May 19, 2022 at 8:04 PM Aeden Jameson 
> wrote:
> >>
> >> Thanks for the response David. That's the conclusion I came to as
> >> well.  The Hadoop plugin behavior doesn't appear to reflect more
> >> recent changes to S3 like strong read-after-write consistency,
> >>
> https://aws.amazon.com/blogs/aws/amazon-s3-update-strong-read-after-write-consistency/
> >> .
> >>
> >> Given the improved interaction of the Presto plugin with S3 I would
> >> conclude that this increases the size of the cluster that would
> >> require entropy injection, yes? But that it doesn't necessarily get
> >> rid of the need because one could have a large enough cluster and say
> >> a lifecycle policy that could still end up requiring entropy
> >> injection.
> >>
> >> On Thu, May 19, 2022 at 10:24 AM David Anderson 
> wrote:
> >> >
> >> > Aeden, this is probably happening because you are using the Hadoop
> implementation of S3.
> >> >
> >> > The Hadoop S3 filesystem tries to imitate a filesystem on top of S3.
> In so doing it makes a lot of HEAD requests. These are expensive, and they
> violate read-after-create visibility, which is what you seem to be
> experiencing. By contrast, the Presto S3 implementation doesn't do the same
> (harmful in this case) magic, and simply does PUT/GET operations. Because
> that's all Flink needs to checkpointing, this works much better.
> >> >
> >> > Best,
> >> > David
> >> >
> >> > On Thu, May 12, 2022 at 1:53 AM Aeden Jameson <
> aeden.jame...@gmail.com> wrote:
> >> >>
> >> >> We're using S3 to store checkpoints. They are taken every minute. I'm
> >> >> seeing a large number of 404 responses from S3 being generated by the
> >> >> job manager. The order of the entries in the debugging log would
> imply
> >> >> that it's a result of a HEAD request to a key. For example all the
> >> >> incidents look like this,
> >> >>
> >> >>
> >> >> 2022-05-11 23:29:00,804 DEBUG com.amazonaws.request [] - Sending
> >> >> Request: HEAD https://[MY-BUCKET].s3.amazonaws.com
> >> >> /[MY_JOB]/checkpoints/5f4d6923883a1702b206f978fa3637a3/ Headers:
> >> >> (amz-sdk-invocation-id: X, Content-Type:
> application/octet-stream,
> >> >> User-Agent: Hadoop 3.1.0, aws-sdk-java/1.11.788
> >> >> Linux/5.4.181-99.354.amzn2.x86_64 OpenJDK_64-Bit_Server_VM/11.0.13+8
> >> >> java/11.0.13 scala/2.12.7 vendor/Oracle_Corporation, )
> >> >>
> >> >> 2022-05-11 23:29:00,815 DEBUG com.amazonaws.request [] - Received
> >> >> error response: com.amazonaws.services.s3.model.AmazonS3Exception:
> Not
> >> >> Found (Service: Amazon S3; Status Code: 404; Error Code: 404 Not
> >> >> Found; ..)
> >> >>
> >> >> The key does in fact exist. How can I go about resolving this?
> >> >>
> >> >> --
> >> >> Cheers,
> >> >> Aeden
> >> >>
> >> >> GitHub: https://github.com/aedenj
> >>
> >>
> >>
> >> --
> >> Cheers,
> >> Aeden
> >>
> >> GitHub: https://github.com/aedenj
> >> Linked In: http://www.linkedin.com/in/aedenjameson
>
>
>
> --
> Cheers,
> Aeden
>
> GitHub: https://github.com/aedenj
> Linked In: http://www.linkedin.com/in/aedenjameson
>


Re: Checkpointing - Job Manager S3 Head Requests are 404s

2022-05-19 Thread David Anderson
Sure, happy to try to help.

What's happening with the hadoop filesystem is that before it writes each
key it checks to see if the "parent directory" exists by checking for a key
with the prefix up to the last "/", and if that key isn't found it then
creates empty marker files to cause that parent directory to exist.
These existence checks are S3 HEAD requests. None of this is helpful in the
case of Flink checkpointing.

And yes, while Presto avoids this unnecessary overhead, entropy injection
remains an important tool for improving scalability. Where you'll run into
quotas and rate limits depends, of course, on factors like the number of
stateful tasks in your job(s), the parallelism, and the checkpointing
interval.

On Thu, May 19, 2022 at 8:04 PM Aeden Jameson 
wrote:

> Thanks for the response David. That's the conclusion I came to as
> well.  The Hadoop plugin behavior doesn't appear to reflect more
> recent changes to S3 like strong read-after-write consistency,
>
> https://aws.amazon.com/blogs/aws/amazon-s3-update-strong-read-after-write-consistency/
> .
>
> Given the improved interaction of the Presto plugin with S3 I would
> conclude that this increases the size of the cluster that would
> require entropy injection, yes? But that it doesn't necessarily get
> rid of the need because one could have a large enough cluster and say
> a lifecycle policy that could still end up requiring entropy
> injection.
>
> On Thu, May 19, 2022 at 10:24 AM David Anderson 
> wrote:
> >
> > Aeden, this is probably happening because you are using the Hadoop
> implementation of S3.
> >
> > The Hadoop S3 filesystem tries to imitate a filesystem on top of S3. In
> so doing it makes a lot of HEAD requests. These are expensive, and they
> violate read-after-create visibility, which is what you seem to be
> experiencing. By contrast, the Presto S3 implementation doesn't do the same
> (harmful in this case) magic, and simply does PUT/GET operations. Because
> that's all Flink needs to checkpointing, this works much better.
> >
> > Best,
> > David
> >
> > On Thu, May 12, 2022 at 1:53 AM Aeden Jameson 
> wrote:
> >>
> >> We're using S3 to store checkpoints. They are taken every minute. I'm
> >> seeing a large number of 404 responses from S3 being generated by the
> >> job manager. The order of the entries in the debugging log would imply
> >> that it's a result of a HEAD request to a key. For example all the
> >> incidents look like this,
> >>
> >>
> >> 2022-05-11 23:29:00,804 DEBUG com.amazonaws.request [] - Sending
> >> Request: HEAD https://[MY-BUCKET].s3.amazonaws.com
> >> /[MY_JOB]/checkpoints/5f4d6923883a1702b206f978fa3637a3/ Headers:
> >> (amz-sdk-invocation-id: X, Content-Type: application/octet-stream,
> >> User-Agent: Hadoop 3.1.0, aws-sdk-java/1.11.788
> >> Linux/5.4.181-99.354.amzn2.x86_64 OpenJDK_64-Bit_Server_VM/11.0.13+8
> >> java/11.0.13 scala/2.12.7 vendor/Oracle_Corporation, )
> >>
> >> 2022-05-11 23:29:00,815 DEBUG com.amazonaws.request [] - Received
> >> error response: com.amazonaws.services.s3.model.AmazonS3Exception: Not
> >> Found (Service: Amazon S3; Status Code: 404; Error Code: 404 Not
> >> Found; ..)
> >>
> >> The key does in fact exist. How can I go about resolving this?
> >>
> >> --
> >> Cheers,
> >> Aeden
> >>
> >> GitHub: https://github.com/aedenj
>
>
>
> --
> Cheers,
> Aeden
>
> GitHub: https://github.com/aedenj
> Linked In: http://www.linkedin.com/in/aedenjameson
>


Re: Checkpointing - Job Manager S3 Head Requests are 404s

2022-05-19 Thread David Anderson
Aeden, this is probably happening because you are using the Hadoop
implementation of S3.

The Hadoop S3 filesystem tries to imitate a filesystem on top of S3. In so
doing it makes a lot of HEAD requests. These are expensive, and they
violate read-after-create visibility, which is what you seem to be
experiencing. By contrast, the Presto S3 implementation doesn't do the same
(harmful in this case) magic, and simply does PUT/GET operations. Because
that's all Flink needs to checkpointing, this works much better.

Best,
David

On Thu, May 12, 2022 at 1:53 AM Aeden Jameson 
wrote:

> We're using S3 to store checkpoints. They are taken every minute. I'm
> seeing a large number of 404 responses from S3 being generated by the
> job manager. The order of the entries in the debugging log would imply
> that it's a result of a HEAD request to a key. For example all the
> incidents look like this,
>
>
> 2022-05-11 23:29:00,804 DEBUG com.amazonaws.request [] - Sending
> Request: HEAD https://[MY-BUCKET].s3.amazonaws.com
> /[MY_JOB]/checkpoints/5f4d6923883a1702b206f978fa3637a3/ Headers:
> (amz-sdk-invocation-id: X, Content-Type: application/octet-stream,
> User-Agent: Hadoop 3.1.0, aws-sdk-java/1.11.788
> Linux/5.4.181-99.354.amzn2.x86_64 OpenJDK_64-Bit_Server_VM/11.0.13+8
> java/11.0.13 scala/2.12.7 vendor/Oracle_Corporation, )
>
> 2022-05-11 23:29:00,815 DEBUG com.amazonaws.request [] - Received
> error response: com.amazonaws.services.s3.model.AmazonS3Exception: Not
> Found (Service: Amazon S3; Status Code: 404; Error Code: 404 Not
> Found; ..)
>
> The key does in fact exist. How can I go about resolving this?
>
> --
> Cheers,
> Aeden
>
> GitHub: https://github.com/aedenj
>


Re: Confusing S3 Entropy Injection Behavior

2022-05-19 Thread David Anderson
Aeden,

I want to expand my answer after having re-read your question a bit more
carefully.

For point 1 the behavior you are seeing is what is expected. With hadoop
the metadata written by the job manager will literally include "_entropy_"
in its path, while this will be replaced in paths of any and all checkpoint
data files. With presto the metadata path won't include "_entropy_" at all
(it will disappear, rather than being replaced by something specific).

For point 2, I'm not sure.

David

On Thu, May 19, 2022 at 2:37 PM David Anderson  wrote:

> This sounds like it could be FLINK-17359 [1]. What version of Flink are
> you using?
>
> Another likely explanation arises from the fact that only the
> checkpoint data files (the ones created and written by the task managers)
> will have the _entropy_ replaced. The job manager does not inject entropy
> into the path of the checkpoint metadata, so that it remains at a
> predictable URI. Since Flink only writes keyed state larger than
> state.storage.fs.memory-threshold into the checkpoint data files, and only
> those files have entropy injected into their paths, if all of your state is
> small it will all end up in the metadata file and you don't see any entropy
> injection happening. See the comments on [2] for more on this.
>
> FWIW, I would urge you to use presto instead of hadoop for checkpointing
> on S3. The performance of the hadoop "filesystem" is problematic when it's
> used for checkpointing.
>
> Regards,
> David
>
> [1] https://issues.apache.org/jira/browse/FLINK-17359
> [2] https://issues.apache.org/jira/browse/FLINK-24878
>
> On Wed, May 18, 2022 at 7:48 PM Aeden Jameson 
> wrote:
>
>> I have checkpoints setup against s3 using the hadoop plugin. (I'll
>> migrate to presto at some point) I've setup entropy injection per the
>> documentation with
>>
>> state.checkpoints.dir: s3://my-bucket/_entropy_/my-job/checkpoints
>> s3.entropy.key: _entropy_
>>
>> I'm seeing some behavior that I don't quite understand.
>>
>> 1. The folder s3://my-bucket/_entropy_/my-job/checkpoints/...
>> literally exists. Meaning that "_entropy_" has not been replaced. At
>> the same time there are also a bunch of folders where "_entropy_" has
>> been replaced. Is that to be expected? If so, would someone elaborate
>> on why this is happening?
>>
>> 2. Should the paths in the checkpoints history tab in the FlinkUI
>> display the path the key? With the current setup it is not.
>>
>> Thanks,
>> Aeden
>>
>> GitHub: https://github.com/aedenj
>> Linked In: http://www.linkedin.com/in/aedenjameson
>>
>


Re: Confusing S3 Entropy Injection Behavior

2022-05-19 Thread David Anderson
This sounds like it could be FLINK-17359 [1]. What version of Flink are you
using?

Another likely explanation arises from the fact that only the
checkpoint data files (the ones created and written by the task managers)
will have the _entropy_ replaced. The job manager does not inject entropy
into the path of the checkpoint metadata, so that it remains at a
predictable URI. Since Flink only writes keyed state larger than
state.storage.fs.memory-threshold into the checkpoint data files, and only
those files have entropy injected into their paths, if all of your state is
small it will all end up in the metadata file and you don't see any entropy
injection happening. See the comments on [2] for more on this.

FWIW, I would urge you to use presto instead of hadoop for checkpointing on
S3. The performance of the hadoop "filesystem" is problematic when it's
used for checkpointing.

Regards,
David

[1] https://issues.apache.org/jira/browse/FLINK-17359
[2] https://issues.apache.org/jira/browse/FLINK-24878

On Wed, May 18, 2022 at 7:48 PM Aeden Jameson 
wrote:

> I have checkpoints setup against s3 using the hadoop plugin. (I'll
> migrate to presto at some point) I've setup entropy injection per the
> documentation with
>
> state.checkpoints.dir: s3://my-bucket/_entropy_/my-job/checkpoints
> s3.entropy.key: _entropy_
>
> I'm seeing some behavior that I don't quite understand.
>
> 1. The folder s3://my-bucket/_entropy_/my-job/checkpoints/...
> literally exists. Meaning that "_entropy_" has not been replaced. At
> the same time there are also a bunch of folders where "_entropy_" has
> been replaced. Is that to be expected? If so, would someone elaborate
> on why this is happening?
>
> 2. Should the paths in the checkpoints history tab in the FlinkUI
> display the path the key? With the current setup it is not.
>
> Thanks,
> Aeden
>
> GitHub: https://github.com/aedenj
> Linked In: http://www.linkedin.com/in/aedenjameson
>


Re: sharing data between 2 pipelines

2022-05-10 Thread David Anderson
This sounds like it might be a use case for something like a
KeyedCoProcessFunction (or possibly a KeyedBroadcastProcessFunction,
depending on the details). These operators can receive inputs from two
different sources, and share state between them.

The rides and fares exercise [1] from the flink-training illustrates this
pattern, which you can also read about in the tutorial on Connected Streams
in the documentation [2].

If you need to broadcast the signal from the scheduler, see [3].
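
For illustration, a rough sketch of the KeyedCoProcessFunction shape (the Info
and Trigger types and the keying are assumptions): the first input holds the
Kafka-sourced info in keyed state, the second input is the scheduler's trigger
that reads it and emits it downstream.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

public class InfoHolder extends KeyedCoProcessFunction<String, Info, Trigger, Info> {

    private transient ValueState<Info> latestInfo;

    @Override
    public void open(Configuration parameters) {
        latestInfo = getRuntimeContext().getState(
                new ValueStateDescriptor<>("latest-info", Info.class));
    }

    @Override
    public void processElement1(Info info, Context ctx, Collector<Info> out) throws Exception {
        latestInfo.update(info);            // input A: remember the latest info per key
    }

    @Override
    public void processElement2(Trigger trigger, Context ctx, Collector<Info> out) throws Exception {
        Info info = latestInfo.value();     // input B: emit it on each scheduled trigger
        if (info != null) {
            out.collect(info);              // e.g. followed by a Kafka sink
        }
    }
}
```

The two keyed streams are wired together with connect(); if the trigger has to
reach every key rather than a single key, the broadcast state pattern from [3]
is the better fit.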

Regards,
David

[1] https://github.com/apache/flink-training/tree/master/rides-and-fares
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/learn-flink/etl/#connected-streams
[3]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/fault-tolerance/broadcast_state/

On Tue, May 10, 2022 at 12:09 PM Sigalit Eliazov 
wrote:

> Hi all
> i have 2 pipelines:
> A. receives information from kafka and "holds" that info
> B. a pipeline which is triggered by a scheduler and every x minutes should
> send the info i received in pipeline A to another kafka topic
>
> As i understood i cannot use the flink state for this since these are
> different pipelines/operators..
> is there a way to implement such a case in Flink itself without using any
> external application like redis or db?
>
> Thanks
> Sigalit
>
>


Re: [Discuss] Creating an Apache Flink slack workspace

2022-05-10 Thread David Anderson
gt; "soft no". We may consider to also put that in the cod of conduct.
>>> >>
>>> >> Concerning using Slack for user QAs, it seem the major concern is
>>> that, we
>>> >> may end up repeatedly answering the same questions from different
>>> users,
>>> >> due to lack of capacity for archiving and searching historical
>>> >> conversations. TBH, I don't have a good solution for the
>>> archivability and
>>> >> searchability. I investigated some tools like Zapier [1], but none of
>>> them
>>> >> seems suitable for us. However, I'd like to share 2 arguments.
>>> >> - The purpose of Slack is to make the communication more efficient? By
>>> >> *efficient*, I mean saving time for both question askers and helpers
>>> with
>>> >> instance messages, file transmissions, even voice / video calls, etc.
>>> >> (Especially for cases where back and forth is needed, as David
>>> mentioned.)
>>> >> It does not mean questions that do not get enough attentions on MLs
>>> are
>>> >> now
>>> >> guaranteed to be answered immediately. We can probably put that into
>>> the
>>> >> code of conduct, and kindly guide users to first search and initiate
>>> >> questions on MLs.
>>> >> - I'd also like to share some experience from the Flink China
>>> community.
>>> >> We
>>> >> have 3 DingTalk groups with totally 25k members (might be less, I
>>> didn't
>>> >> do
>>> >> deduplication), posting hundreds of messages daily. What I'm really
>>> >> excited
>>> >> about is that, there are way more interactions between users & users
>>> than
>>> >> between users & developers. Users are helping each other, sharing
>>> >> experiences, sending screenshots / log files / documentations and
>>> solving
>>> >> problems together. We the developers seldom get pinged, if not
>>> proactively
>>> >> joined the conversations. The DingTalk groups are way more active
>>> compared
>>> >> to the user-zh@ ML, which I'd attribute to the improvement of
>>> interaction
>>> >> experiences. Admittedly, there are questions being repeatedly asked &
>>> >> answered, but TBH I don't think that compares to the benefit of a
>>> >> self-driven user community. I'd really love to see if we can bring
>>> such
>>> >> success to the global English-speaking community.
>>> >>
>>> >> Concerning StackOverFlow, it definitely worth more attention from the
>>> >> community. Thanks for the suggestion / reminder, Piotr & David. I
>>> think
>>> >> Slack and StackOverFlow are probably not mutual exclusive.
>>> >>
>>> >> Thank you~
>>> >>
>>> >> Xintong Song
>>> >>
>>> >>
>>> >> [1] https://zapier.com/
>>> >>
>>> >>
>>> >>
>>> >> On Sat, May 7, 2022 at 9:50 AM Jingsong Li 
>>> >> wrote:
>>> >>
>>> >> > Most of the open source communities I know have set up their slack
>>> >> > channels, such as Apache Iceberg [1], Apache Druid [2], etc.
>>> >> > So I think slack can be worth trying.
>>> >> >
>>> >> > David is right, there are some cases that need to communicate back
>>> and
>>> >> > forth, slack communication will be more effective.
>>> >> >
>>> >> > But back to the question, ultimately it's about whether there are
>>> >> > enough core developers willing to invest time in the slack, to
>>> >> > discuss, to answer questions, to communicate.
>>> >> > And whether there will be enough time to reply to the mailing list
>>> and
>>> >> > stackoverflow after we put in the slack (which we need to do).
>>> >> >
>>> >> > [1] https://iceberg.apache.org/community/#slack
>>> >> > [2] https://druid.apache.org/community/
>>> >> >
>>> >> > On Fri, May 6, 2022 at 10:06 PM David Anderson <
>>> dander...@apache.org>
>>> >> > wrote:
>>> >> > >
>>> >> > > I have mixed feelings about this.
>>> >> > >
>>

Re: [Discuss] Creating an Apache Flink slack workspace

2022-05-06 Thread David Anderson
I have mixed feelings about this.

I have been rather visible on stack overflow, and as a result I get a lot
of DMs asking for help. I enjoy helping, but want to do it on a platform
where the responses can be searched and shared.

It is currently the case that good questions on stack overflow frequently
go unanswered because no one with the necessary expertise takes the time to
respond. If the Flink community has the collective energy to do more user
outreach, more involvement on stack overflow would be a good place to
start. Adding slack as another way for users to request help from those who
are already actively providing support on the existing communication
channels might just lead to burnout.

On the other hand, there are rather rare, but very interesting cases where
considerable back and forth is needed to figure out what's going on. This
can happen, for example, when the requirements are unusual, or when a
difficult to diagnose bug is involved. In these circumstances, something
like slack is much better suited than email or stack overflow.

David

On Fri, May 6, 2022 at 3:04 PM Becket Qin  wrote:

> Thanks for the proposal, Xintong.
>
> While I share the same concerns as those mentioned in the previous
> discussion thread, admittedly there are benefits of having a slack channel
> as a supplementary way to discuss Flink. The fact that this topic is raised
> once a while indicates lasting interests.
>
> Personally I am open to having such a slack channel. Although it has
> drawbacks, it serves a different purpose. I'd imagine that for people who
> prefer instant messaging, in absence of the slack channel, a lot of
> discussions might just take place offline today, which leaves no public
> record at all.
>
> One step further, if the channel is maintained by the Flink PMC, some kind
> of code of conduct might be necessary. I think the suggestions of ad-hoc
> conversations, reflecting back to the emails are good starting points. I
> am +1 to give it a try and see how it goes. In the worst case, we can just
> stop doing this and come back to where we are right now.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Fri, May 6, 2022 at 8:55 PM Martijn Visser 
> wrote:
>
>> Hi everyone,
>>
>> While I see Slack having a major downside (the results are not indexed by
>> external search engines, you can't link directly to Slack content unless
>> you've signed up), I do think that the open source space has progressed and
>> that Slack is considered as something that's invaluable to users. There are
>> other Apache programs that also run it, like Apache Airflow [1]. I also see
>> it as a potential option to create a more active community.
>>
>> A concern I can see is that users will start DMing well-known
>> reviewers/committers to get a review or a PR merged. That can cause a lot
>> of noise. I can go +1 for Slack, but then we need to establish a set of
>> community rules.
>>
>> Best regards,
>>
>> Martijn
>>
>> [1] https://airflow.apache.org/community/
>>
>> On Fri, 6 May 2022 at 13:59, Piotr Nowojski  wrote:
>>
>>> Hi Xintong,
>>>
>>> I'm not sure if slack is the right tool for the job. IMO it works great
>>> as
>>> an adhoc tool for discussion between developers, but it's not searchable
>>> and it's not persistent. Between devs, it works fine, as long as the
>>> result
>>> of the ad hoc discussions is backported to JIRA/mailing list/design doc.
>>> For users, that simply would be extremely difficult to achieve. In the
>>> result, I would be afraid we are answering the same questions over, and
>>> over and over again, without even a way to provide a link to the previous
>>> thread, because nobody can search for it .
>>>
>>> I'm +1 for having an open and shared slack space/channel for the
>>> contributors, but I think I would be -1 for such channels for the users.
>>>
>>> For users, I would prefer to focus more on, for example, stackoverflow.
>>> With upvoting, clever sorting of the answers (not the oldest/newest at
>>> top)
>>> it's easily searchable - those features make it fit our use case much
>>> better IMO.
>>>
>>> Best,
>>> Piotrek
>>>
>>>
>>>
>>> pt., 6 maj 2022 o 11:08 Xintong Song  napisał(a):
>>>
>>> > Thank you~
>>> >
>>> > Xintong Song
>>> >
>>> >
>>> >
>>> > -- Forwarded message -
>>> > From: Xintong Song 
>>> > Date: Fri, May 6, 2022 at 5:07 PM
>>> > Subject: Re: [Discuss] Creating an Apache Flink slack workspace
>>> > To: private 
>>> > Cc: Chesnay Schepler 
>>> >
>>> >
>>> > Hi Chesnay,
>>> >
>>> > Correct me if I'm wrong, I don't find this is *repeatedly* discussed
>>> on the
>>> > ML. The only discussions I find are [1] & [2], which are 4 years ago.
>>> On
>>> > the other hand, I do find many users are asking questions about whether
>>> > Slack should be supported [2][3][4]. Besides, I also find a recent
>>> > discussion thread from ComDev [5], where alternative communication
>>> channels
>>> > are being discussed. It seems to me ASF is quite open to having such
>>> > additional 

Re: RocksDB's state size discrepancy with what's seen with state processor API

2022-04-22 Thread David Anderson
Alexis,

Compaction isn't an all-at-once procedure. RocksDB is organized as a series
of levels, each 10x larger than the one below. There are a few different
compaction algorithms available, and they are tunable, but what's typically
happening during compaction is that one SST file at level n is being merged
into the relevant SST files at level n+1. During this compaction procedure,
obsolete and deleted entries are cleaned up. And several such compactions
can be occurring concurrently. (Not to mention that each TM has its own
independent RocksDB instance.)

It's not unusual for jobs with a small amount of state to end up with
checkpoints of a few hundred MBs in size, where a lot of that is
uncompacted garbage. If you find this troublesome, you could configure
RocksDB to compact more frequently.
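
If you want to experiment with that, one hedged sketch (the option value is only
illustrative, and these options are normally set in flink-conf.yaml rather than
in code) is to lower the per-level target file size so compactions start sooner:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;

Configuration conf = new Configuration();
conf.setString("state.backend.rocksdb.compaction.level.target-file-size-base", "8mb");

// Programmatic configuration of the backend, shown only as a sketch.
EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend()
        .configure(conf, Thread.currentThread().getContextClassLoader());
env.setStateBackend(backend);
```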

David

On Thu, Apr 21, 2022 at 12:49 PM Alexis Sarda-Espinosa <
alexis.sarda-espin...@microfocus.com> wrote:

> Hello,
>
> I enabled some of the RocksDB metrics and I noticed some additional
> things. After changing the configuration YAML, I restarted the cluster with
> a savepoint, and I can see that it only used 5.6MB on disk. Consequently,
> after the job switched to running state, the new checkpoints were also a
> few MB in size. After running for 1 day, checkpoint size is now around
> 100MB. From the metrics I can see with the Prometheus reporter:
>
> - All entries for num-live-versions show 1
> - All entries for compaction-pending show 0
> - Most entries for estimate-num-keys are in the range of 0 to 100,
> although I see a few with 151 coming from
> flink_taskmanager_job_task_operator__timer_state_event_window_timers_rocksdb_estimate_num_keys
>
> Is compaction expected after only 100MB? I imagine not, but if the
> savepoint shows that the effective amount of data is so low, size growth
> still seems far too large. In fact, if I only look at the UI, Bytes
> Received for the relevant SubTasks is about 14MB, yet the latest checkpoint
> already shows a Data Size of 75MB for said SubTasks.
>
> Regards,
> Alexis.
>
> -Original Message-
> From: Roman Khachatryan 
> Sent: Mittwoch, 20. April 2022 10:37
> To: Alexis Sarda-Espinosa 
> Cc: user@flink.apache.org
> Subject: Re: RocksDB's state size discrepancy with what's seen with state
> processor API
>
> State Processor API works on a higher level and is not aware of any
> RocksDB specifics (in fact, it can be used with any backend).
>
> Regards,
> Roman
>
> On Tue, Apr 19, 2022 at 10:52 PM Alexis Sarda-Espinosa
>
>  wrote:
> >
> > I can look into RocksDB metrics, I need to configure Prometheus at some
> point anyway. However, going back to the original question, is there no way
> to gain more insight into this with the state processor API? You've
> mentioned potential issues (too many states, missing compaction) but, with
> my admittedly limited understanding of the way RocksDB is used in Flink, I
> would have thought that such things would be visible when using the state
> processor. Is there no way for me to "parse" those MANIFEST files with some
> of Flink's classes and get some more hints?
> >
> > Regards,
> > Alexis.
> >
> > 
> > From: Roman Khachatryan 
> > Sent: Tuesday, April 19, 2022 5:51 PM
> > To: Alexis Sarda-Espinosa 
> > Cc: user@flink.apache.org 
> > Subject: Re: RocksDB's state size discrepancy with what's seen with
> > state processor API
> >
> > > I assume that when you say "new states", that is related to new
> descriptors with different names? Because, in the case of windowing for
> example, each window "instance" has its own scoped (non-global and keyed)
> state, but that's not regarded as a separate column family, is it?
> > Yes, that's what I meant, and that's regarded as the same column family.
> >
> > Another possible reason is that SST files aren't being compacted and
> > that increases the MANIFEST file size.
> > I'd check the total number of loaded SST files and the creation date
> > of the oldest one.
> >
> > You can also see whether there are any compactions running via RocksDB
> > metrics [1] [2] (a reporter needs to be configured [3]).
> >
> > [1]
> > https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/c
> > onfig/#state-backend-rocksdb-metrics-num-running-compactions
> > [2]
> > https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/c
> > onfig/#state-backend-rocksdb-metrics-compaction-pending
> > [3]
> > https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/m
> > etric_reporters/#reporters
> >
> > Regards,
> > Roman
> >
> > On Tue, Apr 19, 2022 at 1:38 PM Alexis Sarda-Espinosa
> >  wrote:
> > >
> > > Hi Roman,
> > >
> > > I assume that when you say "new states", that is related to new
> descriptors with different names? Because, in the case of windowing for
> example, each window "instance" has its own scoped (non-global and keyed)
> state, but that's not regarded as a separate column family, is it?
> > >
> > > For the 3 descriptors I mentioned 

Re: Flink batch mode does not sort by event timestamp

2022-04-22 Thread David Anderson
The DataStream API's BATCH execution mode first sorts by key, and within
each key, it sorts by timestamp. By operating this way, it only needs to
keep state for one key at a time, so this keeps the runtime simple and
efficient.
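
For reference, a minimal sketch of enabling that mode on the DataStream API
(assuming a bounded source):

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// In BATCH mode keyed input is sorted by key, and by timestamp within each key,
// before keyed operators see it; no global ordering across keys is implied.
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
```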

Regards,
David

P.S. I see you also asked this question on stack overflow. Please try to
refrain from asking questions in both forums simultaneously -- this creates
extra work for the community, and we struggle to find the resources to keep
up with all of the questions as it is.



On Fri, Apr 22, 2022 at 7:57 AM Han You  wrote:

> I have a custom flink Source, and I have a SerializableTimestampAssigner that
> assigns event timestamps to records emitted by the source. The source may
> emit records out of order because of the nature of the underlying data
> storage, however with BATCH mode, I expect Flink to sort these records by
> event timestamp before any operator processes them.
>
> Excerpted from the Flink documentation on
> execution mode: In BATCH mode, where the input dataset is known in
> advance, there is no need for such a heuristic as, at the very least,
> elements can be sorted by timestamp so that they are processed in temporal
> order.
>
> However, this doesn't seem to be the case. If I create a datastream out of
> the Source (StreamExecutionEnvironment.fromSource) with my timestamp
> assigner, and then datastream.addSink(x => println(extractTimestamp(x)),
> the output isn't strictly ascending. Is my understanding of the document
> wrong? Or does flink expect me (the users) to sort the input dataset
> themselves?
>
>
>
> Thanks in advance for any help!
> --
>
> *Han You* | Junior Developer
> *Akuna Capital*
> | www.akunacapital.com
> p: | m: | f: | han@akunacapital.com
> Learn More About the New Wave in Tech and Trading
> 
> Visit Us- A 360° Virtual Reality Experience
> 
>
> 
> 
> 
> 
>
> Please consider the environment *before* printing this email.
>
> This electronic message contains information from Akuna Capital LLC that
> may be confidential, legally privileged or otherwise protected from
> disclosure. This information is intended for the use of the addressee only
> and is not offered as investment advice to be relied upon for personal or
> professional use. Additionally, all electronic messages are recorded and
> stored in compliance pursuant to applicable SEC rules. If you are not the
> intended recipient, you are hereby notified that any disclosure, copying,
> distribution, printing or any other use of, or any action in reliance on,
> the contents of this electronic message is strictly prohibited. If you have
> received this communication in error, please notify us by telephone at
> (312)994- 4640 and destroy the original message.
>


Re: How to reprocess historical data with event-time windowing?

2022-04-13 Thread David Anderson
Ty,

Usually what's done is to run a separate instance of the app to handle the
re-ingestion of the historic data while another instance is processing live
data. That way the backfill job won't be confused by observing events with
recent timestamps -- it will only see the historic data. But you will need
to either provide the data roughly sorted by timestamp, or execute the
backfill job in batch execution mode.

Regards,
David

On Tue, Apr 12, 2022 at 5:28 PM Ty Brooks  wrote:

> Hi devs,
>
> I’ve got a question about how to design a Flink app that can handle the
> reprocessing of historical data. As a quick background, I’m building an ETL
> / data aggregator application, and one of the requirements is that if in
> the future we want to extend the system by adding some new metrics or
> dimensions, we’ll have the option to reprocess historical data in order to
> generate backfill (assuming the historical data already has the fields
> we’ll be adding to our aggregations, of course).
>
> The data source for my application is Kafka, so my naive approach here
> would just be to re-ingest the older events into Kafka and let the stream
> processor handle the rest. But if I’m understanding Flink’s windowing and
> watermarks correctly for an event-time based app, there doesn’t really seem
> to be a way of revisiting older data. If you’re using event-time windowing,
> then any window for which (end window range + window allowed lateness +
> watermark strategy out of orderness) <= max_timestamp is permanently
> finished since max_timestamp is always increasing.
>
> And that all does make sense, but I’m not sure where that leaves me for my
> requirement to support backfill. Is the alternative to just recreate the
> app every time and try to ensure that all the historical data gets
> processed before any new, live data?
>
> Best,
> Ty
>
>
>


Re: Parallel processing in a 2 node cluster apparently not working

2022-03-29 Thread David Anderson
In this situation, changing your configuration [1] to include

cluster.evenly-spread-out-slots: true

should change the scheduling behavior to what you are looking for.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#cluster-evenly-spread-out-slots

Regards,
David

On Tue, Mar 29, 2022 at 1:30 PM HG  wrote:

>
> Hi,
>
> I have a 2 node cluster just for testing.
> When I start the cluster and the job I see that the parallelism is 2 as
> expected.
> But only they are both on the same node.
> When I stop the taskmanager on that node it switches to the other one.
> But I expected both nodes to have a subtask.
>
> See below.
>
> Any clues?
>
> Regards Hans-Peter
>
> [image: image.png]
> [image: image.png]
>


Re: Using Amazon EC2 Spot instances with Flink

2022-03-24 Thread David Anderson
I remember a Flink Forward talk several years ago where the speaker shared
how they were running on spot instances. They were catching the
notification that the instance was being shut down, taking a savepoint, and
relaunching. They were also proactively monitoring spot instance prices
around the world, and migrating to the least expensive region.

David

On Tue, Mar 22, 2022 at 11:33 PM Vasileva, Valeriia <
c-valeriia.vasil...@disneystreaming.com> wrote:

> Hello, folks!
>
>
>
> I was wondering if there are some good articles on how to use EC2 Spot
> instances with Flink?
>
>
>
> I would appreciate your help! Thank you!
>
>
>
> Kind Regards,
>
> Valeriia
>


Re: How to flatten ARRAY in Table API

2022-02-20 Thread David Anderson
Matthias,

You can use a CROSS JOIN UNNEST, as mentioned very briefly in the docs [1].

Something like this should work:

SELECT
  id, customerid, productid, quantity, ...
FROM
  orders
CROSS JOIN UNNEST(entries) AS items (productid, quantity, unit_price,
discount);

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/joins/#array-expansion

Regards,
David

On Sun, Feb 20, 2022 at 2:25 AM Matthias Broecheler 
wrote:

> Hey Flinksters,
>
> I'm reading a nested JSON object into a table and would like to access the
> nested rows inside an array. Is there a way to flatten them so that I get a
> table with the nested rows?
>
> So far, I've only been able to figure out how to access a specific element
> inside the array using the "at" method but I'm trying to flatten the nested
> rows into a table and the arrays can have variable length. Below is a code
> snippet of what I have thus far but notice how I'm only accessing the first
> element in each array.
>
> How do you do this in Flink? Apologies if this is obvious - I wasn't able
> to find an example or documentation and would appreciate any help.
>
> Thank you,
> Matthias
>
> ---
>
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>
>
> TableDescriptor jsonTable = TableDescriptor.forConnector("filesystem")
> .schema(Schema.newBuilder()
> .column("id", DataTypes.INT())
> .column("customerid", DataTypes.INT())
> .column("time", DataTypes.INT())
> .column("entries", DataTypes.ARRAY(DataTypes.ROW(
> DataTypes.FIELD("productid", DataTypes.INT()),
> DataTypes.FIELD("quantity", DataTypes.INT()),
> DataTypes.FIELD("unit_price", DataTypes.DECIMAL(9,3)),
> DataTypes.FIELD("discount", DataTypes.DECIMAL(9,3))
> )))
> .build())
> .option("path", C360Test.RETAIL_DATA_DIR.toAbsolutePath() + 
> "/orders.json")
> .format("json")
> .build();
>
>
>
> tEnv.createTable("Orders",jsonTable);
> Table orders = tEnv.from("Orders");
>
> Table flattenEntries = 
> orders.select($("entries").at(1).get("quantity").sum().as("totalquant"));
>
>


Re: Implement watermark buffering with Process Function

2022-02-16 Thread David Anderson
I've done some work on this with Nico Kruber.

In our benchmarking, the performance loss (from not being able to use the
namespace) was roughly a factor of two, so it is significant. We prototyped
an API extension that addresses this particular concern but without
exposing the namespace directly, which I believe there is some reluctance
to do. I've been thinking of turning this into a FLIP, but it needs more
work first.

Another direction that could be explored would be to use finer-grained
timestamps. E.g., with nanosecond-precision timestamps the number of
colliding events will be dramatically smaller.
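
For completeness, a sketch of the MapState-based buffering described in the
question below (the Event type and its timestamp accessor are assumptions). It
works, but without the namespace trick it re-serializes a whole List on every
update, which is exactly the cost discussed above:

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class WatermarkBuffer extends KeyedProcessFunction<String, Event, Event> {

    // timestamp -> events with that timestamp, released once the watermark passes
    private transient MapState<Long, List<Event>> buffer;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getMapState(new MapStateDescriptor<>(
                "buffer", Types.LONG, Types.LIST(Types.POJO(Event.class))));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Event> out) throws Exception {
        long ts = event.getTimestamp();                 // assumed accessor
        if (ts <= ctx.timerService().currentWatermark()) {
            return;                                     // late: drop or side-output
        }
        List<Event> events = buffer.get(ts);
        if (events == null) {
            events = new ArrayList<>();
        }
        events.add(event);
        buffer.put(ts, events);                         // whole list re-serialized here
        ctx.timerService().registerEventTimeTimer(ts);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out) throws Exception {
        List<Event> events = buffer.get(timestamp);
        if (events != null) {
            events.forEach(out::collect);
            buffer.remove(timestamp);
        }
    }
}
```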

David

On Wed, Feb 16, 2022 at 10:17 PM David Anderson 
wrote:

> I'm afraid not. The DataStream window implementation uses internal APIs to
> manipulate the state backend namespace, which isn't possible to do with the
> public-facing API. And without this, you can't implement this as
> efficiently.
>
> David
>
> On Wed, Feb 16, 2022 at 12:04 PM Ruibin Xing  wrote:
>
>> Hi,
>>
>> I'm trying to implement customized state logic with KeyedProcessFunction.
>> But I'm not quite sure how to implement the correct watermark behavior when
>> late data is involved.
>>
>> According to the answer on stackoverflow:
>> https://stackoverflow.com/questions/59468154/how-to-sort-an-out-of-order-event-time-stream-using-flink
>> , there should be a state buffering all events until watermark passed the
>> expected time and a event time trigger will fetch from the state and do the
>> calculation. The buffer type should be Map<T, List<E>> where T is the
>> timestamp and E is the event type.
>>
>> However, the interface provided by Flink currently is only a MapState<K, V>. If the V is a List that buffers all events, every time an event
>> comes Flink will do ser/deser and could be very expensive when throughput
>> is huge.
>>
>> I checked the built-in window implementation which implements the
>> watermark buffering.  It seems that WindowOperator consists of some
>> InternalStates,  of which signature is where window is namespace or key, if
>> I understand correctly. But internal states are not available for Flink
>> users.
>>
>> So my question is: is there an efficient way to simulate watermark
>> buffering using process function for Flink users?
>>
>> Thanks.
>>
>


Re: Implement watermark buffering with Process Function

2022-02-16 Thread David Anderson
I'm afraid not. The DataStream window implementation uses internal APIs to
manipulate the state backend namespace, which isn't possible to do with the
public-facing API. And without this, you can't implement this as
efficiently.

David

On Wed, Feb 16, 2022 at 12:04 PM Ruibin Xing  wrote:

> Hi,
>
> I'm trying to implement customized state logic with KeyedProcessFunction.
> But I'm not quite sure how to implement the correct watermark behavior when
> late data is involved.
>
> According to the answer on stackoverflow:
> https://stackoverflow.com/questions/59468154/how-to-sort-an-out-of-order-event-time-stream-using-flink
> , there should be a state buffering all events until watermark passed the
> expected time and a event time trigger will fetch from the state and do the
> calculation. The buffer type should be Map<T, List<E>> where T is the
> timestamp and E is the event type.
>
> However, the interface provided by Flink currently is only a MapStae V>. If the V is a List and buffered all events, every time an event
> comes Flink will do ser/deser and could be very expensive when throughput
> is huge.
>
> I checked the built-in window implementation, which implements the
> watermark buffering. It seems that WindowOperator consists of some
> InternalStates whose signatures take the window as a namespace in addition
> to the key, if I understand correctly. But internal states are not available
> to Flink users.
>
> So my question is: is there an efficient way to simulate watermark
> buffering using process function for Flink users?
>
> Thanks.
>


Re: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner

2022-02-12 Thread David Anderson
You are probably running with Java 11 (with Java 8 these messages aren't
produced). The Flink docs [1] say

These warnings are considered harmless and will be addressed in future
Flink releases.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/release-notes/flink-1.10/#java-11-support

David

On Fri, Feb 11, 2022 at 7:44 AM Антон  wrote:

> Hello,
>
> what could be the reason for warning like this:
>
> WARNING: An illegal reflective access operation has occurred
>
> WARNING: Illegal reflective access by
> org.apache.flink.api.java.ClosureCleaner
> (file:/var/flink/flink-1.13.2/lib/flink-dist_2.12-1.13.2.jar) to field
> java.lang.String.value
>
> WARNING: Please consider reporting this to the maintainers of
> org.apache.flink.api.java.ClosureCleaner
>
> ?
>


Re: Apache Flink - Will later events from a table with watermark be propagated and when can CURRENT_WATERMARK be null in that situation

2022-02-12 Thread David Anderson
Flink uses watermarks to indicate when a stream has become complete up
through some point in time. Various operations on streams wait for
watermarks in order to know when they can safely stop waiting for
further input, and so go ahead and produce their results. These
operations include event-time windowing, interval and temporal joins,
pattern matching, and sorting (by timestamp).

Events that are late have timestamps less than or equal to the current
watermark. They have missed their chance to influence the results of those
operations that rely on watermarks for triggering. But otherwise, Flink
doesn't care if events are late or not. It's not that late events are
automatically dropped in all circumstances -- it's just that these temporal
operations won't wait long enough to accommodate their extreme
out-of-order-ness (lateness).

So yes, your ALL_EVENTS view will contain all of the events, including late
ones.

When your job starts running, it takes some time for an initial watermark
to be produced. During that period of time, the current watermark is NULL,
and no events will be considered late.
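
If it's useful, here is a rough sketch (Java Table API) of splitting the late and
on-time events into separate views, taking the NULL case into account. The MYEVENTS
table and event_time column are taken from your example; the view names are only
illustrative:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tEnv =
        TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

// Events that are on time, plus anything arriving before the first watermark exists.
tEnv.executeSql(
    "CREATE TEMPORARY VIEW ON_TIME_EVENTS AS "
        + "SELECT * FROM MYEVENTS "
        + "WHERE CURRENT_WATERMARK(event_time) IS NULL "
        + "   OR event_time > CURRENT_WATERMARK(event_time)");

// Events that are late with respect to the watermark at the moment they are processed.
tEnv.executeSql(
    "CREATE TEMPORARY VIEW LATE_EVENTS AS "
        + "SELECT * FROM MYEVENTS "
        + "WHERE CURRENT_WATERMARK(event_time) IS NOT NULL "
        + "  AND event_time <= CURRENT_WATERMARK(event_time)");
```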

Hope this helps clarify things.

Regards,
David

On Sat, Feb 12, 2022 at 12:01 AM M Singh  wrote:

> I thought a little more about your references Martijn and wanted to
> confirm one thing - the table is specifying the watermark and the
> downstream view needs to check if it wants all events or only the non-late
> events.  Please let my understanding is correct.
>
> Thanks again for your references.
>
> Mans
>
> On Friday, February 11, 2022, 05:02:49 PM EST, M Singh <
> mans2si...@yahoo.com> wrote:
>
>
>
> Hi Martijn:
>
> Thanks for the reference.
>
> My understanding was that if we use watermark then any event with event
> time (in the above example) < event_time - 30 seconds will be dropped
> automatically.
>
> My question [1] is will the downstream (ALL_EVENTS) view which is
> selecting the events from the table receive events which are late ?  If
> late events are dropped at the table level then do we still need the second
> predicate check (ts > CURRENT_WATERMARK(ts)) to filter out late events at
> the view level.
>
> If the table does not drop late events, then will all downstream views/etc
> need to add this check (ts > CURRENT_WATERMARK(ts)) ?
>
> I am still not clear on this concept of whether downstream view need to
> check for late events with this predicate or will they never receive late
> events.
>
> Thanks again for your time.
>
>
> On Friday, February 11, 2022, 01:55:09 PM EST, Martijn Visser <
> mart...@ververica.com> wrote:
>
>
> Hi,
>
> There's a Flink SQL Cookbook recipe on CURRENT_WATERMARK, I think this
> would cover your questions [1].
>
> Best regards,
>
> Martijn
>
> [1]
> https://github.com/ververica/flink-sql-cookbook/blob/main/other-builtin-functions/03_current_watermark/03_current_watermark.md
>
> On Fri, 11 Feb 2022 at 16:45, M Singh  wrote:
>
> Hi:
>
> The flink docs (
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/systemfunctions/)
> indicates that the CURRENT_WATERMARK(rowtime) can return null:
>
> Note that this function can return NULL, and you may have to consider
> this case. For example, if you want to filter out late data you can use:
>
> WHERE
>   CURRENT_WATERMARK(ts) IS NULL
>   OR ts > CURRENT_WATERMARK(ts)
>
>
> I have the following questions that if the table is defined with a
> watermark eg:
>
> CREATE TABLE `MYEVENTS` (
> `name` STRING,
> `event_time` TIMESTAMP_LTZ(3),
>  ...
> WATERMARK FOR event_time AS event_time - INTERVAL '30' SECONDS)
> WITH (...)
>
>
> 1. If we define the water mark as above, will the late events still be
> propagated to a view or table which is selecting from MYEVENTS table:
>
> CREATE TEMPORARY VIEW `ALL_EVENTS` AS
> SELECT * FROM MYEVENTS;
>
> 2. Can CURRENT_WATERMARK(event_time) still return null ?  If so, what are
> the conditions for returning null ?
>
>
>
> Thanks
>
>
>
>


Re: KafkaSource vs FlinkKafkaConsumer010

2022-02-01 Thread David Anderson
Before Kafka introduced their universal client, Flink had version-specific
connectors, e.g., for versions 0.8, 0.9, 0.10, and 0.11. Those were
eventually removed in favor of FlinkKafkaConsumer, which is/was backward
compatible back to Kafka version 0.10.

FlinkKafkaConsumer itself was deprecated in Flink 1.14 in favor of
KafkaSource, which implements the unified batch/streaming interface defined
in FLIP-27.
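
For completeness, here is a rough sketch of wiring the new KafkaSource into a job;
this is the piece that replaces env.addSource(new FlinkKafkaConsumer...). The broker
address, topic, and group id are placeholders:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")                 // placeholder
        .setTopics("input-topic")                           // placeholder
        .setGroupId("my-group")                             // placeholder
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

// The last argument is just the source name shown in the web UI.
DataStream<String> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source");
```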

Regards,
David

On Tue, Feb 1, 2022 at 9:21 AM Francesco Guardiani 
wrote:

> I think the FlinkKafkaConsumer010 you're talking about is the old source
> API. You should use only KafkaSource now, as it uses the new source
> infrastructure.
>
> On Tue, Feb 1, 2022 at 9:02 AM HG  wrote:
>
>> Hello Francesco
>> Perhaps I copied the wrong link of 1.2.
>> But there is also
>> https://nightlies.apache.org/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.html
>>
>> It seems there are 2 ways to use Kafka
>>
>> KafkaSource<String> source = KafkaSource.<String>builder()
>> .setBootstrapServers(brokers)
>> .setTopics("input-topic")
>> .setGroupId("my-group")
>> .setStartingOffsets(OffsetsInitializer.earliest())
>> .setValueOnlyDeserializer(new SimpleStringSchema())
>> .build();
>>
>> And like this:
>>
>> Properties kafkaProperties = new Properties();
>> kafkaProperties.put("bootstrap.servers",kafkaBootstrapServers);
>> kafkaProperties.put("group.id",kafkaGroupID);
>> kafkaProperties.put("auto.offset.reset",kafkaAutoOffsetReset);
>> FlinkKafkaConsumer010<String> kafkaConsumer = new 
>> FlinkKafkaConsumer010<>(kafkaTopic, new SimpleStringSchema(), 
>> kafkaProperties);
>> kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
>>
>>
>> There is even a FlinkKafkaConsumer011
>>
>> Which one is preferable ? Or have they different use cases?
>>
>> Regards Hans
>>
>>
>> Op di 1 feb. 2022 om 08:55 schreef Francesco Guardiani <
>> france...@ververica.com>:
>>
>>> The latter link you posted refers to a very old flink release. You shold
>>> use the first link, which refers to latest release
>>>
>>> FG
>>>
>>> On Tue, Feb 1, 2022 at 8:20 AM HG  wrote:
>>>
 Hello all

 I am confused.
 What is the difference between KafkaSource as defined in :
 https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/
 and FlinkKafkaConsumer010 as defined in
 https://nightlies.apache.org/flink/flink-docs-release-
 1.2/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.html
 

 When should I use which?

 Regards Hans

>>>


Re: [DISCUSS] Deprecate/remove Twitter connector

2022-01-30 Thread David Anderson
I agree.

The Twitter connector is used in a few (unofficial) tutorials, so if we
remove it that will make it more difficult for those tutorials to be
maintained. On the other hand, if I recall correctly, that connector uses
V1 of the Twitter API, which has been deprecated, so it's really not very
useful even for that purpose.

David



On Fri, Jan 21, 2022 at 9:34 AM Martijn Visser 
wrote:

> Hi everyone,
>
> I would like to discuss deprecating Flinks' Twitter connector [1]. This
> was one of the first connectors that was added to Flink, which could be
> used to access the tweets from Twitter. Given the evolution of Flink over
> Twitter, I don't think that:
>
> * Users are still using this connector at all
> * That the code for this connector should be in the main Flink codebase.
>
> Given the circumstances, I would propose to deprecate and remove this
> connector. I'm looking forward to your thoughts. If you agree, please also
> let me know if you think we should first deprecate it in Flink 1.15 and
> remove it in a version after that, or if you think we can remove it
> directly.
>
> Best regards,
>
> Martijn Visser
> https://twitter.com/MartijnVisser82
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/twitter/
>
>


Re: Stack Overflow Question - Deserialization schema for multiple topics

2022-01-30 Thread David Anderson
Hussein,

To use a JsonRowDeserializationSchema you'll need to use the Table API, and
not DataStream.

You'll want to use a JsonRowSchemaConverter to convert your json schema
into the TypeInformation needed by Flink, which is done for you by
the JsonRowDeserializationSchema builder:

json_row_schema =
JsonRowDeserializationSchema.builder().json_schema(json_schema_string).build()

Given that schema, you can pass it to constructor for the kafka consumer:

kafka_consumer = FlinkKafkaConsumer("stream-source", json_row_schema,
kafka_props)

To read from 3 different topics, you can either instantiate three different
sources, or specify that a single source is to be used to read from
multiple topics, which you can do by passing a list of strings as the
topics parameter.

Regards,
David

On Fri, Jan 28, 2022 at 12:07 PM Hussein El Ghoul 
wrote:

> Hello,
>
> How to specify the deserialization schema for multiple Kafka topics using
> Flink (python)
>
> I want to read from multiple Kafka topics with JSON schema using
> FlinkKafkaConsumer, and I assume that I need to use
> JsonRowDeserializationSchema to deserialize the data. The schema of the
> topics is very large (around 1500 lines for each topic), so I want to read
> it from a file instead of manually typing the types in the program. How can
> I do that?
>
> 1. How to specify deserialization schema for multiple topics (3 topics)
> 2. How to read the JSON schema from a file?
>
>
> https://stackoverflow.com/q/70892579/13067721?sem=2
>
> Thanks in advance,
> Hussein
> Quiqup - Data Engineer


Re: Stack Overflow Question - Deserialization schema for multiple topics

2022-01-28 Thread David Anderson
For questions like this one, please address them to either Stack Overflow
or the user mailing list, but not both at once. Those two forums are
appropriate places to get help with using Flink's APIs. And once you've
asked a question, please allow some days for folks to respond before trying
again.

The dev and community mailing lists are dedicated to other topics, and
aren't suitable for getting help. The community list is for discussions
related to meetups and conferences, and the dev list is for discussions and
decision making about the ongoing development of Flink itself.

In the interest of not further spamming the dev and community lists, let's
limit the follow-up on deserializers to the user ML.

Best regards,
David

On Fri, Jan 28, 2022 at 12:07 PM Hussein El Ghoul 
wrote:

> Hello,
>
> How to specify the deserialization schema for multiple Kafka topics using
> Flink (python)
>
> I want to read from multiple Kafka topics with JSON schema using
> FlinkKafkaConsumer, and I assume that I need to use
> JsonRowDeserializationSchema to deserialize the data. The schema of the
> topics is very large (around 1500 lines for each topic), so I want to read
> it from a file instead of manually typing the types in the program. How can
> I do that?
>
> 1. How to specify deserialization schema for multiple topics (3 topics)
> 2. How to read the JSON schema from a file?
>
>
> https://stackoverflow.com/q/70892579/13067721?sem=2
>
> Thanks in advance,
> Hussein
> Quiqup - Data Engineer


Re: Serving Machine Learning models

2022-01-10 Thread David Anderson
Another approach that I find quite natural is to use Flink's Stateful
Functions API [1] for model serving, and this has some nice advantages,
such as zero-downtime deployments of new models, and the ease with which
you can use Python. [2] is an example of this approach.

[1] https://flink.apache.org/stateful-functions.html
[2] https://github.com/ververica/flink-statefun-workshop

On Fri, Jan 7, 2022 at 5:55 PM Yun Gao  wrote:

> Hi Sonia,
>
> Sorry, I might not have statistics on the two methods provided, but perhaps
> as input I could also offer another method: there is currently an
> eco-project, dl-on-flink, that supports running DL frameworks on top of
> Flink and handles the data exchange between the Java and Python processes,
> which would allow you to use the native model directly.
>
> Best,
> Yun
>
>
> [1] https://github.com/flink-extended/dl-on-flink
>
>
>
> --
> From:Sonia-Florina Horchidan 
> Send Time:2022 Jan. 7 (Fri.) 17:23
> To:user@flink.apache.org 
> Subject:Serving Machine Learning models
>
> Hello,
>
>
> I recently started looking into serving Machine Learning models for
> streaming data in Flink. To give more context, that would involve training
> a model offline (using PyTorch or TensorFlow), and calling it from inside a
> Flink job to do online inference on newly arrived data. I have found
> multiple discussions, presentations, and tools that could achieve this, and
> it seems like the two alternatives would be: (1) wrap the pre-trained
> models in a HTTP service (such as PyTorch Serve [1]) and let Flink do async
> calls for model scoring, or (2) convert the models into a standardized
> format (e.g., ONNX [2]), pre-load the model in memory for every task
> manager (or use external storage if needed) and call it for each new data
> point.
>
> Both approaches come with a set of advantages and drawbacks and, as far as
> I understand, there is no "silver bullet", since one approach could be more
> suitable than the other based on the application requirements. However, I
> would be curious to know what would be the "recommended" methods for model
> serving (if any) and what approaches are currently adopted by the users in
> the wild.
>
> [1] https://pytorch.org/serve/
>
> [2] https://onnx.ai/
>
> Best regards,
>
> Sonia
>
>
>
> Sonia-Florina Horchidan
> PhD Student
> KTH Royal Institute of Technology
> *Software and Computer Systems (SCS)*
> School of Electrical Engineering and Computer Science (EECS)
> Mobil: +46769751562
> sf...@kth.se,  www.kth.se
>
>
>


Re: adding elapsed times to events that form a transaction

2022-01-07 Thread David Anderson
One way to solve this with Flink SQL would be to use MATCH_RECOGNIZE. [1]
is an example illustrating a very similar use case.

[1] https://stackoverflow.com/a/62122751/2000823

On Fri, Jan 7, 2022 at 11:32 AM Ali Bahadir Zeybek 
wrote:

> Hello Hans,
>
> If you would like to see some hands-on examples which showcases the
> capabilities of Flink, I would suggest you follow the training
> exercises[1].
> To be more specific, checkpointing[2] example implements a similar logic to
> what you have described.
>
> Sincerely,
>
> Ali
>
> [1]: https://github.com/ververica/flink-training
> [2]:
> https://github.com/ververica/flink-training/tree/master/troubleshooting/checkpointing
>
> On Fri, Jan 7, 2022 at 1:13 PM Francesco Guardiani <
> france...@ververica.com> wrote:
>
>> So in Flink we essentially have 2 main APIs to define stream topologies:
>> one is DataStream and the other one is Table API. My guess is that right
>> now you're trying to use DataStream with the Kafka connector.
>>
>> DataStream allows you to statically define a stream topology, with an API
>> in a similar fashion to Java Streams or RxJava.
>> Table API on the other hand gives you the ability to define stream jobs
>> using SQL, where you can easily perform operations such as joins over
>> windows.
>>
>> Flink is definitely able to solve your use case, with both APIs. You can
>> also mix these two APIs in your application to solve your use case in the
>> way you want.
>> I suggest you start by looking at the documentation of Table API
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/overview/
>> and then, for your specific use case, check
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-tvf/
>> .
>>
>> Hope it helps.
>> FG
>>
>> On Fri, Jan 7, 2022 at 10:58 AM HG  wrote:
>>
>>> Hi Francesco.
>>>
>>> I am not using anything right now apart from Kafka.
>>> Just need to know whether Flink is capable of doing this and trying to
>>> understand the documentation and terminology etc.
>>> I grapple a bit to understand the whole picture.
>>>
>>> Thanks
>>>
>>> Regards Hans
>>>
>>> Op vr 7 jan. 2022 om 09:24 schreef Francesco Guardiani <
>>> france...@ververica.com>:
>>>
 Hi,
 Are you using SQL or DataStream? For SQL you can use the Window TVF
 feature, where the window size is the "max" elapsed time, and then inside
 the window you pick the beginning and end event and join them.

 Hope it helps,
 FG

 On Thu, Jan 6, 2022 at 3:25 PM HG  wrote:

> Hello all,
>
> My question is basically whether it is possible to group events by a
> key (these will belong to a specific transaction) and then calculate the
> elapsed times between them based on a timestamp that is present in the
> event.
> So a transaction my have x events all timestamped and with the
> transaction_id as key.
> Is it possible to
> 1. group them by the key
> 2. order by the timestamp,
> 3. calculate the elapsed times between the steps/event
> 4. add that elapsed time to the step/event
> 5. output the modified events to the sink
>
>
>
> Regards Hans
>



Re: [DISCUSS] Drop Gelly

2022-01-03 Thread David Anderson
Most of the inquiries I've had about Gelly in recent memory have been from
folks looking for a streaming solution, and it's only been a handful.

+1 for dropping Gelly

David

On Mon, Jan 3, 2022 at 2:41 PM Till Rohrmann  wrote:

> I haven't seen any changes or requests to/for Gelly in ages. Hence, I
> would assume that it is not really used and can be removed.
>
> +1 for dropping Gelly.
>
> Cheers,
> Till
>
> On Mon, Jan 3, 2022 at 2:20 PM Martijn Visser 
> wrote:
>
>> Hi everyone,
>>
>> Flink is bundled with Gelly, a Graph API library [1]. This has been
>> marked as approaching end-of-life for quite some time [2].
>>
>> Gelly is built on top of Flink's DataSet API, which is deprecated and
>> slowly being phased out [3]. It only works on batch jobs. Based on the
>> activity in the Dev and User mailing lists, I don't see a lot of questions
>> popping up regarding the usage of Gelly. Removing Gelly would reduce CI
>> time and resources because we won't need to run tests for this anymore.
>>
>> I'm cross-posting this to the User mailing list to see if there are any
>> users of Gelly at the moment.
>>
>> Let me know your thoughts.
>>
>> Martijn Visser | Product Manager
>>
>> mart...@ververica.com
>>
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-stable/docs/libs/gelly/overview/
>>
>> [2] https://flink.apache.org/roadmap.html
>>
>> [3] https://lists.apache.org/thread/b2y3xx3thbcbtzdphoct5wvzwogs9sqz
>>
>> 
>>
>>
>> Follow us @VervericaData
>>
>> --
>>
>> Join Flink Forward  - The Apache Flink
>> Conference
>>
>> Stream Processing | Event Driven | Real Time
>>
>>


Re: Order of events in Broadcast State

2021-12-06 Thread David Anderson
Event ordering in Flink is only maintained between pairs of events that
take exactly the same path through the execution graph. So if you
have multiple instances of A (let's call them A1 and A2), each broadcasting
a partition of the total rule space, then one instance of B (B1) might
receive rule1 from A1 before rule2 from A2, while B2 might receive rule2
before rule1.

If it fits your needs, one simple way to avoid having problems with this is
to broadcast from a task with a parallelism of 1. Then every downstream
instance will receive the broadcast stream in the same order.
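
Here is a rough sketch of that idea (the Rule and event types, the descriptor, and
the process function are placeholders): funnel all rules through a single task so
there is one well-defined order, and broadcast from there, so every "B" subtask sees
rule 1 before rule 2.

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;

MapStateDescriptor<String, Rule> ruleStateDescriptor =
        new MapStateDescriptor<>("rules", Types.STRING, Types.POJO(Rule.class));

// A single pass-through task establishes one stable order for the rules.
DataStream<Rule> orderedRules = ruleStream
        .map(rule -> rule)          // identity pass-through
        .returns(Rule.class)        // help type inference for the lambda
        .setParallelism(1);

BroadcastStream<Rule> ruleBroadcast = orderedRules.broadcast(ruleStateDescriptor);

events.keyBy(e -> e.getKey())
      .connect(ruleBroadcast)
      .process(new MyKeyedBroadcastProcessFunction());
```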

David

On Sat, Dec 4, 2021 at 2:45 AM Alexey Trenikhun  wrote:

> [1] -
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/broadcast_state/
> The Broadcast State Pattern | Apache Flink
> 
> The Broadcast State Pattern # In this section you will learn about how to
> use broadcast state in practise. Please refer to Stateful Stream Processing
> to learn about the concepts behind stateful stream processing. Provided
> APIs # To show the provided APIs, we will start with an example before
> presenting their full functionality. As our running example, we will use
> the case where we have a ...
> nightlies.apache.org
>
>
> --
> *From:* Alexey Trenikhun 
> *Sent:* Friday, December 3, 2021 4:33 PM
> *To:* Flink User Mail List 
> *Subject:* Order of events in Broadcast State
>
> Hello,
> Trying to understand what statement "Order of events in Broadcast State
> may differ across tasks" in [1] means.
> Let's say I have keyed function "A" which broadcasting stream of rules,
> KeyedBroadcastProcessFunction  "B" receives rules and updates broadcast
> state, like example in [1]. Let's say "A" broadcasts "rule 1" with name X,
> then "A" (same key) broadcasts "rule 2" with same name X, is there
> guarantee that eventually broadcast state will contain "rule 2" or since
> there is no ordering, B could receive "rule 2", then "rule 1" and broadcast
> state will end up with {X="rule 1"} forever ?
>
> Thanks,
> Alexey
>


Re: behavior change with idle partitions and the new KafkaSource?

2021-11-22 Thread David Anderson
Thanks, Arvid.

Can you clarify how the KafkaSource currently behaves in the situation
where it starts with fewer partitions than subtasks? Is that the case
described in FLIP-180 as case 1: "Static assignment + too few splits"? The
implementation described there (emit MAX_WATERMARK) should yield equivalent
behavior to the old source -- but that doesn't seem to be what folks are
experiencing. Or is this case 3: "Dynamic assignment"?

As for where to explain all this, I think somewhere near the paragraph [1]
explaining how to migrate from FlinkKafkaConsumer would be appropriate.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer

On Mon, Nov 22, 2021 at 11:24 AM Arvid Heise  wrote:

> Hi David,
>
> yes that's intentionally [1] as it could lead to correctness issues and it
> was inconsistently used across sources. Yes it should be documented.
>
> For now I'd put it in the KafkaSource docs because I'm not sure in which
> release notes it would fit best. In which release notes would you expect
> such a disclaimer?
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-180%3A+Adjust+StreamStatus+and+Idleness+definition
>
> On Mon, Nov 22, 2021 at 10:37 AM David Anderson 
> wrote:
>
>> I've seen a few questions recently from folks migrating from
>> FlinkKafkaConsumer to KafkaSource that make me suspect that something has
>> changed.
>>
>> In FlinkKafkaConsumerBase we have this code which sets a source subtask
>> to idle if all of its partitions are empty when the subtask starts:
>>
>> // mark the subtask as temporarily idle if there are no initial
>> seed partitions;
>> // once this subtask discovers some partitions and starts
>> collecting records, the subtask's
>> // status will automatically be triggered back to be active.
>> if (subscribedPartitionsToStartOffsets.isEmpty()) {
>> sourceContext.markAsTemporarilyIdle();
>> }
>>
>> Unsurprisingly, people have code that depends on this behavior, and after
>> switching to KafkaSource, their tests or applications are failing to
>> produce results (because the idle partitions are now holding back the
>> watermarks). This leads me to believe that KafkaSource does not work the
>> same way.
>>
>> Can someone confirm that the behavior here has changed?
>>
>> Was this intentional? Yes, one can use withIdleness to achieve something
>> similar, but if this is now required, it needs to be documented in the
>> release notes, etc.
>>
>> David
>>
>


behavior change with idle partitions and the new KafkaSource?

2021-11-22 Thread David Anderson
I've seen a few questions recently from folks migrating from
FlinkKafkaConsumer to KafkaSource that make me suspect that something has
changed.

In FlinkKafkaConsumerBase we have this code which sets a source subtask to
idle if all of its partitions are empty when the subtask starts:

// mark the subtask as temporarily idle if there are no initial
seed partitions;
// once this subtask discovers some partitions and starts
collecting records, the subtask's
// status will automatically be triggered back to be active.
if (subscribedPartitionsToStartOffsets.isEmpty()) {
sourceContext.markAsTemporarilyIdle();
}

Unsurprisingly, people have code that depends on this behavior, and after
switching to KafkaSource, their tests or applications are failing to
produce results (because the idle partitions are now holding back the
watermarks). This leads me to believe that KafkaSource does not work the
same way.

Can someone confirm that the behavior here has changed?

Was this intentional? Yes, one can use withIdleness to achieve something
similar, but if this is now required, it needs to be documented in the
release notes, etc.
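
For reference, the workaround looks roughly like this, assuming an env and a
kafkaSource are already defined; MyEvent and both durations are illustrative:

```java
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;

WatermarkStrategy<MyEvent> strategy =
        WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withIdleness(Duration.ofMinutes(1)); // subtasks with no data go idle after 1 minute

DataStream<MyEvent> stream = env.fromSource(kafkaSource, strategy, "kafka-source");
```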

David


Re: s3.entropy working locally but not in production

2021-11-13 Thread David Anderson
>
> It seems to work for some jobs and not for others.  Maybe jobs with little
> or empty state don't have _entropy_ swapped out correctly?


This is done by design. As the documentation explains:

The Flink runtime currently passes the option to inject entropy only to
> checkpoint data files. All other files, including checkpoint metadata and
> external URI, do not inject entropy to keep checkpoint URIs predictable.


This means that smaller pieces of state (state smaller
than state.storage.fs.memory-threshold is written with the checkpoint
metadata) won't have entropy injected.
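
If you want more of the state to be written as separate, entropy-injected data
files, one thing you could experiment with is lowering that threshold in
flink-conf.yaml (the value below is only illustrative; smaller values mean more,
smaller checkpoint files):

state.storage.fs.memory-threshold: 1kb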

David

On Sat, Nov 13, 2021 at 2:33 AM Dan Hill  wrote:

> It seems to work for some jobs and not for others.  Maybe jobs with little
> or empty state don't have _entropy_ swapped out correctly?
>
> On Fri, Nov 12, 2021 at 5:31 PM Dan Hill  wrote:
>
>> Hi.  My config I was able to verify my configs work locally with using
>> minio.  When I have the same code deployed to prod, the entropy key is not
>> replaced.  Any ideas?  My logs are showing the correct entropy key.
>>
>>
>> Configs:
>> state.checkpoints.dir: s3a://my-flink-state/_entropy_/checkpoints
>> state.savepoints.dir: s3a://my-flink-state/savepoints
>> s3.entropy.key: _entropy_
>> s3.entropy.length: 4
>>
>>
>> INFO [] - Loading configuration property: state.checkpoints.dir,
>> s3a://my-flink-state/_entropy_/checkpoints
>> INFO [] - Loading configuration property: state.savepoints.dir,
>> s3a://my-flink-state/savepoints
>> INFO [] - Loading configuration property: s3.entropy.key, _entropy_
>> INFO [] - Loading configuration property: s3.entropy.length, 4
>>
>


Re: Custom partitioning of keys with keyBy

2021-11-03 Thread David Anderson
Another possibility, if you know in advance the values of the keys, is to
find a mapping that transforms the original keys into new keys that will,
in fact, end up in disjoint key groups that will, in turn, be assigned to
different slots (given a specific parallelism). This is ugly, but feasible.

For reference, the key group for a given key is

MathUtils.murmurHash(key.hashCode()) % maxParallelism

and a given key group will be assigned to the slot computed by

keyGroup * actualParallelism / maxParallelism
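
Putting those two formulas together, a small throwaway helper (illustrative only,
with example maxParallelism and parallelism settings) can be used to probe where a
candidate key would land:

```java
import org.apache.flink.util.MathUtils;

public class KeyGroupProbe {

    // Returns the operator subtask (slot) index a key would be routed to,
    // following the two formulas above.
    static int subtaskFor(Object key, int maxParallelism, int parallelism) {
        int keyGroup = MathUtils.murmurHash(key.hashCode()) % maxParallelism;
        return keyGroup * parallelism / maxParallelism;
    }

    // Probe remapped key values until each original key lands on a distinct subtask.
    // (maxParallelism = 128 and parallelism = 3 are just example settings.)
    public static void main(String[] args) {
        for (int candidate = 0; candidate < 20; candidate++) {
            System.out.println(candidate + " -> " + subtaskFor(candidate, 128, 3));
        }
    }
}
```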

David



On Wed, Nov 3, 2021 at 3:35 PM Schwalbe Matthias <
matthias.schwa...@viseca.ch> wrote:

> Hi Yuval,
>
>
>
> Just a couple of comments:
>
>
>
>- Assuming that all your 4 different keys are evenly distributed, and
>you send them to (only) 3 buckets, you would expect at least one bucket to
>cover 2 of your keys, hence the 50%
>- With low entropy keys avoiding data skew is quite difficult
>- But your situation could be worse, all 4 keys could end up in the
>same bucket, if the hash function in use happens to generate collisions for
>the 4 keys, in which case 2 of your 3 buckets would not process any events
>… this could also lead to watermarks not progressing …
>- There is two proposal on how to improve the situation:
>   - Use the same parallelism and max parallelism for the relevant
>   operators and implement a manual partitioner
>  - A manual partitioner is also good in situations where you want
>  to lower the bias and you exactly know the distribution of your key 
> space
>  and rearrange keys to even-out numbers
>   - More sophisticated (if possible), divide-and-conquer like:
>  - Key by your ‘small’ key plus soma arbitrary attribute with
>  higher entropy
>  - Window aggregate first on that artificial key
>  - Aggregate the results on your original ‘small’ key
>  - This could be interesting for high-throughput situation where
>  you actually want to run in parallelism higher than the number of 
> different
>  ‘small’ keys
>
>
>
> Hope this helps
>
>
>
> Thias
>
>
>
>
>
> *From:* Yuval Itzchakov 
> *Sent:* Mittwoch, 3. November 2021 14:41
> *To:* user 
> *Subject:* Custom partitioning of keys with keyBy
>
>
>
> Hi,
>
> I have a use-case where I'd like to partition a KeyedDataStream a bit
> differently than how Flinks default partitioning works with key groups.
>
>
>
> What I'd like to be able to do is take all my data and split it up evenly
> between 3 buckets which will store the data in the state. Using the key
> above works, but splits the data unevenly between the different key groups,
> as usually the key space is very small (0 - 3). What ends up happening is
> that sometimes 50% of the keys end up on the same operator index, where
> ideally I'd like to distribute it evenly between all operator indexes in
> the cluster.
>
>
>
> Is there any way of doing this?
>
> --
>
> Best Regards,
> Yuval Itzchakov.
> Diese Nachricht ist ausschliesslich für den Adressaten bestimmt und
> beinhaltet unter Umständen vertrauliche Mitteilungen. Da die
> Vertraulichkeit von e-Mail-Nachrichten nicht gewährleistet werden kann,
> übernehmen wir keine Haftung für die Gewährung der Vertraulichkeit und
> Unversehrtheit dieser Mitteilung. Bei irrtümlicher Zustellung bitten wir
> Sie um Benachrichtigung per e-Mail und um Löschung dieser Nachricht sowie
> eventueller Anhänge. Jegliche unberechtigte Verwendung oder Verbreitung
> dieser Informationen ist streng verboten.
>
> This message is intended only for the named recipient and may contain
> confidential or privileged information. As the confidentiality of email
> communication cannot be guaranteed, we do not accept any responsibility for
> the confidentiality and the intactness of this message. If you have
> received it in error, please advise the sender by return e-mail and delete
> this message and any attachments. Any unauthorised use or dissemination of
> this information is strictly prohibited.
>


Re: Flink application question

2021-08-09 Thread David Anderson
FYI, I've responded to this on stack overflow:
https://stackoverflow.com/questions/68715430/apache-flink-accessing-keyed-state-from-late-window


On Mon, Aug 9, 2021 at 3:16 AM suman shil  wrote:

> I am writing a Flink application which consumes time series data from
> kafka topic. Time series data has components like metric name, tag key
> value pair, timestamp and a value. I have created a tumbling window to
> aggregate data based on a metric key (which is a combination of metric
> name, key value pair and timestamp). Here is the main stream looks like
>
> kafka source -> Flat Map which parses and emits Metric ->  Key by metric
> key  -> Tumbling window of 60 seconds -> Aggregate the data -> write to the
> sync.
>
> I also want to check if there is any metric which arrived late outside the
> above window. I want to check how many metrics arrived late and calculate
> the percentage of late metrics compared to original metrics. I am thinking
> of using the "allowedLateness" feature of flink to send the late metrics to
> a different stream. I am planning to add a "MapState" in the main
> "Aggregate the data" operator which will have the key as the metric key and
> value as the count of the metrics that arrived in the main window.
>
>
> kafka source -> Flat Map which parses and emits Metric -> Key by metric
> key ->  Tumbling window of 60 seconds -> Aggregate the data (Maintain a map
> state of metric count) -> write to the sync.
>
>  \
>
>   \
>
> Late data -> Key by
> metric key ->  Collect late metrics and find the percentage of late metrics
> -> Write the result in sink
>
> My question is can "Collect late metrics and find the percentage of late
> metrics" operator access the "MapState" which got updated by the
> mainstream. Even though they are keyed by the same metric key, I guess they
> are two different tasks. I want to calculate (number of late metrics /
> (number of late metrics + number of metrics arrived on time)).
>
> Thanks
> Suman
>


Re: [ANNOUNCE] RocksDB Version Upgrade and Performance

2021-08-04 Thread David Anderson
I am hearing quite often from users who are struggling to manage memory
usage, and these are all users using RocksDB. While I don't know for
certain that RocksDB is the cause in every case, from my perspective,
getting the better memory stability of version 6.20 in place is critical.

Regards,
David

On Wed, Aug 4, 2021 at 8:08 AM Stephan Ewen  wrote:

> Hi all!
>
> *!!!  If you are a big user of the Embedded RocksDB State Backend and have
> performance sensitive workloads, please read this !!!*
>
> I want to quickly raise some awareness for a RocksDB version upgrade we
> plan to do, and some possible impact on application performance.
>
> *We plan to upgrade RocksDB to version 6.20.* That version of RocksDB
> unfortunately introduces some non-trivial performance regression. In our
> Nexmark Benchmark, at least one query is up to 13% slower.
> With some fixes, this can be improved, but even then there is an overall 
> *regression
> up to 6% in some queries*. (See attached table for results from relevant
> Nexmark Benchmark queries).
>
> We would do this update nonetheless, because we need to get new features
> and bugfixes from RocksDB in.
>
> Please respond to this mail thread if you have major concerns about this.
>
>
> *### Fallback Plan*
>
> Optionally, we could fall back to Plan B, which is to upgrade RocksDB only
> to version 5.18.4.
> Which has no performance regression (after applying a custom patch).
>
> While this spares us the performance degradation of RocksDB 6.20.x, this
> has multiple disadvantages:
>   - Does not include the better memory stability (strict cache control)
>   - Misses out on some new features which some users asked about
>   - Does not have the latest RocksDB bugfixes
>
> The latest point is especially bad in my opinion. While we can cherry-pick
> some bugfixes back (and have done this in the past), users typically run
> into an issue first and need to trace it back to RocksDB, then one of the
> committers can find the relevant patch from RocksDB master and backport it.
> That isn't the greatest user experience.
>
> Because of those disadvantages, we would prefer to do the upgrade to the
> newer RocksDB version despite the unfortunate performance regression.
>
> Best,
> Stephan
>
>
>


Re: StreamingFileSink only writes data to MINIO during savepoint

2021-05-31 Thread David Anderson
The StreamingFileSink requires that you have checkpointing enabled. I'm
guessing that you don't have checkpointing enabled, since that would
explain the behavior you are seeing.

The relevant section of the docs [1] explains:

Checkpointing needs to be enabled when using the StreamingFileSink. Part
> files can only be finalized on successful checkpoints. If checkpointing is
> disabled, part files will forever stay in the in-progress or the pending
> state, and cannot be safely read by downstream systems.
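
Enabling it is a one-liner (the interval below is just an example):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpoint every 60 seconds; in-progress/pending part files are finalized
// each time a checkpoint completes.
env.enableCheckpointing(60_000);
```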


Regards,
David

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/streamfile_sink/#streaming-file-sink

On Fri, May 28, 2021 at 5:26 PM Robert Cullen  wrote:

> On my kubernetes cluster when I set the StreamingFileSink to write to a
> local instance of S3 (MINIO - 500 GB) it only writes the data after I
> execute a savepoint
>
> The expected behavior is to write the data in real-time. I'm guessing the
> memory requirements have not been met or a configuration in MINIO is
> missing?  Any ideas?
>
> --
> Robert Cullen
> 240-475-4490
>


Re: Nested match_recognize query not supported in SQL ?

2021-05-13 Thread David Anderson
By the way, views that use MATCH_RECOGNIZE don't work in Flink 1.11. [1]

[1] https://issues.apache.org/jira/browse/FLINK-20077

On Thu, May 13, 2021 at 11:06 AM David Anderson 
wrote:

> I was able to get something like this working, but only by introducing a
> view:
>
> CREATE TEMPORARY VIEW mmm AS SELECT id FROM events MATCH_RECOGNIZE (...);
>
> SELECT * FROM event WHERE id IN (SELECT id FROM mmm);
>
> Regards,
> David
>
> On Tue, May 11, 2021 at 9:22 PM Tejas  wrote:
>
>> Hi,
>> I am using flink 1.11 and trying nested query where match_recognize is
>> inside, as shown below :
>> /select * from events where id = (SELECT * FROM events MATCH_RECOGNIZE
>> (PARTITION BY org_id ORDER BY proctime MEASURES A.id AS startId ONE ROW
>> PER
>> MATCH PATTERN (A C* B) DEFINE A AS A.tag = 'tag1', C AS C.tag <> 'tag2', B
>> AS B.tag = 'tag2'));/
>>
>> And I am getting an error as :
>> /org.apache.calcite.sql.validate.SqlValidatorException: Table 'A' not
>> found/
>>
>> Is this not supported ? If not what's the alternative ?
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>


Re: Nested match_recognize query not supported in SQL ?

2021-05-13 Thread David Anderson
I was able to get something like this working, but only by introducing a
view:

CREATE TEMPORARY VIEW mmm AS SELECT id FROM events MATCH_RECOGNIZE (...);

SELECT * FROM event WHERE id IN (SELECT id FROM mmm);

Regards,
David

On Tue, May 11, 2021 at 9:22 PM Tejas  wrote:

> Hi,
> I am using flink 1.11 and trying nested query where match_recognize is
> inside, as shown below :
> /select * from events where id = (SELECT * FROM events MATCH_RECOGNIZE
> (PARTITION BY org_id ORDER BY proctime MEASURES A.id AS startId ONE ROW PER
> MATCH PATTERN (A C* B) DEFINE A AS A.tag = 'tag1', C AS C.tag <> 'tag2', B
> AS B.tag = 'tag2'));/
>
> And I am getting an error as :
> /org.apache.calcite.sql.validate.SqlValidatorException: Table 'A' not
> found/
>
> Is this not supported ? If not what's the alternative ?
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Flink auto-scaling feature and documentation suggestions

2021-05-05 Thread David Anderson
Well, I was thinking you could have avoided overwhelming your internal
services by using something like Flink's async i/o operator, tuned to limit
the total number of concurrent requests. That way the pipeline could have
uniform parallelism without overwhelming those services, and then you'd
rely on backpressure to throttle the sources. I'm not saying that would be
better -- it's arguably worse to have constant backpressure.
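
To make that concrete, the knob I had in mind is the capacity argument of the async
I/O operator, roughly like this (the async function, event types, timeout, and
capacity are all illustrative):

```java
import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;

// MyAsyncEnrichment implements AsyncFunction<Event, EnrichedEvent> (placeholder).
DataStream<EnrichedEvent> enriched = AsyncDataStream.unorderedWait(
        events,
        new MyAsyncEnrichment(),
        30, TimeUnit.SECONDS,   // per-request timeout
        100);                   // at most 100 in-flight requests per parallel subtask
```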

But this point I don't understand:

> running all operators at such a high scale would result in wastage of
resources, even with operator chaining in place.

Don't you have the same number of slots, each with the same resources,
either way? Plus, you have to do more ser/de, and more networking?

On Wed, May 5, 2021 at 6:08 PM vishalovercome  wrote:

> Yes. While back-pressure would eventually ensure high throughput, hand
> tuning
> parallelism became necessary because the job with high source parallelism
> would immediately bring down our internal services - not giving enough time
> to flink to adjust the in-rate. Plus running all operators at such a high
> scale would result in wastage of resources, even with operator chaining in
> place.
>
> That's why I think more toggles are needed to make current auto-scaling
> truly shine.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Flink auto-scaling feature and documentation suggestions

2021-05-05 Thread David Anderson
Interesting. So if I understand correctly, basically you limited the
parallelism of the sources in order to avoid running the job with constant
backpressure, and then scaled up the windows to maximize throughput.

On Tue, May 4, 2021 at 11:23 PM vishalovercome  wrote:

> In one of my jobs, windowing is the costliest operation while upstream and
> downstream operators are not as resource intensive. There's another
> operator
> in this job that communicates with internal services. This has high
> parallelism as well but not as much as that of the windowing operation.
> Running all operators with the same parallelism as the windowing operation
> would choke some of our internal services we'll be consuming from our
> source
> at a rate much higher than what our internal services can handle. Thus our
> sources, sinks, validation, monitoring related operators have very low
> parallelism while one has high parallelism and another has even higher
> parallelism.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Flink auto-scaling feature and documentation suggestions

2021-05-04 Thread David Anderson
Could you describe a situation in which hand-tuning the parallelism of
individual operators produces significantly better throughput than the
default approach? I think it would help this discussion if we could have a
specific use case in mind where this is clearly better.

Regards,
David

On Tue, May 4, 2021 at 12:29 PM vishalovercome  wrote:

> Forgot to add one more question - 7. If maxParallelism needs to be set to
> control parallelism, then wouldn't that mean that we wouldn't ever be able
> to take a savepoint and rescale beyond the configured maxParallelism? This
> would mean that we can never achieve hand tuned resource efficient. I will
> need to set maxParallelism beyond the current parallelism and given current
> tendency to allocate same number of sub-tasks for each operator, I will
> inevitably end up with several under utilized operators.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Use State query to dump state into datalake

2021-05-03 Thread David Anderson
I think you'd be better off using the State Processor API [1] instead. The
State Processor API has cleaner semantics -- as you'll be seeing a
self-consistent snapshot of all the state -- and it's also much more
performant.
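
As a very rough sketch of what that could look like with the DataSet-based State
Processor API (the savepoint path, output path, uid, reader function, and row type
are all placeholders; the uid must match the uid() of the stateful operator in the
streaming job):

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;

ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
ExistingSavepoint savepoint =
        Savepoint.load(bEnv, "s3://bucket/savepoints/savepoint-xxxx", new MemoryStateBackend());

// MyStateRow and MyReaderFunction (a KeyedStateReaderFunction) are placeholders.
DataSet<MyStateRow> rows = savepoint.readKeyedState("my-uid", new MyReaderFunction());

// Write with whatever OutputFormat suits the data lake (Parquet, text, ...).
rows.writeAsText("s3://bucket/datalake/state-dump");
bEnv.execute("dump-state-to-datalake");
```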

Note also that the Queryable State API is "approaching end of life" [2].
The long-term objective is to replace this with something more useful.

Regards,
David

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
[2] https://flink.apache.org/roadmap.html

On Sun, May 2, 2021 at 9:07 PM Lian Jiang  wrote:

> Hi,
>
> I am interested in dumping Flink state from Rockdb to datalake using state
> query
> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/queryable_state/.
> My map state could have 200 million key-values pairs and the total size
> could be 150G bytes. My batch job scheduled using airflow will have one
> task which uses Flink state query to dump the Flink state to datalake in
> parquet format so other spark tasks can use it.
>
> Is there any scalability concern for using state query in this way?
> Appreciate any insight. Thanks!
>


Re: Question about snapshot file

2021-04-30 Thread David Anderson
>
> So, can't we extract all previous savepoint data  by using
> ExistingSavepoint?


You can extract all of the data from any specific savepoint. Or nearly all
data, anyway. There is at least one corner case that isn't covered --
ListCheckpointed state -- which has been deprecated and isn't supported by
the savepoint API.

David

On Fri, Apr 30, 2021 at 5:42 PM Abdullah bin Omar <
abdullahbinoma...@gmail.com> wrote:

> Hi,
>
> So, can't we extract all previous savepoint data  by using
> ExistingSavepoint?
>
>
> Thank you
>
>
>
>
>
>
> On Fri, Apr 30, 2021 at 10:25 AM David Anderson 
> wrote:
>
>> Abdullah,
>>
>> The example you are studying -- the one using the state processor API --
>> can be used with any retained checkpoint or savepoint created while running
>> the RidesAndFaresSolution job. But this is a very special use of
>> checkpoints and savepoints that shows how to extract data from them.
>>
>> Normally the state processor API is used with savepoints, and not with
>> checkpoints. This example uses checkpoints so that the example can be
>> easily run from the IDE, without requiring a local flink installation.
>>
>> The normal use for checkpoints is for failure recovery, while savepoints
>> are typically used for redeployments and rescaling -- and in these cases
>> the state processor API is not involved. You would use "flink run -s ..."
>> on the command line to manually resume from a checkpoint or savepoint, and
>> in the case of a job failure, the restart will happen automatically.
>>
>> The flink operations playground [1] is a great way to gain more
>> understanding of these aspects of flink.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/try-flink/flink-operations-playground.html
>>
>> Best regards,
>> David
>>
>> On Fri, Apr 30, 2021 at 1:56 PM Abdullah bin Omar <
>> abdullahbinoma...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Please tell me whether my understanding below is correct or not, and
>>> please answer the directly asked questions.
>>>
>>> *Question no 1 (about dependency):*
>>>
>>> *What is dependency (in pom.xml) for the org.apache.flink.training?*
>>>
>>> I am trying to *import* 
>>> org.apache.flink.training.exercises.common.sources.TaxiFareGenerator;
>>> However, it can not resolve.
>>>
>>> [note that, I am using the group id: org.apache.flink
>>>
>>>
>>> *Question No 2 (which one is being loaded to an existing savepoint):*
>>>
>>>
>>> According to my understanding after reading [1], the name
>>> "ExistingSavepoint" looks like that it will restore all previous savepoint.
>>> However, according to [2], the input file is only a checkpointed file.
>>>
>>>
>>> *(i)* *does that mean that we can only load the last checkpointed file
>>> (in case of job failure) by using the ExistingSavepoint to restart the job
>>> where it fails?*
>>>
>>>
>>> *(ii)* *and there is no option to load all previous savepoint. is this
>>> correct?*
>>>
>>>
>>>
>>> *Question No 3 (about loading an existing savepoint):*
>>>
>>> ExecutionEnvironment bEnv = ExecutionEnvironment.
>>> *getExecutionEnvironment*();
>>>
>>> ExistingSavepoint sp = Savepoint.*load*(bEnv, "hdfs://path/", new
>>> MemoryStateBackend());
>>>
>>>
>>>
>>> This is the code for loading an existing savepoint. However, I configure
>>> a file location in flink conf to save the savepoint. So then, each time the
>>> job is running. I use a command in the terminal, ./bin/flink savepoint jobid
>>>
>>> and the savepointed file saved in the file location (that is set up in
>>> flink conf).
>>>
>>>
>>> In this case, to load the savepoint, the file location will be the location
>>> that is set up in the flink conf, and FileSystemBackend will have to be used
>>> instead of MemoryStateBackend. *is this correct?*
>>>
>>>
>>>
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>>>
>>> [2]
>>> https://github.com/ververica/flink-training/blob/master/state-processor/src/main/java/com/ververica/flink/training/exercises/ReadRidesAndFaresSnapshot.java
>>>
>>>
>>>
>>>
>>> Thank you
>>>
>>>
>>

Re: "myuid" in snapshot.readingstate

2021-04-30 Thread David Anderson
>
> What is dependency (in pom.xml) for the org.apache.flink.training?


We don't publish artifacts for this repository.

David

On Fri, Apr 30, 2021 at 5:19 PM Abu Bakar Siddiqur Rahman Rocky <
bakar121...@gmail.com> wrote:

> Hi David,
>
> A quick question more
>
> I am trying to *import* 
> org.apache.flink.training.exercises.common.sources.TaxiFareGenerator;
> However, it can not resolve.
>
> What is dependency (in pom.xml) for the org.apache.flink.training?
>
>
> Thank you
>
> On Fri, Apr 30, 2021 at 10:12 AM David Anderson 
> wrote:
>
>> You can read about assigning unique IDs to stateful operators in the docs
>> [1][2]. What the uid() method does is to establish a stable and unique
>> identifier for a stateful operator. Then as you evolve your application,
>> this helps ensure that future versions of your job will be able to restore
>> savepoints taken by earlier versions and find the state they need, despite
>> changes to the topology.
>>
>> The two uids specified in those two independent jobs (the streaming
>> RidesAndFaresSolution job and the batch ReadRidesAndFaresSnapshot job) must
>> match -- in other words, the strings must be the same. But there's nothing
>> in that example that ensures this will be the case.
>>
>> Regards,
>> David
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/production_ready.html#set-uuids-for-all-operators
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/state/savepoints.html#assigning-operator-ids
>>
>> On Fri, Apr 30, 2021 at 4:52 PM Abdullah bin Omar <
>> abdullahbinoma...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> when we read the state of a savepoint, we use "myuid" as an argument of the
>>> function. For example,
>>>
>>> DataSet keyedState = savepoint.readKeyedState("my-uid", new 
>>> ReaderFunction());
>>>
>>>
>>> *Question 1:*
>>>
>>> In [1] (line no 79), we get the "uid" with datastream. Then in [2] (line
>>> no 45), *how can we use the "uid" that we have got from in [1]?*
>>> Because in [2], there is no declaration of using the "uid" from [1].
>>>
>>> *Question 2:*
>>>
>>> *what does it mean by "uid" in the datastream of [1]?* is it something
>>> a unique user identification for each stream or for each state of the
>>> datastream?
>>>
>>>
>>> [1]
>>> https://github.com/ververica/flink-training/blob/master/rides-and-fares/src/solution/java/org/apache/flink/training/solutions/ridesandfares/RidesAndFaresSolution.java
>>> [2]
>>> https://github.com/ververica/flink-training/blob/master/state-processor/src/main/java/com/ververica/flink/training/exercises/ReadRidesAndFaresSnapshot.java
>>>
>>> Thank you
>>>
>>>
>>>
>
> --
> Regards,
> Abu Bakar Siddiqur Rahman
>
>


Re: Question about snapshot file

2021-04-30 Thread David Anderson
Abdullah,

The example you are studying -- the one using the state processor API --
can be used with any retained checkpoint or savepoint created while running
the RidesAndFaresSolution job. But this is a very special use of
checkpoints and savepoints that shows how to extract data from them.

Normally the state processor API is used with savepoints, and not with
checkpoints. This example uses checkpoints so that the example can be
easily run from the IDE, without requiring a local flink installation.

The normal use for checkpoints is for failure recovery, while savepoints
are typically used for redeployments and rescaling -- and in these cases
the state processor API is not involved. You would use "flink run -s ..."
on the command line to manually resume from a checkpoint or savepoint, and
in the case of a job failure, the restart will happen automatically.

The flink operations playground [1] is a great way to gain more
understanding of these aspects of flink.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/try-flink/flink-operations-playground.html

Best regards,
David

On Fri, Apr 30, 2021 at 1:56 PM Abdullah bin Omar <
abdullahbinoma...@gmail.com> wrote:

> Hi,
>
> Please tell me whether my understanding below is correct or not, and please
> answer the directly asked questions.
>
> *Question no 1 (about dependency):*
>
> *What is dependency (in pom.xml) for the org.apache.flink.training?*
>
> I am trying to *import* 
> org.apache.flink.training.exercises.common.sources.TaxiFareGenerator;
> However, it can not resolve.
>
> [note that, I am using the group id: org.apache.flink
>
>
> *Question No 2 (which one is being loaded to an existing savepoint):*
>
>
> According to my understanding after reading [1], the name
> "ExistingSavepoint" looks like that it will restore all previous savepoint.
> However, according to [2], the input file is only a checkpointed file.
>
>
> *(i)* *does that mean that we can only load the last checkpointed file (in
> case of job failure) by using the ExistingSavepoint to restart the job
> where it fails?*
>
>
> *(ii)* *and there is no option to load all previous savepoint. is this
> correct?*
>
>
>
> *Question No 3 (about loading an existing savepoint):*
>
> ExecutionEnvironment bEnv = ExecutionEnvironment.*getExecutionEnvironment*
> ();
>
> ExistingSavepoint sp = Savepoint.*load*(bEnv, "hdfs://path/", new
> MemoryStateBackend());
>
>
>
> This is the code for loading an existing savepoint. However, I configure a
> file location in flink conf to save the savepoint. So then, each time the
> job is running. I use a command in the terminal, ./bin/flink savepoint jobid
>
> and the savepointed file saved in the file location (that is set up in
> flink conf).
>
>
> In this case, to load the savepoint, the file location will be the location
> that is set up in the flink conf, and FileSystemBackend will have to be used
> instead of MemoryStateBackend. *is this correct?*
>
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>
> [2]
> https://github.com/ververica/flink-training/blob/master/state-processor/src/main/java/com/ververica/flink/training/exercises/ReadRidesAndFaresSnapshot.java
>
>
>
>
> Thank you
>
>
>
>
>
>
> On Fri, Apr 23, 2021 at 10:10 AM David Anderson 
> wrote:
>
>> Abdullah,
>>
>> ReadRidesAndFaresSnapshot [1] is an example that shows how to use the
>> State Processor API to display the contents of a snapshot taken while
>> running RidesAndFaresSolution [2].
>>
>> Hopefully that will help you get started.
>>
>> [1]
>> https://github.com/ververica/flink-training/blob/master/state-processor/src/main/java/com/ververica/flink/training/exercises/ReadRidesAndFaresSnapshot.java
>> [2]
>> https://github.com/ververica/flink-training/blob/master/rides-and-fares/src/solution/java/org/apache/flink/training/solutions/ridesandfares/RidesAndFaresSolution.java
>>
>> Best regards,
>> David
>>
>> On Fri, Apr 23, 2021 at 3:32 PM Abdullah bin Omar <
>> abdullahbinoma...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Thank you for your reply.
>>>
>>> I want to read the previous snapshot (if needed) at the time of
>>> operation. In [1], there is a portion:
>>>
>>> DataSet listState  = savepoint.readListState<>(
>>> "my-uid",
>>> "list-state",
>>> Types.INT);
>>>
>>>
>>> here, will the function savepoint.readliststate<> () work to read the
>>> previous snapshot?  If it is, then is th

Re: "myuid" in snapshot.readingstate

2021-04-30 Thread David Anderson
You can read about assigning unique IDs to stateful operators in the docs
[1][2]. What the uid() method does is to establish a stable and unique
identifier for a stateful operator. Then as you evolve your application,
this helps ensure that future versions of your job will be able to restore
savepoints taken by earlier versions and find the state they need, despite
changes to the topology.

The two uids specified in those two independent jobs (the streaming
RidesAndFaresSolution job and the batch ReadRidesAndFaresSnapshot job) must
match -- in other words, the strings must be the same. But there's nothing
in that example that ensures this will be the case.

Regards,
David

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/production_ready.html#set-uuids-for-all-operators
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/state/savepoints.html#assigning-operator-ids

On Fri, Apr 30, 2021 at 4:52 PM Abdullah bin Omar <
abdullahbinoma...@gmail.com> wrote:

> Hi,
>
> when we read state from a savepoint, we use "my-uid" as an argument of the
> function. For example,
>
> DataSet<KeyedState> keyedState = savepoint.readKeyedState("my-uid", new
> ReaderFunction());
>
>
> *Question 1:*
>
> In [1] (line no 79), we set the "uid" on the datastream. Then in [2] (line
> no 45), *how can we use the "uid" that we set in [1]?* Because
> in [2], there is no declaration of the "uid" from [1].
>
> *Question 2:*
>
> *What does the "uid" on the datastream in [1] mean?* Is it a
> unique user identification for each stream, or for each state of the
> datastream?
>
>
> [1]
> https://github.com/ververica/flink-training/blob/master/rides-and-fares/src/solution/java/org/apache/flink/training/solutions/ridesandfares/RidesAndFaresSolution.java
> [2]
> https://github.com/ververica/flink-training/blob/master/state-processor/src/main/java/com/ververica/flink/training/exercises/ReadRidesAndFaresSnapshot.java
>
> Thank you
>
>
>


Re: Flink Metric isBackPressured not available

2021-04-26 Thread David Anderson
The isBackPressured metric is a Boolean -- it reports true or false, rather
than 1 or 0. The Flink web UI can not display it (it shows NaN); perhaps
the same is true for Datadog.

https://issues.apache.org/jira/browse/FLINK-15753 relates to this.

Regards,
David

On Tue, Apr 13, 2021 at 12:13 PM Claude M  wrote:

> Thanks for your reply.  I'm using Flink 1.12.  I'm checking in Datadog and
> the metric is not available there.
> It has other task/operator metrics such as numRecordsIn/numRecordsOut
> there but not the isBackPressured.
>
>
> On Mon, Apr 12, 2021 at 8:40 AM Roman Khachatryan 
> wrote:
>
>> Hi,
>>
>> The metric is registered upon task deployment and reported periodically.
>>
>> Which Flink version are you using? The metric was added in 1.10.
>> Are you checking it in the UI?
>>
>> Regards,
>> Roman
>>
>> On Fri, Apr 9, 2021 at 8:50 PM Claude M  wrote:
>> >
>> > Hello,
>> >
>> > The documentation here
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/metrics.html
>> states there is an isBackPressured metric available, yet I don't see it.  Any
>> ideas why?
>> >
>> >
>> > Thanks
>>
>


Re: Approaches for external state for Flink

2021-04-25 Thread David Anderson
>
> Now, I'm just worried about the state size. State size will grow forever.
> There is no TTL.


The potential for unbounded state is certainly a problem, and it's going to
be a problem no matter how you implement the deduplication. Standard
techniques for mitigating this include (1) limiting the timeframe for
deduplication, and/or (2) using bloom filters to reduce the storage needed
in exchange for some (bounded percentage of) false positives.  But since
you must store data from stream1 to use later for enrichment, I think bloom
filters are only potentially relevant for deduplicating stream2.
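
If part of the deduplication state does end up in Flink keyed state, a
minimal sketch of technique (1) -- bounding the timeframe -- could look like
the following (the stream is keyed by the uuid string; the 24-hour window is
a made-up number, not something from your requirements):

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Keyed by uuid: the first occurrence is emitted, duplicates are dropped, and a
// processing-time timer clears the per-uuid flag so the state stays bounded.
public class BoundedDeduplicator extends KeyedProcessFunction<String, String, String> {

    private static final long DEDUP_WINDOW_MS = 24 * 60 * 60 * 1000L; // hypothetical 24h window

    private transient ValueState<Boolean> seen;

    @Override
    public void open(Configuration parameters) {
        seen = getRuntimeContext().getState(new ValueStateDescriptor<>("seen", Types.BOOLEAN));
    }

    @Override
    public void processElement(String uuid, Context ctx, Collector<String> out) throws Exception {
        if (seen.value() == null) {
            seen.update(true);
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + DEDUP_WINDOW_MS);
            out.collect(uuid); // first time this uuid has been seen within the window
        }
        // duplicates inside the window are silently discarded
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        seen.clear(); // forget the uuid once the dedup window has passed
    }
}
```

It would be applied with stream.keyBy(uuid -> uuid).process(new
BoundedDeduplicator()). The same bounding idea carries over to an external
store like Cassandra, where a row TTL can limit how long each uuid is
remembered.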

Do you have any temporal constraints on how the enrichment of stream2 is
done? For example, if an event from stream2 arrives before the
corresponding event from stream1 has been processed, can you simply ignore
the event from stream2? Or should it be buffered, and enriched later? I ask
this because checkpointing can become challenging at scale when joining two
streams, if there's a requirement to buffer one of the streams so the other
can catch up.

Flink may or may not be the best choice for your application. The devil is
in the details.

Regards,
David

On Sun, Apr 25, 2021 at 12:25 PM Omngr 
wrote:

> Thank you David. That's perfect.
>
> Now, I'm just worried about the state size. State size will grow forever.
> There is no TTL.
>
> 24 Nis 2021 Cmt 17:42 tarihinde David Anderson 
> şunu yazdı:
>
>> What are the other techniques for bootstrapping rocksdb state?
>>
>>
>> Bootstrapping state involves somehow creating a snapshot (typically a
>> savepoint, but a retained checkpoint can be a better choice in some cases)
>> containing the necessary state -- meaning that the state has the same
>> operator uid and state descriptor used by the real streaming job.
>>
>> You can do this by either: (1) running a variant of the live streaming
>> job against the data used for bootstrapping and taking a snapshot when the
>> data has been fully ingested, or (2) by using the State Processor API [1].
>> You'll find a trivial example of the second approach in [2]. Once you have
>> a suitable snapshot, you can run your real job against it.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/libs/state_processor_api.html
>> [2] https://gist.github.com/alpinegizmo/ff3d2e748287853c88f21259830b29cf
>>
>> Regards,
>> David
>>
>> On Sat, Apr 24, 2021 at 3:01 PM Omngr 
>> wrote:
>>
>>> Hi David, thank you for your response first!
>>>
>>> The state size is about 1 TB for now, but it will increase quickly, and
>>> also I cannot use the TTL for states. It will grow indefinitely.
>>> What are the other techniques for bootstrapping rocksdb state?
>>>
>>> David Anderson , 24 Nis 2021 Cmt, 15:43 tarihinde
>>> şunu yazdı:
>>>
>>>> Oguzhan,
>>>>
>>>> Note, the state size is very large and I have to feed the state from
>>>>> batch flow firstly. Thus I can not use the internal state like rocksdb.
>>>>
>>>>
>>>> How large is "very large"? Using RocksDB, several users have reported
>>>> working with jobs using many TBs of state.
>>>>
>>>> And there are techniques for bootstrapping the state. That doesn't have
>>>> to be a showstopper.
>>>>
>>>> May be any bottleneck in that flow? I think to use asyncMap functions
>>>>> for state read/write operations.
>>>>
>>>>
>>>> That's a good reason to reconsider using Flink state.
>>>>
>>>> Regards,
>>>> David
>>>>
>>>>
>>>>
>>>> On Fri, Apr 23, 2021 at 12:22 PM Oğuzhan Mangır <
>>>> sosyalmedya.oguz...@gmail.com> wrote:
>>>>
>>>>> I'm trying to design a stream flow that checks *de-duplicate* events
>>>>> and sends them to the Kafka topic.
>>>>>
>>>>> Basically, flow looks like that;
>>>>>
>>>>> kafka (multiple topics) =>  flink (checking de-duplication and event
>>>>> enrichment) => kafka (single topic)
>>>>>
>>>>> For de-duplication, I'm thinking of using Cassandra as an external
>>>>> state store. The details of my job;
>>>>>
>>>>> I have an event payload with *uuid* Field. If the event that has the
>>>>> same uuid will come, this event should be discarded. In my case, two kafka
>>>>> topics are reading. The first topic has a lot of fields, but other topics
>>>>> jus

Re: Approaches for external state for Flink

2021-04-24 Thread David Anderson
>
> What are the other techniques for bootstrapping rocksdb state?


Bootstrapping state involves somehow creating a snapshot (typically a
savepoint, but a retained checkpoint can be a better choice in some cases)
containing the necessary state -- meaning that the state has the same
operator uid and state descriptor used by the real streaming job.

You can do this by either: (1) running a variant of the live streaming job
against the data used for bootstrapping and taking a snapshot when the data
has been fully ingested, or (2) by using the State Processor API [1].
You'll find a trivial example of the second approach in [2]. Once you have
a suitable snapshot, you can run your real job against it.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/libs/state_processor_api.html
[2] https://gist.github.com/alpinegizmo/ff3d2e748287853c88f21259830b29cf
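
As an illustration of approach (2), here is a rough sketch of writing such a
savepoint with the State Processor API in Flink 1.12. The uid
("dedup-state"), the state descriptor, and the paths are assumptions made up
for the example; they would have to match whatever your streaming job
actually uses:

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;

public class BootstrapDedupState {

    // Writes a "seen" flag into keyed state for every uuid in the bootstrap data set.
    public static class SeenBootstrapper extends KeyedStateBootstrapFunction<String, String> {

        private transient ValueState<Boolean> seen;

        @Override
        public void open(Configuration parameters) {
            seen = getRuntimeContext().getState(new ValueStateDescriptor<>("seen", Types.BOOLEAN));
        }

        @Override
        public void processElement(String uuid, Context ctx) throws Exception {
            seen.update(true);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();

        // In practice this would come from your batch source (files, JDBC, ...).
        DataSet<String> historicalUuids = bEnv.fromElements("uuid-1", "uuid-2", "uuid-3");

        BootstrapTransformation<String> transformation = OperatorTransformation
                .bootstrapWith(historicalUuids)
                .keyBy(uuid -> uuid)
                .transform(new SeenBootstrapper());

        Savepoint
                .create(new MemoryStateBackend(), 128)       // max parallelism of the streaming job
                .withOperator("dedup-state", transformation) // must match the streaming operator's uid
                .write("file:///tmp/bootstrap-savepoint");   // hypothetical output path

        bEnv.execute("bootstrap dedup state");
    }
}
```

Once the savepoint has been written, the streaming job can be started from it
with flink run -s <savepoint path>.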

Regards,
David

On Sat, Apr 24, 2021 at 3:01 PM Omngr  wrote:

> Hi David, thank you for your response first!
>
> The state size is about 1 TB for now, but it will increase quickly, and
> also I cannot use the TTL for states. It will grow indefinitely.
> What are the other techniques for bootstrapping rocksdb state?
>
> David Anderson , 24 Nis 2021 Cmt, 15:43 tarihinde
> şunu yazdı:
>
>> Oguzhan,
>>
>> Note, the state size is very large and I have to feed the state from
>>> batch flow firstly. Thus I can not use the internal state like rocksdb.
>>
>>
>> How large is "very large"? Using RocksDB, several users have reported
>> working with jobs using many TBs of state.
>>
>> And there are techniques for bootstrapping the state. That doesn't have
>> to be a showstopper.
>>
>> May be any bottleneck in that flow? I think to use asyncMap functions for
>>> state read/write operations.
>>
>>
>> That's a good reason to reconsider using Flink state.
>>
>> Regards,
>> David
>>
>>
>>
>> On Fri, Apr 23, 2021 at 12:22 PM Oğuzhan Mangır <
>> sosyalmedya.oguz...@gmail.com> wrote:
>>
>>> I'm trying to design a stream flow that checks *de-duplicate* events
>>> and sends them to the Kafka topic.
>>>
>>> Basically, flow looks like that;
>>>
>>> kafka (multiple topics) =>  flink (checking de-duplication and event
>>> enrichment) => kafka (single topic)
>>>
>>> For de-duplication, I'm thinking of using Cassandra as an external state
>>> store. The details of my job;
>>>
>>> I have an event payload with *uuid* Field. If the event that has the
>>> same uuid will come, this event should be discarded. In my case, two kafka
>>> topics are reading. The first topic has a lot of fields, but other topics
>>> just have a *uuid* field, thus I have to enrich data using the same
>>> uuid for the events coming from the second topic.
>>>
>>> Stream1: Messages reading from the first topic. Read state from
>>> Cassandra using the *uuid*. If a state exists, ignore this event and *do
>>> not* emit to the Kafka. If state does not exist, save  this event to
>>> the Cassandra, then emit this event to the Kafka.
>>>
>>> Stream2: Messages reading from the second topic. Read state from
>>> Cassandra using the *uuid*. If state exists, check a column that
>>> represents this event came from topic2. If the value of this column is
>>> false, enrich the event using state and update the Cassandra column as
>>> true. If true, ignore this event because this event is a duplicate.
>>>
>>> def checkDeDuplication(event): Option[Event] = {
>>>   val state = readFromCassandra(state)
>>>   if (state exist) None //ignore this event
>>>   else {
>>> saveEventToCassandra(event)
>>> Some(event)
>>>   }
>>> }
>>>
>>> def checkDeDuplicationAndEnrichEvent(event): Option[Event] = {
>>>   val state = readFromCassandra(state)
>>>   if (state does not exist) None //ignore this event
>>>   else {
>>> if (state.flag == true) None // ignore this event
>>> else {
>>>updateFlagAsTrueInCassandra(event)
>>>Some(event)
>>> }
>>>   }
>>> }
>>>
>>>
>>> val stream1 = readKafkaTopic1().flatMap(checkDeDuplicationAndSaveToSatate())
>>> val stream2 = readKafkaTopic2().flatMap(checkDeDuplicationAndEnrichEvent())
>>> stream1.union(stream2).addSink(kafkaSink)
>>>
>>> 1- Is that a good approach?
>>>
>>> 2- Is Cassandra the right choice here? Note, the state size is very
>>> large and I have to feed the state from batch flow firstly. Thus I can not
>>> use the internal state like rocksdb.
>>>
>>> 3- Can i improve this logic?
>>>
>>> 4- May be any bottleneck in that flow? I think to use asyncMap functions
>>> for state read/write operations.
>>>
>>


Re: Approaches for external state for Flink

2021-04-24 Thread David Anderson
Oguzhan,

> Note, the state size is very large and I have to feed the state from batch
> flow firstly. Thus I can not use the internal state like rocksdb.


How large is "very large"? Using RocksDB, several users have reported
working with jobs using many TBs of state.

And there are techniques for bootstrapping the state. That doesn't have to
be a showstopper.

> May be any bottleneck in that flow? I think to use asyncMap functions for
> state read/write operations.


That's a good reason to reconsider using Flink state.

Regards,
David



On Fri, Apr 23, 2021 at 12:22 PM Oğuzhan Mangır <
sosyalmedya.oguz...@gmail.com> wrote:

> I'm trying to design a stream flow that checks *de-duplicate* events and
> sends them to the Kafka topic.
>
> Basically, flow looks like that;
>
> kafka (multiple topics) =>  flink (checking de-duplication and event
> enrichment) => kafka (single topic)
>
> For de-duplication, I'm thinking of using Cassandra as an external state
> store. The details of my job;
>
> I have an event payload with *uuid* Field. If the event that has the same
> uuid will come, this event should be discarded. In my case, two kafka
> topics are reading. The first topic has a lot of fields, but other topics
> just have a *uuid* field, thus I have to enrich data using the same uuid
> for the events coming from the second topic.
>
> Stream1: Messages reading from the first topic. Read state from Cassandra
> using the *uuid*. If a state exists, ignore this event and *do not* emit
> to the Kafka. If state does not exist, save  this event to the Cassandra,
> then emit this event to the Kafka.
>
> Stream2: Messages reading from the second topic. Read state from Cassandra
> using the *uuid*. If state exists, check a column that represents this
> event came from topic2. If the value of this column is false, enrich the
> event using state and update the Cassandra column as true. If true, ignore
> this event because this event is a duplicate.
>
> def checkDeDuplication(event): Option[Event] = {
>   val state = readFromCassandra(state)
>   if (state exist) None //ignore this event
>   else {
> saveEventToCassandra(event)
> Some(event)
>   }
> }
>
> def checkDeDuplicationAndEnrichEvent(event): Option[Event] = {
>   val state = readFromCassandra(state)
>   if (state does not exist) None //ignore this event
>   else {
> if (state.flag == true) None // ignore this event
> else {
>updateFlagAsTrueInCassandra(event)
>Some(event)
> }
>   }
> }
>
>
> val stream1 = readKafkaTopic1().flatMap(checkDeDuplicationAndSaveToSatate())
> val stream2 = readKafkaTopic2().flatMap(checkDeDuplicationAndEnrichEvent())
> stream1.union(stream2).addSink(kafkaSink)
>
> 1- Is that a good approach?
>
> 2- Is Cassandra the right choice here? Note, the state size is very large
> and I have to feed the state from batch flow firstly. Thus I can not use
> the internal state like rocksdb.
>
> 3- Can i improve this logic?
>
> 4- May be any bottleneck in that flow? I think to use asyncMap functions
> for state read/write operations.
>


Re: Question about snapshot file

2021-04-23 Thread David Anderson
Abdullah,

ReadRidesAndFaresSnapshot [1] is an example that shows how to use the State
Processor API to display the contents of a snapshot taken while running
RidesAndFaresSolution [2].

Hopefully that will help you get started.

[1]
https://github.com/ververica/flink-training/blob/master/state-processor/src/main/java/com/ververica/flink/training/exercises/ReadRidesAndFaresSnapshot.java
[2]
https://github.com/ververica/flink-training/blob/master/rides-and-fares/src/solution/java/org/apache/flink/training/solutions/ridesandfares/RidesAndFaresSolution.java
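
In case it helps to see the overall shape, here is a condensed sketch of what
[1] does: load the snapshot, then read keyed state with a
KeyedStateReaderFunction. The uid, state name, and path below are
placeholders; they must match what the streaming job actually used.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class ReadSnapshotSketch {

    // Emits one record per key found in the snapshot.
    public static class ReaderFunction extends KeyedStateReaderFunction<Long, String> {

        private transient ValueState<String> state;

        @Override
        public void open(Configuration parameters) {
            state = getRuntimeContext().getState(new ValueStateDescriptor<>("my-state", Types.STRING));
        }

        @Override
        public void readKey(Long key, Context ctx, Collector<String> out) throws Exception {
            out.collect(key + " -> " + state.value());
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();

        // Point this at the directory containing the snapshot's _metadata file.
        ExistingSavepoint savepoint =
                Savepoint.load(bEnv, "file:///tmp/savepoints/savepoint-xyz", new MemoryStateBackend());

        DataSet<String> contents = savepoint.readKeyedState("my-uid", new ReaderFunction());
        contents.print();
    }
}
```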

Best regards,
David

On Fri, Apr 23, 2021 at 3:32 PM Abdullah bin Omar <
abdullahbinoma...@gmail.com> wrote:

> Hi,
>
> Thank you for your reply.
>
> I want to read the previous snapshot (if needed) at the time of operation.
> In [1], there is a portion:
>
> DataSet<Integer> listState = savepoint.readListState(
> "my-uid",
> "list-state",
> Types.INT);
>
>
> here, will the function savepoint.readListState() work to read the
> previous snapshot?  If it is, then is the filename of a savepoint file
> similar to my-uid?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>
> Thank you
>
>
>
>
> On Fri, Apr 23, 2021 at 1:11 AM Matthias Pohl 
> wrote:
>
>> What is it you're trying to achieve in general? The JavaDoc of
>> MetadataV2V3SerializerBase provides a description on the format of the
>> file. Theoretically, you could come up with custom code using the Flink
>> sources to parse the content of the file. But maybe, there's another way to
>> accomplish what you're trying to do.
>>
>> Matthias
>>
>> [1]
>> https://github.com/apache/flink/blob/adaaed426c2e637b8e5ffa3f0d051326038d30aa/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java#L83
>>
>> On Thu, Apr 22, 2021 at 7:53 PM Abdullah bin Omar <
>> abdullahbinoma...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have a savepoint or checkpointed file from my task. However, the file
>>> is binary. I want to see what the file contains.
>>>
>>> How is it possible to see what information the file has (or how it is
>>> possible to make it human readable?)
>>>
>>> Thank you
>>>
>>> On Thu, Apr 22, 2021 at 10:19 AM Matthias Pohl 
>>> wrote:
>>>
 Hi Abdullah,
 the metadata file contains handles to the operator states of the
 checkpoint [1]. You might want to have a look into the State Processor API
 [2].

 Best,
 Matthias

 [1]
 https://github.com/apache/flink/blob/adaaed426c2e637b8e5ffa3f0d051326038d30aa/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java#L83
 [2]
 https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html

 On Thu, Apr 22, 2021 at 4:57 PM Abdullah bin Omar <
 abdullahbinoma...@gmail.com> wrote:

> Hi,
>
> (1) What does the snapshot metadata file (binary) contain? Is it
> possible to read the snapshot metadata file by using Flink 
> Deserialization?
>
> (2) Is there any function that can be used to see the previous
> states at the time of operation?
>
> Thank you
>

>>


Re: WindowFunction is stuck until next message is processed although Watermark with idle timeout is applied.

2021-04-15 Thread David Anderson
The withIdleness option does not attempt to handle situations where all of
the sources are idle.

Flink operators with multiple input channels keep track of the current
watermark from each channel, and use the minimum of these watermarks as
their own watermark. withIdleness marks idle channels as inactive, which
then indicates to the runtime that those channels should not be taken into
consideration, so that their stalled watermark won't hold back the overall
watermark. This isn't enough to keep the watermarks advancing when
everything is idle.

Advancing the watermark when no events are flowing at all is something you
can do in your application, by implementing your own idleness detection and
advancing the watermark artificially based on the progress of system time.
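
A sketch of that pattern (the Event type and the timeouts are assumptions,
and event timestamps are taken to be epoch milliseconds comparable to the
system clock): a periodic WatermarkGenerator that emits ordinary
bounded-out-of-orderness watermarks while events are flowing, and falls back
to the system clock once nothing has arrived for a while.

```java
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

public class IdleAwareWatermarks implements WatermarkGenerator<IdleAwareWatermarks.Event> {

    // Hypothetical event type, standing in for whatever your stream carries.
    public static class Event {
        public long timestamp; // epoch millis
    }

    private static final long MAX_OUT_OF_ORDERNESS_MS = 5_000L;
    private static final long IDLE_TIMEOUT_MS = 30_000L;

    private long maxTimestamp = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS_MS + 1;
    private long lastEventSeenAt = System.currentTimeMillis();

    @Override
    public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        lastEventSeenAt = System.currentTimeMillis();
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        long now = System.currentTimeMillis();
        if (now - lastEventSeenAt > IDLE_TIMEOUT_MS) {
            // Nothing has arrived for a while: advance the watermark from the
            // system clock so pending windows can still fire.
            output.emitWatermark(new Watermark(now - MAX_OUT_OF_ORDERNESS_MS));
        } else {
            output.emitWatermark(new Watermark(maxTimestamp - MAX_OUT_OF_ORDERNESS_MS - 1));
        }
    }
}
```

It would be plugged in via WatermarkStrategy.forGenerator(ctx -> new
IdleAwareWatermarks()) together with a timestamp assigner, in place of the
strategy used in the linked example.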

Regards,
David

On Thu, Apr 15, 2021 at 3:04 AM Sung Gon Yi  wrote:

> Hello,
>
> I have a question about watermark with idle timeout.
>
> I made an example about it,
> https://github.com/skonmeme/rare_stream/blob/main/src/main/scala/com/skonuniverse/flink/RareStreamWithIdealTimeout.scala
>
> There is a WindowFunction with a 5 sec tumbling window, and messages are
> imported every 120 sec.
> The idle timeout is set to 30 sec.
>
> However, when running, the first message was only processed after 120 sec,
> that is, once the next message had been imported.
>
> Please, tell me what I misunderstand about idle timeout and how to solve
> this problem.
>
> Thanks,
> Sung Gon
>


Re: Do data streams partitioned by same keyBy but from different operators converge to the same sub-task?

2021-03-29 Thread David Anderson
Yes, since the two streams have the same type, you can union the two
streams, key the resulting stream, and then apply something like a
RichFlatMapFunction. Or you can connect the two streams (again, they'll
need to be keyed so you can use state), and apply a RichCoFlatMapFunction.
You can use whichever of these approaches is simpler for your use case.
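
A minimal, self-contained sketch of the union approach, using
Tuple2<String, Integer> records as a stand-in for your event type:

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class UnionThenKeyBy {

    // All records with the same key land in the same subtask and share this keyed
    // state, regardless of which of the two original streams they came from.
    public static class Sum extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {

        private transient ValueState<Integer> total;

        @Override
        public void open(Configuration parameters) {
            total = getRuntimeContext().getState(new ValueStateDescriptor<>("total", Types.INT));
        }

        @Override
        public void flatMap(Tuple2<String, Integer> in, Collector<Tuple2<String, Integer>> out) throws Exception {
            int updated = (total.value() == null ? 0 : total.value()) + in.f1;
            total.update(updated);
            out.collect(Tuple2.of(in.f0, updated));
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> oneStream =
                env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2));
        DataStream<Tuple2<String, Integer>> anotherStream =
                env.fromElements(Tuple2.of("a", 10), Tuple2.of("b", 20));

        oneStream
                .union(anotherStream)
                .keyBy(t -> t.f0)
                .flatMap(new Sum())
                .print();

        env.execute("union then keyBy");
    }
}
```

Connecting the two streams with a RichCoFlatMapFunction is only necessary
when the element types differ or the two inputs need to be handled
differently.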

On Mon, Mar 29, 2021 at 7:56 AM vishalovercome  wrote:

> I've gone through the example as well as the documentation and I still
> couldn't understand whether my use case requires joining.
>
> 1. What would happen if I didn't join?
> 2. As the 2 incoming data streams have the same type, if joining is
> absolutely necessary then just a union (oneStream.union(anotherStream))
> followed by a keyBy should be good enough, right? I am asking this because
> I would prefer to use the simple RichMapFunction or RichFlatMapFunction as
> opposed to the RichCoFlatMapFunction.
>
> Thanks a lot!
> --
> Sent from the Apache Flink User Mailing List archive at Nabble.com.
>


Re: How to visualize the results of Flink processing or aggregation?

2021-03-26 Thread David Anderson
Prometheus is a metrics system; you can use Flink's Prometheus metrics
reporter to send metrics to Prometheus.
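
For example, here is a minimal sketch (a hypothetical operator that counts
suspected account ids; it assumes the Prometheus reporter has been enabled in
flink-conf.yaml) of registering a user-defined counter -- the reporter takes
care of exposing it to Prometheus, and Grafana can then chart it:

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.util.Collector;

// Counts suspected accounts as they flow past. The counter appears as a normal
// task/operator metric, which the configured metrics reporter publishes.
public class SuspectedAccountCounter extends RichFlatMapFunction<String, String> {

    private transient Counter suspectedAccounts;

    @Override
    public void open(Configuration parameters) {
        suspectedAccounts = getRuntimeContext().getMetricGroup().counter("suspectedAccounts");
    }

    @Override
    public void flatMap(String accountId, Collector<String> out) {
        suspectedAccounts.inc(); // bump the metric
        out.collect(accountId);  // pass the record through unchanged
    }
}
```

Counters and gauges work well for counts and rates; the full aggregation
results themselves are usually better written to one of the sinks mentioned
below and visualized from that store.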

Grafana can also be connected to InfluxDB, and to databases like MySQL and
PostgreSQL, for which sinks are available.

And the Elasticsearch sink can be used to create visualizations with Kibana.

I'm sure there are other solutions as well, but these are some of the
popular ones.

Regards,
David

On Fri, Mar 26, 2021 at 5:15 PM Xiong Qiang 
wrote:

> Hi All,
>
> I am new to Flink, so forgive me if it is a naive question.
>
> The context is:
> We have a data streaming coming in, and we will use Flink applications to
> do the processing or aggregations. After the processing or aggregation, we
> need some approaches to visualize the results, to either build a dashboard
> or setup alerts, for example, using Prometheus and Grafana.
> However, after reading the documents (
> https://flink.apache.org/flink-architecture.html and more links) and
> examples (
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/try-flink/datastream_api.html)
> (
> https://github.com/ververica/flink-training/blob/master/long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java)
> , *I am still not able to close the gap between Flink and a
> monitoring/dashboard tool, e.g. Prometheus/Grafana. *
>
> *The question is:*
> *How are processing results connected/sinked from Flink to
> Prometheus/Grafana? *for example, in the fraud detection example, how is
> the account id = 3, send to Prometheus and Grafana, so that I have a
> dashboard showing there is one suspected account? In the taxi long rides
> example, how do I send the count of long rides from Flink to
> Prometheus/Grafana?
>
> I understand there are sinks (
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/).
> However, I didn't see sinks for Prometheus.
>
> Hope I made my question clear.
>
> Thanks
>

