Re: Flink sink never executes

2021-01-04 Thread Kostas Kloudas
Hi Ben,

Sorry for the late reply, but I guess that your question was answered
on StackOverflow, right?
Did that answer solve your problem?

Cheers,
Kostas

On Mon, Dec 21, 2020 at 9:09 AM Ben Beasley  wrote:
>
> First off I want to thank the folks in this email list for their help thus 
> far.
>
>
>
> I’m facing another strange issue: if I add a window to my stream, the 
> sink no longer executes. However, the sink executes without the windowing. I 
> described my problem on StackOverflow so that the code is easy to read.
>
>
>
> I wonder if anyone can help me once more; I believe the solution could be 
> simple for someone familiar with the code. I believe I’ve followed the 
> tutorials and articles on the Flink website correctly.


Re: No execution.target specified in your configuration file

2020-12-21 Thread Kostas Kloudas
Glad I could help!

On Mon, Dec 21, 2020 at 3:42 AM Ben Beasley  wrote:
>
> That worked. Thank you, Kostas.
>
>
>
> From: Kostas Kloudas 
> Date: Sunday, December 20, 2020 at 7:21 AM
> To: Ben Beasley 
> Cc: user@flink.apache.org 
> Subject: Re: No execution.target specified in your configuration file
>
> Hi Ben,
>
> You can try using StreamExecutionEnvironment
> streamExecutionEnvironment =
> StreamExecutionEnvironment.getExecutionEnvironment();
> instead of directly creating a new one. This will allow it to pick up the
> configuration parameters you pass through the command line.
>
> I hope this helps,
> Kostas
>
> On Sun, Dec 20, 2020 at 7:46 AM Ben Beasley  wrote:
> >
> > I was wondering if I could get help with the issue described in this 
> > stackoverflow post.


Re: No execution.target specified in your configuration file

2020-12-20 Thread Kostas Kloudas
Hi Ben,

You can try using StreamExecutionEnvironment
streamExecutionEnvironment =
StreamExecutionEnvironment.getExecutionEnvironment();
instead of directly creating a new one. This will allow it to pick up the
configuration parameters you pass through the command line.
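A minimal sketch of the suggested change (the class name and the tiny pipeline are placeholders; only the way the environment is obtained matters):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MyJob {
    public static void main(String[] args) throws Exception {
        // getExecutionEnvironment() picks up the configuration (e.g. execution.target)
        // that the CLI passes in, whereas instantiating an environment directly ignores it.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(1, 2, 3).print();

        env.execute("my-job");
    }
}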

I hope this helps,
Kostas

On Sun, Dec 20, 2020 at 7:46 AM Ben Beasley  wrote:
>
> I was wondering if I could get help with the issue described in this 
> stackoverflow post.


Re: Dynamic ad hoc query deployment strategy

2020-11-23 Thread Kostas Kloudas
Hi Lalala,

Even in session mode, the jobgraph is created before the job is
executed. So all the above hold.
Although I am not super familiar with the catalogs, what you want is
that two or more jobs share the same readers of a source. This is not
done automatically in DataStream or DataSet and I am pretty sure that
also Table and SQL do not perform any cross-query optimization.

In addition, even if they did, are you sure that this would be enough
for your queries? The users will submit their queries at any point in
time and this would mean that each query would start processing from
where the reader is at that point in time, which is arbitrary. Is this
something that satisfies your requirements?

I will also include Dawid in the discussion to see if he has anything
to add about the Table API and SQL.

Cheers,
Kostas

On Fri, Nov 20, 2020 at 7:47 PM lalala  wrote:
>
> Hi Kostas,
>
> Thank you for your response.
>
> Is what you are saying valid for session mode? If I submit my jobs to an
> existing Flink session, will they be able to share the sources?
>
> We do register our Kafka tables to `GenericInMemoryCatalog`, and the
> documentation says `The GenericInMemoryCatalog is an in-memory
> implementation of a catalog. All objects will be available only for the
> lifetime of the session.`. I presume, in session mode, we can share Kafka
> source for multiple SQL jobs?
>
> That is not what we wanted for the best isolation, but if it is not possible
> with Flink, we are also good with session mode.
>
> Best regards,
>
>
>
>
>
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: [DISCUSS] Removing deprecated methods from DataStream API

2020-11-20 Thread Kostas Kloudas
Hi Hector,

The main reasons for deprecating readFileStream() were that:
1) it was only capable of parsing Strings and in a rather limited way
as one could not even specify the encoding
2) it was not fault-tolerant, so your concerns about exactly-once were
not covered

One concern I have about keeping the last read index for
every file that we have seen so far is that this would
simply blow up the memory.

Two things I would like to also mention are that:
1) the method has been deprecated a long time ago.
2) there is a new FileSource coming with 1.12 that may be interesting
for you [1].
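For reference, a rough sketch of what reading a directory with the new FileSource could look like, assuming the 1.12 APIs of the files connector; the path, discovery interval and job name are placeholders:

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ContinuousFileRead {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Discover new files under the given directory every 10 seconds and read them line by line.
        FileSource<String> source =
                FileSource.forRecordStreamFormat(new TextLineFormat(), new Path("/path/to/dir"))
                        .monitorContinuously(Duration.ofSeconds(10))
                        .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source")
                .print();

        env.execute("continuous-file-read");
    }
}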

Cheers,
Kostas

 [1] 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSource.java

On Tue, Nov 17, 2020 at 4:30 AM Hector He  wrote:
>
> May I ask about deprecating readFileStream(...): is there an
> alternative to this method? The source code leads me to use readFile instead, but
> it does not behave like readFileStream. readFileStream can read file content
> incrementally, but readFile with the FileProcessingMode.PROCESS_CONTINUOUSLY
> argument reads the whole file content every time the content changes. So why
> did Flink deprecate readFileStream without providing a better
> alternative?
>
> From the description of official document below link,
> FileProcessingMode.PROCESS_CONTINUOUSLY will break the “exactly-once”
> semantics.
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/datastream_api.html#data-sources
>
>
>
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Dynamic ad hoc query deployment strategy

2020-11-20 Thread Kostas Kloudas
I am also cc'ing Timo to see if he has anything more to add on this.

Cheers,
Kostas

On Thu, Nov 19, 2020 at 9:41 PM Kostas Kloudas  wrote:
>
> Hi,
>
> Thanks for reaching out!
>
> First of all, I would like to point out that an interesting
> alternative to the per-job cluster could be running your jobs in
> application mode [1].
>
> Given that you want to run arbitrary SQL queries, I do not think you
> can "share" across queries the part of the job graph that reads a
> topic. In general, Flink (not only in SQL) creates the graph of a job
> before the job is executed. And especially in SQL you do not even have
> control over the graph, as the translation logic from query to
> physical operators is opaque and not exposed to the user.
>
> That said, you may want to have a look at [2]. It is pretty old but it
> describes a potentially similar usecase. Unfortunately, it does not
> support SQL.
>
> Cheers,
> Kostas
>
> [1] https://flink.apache.org/news/2020/07/14/application-mode.html
> [2] https://www.ververica.com/blog/rbea-scalable-real-time-analytics-at-king
>
> On Sun, Nov 15, 2020 at 10:11 AM lalala  wrote:
> >
> > Hi all,
> >
> > I would like to consult with you regarding deployment strategies.
> >
> > We have 250+ Kafka topics against which we want users of the platform to submit SQL
> > queries that will run indefinitely. We have a query parser to extract topic
> > names from user queries, and the application locally creates Kafka tables
> > and executes the query. The result can be collected to multiple sinks such as
> > databases, files, and cloud services.
> >
> > We want to have the best isolation between queries, so in case of failures,
> > the other jobs will not get affected. We have a huge YARN cluster to handle
> > 1PB a day scale from Kafka. I believe cluster per job type deployment makes
> > sense for the sake of isolation. However, that creates some scalability
> > problems. There might be SQL queries running on the same Kafka topic, and we
> > do not want to read it again for each query in different sessions. The
> > ideal case is that we read the topic once and execute multiple queries on
> > this data to avoid rereading the same topic. That breaks the desire for a
> > fully isolated system, but it improves network and Kafka performance and
> > still provides isolation on the topic level as we just read the topic once
> > and execute multiple SQL queries on it.
> >
> > We are quite new to Flink, but we have experience with Spark. In Spark, we
> > can submit an application whose master can listen to a query queue and
> > submit jobs to the cluster dynamically from different threads. However, in
> > Flink, it looks like the main() has to produce the job graph in advance.
> >
> > We do use an EMR cluster; what would you recommend for my use case?
> >
> > Thank you.
> >
> >
> >
> > --
> > Sent from: 
> > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Dynamic ad hoc query deployment strategy

2020-11-19 Thread Kostas Kloudas
Hi,

Thanks for reaching out!

First of all, I would like to point out that an interesting
alternative to the per-job cluster could be running your jobs in
application mode [1].

Given that you want to run arbitrary SQL queries, I do not think you
can "share" across queries the part of the job graph that reads a
topic. In general, Flink (not only in SQL) creates the graph of a job
before the job is executed. And especially in SQL you do not even have
control over the graph, as the translation logic from query to
physical operators is opaque and not exposed to the user.

That said, you may want to have a look at [2]. It is pretty old but it
describes a potentially similar usecase. Unfortunately, it does not
support SQL.

Cheers,
Kostas

[1] https://flink.apache.org/news/2020/07/14/application-mode.html
[2] https://www.ververica.com/blog/rbea-scalable-real-time-analytics-at-king

On Sun, Nov 15, 2020 at 10:11 AM lalala  wrote:
>
> Hi all,
>
> I would like to consult with you regarding deployment strategies.
>
> We have 250+ Kafka topics against which we want users of the platform to submit SQL
> queries that will run indefinitely. We have a query parser to extract topic
> names from user queries, and the application locally creates Kafka tables
> and executes the query. The result can be collected to multiple sinks such as
> databases, files, and cloud services.
>
> We want to have the best isolation between queries, so in case of failures,
> the other jobs will not get affected. We have a huge YARN cluster to handle
> 1PB a day scale from Kafka. I believe cluster per job type deployment makes
> sense for the sake of isolation. However, that creates some scalability
> problems. There might be SQL queries running on the same Kafka topic, and we
> do not want to read it again for each query in different sessions. The
> ideal case is that we read the topic once and execute multiple queries on
> this data to avoid rereading the same topic. That breaks the desire for a
> fully isolated system, but it improves network and Kafka performance and
> still provides isolation on the topic level as we just read the topic once
> and execute multiple SQL queries on it.
>
> We are quite new to Flink, but we have experience with Spark. In Spark, we
> can submit an application whose master can listen to a query queue and
> submit jobs to the cluster dynamically from different threads. However, in
> Flink, it looks like the main() has to produce the job graph in advance.
>
> We do use an EMR cluster; what would you recommend for my use case?
>
> Thank you.
>
>
>
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Help needed to increase throughput of simple flink app

2020-11-09 Thread Kostas Kloudas
Hi Ashwin,

Do you have any filtering or aggregation (or any operation that emits
less data than it receives) in your logic? If yes, you could for
example put it before the rescaling operation so that it gets chained
to your source and you reduce the amount of data you ship through the
network. After that, it boils down to optimizing your code I
guess, as Till said. Also, you can check whether the rescaling has any
effect, because if not, then you could also remove it.
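A rough sketch of that reordering; the numeric source, the filter predicate and the discarding sink are placeholders for the real pipeline:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;

public class FilterBeforeRescale {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder source; in the real job this is the ingesting source.
        DataStream<Long> input = env.fromSequence(0, 1_000_000);

        input.filter(v -> v % 10 == 0)        // drops data while still chained to the source
                .rescale()                    // only the filtered records cross the network
                .addSink(new DiscardingSink<>());

        env.execute("filter-before-rescale");
    }
}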

Kostas

On Mon, Nov 9, 2020 at 10:12 AM ashwinkonale  wrote:
>
> Hi Till,
> Thanks a lot for the reply. The problem I am facing is that as soon as I add
> a network shuffle (removing chaining) before the discarding sink, I have a huge
> problem with throughput. Do you have any pointers on how I can go about debugging this?
>
> - Ashwin
>
>
>
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Understanding kafka watermark strategy assigner

2020-11-09 Thread Kostas Kloudas
Hi Nikola,

Apart from the potential overhead you mentioned of having one more
operator, I cannot find any other. And even this one, I think, is
negligible.
The reason why we recommend attaching the Watermark Generator to the
source is more about semantics rather than efficiency. It seems
natural for a pipeline whose logic depends on event time to have its
Watermarks generated at the source.

Cheers,
Kostas

On Sun, Nov 8, 2020 at 8:14 PM Nikola Hrusov  wrote:
>
> Hi,
>
> I am reading about the watermark creation for Kafka streams using the 
> article here: 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
>
> In there, there is an example where the watermark assigner is directly 
> attached to the consumer, like so (solution 1):
>
>> FlinkKafkaConsumer myConsumer = new FlinkKafkaConsumer<>("topic", 
>> new SimpleStringSchema(), properties);
>> myConsumer.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)));
>> env.addSource(myConsumer)
>
>
> Then we can use that by adding it as a source and continue with the 
> application.
>
> My question is, would that make any/much difference compared to doing it after 
> the source? Something like this (solution 2):
>
>>
>> FlinkKafkaConsumer myConsumer = new FlinkKafkaConsumer<>("topic", 
>> new SimpleStringSchema(), properties);
>> env
>>
>> .addSource(myConsumer)
>>
>> .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)));
>
>
> I can see that it would create an extra operator, but is there 
> any other [unnecessary] overhead that solution 2 would give over solution 1?
> I tried running a simple job, but I couldn't see much difference. I would 
> like to know if there is something I am unaware of and I can do better.
>
> Regards,
> Nikola Hrusov
>


Re: [Discuss] Add JobListener (hook) in flink job lifecycle

2020-11-06 Thread Kostas Kloudas
Hi Flavio,

Could this https://issues.apache.org/jira/browse/FLINK-20020 help?
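For context, a minimal sketch of a JobListener that uses the Throwable argument of onJobExecuted() to tell success from failure; this only approximates the final job status, and the ticket above tracks exposing it more directly:

import javax.annotation.Nullable;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LoggingJobListener implements JobListener {

    @Override
    public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable) {
        if (throwable != null) {
            System.err.println("Job submission failed: " + throwable.getMessage());
        }
    }

    @Override
    public void onJobExecuted(@Nullable JobExecutionResult result, @Nullable Throwable throwable) {
        // Exactly one of the two arguments is non-null; a non-null throwable means
        // the execution did not finish successfully.
        if (throwable != null) {
            System.err.println("Job failed: " + throwable.getMessage());
        } else {
            System.out.println("Job finished in " + result.getNetRuntime() + " ms");
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.registerJobListener(new LoggingJobListener());
        env.fromElements(1, 2, 3).print();
        env.execute("job-with-listener");
    }
}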

Cheers,
Kostas

On Thu, Nov 5, 2020 at 9:39 PM Flavio Pompermaier  wrote:
>
> Hi everybody,
> I was trying to use the JobListener in my job (onJobExecuted() on Flink 
> 1.11.0), but I can't understand whether the job succeeded or not.
> If I look at the Javadoc of the JobListener.onJobExecute() [1] says "Callback 
> on job execution finished, successfully or unsuccessfully"
> but I can't find any simple way to infer if the job has finished successfully 
> or not.
> Do I need to perform another remote call from the client to get the job 
> details using the job id?
> I'm quite surprised that the execution result (FINISHED / CANCELED / FAILED) 
> in not available in the JobExecutionResult.
> Another strange thing is that jobExecutionResult.getJobExecutionResult() 
> returns itself... is that correct?
>
> Thanks in advance,
> Flavio
>
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/execution/JobListener.html
>
> On Fri, Oct 9, 2020 at 1:09 PM Matthias  wrote:
>>
>> Reviving this thread again after I came across FLINK-12214 [1] since there
>> are use cases which might benefit from this feature. Was there some
>> conclusion on public APIs in the meantime? Should we proceed with the
>> discussion here?
>>
>> Best,
>> Matthias
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-12214
>>
>>
>>
>> --
>> Sent from: 
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>
>
> --
> Flavio Pompermaier
> Development Department
>
> OKKAM S.r.l.
> Tel. +(39) 0461 041809


Re: Missing help about run-application action in Flink CLI client

2020-11-04 Thread Kostas Kloudas
Could you also post the ticket here @Flavio Pompermaier  and we will
have a look before the upcoming release.

Thanks,
Kostas

On Wed, Nov 4, 2020 at 10:58 AM Chesnay Schepler  wrote:
>
> Good find, this is an oversight in the CliFrontendParser; no help is
> printed for the run-application action.
> Can you file a ticket?
>
> On 11/4/2020 10:53 AM, Flavio Pompermaier wrote:
> > Hello everybody,
> > I was looking into the currently supported application modes when
> > submitting a Flink job, so I tried to use the CLI help (I'm using Flink
> > 1.11.0) but I can't find any help about the action "run-application" at
> > the moment... am I wrong? Is there any JIRA to address this missing
> > documentation?
> >
> > Best,
> > Flavio
>
>


Re: [DISCUSS] Remove flink-connector-filesystem module.

2020-10-28 Thread Kostas Kloudas
Thanks for the discussion!

From this thread I do not see any objection to moving forward with
removing the sink.
Given this, I will open a voting thread tomorrow.

Cheers,
Kostas

On Wed, Oct 28, 2020 at 6:50 PM Stephan Ewen  wrote:
>
> +1 to remove the Bucketing Sink.
>
> It has been very common in the past to remove code that was deprecated for 
> multiple releases in favor of reducing baggage.
> Also in cases that had no perfect drop-in replacement, but needed users to 
> forward fit the code.
> I am not sure I understand why this case is so different.
>
> Why the Bucketing Sink should be thrown out, in my opinion:
>
> The Bucketing sink makes it easier for users to add general Hadoop writes.
> But the price is that it easily leads to data loss, because it assumes 
> flush()/sync() work reliably on Hadoop, which they don't (HDFS 
> works somewhat, S3 not at all).
> I think the Bucketing sink is a trap for users; that's why it was deprecated 
> long ago.
>
> The StreamingFileSink covers the majority of cases from the Bucketing Sink.
> It does have some friction when adding/wrapping some general Hadoop writers. 
> Parts will be solved with the transactional sink work.
> If something is missing and blocking users, we can prioritize adding it to 
> the Streaming File Sink. Also that is something we did before and it helped 
> being pragmatic with moving forward, rather than being held back by "maybe 
> there is something we don't know".
>
>
>
>
> On Wed, Oct 28, 2020 at 12:36 PM Chesnay Schepler  wrote:
>>
>> Then we can't remove it, because there is no way for us to ascertain
>> whether anyone is still using it.
>>
>> Sure, the user ML is the best we've got, but you can't argue that we don't
>> want any users to be affected and then use an imperfect means to find users.
>> If you are fine with relying on the user ML, then you _are_ fine with
>> removing it at the cost of friction for some users.
>>
>> To be clear, I, personally, don't have a problem with removing it (we
>> have removed other connectors in the past that did not have a migration
>> plan); I just reject the argumentation.
>>
>> On 10/28/2020 12:21 PM, Kostas Kloudas wrote:
>> > No, I do not think that "we are fine with removing it at the cost of
>> > friction for some users".
>> >
>> > I believe that this can be another discussion that we should have as
>> > soon as we establish that someone is actually using it. The point I am
>> > trying to make is that if no user is using it, we should remove it and
>> > not leave unmaintained code around.
>> >
>> > On Wed, Oct 28, 2020 at 12:11 PM Chesnay Schepler  
>> > wrote:
>> >> The alternative could also be to use a different argument than "no one
>> >> uses it", e.g., we are fine with removing it at the cost of friction for
>> >> some users because there are better alternatives.
>> >>
>> >> On 10/28/2020 10:46 AM, Kostas Kloudas wrote:
>> >>> I think that the mailing lists are the best we can do and I would say
>> >>> that they seem to be working pretty well (e.g. the recent Mesos
>> >>> discussion).
>> >>> Of course they are not perfect but the alternative would be to never
>> >>> remove anything user facing until the next major release, which I find
>> >>> pretty strict.
>> >>>
>> >>> On Wed, Oct 28, 2020 at 10:04 AM Chesnay Schepler  
>> >>> wrote:
>> >>>> If the conclusion is that we shouldn't remove it if _anyone_ is using
>> >>>> it, then we cannot remove it because the user ML obviously does not
>> >>>> reach all users.
>> >>>>
>> >>>> On 10/28/2020 9:28 AM, Kostas Kloudas wrote:
>> >>>>> Hi all,
>> >>>>>
>> >>>>> I am bringing this up again to see if there are any users actively
>> >>>>> using the BucketingSink.
>> >>>>> So far, if I am not mistaken (and really sorry if I forgot anything),
>> >>>>> it is only a discussion between devs about the potential problems of
>> >>>>> removing it. I totally understand Chesnay's concern about not
>> >>>>> providing compatibility with the StreamingFileSink (SFS) and if there
>> >>>>> are any users, then we should not remove it without trying to find a
>> >>>>> solution for them.
>> >>>>>
>> >>>>> But if there are no use

Re: RestClusterClient and classpath

2020-10-28 Thread Kostas Kloudas
Hi all,

I will have a look in the whole stack trace in a bit.

@Chesnay Schepler I think that we are setting the correct classloader
during jobgraph creation [1]. Is that what you mean?

Cheers,
Kostas

[1] 
https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java#L122

On Wed, Oct 28, 2020 at 11:02 AM Flavio Pompermaier
 wrote:
>
> Always the same problem.
>
> Caused by: java.lang.ClassNotFoundException: it.test.XXX
> at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
> at 
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
> at 
> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
> at 
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
> ... 10 more
>
> I've also tried with
>
> flinkConf.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first");
>
> but nothing changes.
>
> On Wed, Oct 28, 2020 at 10:34 AM Chesnay Schepler  wrote:
>>
>> Hmm... it appears as if PackagedProgramUtils#createJobGraph does some things 
>> outside the usercode classloader (getPipelineFromProgram()), specifically 
>> the call to the main method.
>>
>> @klou This seems like wrong behavior?
>>
>> @Flavio What you could try in the meantime is to wrap the call to 
>> createJobGraph like this:
>>
>> final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
>> try {
>>     Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
>>     // do stuff
>> } finally {
>>     Thread.currentThread().setContextClassLoader(contextClassLoader);
>> }
>>
>>
>> On 10/28/2020 10:12 AM, Flavio Pompermaier wrote:
>>
>> Any help here?  How can I understand why the classes inside the jar are not 
>> found when creating the PackagedProgram?
>>
>> On Tue, Oct 27, 2020 at 11:04 AM Flavio Pompermaier  
>> wrote:
>>>
>>> In the logs I see that the jar is on the classpath (I'm trying to debug the 
>>> program from the IDE)... isn't it?
>>>
>>> Classpath: [file:/tmp/job-bundle.jar]
>>> ...
>>>
>>> Best,
>>> Flavio
>>>
>>> On Tue, Oct 27, 2020 at 10:39 AM Chesnay Schepler  
>>> wrote:

 * your JobExecutor is _not_ putting it on the classpath.

 On 10/27/2020 10:36 AM, Chesnay Schepler wrote:

 Well it happens on the client before you even hit the RestClusterClient, 
 so I assume that either your jar is not packaged correctly or your 
 JobExecutor is putting it on the classpath.

 On 10/27/2020 9:42 AM, Flavio Pompermaier wrote:

 Sure. Here it is (org.apache.flink.client.cli.JobExecutor is my main class 
 I'm trying to use as a client towards the Flink cluster - session mode).
 it/test/MyOb is within the fat jar (/tmp/job-bundle.jar).

 The code of getBatchEnv is:

 @Deprecated
   public static BatchEnv getBatchEnv() {
 // TODO use the following when ready to convert from/to datastream
 // return 
 getTableEnv(EnvironmentSettings.newInstance().inBatchMode().build());
 ExecutionEnvironment env = 
 ExecutionEnvironment.getExecutionEnvironment();
 BatchTableEnvironment ret = BatchTableEnvironment.create(env);
 customizeEnv(ret);
 return new BatchEnv(env, ret);
   }

   private static void customizeEnv(TableEnvironment ret) {
 final Configuration conf = ret.getConfig().getConfiguration();
 // 
 conf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
  2);
 conf.setString(CoreOptions.TMP_DIRS, FLINK_TEST_TMP_DIR);
 conf.setString(BlobServerOptions.STORAGE_DIRECTORY, 
 FLINK_TEST_TMP_DIR);
 // conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4); 
 //NOSONAR
 // conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 
 0.4f);//NOSONAR
 // conf.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 32768 * 
 2);//NOSONAR
 // conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 32768 * 
 2);// NOSONAR
 conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 0);// 
 NOSONAR
 conf.setString(AkkaOptions.ASK_TIMEOUT, "10 min");// NOSONAR
 conf.setString(AkkaOptions.TCP_TIMEOUT, "10 min");// NOSONAR
 conf.setString(AkkaOptions.STARTUP_TIMEOUT, "10 min");// NOSONAR
 conf.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofMinutes(10));// 
 NOSONAR
 final List kryoSerializers = new ArrayList<>();
 kryoSerializers.add(getKryoSerializerConfigLine(DateTime.class, 
 JodaDateTimeSerializer.class));
 kryoSerializers.add(getKryoSerializerConfigLine(EntitonAtom.class, 
 

Re: [DISCUSS] Remove flink-connector-filesystem module.

2020-10-28 Thread Kostas Kloudas
I think that the mailing lists are the best we can do and I would say
that they seem to be working pretty well (e.g. the recent Mesos
discussion).
Of course they are not perfect but the alternative would be to never
remove anything user facing until the next major release, which I find
pretty strict.

On Wed, Oct 28, 2020 at 10:04 AM Chesnay Schepler  wrote:
>
> If the conclusion is that we shouldn't remove it if _anyone_ is using
> it, then we cannot remove it because the user ML obviously does not
> reach all users.
>
> On 10/28/2020 9:28 AM, Kostas Kloudas wrote:
> > Hi all,
> >
> > I am bringing this up again to see if there are any users actively
> > using the BucketingSink.
> > So far, if I am not mistaken (and really sorry if I forgot anything),
> > it is only a discussion between devs about the potential problems of
> > removing it. I totally understand Chesnay's concern about not
> > providing compatibility with the StreamingFileSink (SFS) and if there
> > are any users, then we should not remove it without trying to find a
> > solution for them.
> >
> > But if there are no users then I would still propose to remove the
> > module, given that I am not aware of any efforts to provide
> > compatibility with the SFS any time soon.
> > The reasons for removing it also include the facts that we do not
> > actively maintain it and we do not add new features. As for potential
> > missing features in the SFS compared to the BucketingSink that was
> > mentioned before, I am not aware of any fundamental limitations and
> > even if there are, I would assume that the solution is not to direct
> > the users to a deprecated sink but rather try to increase the
> > functionality of the actively maintained one.
> >
> > Please keep in mind that the BucketingSink is deprecated since FLINK
> > 1.9 and there is a new File Sink that is coming as part of FLIP-143
> > [1].
> > Again, if there are any active users who cannot migrate easily, then
> > we cannot remove it before trying to provide a smooth migration path.
> >
> > Thanks,
> > Kostas
> >
> > [1] 
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API
> >
> > On Fri, Oct 16, 2020 at 4:36 PM Chesnay Schepler  wrote:
> >> @Seth: Earlier in this discussion it was said that the BucketingSink
> >> would not be usable in 1.12 .
> >>
> >> On 10/16/2020 4:25 PM, Seth Wiesman wrote:
> >>> +1 It has been deprecated for some time and the StreamingFileSink has
> > > stabilized with a large number of formats and features.
> >>>
> >>> Plus, the bucketing sink only implements a small number of stable
> >>> interfaces[1]. I would expect users to continue to use the bucketing sink
> >>> from the 1.11 release with future versions for some time.
> >>>
> >>> Seth
> >>>
> >>> https://github.com/apache/flink/blob/2ff3b771cbb091e1f43686dd8e176cea6d435501/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L170-L172
> >>>
> >>> On Thu, Oct 15, 2020 at 2:57 PM Kostas Kloudas  wrote:
> >>>
> >>>> @Arvid Heise I also do not remember exactly what were all the
> >>>> problems. The fact that we added some more bulk formats to the
> >>>> streaming file sink definitely reduced the non-supported features. In
> >>>> addition, the latest discussion I found on the topic was [1] and the
> >>>> conclusion of that discussion seems to be to remove it.
> >>>>
> >>>> Currently, I cannot find any obvious reason why keeping the
> >>>> BucketingSink, apart from the fact that we do not have a migration
> >>>> plan unfortunately. This is why I posted this to dev@ and user@.
> >>>>
> >>>> Cheers,
> >>>> Kostas
> >>>>
> >>>> [1]
> >>>> https://lists.apache.org/thread.html/r799be74658bc7e169238cc8c1e479e961a9e85ccea19089290940ff0%40%3Cdev.flink.apache.org%3E
> >>>>
> >>>> On Wed, Oct 14, 2020 at 8:03 AM Arvid Heise  wrote:
> >>>>> I remember this conversation popping up a few times already and I'm in
> >>>>> general a big fan of removing BucketingSink.
> >>>>>
> >>>>> However, until now there were a few features lacking in 
> >>>>> StreamingFileSink
> >>>>> that are present in BucketingSink and that are being actively used (I
>

Re: [DISCUSS] Remove flink-connector-filesystem module.

2020-10-28 Thread Kostas Kloudas
Hi all,

I am bringing this up again to see if there are any users actively
using the BucketingSink.
So far, if I am not mistaken (and really sorry if I forgot anything),
it is only a discussion between devs about the potential problems of
removing it. I totally understand Chesnay's concern about not
providing compatibility with the StreamingFileSink (SFS) and if there
are any users, then we should not remove it without trying to find a
solution for them.

But if there are no users then I would still propose to remove the
module, given that I am not aware of any efforts to provide
compatibility with the SFS any time soon.
The reasons for removing it also include the facts that we do not
actively maintain it and we do not add new features. As for potential
missing features in the SFS compared to the BucketingSink that was
mentioned before, I am not aware of any fundamental limitations and
even if there are, I would assume that the solution is not to direct
the users to a deprecated sink but rather try to increase the
functionality of the actively maintained one.

Please keep in mind that the BucketingSink is deprecated since FLINK
1.9 and there is a new File Sink that is coming as part of FLIP-143
[1].
Again, if there are any active users who cannot migrate easily, then
we cannot remove it before trying to provide a smooth migration path.

Thanks,
Kostas

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API

On Fri, Oct 16, 2020 at 4:36 PM Chesnay Schepler  wrote:
>
> @Seth: Earlier in this discussion it was said that the BucketingSink
> would not be usable in 1.12 .
>
> On 10/16/2020 4:25 PM, Seth Wiesman wrote:
> > +1 It has been deprecated for some time and the StreamingFileSink has
> > stabilized with a large number of formats and features.
> >
> > Plus, the bucketing sink only implements a small number of stable
> > interfaces[1]. I would expect users to continue to use the bucketing sink
> > from the 1.11 release with future versions for some time.
> >
> > Seth
> >
> > https://github.com/apache/flink/blob/2ff3b771cbb091e1f43686dd8e176cea6d435501/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L170-L172
> >
> > On Thu, Oct 15, 2020 at 2:57 PM Kostas Kloudas  wrote:
> >
> >> @Arvid Heise I also do not remember exactly what were all the
> >> problems. The fact that we added some more bulk formats to the
> >> streaming file sink definitely reduced the non-supported features. In
> >> addition, the latest discussion I found on the topic was [1] and the
> >> conclusion of that discussion seems to be to remove it.
> >>
> >> Currently, I cannot find any obvious reason why keeping the
> >> BucketingSink, apart from the fact that we do not have a migration
> >> plan unfortunately. This is why I posted this to dev@ and user@.
> >>
> >> Cheers,
> >> Kostas
> >>
> >> [1]
> >> https://lists.apache.org/thread.html/r799be74658bc7e169238cc8c1e479e961a9e85ccea19089290940ff0%40%3Cdev.flink.apache.org%3E
> >>
> >> On Wed, Oct 14, 2020 at 8:03 AM Arvid Heise  wrote:
> >>> I remember this conversation popping up a few times already and I'm in
> >>> general a big fan of removing BucketingSink.
> >>>
> >>> However, until now there were a few features lacking in StreamingFileSink
> >>> that are present in BucketingSink and that are being actively used (I
> >> can't
> >>> exactly remember them now, but I can look it up if everyone else is also
> >>> suffering from bad memory). Did we manage to add them in the meantime? If
> >>> not, then it feels rushed to remove it at this point.
> >>>
> >>> On Tue, Oct 13, 2020 at 2:33 PM Kostas Kloudas 
> >> wrote:
> >>>> @Chesnay Schepler  Off the top of my head, I cannot find an easy way
> >>>> to migrate from the BucketingSink to the StreamingFileSink. It may be
> >>>> possible but it will require some effort because the logic would be
> >>>> "read the old state, commit it, and start fresh with the
> >>>> StreamingFileSink."
> >>>>
> >>>> On Tue, Oct 13, 2020 at 2:09 PM Aljoscha Krettek 
> >>>> wrote:
> >>>>> On 13.10.20 14:01, David Anderson wrote:
> >>>>>> I thought this was waiting on FLIP-46 -- Graceful Shutdown
> >> Handling --
> >>>> and
> >>>>>> in fact, the StreamingFileSink is mentioned in that FLIP as a
> >>>> motivating
> >>>>>> use 

Re: [SURVEY] Remove Mesos support

2020-10-23 Thread Kostas Kloudas
Thanks Piyush for the message.
After this, I revoke my +1. I agree with the previous opinions that we
cannot drop code that is actively used by users, especially if it is
something as deep in the stack as support for a cluster management
framework.

Cheers,
Kostas

On Fri, Oct 23, 2020 at 4:15 PM Piyush Narang  wrote:
>
> Hi folks,
>
>
>
> We at Criteo are active users of the Flink on Mesos resource management 
> component. We are pretty heavy users of Mesos for scheduling workloads on our 
> edge datacenters and we do want to continue to be able to run some of our 
> Flink topologies (to compute machine learning short term features) on those 
> DCs. If possible, our vote would be not to drop Mesos support, as that would tie 
> us to an old release or force us to maintain a fork, since we're not planning to 
> migrate off Mesos anytime soon. Is the burden something that can be helped 
> with by the community? (Or are you referring to having to ensure PRs handle 
> the Mesos piece as well when they touch the resource managers?)
>
>
>
> Thanks,
>
>
>
> -- Piyush
>
>
>
>
>
> From: Till Rohrmann 
> Date: Friday, October 23, 2020 at 8:19 AM
> To: Xintong Song 
> Cc: dev , user 
> Subject: Re: [SURVEY] Remove Mesos support
>
>
>
> Thanks for starting this survey Robert! I second Konstantin and Xintong in 
> the sense that our Mesos user's opinions should matter most here. If our 
> community is no longer using the Mesos integration, then I would be +1 for 
> removing it in order to decrease the maintenance burden.
>
>
>
> Cheers,
>
> Till
>
>
>
> On Fri, Oct 23, 2020 at 2:03 PM Xintong Song  wrote:
>
> +1 for adding a warning in 1.12 about planning to remove Mesos support.
>
>
>
> With my developer hat on, removing the Mesos support would definitely reduce 
> the maintaining overhead for the deployment and resource management related 
> components. On the other hand, the Flink on Mesos users' voices definitely 
> matter a lot for this community. Either way, it would be good to draw users 
> attention to this discussion early.
>
>
>
> Thank you~
>
> Xintong Song
>
>
>
>
>
> On Fri, Oct 23, 2020 at 7:53 PM Konstantin Knauf  wrote:
>
> Hi Robert,
>
> +1 to the plan you outlined. If we were to drop support in Flink 1.13+, we
> would still support it in Flink 1.12- with bug fixes for some time so that
> users have time to move on.
>
> It would certainly be very interesting to hear from current Flink on Mesos
> users, on how they see the evolution of this part of the ecosystem.
>
> Best,
>
> Konstantin


Re: [SURVEY] Remove Mesos support

2020-10-23 Thread Kostas Kloudas
+1 for adding a warning about the removal of Mesos support, and I would
also propose to state explicitly in the warning the version in which we
are planning to actually remove it (e.g. 1.13, or even 1.14 if we feel
1.13 is too aggressive).

This will help as a reminder to users and devs about the upcoming
removal and it will avoid future, potentially endless, discussions.

Cheers,
Kostas

On Fri, Oct 23, 2020 at 2:03 PM Xintong Song  wrote:
>
> +1 for adding a warning in 1.12 about planning to remove Mesos support.
>
>
> With my developer hat on, removing the Mesos support would definitely reduce 
> the maintaining overhead for the deployment and resource management related 
> components. On the other hand, the Flink on Mesos users' voices definitely 
> matter a lot for this community. Either way, it would be good to draw users 
> attention to this discussion early.
>
>
> Thank you~
>
> Xintong Song
>
>
>
> On Fri, Oct 23, 2020 at 7:53 PM Konstantin Knauf  wrote:
>>
>> Hi Robert,
>>
>> +1 to the plan you outlined. If we were to drop support in Flink 1.13+, we
>> would still support it in Flink 1.12- with bug fixes for some time so that
>> users have time to move on.
>>
>> It would certainly be very interesting to hear from current Flink on Mesos
>> users, on how they see the evolution of this part of the ecosystem.
>>
>> Best,
>>
>> Konstantin


Re: [DISCUSS] Remove flink-connector-filesystem module.

2020-10-15 Thread Kostas Kloudas
@Arvid Heise I also do not remember exactly what were all the
problems. The fact that we added some more bulk formats to the
streaming file sink definitely reduced the non-supported features. In
addition, the latest discussion I found on the topic was [1] and the
conclusion of that discussion seems to be to remove it.

Currently, I cannot find any obvious reason why keeping the
BucketingSink, apart from the fact that we do not have a migration
plan unfortunately. This is why I posted this to dev@ and user@.

Cheers,
Kostas

[1] 
https://lists.apache.org/thread.html/r799be74658bc7e169238cc8c1e479e961a9e85ccea19089290940ff0%40%3Cdev.flink.apache.org%3E

On Wed, Oct 14, 2020 at 8:03 AM Arvid Heise  wrote:
>
> I remember this conversation popping up a few times already and I'm in
> general a big fan of removing BucketingSink.
>
> However, until now there were a few features lacking in StreamingFileSink
> that are present in BucketingSink and that are being actively used (I can't
> exactly remember them now, but I can look it up if everyone else is also
> suffering from bad memory). Did we manage to add them in the meantime? If
> not, then it feels rushed to remove it at this point.
>
> On Tue, Oct 13, 2020 at 2:33 PM Kostas Kloudas  wrote:
>
> > @Chesnay Schepler  Off the top of my head, I cannot find an easy way
> > to migrate from the BucketingSink to the StreamingFileSink. It may be
> > possible but it will require some effort because the logic would be
> > "read the old state, commit it, and start fresh with the
> > StreamingFileSink."
> >
> > On Tue, Oct 13, 2020 at 2:09 PM Aljoscha Krettek 
> > wrote:
> > >
> > > On 13.10.20 14:01, David Anderson wrote:
> > > > I thought this was waiting on FLIP-46 -- Graceful Shutdown Handling --
> > and
> > > > in fact, the StreamingFileSink is mentioned in that FLIP as a
> > motivating
> > > > use case.
> > >
> > > Ah yes, I see FLIP-147 as a more general replacement for FLIP-46. Thanks
> > > for the reminder, we should close FLIP-46 now with an explanatory
> > > message to avoid confusion.
> >
>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng


Re: [DISCUSS] Remove flink-connector-filesystem module.

2020-10-12 Thread Kostas Kloudas
Hi Chesnay,

Unfortunately not from what I can see in the code.
This is the reason why I am opening a discussion. I think that if we
supported backwards compatibility, this would have been an easier
process.

Kostas

On Mon, Oct 12, 2020 at 4:32 PM Chesnay Schepler  wrote:
>
> Are older versions of the module compatible with 1.12+?
>
> On 10/12/2020 4:30 PM, Kostas Kloudas wrote:
> > Hi all,
> >
> > As the title suggests, this thread is to discuss the removal of the
> > flink-connector-filesystem module which contains (only) the deprecated
> > BucketingSink. The BucketingSink has been deprecated since FLINK 1.9 [1] in
> > favor of the relatively recently introduced StreamingFileSink.
> >
> > For the sake of a clean and more manageable codebase, I propose to
> > remove this module for release-1.12, but of course we should see first
> > if there are any usecases that depend on it.
> >
> > Let's have a fruitful discussion.
> >
> > Cheers,
> > Kostas
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-13396
> >
>


[DISCUSS] Remove flink-connector-filesystem module.

2020-10-12 Thread Kostas Kloudas
Hi all,

As the title suggests, this thread is to discuss the removal of the
flink-connector-filesystem module which contains (only) the deprecated
BucketingSink. The BucketingSink has been deprecated since FLINK 1.9 [1] in
favor of the relatively recently introduced StreamingFileSink.
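For readers looking at the migration target, a minimal sketch of a row-format StreamingFileSink (the output path, rolling-policy values and the tiny pipeline are placeholders):

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

public class StreamingFileSinkExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // in-progress part files are finalized on checkpoints

        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path("/base/output/dir"), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                                .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                                .withMaxPartSize(128L * 1024 * 1024)
                                .build())
                .build();

        env.fromElements("a", "b", "c").addSink(sink);
        env.execute("streaming-file-sink-example");
    }
}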

For the sake of a clean and more manageable codebase, I propose to
remove this module for release-1.12, but of course we should see first
if there are any usecases that depend on it.

Let's have a fruitful discussion.

Cheers,
Kostas

[1] https://issues.apache.org/jira/browse/FLINK-13396


Re: Reading from Multiple Directories with StreamExecutionEnvironment

2020-10-06 Thread Kostas Kloudas
Hi Jason,

Your analysis seems correct.

As an alternative, you could:
1) either call readFile multiple times on the
StreamExecutionEnvironment (once for each dir you want to monitor) and
then union the streams, or
2) put all the dirs you want to monitor under a common
parent dir, specify that as the directory to monitor, and set
setNestedFileEnumeration(true) on the input format to search recursively.
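A rough sketch of option 1, with two placeholder directories and a 10-second monitoring interval:

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class MultiDirRead {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String dirA = "s3://bucket/dir-a";   // placeholder paths
        String dirB = "s3://bucket/dir-b";

        // One readFile() call per monitored directory...
        DataStream<String> a = env.readFile(
                new TextInputFormat(new Path(dirA)), dirA,
                FileProcessingMode.PROCESS_CONTINUOUSLY, 10_000L);
        DataStream<String> b = env.readFile(
                new TextInputFormat(new Path(dirB)), dirB,
                FileProcessingMode.PROCESS_CONTINUOUSLY, 10_000L);

        // ...and then union the resulting streams into one.
        a.union(b).print();

        env.execute("multi-dir-read");
    }
}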

As a side note, this is not going to be a problem anymore with the new
FileSource, although this will come in the next Flink release.

Cheers,
Kostas

On Tue, Oct 6, 2020 at 2:11 AM Jason Liu  wrote:
>
> Hi all,
>
> I came across this change that allows users to have multiple file paths to 
> read from in Flink. However, I have a question about how to use this feature 
> for StreamExecutionEnvironment.readFile(). It seems in readFile, the input 
> filePaths actually get overwritten here. So no matter what FileInputFormat I 
> pass into it, the filePaths will just get set to a single directory later. 
> Just curious if I'm missing something here?
>
> This is a sample code I have:
>
> // Read from S3 object to get the list of S3 paths.
> final List<String> directoryList =
> getDirectoryList(someClient.getS3ObjectContentAsString(commonBucket, 
> directory.getKey()));
>
> inputFormat = new TextInputFormat(new Path(inputBucketProperty));
> inputFormat.setFilePaths((String[]) directoryList.toArray());
> inputFormat.setNestedFileEnumeration(true);
>
> streamEnv
> .readFile(inputFormat, "some path")
> .addSink(createSink());
>
> streamEnv.execute(getClass().getSimpleName());
>
> This is going to run on Kinesis Data Analytics, if that makes any difference.
>
> Thanks for the help, if any :)
> -Jason


Re: Checkpoint metadata deleted by Flink after ZK connection issues

2020-09-28 Thread Kostas Kloudas
Hi all,

I will have a look.

Kostas

On Mon, Sep 28, 2020 at 3:56 PM Till Rohrmann  wrote:
>
> Hi Cristian,
>
> thanks for reporting this issue. It looks indeed like a very critical problem.
>
> The problem seems to be that the ApplicationDispatcherBootstrap class 
> produces an exception (that the request job can no longer be found because of 
> a lost ZooKeeper connection) which will be interpreted as a job failure. Due 
> to this interpretation, the cluster will be shut down with a terminal state 
> of FAILED which will cause the HA data to be cleaned up. The exact problem 
> occurs in the JobStatusPollingUtils.getJobResult which is called by 
> ApplicationDispatcherBootstrap.getJobResult().
>
> I think there are two problems here: First of all not every exception 
> bubbling up in the future returned by 
> ApplicationDispatcherBootstrap.fixJobIdAndRunApplicationAsync() indicates a 
> job failure. Some of them can also indicate a framework failure which should 
> not lead to the clean up of HA data. The other problem is that the polling 
> logic cannot properly handle a temporary connection loss to ZooKeeper which 
> is a normal situation.
>
> I am pulling in Aljoscha and Klou who worked on this feature and might be 
> able to propose a solution for these problems. I've also updated the JIRA 
> issue FLINK-19154.
>
> Cheers,
> Till
>
> On Wed, Sep 9, 2020 at 9:00 AM Yang Wang  wrote:
>>
>> > The job sub directory will be cleaned up when the job 
>> > finished/canceled/failed.
>> Since we can submit multiple jobs into a Flink session, what I mean is 
>> that when a job
>> reaches a terminal state, the sub node (e.g. 
>> /flink/application_/running_job_registry/4d255397c7aeb5327adb567238c983c1)
>> in ZooKeeper will be cleaned up. But the root 
>> directory (/flink/application_/) still exists.
>>
>>
>> Your current case is a different one (per-job cluster). I think we 
>> need to figure out why the only
>> running job reached the terminal state. For example, the restart attempts 
>> are exhausted. And you
>> could find the following logs in your JobManager log.
>>
>> "org.apache.flink.runtime.JobException: Recovery is suppressed by 
>> NoRestartBackoffTimeStrategy"
>>
>>
>> Best,
>> Yang
>>
>>
>>
>>
On Wed, Sep 9, 2020 at 11:26 AM, Cristian  wrote:
>>>
>>> > The job sub directory will be cleaned up when the job 
>>> > finished/canceled/failed.
>>>
>>> What does this mean?
>>>
>>> Also, to clarify: I'm a very sloppy developer. My jobs crash ALL the 
>>> time... and yet, the jobs would ALWAYS resume from the last checkpoint.
>>>
>>> The only cases where I expect Flink to clean up the checkpoint data from ZK 
>>> is when I explicitly stop or cancel the job (in those cases the job manager 
>>> takes a savepoint before cleaning up zk and finishing the cluster).
>>>
>>> Which is not the case here. Flink was on autopilot here and decided to wipe 
>>> my poor, good checkpoint metadata as the logs show.
>>>
>>> On Tue, Sep 8, 2020, at 7:59 PM, Yang Wang wrote:
>>>
>>> AFAIK, the HA data, including Zookeeper meta data and real data on DFS, 
>>> will only be cleaned up
>>> when the Flink cluster has reached a terminal state.
>>>
>>> So if you are using a session cluster, the root cluster node on Zk will be 
>>> cleaned up after you manually
>>> stop the session cluster. The job sub directory will be cleaned up when the 
>>> job finished/canceled/failed.
>>>
>>> If you are using a job/application cluster, once the only running job 
>>> finished/failed, all the HA data will
>>> be cleaned up. I think you need to check the job restart strategy you have 
>>> set. For example, the following
>>> configuration will make the Flink cluster terminated after 10 attempts.
>>>
>>> restart-strategy: fixed-delay
>>> restart-strategy.fixed-delay.attempts: 10
>>>
>>>
>>> Best,
>>> Yang
>>>
On Wed, Sep 9, 2020 at 12:28 AM, Cristian  wrote:
>>>
>>>
>>> I'm using the standalone script to start the cluster.
>>>
>>> As far as I can tell, it's not easy to reproduce. We found that zookeeper 
>>> lost a node around the time this happened, but all of our other 75 Flink 
>>> jobs which use the same setup, version and zookeeper, didn't have any 
>>> issues. They didn't even restart.
>>>
>>> So unfortunately I don't know how to reproduce this. All I know is I can't 
>>> sleep. I have nightmares were my precious state is deleted. I wake up 
>>> crying and quickly start manually savepointing all jobs just in case, 
>>> because I feel the day of reckon is near. Flinkpocalypse!
>>>
>>> On Tue, Sep 8, 2020, at 5:54 AM, Robert Metzger wrote:
>>>
>>> Thanks a lot for reporting this problem here Cristian!
>>>
>>> I am not super familiar with the involved components, but the behavior you 
>>> are describing doesn't sound right to me.
>>> Which entrypoint are you using? This is logged at the beginning, like this: 
>>> "2020-09-08 14:45:32,807 INFO  
>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -  Starting 
>>> 

Re: Better way to share large data across task managers

2020-09-25 Thread Kostas Kloudas
Hi Dongwon,

Yes, you are right that I assume that broadcasting occurs once. This
is what I meant by "If you know the data in advance". Sorry for not
being clear. If you need to periodically broadcast new versions of the
data, then I cannot find a better solution than the one you propose
with the static var.

Cheers,
Kostas

On Wed, Sep 23, 2020 at 11:49 AM Dongwon Kim  wrote:
>
> Hi Kostas,
>
> Thanks for the input!
>
> BTW, I guess you assume that the broadcasting occurs just once for
> bootstrapping, huh?
> My job needs not only bootstrapping but also periodically fetching a
> new version of data from some external storage.
>
> Thanks,
>
> Dongwon
>
> > 2020. 9. 23. 오전 4:59, Kostas Kloudas  작성:
> >
> > Hi Dongwon,
>
>
>
>
>
> >
> > If you know the data in advance, you can always use the Yarn options
> > in [1] (e.g. the "yarn.ship-directories") to ship the directories with
> > the data you want only once to each Yarn container (i.e. TM) and then
> > write a udf which reads them in the open() method. This will allow the
> > data to be shipped only once per TM but then each of the tasks will
> > have its own copy in memory of course. By default, the visibility of
> > the files that you ship is set to APPLICATION [2], if I am not
> > mistaken, so if more than one TM goes to the same node, then you will
> > have even fewer copies shipped.
> >
> > Does this help with your usecase?
> >
> > Cheers,
> > Kostas
> >
> > [1] 
> > https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html
> > [2] 
> > https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/yarn/api/records/LocalResourceVisibility.html
> >
> >> On Sun, Sep 20, 2020 at 6:05 PM Dongwon Kim  wrote:
> >> Hi,
> >> I'm using Flink broadcast state similar to what Fabian explained in [1]. 
> >> One difference might be the size of the broadcasted data; the size is 
> >> around 150MB.
> >> I've launched 32 TMs by setting
> >> - taskmanager.numberOfTaskSlots : 6
> >> - parallelism of the non-broadcast side : 192
> >> Here's some questions:
> >> 1) AFAIK, the broadcasted data (150MB) is sent to all 192 tasks. Is it 
> >> right?
> >> 2) Any recommended way to broadcast data only to 32 TMs so that 6 tasks in 
> >> each TM can read the broadcasted data? I'm considering implementing a 
> >> static class for the non-broadcast side to directly load data only once on 
> >> each TaskManager instead of the broadcast state (FYI, I'm using per-job 
> >> clusters on YARN, so each TM is only for a single job). However, I'd like 
> >> to use Flink native facilities if possible.
> >> The type of broadcasted data is Map with around 600K entries, 
> >> so every time the data is broadcasted a lot of GC is inevitable on each TM 
> >> due to the (de)serialization cost.
> >> Any advice would be much appreciated.
> >> Best,
> >> Dongwon
> >> [1] https://flink.apache.org/2019/06/26/broadcast-state.html


Re: Better way to share large data across task managers

2020-09-22 Thread Kostas Kloudas
Hi Dongwon,

If you know the data in advance, you can always use the Yarn options
in [1] (e.g. the "yarn.ship-directories") to ship the directories with
the data you want only once to each Yarn container (i.e. TM) and then
write a udf which reads them in the open() method. This will allow the
data to be shipped only once per TM but then each of the tasks will
have its own copy in memory of course. By default, the visibility of
the files that you ship is set to APPLICATION [2], if I am not
mistaken, so if more than one TM goes to the same node, then you will
have even fewer copies shipped.
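A rough sketch of the open()-based loading described above. The shipped directory name ("dict"), the file name and the tab-separated line format are assumptions, and the sketch assumes the directory shipped via yarn.ship-directories is localized under its base name in the container's working directory:

import java.io.File;
import java.nio.file.Files;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class DictionaryEnricher extends RichMapFunction<String, String> {

    private transient Map<String, String> dictionary;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Load the shipped file once per task instance when the task starts.
        dictionary = new HashMap<>();
        for (String line : Files.readAllLines(new File("dict/data.txt").toPath())) {
            String[] kv = line.split("\t", 2);
            dictionary.put(kv[0], kv.length > 1 ? kv[1] : "");
        }
    }

    @Override
    public String map(String key) {
        return dictionary.getOrDefault(key, "unknown");
    }
}

Note that each parallel task instance still keeps its own in-memory copy of the data; only the shipping to the container happens once.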

Does this help with your usecase?

Cheers,
Kostas

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html
[2] 
https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/yarn/api/records/LocalResourceVisibility.html

On Sun, Sep 20, 2020 at 6:05 PM Dongwon Kim  wrote:
>
> Hi,
>
> I'm using Flink broadcast state similar to what Fabian explained in [1]. One 
> difference might be the size of the broadcasted data; the size is around 
> 150MB.
>
> I've launched 32 TMs by setting
> - taskmanager.numberOfTaskSlots : 6
> - parallelism of the non-broadcast side : 192
>
> Here's some questions:
> 1) AFAIK, the broadcasted data (150MB) is sent to all 192 tasks. Is it right?
> 2) Any recommended way to broadcast data only to 32 TMs so that 6 tasks in 
> each TM can read the broadcasted data? I'm considering implementing a static 
> class for the non-broadcast side to directly load data only once on each 
> TaskManager instead of the broadcast state (FYI, I'm using per-job clusters 
> on YARN, so each TM is only for a single job). However, I'd like to use Flink 
> native facilities if possible.
>
> The type of broadcasted data is Map with around 600K entries, so 
> every time the data is broadcasted a lot of GC is inevitable on each TM due 
> to the (de)serialization cost.
>
> Any advice would be much appreciated.
>
> Best,
>
> Dongwon
>
> [1] https://flink.apache.org/2019/06/26/broadcast-state.html


Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

2020-08-24 Thread Kostas Kloudas
Thanks a lot for the discussion!

I will open a voting thread shortly!

Kostas

On Mon, Aug 24, 2020 at 9:46 AM Kostas Kloudas  wrote:
>
> Hi Guowei,
>
> Thanks for the insightful comment!
>
> I agree that this can be a limitation of the current runtime, but I
> think that this FLIP can go on as it discusses mainly the semantics
> that the DataStream API will expose when applied on bounded data.
> There will definitely be other FLIPs that will actually handle the
> runtime-related topics.
>
> But it is good to document them nevertheless so that we start soon
> ironing out the remaining rough edges.
>
> Cheers,
> Kostas
>
> On Mon, Aug 24, 2020 at 9:16 AM Guowei Ma  wrote:
> >
> > Hi, Klou
> >
> > Thanks for your proposal. It's a very good idea.
> > Just a little comment about the "Batch vs Streaming Scheduling": in the 
> > AUTOMATIC execution mode we may not be able to pick the BATCH execution mode even 
> > if all sources are bounded. For example, some applications use the 
> > `CheckpointListener`, which is not available in BATCH mode in the current 
> > implementation.
> > So maybe we need more checks in the AUTOMATIC execution mode.
> >
> > Best,
> > Guowei
> >
> >
> > On Thu, Aug 20, 2020 at 10:27 PM Kostas Kloudas  wrote:
> >>
> >> Hi all,
> >>
> >> Thanks for the comments!
> >>
> >> @Dawid: "execution.mode" can be a nice alternative and from a quick
> >> look it is not used currently by any configuration option. I will
> >> update the FLIP accordingly.
> >>
> >> @David: Given that having the option to allow timers to fire at the
> >> end of the job is already in the FLIP, I will leave it as is and I
> >> will update the default policy to be "ignore processing time timers
> >> set by the user". This will allow existing dataStream programs to run
> >> on bounded inputs. This update will affect point 2 in the "Processing
> >> Time Support in Batch" section.
> >>
> >> If these changes cover your proposals, then I would like to start a
> >> voting thread tomorrow evening if this is ok with you.
> >>
> >> Please let me know until then.
> >>
> >> Kostas
> >>
> >> On Tue, Aug 18, 2020 at 3:54 PM David Anderson  
> >> wrote:
> >> >
> >> > Being able to optionally fire registered processing time timers at the 
> >> > end of a job would be interesting, and would help in (at least some of) 
> >> > the cases I have in mind. I don't have a better idea.
> >> >
> >> > David
> >> >
> >> > On Mon, Aug 17, 2020 at 8:24 PM Kostas Kloudas  
> >> > wrote:
> >> >>
> >> >> Hi Kurt and David,
> >> >>
> >> >> Thanks a lot for the insightful feedback!
> >> >>
> >> >> @Kurt: For the topic of checkpointing with Batch Scheduling, I totally
> >> >> agree with you that it requires a lot more work and careful thinking
> >> >> on the semantics. This FLIP was written under the assumption that if
> >> >> the user wants to have checkpoints on bounded input, he/she will have
> >> >> to go with STREAMING as the scheduling mode. Checkpointing for BATCH
> >> >> can be handled as a separate topic in the future.
> >> >>
> >> >> In the case of MIXED workloads and for this FLIP, the scheduling mode
> >> >> should be set to STREAMING. That is why the AUTOMATIC option sets
> >> >> scheduling to BATCH only if all the sources are bounded. I am not sure
> >> >> what are the plans there at the scheduling level, as one could imagine
> >> >> in the future that in mixed workloads, we schedule first all the
> >> >> bounded subgraphs in BATCH mode and we allow only one UNBOUNDED
> >> >> subgraph per application, which is going to be scheduled after all
> >> >> Bounded ones have finished. Essentially the bounded subgraphs will be
> >> >> used to bootstrap the unbounded one. But, I am not aware of any plans
> >> >> towards that direction.
> >> >>
> >> >>
> >> >> @David: The processing time timer handling is a topic that has also
> >> >> been discussed in the community in the past, and I do not remember any
> >> >> final conclusion unfortunately.
> >> >>
> >> >> In the current context and for bounded input, 

Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

2020-08-24 Thread Kostas Kloudas
Hi Guowei,

Thanks for the insightful comment!

I agree that this can be a limitation of the current runtime, but I
think that this FLIP can go on as it discusses mainly the semantics
that the DataStream API will expose when applied on bounded data.
There will definitely be other FLIPs that will actually handle the
runtime-related topics.

But it is good to document them nevertheless so that we start soon
ironing out the remaining rough edges.

Cheers,
Kostas

On Mon, Aug 24, 2020 at 9:16 AM Guowei Ma  wrote:
>
> Hi, Klou
>
> Thanks for your proposal. It's a very good idea.
> Just a little comment about the "Batch vs Streaming Scheduling".  In the 
> AUTOMATIC execution mode maybe we could not pick BATCH execution mode even if 
> all sources are bounded. For example some applications would use the 
> `CheckpointListener`, which is not available in the BATCH mode in current 
> implementation.
> So maybe we need more checks in the AUTOMATIC execution mode.
>
> Best,
> Guowei
>
>
> On Thu, Aug 20, 2020 at 10:27 PM Kostas Kloudas  wrote:
>>
>> Hi all,
>>
>> Thanks for the comments!
>>
>> @Dawid: "execution.mode" can be a nice alternative and from a quick
>> look it is not used currently by any configuration option. I will
>> update the FLIP accordingly.
>>
>> @David: Given that having the option to allow timers to fire at the
>> end of the job is already in the FLIP, I will leave it as is and I
>> will update the default policy to be "ignore processing time timers
>> set by the user". This will allow existing dataStream programs to run
>> on bounded inputs. This update will affect point 2 in the "Processing
>> Time Support in Batch" section.
>>
>> If these changes cover your proposals, then I would like to start a
>> voting thread tomorrow evening if this is ok with you.
>>
>> Please let me know until then.
>>
>> Kostas
>>
>> On Tue, Aug 18, 2020 at 3:54 PM David Anderson  wrote:
>> >
>> > Being able to optionally fire registered processing time timers at the end 
>> > of a job would be interesting, and would help in (at least some of) the 
>> > cases I have in mind. I don't have a better idea.
>> >
>> > David
>> >
>> > On Mon, Aug 17, 2020 at 8:24 PM Kostas Kloudas  wrote:
>> >>
>> >> Hi Kurt and David,
>> >>
>> >> Thanks a lot for the insightful feedback!
>> >>
>> >> @Kurt: For the topic of checkpointing with Batch Scheduling, I totally
>> >> agree with you that it requires a lot more work and careful thinking
>> >> on the semantics. This FLIP was written under the assumption that if
>> >> the user wants to have checkpoints on bounded input, he/she will have
>> >> to go with STREAMING as the scheduling mode. Checkpointing for BATCH
>> >> can be handled as a separate topic in the future.
>> >>
>> >> In the case of MIXED workloads and for this FLIP, the scheduling mode
>> >> should be set to STREAMING. That is why the AUTOMATIC option sets
>> >> scheduling to BATCH only if all the sources are bounded. I am not sure
>> >> what are the plans there at the scheduling level, as one could imagine
>> >> in the future that in mixed workloads, we schedule first all the
>> >> bounded subgraphs in BATCH mode and we allow only one UNBOUNDED
>> >> subgraph per application, which is going to be scheduled after all
>> >> Bounded ones have finished. Essentially the bounded subgraphs will be
>> >> used to bootstrap the unbounded one. But, I am not aware of any plans
>> >> towards that direction.
>> >>
>> >>
>> >> @David: The processing time timer handling is a topic that has also
>> >> been discussed in the community in the past, and I do not remember any
>> >> final conclusion unfortunately.
>> >>
>> >> In the current context and for bounded input, we chose to favor
>> >> reproducibility of the result, as this is expected in batch processing
>> >> where the whole input is available in advance. This is why this
>> >> proposal suggests to not allow processing time timers. But I
>> >> understand your argument that the user may want to be able to run the
>> >> same pipeline on batch and streaming this is why we added the two
>> >> options under future work, namely (from the FLIP):
>> >>
>> >> ```
>> >> Future Work: In the future we may consider adding as options the

Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

2020-08-20 Thread Kostas Kloudas
Hi all,

Thanks for the comments!

@Dawid: "execution.mode" can be a nice alternative and from a quick
look it is not used currently by any configuration option. I will
update the FLIP accordingly.

@David: Given that having the option to allow timers to fire at the
end of the job is already in the FLIP, I will leave it as is and I
will update the default policy to be "ignore processing time timers
set by the user". This will allow existing dataStream programs to run
on bounded inputs. This update will affect point 2 in the "Processing
Time Support in Batch" section.

If these changes cover your proposals, then I would like to start a
voting thread tomorrow evening if this is ok with you.

Please let me know until then.

Kostas

On Tue, Aug 18, 2020 at 3:54 PM David Anderson  wrote:
>
> Being able to optionally fire registered processing time timers at the end of 
> a job would be interesting, and would help in (at least some of) the cases I 
> have in mind. I don't have a better idea.
>
> David
>
> On Mon, Aug 17, 2020 at 8:24 PM Kostas Kloudas  wrote:
>>
>> Hi Kurt and David,
>>
>> Thanks a lot for the insightful feedback!
>>
>> @Kurt: For the topic of checkpointing with Batch Scheduling, I totally
>> agree with you that it requires a lot more work and careful thinking
>> on the semantics. This FLIP was written under the assumption that if
>> the user wants to have checkpoints on bounded input, he/she will have
>> to go with STREAMING as the scheduling mode. Checkpointing for BATCH
>> can be handled as a separate topic in the future.
>>
>> In the case of MIXED workloads and for this FLIP, the scheduling mode
>> should be set to STREAMING. That is why the AUTOMATIC option sets
>> scheduling to BATCH only if all the sources are bounded. I am not sure
>> what are the plans there at the scheduling level, as one could imagine
>> in the future that in mixed workloads, we schedule first all the
>> bounded subgraphs in BATCH mode and we allow only one UNBOUNDED
>> subgraph per application, which is going to be scheduled after all
>> Bounded ones have finished. Essentially the bounded subgraphs will be
>> used to bootstrap the unbounded one. But, I am not aware of any plans
>> towards that direction.
>>
>>
>> @David: The processing time timer handling is a topic that has also
>> been discussed in the community in the past, and I do not remember any
>> final conclusion unfortunately.
>>
>> In the current context and for bounded input, we chose to favor
>> reproducibility of the result, as this is expected in batch processing
>> where the whole input is available in advance. This is why this
>> proposal suggests to not allow processing time timers. But I
>> understand your argument that the user may want to be able to run the
>> same pipeline on batch and streaming this is why we added the two
>> options under future work, namely (from the FLIP):
>>
>> ```
>> Future Work: In the future we may consider adding as options the capability 
>> of:
>> * firing all the registered processing time timers at the end of a job
>> (at close()) or,
>> * ignoring all the registered processing time timers at the end of a job.
>> ```
>>
>> Conceptually, we are essentially saying that we assume that batch
>> execution is assumed to be instantaneous and refers to a single
>> "point" in time and any processing-time timers for the future may fire
>> at the end of execution or be ignored (but not throw an exception). I
>> could also see ignoring the timers in batch as the default, if this
>> makes more sense.
>>
>> By the way, do you have any usecases in mind that will help us better
>> shape our processing time timer handling?
>>
>> Kostas
>>
>> On Mon, Aug 17, 2020 at 2:52 PM David Anderson  wrote:
>> >
>> > Kostas,
>> >
>> > I'm pleased to see some concrete details in this FLIP.
>> >
>> > I wonder if the current proposal goes far enough in the direction of 
>> > recognizing the need some users may have for "batch" and "bounded 
>> > streaming" to be treated differently. If I've understood it correctly, the 
>> > section on scheduling allows me to choose STREAMING scheduling even if I 
>> > have bounded sources. I like that approach, because it recognizes that 
>> > even though I have bounded inputs, I don't necessarily want batch 
>> > processing semantics. I think it makes sense to extend this idea to 
>> > processing time support as well.
>> >
>> > My thinking is that sometimes in 

Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

2020-08-18 Thread Kostas Kloudas
ecution or be ignored (but not throw an exception). I
>> could also see ignoring the timers in batch as the default, if this
>> makes more sense.
>>
>> By the way, do you have any usecases in mind that will help us better
>> shape our processing time timer handling?
>>
>> Kostas
>>
>> On Mon, Aug 17, 2020 at 2:52 PM David Anderson  wrote:
>> >
>> > Kostas,
>> >
>> > I'm pleased to see some concrete details in this FLIP.
>> >
>> > I wonder if the current proposal goes far enough in the direction of 
>> > recognizing the need some users may have for "batch" and "bounded 
>> > streaming" to be treated differently. If I've understood it correctly, the 
>> > section on scheduling allows me to choose STREAMING scheduling even if I 
>> > have bounded sources. I like that approach, because it recognizes that 
>> > even though I have bounded inputs, I don't necessarily want batch 
>> > processing semantics. I think it makes sense to extend this idea to 
>> > processing time support as well.
>> >
>> > My thinking is that sometimes in development and testing it's reasonable 
>> > to run exactly the same job as in production, except with different 
>> > sources and sinks. While it might be a reasonable default, I'm not 
>> > convinced that switching a processing time streaming job to read from a 
>> > bounded source should always cause it to fail.
>> >
>> > David
>> >
>> > On Wed, Aug 12, 2020 at 5:22 PM Kostas Kloudas  wrote:
>> >>
>> >> Hi all,
>> >>
>> >> As described in FLIP-131 [1], we are aiming at deprecating the DataSet
>> >> API in favour of the DataStream API and the Table API. After this work
>> >> is done, the user will be able to write a program using the DataStream
>> >> API and this will execute efficiently on both bounded and unbounded
>> >> data. But before we reach this point, it is worth discussing and
>> >> agreeing on the semantics of some operations as we transition from the
>> >> streaming world to the batch one.
>> >>
>> >> This thread and the associated FLIP [2] aim at discussing these issues
>> >> as these topics are pretty important to users and can lead to
>> >> unpleasant surprises if we do not pay attention.
>> >>
>> >> Let's have a healthy discussion here and I will be updating the FLIP
>> >> accordingly.
>> >>
>> >> Cheers,
>> >> Kostas
>> >>
>> >> [1] 
>> >> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741
>> >> [2] 
>> >> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158871522


Re: [DISCUSS] Removing deprecated methods from DataStream API

2020-08-17 Thread Kostas Kloudas
+1 for removing them.

From a quick look, most of them (though not all) were deprecated a long time ago.

Cheers,
Kostas

On Mon, Aug 17, 2020 at 9:37 PM Dawid Wysakowicz  wrote:
>
> @David Yes, my idea was to remove any use of fold method and all related 
> classes including WindowedStream#fold
>
> @Klou Good idea to also remove the deprecated enableCheckpointing() & 
> StreamExecutionEnvironment#readFile and alike. I did another pass over some 
> of the classes and thought we could also drop:
>
> ExecutionConfig#set/getCodeAnalysisMode
> ExecutionConfig#disable/enableSysoutLogging
> ExecutionConfig#set/isFailTaskOnCheckpointError
> ExecutionConfig#isLatencyTrackingEnabled
>
> As for the `forceCheckpointing` I am not fully convinced about doing it. As far
> as I know iterations still do not participate in checkpointing correctly. 
> Therefore it still might make sense to force it. In other words there is no 
> real alternative to that method. Unless we only remove the methods from 
> StreamExecutionEnvironment and redirect to the setter in CheckpointConfig. 
> WDYT?
>
> An updated list of methods I suggest to remove:
>
> ExecutionConfig#set/getCodeAnalysisMode (deprecated in 1.9)
> ExecutionConfig#disable/enableSysoutLogging (deprecated in 1.10)
> ExecutionConfig#set/isFailTaskOnCheckpointError (deprecated in 1.9)
> ExecutionConfig#isLatencyTrackingEnabled (deprecated in 1.7)
> ExecutionConfig#(get)/setNumberOfExecutionRetries() (deprecated in 1.1?)
> StreamExecutionEnvironment#readFile,readFileStream(...),socketTextStream(...),socketTextStream(...)
>  (deprecated in 1.2)
> RuntimeContext#getAllAccumulators (deprecated in 0.10)
> DataStream#fold and all related classes and methods such as FoldFunction, 
> FoldingState, FoldingStateDescriptor ... (deprecated in 1.3/1.4)
> StreamExecutionEnvironment#setStateBackend(AbstractStateBackend) (deprecated 
> in 1.5)
> DataStream#split (deprecated in 1.8)
> Methods in (Connected)DataStream that specify keys as either indices or field 
> names such as DataStream#keyBy, DataStream#partitionCustom, 
> ConnectedStream#keyBy,  (deprecated in 1.11)
>
> Bear in mind that the majority of the options listed above in ExecutionConfig
> have no effect. They were left there purely to satisfy binary
> compatibility. Personally, I don't see any benefit in leaving a method and
> silently dropping the underlying feature. The only configuration that is
> respected is setting the number of execution retries.
>
> I also wanted to make it explicit that most of the changes above would result 
> in a binary incompatibility and require additional exclusions in the japicmp. 
> Those are:
>
> ExecutionConfig#set/getCodeAnalysisMode (deprecated in 1.9)
> ExecutionConfig#disable/enableSysoutLogging (deprecated in 1.10)
> ExecutionConfig#set/isFailTaskOnCheckpointError (deprecated in 1.9)
> ExecutionConfig#isLatencyTrackingEnabled (deprecated in 1.7)
> ExecutionConfig#(get)/setNumberOfExecutionRetries() (deprecated in 1.1?)
> DataStream#fold and all related classes and methods such as FoldFunction, 
> FoldingState, FoldingStateDescriptor ... (deprecated in 1.3/1.4)
> DataStream#split (deprecated in 1.8)
> Methods in (Connected)DataStream that specify keys as either indices or field 
> names such as DataStream#keyBy, DataStream#partitionCustom, 
> ConnectedStream#keyBy,  (deprecated in 1.11)
> StreamExecutionEnvironment#readFile,readFileStream(...),socketTextStream(...),socketTextStream(...)
>  (deprecated in 1.2)
>
> Looking forward to more opinions on the issue.
>
> Best,
>
> Dawid
>
>
> On 17/08/2020 12:49, Kostas Kloudas wrote:
>
> Thanks a lot for starting this Dawid,
>
> Big +1 for the proposed clean-up, and I would also add the deprecated
> methods of the StreamExecutionEnvironment like:
>
> enableCheckpointing(long interval, CheckpointingMode mode, boolean force)
> enableCheckpointing()
> isForceCheckpointing()
>
> readFile(FileInputFormat inputFormat,String
> filePath,FileProcessingMode watchType,long interval, FilePathFilter
> filter)
> readFileStream(...)
>
> socketTextStream(String hostname, int port, char delimiter, long maxRetry)
> socketTextStream(String hostname, int port, char delimiter)
>
> There are more, like the (get)/setNumberOfExecutionRetries() that were
> deprecated long ago, but I have not investigated to see if they are
> actually easy to remove.
>
> Cheers,
> Kostas
>
> On Mon, Aug 17, 2020 at 10:53 AM Dawid Wysakowicz
>  wrote:
>
> Hi devs and users,
>
> I wanted to ask you what you think about removing some of the deprecated
> APIs around the DataStream API.
>
> The APIs I have in mind are:
>
> RuntimeContext#getAllAccumulators 

Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

2020-08-17 Thread Kostas Kloudas
Hi Kurt and David,

Thanks a lot for the insightful feedback!

@Kurt: For the topic of checkpointing with Batch Scheduling, I totally
agree with you that it requires a lot more work and careful thinking
on the semantics. This FLIP was written under the assumption that if
the user wants to have checkpoints on bounded input, he/she will have
to go with STREAMING as the scheduling mode. Checkpointing for BATCH
can be handled as a separate topic in the future.

In the case of MIXED workloads and for this FLIP, the scheduling mode
should be set to STREAMING. That is why the AUTOMATIC option sets
scheduling to BATCH only if all the sources are bounded. I am not sure
what are the plans there at the scheduling level, as one could imagine
in the future that in mixed workloads, we schedule first all the
bounded subgraphs in BATCH mode and we allow only one UNBOUNDED
subgraph per application, which is going to be scheduled after all
Bounded ones have finished. Essentially the bounded subgraphs will be
used to bootstrap the unbounded one. But, I am not aware of any plans
towards that direction.


@David: The processing time timer handling is a topic that has also
been discussed in the community in the past, and I do not remember any
final conclusion unfortunately.

In the current context and for bounded input, we chose to favor
reproducibility of the result, as this is expected in batch processing
where the whole input is available in advance. This is why this
proposal suggests to not allow processing time timers. But I
understand your argument that the user may want to be able to run the
same pipeline on batch and streaming this is why we added the two
options under future work, namely (from the FLIP):

```
Future Work: In the future we may consider adding as options the capability of:
* firing all the registered processing time timers at the end of a job
(at close()) or,
* ignoring all the registered processing time timers at the end of a job.
```

Conceptually, we are essentially saying that batch execution is
assumed to be instantaneous and refers to a single "point" in time,
and any processing-time timers for the future may fire
at the end of execution or be ignored (but not throw an exception). I
could also see ignoring the timers in batch as the default, if this
makes more sense.

By the way, do you have any usecases in mind that will help us better
shape our processing time timer handling?

Kostas

On Mon, Aug 17, 2020 at 2:52 PM David Anderson  wrote:
>
> Kostas,
>
> I'm pleased to see some concrete details in this FLIP.
>
> I wonder if the current proposal goes far enough in the direction of 
> recognizing the need some users may have for "batch" and "bounded streaming" 
> to be treated differently. If I've understood it correctly, the section on 
> scheduling allows me to choose STREAMING scheduling even if I have bounded 
> sources. I like that approach, because it recognizes that even though I have 
> bounded inputs, I don't necessarily want batch processing semantics. I think 
> it makes sense to extend this idea to processing time support as well.
>
> My thinking is that sometimes in development and testing it's reasonable to 
> run exactly the same job as in production, except with different sources and 
> sinks. While it might be a reasonable default, I'm not convinced that 
> switching a processing time streaming job to read from a bounded source 
> should always cause it to fail.
>
> David
>
> On Wed, Aug 12, 2020 at 5:22 PM Kostas Kloudas  wrote:
>>
>> Hi all,
>>
>> As described in FLIP-131 [1], we are aiming at deprecating the DataSet
>> API in favour of the DataStream API and the Table API. After this work
>> is done, the user will be able to write a program using the DataStream
>> API and this will execute efficiently on both bounded and unbounded
>> data. But before we reach this point, it is worth discussing and
>> agreeing on the semantics of some operations as we transition from the
>> streaming world to the batch one.
>>
>> This thread and the associated FLIP [2] aim at discussing these issues
>> as these topics are pretty important to users and can lead to
>> unpleasant surprises if we do not pay attention.
>>
>> Let's have a healthy discussion here and I will be updating the FLIP
>> accordingly.
>>
>> Cheers,
>> Kostas
>>
>> [1] 
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741
>> [2] 
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158871522


Re: [DISCUSS] Removing deprecated methods from DataStream API

2020-08-17 Thread Kostas Kloudas
Thanks a lot for starting this Dawid,

Big +1 for the proposed clean-up, and I would also add the deprecated
methods of the StreamExecutionEnvironment like:

enableCheckpointing(long interval, CheckpointingMode mode, boolean force)
enableCheckpointing()
isForceCheckpointing()

readFile(FileInputFormat inputFormat,String
filePath,FileProcessingMode watchType,long interval, FilePathFilter
filter)
readFileStream(...)

socketTextStream(String hostname, int port, char delimiter, long maxRetry)
socketTextStream(String hostname, int port, char delimiter)

There are more, like the (get)/setNumberOfExecutionRetries() that were
deprecated long ago, but I have not investigated to see if they are
actually easy to remove.

Cheers,
Kostas

On Mon, Aug 17, 2020 at 10:53 AM Dawid Wysakowicz
 wrote:
>
> Hi devs and users,
>
> I wanted to ask you what you think about removing some of the deprecated
> APIs around the DataStream API.
>
> The APIs I have in mind are:
>
> RuntimeContext#getAllAccumulators (deprecated in 0.10)
> DataStream#fold and all related classes and methods such as FoldFunction, 
> FoldingState, FoldingStateDescriptor ... (deprecated in 1.3/1.4)
> StreamExecutionEnvironment#setStateBackend(AbstractStateBackend) (deprecated 
> in 1.5)
> DataStream#split (deprecated in 1.8)
> Methods in (Connected)DataStream that specify keys as either indices or field 
> names such as DataStream#keyBy, DataStream#partitionCustom, 
> ConnectedStream#keyBy,  (deprecated in 1.11)
>
> I think the first three should be straightforward. They have long been deprecated.
> The getAccumulators method is not used very often, in my opinion. The same
> applies to DataStream#fold, which additionally is not very performant.
> Lastly, the setStateBackend has an alternative with a class from the
> AbstractStateBackend hierarchy, therefore it will still be code compatible.
> Moreover, if we remove #setStateBackend(AbstractStateBackend) we will get
> rid of the warnings users have right now when setting a state backend, as the
> correct method cannot be used without an explicit cast.
>
> As for the DataStream#split I know there were some objections against 
> removing the #split method in the past. I still believe the output tags can 
> replace the split method already.
>
> The only problem in the last set of methods I propose to remove is that they 
> were deprecated only in the last release and those methods were only partially
> deprecated. Moreover some of the methods were not deprecated in 
> ConnectedStreams. Nevertheless I'd still be inclined to remove the methods in 
> this release.
>
> Let me know what you think about it.
>
> Best,
>
> Dawid


Re: getting error after upgrade Flink 1.11.1

2020-08-13 Thread Kostas Kloudas
Hi Dasraj,

Yes, I would recommend using Public and, if necessary, PublicEvolving
APIs, as they provide better guarantees for future maintenance.
Unfortunately, there are no docs about which APIs are Public or
PublicEvolving, but you can see the annotations of the classes in the
source code.
I guess you have access to the source code given that previously you
were using the cluster client directly.

You can always of course use Flink's REST API [1] to submit a job,
which provides stability guarantees.

If you want to risk a bit more, you can check the PipelineExecutors in
the codebase to see how job submission is currently done within Flink.
This will give you some ideas I hope about how to proceed.

I hope this helps,
Kostas

[1]https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html

On Fri, Aug 7, 2020 at 8:07 AM dasraj  wrote:
>
> Hi Kostas,
>
> I am trying to migrate our code base to use the new ClusterClient method for job
> submission.
> As you are recommending the new PublicEvolving APIs, any doc or link for
> reference would be helpful.
>
> Thanks,
>
>
>
>
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: TaskManagers are still up even after job execution completed in PerJob deployment mode

2020-08-13 Thread Kostas Kloudas
Hi Narasimha,

I am not sure why the TMs are not shutting down, as Yun said, so I am
cc'ing Till here as he may be able to shed some light.
For the application mode, the page in the documentation that you
pointed to is the recommended way to deploy an application in
application mode.

Cheers,
Kostas

On Mon, Aug 10, 2020 at 11:16 AM narasimha  wrote:
>
> Thanks, Yun for the prompt reply.
>
> TaskManager was actively looking for ResourceManager, on timeout of 5 mins it 
> got terminated.
>
> Any recommendations around this? Or is this the way this will work.
>
> What should be done around this to make the application start in application 
> deployment mode?
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html#start-flink-application
>
> Here it has shown to invoke Flink binary to start. Is this the preferred way?
>
>
> On Mon, Aug 10, 2020 at 1:46 PM Yun Tang  wrote:
>>
>> Hi
>>
>> From your description, the task managers are still alive even the job is 
>> finished and job manager has shut down?
>> If so, I think this is really weird, could you check what the TM is doing 
>> via jstack and the logs in job manager and idle task manager?
>> The task manager should be released when the JM is shutting down.
>> Moreover, idle task manager would also release after 30 seconds by default 
>> [1].
>>
>>
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#resourcemanager-taskmanager-timeout
>>
>> Best
>> Yun Tang
>>
>>
>> 
>> From: narasimha 
>> Sent: Monday, August 10, 2020 15:36
>> To: user@flink.apache.org 
>> Subject: TaskManagers are still up even after job execution completed in 
>> PerJob deployment mode
>>
>>
>> I'm trying out Flink Per-Job deployment using docker-compose.
>>
>> Configurations:
>>
>> version: "2.2"
>> services:
>>   jobmanager:
>> build: ./
>> image: flink_local:1.1
>> ports:
>>   - "8081:8081"
>> command: standalone-job --job-classname com.organization.BatchJob
>> environment:
>>   - |
>> FLINK_PROPERTIES=
>> jobmanager.rpc.address: jobmanager
>> parallelism.default: 2
>>   taskmanager:
>> image: flink_local:1.1
>> depends_on:
>>   - jobmanager
>> command: taskmanager
>> scale: 1
>> environment:
>>   - |
>> FLINK_PROPERTIES=
>> jobmanager.rpc.address: jobmanager
>> taskmanager.numberOfTaskSlots: 2
>> parallelism.default: 2
>>
>> Flink image is extended with job.jar, Job executed successfully.
>>
>> The JobManager exited after the job completed, but the TaskManager is still
>> running, which is not expected.
>>
>> Do any configurations have to be added so that both the JobManager and TaskManager exit?
>>
>> Versions:
>>
>> Flink - 1.11.0
>>
>> Java - 1.8
>>
>>
>> --
>> A.Narasimha Swamy
>>
>
>
> --
> A.Narasimha Swamy
>


[DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

2020-08-12 Thread Kostas Kloudas
Hi all,

As described in FLIP-131 [1], we are aiming at deprecating the DataSet
API in favour of the DataStream API and the Table API. After this work
is done, the user will be able to write a program using the DataStream
API and this will execute efficiently on both bounded and unbounded
data. But before we reach this point, it is worth discussing and
agreeing on the semantics of some operations as we transition from the
streaming world to the batch one.

This thread and the associated FLIP [2] aim at discussing these issues
as these topics are pretty important to users and can lead to
unpleasant surprises if we do not pay attention.

Let's have a healthy discussion here and I will be updating the FLIP
accordingly.

Cheers,
Kostas

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741
[2] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158871522


Re: getting error after upgrade Flink 1.11.1

2020-08-04 Thread Kostas Kloudas
Hi Dasraj,

You are right. In your previous email I did not pay attention to the fact
that you migrated from 1.9.
Since 1.9, the ClusterClient has changed significantly, as it is not
annotated as a @Public API.

I am not sure how easy it is to use the old logic in your settings.
You could try copying the old code but I would recommend migrating to
the new codebase and using Public or PublicEvolving APIs. This will make
it easier for you to maintain your code in the future, as these APIs
come with stronger backwards compatibility guarantees.

Cheers,
Kostas


On Tue, Aug 4, 2020 at 2:40 PM dasraj  wrote:
>
> Hi Kostas,
>
> The ClusterClient class is not our custom class; it is in the
> org.apache.flink.client.program package.
> I can see the definition of ClusterClient has changed from 1.9.2 to 1.11.1
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/client/program/ClusterClient.html
> 
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/client/program/ClusterClient.html
> 
>
>
> Thanks,
>
>
>
>
>
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: getting error after upgrade Flink 1.11.1

2020-08-04 Thread Kostas Kloudas
Hi Dasraj,

Could you please specify where the clusterClient.run() method comes from
and how it submits a job to a cluster?
Is the clusterClient your custom code?

Any details will help us pin down the problem.
One thing that is worth looking at is the release-notes of 1.11 [1].
There you will find all the user-facing changes that came with the
release.
For example, one note that may be relevant to you is [2].

Cheers,
Kostas

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/release-notes/flink-1.11.html
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/release-notes/flink-1.11.html#web-submission-behaves-the-same-as-detached-mode

On Tue, Aug 4, 2020 at 9:07 AM dasraj  wrote:
>
> Hi,
>
> Currently we are using Flink 1.9.2 in our project and we are submitting the job
> from a jar.
> Below is the code we are currently using for job submission:
>
> clusterClient.setDetached(true);
> clusterClient.run(packagedProgram,
> Integer.valueOf(params.get(Constants.PARAMS_PARALLELISM)));
>
> After upgrading to Flink 1.11.1, the above piece of code is not working.
> Also, the new PackagedProgram constructor is expecting a "Configuration"
> parameter.
>
> How can we change that to work for our needs?
>
>
> Thanks,
>
>
>
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: REST API randomly returns Not Found for an existing job

2020-07-24 Thread Kostas Kloudas
Thanks a lot for the update, Tomasz, and keep us posted if it happens again.

Kostas

On Fri, Jul 24, 2020 at 6:37 PM Tomasz Dudziak  wrote:
>
> Yes, the job was running and the REST server as well. No JobMaster failures 
> noticed.
> I used a test cluster deployed on a bunch of VM's and bare metal boxes.
> I am afraid, I can no longer reproduce this issue. It occurred a couple days 
> ago and lasted for an entire day with jobs being quite often erratically 
> reported as Not Found. As I said, I noticed that another query immediately 
> after the one that returned Not Found consistently returned a correct result.
> It had never occurred before and I am afraid now I could no longer observe it 
> again. I appreciate it does not give too much information so I will come back 
> with more info on this thread if it happens again.
>
> -----Original Message-
> From: Kostas Kloudas 
> Sent: 24 July 2020 15:46
> To: Tomasz Dudziak 
> Cc: user@flink.apache.org; Chesnay Schepler 
> Subject: Re: REST API randomly returns Not Found for an existing job
>
> Hi Tomasz,
>
> Thanks a lot for reporting this issue. If you have verified that the job is 
> running AND that the REST server is also up and running (e.g.
> check the overview page) then I think that this should not be happening. I am 
> cc'ing Chesnay who may have an additional opinion on this.
>
> Cheers,
> Kostas
>
> On Thu, Jul 23, 2020 at 12:59 PM Tomasz Dudziak  wrote:
> >
> > Hi,
> >
> >
> >
> > I have come across an issue related to GET /job/:jobId endpoint from 
> > monitoring REST API in Flink 1.9.0. A few seconds after successfully 
> > starting a job and confirming its status as RUNNING, that endpoint would 
> > return 404 (Not Found). Interestingly, querying immediately again 
> > (literally a millisecond later) would return a valid result. I later 
> > noticed a similar behaviour in regard to a finished job as well. At certain 
> > points in time that endpoint would arbitrarily return 404, but similarly 
> > querying again would succeed. I saw this strange behaviour only recently 
> > and it used to work fine before.
> >
> >
> >
> > Do you know what could be the root cause of this? At the moment, as a
> > workaround I just query a job a couple of times in a row to ensure
> > whether it definitely does not exist or it is just being misreported
> > as non-existent, but this feels a bit like cottage industry…
> >
> >
> >
> > Kind regards,
> >
> > Tomasz
> >
> >
> >
> > Tomasz Dudziak | Marshall Wace LLP, George House, 131 Sloane Street,
> > London, SW1X 9AT | E-mail: t.dudz...@mwam.com | Tel: +44 207 024 7061
> >
> >
> >
> >
> >


Re: Flink state reconciliation

2020-07-24 Thread Kostas Kloudas
Hi Alex,

Maybe Seth (cc'ed) has an opinion on this.

Cheers,
Kostas

On Thu, Jul 23, 2020 at 12:08 PM Александр Сергеенко
 wrote:
>
> Hi,
>
> We use so-called "control stream" pattern to deliver settings to the Flink 
> job using Apache Kafka topics. The settings are in fact an unlimited stream 
> of events originating from the master DBMS, which acts as a single point of 
> truth concerning the rules list.
>
> It may seem odd, since Flink guarantees "exactly once" delivery semantics,
> the service that publishes the rules to Kafka is written using Akka Streams
> and guarantees "at least once" semantics, and the rule handling inside the
> Flink job is implemented in an idempotent manner. Still, we have to manage
> some cases where we need to execute a reconciliation between the current
> rules stored in the master DBMS and the existing Flink state.
>
> We've looked at Flink's tooling and found that the State Processor
> API can possibly solve our problem, so we basically have to implement a
> periodic process which unloads the state to some external file (.csv) and
> then executes a comparison between that set and the information in the
> master system.
> Basically, this looks like the lambda architecture approach, while Flink is
> supposed to implement the kappa architecture, and in that case our
> reconciliation problem looks a bit far-fetched.
>
> Are there any best practices or some patterns addressing such scenarios in 
> Flink?
>
> Great thanks for any possible assistance and ideas.
>
> -
> Alex Sergeenko
>


Re: Printing effective config for flint 1.11 cli

2020-07-24 Thread Kostas Kloudas
Hi Senthil,

You can see the configuration in the WebUI, or you can get it from the
REST API [1].
In addition, if you enable debug logging, you will have a line
starting with "Effective executor configuration:" in your client logs
(although I am not 100% sure if this will contain all the
configuration parameters).

Cheers,
Kostas

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html

On Fri, Jul 24, 2020 at 4:28 PM Senthil Kumar  wrote:
>
> Hello,
>
>
>
> My understanding is that Flink consumes the config values from the config file as
> well as those specified via the -D option.
>
> I assume that the -D will override the values from the config file?
>
>
>
> Is there a way to somehow see what the effective config is?
>
> i.e. print all of the config values that flink is going to be invoked with?
>
>
>
> We ran into an issue with the flink stop  command.
>
> It was exiting (after about a minute) with 
> java.util.concurrent.TimeoutException exception
>
> at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:493)
>
>
>
> Following the docs, I tried to issue the command with -D “client.timeout=10 
> min”, but it seemed to have made no difference.
>
> That made me wonder just what config values were actually being used.
>
>
>
> Cheers
>
> Kumar


Re: Changing watermark in the middle of a flow

2020-07-24 Thread Kostas Kloudas
Hi Lorenzo,

If you want to learn how Flink uses watermarks, it is worth checking [1].

Now, in a nutshell, what a watermark does as it flows through a pipeline
is fire any timers that you may have registered, or any windows that you
may have accumulated.
If you have no time-sensitive operations between the first and the
second watermark generators, then I do not think you have to worry
(although it would help if you could share a bit more about your
pipeline in order to have a more educated estimation). If you have
windows, then your windows will fire and the emitted elements will
have the timestamp of the end of the window.

After the second watermark assigner, the watermarks coming from the
previous one are discarded and they are not flowing in the pipeline
anymore. You will only have the new watermarks.

A problem may arise if, for example, the second watermark generator
emits watermarks with smaller values than the first, while the timestamps
of the elements were assigned based on the progress of the first
generator (e.g. windows that fired), so that all your elements are now
considered "late".

I hope that the above paints the big picture of what is happening in
your pipeline.

Again, I may be missing something so feel free to send more details
about your pipeline so that we can help a bit more.

Cheers,
Kostas

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/timely-stream-processing.html

On Wed, Jul 22, 2020 at 9:35 AM Lorenzo Nicora  wrote:
>
> Hi
>
> I have a linear streaming flow with a single source and multiple sinks to 
> publish intermediate results.
> The time characteristic is Event Time and I am adding one 
> AssignerWithPeriodicWatermarks immediately after the source.
> I need to add a different assigner, in the middle of the flow, to change the 
> event time - i.e. extracting a different field from the record as event time.
>
> I am not sure I completely understand the implications of changing event time 
> and watermark in the middle of a flow.
>
> Can anybody give me a hint or direct me to any relevant documentation?
>
> Lorenzo


Re: Question on Pattern Matching

2020-07-24 Thread Kostas Kloudas
Hi Basanth,

If I understand your usecase correctly:

1) you want to find all A->B->C->D
2) for all of them you want to know how long it took to complete
3) if one completed within X it is considered ok, if not, it is
considered problematic
4) you want to count each one of them

One way to go is through CEP as Chesnay suggested and count the
resulting matches. This will give you all the requirements apart from
the one about knowing how long it took for the problematic ones to
complete.

One way to solve this is to specify a within() clause that is, for
example, 2x or 5x the SLA and then, for all successful (within 5x)
matches, do the categorization manually (e.g. using a ProcessFunction
to split them into <= SLA and > SLA). After all, I would assume that if
you do not want to keep infinite state, your job has to have a cut-off
(in the above example 5x), after which a pattern is considered failed
and you stop tracking it.
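
As a rough illustration of that first approach (only the first and last steps
of A->B->C->D are shown to keep it short, and the "Event" type with
getType()/getOrderId()/getTimestamp(), the 10-minute SLA and the 50-minute
cut-off are all assumptions of mine):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.List;
import java.util.Map;

static DataStream<Tuple2<String, Long>> fulfilmentDurations(DataStream<Event> events) {
    Pattern<Event, ?> fulfilment = Pattern.<Event>begin("start")
            .where(new SimpleCondition<Event>() {
                @Override
                public boolean filter(Event e) {
                    return "A".equals(e.getType());
                }
            })
            .followedBy("end")
            .where(new SimpleCondition<Event>() {
                @Override
                public boolean filter(Event e) {
                    return "D".equals(e.getType());
                }
            })
            .within(Time.minutes(50)); // 5x the 10-minute SLA; anything later is dropped

    return CEP.pattern(events.keyBy(Event::getOrderId), fulfilment)
            .select(new PatternSelectFunction<Event, Tuple2<String, Long>>() {
                @Override
                public Tuple2<String, Long> select(Map<String, List<Event>> match) {
                    Event first = match.get("start").get(0);
                    Event last = match.get("end").get(0);
                    // a downstream filter / ProcessFunction can split these durations
                    // into "<= SLA" and "> SLA"
                    return Tuple2.of(first.getOrderId(), last.getTimestamp() - first.getTimestamp());
                }
            });
}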

Another solution is to go with a ProcessFunction [1] from the
beginning and implement your logic there, but if your elements arrive
out of order, e.g. if B may arrive before A, then your code may become
pretty complicated. If out-of-orderness is not an issue, then the
example in [1] can help, but I guess the solution will still be more
complicated than simply using CEP.

Hope this helps,
Kostas

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#example

On Thu, Jul 23, 2020 at 1:07 PM Basanth Gowda  wrote:
>
> Yes - I am able to process matched out patterns.
>
> Let's suppose I have an order fulfillment process.
>
> I want to know how many fulfillments have not met SLA and further how late 
> they are and track until they are fulfilled.
>
> From what I tried with samples, once the pattern times out, it is discarded and
> events that come after that are ignored (not applied to the pattern).
>
> Is there a better way to do it using Table API? Where I am able to emit an 
> event (alert) when the timeout happens, and it continues to alert me - hey 
> you fulfillment is delayed by 6 hours, 12 hours and so on and also know when 
> it is finally completed.
>
> On Thu, Jul 16, 2020 at 3:08 PM Chesnay Schepler  wrote:
>>
>> Have you read this part of the documentation?
>>
>> From what I understand, it provides you hooks for processing matched/timed 
>> out patterns.
>>
>>
>> On 16/07/2020 20:23, Basanth Gowda wrote:
>>
>> Hello,
>>
>> We have a use case where we want to know when a certain pattern doesn't 
>> complete within a given time frame.
>>
>> For Example A -> B -> C -> D (needs to complete in 10 minutes)
>>
>> Now with Flink if event D doesn't happen in 10 minutes, the pattern is 
>> discarded and we can get notified. We also want to track how many of them 
>> completed (even if they meet SLA). How do we achieve this with FLINK CEP or 
>> other mechanisms?
>>
>> thank you,
>> Basanth
>>
>>


Re: REST API randomly returns Not Found for an existing job

2020-07-24 Thread Kostas Kloudas
Hi Tomasz,

Thanks a lot for reporting this issue. If you have verified that the
job is running AND that the REST server is also up and running (e.g.
check the overview page) then I think that this should not be
happening. I am cc'ing Chesnay who may have an additional opinion on
this.

Cheers,
Kostas

On Thu, Jul 23, 2020 at 12:59 PM Tomasz Dudziak  wrote:
>
> Hi,
>
>
>
> I have come across an issue related to GET /job/:jobId endpoint from 
> monitoring REST API in Flink 1.9.0. A few seconds after successfully starting 
> a job and confirming its status as RUNNING, that endpoint would return 404 
> (Not Found). Interestingly, querying immediately again (literally a 
> millisecond later) would return a valid result. I later noticed a similar 
> behaviour in regard to a finished job as well. At certain points in time that 
> endpoint would arbitrarily return 404, but similarly querying again would 
> succeed. I saw this strange behaviour only recently and it used to work fine 
> before.
>
>
>
> Do you know what could be the root cause of this? At the moment, as a 
> workaround I just query a job a couple of times in a row to ensure whether it 
> definitely does not exist or it is just being misreported as non-existent, 
> but this feels a bit like cottage industry…
>
>
>
> Kind regards,
>
> Tomasz
>
>
>
> Tomasz Dudziak | Marshall Wace LLP, George House, 131 Sloane Street, London, 
> SW1X 9AT | E-mail: t.dudz...@mwam.com | Tel: +44 207 024 7061
>
>
>
>
>


Re: Dynamic partitioner for Flink based on incoming load

2020-06-25 Thread Kostas Kloudas
Hi Alexander,

Routing of input data in Flink can be done through keying and this can
guarantee collocation constraints. This means that you can send two
records to the same node by giving them the same key, e.g. the topic
name. Keep in mind that elements with different keys do not
necessarily go to different nodes, as key assignment to nodes is
random.

Given this, you could initially key by topic, so that all records of a
topic go to the same node. This node will compute statistics about the
topic, e.g. elements/sec (t), and based on thresholds assign new keys to
each record, e.g. TOPIC-1 if t < 1000, TOPIC-2 if t >= 1000 && t <
2000, etc., and re-key. This will not guarantee that TOPIC-1 and TOPIC-2
will go to different machines, but the probability of this happening
will increase with the parallelism of your job. Finally, based on your
bucket assigner and the rolling policy, you can redirect the elements
to the same bucket, e.g. TOPIC, and tune how many part-files you will
have based on part-file size and/or time.
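
A rough sketch of the last two steps (re-keying and bucketing), where the
"Record" POJO with getTopic() and the helper subKeyFor(...) (which would map a
topic to a sub-key based on whatever rate statistics you maintain) are
assumptions of mine, not existing Flink APIs:

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

static DataStreamSink<Record> writeByTopic(DataStream<Record> records) {
    return records
            // re-key: hot topics are spread over several sub-keys, cold topics keep one
            .keyBy(r -> r.getTopic() + "-" + subKeyFor(r.getTopic()))
            .addSink(StreamingFileSink
                    .forRowFormat(new Path("gs://my-bucket/output"),
                            new SimpleStringEncoder<Record>("UTF-8"))
                    .withBucketAssigner(new BucketAssigner<Record, String>() {
                        @Override
                        public String getBucketId(Record element, Context context) {
                            // all sub-keys of a topic still land in the same directory
                            return element.getTopic();
                        }

                        @Override
                        public SimpleVersionedSerializer<String> getSerializer() {
                            return SimpleVersionedStringSerializer.INSTANCE;
                        }
                    })
                    .build());
}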

Will this help you with your use-case?

Cheers,
Kostas




On Thu, Jun 25, 2020 at 3:23 AM Alexander Filipchik
 wrote:
>
> Maybe I'm misreading the documentation, but:
> "Data within the partition directories are split into part files. Each 
> partition will contain at least one part file for each subtask of the sink 
> that has received data for that partition."
>
> So, it is 1 partition per subtask. I'm trying to figure out how to 
> dynamically adjust which subtask is getting the data to minimize the number 
> of subtasks writing into a specific partition.
>
> Alex
>
> On Wed, Jun 24, 2020 at 3:55 PM Seth Wiesman  wrote:
>>
>> You can achieve this in Flink 1.10 using the StreamingFileSink.
>>
>> I’d also like to note that Flink 1.11 (which is currently going through 
>> release testing and should be available imminently) has support for exactly 
>> this functionality in the table API.
>>
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/filesystem.html
>>
>>
>> On Wed, Jun 24, 2020 at 1:53 PM Alexander Filipchik  
>> wrote:
>>>
>>> Hello!
>>>
>>> We are working an a Flink Streaming job that reads data from multiple Kafka 
>>> topics and writes them to DFS. We are using StreamingFileSink with custom 
>>> implementation for GCS FS and it generates a lot of files as streams are 
>>> partitioned among multiple JMs. In the ideal case we should have at most 1 
>>> file per kafka topic per interval. We also have some heavy topics and some 
>>> pretty light ones, so the solution should also be smart to utilize 
>>> resources efficiently.
>>>
>>> I was thinking we can partition based on how much data is ingested in the 
>>> last minute or so to make sure: messages from the same topic are routed to 
>>> the same (or minimal number of ) file if there are enough resources to do 
>>> so. Think bin packing.
>>>
>>> Is it a good idea? Is there a built in way to achieve it? If not, is there 
>>> a way to push state into the partitioner (or even kafka client to 
>>> repartition in the source)? I was thinking that I can run a side stream 
>>> that will calculate data volumes and then broadcast it into the main 
>>> stream, so partitioner can make a decision, but it feels a bit complex.
>>>
>>> Another way is to modify kafka client to track messages per topics and make 
>>> decision at that layer.
>>>
>>> Am I on the right path?
>>>
>>> Thank you


Re: Shipping Filesystem Plugins with YarnClusterDescriptor

2020-06-11 Thread Kostas Kloudas
Hi John,

I think that using different plugins is not going to be an issue,
assuming that the schemes of your filesystems do not collide. This is
already the case for S3 within Flink, where we have 2 implementations,
one based on Presto and one based on Hadoop. For the former you can use
the scheme s3p, while for the latter s3a.

Now, for different versions of the same plugin, this can be an issue if
all of them are present concurrently in your plugins directory. But is
this the case, or is only the latest version of a given plugin present?

Keep in mind that after uploading, the "remote" plugins dir is not
shared among applications but it is "private" to each one of them.

Cheers,
Kostas

On Thu, Jun 11, 2020 at 5:12 PM John Mathews  wrote:
>
> So I think that will work, but it has some limitations. Namely, when 
> launching clusters through a service (which is our use case), it can be the 
> case that multiple different clients want clusters with different plugins or 
> different versions of a given plugin, but because the FlinkClusterDescriptor 
> currently reads where to get the plugins to ship from an environment 
> variable, there is a race condition where that directory could contain 
> plugins from multiple different in-flight requests to spin up a cluster.
>
> I think a possible solution is to expose configuration on the
> YarnClusterDescriptor that is similar to the shipFiles list, but is instead a
> shipPlugins list. That way, the plugins that get shipped are per YARN
> cluster request instead of global.
>
> Do you see any workarounds for the issue I described? Also, does the idea I 
> propose make sense as a solution?
>
>
>
> On Wed, Jun 10, 2020 at 9:16 PM Yangze Guo  wrote:
>>
>> Hi, John,
>>
>> AFAIK, Flink will automatically help you to ship the "plugins/"
>> directory of your Flink distribution to Yarn[1]. So, you just need to
>> make a directory in "plugins/" and put your custom jar into it. Do you
>> meet any problem with this approach?
>>
>> [1] 
>> https://github.com/apache/flink/blob/216f65fff10fb0957e324570662d075be66bacdf/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java#L770
>>
>> Best,
>> Yangze Guo
>>
>> On Wed, Jun 10, 2020 at 11:29 PM John Mathews  wrote:
>> >
>> > Hello,
>> >
>> > I have a custom filesystem that I am trying to migrate to the plugins 
>> > model described here: 
>> > https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/#adding-a-new-pluggable-file-system-implementation,
>> >  but it is unclear to me how to dynamically get the plugins directory to 
>> > be available when launching using a Yarn Cluster Descriptor. One thought 
>> > was to add the plugins to the shipFilesList, but I don't think that would 
>> > result in the plugins being in the correct directory location for Flink to 
>> > discover it.
>> >
>> > Is there another way to get the plugins onto the host when launching the 
>> > cluster? Or is there a different recommended way of doing this? Happy to 
>> > answer any questions if something is unclear.
>> >
>> > Thanks so much for your help!
>> >
>> > John


Re: Troubles with Avro migration from 1.7 to 1.10

2020-06-10 Thread Kostas Kloudas
Hi Alan,

Unfortunately not, but the release is expected to come out in the next
couple of weeks, so it will be available then.
Until then, you can either copy the code of the AvroWriterFactory to
your project and use it from there, or download the project from
GitHub, as you said.

Cheers,
Kostas

On Wed, Jun 10, 2020 at 9:24 AM Alan Żur  wrote:
>
> Hi Kostas,
>
> Is this release available in Maven Central, or should I download the project from
> GitHub?
>
> Thanks,
> Alan
>
>
>
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Troubles with Avro migration from 1.7 to 1.10

2020-06-09 Thread Kostas Kloudas
Hi Alan,

In the upcoming Flink 1.11 release, there will be support for Avro
using the AvroWriterFactory as seen in [1].
Do you think that this can solve your problem?

You can also download the current release-1.11 branch and also test it
out to see if it fits your needs.

Cheers,
Kostas

[1] https://issues.apache.org/jira/browse/FLINK-11395
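
For reference, a rough sketch of the 1.11-style usage for the Feature record from
this thread (AvroWriters is the helper added by [1]; hdfsPath is assumed to be defined):

import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

// Feature is the Avro-generated SpecificRecord class mentioned below.
StreamingFileSink<Feature> sink = StreamingFileSink
        .forBulkFormat(new Path(hdfsPath), AvroWriters.forSpecificRecord(Feature.class))
        .build();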

On Tue, Jun 9, 2020 at 4:23 PM Alan Żur  wrote:
>
>
>
>
>
> Hi,
>
>
>
> I was assigned to migrate our Flink 1.7 setup to 1.10. So far it's going well; 
> however, I've encountered a problem with Avro writing to HDFS. Currently we're 
> using the BucketingSink, which is deprecated. I've managed to replace a few 
> BucketingSinks with StreamingFileSink with the row format. However, I don't have 
> any idea how to tackle the Avro Writer<> implementation.
>
>
>
> @Override
> protected void applySink(DataStream<Feature> outputStream) {
> outputStream
> .keyBy(Feature::getSessionId)
> .addSink(createSink())
> .uid(UID_PART.concat("sink-v1"))
> .name(UID_PART.concat("hdfs_bucketing_sink"));
> }
>
> private SinkFunction createSFSink() {
> return StreamingFileSink
> .forBulkFormat(Path.fromLocalFile(new File(hdfsPath)),
> ParquetAvroWriters.forGenericRecord(new 
> ComboFeatureAvroWriter().createSchema()))
> .build();
> }
>
> private BucketingSink createSink() {
> return new BucketingSink(hdfsPath)
> .setBucketer(new DateTypeComboFeatureBucketer("-MM-dd", 
> ZoneOffset.UTC))
> .setBatchSize(batchSize)
> .setBatchRolloverInterval(batchRollingInterval)
> .setInactiveBucketCheckInterval(checkInactiveBucketInterval)
> .setInactiveBucketThreshold(inactiveBucketThreshold)
> .setUseTruncate(useTruncate)
> .setWriter(new ComboFeatureAvroWriter());
> }
>
> The function createSFSink() above I took from 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
> I've tried changing GenericRecord to the Feature class – that failed; I've tried to 
> write an empty GenericRecord map just to get rid of the compilation error – that failed too 
> (still giving an improper type error). I've also tried to use 
> ParquetAvroWriters.forSpecificRecord(Feature.class), which also failed.


Re: Age old stop vs cancel debate

2020-06-09 Thread Kostas Kloudas
I understand. Thanks for looking into it Senthil!

Kostas

On Tue, Jun 9, 2020 at 7:32 PM Senthil Kumar  wrote:
>
> OK, will do and report back.
>
> We are on 1.9.1,
>
> 1.10 will take some time __
>
> On 6/9/20, 2:06 AM, "Kostas Kloudas"  wrote:
>
> Hi Senthil,
>
> From a quick look at the code, it seems that the cancel() of your
> source function should be called, and the thread that it is running on
> should be interrupted.
>
> In order to pin down the problem and help us see if this is an actual
> bug, could you please:
> 1) enable debug logging and see if you can spot some lines like this:
>
> "Starting checkpoint (-ID) SYNC_SAVEPOINT on task X" or sth
> similar with synchronous savepoint in it
>
> and any other message afterwards with -ID in it to see if the
> savepoint is completed successfully.
>
> 2) could you see if this behavior persists in the FLINK-1.10?
>
> Thanks,
> Kostas
>
> On Tue, Jun 2, 2020 at 4:20 PM Senthil Kumar  wrote:
> >
> > Robert,
> >
> >
> >
> > Thank you once again! We are currently doing the “short” Thread.sleep() 
> approach. Seems to be working fine.
> >
> >
> >
> > Cheers
> >
> > Kumar
> >
> >
> >
> > From: Robert Metzger 
> > Date: Tuesday, June 2, 2020 at 2:40 AM
> > To: Senthil Kumar 
> > Cc: "user@flink.apache.org" 
> > Subject: Re: Age old stop vs cancel debate
> >
> >
> >
> > Hi Kumar,
> >
> >
> > this is more a Java question than a Flink question now :) If it is 
> easily possible from your code, then I would regularly check the isRunning 
> flag (by having short Thread.sleeps()) to have a proper cancellation behavior.
> >
> > If this makes your code very complicated, then you could work with 
> manually interrupting your worker thread. I would only use this method if you 
> are sure your code (and the libraries you are using) are properly handling 
> interrupts.
> >
> > Sorry that I can not give you a more actionable response. It depends a 
> lot on the structure of your code and the libraries you are calling into.
> >
> >
> >
> > Best,
> >
> > Robert
> >
> >
> >
> >
> >
> > On Fri, May 29, 2020 at 10:48 PM Senthil Kumar  
> wrote:
> >
> > Hi Robert,
> >
> >
> >
> > Would appreciate more insights please.
> >
> >
> >
> > What we are noticing: When the flink job is issued a stop command, the 
> Thread.sleep is not receiving the InterruptedException
> >
> >
> >
> > It certainly receives the exception when the flink job is issued a 
> cancel command.
> >
> >
> >
> > In both cases (cancel and stop) the cancel() method is getting called 
> (to set the isRunning variable to false)
> >
> >
> >
> > However, given that the thread does not get interrupted in stop, we are 
> not in a position to check the isRunning variable.
> >
> >
> >
> >
> >
> > For now, we are doing a Thread.sleep  every 5 minutes (instead of the 
> normal interval which is in hours).
> >
> > Sleeping for 5 minutes gives us a chance to check the isRunning 
> variable.
> >
> >
> >
> > Another approach would be to save the currentThread 
> (Thread.currentThread()) before doing a Thread.sleep()
> >
> > and manually calling Thread.interrupt() from the cancel function.
> >
> >
> >
> > What is your recommendation?
> >
> >
> >
> > Cheers
> >
> > Kumar
> >
> >
> >
> >
> >
> > From: Robert Metzger 
> > Date: Friday, May 29, 2020 at 4:38 AM
> > To: Senthil Kumar 
> > Cc: "user@flink.apache.org" 
> > Subject: Re: Age old stop vs cancel debate
> >
> >
> >
> > Hi Kumar,
> >
> >
> >
> > They way you've implemented your custom source sounds like the right 
> way: Having a "running" flag checked by the run() method and changing it in 
> cancel().
> >
> > Also, it is good that you are p

Re: Stopping a job

2020-06-09 Thread Kostas Kloudas
Hi all,

Just for future reference, there is an ongoing discussion on the topic at
another thread found in [1].
So please post any relevant comments there :)

Cheers,
Kostas

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Age-old-stop-vs-cancel-debate-td35514.html#a35615

On Tue, Jun 9, 2020 at 7:36 AM M Singh  wrote:

> Thanks Kostas, Arvid, and Senthil for your help.
>
> On Monday, June 8, 2020, 12:47:56 PM EDT, Senthil Kumar <
> senthi...@vmware.com> wrote:
>
>
> I am just stating this for completeness.
>
>
>
> When a job is cancelled, Flink sends an Interrupt signal to the Thread
> running the Source.run method
>
>
>
> For some reason (unknown to me), this does not happen when a Stop command
> is issued.
>
>
>
> We ran into some minor issues because of said behavior.
>
>
>
> *From: *Kostas Kloudas 
> *Date: *Monday, June 8, 2020 at 2:35 AM
> *To: *Arvid Heise 
> *Cc: *M Singh , User-Flink 
> *Subject: *Re: Stopping a job
>
>
>
> What Arvid said is correct.
>
> The only thing I have to add is that "stop" allows also exactly-once sinks
> to push out their buffered data to their final destination (e.g.
> Filesystem). In other words, it takes into account side-effects, so it
> guarantees exactly-once end-to-end, assuming that you are
> using exactly-once sources and sinks. Cancel-with-savepoint, on the other
> hand, did not necessarily do this; committing side-effects followed a
> "best-effort" approach.
>
>
>
> For more information you can check [1].
>
>
>
> Cheers,
>
> Kostas
>
>
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103090212
>
>
>
> On Mon, Jun 8, 2020 at 10:23 AM Arvid Heise  wrote:
>
> It was before I joined the dev team, so the following are kind of
> speculative:
>
>
>
> The concept of stoppable functions never really took off as it was a bit
> of a clumsy approach. There is no fundamental difference between stopping
> and cancelling on (sub)task level. Indeed if you look in the twitter source
> of 1.6 [1], cancel() and stop() are doing the exact same thing. I'd assume
> that this is probably true for all sources.
>
>
>
> So what is the difference between cancel and stop then? It's more the way
> on how you terminate the whole DAG. On cancelling, you cancel() on all
> tasks more or less simultaneously. If you want to stop, it's more a
> fine-grain cancel, where you stop first the sources and then let the tasks
> close themselves when all upstream tasks are done. Just before closing the
> tasks, you also take a snapshot. Thus, the difference should not be visible
> in user code but only in the Flink code itself (task/checkpoint coordinator)
>
>
>
> So for your question:
>
> 1. No, as on task level stop() and cancel() are the same thing on UDF
> level.
>
> 2. Yes, stop will be more graceful and creates a snapshot. [2]
>
> 3. Not that I am aware of. In the whole flink code base, there are no more
> (see javadoc). You could of course check if there are some in Bahir. But it
> shouldn't really matter. There is no huge difference between stopping and
> cancelling if you wait for a checkpoint to finish.
>
> 4. Okay you answered your second question ;) Yes cancel with savepoint =
> stop now to make it easier for new users.
>
>
>
> [1]
> https://github.com/apache/flink/blob/release-1.6/flink-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java#L180-L190
>
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/cli.html

Re: Stopping flink application with /jobs/:jobid/savepoints or /jobs/:jobid/stop

2020-06-09 Thread Kostas Kloudas
Hi Omkar,

For the first part of the question where you set the "drain" to false
and the state gets drained, this can be an issue on our side. Just to
clarify, no matter what is the value of the "drain", Flink always
takes a savepoint. Drain simply means that we also send MAX_WATERMARK
before taking the savepoint. Is this what you observe? I.e. that you
have an infinite input stream and even if you set drain to false, you
still see the MAX_WATERMARK?

For the second part of the question, the cancel-with-savepoint is a
deprecated command. But it is not removed for backwards compatibility.
So you can still have a cancel-with-savepoint in the way you
described. The difference between the deprecated cancel-with-savepoint
and the recommended stop-with-savepoint is that the
stop-with-savepoint guarantees that if you are using an exactly-once
sink, the side-effects are going to be committed to the sink before
the job exits. This was not the case for cancel-with-savepoint. For
more details, you can have a look at [1].

Cheers,
Kostas

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103090212

On Tue, Jun 9, 2020 at 4:40 AM Deshpande, Omkar
 wrote:
>
> Hello,
>
> When I try to stop the job with the /jobs/:jobid/stop REST endpoint, the state 
> gets drained, even if I pass {"drain":false} in the body of the POST request. 
> Is the value of the drain flag true by default? Why is it not being used when I 
> pass {"drain":false}?
>
> And I can also stop a job using this endpoint /jobs/:jobid/savepoints with 
> {"cancel-job":"true"} in the body. In this case the state is not 
> drained. What is the difference between these 2 endpoints? Is there a reason 
> to use one over the other?
>
> If I want to stop a job with savepoint but without draining the state which 
> endpoint should be used?
>
> Omkar


Re: Age old stop vs cancel debate

2020-06-09 Thread Kostas Kloudas
Hi Senthil,

From a quick look at the code, it seems that the cancel() of your
source function should be called, and the thread that it is running on
should be interrupted.

In order to pin down the problem and help us see if this is an actual
bug, could you please:
1) enable debug logging and see if you can spot some lines like this:

"Starting checkpoint (-ID) SYNC_SAVEPOINT on task X" or sth
similar with synchronous savepoint in it

and any other message afterwards with -ID in it to see if the
savepoint is completed successfully.

2) could you see if this behavior persists in the FLINK-1.10?

Thanks,
Kostas

On Tue, Jun 2, 2020 at 4:20 PM Senthil Kumar  wrote:
>
> Robert,
>
>
>
> Thank you once again! We are currently doing the “short” Thread.sleep() 
> approach. Seems to be working fine.
>
>
>
> Cheers
>
> Kumar
>
>
>
> From: Robert Metzger 
> Date: Tuesday, June 2, 2020 at 2:40 AM
> To: Senthil Kumar 
> Cc: "user@flink.apache.org" 
> Subject: Re: Age old stop vs cancel debate
>
>
>
> Hi Kumar,
>
>
> this is more a Java question than a Flink question now :) If it is easily 
> possible from your code, then I would regularly check the isRunning flag (by 
> having short Thread.sleeps()) to have a proper cancellation behavior.
>
> If this makes your code very complicated, then you could work with manually 
> interrupting your worker thread. I would only use this method if you are sure 
> your code (and the libraries you are using) are properly handling interrupts.
>
> Sorry that I can not give you a more actionable response. It depends a lot on 
> the structure of your code and the libraries you are calling into.
>
>
>
> Best,
>
> Robert
>
>
>
>
>
> On Fri, May 29, 2020 at 10:48 PM Senthil Kumar  wrote:
>
> Hi Robert,
>
>
>
> Would appreciate more insights please.
>
>
>
> What we are noticing: When the flink job is issued a stop command, the 
> Thread.sleep is not receiving the InterruptedException
>
>
>
> It certainly receives the exception when the flink job is issued a cancel 
> command.
>
>
>
> In both cases (cancel and stop) the cancel() method is getting called (to set 
> the isRunning variable to false)
>
>
>
> However, given that the thread does not get interrupted in stop, we are not 
> in a position to check the isRunning variable.
>
>
>
>
>
> For now, we are doing a Thread.sleep  every 5 minutes (instead of the normal 
> interval which is in hours).
>
> Sleeping for 5 minutes gives us a chance to check the isRunning variable.
>
>
>
> Another approach would be to save the currentThread (Thread.currentThread()) 
> before doing a Thread.sleep()
>
> and manually calling Thread.interrupt() from the cancel function.
>
>
>
> What is your recommendation?
>
>
>
> Cheers
>
> Kumar
>
>
>
>
>
> From: Robert Metzger 
> Date: Friday, May 29, 2020 at 4:38 AM
> To: Senthil Kumar 
> Cc: "user@flink.apache.org" 
> Subject: Re: Age old stop vs cancel debate
>
>
>
> Hi Kumar,
>
>
>
> They way you've implemented your custom source sounds like the right way: 
> Having a "running" flag checked by the run() method and changing it in 
> cancel().
>
> Also, it is good that you are properly handling the interrupt set by Flink 
> (some people ignore InterruptedExceptions, which make it difficult (basically 
> impossible) for Flink to stop the job)
>
>
>
> Best,
>
> Robert
>
>
>
>
>
> On Wed, May 27, 2020 at 7:38 PM Senthil Kumar  wrote:
>
> We are on flink 1.9.0
>
>
>
> I have a custom SourceFunction, where I rely on isRunning set to false via 
> the cancel() function to exit out of the run loop.
>
> My run loop essentially gets the data from S3, and then simply sleeps 
> (Thread.sleep) for a specified time interval.
>
>
>
> When a job gets cancelled, the SourceFunction.cancel() is called, which sets 
> the isRunning to false.
>
> In addition, the Thread.sleep gets interrupted, a check Is made on the 
> isRunning variable (set to false now) and the run loop is exited.
>
>
>
> We noticed that when we “stop” the flink job, the Thread.sleep does not get 
> interrupted.
>
> It also appears that SourceFunction.cancel() is not getting called (which 
> seems like the correct behavior for “stop”)
>
>
>
> My question: what’s the “right” way to exit the run() loop when the flink job 
> receives a stop command?
>
>
>
> My understanding was that there was a Stoppable interface (which got removed 
> in 1.9.0)
>
>
>
> Would appreciate any insights.
>
>
>
> Cheers
>
> Kumar
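
For reference, a minimal sketch of the source pattern discussed in this thread
(class and variable names are made up; the sleep interval is just an example):

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class PollingSource implements SourceFunction<String> {

    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (isRunning) {
            // fetch data (e.g. from S3) and emit it via ctx.collect(...)

            try {
                // sleep in short slices so the flag is re-checked regularly,
                // even if no interrupt arrives (as observed with "stop" above)
                Thread.sleep(5 * 60 * 1000L);
            } catch (InterruptedException e) {
                // "cancel" interrupts this thread; fall through and re-check the flag
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}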


Re: Stopping a job

2020-06-08 Thread Kostas Kloudas
What Arvid said is correct.
The only thing I have to add is that "stop" allows also exactly-once sinks
to push out their buffered data to their final destination (e.g.
Filesystem). In other words, it takes into account side-effects, so it
guarantees exactly-once end-to-end, assuming that you are
using exactly-once sources and sinks. Cancel-with-savepoint, on the other
hand, did not necessarily do this; committing side-effects followed a
"best-effort" approach.

For more information you can check [1].

Cheers,
Kostas

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103090212

On Mon, Jun 8, 2020 at 10:23 AM Arvid Heise  wrote:

> It was before I joined the dev team, so the following are kind of
> speculative:
>
> The concept of stoppable functions never really took off as it was a bit
> of a clumsy approach. There is no fundamental difference between stopping
> and cancelling on (sub)task level. Indeed if you look in the twitter source
> of 1.6 [1], cancel() and stop() are doing the exact same thing. I'd assume
> that this is probably true for all sources.
>
> So what is the difference between cancel and stop then? It's more the way
> on how you terminate the whole DAG. On cancelling, you cancel() on all
> tasks more or less simultaneously. If you want to stop, it's more a
> fine-grain cancel, where you stop first the sources and then let the tasks
> close themselves when all upstream tasks are done. Just before closing the
> tasks, you also take a snapshot. Thus, the difference should not be visible
> in user code but only in the Flink code itself (task/checkpoint coordinator)
>
> So for your question:
> 1. No, as on task level stop() and cancel() are the same thing on UDF
> level.
> 2. Yes, stop will be more graceful and creates a snapshot. [2]
> 3. Not that I am aware of. In the whole flink code base, there are no more
> (see javadoc). You could of course check if there are some in Bahir. But it
> shouldn't really matter. There is no huge difference between stopping and
> cancelling if you wait for a checkpoint to finish.
> 4. Okay you answered your second question ;) Yes cancel with savepoint =
> stop now to make it easier for new users.
>
> [1]
> https://github.com/apache/flink/blob/release-1.6/flink-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java#L180-L190
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/cli.html
>
> On Sun, Jun 7, 2020 at 1:04 AM M Singh  wrote:
>
>>
>> Hi Arvid:
>>
>> Thanks for the links.
>>
>> A few questions:
>>
>> 1. Is there any particular interface in 1.9+ that identifies the source
>> as stoppable ?
>> 2. Is there any distinction b/w stop and cancel  in 1.9+ ?
>> 3. Is there any list of sources which are documented as stoppable besides
>> the one listed in your SO link ?
>> 4. In 1.9+ there is flink stop command and a flink cancel command. (
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html#stop).
>> So it appears that flink stop will take a savepoint and the call cancel,
>> and cancel will just cancel the job (looks like cancel with savepoint is
>> deprecated in 1.10).
>>
>> Thanks again for your help.
>>
>>
>>
>> On Saturday, June 6, 2020, 02:18:57 PM EDT, Arvid Heise <
>> ar...@ververica.com> wrote:
>>
>>
>> Yes, it seems as if FlinkKinesisConsumer does not implement it.
>>
>> Here are the links to the respective javadoc [1] and code [2]. Note that
>> in later releases (1.9+) this interface has been removed. Stop is now
>> implemented through a cancel() on source level.
>>
>> In general, I don't think that in a Kinesis to Kinesis use case, stop is
>> needed anyways, since there is no additional consistency expected over a
>> normal cancel.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/api/java/org/apache/flink/api/common/functions/StoppableFunction.html
>> [2]
>> https://github.com/apache/flink/blob/release-1.6/flink-core/src/main/java/org/apache/flink/api/common/functions/StoppableFunction.java
>>
>> On Sat, Jun 6, 2020 at 8:03 PM M Singh  wrote:
>>
>> Hi Arvid:
>>
>> I check the link and it indicates that only Storm SpoutSource,
>> TwitterSource and NifiSource support stop.
>>
>> Does this mean that FlinkKinesisConsumer is not stoppable ?
>>
>> Also, can you please point me to the Stoppable interface mentioned in the
>> link ?  I found the following but am not sure if TwitterSource implements
>> it :
>>
>> https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/StartStoppable.java
>>
>> Thanks
>>
>>
>>
>>
>>
>> On Friday, June 5, 2020, 02:48:49 PM EDT, Arvid Heise <
>> ar...@ververica.com> wrote:
>>
>>
>> Hi,
>>
>> could you check if this SO thread [1] helps you already?
>>
>> [1]
>> https://stackoverflow.com/questions/53735318/flink-how-to-solve-error-this-job-is-not-stoppable
>>
>> On Thu, Jun 4, 2020 at 7:43 PM M Singh  wrote:
>>
>> 

Re: Flink s3 streaming performance

2020-06-05 Thread Kostas Kloudas
Hi all,

@Venkata, Do you have many small files being created as Arvid suggested? If
yes, then I tend to agree that S3 is probably not the best sink. Although I
did not get that from your description.
In addition, instead of PrintStream you can have a look at the code of the
SimpleStringEncoder in Flink [1] for a bit more efficient implementation.

Cheers,
Kostas

[1]
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/serialization/SimpleStringEncoder.java
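
To illustrate the suggestion, a sketch of a plain Encoder in the spirit of
SimpleStringEncoder that writes only f2, as in the snippet quoted below
(the path is a placeholder):

import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

Encoder<Tuple3<String, String, String>> encoder = (element, stream) -> {
    // write the payload directly, with no per-record PrintStream allocation
    stream.write(element.f2.getBytes(StandardCharsets.UTF_8));
    stream.write('\n');
};

StreamingFileSink<Tuple3<String, String, String>> sink = StreamingFileSink
        .forRowFormat(new Path("s3a://a-bucket/output"), encoder)
        .build();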


On Fri, Jun 5, 2020 at 7:56 PM Arvid Heise  wrote:

> Hi Venkata,
>
> are the many small files intended or is it rather an issue of our commit
> on checkpointing? If so then FLINK-11499 [1] should help you. Design is
> close to done, unfortunately implementation will not make it into 1.11.
>
> In any case, I'd look at the parameter fs.s3a.connection.maximum, as you
> store both state and data on S3. I'd probably go with slot*3 or even higher.
>
> Lastly, the way you output elements looks also a bit suspicious.
> PrintStream is not known for great performance. I'm also surprised that it
> works without manual flushing.
>
> [1] https://issues.apache.org/jira/browse/FLINK-11499
>
> On Mon, Jun 1, 2020 at 9:52 AM Jörn Franke  wrote:
>
>> I think S3 is the wrong storage backend for these volumes of small messages.
>> Try to use a NoSQL database or write multiple messages into one file in
>> S3 (1 or 10)
>>
>> If you still want to go with your scenario then try a network optimized
>> instance and use s3a in Flink and configure s3 entropy.
>>
>> Am 31.05.2020 um 15:30 schrieb venkata sateesh` kolluru <
>> vkollur...@gmail.com>:
>>
>> 
>> Hi David,
>>
>> The average size of each file is around 30KB and I have a checkpoint interval
>> of 5 minutes. Some files are even 1KB; because of checkpointing, some files
>> are merged into one big file of around 300MB.
>>
>> With 120 million files and 4TB, if the rate of transfer is 300 per
>> minute, it is taking weeks to write to s3.
>>
>> I have tried to increase the parallelism of the sink but I don't see any
>> improvement.
>>
>> The sink record is a Tuple3; the actual content of the
>> file is f2. This content is written to /f0/f1/part*-*
>>
>> I guess the prefix determination in the custom BucketAssigner won't be causing
>> this delay?
>>
>> Could you please shed some light on writing custom s3 sink ?
>>
>> Thanks
>>
>>
>> On Sun, May 31, 2020, 6:34 AM David Magalhães 
>> wrote:
>>
>>> Hi Venkata.
>>>
>>> 300 requests per minute looks like 200ms per request, which should be a
>>> normal response time to send a file if there isn't any speed limitation
>>> (how big are the files?).
>>>
>>> Have you changed the parallelism to be higher than 1? I also
>>> recommend limiting the source parallelism, because it can consume
>>> pretty fast from Kafka and create some kind of backpressure.
>>>
>>> I don't any much experience with StreamingFileSink, because I've ended
>>> up using a custom S3Sink, but I did have some issues writing to S3 because
>>> the request wasn't parallelised. Check this thread,
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Parallisation-of-S3-write-sink-td34064.html#a34070
>>>
>>> On Sun, May 31, 2020 at 1:32 AM venkata sateesh` kolluru <
>>> vkollur...@gmail.com> wrote:
>>>
 Hello,

 I have posted the same in stackoverflow but didnt get any response. So
 posting it here for help.


 https://stackoverflow.com/questions/62068787/flink-s3-write-performance-optimization?noredirect=1#comment109814428_62068787

 Details:

 I am working on a flink application on kubernetes(eks) which consumes
 data from kafka and write it to s3.

 We have around 120 million xml messages of size 4TB in kafka. Consuming
 from kafka is super fast.

 These are just string messages from kafka.

 There is a high back pressure while writing to s3. We are not even
 hitting the s3 PUT request limit, which is around 3500 requests/sec. I am
 seeing only 300 writes per minute to S3 which is very slow.

 I am using StreamFileSink to write to s3 with Rolling policy as
 OnCheckpointPolicy.

 Using flink-s3-fs-hadoop-*.jar and s3:// as path (not using any s3a or
 s3p)

 Other than this I dont have any config related to s3

 StreamingFileSink<Tuple3<String, String, String>> sink =
 StreamingFileSink
 .forRowFormat(new Path("s3://BUCKET"),
 (Tuple3<String, String, String> element, OutputStream
 stream) -> {
 PrintStream out = new PrintStream(stream);
 out.println(element.f2);
 })
 // Determine component type for each record
 .withBucketAssigner(new CustomBucketAssigner())
 .withRollingPolicy(OnCheckpointRollingPolicy.build())
 .withBucketCheckInterval((TimeUnit.MINUTES.toMillis(1)))
 .build();

 Is there 

Re: ClusterClientFactory selection

2020-05-27 Thread Kostas Kloudas
Hi Singh,

The only thing to add to what Yang said is that the "execution.target"
configuration option (in the config file) is also used for the same
purpose from the execution environments.
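
For completeness, a sketch of what setting it programmatically can look like
(the option constant is DeploymentOptions.TARGET; "yarn-per-job" is just an example value):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;

Configuration config = new Configuration();
// equivalent to "execution.target: yarn-per-job" in the config file
// or "-e/--executor yarn-per-job" on the command line
config.setString(DeploymentOptions.TARGET, "yarn-per-job");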

Cheers,
Kostas

On Wed, May 27, 2020 at 4:49 AM Yang Wang  wrote:
>
> Hi M Singh,
>
> The Flink CLI picks up the correct ClusterClientFactory via java SPI. You
> could check YarnClusterClientFactory#isCompatibleWith for how it is activated.
> The cli option / configuration is "-e/--executor" or execution.target (e.g. 
> yarn-per-job).
>
>
> Best,
> Yang
>
> M Singh  于2020年5月26日周二 下午6:45写道:
>>
>> Hi:
>>
>> I wanted to find out which parameter/configuration allows flink cli pick up 
>> the appropriate cluster client factory (especially in the yarn mode).
>>
>> Thanks


[DISCUSS] Remove dependency shipping through nested jars during job submission.

2020-05-20 Thread Kostas Kloudas
Hi all,

I would like to bring the discussion in
https://issues.apache.org/jira/browse/FLINK-17745 to the dev mailing
list, just to hear the opinions of the community.

In a nutshell, in the early days of Flink, users could submit their
jobs as fat-jars that had a specific structure. More concretely, the
user could put the dependencies of the submitted job in a lib/ folder
within his/her jar and Flink would search within the user's jar for
such a folder, and if this existed, it would extract the nested jars,
ship them independently and add them to the classpath. Finally, it
would also ship the fat-jar itself so that the user-code is available
at the cluster (for details see [1]).

This way of submission was NOT documented anywhere and it has the
obvious shortcoming that the "nested" jars will be shipped twice. In
addition, it makes the codebase a bit more difficult to maintain, as
this constitutes another way of submitting stuff.

Given the above, I would like to propose to remove this codepath. But
given that there are users using the hidden feature, I would like to
discuss 1) how many such users exist, 2) how difficult it is for them
to "migrate" to a different way of submitting jobs, and 3) if the rest
of the community agrees on removing it.

I post this on both dev and user ML so that we have better coverage.

Looking forward to a fruitful discussion,
Kostas

[1] 
https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java#L222


Re: Fault tolerance in Flink file Sink

2020-04-27 Thread Kostas Kloudas
Hi Eyal and Dawid,

@Eyal I think Dawid explained pretty well what is happening and why in
distributed settings, the underlying FS on which the StreamingFileSink
writes has to be durable and accessible to all parallel instances of
the job. Please let us know if you have any further questions.

Cheers,
Kostas

On Mon, Apr 27, 2020 at 9:52 AM Eyal Pe'er  wrote:
>
> Hi Dawid,
> Thanks for the very detailed answer and the correct assumptions (I am using 
> row format).
>
> I tried not using NFS/S3, but it seems like it is the only option I have.
>
> Best regards
>
> Eyal Peer
>
> From: Dawid Wysakowicz 
> Sent: Friday, April 24, 2020 4:20 PM
> To: Eyal Pe'er ; user 
> Subject: Re: Fault tolerance in Flink file Sink
>
>
>
> Hi Eyal,
>
> First of all I would say a local filesystem is not the right choice for what 
> you are trying to achieve. I don't think you can achieve true exactly-once 
> semantics in this setup. Let me elaborate why.
>
> Let me clarify a bit how the StreamingFileSink works.  The interesting bit is 
> how it behaves on checkpoints. The behavior is controlled by a RollingPolicy. 
> As you have not said what format you use, let's assume you use the row format 
> first. For a row format, the default rolling policy (which decides when to change the file 
> from in-progress to pending) is: the file is rolled if it reaches 128MB, 
> is older than 60 sec, or has not been written to for 60 sec. It 
> does not roll on a checkpoint. Moreover StreamingFileSink considers the 
> filesystem as a durable sink that can be accessed after a restore. That 
> implies that it will try to append to this file when restoring from 
> checkpoint/savepoint.
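
For illustration, a sketch of making that rolling behavior explicit with a custom
policy; the values simply restate the defaults described above (the path is a placeholder):

import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

StreamingFileSink<String> sink = StreamingFileSink
        .forRowFormat(new Path("file:///tmp/out"), new SimpleStringEncoder<String>("UTF-8"))
        .withRollingPolicy(DefaultRollingPolicy.builder()
                .withMaxPartSize(128 * 1024 * 1024)                    // roll at 128 MB...
                .withRolloverInterval(TimeUnit.SECONDS.toMillis(60))   // ...or when 60 sec old...
                .withInactivityInterval(TimeUnit.SECONDS.toMillis(60)) // ...or after 60 sec idle
                .build())
        .build();
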
>
> Even if you rolled the files on every checkpoint you still might face the 
> problem that you can have some leftovers because the StreamingFileSink moves 
> the files from pending to complete after the checkpoint is completed. If a 
> failure happens between finishing the checkpoint and moving the files it will 
> not be able to move them after a restore (it would do so if it had access).
>
> Lastly a completed checkpoint will contain offsets of records that were 
> processed successfully end-to-end, that means records that are assumed 
> committed by the StreamingFileSink. This can be records written to an 
> in-progress file with a pointer in a StreamingFileSink checkpointed metadata, 
> records in a "pending" file with an entry in a StreamingFileSink checkpointed 
> metadata that this file has been completed or records in "finished" files.[1]
>
> Therefore as you can see there are multiple scenarios when the 
> StreamingFileSink has to access the files after a restart.
>
> One last thing: you mentioned "committing to the bootstrap-server". Bear in 
> mind that Flink does not use offsets committed back to Kafka for guaranteeing 
> consistency. It can write those offsets back but only for 
> monitoring/debugging purposes. Flink stores/restores the processed offsets 
> from its checkpoints.[3]
>
> Let me know if it helped. I tried my best ;) BTW I highly encourage reading 
> the linked sources as they try to describe all that in a more structured way.
>
> I am also cc'ing Kostas who knows more about the StreamingFileSink than I 
> do., so he can maybe correct me somewhere.
>
>  Best,
>
> Dawid
>
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html
>
> [2] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html
>
> [3]https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration
>
> On 23/04/2020 12:11, Eyal Pe'er wrote:
>
> Hi all,
> I am using Flink streaming with Kafka consumer connector (FlinkKafkaConsumer) 
> and file Sink (StreamingFileSink) in a cluster mode with exactly once policy.
>
> The file sink writes the files to the local disk.
>
> I’ve noticed that if a job fails and automatic restart is on, the task 
> managers look for the leftover files from the last failing job (hidden 
> files).
>
> Obviously, since the tasks can be assigned to different task managers, this 
> adds up to more failures over and over again.
>
> The only solution I found so far is to delete the hidden files and resubmit 
> the job.
>
> If I get it right (and please correct me if I'm wrong), the events in the 
> hidden files were not committed to the bootstrap-server, so there is no data 
> loss.
>
>
>
> Is there a way, forcing Flink to ignore the files that were written already? 
> Or maybe there is a better way to implement the solution (maybe somehow with 
> savepoints)?
>
>
>
> Best regards
>
> Eyal Peer
>
>


Re: Parquet S3 Sink Part files are not rolling over with checkpoint

2020-04-09 Thread Kostas Kloudas
I would say so, yes.
Also, could you set the paths where you want to use Presto to "s3p", as
described in [1], just to be sure that there is no ambiguity.

You could also make use of [2].

And thanks for looking into it!

Cheers,
Kostas

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html#s3-specific
[2] 
https://cwiki.apache.org/confluence/display/FLINK/Remote+Debugging+of+Flink+Clusters

On Thu, Apr 9, 2020 at 2:50 PM Roshan Punnoose  wrote:
>
> Btw, I ran the same exact code on a local Flink cluster run with 
> `./bin/start-cluster.sh` on my local machine. With `s3a` it did not work, the 
> part files do not roll over; however, with the local filesystem it works 
> perfectly. Should I be looking at the S3Committer in Flink to see if there is 
> something odd going on?
>
> On Thu, Apr 9, 2020 at 7:49 AM Roshan Punnoose  wrote:
>>
>> Nope just the s3a. I'll keep looking around to see if there is anything else 
>> I can see. If you think of anything else to try, let me know.
>>
>> On Thu, Apr 9, 2020, 7:41 AM Kostas Kloudas  wrote:
>>>
>>> It should not be a problem because from what you posted, you are using
>>> "s3a" as the scheme for s3.
>>> Are you using "s3p" for Presto? This should also be done in order for
>>> Flink to understand where to use the one or the other.
>>>
>>> On Thu, Apr 9, 2020 at 1:30 PM Roshan Punnoose  wrote:
>>> >
>>> > Lastly, could it be the way I built the flink image for kube? I added 
>>> > both the presto and Hadoop plugins
>>> >
>>> > On Thu, Apr 9, 2020, 7:29 AM Roshan Punnoose  wrote:
>>> >>
>>> >> Sorry realized this came off the user list by mistake. Adding the thread 
>>> >> back in.
>>> >>
>>> >> On Thu, Apr 9, 2020, 7:26 AM Roshan Punnoose  wrote:
>>> >>>
>>> >>> Yes sorry, no errors on the task manager. However, I am new to flink so 
>>> >>> don't know all the places to look for the logs. Been looking at the 
>>> >>> task manager logs and don't see any exceptions there. Not sure where to 
>>> >>> look for s3 exceptions in particular.
>>> >>>
>>> >>> On Thu, Apr 9, 2020, 7:16 AM Kostas Kloudas  wrote:
>>> >>>>
>>> >>>> Yes, this is why I reached out for further information.
>>> >>>>
>>> >>>> Incrementing the part counter is the responsibility of the
>>> >>>> StreamingFileSink, whose code is FS-agnostic, so it should also fail
>>> >>>> in the local FS.
>>> >>>> Now if it is on the S3 side, it would help if you have any more info,
>>> >>>> for example any logs from S3, to see if anything went wrong on their
>>> >>>> end.
>>> >>>>
>>> >>>> So your logs refer to normal execution, i.e. no failures and no
>>> >>>> restarting, right?
>>> >>>>
>>> >>>> Cheers,
>>> >>>> Kostas
>>> >>>>
>>> >>>> On Thu, Apr 9, 2020 at 12:53 PM Roshan Punnoose  
>>> >>>> wrote:
>>> >>>> >
>>> >>>> > Surprisingly the same code running against the local filesystem 
>>> >>>> > works perfectly. The part counter increments correctly.
>>> >>>> >
>>> >>>> > On Thu, Apr 9, 2020, 5:51 AM Kostas Kloudas  
>>> >>>> > wrote:
>>> >>>> >>
>>> >>>> >> Hi Roshan,
>>> >>>> >>
>>> >>>> >> Your logs refer to a simple run without any failures or re-running
>>> >>>> >> from a savepoint, right?
>>> >>>> >>
>>> >>>> >> I am asking because I am trying to reproduce it by running a 
>>> >>>> >> modified
>>> >>>> >> ParquetStreamingFileSinkITCase [1] and so far I cannot.
>>> >>>> >> The ITCase runs against the local filesystem, and not S3, but I 
>>> >>>> >> added
>>> >>>> >> the OutputFileConfig and it seems that the part counter is increases
>>> >>>> >> as expected.
>>> >>>> >>
>>> >>>> >> Is there any other information that would help us reproduc

Re: Parquet S3 Sink Part files are not rolling over with checkpoint

2020-04-09 Thread Kostas Kloudas
It should not be a problem because from what you posted, you are using
"s3a" as the scheme for s3.
Are you using "s3p" for Presto? This should also be done in order for
Flink to understand where to use the one or the other.

On Thu, Apr 9, 2020 at 1:30 PM Roshan Punnoose  wrote:
>
> Lastly, could it be the way I built the flink image for kube? I added both 
> the presto and Hadoop plugins
>
> On Thu, Apr 9, 2020, 7:29 AM Roshan Punnoose  wrote:
>>
>> Sorry realized this came off the user list by mistake. Adding the thread 
>> back in.
>>
>> On Thu, Apr 9, 2020, 7:26 AM Roshan Punnoose  wrote:
>>>
>>> Yes sorry, no errors on the task manager. However, I am new to flink so 
>>> don't know all the places to look for the logs. Been looking at the task 
>>> manager logs and don't see any exceptions there. Not sure where to look for 
>>> s3 exceptions in particular.
>>>
>>> On Thu, Apr 9, 2020, 7:16 AM Kostas Kloudas  wrote:
>>>>
>>>> Yes, this is why I reached out for further information.
>>>>
>>>> Incrementing the part counter is the responsibility of the
>>>> StreamingFileSink, whose code is FS-agnostic, so it should also fail
>>>> in the local FS.
>>>> Now if it is on the S3 side, it would help if you have any more info,
>>>> for example any logs from S3, to see if anything went wrong on their
>>>> end.
>>>>
>>>> So your logs refer to normal execution, i.e. no failures and no
>>>> restarting, right?
>>>>
>>>> Cheers,
>>>> Kostas
>>>>
>>>> On Thu, Apr 9, 2020 at 12:53 PM Roshan Punnoose  wrote:
>>>> >
>>>> > Surprisingly the same code running against the local filesystem works 
>>>> > perfectly. The part counter increments correctly.
>>>> >
>>>> > On Thu, Apr 9, 2020, 5:51 AM Kostas Kloudas  wrote:
>>>> >>
>>>> >> Hi Roshan,
>>>> >>
>>>> >> Your logs refer to a simple run without any failures or re-running
>>>> >> from a savepoint, right?
>>>> >>
>>>> >> I am asking because I am trying to reproduce it by running a modified
>>>> >> ParquetStreamingFileSinkITCase [1] and so far I cannot.
>>>> >> The ITCase runs against the local filesystem, and not S3, but I added
>>>> >> the OutputFileConfig and it seems that the part counter is increases
>>>> >> as expected.
>>>> >>
>>>> >> Is there any other information that would help us reproduce the issue?
>>>> >>
>>>> >> Cheers,
>>>> >> Kostas
>>>> >>
>>>> >> [1] 
>>>> >> https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java
>>>> >>
>>>> >> On Thu, Apr 9, 2020 at 3:37 AM Roshan Punnoose  
>>>> >> wrote:
>>>> >> >
>>>> >> > Hi,
>>>> >> >
>>>> >> > I am trying to get the parquet writer to write to s3; however, the 
>>>> >> > files do not seem to be rolling over. The same file 
>>>> >> > "part-0-0.parquet" is being created each time. Like the 'partCounter" 
>>>> >> > is not being updated? Maybe the Bucket is being recreated each time? 
>>>> >> > I don't really know... Here are some logs:
>>>> >> >
>>>> >> > 2020-04-09 01:28:10,350 INFO 
>>>> >> > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - 
>>>> >> > Subtask 0 checkpointing for checkpoint with id=2 (max part counter=2).
>>>> >> > 2020-04-09 01:28:10,589 INFO 
>>>> >> > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - 
>>>> >> > Subtask 0 received completion notification for checkpoint with id=2.
>>>> >> > 2020-04-09 01:28:10,589 INFO 
>>>> >> > org.apache.flink.fs.s3.common.writer.S3Committer - Committing 
>>>> >> > bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID 
>>>> >> > Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI
>>>> >> > 2020-04-09 01:29:10,350 INFO

Re: Parquet S3 Sink Part files are not rolling over with checkpoint

2020-04-09 Thread Kostas Kloudas
Hi Roshan,

Your logs refer to a simple run without any failures or re-running
from a savepoint, right?

I am asking because I am trying to reproduce it by running a modified
ParquetStreamingFileSinkITCase [1] and so far I cannot.
The ITCase runs against the local filesystem, and not S3, but I added
the OutputFileConfig and it seems that the part counter is increases
as expected.

Is there any other information that would help us reproduce the issue?

Cheers,
Kostas

[1] 
https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java

On Thu, Apr 9, 2020 at 3:37 AM Roshan Punnoose  wrote:
>
> Hi,
>
> I am trying to get the parquet writer to write to s3; however, the files do 
> not seem to be rolling over. The same file "part-0-0.parquet" is being 
> created each time. Like the 'partCounter" is not being updated? Maybe the 
> Bucket is being recreated each time? I don't really know... Here are some 
> logs:
>
> 2020-04-09 01:28:10,350 INFO 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 
> checkpointing for checkpoint with id=2 (max part counter=2).
> 2020-04-09 01:28:10,589 INFO 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 
> received completion notification for checkpoint with id=2.
> 2020-04-09 01:28:10,589 INFO org.apache.flink.fs.s3.common.writer.S3Committer 
> - Committing bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID 
> Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI
> 2020-04-09 01:29:10,350 INFO 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 
> checkpointing for checkpoint with id=3 (max part counter=3).
> 2020-04-09 01:29:10,520 INFO 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 
> received completion notification for checkpoint with id=3.
> 2020-04-09 01:29:10,521 INFO org.apache.flink.fs.s3.common.writer.S3Committer 
> - Committing bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID 
> Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI
> And a part of my code:
>
> ```
>
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>
> //env.setParallelism(2);
> env.enableCheckpointing(6L);
> ///PROPERTIES Added
> Schema schema = bro_conn.getClassSchema();
>
> OutputFileConfig config = OutputFileConfig
> .builder()
> .withPartSuffix(".parquet")
> .build();
>
> final StreamingFileSink<GenericRecord> sink = StreamingFileSink
> .forBulkFormat(new Path("s3a:///bro_conn/"), 
> ParquetAvroWriters.forGenericRecord(schema))
> //.withRollingPolicy(OnCheckpointRollingPolicy.build())
> .withOutputFileConfig(config)
> //.withBucketAssigner(new PartitioningBucketAssigner())
> .build();
>
> DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
> "kinesis", new SimpleStringSchema(), consumerConfig));
>
> kinesis.flatMap(new JsonAvroParser())
> .addSink(sink);
>
>
> env.execute("Bro Conn");
>
> ```
>
> I'm using Flink 1.10.0, and running in Kubernetes. I also created a custom 
> image to add the presto/hadoop plugin.
>
> Thanks again!


Re: Object has non serializable fields

2020-03-24 Thread Kostas Kloudas
Hi Eyal,

This is a known issue which is fixed now (see [1]) and will be part of
the next releases.

Cheers,
Kostas

[1] https://issues.apache.org/jira/browse/FLINK-16371

On Tue, Mar 24, 2020 at 11:10 AM Eyal Pe'er  wrote:
>
> Hi all,
>
> I am trying to write a sink function that receives a string and creates 
> compressed files in time buckets.
>
> The code is pretty straight forward and based on CompressWriterFactoryTest
>
>
>
> import org.apache.flink.core.fs.Path;
>
> import org.apache.flink.formats.compress.CompressWriterFactory;
>
> import org.apache.flink.formats.compress.extractor.DefaultExtractor;
>
> import org.apache.flink.streaming.api.functions.sink.SinkFunction;
>
> import 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
>
> import 
> org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
>
> import org.apache.flink.formats.compress.CompressWriters;
>
> import org.apache.hadoop.conf.Configuration;
>
>
>
> CompressWriterFactory<String> writer = CompressWriters.forExtractor(new 
> DefaultExtractor<String>())
>
> .withHadoopCompression("Gzip", new Configuration());
>
>
>
> StreamingFileSink.forBulkFormat(new Path(getTargetPath()), writer)
>
> .withBucketAssigner(new 
> DateTimeBucketAssigner<>(getDataTimeBucketFormat(getDataTimeBucket()))).build();
>
>
>
>
>
>
>
> When I tried to add it as a sink (dataStream.addSink) the app crashed due to:
>
>
>
> org.apache.hadoop.io.compress.GzipCodec@55e3d6c3 is not serializable. The 
> object probably contains or references non serializable fields.
>
>
>
> Well, I guess I used something wrong, but I am not sure what?
>
> Or maybe I should convert the SinkFunction to a serializable one, but how can I 
> do it?
>
> Best regards
>
> Eyal Peer
>
>


Re: Very large _metadata file

2020-03-05 Thread Kostas Kloudas
Hi Jacob,

As I said previously I am not 100% sure what can be causing this
behavior, but this is a related thread here:
https://lists.apache.org/thread.html/r3bfa2a3368a9c7850cba778e4decfe4f6dba9607f32addb69814f43d%40%3Cuser.flink.apache.org%3E

Which you can re-post your problem and monitor for answers.

Cheers,
Kostas

On Wed, Mar 4, 2020 at 7:02 PM Jacob Sevart  wrote:
>
> Kostas and Gordon,
>
> Thanks for the suggestions! I'm on RocksDB. We don't have that setting 
> configured so it should be at the default 1024b. This is the full "state.*" 
> section showing in the JobManager UI.
>
>
>
> Jacob
>
> On Wed, Mar 4, 2020 at 2:45 AM Tzu-Li (Gordon) Tai  
> wrote:
>>
>> Hi Jacob,
>>
>> Apart from what Klou already mentioned, one slightly possible reason:
>>
>> If you are using the FsStateBackend, it is also possible that your state is 
>> small enough to be considered to be stored inline within the metadata file.
>> That is governed by the "state.backend.fs.memory-threshold" configuration, 
>> with a default value of 1024 bytes, or can also be configured with the 
>> `fileStateSizeThreshold` argument when constructing the `FsStateBackend`.
>> The purpose of that threshold is to ensure that the backend does not create 
>> a large amount of very small files, where potentially the file pointers are 
>> actually larger than the state itself.
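
For illustration, a sketch of setting that threshold explicitly when constructing the
backend (the path and value are placeholders; 1024 bytes is the documented default):

import java.net.URI;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// state smaller than the threshold is stored inline in the checkpoint/savepoint metadata
env.setStateBackend(new FsStateBackend(URI.create("hdfs:///flink/checkpoints"), 1024));
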
>>
>> Cheers,
>> Gordon
>>
>>
>>
>> On Wed, Mar 4, 2020 at 6:17 PM Kostas Kloudas  wrote:
>>>
>>> Hi Jacob,
>>>
>>> Could you specify which StateBackend you are using?
>>>
>>> The reason I am asking is that, from the documentation in [1]:
>>>
>>> "Note that if you use the MemoryStateBackend, metadata and savepoint
>>> state will be stored in the _metadata file. Since it is
>>> self-contained, you may move the file and restore from any location."
>>>
>>> I am also cc'ing Gordon who may know a bit more about state formats.
>>>
>>> I hope this helps,
>>> Kostas
>>>
>>> [1] 
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/savepoints.html
>>>
>>> On Wed, Mar 4, 2020 at 1:25 AM Jacob Sevart  wrote:
>>> >
>>> > Per the documentation:
>>> >
>>> > "The meta data file of a Savepoint contains (primarily) pointers to all 
>>> > files on stable storage that are part of the Savepoint, in form of 
>>> > absolute paths."
>>> >
>>> > I somehow have a _metadata file that's 1.9GB. Running strings on it I 
>>> > find 962 strings, most of which look like HDFS paths, which leaves a lot 
>>> > of that file-size unexplained. What else is in there, and how exactly 
>>> > could this be happening?
>>> >
>>> > We're running 1.6.
>>> >
>>> > Jacob
>
>
>
> --
> Jacob Sevart
> Software Engineer, Safety


Re: StreamingFileSink Not Flushing All Data

2020-03-05 Thread Kostas Kloudas
Thanks Austin,

If the CompressWriterFactory works for you in 1.10, then you can copy it
as is into your 1.9 project and use it. The BulkWriter interfaces have not changed between
the versions (as far as I recall). But please keep in mind that there is a
bug in the CompressWriterFactory with a pending PR that fixes it (
https://github.com/apache/flink/pull/11307). So if you copy and try to use
it please include the patch from that PR.
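
For reference, wiring that factory into a bulk-format sink looks roughly like the
sketch below (mirroring the usage in CompressWriterFactoryTest; the path is a placeholder):

import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.compress.CompressWriterFactory;
import org.apache.flink.formats.compress.CompressWriters;
import org.apache.flink.formats.compress.extractor.DefaultExtractor;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.hadoop.conf.Configuration;

CompressWriterFactory<String> writer = CompressWriters
        .forExtractor(new DefaultExtractor<String>())
        .withHadoopCompression("Gzip", new Configuration());

StreamingFileSink<String> sink = StreamingFileSink
        .forBulkFormat(new Path("file:///tmp/compressed"), writer)
        .build();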

As for the documentation, if you are willing to contribute that would be a
great help. You can open an issue and submit a PR with an example, as done
for the other bulk formats in the documentation here:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html#bulk-encoded-formats

Let us know if it works for you!

Cheers,
Kostas

On Thu, Mar 5, 2020 at 1:43 AM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hey Kostas,
>
> We’re a little bit off from a 1.10 update but I can certainly see if that
> CompressWriterFactory might solve my use case for when we do.
>
> If there is anything I can do to help document that feature, please let me
> know.
>
> Thanks!
>
> Austin
>
> On Wed, Mar 4, 2020 at 4:58 AM Kostas Kloudas  wrote:
>
>> Hi Austin,
>>
>> I will have a look at your repo. In the meantime, given that [1] is
>> already merged in 1.10,
>> would upgrading to 1.10 and using the newly introduced
>> CompressWriterFactory be an option for you?
>>
>> It is unfortunate that this feature was not documented.
>>
>> Cheers,
>> Kostas
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-13634
>>
>>
>> On Tue, Mar 3, 2020 at 11:13 PM Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> Thanks for the docs pointer/ FLIP Rafi, and the workaround strategy
>>> Kostas -- strange though, as I wasn't using a bounded source when I first
>>> ran into this issue. I have updated the example repo to use an unbounded
>>> source[1], and the same file corruption problems remain.
>>>
>>> Anything else I could be doing wrong with the compression stream?
>>>
>>> Thanks again,
>>> Austin
>>>
>>> [1]:
>>> https://github.com/austince/flink-streaming-file-sink-compression/tree/unbounded
>>>
>>> On Tue, Mar 3, 2020 at 3:50 AM Kostas Kloudas 
>>> wrote:
>>>
>>>> Hi Austin and Rafi,
>>>>
>>>> @Rafi Thanks for providing the pointers!
>>>> Unfortunately there is no progress on the FLIP (or the issue).
>>>>
>>>> @ Austin In the meantime, what you could do --assuming that your input
>>>> is bounded --  you could simply not stop the job after the whole input is
>>>> processed, then wait until the output is committed, and then cancel the
>>>> job. I know and I agree that this is not an elegant solution but it is a
>>>> temporary workaround.
>>>>
>>>> Hopefully the FLIP and related issue is going to be prioritised soon.
>>>>
>>>> Cheers,
>>>> Kostas
>>>>
>>>> On Tue, Mar 3, 2020 at 8:04 AM Rafi Aroch  wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> This happens because StreamingFileSink does not support a finite input
>>>>> stream.
>>>>> In the docs it's mentioned under "Important Considerations":
>>>>>
>>>>> [image: image.png]
>>>>>
>>>>> This behaviour often surprises users...
>>>>>
>>>>> There's a FLIP
>>>>> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-46%3A+Graceful+Shutdown+Handling+by+UDFs>
>>>>>  and
>>>>> an issue <https://issues.apache.org/jira/browse/FLINK-13103> about
>>>>> fixing this. I'm not sure what's the status though, maybe Kostas can 
>>>>> share.
>>>>>
>>>>> Thanks,
>>>>> Rafi
>>>>>
>>>>>
>>>>> On Mon, Mar 2, 2020 at 5:05 PM Austin Cawley-Edwards <
>>>>> austin.caw...@gmail.com> wrote:
>>>>>
>>>>>> Hi Dawid and Kostas,
>>>>>>
>>>>>> Sorry for the late reply + thank you for the troubleshooting. I put
>>>>>> together an example repo that reproduces the issue[1], because I did have
>>>>>> checkpointing enabled in my previous case -- still must be doing 
>>>>>> something
>>>>>> wrong with that config

Re: Very large _metadata file

2020-03-04 Thread Kostas Kloudas
Hi Jacob,

Could you specify which StateBackend you are using?

The reason I am asking is that, from the documentation in [1]:

"Note that if you use the MemoryStateBackend, metadata and savepoint
state will be stored in the _metadata file. Since it is
self-contained, you may move the file and restore from any location."

I am also cc'ing Gordon who may know a bit more about state formats.

I hope this helps,
Kostas

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/savepoints.html

On Wed, Mar 4, 2020 at 1:25 AM Jacob Sevart  wrote:
>
> Per the documentation:
>
> "The meta data file of a Savepoint contains (primarily) pointers to all files 
> on stable storage that are part of the Savepoint, in form of absolute paths."
>
> I somehow have a _metadata file that's 1.9GB. Running strings on it I find 
> 962 strings, most of which look like HDFS paths, which leaves a lot of that 
> file-size unexplained. What else is in there, and how exactly could this be 
> happening?
>
> We're running 1.6.
>
> Jacob


Re: StreamingFileSink Not Flushing All Data

2020-03-04 Thread Kostas Kloudas
Hi Austin,

I will have a look at your repo. In the meantime, given that [1] is already
merged in 1.10,
would upgrading to 1.10 and using the newly introduced
CompressWriterFactory be an option for you?

It is unfortunate that this feature was not documented.

Cheers,
Kostas

[1] https://issues.apache.org/jira/browse/FLINK-13634


On Tue, Mar 3, 2020 at 11:13 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hi all,
>
> Thanks for the docs pointer/ FLIP Rafi, and the workaround strategy Kostas
> -- strange though, as I wasn't using a bounded source when I first ran into
> this issue. I have updated the example repo to use an unbounded source[1],
> and the same file corruption problems remain.
>
> Anything else I could be doing wrong with the compression stream?
>
> Thanks again,
> Austin
>
> [1]:
> https://github.com/austince/flink-streaming-file-sink-compression/tree/unbounded
>
> On Tue, Mar 3, 2020 at 3:50 AM Kostas Kloudas  wrote:
>
>> Hi Austin and Rafi,
>>
>> @Rafi Thanks for providing the pointers!
>> Unfortunately there is no progress on the FLIP (or the issue).
>>
>> @ Austin In the meantime, what you could do --assuming that your input is
>> bounded --  you could simply not stop the job after the whole input is
>> processed, then wait until the output is committed, and then cancel the
>> job. I know and I agree that this is not an elegant solution but it is a
>> temporary workaround.
>>
>> Hopefully the FLIP and related issue is going to be prioritised soon.
>>
>> Cheers,
>> Kostas
>>
>> On Tue, Mar 3, 2020 at 8:04 AM Rafi Aroch  wrote:
>>
>>> Hi,
>>>
>>> This happens because StreamingFileSink does not support a finite input
>>> stream.
>>> In the docs it's mentioned under "Important Considerations":
>>>
>>> [image: image.png]
>>>
>>> This behaviour often surprises users...
>>>
>>> There's a FLIP
>>> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-46%3A+Graceful+Shutdown+Handling+by+UDFs>
>>>  and
>>> an issue <https://issues.apache.org/jira/browse/FLINK-13103> about
>>> fixing this. I'm not sure what's the status though, maybe Kostas can share.
>>>
>>> Thanks,
>>> Rafi
>>>
>>>
>>> On Mon, Mar 2, 2020 at 5:05 PM Austin Cawley-Edwards <
>>> austin.caw...@gmail.com> wrote:
>>>
>>>> Hi Dawid and Kostas,
>>>>
>>>> Sorry for the late reply + thank you for the troubleshooting. I put
>>>> together an example repo that reproduces the issue[1], because I did have
>>>> checkpointing enabled in my previous case -- still must be doing something
>>>> wrong with that config though.
>>>>
>>>> Thanks!
>>>> Austin
>>>>
>>>> [1]: https://github.com/austince/flink-streaming-file-sink-compression
>>>>
>>>>
>>>> On Mon, Feb 24, 2020 at 5:28 AM Kostas Kloudas 
>>>> wrote:
>>>>
>>>>> Hi Austin,
>>>>>
>>>>> Dawid is correct in that you need to enable checkpointing for the
>>>>> StreamingFileSink to work.
>>>>>
>>>>> I hope this solves the problem,
>>>>> Kostas
>>>>>
>>>>> On Mon, Feb 24, 2020 at 11:08 AM Dawid Wysakowicz
>>>>>  wrote:
>>>>> >
>>>>> > Hi Austing,
>>>>> >
>>>>> > If I am not mistaken the StreamingFileSink by default flushes on
>>>>> checkpoints. If you don't have checkpoints enabled it might happen that 
>>>>> not
>>>>> all data is flushed.
>>>>> >
>>>>> > I think you can also adjust that behavior with:
>>>>> >
>>>>> > forBulkFormat(...)
>>>>> >
>>>>> > .withRollingPolicy(/* your custom logic */)
>>>>> >
>>>>> > I also cc Kostas who should be able to correct me if I am wrong.
>>>>> >
>>>>> > Best,
>>>>> >
>>>>> > Dawid
>>>>> >
>>>>> > On 22/02/2020 01:59, Austin Cawley-Edwards wrote:
>>>>> >
>>>>> > Hi there,
>>>>> >
>>>>> > Using Flink 1.9.1, trying to write .tgz files with the
>>>>> StreamingFileSink#BulkWriter. It seems like flushing the output stream
>>>>> doesn't flush all the dat

Re: StreamingFileSink Not Flushing All Data

2020-02-24 Thread Kostas Kloudas
Hi Austin,

Dawid is correct in that you need to enable checkpointing for the
StreamingFileSink to work.

I hope this solves the problem,
Kostas

On Mon, Feb 24, 2020 at 11:08 AM Dawid Wysakowicz
 wrote:
>
> Hi Austing,
>
> If I am not mistaken the StreamingFileSink by default flushes on checkpoints. 
> If you don't have checkpoints enabled it might happen that not all data is 
> flushed.
>
> I think you can also adjust that behavior with:
>
> forBulkFormat(...)
>
> .withRollingPolicy(/* your custom logic */)
>
> I also cc Kostas who should be able to correct me if I am wrong.
>
> Best,
>
> Dawid
>
> On 22/02/2020 01:59, Austin Cawley-Edwards wrote:
>
> Hi there,
>
> Using Flink 1.9.1, trying to write .tgz files with the 
> StreamingFileSink#BulkWriter. It seems like flushing the output stream 
> doesn't flush all the data written. I've verified I can create valid files 
> using the same APIs and data on there own, so thinking it must be something 
> I'm doing wrong with the bulk format. I'm writing to the local filesystem, 
> with the `file://` protocol.
>
> For Tar/ Gzipping, I'm using the Apache Commons Compression library, version 
> 1.20.
>
> Here's a runnable example of the issue:
>
> import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
> import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
> import 
> org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
> import org.apache.flink.api.common.serialization.BulkWriter;
> import org.apache.flink.core.fs.FSDataOutputStream;
> import org.apache.flink.core.fs.Path;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
>
> import java.io.FileOutputStream;
> import java.io.IOException;
> import java.io.Serializable;
> import java.nio.charset.StandardCharsets;
>
> class Scratch {
>   public static class Record implements Serializable {
> private static final long serialVersionUID = 1L;
>
> String id;
>
> public Record() {}
>
> public Record(String id) {
>   this.id = id;
> }
>
> public String getId() {
>   return id;
> }
>
> public void setId(String id) {
>   this.id = id;
> }
>   }
>
>   public static void main(String[] args) throws Exception {
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>
> TarArchiveOutputStream taos = new TarArchiveOutputStream(new 
> GzipCompressorOutputStream(new 
> FileOutputStream("/home/austin/Downloads/test.tgz")));
> TarArchiveEntry fileEntry = new TarArchiveEntry(String.format("%s.txt", 
> "test"));
> String fullText = "hey\nyou\nwork";
> byte[] fullTextData = fullText.getBytes();
> fileEntry.setSize(fullTextData.length);
> taos.putArchiveEntry(fileEntry);
> taos.write(fullTextData, 0, fullTextData.length);
> taos.closeArchiveEntry();
> taos.flush();
> taos.close();
>
> StreamingFileSink<Record> textSink = StreamingFileSink
> .forBulkFormat(new Path("file:///home/austin/Downloads/text-output"),
> new BulkWriter.Factory<Record>() {
>   @Override
>   public BulkWriter<Record> create(FSDataOutputStream out) throws 
> IOException {
> final TarArchiveOutputStream compressedOutputStream = new 
> TarArchiveOutputStream(new GzipCompressorOutputStream(out));
>
> return new BulkWriter<Record>() {
>   @Override
>   public void addElement(Record record) throws IOException {
> TarArchiveEntry fileEntry = new 
> TarArchiveEntry(String.format("%s.txt", record.id));
> byte[] fullTextData = 
> "hey\nyou\nplease\nwork".getBytes(StandardCharsets.UTF_8);
> fileEntry.setSize(fullTextData.length);
> compressedOutputStream.putArchiveEntry(fileEntry);
> compressedOutputStream.write(fullTextData, 0, 
> fullTextData.length);
> compressedOutputStream.closeArchiveEntry();
>   }
>
>   @Override
>   public void flush() throws IOException {
> compressedOutputStream.flush();
>   }
>
>   @Override
>   public void finish() throws IOException {
> this.flush();
>   }
> };
>   }
> })
> .withBucketCheckInterval(1000)
> .build();
>
> env
> .fromElements(new Record("1"), new Record("2"))
> .addSink(textSink)
> .name("Streaming File Sink")
> .uid("streaming-file-sink");
> env.execute("streaming file sink test");
>   }
> }
>
>
> From the stat/ hex dumps, you can see that the first bits are there, but are 
> then cut off:
>
> ~/Downloads » stat test.tgz
>   File: test.tgz
>   Size: 114   Blocks: 8  IO Block: 4096   

Re: FlinkCEP questions - architecture

2020-02-17 Thread Kostas Kloudas
Hi Juergen,

I will reply to your questions inline. As a general comment I would
suggest to also have a look at [3] so that you have an idea of some of
the alternatives.
With that said, here come the answers :)

1) We receive files every day, which are exports from some database
tables, containing ONLY changes from the day. Most tables have
modify-cols. Even though they are files but because they contain
changes only, I belief the file records shall be considered events in
Flink terminology. Is that assumption correct?

-> Yes. I think your assumption is correct.

2) The records within the DB export files are NOT in chronologically,
and we can not change the export. Our use case is a "complex event
processing" case (FlinkCEP) with rules like "KeyBy(someKey) If first
A, then B, then C within 30 days, then do something". Does that work
with FlinkCEP despite the events/records are not in chrono order
within the file? The files are 100MB to 20GB in size. Do I need to
sort the files first before CEP processing?

-> Flink CEP also works in event time and the re-ordering can be done by Flink
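
For illustration, a keyed, event-time pattern like "first A, then B, then C
within 30 days" would look roughly like the sketch below. The ChangeEvent type
and its fields are made up for the example, and timestamps/watermarks are
assumed to be assigned upstream:

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

public class SequencePatternSketch {

    // Hypothetical record type: one row of the daily export.
    public static class ChangeEvent {
        public String key;   // e.g. the id used in keyBy(someKey)
        public String type;  // e.g. "A", "B", "C"
        public long timestamp;
    }

    public static Pattern<ChangeEvent, ?> abcWithin30Days() {
        return Pattern.<ChangeEvent>begin("A")
            .where(new SimpleCondition<ChangeEvent>() {
                @Override
                public boolean filter(ChangeEvent e) { return "A".equals(e.type); }
            })
            .followedBy("B")
            .where(new SimpleCondition<ChangeEvent>() {
                @Override
                public boolean filter(ChangeEvent e) { return "B".equals(e.type); }
            })
            .followedBy("C")
            .where(new SimpleCondition<ChangeEvent>() {
                @Override
                public boolean filter(ChangeEvent e) { return "C".equals(e.type); }
            })
            .within(Time.days(30));
    }

    public static PatternStream<ChangeEvent> apply(DataStream<ChangeEvent> events) {
        // Keying by the business key plus event time lets Flink CEP re-order the
        // out-of-order records per key before matching.
        return CEP.pattern(events.keyBy(e -> e.key), abcWithin30Days());
    }
}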

3) Occassionally some crazy people manually "correct" DB records
within the database and manually trigger a re-export of ALL of the
changes for that respective day (e.g. last weeks Tuesday).
Consequently we receive a correction file. Same filename but "_1"
appended. All filenames include the date (of the original export).
What are the options to handle that case (besides telling the DB
admins not to, which we did already). Regular checkpoints and
re-process all files since then?  What happens to the CEP state? Will
it be checkpointed as well?

-> If you require re-processing, then I would say that your best
option is what you described. The other option would be to keep
everything in Flink state until you are sure that no more corrections
will come. In this case, you have to somehow issue the "correction" in
a way that the downstream system can understand what to correct and
how. Keep in mind that this may be an expensive operation because
everything has to be kept in state for longer.

4) Our CEP rules span upto 180 days (resp. 6 months). Is that a problem?

-> The only thing to consider is the size of your state. Time is not
necessarily an issue. If your state for these 180 days is a couple of
MBs, then you have no problem. If it increases fast, then you have to
provision your cluster accordingly.

5) We also have CEP rules that must fire if after a start sequence
matched, the remaining sequence did NOT within a configured window.
E.g. If A, then B, but C did not occur within 30 days since A. Is that
supported by FlinkCEP? I couldn't find a working example.

-> You can have a look at [1] for the supported pattern combinations
and you can also look at [2] for some tests of different pattern
combinations.
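
To make the "absence" case from question 5 concrete: one common approach is to
give the full A -> B -> C pattern a within() window (as in the sketch above) and
route timed-out partial matches to a side output; a partial match that already
contains A and B but times out before C is exactly the "C did not occur within
30 days" case. A sketch, reusing the hypothetical ChangeEvent type from above:

import java.util.List;
import java.util.Map;

import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternFlatTimeoutFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class MissingEventAlerts {

    // Side output for partial matches that never saw the final event.
    static final OutputTag<String> TIMED_OUT = new OutputTag<String>("timed-out") {};

    public static DataStream<String> alertsForMissingC(PatternStream<SequencePatternSketch.ChangeEvent> matches) {
        SingleOutputStreamOperator<String> full = matches.flatSelect(
            TIMED_OUT,
            new PatternFlatTimeoutFunction<SequencePatternSketch.ChangeEvent, String>() {
                @Override
                public void timeout(Map<String, List<SequencePatternSketch.ChangeEvent>> partial,
                                    long timeoutTimestamp, Collector<String> out) {
                    // A (and B) matched, but C never arrived before the window expired.
                    if (partial.containsKey("B")) {
                        out.collect("A then B, but no C within 30 days for key "
                            + partial.get("A").get(0).key);
                    }
                }
            },
            new PatternFlatSelectFunction<SequencePatternSketch.ChangeEvent, String>() {
                @Override
                public void flatSelect(Map<String, List<SequencePatternSketch.ChangeEvent>> match,
                                       Collector<String> out) {
                    // Full A -> B -> C match within 30 days; handle separately if needed.
                }
            });
        return full.getSideOutput(TIMED_OUT);
    }
}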

6) We expect 30-40 CEP rules. How can we estimate the required storage
size for the temporary CEP state? Is there some sort of formular
considering number of rules, number of records per file or day, record
size, window, number of records matched per sequence, number of keyBy
grouping keys, ...

-> In FlinkCEP, each pattern becomes a single operator. This means
that you will have 30-40 operators in your job graph, each with each
own state. This can become heavy but once again it depends on your
workload. I cannot give an estimate because in CEP, in order to
guarantee correct ordering of events in an unordered stream, the
library sometimes also has to keep more records in state than will
eventually be emitted.

Have you considered going with a solution based on processfunction and
broadcast state? This will also allow you to have a more dynamic
set-up where patterns can be added at runtime and it will allow you to
do any optimizations specific to your workload ;) For a discussion on
this, check [3]. In addition, it will allow you to "multiplex" many
patterns into a single operator thus potentially minimizing the amount
of copies of the state you keep.

7) I can imagine that for debugging reasons it'd be good if we were
able to query the temporary CEP state. What is the (CEP) schema used
to persist the CEP state and how can we query it? And does such query
work on the whole cluster or only per node (e.g. because of shuffle
and nodes responsible only for a portion of the events).

-> Unfortunately the state in CEP is not queryable, thus I am not
sure if you can inspect it at runtime.

8) I understand state is stored per node. What happens if I want to
add or remove a nodes. Will the state still be found, despite it being
stored in another node? I read that I need to be equally careful when
changing rules? Or is that a different issue?

-> Rescaling a Flink job is not done automatically. You need to take a
savepoint and then relaunch your job with a different parallelism.
Updating a rule is not supported in CEP, as changing a rule would
imply that (potentially) the state should change. But what you could
do is take a 

Re: CEP with changing threshold

2020-02-13 Thread Kostas Kloudas
Hi Hemant,

Why not use simple connected streams: one containing the
measurements, and the other being the control stream with the
thresholds, which are updated from time to time.
Both streams are keyed by the device class, to make sure that the
measurements and the thresholds for a specific device class go to
the same machines.

Keep the "current" thresholds in state as they arrive from the
control stream, and also keep the measurements in a MapState keyed by
their timestamp.

When an element arrives on the measurements side, your
KeyedCoProcessFunction fetches the thresholds from the "control"
state, fetches all the elements from the last N units of time from
the measurements state, does the computation, and purges measurements
that are too old to be useful (so that your state does not grow
indefinitely).

This solution does not use CEP but it gives you the freedom to do any
optimisations related to your use case.
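
To make the above concrete, here is a rough sketch of such a
KeyedCoProcessFunction. All the type names, fields and the alert condition are
made up for illustration; the real rule/measurement classes and the actual check
are whatever your use case defines:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

public class ThresholdAlerter
        extends KeyedCoProcessFunction<String, ThresholdAlerter.Measurement, ThresholdAlerter.Rule, String> {

    // Illustrative POJOs, not part of any Flink API.
    public static class Measurement { public String deviceClass; public long timestamp; public double value; }
    public static class Rule { public String deviceClass; public double threshold; public int minBreaches; public long windowMillis; }

    private transient ValueState<Rule> currentRule;                    // latest thresholds per device class
    private transient MapState<Long, List<Measurement>> measurements;  // timestamp -> measurements at that time

    @Override
    public void open(Configuration parameters) {
        currentRule = getRuntimeContext().getState(new ValueStateDescriptor<>("rule", Rule.class));
        measurements = getRuntimeContext().getMapState(new MapStateDescriptor<>(
            "measurements", Types.LONG, Types.LIST(TypeInformation.of(Measurement.class))));
    }

    @Override
    public void processElement2(Rule rule, Context ctx, Collector<String> out) throws Exception {
        currentRule.update(rule); // control stream: remember the latest rule for this key
    }

    @Override
    public void processElement1(Measurement m, Context ctx, Collector<String> out) throws Exception {
        Rule rule = currentRule.value();
        if (rule == null) {
            return; // no thresholds known yet for this device class
        }

        // Buffer the measurement under its timestamp.
        List<Measurement> atTs = measurements.get(m.timestamp);
        if (atTs == null) {
            atTs = new ArrayList<>();
        }
        atTs.add(m);
        measurements.put(m.timestamp, atTs);

        // Look back over the last windowMillis, count breaches, purge anything older.
        long cutoff = m.timestamp - rule.windowMillis;
        int breaches = 0;
        List<Long> stale = new ArrayList<>();
        for (Map.Entry<Long, List<Measurement>> e : measurements.entries()) {
            if (e.getKey() < cutoff) {
                stale.add(e.getKey());
            } else {
                for (Measurement old : e.getValue()) {
                    if (old.value > rule.threshold) {
                        breaches++;
                    }
                }
            }
        }
        for (Long ts : stale) {
            measurements.remove(ts);
        }
        if (breaches >= rule.minBreaches) {
            out.collect("threshold alert for device class " + ctx.getCurrentKey());
        }
    }
}

The job wiring would then be roughly
measurements.keyBy(m -> m.deviceClass).connect(rules.keyBy(r -> r.deviceClass)).process(new ThresholdAlerter()).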

I hope this helps,
Kostas

On Wed, Feb 12, 2020 at 10:40 AM hemant singh  wrote:
>
> Hello Flink Users,
>
> I have a requirement to generate alerts for metrics like for example - if cpu 
> utilization spike i.e cpu_utilization > threshold (>90%) n number of time in 
> x minutes then generate alerts. For this I am using the CEP module. However, 
> one of the requirements is for different devices the threshold can be 
> different as ell as x and n in above statement. Moreover, for different 
> device class this will be different, also this can change in future.
> I am thinking of using Broadcast State Pattern and enrich the metrics stream 
> with this thresholds & rule and use it later in CEP pattern. One issue is how 
> to make sure that if new threshold values come in how the broadcast stream 
> will change. I have an understanding that if I can introduce a watermark in 
> broadcast stream when values change the KeyedBroadcastProcessFunction will 
> have latest values streamed.
> Is my understanding correct and if anyone has implemented something like this 
> can weigh in if this is right way to do it.
>
> Thanks,
> Hemant
>


Re: Dedup all data in stream

2020-02-13 Thread Kostas Kloudas
Hi Akshay,

Is your use case that the input stream consists of metrics from these
1000s of resources, the ProcessFunction aggregates them in windows of
2 min and does some analysis on these metrics, and this analysis may
take more than 2 min, so you create backpressure to the source?

If that is the case and the metric records are timestamped, then you
can use event time, you will have your metrics in the correct
timestamp order, and eventually your stream will catch up (assuming
that you have enough resources - parallelism).
If you want this analysis of the incoming metrics to be performed by
another thread and, while this is happening, to ignore any other
incoming records, then you should look towards the AsyncIO direction
that I posted previously.

This will guarantee that you will have fault tolerance and
asynchronous processing.

Cheers,
Kostas

On Wed, Feb 12, 2020 at 6:33 PM Akshay Shinde  wrote:
>
> Hi Kostas
>
> We are doing scans on 1000s of resources which we want to do it at some 
> interval which is currently 2 mins. Scanning is the same operation we want to 
> perform at every 2 minutes to check if everything is ok or not. Sometimes 
> this scan operation takes lot of time which results in lag and in stream 
> (which we produce from source function) we are getting multiple sets of data 
> for same 1000s of resources. At this time we are okay if perform scan 
> operation only once for all the set that are present currently in stream.
>
> Parallelism for source function is 1 and for Process function its currently 2.
>
> Thanks for the response.
>
> —
> Akshay
>
> > On Feb 12, 2020, at 2:07 AM, Kostas Kloudas  wrote:
> >
> > Hi Akshay,
> >
> > Could you be more specific on what you are trying to achieve with this 
> > scheme?
> >
> > I am asking because if your source is too fast and you want it to slow
> > it down so that it produces data at the same rate as your process
> > function can consume them, then Flink's backpressure will eventually
> > do this.
> >
> > If you want your process function to discard incoming elements (and
> > not queue them) if it is in the process of processing another element,
> > then this implies a multithreaded process function and I would look
> > maybe towards the AsyncIO [1] pattern with the AsyncFunction somehow
> > setting a flag as busy while processing and as false when it is done
> > and ready to process the next element.
> >
> > Also, in order to help, I would need more information about the stream
> > being keyed or non-keyed and the parallelism of the source compared to
> > that of the process function.
> >
> > I hope this helps,
> > Kostas
> >
> > [1] 
> > https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html__;!!GqivPVa7Brio!NE2BACN8q5265oyZUvBg44u3mq7sGn96u3rPtcVbFq9DovpIa1KtilsCXW3mtYofoQw$
> >
> > On Wed, Feb 12, 2020 at 3:34 AM Akshay Shinde  
> > wrote:
> >>
> >> Hi Community
> >>
> >> In our Flink job, in source we are creating our own stream to process n 
> >> number of objects per 2 minutes. And in process function for each object 
> >> from generated source stream we are doing some operation which we expect 
> >> to get finished in 2 minutes.
> >>
> >> Every 2 minutes we are generating same ’N’ objects in stream which process 
> >> function will process.  But in some cases process function is taking 
> >> longer time around 10 minutes. In this case stream will have 5 number of 
> >> sets for ’N’ objects as process function is waiting for 10 minutes as 
> >> source is adding ’N’ objects in stream at every 2 minutes. Problem is we 
> >> don’t want to process these objects 5 times, we want it to process only 
> >> once for the latest ’N’ objects.
> >>
> >> This lag can be more or less from process function which results in lag 
> >> from source to process function in job execution.
> >>
> >>
> >> Thanks in advance !!!
>


Re: Aggregation for last n seconds for each event

2020-02-13 Thread Kostas Kloudas
Hi Oleg,

With the MapState approach you can always fire on every incoming
element :)
You just iterate over the map state and find all the elements that
have a timestamp (key) between the timestamp of the current element
(NOW) and NOW-N.
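
A rough sketch of that, in a keyed ProcessFunction (the Event type, the key and
the "sum" aggregation are placeholders, and the inline purge assumes bounded
out-of-orderness):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class LastNAggregator extends KeyedProcessFunction<String, LastNAggregator.Event, Double> {

    // Illustrative element type.
    public static class Event { public String key; public long timestamp; public double value; }

    private static final long N_MILLIS = 10_000L; // "last n seconds", here 10s

    private transient MapState<Long, List<Event>> byTimestamp;

    @Override
    public void open(Configuration parameters) {
        byTimestamp = getRuntimeContext().getMapState(new MapStateDescriptor<>(
            "events-by-ts", Types.LONG, Types.LIST(TypeInformation.of(Event.class))));
    }

    @Override
    public void processElement(Event e, Context ctx, Collector<Double> out) throws Exception {
        List<Event> sameTs = byTimestamp.get(e.timestamp);
        if (sameTs == null) {
            sameTs = new ArrayList<>();
        }
        sameTs.add(e);
        byTimestamp.put(e.timestamp, sameTs);

        // Fire on every incoming element: aggregate everything in (NOW - N, NOW]
        // and drop entries too old to be needed again (assuming bounded lateness).
        double sum = 0;
        List<Long> stale = new ArrayList<>();
        for (Map.Entry<Long, List<Event>> entry : byTimestamp.entries()) {
            if (entry.getKey() <= e.timestamp - N_MILLIS) {
                stale.add(entry.getKey());
            } else if (entry.getKey() <= e.timestamp) {
                for (Event ev : entry.getValue()) {
                    sum += ev.value;
                }
            }
        }
        for (Long ts : stale) {
            byTimestamp.remove(ts);
        }
        out.collect(sum);
    }
}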

Anyway, if Fanbin's solution works, then you can always use that!

Cheers,
Kostas

On Wed, Feb 12, 2020 at 7:18 PM Олег Бонарь  wrote:
>
> Hi Kostas,
>
> Thanks for your reply!
> Yes, you understand me correctly. However, I also want the stream to be keyed 
> to process it in parallel. I'm afraid the approach with MapState you 
> suggested doesn't really suite my use case because I need to fire on every 
> incoming event.
> Logically, Fanbin's "RANGE BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT 
> ROW" looks 100% like what I need, but I haven't tried it yet.
> Also wondering if it might be expressed in DataStream API.
>
> ср, 12 февр. 2020 г. в 13:06, Kostas Kloudas :
>>
>> Hi Oleg,
>>
>> Could you be more specific on what do you mean by
>> "for events of last n seconds(time units in general) for every incoming 
>> event."?
>>
>> Do you mean that you have a stream of parallelism 1 and you want for
>> each incoming element to have your function fire with input the event
>> itself and all the events that arrived within the last N time units?
>> If this is the case, you can use a dummy key to key your stream to
>> have access to keyed state, then use Map State with key being the
>> timestamp and value being a list of the already seen elements with
>> that timestamp and whenever an element arrives, you can register a
>> timer to fire N time units in the future. Then, when the timer fires,
>> you can iterate over the map, fetch the elements you are interested
>> in, and clean-up whatever you will not need anymore.
>>
>> For an example you could look at [1].
>>
>> I hope this helps,
>> Kostas
>>
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html
>>
>> On Tue, Feb 11, 2020 at 11:18 PM Fanbin Bu  wrote:
>> >
>> > can u do
>> > RANGE BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW?
>> >
>> > On Tue, Feb 11, 2020 at 12:15 PM oleg  wrote:
>> >>
>> >> Hi Community,
>> >>
>> >> I do streaming in event time and I want to preserve ordering and late
>> >> events. I have a use case where I need to fire an aggregation function
>> >> for events of last n seconds(time units in general) for every incoming
>> >> event.
>> >>
>> >> It seems to me that windowing is not suitable since it may be expressed
>> >> either in time or in events count, not "last n seconds for each single
>> >> event".
>> >>
>> >> Is there an idiomatic way to do this? Any examples or help are
>> >> appreciated. Thanks in advance.
>> >>
>> >>
>> >> Best regards,
>> >>
>> >> Oleg Bonar
>> >>


Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-02-12 Thread Kostas Kloudas
Hi Salva,

Yes, the same applies to the Operator API, as the output is not
thread-safe and there is no way of "checkpointing" the "in-flight"
data without explicit handling.
If you want to dig deeper, I would recommend also having a look at
the source code of the AsyncWaitOperator to see how you could bypass
these limitations with a custom operator. In fact, you may also be
able to optimise your operator for your specific use case.

Cheers,
Kostas

On Wed, Feb 12, 2020 at 1:02 PM Salva Alcántara  wrote:
>
> Would your comment still apply if I was using AbstractStreamOperator (passing
> its output when registering the callbacks) instead of a UDF? Maybe the
> situation changes if I use the Operator API instead...
>
>
>
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Dedup all data in stream

2020-02-12 Thread Kostas Kloudas
Hi Akshay,

Could you be more specific on what you are trying to achieve with this scheme?

I am asking because if your source is too fast and you want to slow
it down so that it produces data at the same rate as your process
function can consume them, then Flink's backpressure will eventually
do this.

If you want your process function to discard incoming elements (and
not queue them) if it is in the process of processing another element,
then this implies a multithreaded process function and I would maybe
look towards the AsyncIO [1] pattern with the AsyncFunction somehow
setting a flag as busy while processing and as false when it is done
and ready to process the next element.

Also, in order to help, I would need more information about the stream
being keyed or non-keyed and the parallelism of the source compared to
that of the process function.

I hope this helps,
Kostas

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html

On Wed, Feb 12, 2020 at 3:34 AM Akshay Shinde  wrote:
>
> Hi Community
>
> In our Flink job, in source we are creating our own stream to process n 
> number of objects per 2 minutes. And in process function for each object from 
> generated source stream we are doing some operation which we expect to get 
> finished in 2 minutes.
>
> Every 2 minutes we are generating same ’N’ objects in stream which process 
> function will process.  But in some cases process function is taking longer 
> time around 10 minutes. In this case stream will have 5 number of sets for 
> ’N’ objects as process function is waiting for 10 minutes as source is adding 
> ’N’ objects in stream at every 2 minutes. Problem is we don’t want to process 
> these objects 5 times, we want it to process only once for the latest ’N’ 
> objects.
>
> This lag can be more or less from process function which results in lag from 
> source to process function in job execution.
>
>
> Thanks in advance !!!


Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-02-12 Thread Kostas Kloudas
Hi Salva and Yun,

Yun is correct that the collector is not thread-safe, so writing
should be guarded.

In addition, such a pattern that issues a request to a 3rd-party
multi-threaded library and registers a callback for the future does
not play well with checkpointing. In your case, if a failure happens,
the data (or requests) that are "in-flight" are not part of any
checkpoint, so you may have data loss. Your pattern seems better
suited to the AsyncIO pattern [1] supported by Flink, and it may make
sense to use that for your project.
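
For illustration, the shape of this with AsyncIO would be roughly the
following. The "PatternLibrary" and its callback API are made-up stand-ins for
the third-party detector, not a real library:

import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class PatternDetectAsync extends RichAsyncFunction<String, String> {

    private transient PatternLibrary library;

    @Override
    public void open(Configuration parameters) {
        library = new PatternLibrary(); // the (multi-threaded) 3rd-party detector
    }

    @Override
    public void asyncInvoke(String event, ResultFuture<String> resultFuture) {
        // The library's own threads complete the future; the AsyncWaitOperator keeps
        // the in-flight element in its checkpointed state until a result is emitted.
        library.detect(event, match -> resultFuture.complete(Collections.singleton(match)));
    }

    public static DataStream<String> wire(DataStream<String> events) {
        // Unordered results, 30s timeout, at most 100 in-flight requests.
        return AsyncDataStream.unorderedWait(events, new PatternDetectAsync(), 30, TimeUnit.SECONDS, 100);
    }

    // Minimal stand-in so the sketch is self-contained.
    public static class PatternLibrary {
        public void detect(String event, Consumer<String> onMatch) {
            onMatch.accept("match: " + event);
        }
    }
}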

I hope this helps,
Kostas

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html

On Wed, Feb 12, 2020 at 9:03 AM Yun Gao  wrote:
>
>   Hi Salva,
>
> As far as I know,
>1. Out : Collector[T] could not support multi-thread 
> accessing, namely there could be only one thread writing records at one time. 
> If there are multiple threads using `out`, the access should need to be 
> coordinated in some way (for example, use lock, or use a queue to cache 
> record and let a single thread output them with `out`.
>2. With the current implementation, `Out` would not be 
> recreated. However, I think it is implementation related, if the processing 
> logic still happens in the `processElement` method, is it possible to always 
> use the `out` object passed into the method?
>
>
>Best,
>Yun
>
>
>
> --
> From:Salva Alcántara 
> Send Time:2020 Feb. 12 (Wed.) 13:33
> To:user 
> Subject:Using multithreaded library within ProcessFunction with callbacks 
> relying on the out parameter
>
> I am working on a `CoProcessFunction` that uses a third party library for
> detecting certain patterns of events based on some rules. So, in the end,
> the `ProcessElement1` method is basically forwarding the events to this
> library and registering a callback so that, when a match is detected, the
> CoProcessFunction can emit an output event. For achieving this, the callback
> relies on a reference to the `out: Collector[T]` parameter in
> `ProcessElement1`.
>
> Having said that, I am not sure whether this use case is well-supported by
> Flink, since:
>
> 1. There might be multiple threads spanned by the third party library (let's
> I have not any control over the amount of threads spanned, this is decided
> by the library)
> 2. I am not sure whether `out` might be recreated or something by Flink at
> some point, invalidating the references in the callbacks, making them crash
>
> So far I have not observed any issues, but I have just run my program in the
> small. It would be great to hear from the experts whether my approach is
> valid or not.
>
> PS: Also posted in
> https://stackoverflow.com/questions/60181678/using-multithreaded-library-within-processfunction-with-callbacks-relying-on-the
>
>
>
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>


Re: rocksdb max open file descriptor issue crashed application

2020-02-12 Thread Kostas Kloudas
Hi Apoorv,

I am not so familiar with the internals of RocksDB and how the number
of open files correlates with the number of (keyed) states and the
parallelism you have, but as a starting point you can have a look at
[1] for recommendations on how to tune RocksDB for large state. I am
also cc'ing Andrey who may have some more knowledge on the topic.
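
Besides raising the OS limit (ulimit -n) for the TaskManager processes, one
knob that may be worth checking is RocksDB's own max_open_files, which can be
capped through a custom options factory. A rough sketch (class names as of the
1.9/1.10 API; note that each keyed operator subtask gets its own RocksDB
instance, so the cap applies per instance):

import java.io.IOException;

import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

public class RocksDbOpenFilesCap {

    public static RocksDBStateBackend build(String checkpointUri) throws IOException {
        RocksDBStateBackend backend = new RocksDBStateBackend(checkpointUri, true);
        backend.setOptions(new OptionsFactory() {
            @Override
            public DBOptions createDBOptions(DBOptions currentOptions) {
                // Cap how many file handles each RocksDB instance keeps open (-1 = unlimited).
                return currentOptions.setMaxOpenFiles(1024);
            }

            @Override
            public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
                return currentOptions;
            }
        });
        return backend;
    }
}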

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#incremental-checkpoints

Cheers,
Kostas

On Wed, Feb 12, 2020 at 7:55 AM Apoorv Upadhyay
 wrote:
>
> Hi,
>
> Below is the error I am getting :
>
> 2020-02-08 05:40:24,543 INFO  org.apache.flink.runtime.taskmanager.Task   
>  - order-steamBy-api-order-ip (3/6) 
> (34c7b05d5a75dbbcc5718acf6b18) switched from RUNNING to CANCELING.
> 2020-02-08 05:40:24,543 INFO  org.apache.flink.runtime.taskmanager.Task   
>  - Triggering cancellation of task code 
> order-steamBy-api-order-ip (3/6) (34c7b05d5a75dbbcc5718acf6b18).
> 2020-02-08 05:40:24,543 ERROR 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder  - 
> Caught unexpected exception.
> java.io.IOException: Error while opening RocksDB instance.
> at 
> org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:74)
> at 
> org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.openDB(AbstractRocksDBRestoreOperation.java:131)
> at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:214)
> at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:188)
> at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:162)
> at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:148)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:268)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:520)
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:740)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:291)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.rocksdb.RocksDBException: While open directory: 
> /hadoop/yarn/local/usercache/flink/appcache/application_1580464300238_0045/flink-io-d947dea6-270b-44c0-94ca-4a49dbf02f52/job_97167effbb11a8e9ffcba36be7e4da80_op_CoStreamFlatMap_51abbbda2947171827fd9e53509c2fb4__4_6__uuid_3f8c7b20-6d17-43ad-a016-8d08f7ed9d50/db:
>  Too many open files
> at org.rocksdb.RocksDB.open(Native Method)
> at org.rocksdb.RocksDB.open(RocksDB.java:286)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:66)
> ... 17 more
> 2020-02-08 05:40:24,544 INFO  org.apache.flink.runtime.taskmanager.Task   
>  - order-status-mapping-join (4/6) 
> (4409b4e2d93f0441100f0f1575a1dcb9) switched from CANCELING to CANCELED.
> 2020-02-08 05:40:24,544 INFO  org.apache.flink.runtime.taskmanager.Task   
>  - Freeing task resources for order-status-mapping-join (4/6) 
> (4409b4e2d93f0441100f0f1575a1dcb9).
> 2020-02-08 05:40:24,543 ERROR 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder  - 
> Caught unexpected exception.
> java.io.IOException: Error while opening RocksDB instance.
> at 
> org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:74)
> at 
> org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.openDB(AbstractRocksDBRestoreOperation.java:131)
> at 
> 

Re: Aggregation for last n seconds for each event

2020-02-12 Thread Kostas Kloudas
Hi Oleg,

Could you be more specific on what you mean by
"for events of last n seconds(time units in general) for every incoming event."?

Do you mean that you have a stream of parallelism 1 and, for each
incoming element, you want your function to fire with the event
itself plus all the events that arrived within the last N time units
as input?
If this is the case, you can use a dummy key to key your stream so
that you have access to keyed state, then use a MapState with the
timestamp as key and a list of the already-seen elements with that
timestamp as value. Whenever an element arrives, you can register a
timer to fire N time units in the future. Then, when the timer fires,
you can iterate over the map, fetch the elements you are interested
in, and clean up whatever you will not need anymore.

For an example you could look at [1].

I hope this helps,
Kostas

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html

On Tue, Feb 11, 2020 at 11:18 PM Fanbin Bu  wrote:
>
> can u do
> RANGE BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW?
>
> On Tue, Feb 11, 2020 at 12:15 PM oleg  wrote:
>>
>> Hi Community,
>>
>> I do streaming in event time and I want to preserve ordering and late
>> events. I have a use case where I need to fire an aggregation function
>> for events of last n seconds(time units in general) for every incoming
>> event.
>>
>> It seems to me that windowing is not suitable since it may be expressed
>> either in time or in events count, not "last n seconds for each single
>> event".
>>
>> Is there an idiomatic way to do this? Any examples or help are
>> appreciated. Thanks in advance.
>>
>>
>> Best regards,
>>
>> Oleg Bonar
>>


Re: Flink complaining when trying to write to s3 in Parquet format

2020-02-12 Thread Kostas Kloudas
Hi Fatima,

I am not super familiar with Parquet, but your issue seems to be
related to [1], which appears to be expected behaviour on the Parquet
side.
The reason for this behaviour is the format of the Parquet files,
which store only the leaf fields but not the structure of the groups,
so if a group has no fields, its schema cannot be inferred.
Given this, I do not think that it is a bug, but feel free to check
further and let us know if I am wrong.

Cheers,
Kostas

[1] https://issues.apache.org/jira/browse/PARQUET-278

On Tue, Feb 11, 2020 at 11:20 PM Fatima Omer  wrote:
>
> I have a java app that is using a flink SQL query to perform aggregations on 
> a data stream being read in from Kafka. Attached is the java file for 
> reference.
>
> The query results are being written to s3. I can write successfully in Json 
> format but when I try to use Parquet format, flink complains that min_ts is 
> an optional group. I have verified that min_ts can never be null in our 
> scheme of things.
>
> Would appreciate help on this. Thanks!
>
> Stack trace:
>
> Caused by: org.apache.parquet.schema.InvalidSchemaException: Cannot write a 
> schema with an empty group: optional group min_ts {
>
> }
>
> at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:27)
>
> at org.apache.parquet.schema.GroupType.accept(GroupType.java:255)
>
> at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:31)
>
> at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:37)
>
> at org.apache.parquet.schema.MessageType.accept(MessageType.java:55)
>
> at 
> org.apache.parquet.schema.TypeUtil.checkValidWriteSchema(TypeUtil.java:23)
>
> at 
> org.apache.parquet.hadoop.ParquetFileWriter.(ParquetFileWriter.java:233)
>
> at org.apache.parquet.hadoop.ParquetWriter.(ParquetWriter.java:280)
>
> at 
> org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:530)
>
> at 
> com.dlvr.pipeline.falcon.sink.ParquetWriterSink.createAvroParquetWriter(ParquetWriterSink.java:37)
>
> at 
> com.dlvr.pipeline.falcon.sink.ParquetWriterSink.lambda$forReflectRecord$3c375096$1(ParquetWriterSink.java:48)
>
> at 
> org.apache.flink.formats.parquet.ParquetWriterFactory.create(ParquetWriterFactory.java:57)
>
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter$Factory.openNew(BulkPartWriter.java:103)
>
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:222)
>
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:212)
>
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:268)
>
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:370)
>
> at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>
> ... 50 more
>
>
>


Re: Flink job fails with org.apache.kafka.common.KafkaException: Failed to construct kafka consumer

2020-02-12 Thread Kostas Kloudas
Hi John,

As you suggested, I would also lean towards increasing the number of
allowed open handles, but
for recommendations on best practices, I am cc'ing Piotr who may be
more familiar with the Kafka consumer.

Cheers,
Kostas

On Tue, Feb 11, 2020 at 9:43 PM John Smith  wrote:
>
> Just wondering is this on the client side in the flink Job? I rebooted the 
> task and the job deployed correctly on another node.
>
> Is there a specific ulimit that we should set for flink tasks nodes?
>
> org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:799)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:650)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:630)
> at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
> at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:504)
> at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.KafkaException: java.io.IOException: Too 
> many open files
> at org.apache.kafka.common.network.Selector.(Selector.java:154)
> at org.apache.kafka.common.network.Selector.(Selector.java:188)
> at org.apache.kafka.common.network.Selector.(Selector.java:192)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:722)
> ... 11 more
> Caused by: java.io.IOException: Too many open files
> at sun.nio.ch.IOUtil.makePipe(Native Method)
> at sun.nio.ch.EPollSelectorImpl.(EPollSelectorImpl.java:65)
> at 
> sun.nio.ch.EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36)
> at java.nio.channels.Selector.open(Selector.java:227)
> at org.apache.kafka.common.network.Selector.(Selector.java:152)
> ... 14 more


Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for S3a files

2020-02-05 Thread Kostas Kloudas
Hi Mark,

This feature of customizing the rolling policy even for bulk formats will
be in the upcoming 1.10 release, as described in [1], although the
documentation for the feature is still pending [2]. I hope the docs will be
merged in time for the release.
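
For reference, once 1.10 is out the wiring should look roughly like the sketch
below. OnCheckpointRollingPolicy is what bulk formats effectively use already,
and a custom policy extending CheckpointRollingPolicy could additionally roll on
inactivity; the names are as of the current master, so double-check them against
the release:

import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;

public class BulkRollingPolicySketch {

    // writerFactory is whatever bulk writer the job already uses (Parquet, compression, ...).
    public static <T> StreamingFileSink<T> sink(String outputPath, BulkWriter.Factory<T> writerFactory) {
        return StreamingFileSink
            .forBulkFormat(new Path(outputPath), writerFactory)
            // With FLINK-13027 the rolling policy can be set explicitly for bulk formats.
            .withRollingPolicy(OnCheckpointRollingPolicy.build())
            .build();
    }
}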

Cheers,
Kostas

[1] https://issues.apache.org/jira/browse/FLINK-13027
[2] https://issues.apache.org/jira/browse/FLINK-15476

On Mon, Feb 3, 2020 at 8:14 PM Kostas Kloudas  wrote:

> Hi Mark,
>
> Currently no, but if rolling on every checkpoint is ok with you, in future
> versions it is easy to allow to roll on every checkpoint, but also on
> inactivity intervals.
>
> Cheers,
> Kostas
>
> On Mon, Feb 3, 2020 at 5:24 PM Mark Harris 
> wrote:
>
>> Hi Kostas,
>>
>> Thanks for your help here - I think we're OK with the increased heap
>> size, but happy to explore other alternatives.
>>
>> I see the problem - we're currently using a BulkFormat, which doesn't
>> seem to let us override the rolling policy. Is there an equivalent for the
>> BulkFormat?
>>
>> Best regards,
>>
>> Mark
>> --
>> *From:* Kostas Kloudas 
>> *Sent:* 03 February 2020 15:39
>> *To:* Mark Harris 
>> *Cc:* Piotr Nowojski ; Cliff Resnick <
>> cre...@gmail.com>; David Magalhães ; Till
>> Rohrmann ; flink-u...@apache.org <
>> flink-u...@apache.org>
>> *Subject:* Re: GC overhead limit exceeded, memory full of DeleteOnExit
>> hooks for S3a files
>>
>> Hi Mark,
>>
>> You can use something like the following and change the intervals
>> accordingly:
>>
>> final StreamingFileSink sink = StreamingFileSink
>>     .forRowFormat(new Path(outputPath), new SimpleStringEncoder<>("UTF-8"))
>>     .withRollingPolicy(
>>         DefaultRollingPolicy.builder()
>>             .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
>>             .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
>>             .withMaxPartSize(1024 * 1024 * 1024)
>>             .build())
>>     .build();
>>
>> Let me know if this solves the problem.
>>
>> Cheers,
>> Kostas
>>
>> On Mon, Feb 3, 2020 at 4:11 PM Mark Harris 
>> wrote:
>>
>> Hi Kostas,
>>
>> Sorry, stupid question: How do I set that for a StreamingFileSink?
>>
>> Best regards,
>>
>> Mark
>> --
>> *From:* Kostas Kloudas 
>> *Sent:* 03 February 2020 14:58
>> *To:* Mark Harris 
>> *Cc:* Piotr Nowojski ; Cliff Resnick <
>> cre...@gmail.com>; David Magalhães ; Till
>> Rohrmann ; flink-u...@apache.org <
>> flink-u...@apache.org>
>> *Subject:* Re: GC overhead limit exceeded, memory full of DeleteOnExit
>> hooks for S3a files
>>
>> Hi Mark,
>>
>> Have you tried to set your rolling policy to close inactive part files
>> after some time [1]?
>> If the part files in the buckets are inactive and there are no new part
>> files, then the state handle for those buckets will also be removed.
>>
>> Cheers,
>> Kostas
>>
>>
>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.html
>>
>>
>>
>> On Mon, Feb 3, 2020 at 3:54 PM Mark Harris 
>> wrote:
>>
>> Hi all,
>>
>> The out-of-memory heap dump had the answer - the job was failing with an
>> OutOfMemoryError because the activeBuckets members of 3 instances of
>> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets were
>> filling a significant enough part of the memory of the taskmanager that no
>> progress was being made. Increasing the memory available to the TM seems to
>> have fixed the problem.
>>
>> I think the DeleteOnExit problem will mean it needs to be restarted every
>> few weeks, but that's acceptable for now.
>>
>> Thanks again,
>>
>> Mark
>> --
>> *From:* Mark Harris 
>> *Sent:* 30 January 2020 14:36
>> *To:* Piotr Nowojski 
>> *Cc:* Cliff Resnick ; David Magalhães <
>> speeddra...@gmail.com>; Till Rohrmann ;
>> flink-u...@apache.org ; kkloudas <
>> kk

Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for S3a files

2020-02-03 Thread Kostas Kloudas
Hi Mark,

You can use something like the following and change the intervals
accordingly:

final StreamingFileSink sink = StreamingFileSink
    .forRowFormat(new Path(outputPath), new SimpleStringEncoder<>("UTF-8"))
    .withRollingPolicy(
        DefaultRollingPolicy.builder()
            .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
            .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
            .withMaxPartSize(1024 * 1024 * 1024)
            .build())
    .build();

Let me know if this solves the problem.

Cheers,
Kostas

On Mon, Feb 3, 2020 at 4:11 PM Mark Harris  wrote:

> Hi Kostas,
>
> Sorry, stupid question: How do I set that for a StreamingFileSink?
>
> Best regards,
>
> Mark
> ----------
> *From:* Kostas Kloudas 
> *Sent:* 03 February 2020 14:58
> *To:* Mark Harris 
> *Cc:* Piotr Nowojski ; Cliff Resnick <
> cre...@gmail.com>; David Magalhães ; Till Rohrmann
> ; flink-u...@apache.org 
> *Subject:* Re: GC overhead limit exceeded, memory full of DeleteOnExit
> hooks for S3a files
>
> Hi Mark,
>
> Have you tried to set your rolling policy to close inactive part files
> after some time [1]?
> If the part files in the buckets are inactive and there are no new part
> files, then the state handle for those buckets will also be removed.
>
> Cheers,
> Kostas
>
>
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.html
>
>
>
> On Mon, Feb 3, 2020 at 3:54 PM Mark Harris 
> wrote:
>
> Hi all,
>
> The out-of-memory heap dump had the answer - the job was failing with an
> OutOfMemoryError because the activeBuckets members of 3 instances of
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets were
> filling a significant enough part of the memory of the taskmanager that no
> progress was being made. Increasing the memory available to the TM seems to
> have fixed the problem.
>
> I think the DeleteOnExit problem will mean it needs to be restarted every
> few weeks, but that's acceptable for now.
>
> Thanks again,
>
> Mark
> --
> *From:* Mark Harris 
> *Sent:* 30 January 2020 14:36
> *To:* Piotr Nowojski 
> *Cc:* Cliff Resnick ; David Magalhães <
> speeddra...@gmail.com>; Till Rohrmann ;
> flink-u...@apache.org ; kkloudas <
> kklou...@apache.org>
> *Subject:* Re: GC overhead limit exceeded, memory full of DeleteOnExit
> hooks for S3a files
>
> Hi,
>
> Thanks for your help with this. 
>
> The EMR cluster has 3 15GB VMs, and the flink cluster is started with:
>
> /usr/lib/flink/bin/yarn-session.sh -d -n 3 -tm 5760 -jm 5760 -s 3
>
> Usually the task runs for about 15 minutes before it restarts, usually due
> to with an "java.lang.OutOfMemoryError: Java heap space" exception.
>
> The figures came from a MemoryAnalyzer session on a manual memory dump
> from one of the taskmanagers. The total size of that heap was only 1.8gb.
> In that heap, 1.7gb is taken up by the static field "files" in
> DeleteOnExitHook, which is a linked hash set containing the 9 million
> strings.
>
> A full example of one the path is
> /tmp/hadoop-yarn/s3a/s3ablock-0001-6061210725685.tmp, at for 120 bytes per
> char[] for a solid 1.2gb of chars. Then 200mb for their String wrappers and
> another 361MB for LinkedHashMap$Entry objects. Despite valiantly holding
> on to an array of 16777216 HashMap$Node elements, the LinkedHashMap can
> only contribute another 20MB or so.
> I goofed in not taking that 85% figure from MemoryAnalyzer - it tells
> me DeleteOnExitHook is responsible for 96.98% of the heap dump.
>
> Looking at the files it managed to write before this started to happen
> regularly, it looks like they're being written approximately every 3
> minutes. I'll triple check our config, but I'm reasonably sure the job is
> configured to checkpoint every 15 minutes - could something else be causing
> it to write?
>
> This may all be a red herring - something else may be taking up the
> taskmanagers memory which didn't make it into that heap dump. I plan to
> repeat the analysis on a heapdump created
> by  -XX:+HeapDumpOnOutOfMemoryError shortly.
>
> Best regards,
>
> Mark
>
> --
> *From:* Piotr Nowojski 
> *Sent:* 30 January 2020 13:44
>

Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for S3a files

2020-02-03 Thread Kostas Kloudas
Hi Mark,

Have you tried to set your rolling policy to close inactive part files
after some time [1]?
If the part files in the buckets are inactive and there are no new part
files, then the state handle for those buckets will also be removed.

Cheers,
Kostas

https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.html



On Mon, Feb 3, 2020 at 3:54 PM Mark Harris  wrote:

> Hi all,
>
> The out-of-memory heap dump had the answer - the job was failing with an
> OutOfMemoryError because the activeBuckets members of 3 instances of
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets were
> filling a significant enough part of the memory of the taskmanager that no
> progress was being made. Increasing the memory available to the TM seems to
> have fixed the problem.
>
> I think the DeleteOnExit problem will mean it needs to be restarted every
> few weeks, but that's acceptable for now.
>
> Thanks again,
>
> Mark
> --
> *From:* Mark Harris 
> *Sent:* 30 January 2020 14:36
> *To:* Piotr Nowojski 
> *Cc:* Cliff Resnick ; David Magalhães <
> speeddra...@gmail.com>; Till Rohrmann ;
> flink-u...@apache.org ; kkloudas <
> kklou...@apache.org>
> *Subject:* Re: GC overhead limit exceeded, memory full of DeleteOnExit
> hooks for S3a files
>
> Hi,
>
> Thanks for your help with this. 
>
> The EMR cluster has 3 15GB VMs, and the flink cluster is started with:
>
> /usr/lib/flink/bin/yarn-session.sh -d -n 3 -tm 5760 -jm 5760 -s 3
>
> Usually the task runs for about 15 minutes before it restarts, usually due
> to with an "java.lang.OutOfMemoryError: Java heap space" exception.
>
> The figures came from a MemoryAnalyzer session on a manual memory dump
> from one of the taskmanagers. The total size of that heap was only 1.8gb.
> In that heap, 1.7gb is taken up by the static field "files" in
> DeleteOnExitHook, which is a linked hash set containing the 9 million
> strings.
>
> A full example of one the path is
> /tmp/hadoop-yarn/s3a/s3ablock-0001-6061210725685.tmp, at for 120 bytes per
> char[] for a solid 1.2gb of chars. Then 200mb for their String wrappers and
> another 361MB for LinkedHashMap$Entry objects. Despite valiantly holding
> on to an array of 16777216 HashMap$Node elements, the LinkedHashMap can
> only contribute another 20MB or so.
> I goofed in not taking that 85% figure from MemoryAnalyzer - it tells
> me DeleteOnExitHook is responsible for 96.98% of the heap dump.
>
> Looking at the files it managed to write before this started to happen
> regularly, it looks like they're being written approximately every 3
> minutes. I'll triple check our config, but I'm reasonably sure the job is
> configured to checkpoint every 15 minutes - could something else be causing
> it to write?
>
> This may all be a red herring - something else may be taking up the
> taskmanagers memory which didn't make it into that heap dump. I plan to
> repeat the analysis on a heapdump created
> by  -XX:+HeapDumpOnOutOfMemoryError shortly.
>
> Best regards,
>
> Mark
>
> --
> *From:* Piotr Nowojski 
> *Sent:* 30 January 2020 13:44
> *To:* Mark Harris 
> *Cc:* Cliff Resnick ; David Magalhães <
> speeddra...@gmail.com>; Till Rohrmann ;
> flink-u...@apache.org ; kkloudas <
> kklou...@apache.org>
> *Subject:* Re: GC overhead limit exceeded, memory full of DeleteOnExit
> hooks for S3a files
>
> Hi,
>
> What is your job setup? Size of the nodes, memory settings of the
> Flink/JVM?
>
> 9 041 060 strings is awfully small number to bring down a whole cluster.
> With each tmp string having ~30 bytes, that’s only 271MB. Is this really
> 85% of the heap? And also, with parallelism of 6 and checkpoints every 15
> minutes, 9 000 000 of leaked strings should happen only after one month
>  assuming 500-600 total number of buckets. (Also assuming that there is a
> separate file per each bucket).
>
> Piotrek
>
> On 30 Jan 2020, at 14:21, Mark Harris  wrote:
>
> Trying a few different approaches to the fs.s3a.fast.upload settings has
> bought me no joy - the taskmanagers end up simply crashing or complaining
> of high GC load. Heap dumps suggest that this time they're clogged with
> buffers instead, which makes sense.
>
> Our job has parallelism of 6 and checkpoints every 15 minutes - if
> anything, we'd like to increase the frequency of that checkpoint duration.
> I suspect this could be affected by the partition structure we were
> bucketing to as well, and at any given moment we could be receiving data
> for up to 280 buckets at once.
> Could this be a factor?
>
> Best regards,
>
> Mark
> --
> *From:* Piotr Nowojski 
> *Sent:* 27 January 2020 16:16
> *To:* Cliff Resnick 
> *Cc:* David Magalhães ; Mark Harris <
> mark.har...@hivehome.com>; Till Rohrmann ;
> flink-u...@apache.org ; kkloudas <
> kklou...@apache.org>
> *Subject:* Re: GC overhead limit exceeded, memory 

Re: FileStreamingSink is using the same counter for different files

2020-01-28 Thread Kostas Kloudas
Hi Pawel,

You are correct that the write method invocation is guaranteed to be
thread-safe for the same operator subtask instance.
But I am not sure if having a unique counter per subtask across
buckets would add much to the user experience of the sink.
I think that in both cases, the interpretation of the part files would
be the same.

I may be wrong though, so please let me know if this is a deal breaker for you.

Cheers,
Kostas


On Sat, Jan 25, 2020 at 11:48 AM Pawel Bartoszek
 wrote:
>
> Hi Kostas,
>
> Thanks for confirming that. I started thinking it might be useful or more 
> user friendly to use unique counter across buckets for the same operator 
> subtask?
> The way I could imagine this working is to pass max counter to the 
> https://github.com/apache/flink/blob/e7e24471240dbaa6b5148d406575e57d170b1623/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L204
>  write method? or bucket holding instance of Buckets class and accessing 
> global counter from there? As far as I know the write method invocation is 
> guaranteed to be thread safe for the same sub operator instance.
>
> Thanks,
> Pawel
>
>
> On Fri, 24 Jan 2020 at 20:45, Kostas Kloudas  wrote:
>>
>> Hi Pawel,
>>
>> You are correct that counters are unique within the same bucket but
>> NOT across buckets. Across buckets, you may see the same counter being
>> used.
>> The max counter is used only upon restoring from a failure, resuming
>> from a savepoint or rescaling and this is done to guarantee that n
>> valid data are overwritten while limiting the state that Flink has to
>> keep internally. For a more detailed discussion about the why, you can
>> have a look here: https://issues.apache.org/jira/browse/FLINK-13609
>>
>> Cheers,
>> Kostas
>>
>> On Fri, Jan 24, 2020 at 5:16 PM Pawel Bartoszek
>>  wrote:
>> >
>> > I have looked into the source code and it looks likes that the same 
>> > counter counter value being used in two buckets is correct.
>> > Each Bucket class 
>> > https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
>> >  is passed partCounter in the constructor. Whenever part file is rolled 
>> > over then counter is incremented within the scope of this bucket. It can 
>> > happen that there are two or more active buckets and counter is increased 
>> > independently inside them so that they are become equal. However, global 
>> > max counter maintained by Buckets class always keeps the max part counter 
>> > so that when new bucket is created is passed the correct part counter.
>> >
>> > I have done my analysis based on the logs from my job. I highlighted the 
>> > same counter value used for part-0-8.
>> >
>> > 2020-01-24 14:57:41 [Sink (1/1)-thread-0] DEBUG 
>> > org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 
>> > 0 opening new part file "part-0-6" for bucket id=2020-01-24T14_54_00Z.
>> > 2020-01-24 14:57:41 [Async Sink: Unnamed (1/1)] INFO  
>> > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - 
>> > Subtask 0 received completion notification for checkpoint with id=7.
>> > 2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] INFO  
>> > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - 
>> > Subtask 0 checkpointing for checkpoint with id=8 (max part counter=7).
>> > 2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] DEBUG 
>> > org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 
>> > 0 closing in-progress part file for bucket id=2020-01-24T14_54_00Z on 
>> > checkpoint.
>> > 2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] DEBUG 
>> > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - 
>> > Subtask 0 checkpointing: BucketState for bucketId=2020-01-24T14_54_00Z and 
>> > bucketPath=s3://xxx
>> > 2020-01-24 14:58:11 [Sink (1/1)-thread-0] DEBUG 
>> > org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 
>> > 0 closing in-progress part file for bucket id=2020-01-24T14_54_00Z due to 
>> > element
>> > 2020-01-24 14:58:11 [Sink (1/1)-thread-0] DEBUG 
>> > org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 
>> > 0 opening new part file "part-0-7" for bucket id=2020-01-24T14_54_00Z.
>> > 2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] INFO  
>> > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - 
&g

Re: FileStreamingSink is using the same counter for different files

2020-01-24 Thread Kostas Kloudas
Hi Pawel,

You are correct that counters are unique within the same bucket but
NOT across buckets. Across buckets, you may see the same counter being
used.
The max counter is used only upon restoring from a failure, resuming
from a savepoint or rescaling and this is done to guarantee that n
valid data are overwritten while limiting the state that Flink has to
keep internally. For a more detailed discussion about the why, you can
have a look here: https://issues.apache.org/jira/browse/FLINK-13609

Cheers,
Kostas

On Fri, Jan 24, 2020 at 5:16 PM Pawel Bartoszek
 wrote:
>
> I have looked into the source code and it looks like the same counter 
> value being used in two buckets is correct.
> Each Bucket class 
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
>  is passed partCounter in the constructor. Whenever a part file is rolled over, 
> the counter is incremented within the scope of this bucket. It can happen 
> that there are two or more active buckets and the counter is increased 
> independently inside them so that they become equal. However, the global max 
> counter maintained by the Buckets class always keeps the max part counter so that 
> when a new bucket is created it is passed the correct part counter.
>
> I have done my analysis based on the logs from my job. I highlighted the same 
> counter value used for part-0-8.
>
> 2020-01-24 14:57:41 [Sink (1/1)-thread-0] DEBUG 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0 
> opening new part file "part-0-6" for bucket id=2020-01-24T14_54_00Z.
> 2020-01-24 14:57:41 [Async Sink: Unnamed (1/1)] INFO  
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0 
> received completion notification for checkpoint with id=7.
> 2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] INFO  
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0 
> checkpointing for checkpoint with id=8 (max part counter=7).
> 2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] DEBUG 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0 
> closing in-progress part file for bucket id=2020-01-24T14_54_00Z on 
> checkpoint.
> 2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] DEBUG 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0 
> checkpointing: BucketState for bucketId=2020-01-24T14_54_00Z and 
> bucketPath=s3://xxx
> 2020-01-24 14:58:11 [Sink (1/1)-thread-0] DEBUG 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0 
> closing in-progress part file for bucket id=2020-01-24T14_54_00Z due to 
> element
> 2020-01-24 14:58:11 [Sink (1/1)-thread-0] DEBUG 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0 
> opening new part file "part-0-7" for bucket id=2020-01-24T14_54_00Z.
> 2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] INFO  
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0 
> received completion notification for checkpoint with id=8.
> 2020-01-24 14:58:23 [Sink (1/1)-thread-0] DEBUG 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0 
> closing in-progress part file for bucket id=2020-01-24T14_55_00Z due to 
> element
> 2020-01-24 14:58:23 [Sink (1/1)-thread-0] DEBUG 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0 
> opening new part file "part-0-8" for bucket id=2020-01-24T14_55_00Z.
> 2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] INFO  
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0 
> checkpointing for checkpoint with id=9 (max part counter=9).
> 2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] DEBUG 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0 
> closing in-progress part file for bucket id=2020-01-24T14_54_00Z on 
> checkpoint.
> 2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] DEBUG 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0 
> checkpointing: BucketState for bucketId=2020-01-24T14_54_00Z and 
> bucketPath=s3://xxx
> 2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] DEBUG 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0 
> closing in-progress part file for bucket id=2020-01-24T14_55_00Z on 
> checkpoint.
> 2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] DEBUG 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0 
> checkpointing: BucketState for bucketId=2020-01-24T14_55_00Z and 
> bucketPath=s3://xxx
> 2020-01-24 14:58:41 [Sink (1/1)-thread-0] DEBUG 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0 
> closing in-progress part file for bucket id=2020-01-24T14_54_00Z due to 
> element
> 2020-01-24 14:58:42 [Sink (1/1)-thread-0] DEBUG 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0 
> opening 
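To make the point of this thread concrete, here is a tiny illustrative sketch (made-up names, a simplified view rather than the actual Bucket/Buckets internals) of how a part-file path combines the bucket id, the subtask index and the per-bucket counter. Equal counters in different buckets never collide because the bucket id is part of the directory path.

public class PartFilePathSketch {
    // Illustrative only; a simplified view of the naming, not the real internals.
    static String partFilePath(String basePath, String bucketId, int subtask, long counter) {
        return basePath + "/" + bucketId + "/part-" + subtask + "-" + counter;
    }

    public static void main(String[] args) {
        // The same counter (8) in two different buckets yields two distinct paths.
        System.out.println(partFilePath("s3://xxx", "2020-01-24T14_54_00Z", 0, 8));
        System.out.println(partFilePath("s3://xxx", "2020-01-24T14_55_00Z", 0, 8));
    }
}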

Re: Custom File Sink using EventTime and defined custom file name for parquet file

2020-01-15 Thread Kostas Kloudas
Oops, sorry for not sending the reply to everyone
and thanks David for reposting it here.
Great to hear that you solved your issue!

Kostas



On Wed, Jan 15, 2020 at 1:57 PM David Magalhães  wrote:
>
> Sorry, I've only saw the replies today.
>
> Regarding my previous email,
>
>> Still, there is something missing in this solution to close a window 
>> with a given timeout, so it can write into the sink the last events if no 
>> more events are sent.
>
>
> I've fixed this using a custom trigger,
>
> val flag = ctx.getPartitionedState(valueStateDescriptor).value()
>
> // Flag only used to register one trigger per window. Flag is cleaned when 
> FIRE action is executed.
> if (!flag) {
>   val delay = window.getEnd - window.getStart
>   ctx.getPartitionedState(valueStateDescriptor).update(true)
>   ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime + delay)
>   ctx.registerEventTimeTimer(window.maxTimestamp())
> }
>
> TriggerResult.CONTINUE
>
> Leonard, by "duplicated events" I mean store the same event on different 
> parquet files, since the file format was "part-X-Y". So, if I start to 
> process the same stream again (from a point in the past) I couldn't overwrite 
> the files with exactly the same name.
>
> I think I've read a blogpost about them (Pinterest), I will check the video.
>
> Kostas, replied to only me, I'm adding his response here.
>
>> Hi David,
>> I skimmed through the solution with the window before the sink.
>> If this solution fits your needs, I think you could:
>> 1)  just specify a BucketAssigner instead of writing a custom sink,
>> this will allow you to not lose any functionality from the
>> StreamingFileSink
>> 2)  for the timeout requirement, you could use a (keyed) process
>> function with map state to hold your event-time windows. The key will
>> be the window start (or interval) and you can register timers to fire
>> at the end of the window or after a certain period of inactivity. I
>> think that [1] can be a good starting point.
>> I hope this helps,
>> Kostas
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html
>
>
> I think I can only define the partition name on the BucketAssigner, because I don't 
> want to have many partitions (currently I have accountId and yyyyMM (year and 
> month)). I've checked that on Flink 1.10 [1] we will have access to configure 
> a prefix and suffix for the filename, where I could add the day and hour to 
> the prefix, and when I need to store the same events again I could start 
> from a specific time (probably matching a Kafka offset) and remove the files 
> with a prefix date newer than this time.
>
> The only scenario for this case is when for some reason Flink is writing bad 
> files (events with wrong information for some reason), that need to be stored 
> (processed) again.
>
> For 2), my implementation with the trigger solved this.
>
> [1] 
> https://github.com/apache/flink/blob/master/docs/dev/connectors/streamfile_sink.md
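The prefix/suffix configuration mentioned in the paragraph above landed in Flink 1.10 as OutputFileConfig. A minimal sketch of how it combines with StreamingFileSink follows; the prefix value, suffix and path are placeholders, not taken from this thread.

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class PrefixedSinkSketch {

    public static StreamingFileSink<String> build() {
        // Part files become "<prefix>-<subtask>-<counter><suffix>" instead of the
        // default "part-<subtask>-<counter>", which makes it possible to find and
        // clean up the files of a given backfill run by prefix.
        OutputFileConfig fileConfig = OutputFileConfig.builder()
                .withPartPrefix("2020-01-10T12")   // e.g. day/hour of the backfill (placeholder)
                .withPartSuffix(".out")
                .build();

        return StreamingFileSink
                .forRowFormat(new Path("s3://my-bucket/output"),
                        new SimpleStringEncoder<String>("UTF-8"))
                .withOutputFileConfig(fileConfig)
                .build();
    }
}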
>
> On Tue, Jan 14, 2020 at 6:28 PM Till Rohrmann  wrote:
>>
>> Hi David,
>>
>> I'm pulling in Kostas who worked on the StreamingFileSink and might be able 
>> to answer some of your questions.
>>
>> Cheers,
>> Till
>>
>> On Mon, Jan 13, 2020 at 2:45 PM Leonard Xu  wrote:
>>>
>>> Hi, David
>>>
>>> For you first description, I’m a little confused about duplicated records 
>>> when backfilling, could you describe your usage scenario/code more?
>>>
>>> I remembered a backfill user solution from Pinterest which is very similar 
>>> to yours and using Flink too[1], hope that can help you.
>>>
>>> Best,
>>> Leonard
>>>
>>> [1] 
>>> https://www.youtube.com/watch?v=3-X6FJ5JS4E=PLDX4T_cnKjD207Aa8b5CsZjc7Z_KRezGz=64
>>>
>>> On Jan 10, 2020, at 12:14, David Magalhães  wrote:
>>>
>>> Hi, I'm working for the first time with Flink and I'm trying to create a 
>>> solution that will store events from Kafka into Parquet files in S3. This 
>>> also should support re-injection of events from Parquet files into a Kafka 
>>> topic.
>>>
>>> Here is the code with a simple usage of StreamingFileSink with BulkEncode 
>>> that will get the events and store them in parquet files. The files will be 
>>> partitioned by account_id and year and month (yyyyMM). The issue with this 
>>> approach is when running the backfill from a certain point in time, it will 
>>> be hard to not generate duplicated events, since we will not override the 
>>> same files, as the filename is generated as 
>>> "part-<subtaskIndex>-<counter>".
>>>
>>> To add predictability, I've used a tumbling window to aggregate multiple 
>>> GenericRecord, in order to write the parquet file with a list of them. For 
>>> that I've created a custom file sink, but I'm not sure of the properties I 
>>> am going to lose compared to the Streaming File Sink. Here is the code. 
>>> Still, there is something missing in this solution to close a window 
>>> with a given timeout, so it can write into the sink the last events if no 
>>> more events are sent.
>>>
>>> Another 
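For the second suggestion in Kostas' reply quoted in this thread (a keyed process function that holds events in state and uses timers for the end of the window plus an inactivity timeout), a minimal sketch could look as follows. It is only an illustration of the pattern, not code from the thread: the class name, the String element type, the batching into a List and the assumption that event-time timestamps are already assigned are all mine.

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

// Buffers events per key and emits them as a batch either at the event-time end
// of their window or after a period of processing-time inactivity.
public class WindowWithTimeoutFunction extends KeyedProcessFunction<String, String, List<String>> {

    private final long windowSizeMs;   // e.g. one hour
    private final long inactivityMs;   // flush if nothing arrives for this long

    private transient ListState<String> buffer;
    private transient ValueState<Long> inactivityTimer;

    public WindowWithTimeoutFunction(long windowSizeMs, long inactivityMs) {
        this.windowSizeMs = windowSizeMs;
        this.inactivityMs = inactivityMs;
    }

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffered-events", String.class));
        inactivityTimer = getRuntimeContext().getState(
                new ValueStateDescriptor<>("inactivity-timer", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<List<String>> out) throws Exception {
        buffer.add(value);

        // Fire at the event-time end of the element's window (assumes timestamps are assigned).
        long windowEnd = (ctx.timestamp() / windowSizeMs + 1) * windowSizeMs;
        ctx.timerService().registerEventTimeTimer(windowEnd);

        // Keep exactly one processing-time "inactivity" timer per key.
        Long previous = inactivityTimer.value();
        if (previous != null) {
            ctx.timerService().deleteProcessingTimeTimer(previous);
        }
        long next = ctx.timerService().currentProcessingTime() + inactivityMs;
        ctx.timerService().registerProcessingTimeTimer(next);
        inactivityTimer.update(next);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<List<String>> out) throws Exception {
        Iterable<String> buffered = buffer.get();
        List<String> batch = new ArrayList<>();
        if (buffered != null) {
            for (String e : buffered) {
                batch.add(e);
            }
        }
        if (!batch.isEmpty()) {
            out.collect(batch); // a downstream sink would write the batch, e.g. one file per batch
        }
        buffer.clear();
    }
}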

Re: StreamingFileSink doesn't close multipart uploads to s3?

2020-01-15 Thread Kostas Kloudas
Hi Ken, Jingsong and Li,

Sorry for the late reply.

As Jingsong pointed out, upon calling close() the StreamingFileSink
does not commit the in-progress/pending files.
The reason for this is that the close() method of any UDF including
sink functions is called on both normal termination and termination
due to failure.
Given this, we cannot commit the files, because in case of failure
they should be reverted.

Actually we are currently updating the StreamingFileSink docs to
includes this among other things.
Also the differentiation between normal termination and termination
due to failure will hopefully be part of Flink 1.11 and
this is the FLIP to check
https://cwiki.apache.org/confluence/display/FLINK/FLIP-46%3A+Graceful+Shutdown+Handling+by+UDFs.

Cheers,
Kostas

On Fri, Jan 10, 2020 at 4:45 PM Ken Krugler  wrote:
>
> Hi Kostas,
>
> I didn’t see a follow-up to this, and have also run into this same issue of 
> winding up with a bunch of .inprogress files when a bounded input stream ends 
> and the job terminates.
>
> When StreamingFileSystem.close() is called, shouldn’t all buckets get 
> auto-rolled, so that the .inprogress files become part-xxx files?
>
> Thanks,
>
> — Ken
>
>
> On Dec 9, 2019, at 6:56 PM, Jingsong Li  wrote:
>
> Hi Kostas,
>
> I took a look at StreamingFileSink.close; it just deletes all temporary 
> files. I know it is for failover. When the job fails, it should just delete temp 
> files for the next restart.
> But for testing purposes, we just want to run a bounded streaming job. If 
> there is no checkpoint trigger, no one will move the final temp files to the 
> output path, so the result of this job is wrong.
> Do you have any idea about this? Can we distinguish "fail close" from 
> "success finish close" in StreamingFileSink?
>
> Best,
> Jingsong Lee
>
> On Mon, Dec 9, 2019 at 10:32 PM Kostas Kloudas  wrote:
>>
>> Hi Li,
>>
>> This is the expected behavior. All the "exactly-once" sinks in Flink
>> require checkpointing to be enabled.
>> We will update the documentation to be clearer in the upcoming release.
>>
>> Thanks a lot,
>> Kostas
>>
>> On Sat, Dec 7, 2019 at 3:47 AM Li Peng  wrote:
>> >
>> > Ok I seem to have solved the issue by enabling checkpointing. Based on the 
>> > docs (I'm using 1.9.0), it seemed like only 
>> > StreamingFileSink.forBulkFormat() should've required checkpointing, but 
>> > based on this experience, StreamingFileSink.forRowFormat() requires it 
>> > too! Is this the intended behavior? If so, the docs should probably be 
>> > updated.
>> >
>> > Thanks,
>> > Li
>> >
>> > On Fri, Dec 6, 2019 at 2:01 PM Li Peng  wrote:
>> >>
>> >> Hey folks, I'm trying to get StreamingFileSink to write to s3 every 
>> >> minute, with flink-s3-fs-hadoop, and based on the default rolling policy, 
>> >> which is configured to "roll" every 60 seconds, I thought that would be 
>> >> automatic (I interpreted rolling to mean actually close a multipart 
>> >> upload to s3).
>> >>
>> >> But I'm not actually seeing files written to s3 at all, instead I see a 
>> >> bunch of open multipart uploads when I check the AWS s3 console, for 
>> >> example:
>> >>
>> >>  "Uploads": [
>> >> {
>> >> "Initiated": "2019-12-06T20:57:47.000Z",
>> >> "Key": "2019-12-06--20/part-0-0"
>> >> },
>> >> {
>> >> "Initiated": "2019-12-06T20:57:47.000Z",
>> >> "Key": "2019-12-06--20/part-1-0"
>> >> },
>> >> {
>> >> "Initiated": "2019-12-06T21:03:12.000Z",
>> >> "Key": "2019-12-06--21/part-0-1"
>> >> },
>> >> {
>> >> "Initiated": "2019-12-06T21:04:15.000Z",
>> >> "Key": "2019-12-06--21/part-0-2"
>> >> },
>> >> {
>> >> "Initiated": "2019-12-06T21:22:23.000Z"
>> >> "Key": "2019-12-06--21/part-0-3"
>> >> }
>> >> ]
>> >>
>> >> And these uploads are being open for a long time. So far after an hour, 
>> >> none of the uploads have been closed. Is this the expected behavior? If I 
>> >> wanted to get these uploads to actually write to s3 quickly, do I need to 
>> >> configure the hadoop stuff to get that done, like setting a smaller 
>> >> buffer/partition size to force it to upload?
>> >>
>> >> Thanks,
>> >> Li
>
>
>
> --
> Best, Jingsong Lee
>
>
> --
> Ken Krugler
> http://www.scaleunlimited.com
> custom big data solutions & training
> Hadoop, Cascading, Cassandra & Solr
>
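Since the conclusion of this thread is that StreamingFileSink only finalizes part files on checkpoints, a minimal sketch of a setup that avoids dangling in-progress files (and never-completed S3 multipart uploads) is shown below. The path, the checkpoint interval and the socket source are placeholders.

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class CheckpointedRowSinkJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Without checkpointing, in-progress/pending part files are never committed,
        // which is what shows up as open multipart uploads on S3.
        env.enableCheckpointing(60_000); // one checkpoint per minute

        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path("s3://my-bucket/output"),
                        new SimpleStringEncoder<String>("UTF-8"))
                .build();

        env.socketTextStream("localhost", 9999) // placeholder source
           .addSink(sink);

        env.execute("checkpointed streaming file sink");
    }
}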


Re: Deprecated SplitStream class - what should be use instead.

2019-12-20 Thread Kostas Kloudas
Hi Krzysztof,

If I get it correctly, your main reason behind not using side-outputs
is that it seems that "side-output", by the name, seems to be a
"second class citizen"  compared to the main output.
I see your point but in terms of functionality, there is no difference
between the different outputs from Flink's perspective. Both create
DataStreams that are full integrated with Flink's fault-tolerant state
handling (if checkpointing is enabled) and event-time handling. So I
believe it is safe to use them for your usecase.

I hope this helps,
Kostas

On Thu, Dec 19, 2019 at 10:30 PM KristoffSC
 wrote:
>
> Kostas, thank you for your response,
>
> Well although the Side Outputs would do the job, I was just surprised that
> those are the replacements for stream splitting.
>
> The thing is, and this might be only a subjective opinion, that I
> would assume that Side Outputs should be used only to produce something
> aside from the main processing function, like control messages or some
> leftovers.
>
> In my case, I wanted to simply split the stream into two new streams based
> on some condition.
> With side outputs I will have to "treat" the second stream as a something
> additional to the main processing result.
>
> Like it is written in the docs:
> "*In addition* to the main stream that results from DataStream
> operations(...)"
>
> or
> "The type of data in the result streams does not have to match the type of
> data in the *main *stream and the types of the different side outputs can
> also differ. "
>
>
> In my case I don't have any "addition" to my main stream and actually both
> split streams are equally important :)
>
> So by writing that side outputs are not good for my use case I meant that
> they are not fitting conceptually, at least in my opinion.
>
> Regards,
> Krzysztof
>
>
>
>
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Deprecated SplitStream class - what should be use instead.

2019-12-19 Thread Kostas Kloudas
Hi Kristoff,

The recommended alternative is to use SideOutputs as described in [1].
Could you elaborate why you think side outputs are not a good choice
for your usecase?

Cheers,
Kostas

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html

On Thu, Dec 19, 2019 at 5:13 PM KristoffSC
 wrote:
>
> Hi,
> I've noticed that SplitStream class is marked as deprecated, although split
> method of DataStream is not.
> Also there is no alternative proposed in SplitStream doc for it.
>
> In my use case I will have a stream of events that I have to split into two
> separate streams based on some function. Events with a field that meets some
> condition should go to the first stream, while all others should go to a
> different stream.
>
> Later both streams should be processed in a different manner.
>
> I was planing to use approach presented here:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/
>
> SplitStream<Integer> split = someDataStream.split(new
> OutputSelector<Integer>() {
> @Override
> public Iterable<String> select(Integer value) {
> List<String> output = new ArrayList<String>();
> if (value % 2 == 0) {
> output.add("even");
> }
> else {
> output.add("odd");
> }
> return output;
> }
> });
>
> But it turns out that SplitStream is deprecated.
> Also I've found similar question on SO
> https://stackoverflow.com/questions/53588554/apache-flink-using-filter-or-split-to-split-a-stream
>
> I don't think filter and SideOutputs are a good choice here.
>
> I will be thankful for any suggestion.
>
>
>
>
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
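For completeness, a minimal sketch of the side-output based replacement for the deprecated split()/select() example quoted above. The tag name and the even/odd logic mirror the question; everything else (class and method names) is illustrative.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputSplitExample {

    // Anonymous subclass so the tag carries its element type information.
    private static final OutputTag<Integer> ODD = new OutputTag<Integer>("odd") {};

    public static SingleOutputStreamOperator<Integer> splitEvenOdd(DataStream<Integer> input) {
        // Even numbers go to the main output, odd numbers to the side output.
        return input.process(new ProcessFunction<Integer, Integer>() {
            @Override
            public void processElement(Integer value, Context ctx, Collector<Integer> out) {
                if (value % 2 == 0) {
                    out.collect(value);      // "main" stream
                } else {
                    ctx.output(ODD, value);  // side output, equally first-class
                }
            }
        });
    }

    public static void usage(DataStream<Integer> someDataStream) {
        SingleOutputStreamOperator<Integer> even = splitEvenOdd(someDataStream);
        DataStream<Integer> odd = even.getSideOutput(ODD);
        // ... process 'even' and 'odd' differently downstream
    }
}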


Re: S3A "Data read has a different length than the expected" issue root cause

2019-12-17 Thread Kostas Kloudas
Thanks a lot for reporting this!

I believe that this can be really useful for the community!

Cheers,
Kostas

On Tue, Dec 17, 2019 at 1:29 PM spoganshev  wrote:
>
> In case you experience an exception similar to the following:
>
> org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Data
> read has a different length than the expected: dataLength=53562;
> expectedLength=65536; includeSkipped=true; in.getClass()=class
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client$2;
> markedSupported=false; marked=0; resetSinceLastMarked=false; markCount=0;
> resetCount=0
> at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.util.LengthCheckInputStream.checkLength(LengthCheckInputStream.java:151)
> at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.util.LengthCheckInputStream.read(LengthCheckInputStream.java:93)
> at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:76)
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AInputStream.closeStream(S3AInputStream.java:529)
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AInputStream.close(S3AInputStream.java:490)
> at java.io.FilterInputStream.close(FilterInputStream.java:181)
> at
> org.apache.flink.fs.s3.common.hadoop.HadoopDataInputStream.close(HadoopDataInputStream.java:89)
> at
> org.apache.flink.api.common.io.FileInputFormat.close(FileInputFormat.java:861)
> at
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:206)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748)
>
>
> The root cause is a bug in Hadoop's S3A filesystem implementation:
> https://issues.apache.org/jira/browse/HADOOP-16767
>
> A temporary hacky workaround is to replace S3AInputStream class and all the
> classes that it requires and use it in a custom filesystem implementation.
>
>
>
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Documentation tasks for release-1.10

2019-12-16 Thread Kostas Kloudas
Hi all,

With the feature-freeze for the release-1.10 already past us, it is
time to focus a little bit on documenting the new features that the
community added to this release, and improving the already existing
documentation based on questions that we see in Flink's mailing lists.

To this end, I have created an umbrella issue
https://issues.apache.org/jira/browse/FLINK-15273 to monitor the
pending documentation tasks. This is by no means an exhaustive list of
the tasks, so feel free to add more.

Having a central place with all these tasks will also make it easier
for other members of the community, not necessarily the people who
implemented the features, to get involved with the project. This is a
really helpful way to get in touch with the community and help the
project reach even greater audiences, as a feature that is not
documented is non-existent to users.

Thanks a lot,
Kostas


Re: Flink 1.9.1 KafkaConnector missing data (1M+ records)

2019-12-12 Thread Kostas Kloudas
Hi Harrison,

Really sorry for the late reply.
Do you have any insight on whether the missing records were read by
the consumer and just the StreamingFileSink failed to write their
offsets, or the Kafka consumer did not even read them or dropped them
for some reason? I am asking this in order to narrow down the problem. In
addition, did you see anything out of the ordinary in the logs?

I am also cc'ing Becket who may know a bit more on the kafka consumer
side of things.

Cheers,
Kostas

On Mon, Dec 2, 2019 at 10:00 PM Harrison Xu  wrote:
>
> Thank you for your reply,
>
> Some clarification:
>
> We have configured the BucketAssigner to use the Kafka record timestamp. 
> Exact bucketing behavior as follows:
> private static final DateTimeFormatter formatter = DateTimeFormatter
> .ofPattern("yyyy-MM-dd'T'HH");
>
> @Override
> public String getBucketId(KafkaRecord record, BucketAssigner.Context context) 
> {
> return String.format(
> "%s/dt=%s/partition_%s",
> record.getTopic(),
> Instant.ofEpochMilli(record.getTimestamp()).atZone(ZoneOffset.UTC).format(formatter),
> record.getPartition());
> }
>
> For each record, we write only its offset to the S3 object as a sanity check. 
> It is easy to detect missing or duplicate offsets. To answer your questions:
>
> Could this explain the fact that dt 2019-11-24T01 and 2019-11-24T02
> are entirely skipped?
> No, because even if the producer were idle during these datetimes, we would 
> expect no missing offsets. We observed both millions of missing records, in 
> addition to missing partitions (2019-11-24T01 and 2019-11-24T02). Further, 
> the producer was very active during this time.
> I want to emphasize that we noticed that the consumer for this exact 
> TopicPartition was falling behind (>1 hour lag); this degree of lag was only 
> observed for this partition. (The consumer eventually caught up). It's normal 
> for the consumer to fall behind the producer for short bursts, but we 
> definitely do not expect missing records as a result. There were millions of 
> records whose timestamps fall into (dt 2019-11-24T01 and 2019-11-24T02) - 
> they were entirely skipped by the writer.
>
>
> what does TT stand for?
> It's simply convention for datetime serialization as string.
>
>
> Can it be that there are a lot of events for partition 4 that fill up
> 2 part files for that duration?
> We are using the BulkWriter. I am under the impression that this writer 
> should only produce one file per checkpoint interval, which we have 
> configured to be 5 minutes. You see that the preceding commits follow this 
> pattern of one commit per checkpoint interval, which is what we expect. It's 
> very strange that two files for the same TopicPartition (same TaskManager) 
> are committed.
>
>
> I am eager to hear your reply and understand what we're seeing.
>
> Thanks,
> Harrison
>
> On Thu, Nov 28, 2019 at 6:43 AM Kostas Kloudas  wrote:
>>
>> Hi Harrison,
>>
>> One thing to keep in mind is that Flink will only write files if there
>> is data to write. If, for example, your partition is not active for a
>> period of time, then no files will be written.
>> Could this explain the fact that dt 2019-11-24T01 and 2019-11-24T02
>> are entirely skipped?
>>
>> In addition, for the "duplicates", it would help if you could share a
>> bit more information about your BucketAssigner.
>> How are these names assigned to the files and what does TT stand for?
>> Can it be that there are a lot of events for partition 4 that fill up
>> 2 part files for that duration? I am
>> asking because the counter of the 2 part files differ.
>>
>> Cheers,
>> Kostas
>>
>> On Tue, Nov 26, 2019 at 1:09 AM Harrison Xu  wrote:
>> >
>> > Hello,
>> >
>> > We're seeing some strange behavior with flink's KafkaConnector010 (Kafka 
>> > 0.10.1.1) arbitrarily skipping data.
>> >
>> > Context
>> > KafkaConnector010 is used as source, and StreamingFileSink/BulkPartWriter 
>> > (S3) as sink with no intermediate operators. Recently, we noticed that 
>> > millions of Kafka records were missing for one topic partition (this job 
>> > is running for 100+ topic partitions, and such behavior was only observed 
>> > for one). This job is run on YARN, and hosts were healthy with no hardware 
>> > faults observed. No exceptions in jobmanager or taskmanager logs at this 
>> > time.
>> >
>> > How was this detected?
>> > As a sanity check, we dual-write Kafka metadata (offsets) to a separate 
>> > location in S3, and have monitoring to ensure 

Re: Flink client trying to submit jobs to old session cluster (which was killed)

2019-12-12 Thread Kostas Kloudas
Hi Pankaj,

When you start a session cluster with the bin/yarn-session.sh script,
Flink will create the cluster and then write a "Yarn Properties file"
named ".yarn-properties-YOUR_USER_NAME" in the directory:
either the one specified by the option "yarn.properties-file.location"
in the flink-conf.yaml or in your local
System.getProperty("java.io.tmpdir"). This file will contain the
applicationId of the cluster and
it will be picked up by any future calls to `flink run`. Could you
check if this file exists and if it is updated every time you create a
cluster?

Thanks,
Kostas

On Thu, Dec 12, 2019 at 2:22 PM vino yang  wrote:
>
> Hi Pankaj,
>
> Can you tell us what's Flink version do you use?  And can you share the Flink 
> client and job manager log with us?
>
> This information would help us to locate your problem.
>
> Best,
> Vino
>
> Pankaj Chand wrote on Thu, Dec 12, 2019 at 7:08 PM:
>>
>> Hello,
>>
>> When using Flink on YARN in session mode, each Flink job client would 
>> automatically know the YARN cluster to connect to. It says this somewhere in 
>> the documentation.
>>
>> So, I killed the Flink session cluster by simply killing the YARN 
>> application using the "yarn kill" command. However, when starting a new 
>> Flink session cluster and trying to submit Flink jobs to yarn-session, Flink 
>> complains that the old cluster (it gave the port number and YARN application 
>> ID) is not available.
>>
>> It seems like the details of the old cluster were still stored somewhere in 
>> Flink. So, I had to completely replace the Flink folder with a new one.
>>
>> Does anyone know the proper way to kill a Flink+YARN session cluster to 
>> completely remove it so that jobs will get submitted to a new Flink session 
>> cluster?
>>
>> Thanks,
>>
>> Pankaj


Re: Flink 1.9.1 KafkaConnector missing data (1M+ records)

2019-11-28 Thread Kostas Kloudas
Hi Harrison,

One thing to keep in mind is that Flink will only write files if there
is data to write. If, for example, your partition is not active for a
period of time, then no files will be written.
Could this explain the fact that dt 2019-11-24T01 and 2019-11-24T02
are entirely skipped?

In addition, for the "duplicates", it would help if you could share a
bit more information about your BucketAssigner.
How are these names assigned to the files and what does TT stand for?
Can it be that there are a lot of events for partition 4 that fill up
2 part files for that duration? I am
asking because the counter of the 2 part files differ.

Cheers,
Kostas

On Tue, Nov 26, 2019 at 1:09 AM Harrison Xu  wrote:
>
> Hello,
>
> We're seeing some strange behavior with flink's KafkaConnector010 (Kafka 
> 0.10.1.1) arbitrarily skipping data.
>
> Context
> KafkaConnector010 is used as source, and StreamingFileSink/BulkPartWriter 
> (S3) as sink with no intermediate operators. Recently, we noticed that 
> millions of Kafka records were missing for one topic partition (this job is 
> running for 100+ topic partitions, and such behavior was only observed for 
> one). This job is run on YARN, and hosts were healthy with no hardware faults 
> observed. No exceptions in jobmanager or taskmanager logs at this time.
>
> How was this detected?
> As a sanity check, we dual-write Kafka metadata (offsets) to a separate 
> location in S3, and have monitoring to ensure that written offsets are 
> contiguous with no duplicates.
> Each Kafka record is bucketed into hourly datetime partitions (UTC) in S3.
>
> (Condensed) Taskmanager logs
> 2019-11-24 02:36:50,140 INFO  
> org.apache.flink.fs.s3.common.writer.S3Committer  - Committing 
> kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5252 with 
> MPU ID 3XG...
> 2019-11-24 02:41:27,966 INFO  
> org.apache.flink.fs.s3.common.writer.S3Committer  - Committing 
> kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5253 with 
> MPU ID 9MW...
> 2019-11-24 02:46:29,153 INFO  
> org.apache.flink.fs.s3.common.writer.S3Committer  - Committing 
> kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5254 with 
> MPU ID 7AP...
> 2019-11-24 02:51:32,602 INFO  
> org.apache.flink.fs.s3.common.writer.S3Committer  - Committing 
> kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5255 with 
> MPU ID xQU...
> 2019-11-24 02:56:35,183 INFO  
> org.apache.flink.fs.s3.common.writer.S3Committer  - Committing 
> kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5256 with 
> MPU ID pDL...
> 2019-11-24 03:01:26,059 INFO  
> org.apache.flink.fs.s3.common.writer.S3Committer  - Committing 
> kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5257 with 
> MPU ID Itf...
> 2019-11-24 03:01:26,510 INFO  
> org.apache.flink.fs.s3.common.writer.S3Committer  - Committing 
> kafka_V2/meta/digest_features/dt=2019-11-24T03/partition_4/part-12-5263 with 
> MPU ID e3l...
> 2019-11-24 03:06:26,230 INFO  
> org.apache.flink.fs.s3.common.writer.S3Committer  - Committing 
> kafka_V2/meta/digest_features/dt=2019-11-24T03/partition_4/part-12-5264 with 
> MPU ID 5z4...
> 2019-11-24 03:11:22,711 INFO  
> org.apache.flink.fs.s3.common.writer.S3Committer  - Committing 
> kafka_V2/meta/digest_features/dt=2019-11-24T03/partition_4/part-12-5265 with 
> MPU ID NfP...
>
> Two observations stand out from the above logs:
> - Datetime 2019-11-24T01 and 2019-11-24T02 are entirely skipped, resulting in 
> millions of missing offsets. They are never written in future commits (and 
> data in S3 shows this).
> - Two commits for the same topic partition ("digest_features", partition 4), 
> happened nearly simultaneously on 2019-11-24 03:03, despite our commit 
> interval being set at 5 minutes. Why was the same TopicPartition read from 
> and committed twice in such a short interval?
>
> Would greatly appreciate if anyone is able to shed light on this issue. Happy 
> to provide full logs if needed.
> Thanks
>
>
>
>
>
>
>
>


Re: Apache Flink - Troubleshooting exception while starting the job from savepoint

2019-11-25 Thread Kostas Kloudas
As a side note, I am assuming that you are using the same Flink Job
before and after the savepoint and the same Flink version.
Am I correct?

Cheers,
Kostas

On Mon, Nov 25, 2019 at 2:40 PM Kostas Kloudas  wrote:
>
> Hi Singh,
>
> This behaviour is strange.
> One thing I can recommend to see if the two jobs are identical is to
> launch also the second job without a savepoint,
> just start from scratch, and simply look at the web interface to see
> if everything is there.
>
> Also could you please provide some code from your job, just to see if
> there is anything problematic with the application code?
> Normally there should be no problem with not providing UIDs for some
> stateless operators.
>
> Cheers,
> Kostas
>
> On Sat, Nov 23, 2019 at 11:16 AM M Singh  wrote:
> >
> >
> > Hey Folks:
> >
> > Please let me know how to resolve this issue since using 
> > --allowNonRestoredState without knowing if any state will be lost seems 
> > risky.
> >
> > Thanks
> > On Friday, November 22, 2019, 02:55:09 PM EST, M Singh 
> >  wrote:
> >
> >
> > Hi:
> >
> > I have a flink application in which some of the operators have uid and name 
> > and some stateless ones don't.
> >
> > I've taken a save point and tried to start another instance of the 
> > application from a savepoint - I get the following exception which 
> > indicates that the operator is not available to the new program even though 
> > the second job is the same as first but just running from the first jobs 
> > savepoint.
> >
> > Caused by: java.lang.IllegalStateException: Failed to rollback to 
> > checkpoint/savepoint 
> > s3://mybucket/state/savePoint/mysavepointfolder/66s4c6402d7532801287290436fa9fadd/savepoint-664c64-fa235d26d379.
> >  Cannot map checkpoint/savepoint state for operator 
> > d1a56c5a9ce5e3f1b03e01cac458bb4f to the new program, because the operator 
> > is not available in the new program. If you want to allow to skip this, you 
> > can set the --allowNonRestoredState option on the CLI.
> >
> > at 
> > org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:205)
> >
> > at 
> > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1102)
> >
> > at 
> > org.apache.flink.runtime.jobmaster.JobMaster.tryRestoreExecutionGraphFromSavepoint(JobMaster.java:1219)
> >
> > at 
> > org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1143)
> >
> > at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:294)
> >
> > at 
> > org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:157)
> >
> > ... 10 more
> >
> >
> >
> > I've tried to start an application instance from the checkpoint too of the 
> > first instance but it gives the same exception indicating that the operator 
> > is not available.
> >
> > Questions:
> >
> > 1. Is this a problem because some of the operators don't have a uid ?
> > 2. Is it required to have uids even for stateless operators like simple map 
> > or filter operators ?
> > 3. Is there a way to find out which operator is not available in the new 
> > application even though I am running the same application ?
> > 4. Is there a way to figure out if this is the only missing operator or are 
> > there others whose mapping is missing for the second instance run ?
> > 5. Is this issue resolved in Apache Flink 1.9 (since I am still using Flink 
> > 1.6)
> >
> > If there any additional pointers please let me know.
> >
> > Thanks
> >
> > Mans
> >
> >


Re: Apache Flink - Troubleshooting exception while starting the job from savepoint

2019-11-25 Thread Kostas Kloudas
Hi Singh,

This behaviour is strange.
One thing I can recommend to see if the two jobs are identical is to
launch also the second job without a savepoint,
just start from scratch, and simply look at the web interface to see
if everything is there.

Also could you please provide some code from your job, just to see if
there is anything problematic with the application code?
Normally there should be no problem with not providing UIDs for some
stateless operators.

Cheers,
Kostas

On Sat, Nov 23, 2019 at 11:16 AM M Singh  wrote:
>
>
> Hey Folks:
>
> Please let me know how to resolve this issue since using 
> --allowNonRestoredState without knowing if any state will be lost seems risky.
>
> Thanks
> On Friday, November 22, 2019, 02:55:09 PM EST, M Singh  
> wrote:
>
>
> Hi:
>
> I have a flink application in which some of the operators have uid and name 
> and some stateless ones don't.
>
> I've taken a save point and tried to start another instance of the 
> application from a savepoint - I get the following exception which indicates 
> that the operator is not available to the new program even though the second 
> job is the same as first but just running from the first jobs savepoint.
>
> Caused by: java.lang.IllegalStateException: Failed to rollback to 
> checkpoint/savepoint 
> s3://mybucket/state/savePoint/mysavepointfolder/66s4c6402d7532801287290436fa9fadd/savepoint-664c64-fa235d26d379.
>  Cannot map checkpoint/savepoint state for operator 
> d1a56c5a9ce5e3f1b03e01cac458bb4f to the new program, because the operator is 
> not available in the new program. If you want to allow to skip this, you can 
> set the --allowNonRestoredState option on the CLI.
>
> at 
> org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:205)
>
> at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1102)
>
> at 
> org.apache.flink.runtime.jobmaster.JobMaster.tryRestoreExecutionGraphFromSavepoint(JobMaster.java:1219)
>
> at 
> org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1143)
>
> at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:294)
>
> at 
> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:157)
>
> ... 10 more
>
>
>
> I've tried to start an application instance from the checkpoint too of the 
> first instance but it gives the same exception indicating that the operator 
> is not available.
>
> Questions:
>
> 1. Is this a problem because some of the operators don't have a uid ?
> 2. Is it required to have uids even for stateless operators like simple map 
> or filter operators ?
> 3. Is there a way to find out which operator is not available in the new 
> application even though I am running the same application ?
> 4. Is there a way to figure out if this is the only missing operator or are 
> there others whose mapping is missing for the second instance run ?
> 5. Is this issue resolved in Apache Flink 1.9 (since I am still using Flink 
> 1.6)
>
> If there any additional pointers please let me know.
>
> Thanks
>
> Mans
>
>
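Since the questions in this thread keep coming back to operator uids, here is a minimal sketch of explicitly assigning uid() and name() to every operator, including stateless ones, so that savepoint state can always be mapped back to the job. The job topology itself is just a placeholder.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UidExampleJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.socketTextStream("localhost", 9999)   // placeholder source
           .uid("text-source")                    // stable id used when restoring savepoints
           .name("Text Source")                   // only shown in the UI / logs
           .filter(line -> !line.isEmpty())
           .uid("non-empty-filter")               // stateless, but an explicit uid
           .name("Drop Empty Lines")              //   still avoids surprises on restore
           .map(String::toUpperCase)
           .uid("to-upper-map")
           .name("To Upper Case")
           .print()
           .uid("stdout-sink")
           .name("Stdout Sink");

        env.execute("uid/name example");
    }
}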


Re: Streaming File Sink - Parquet File Writer

2019-10-30 Thread Kostas Kloudas
Hi Vinay,

You are correct when saying that the bulk formats only support
onCheckpointRollingPolicy.

The reason for this has to do with the fact that currently Flink
relies on the Hadoop writer for Parquet.

Bulk formats keep important details about how they write the actual
data (such as compression
schemes, offsets, etc) in metadata and they write this metadata with
the file (e.g. parquet writes
them as a footer). The Hadoop writer gives no access to this
metadata. Given this, there is
no way for Flink to checkpoint a part file safely without
closing it.

The solution would be to write our own writer and not go through the
hadoop one, but there
are no concrete plans for this, as far as I know.

Cheers,
Kostas


On Tue, Oct 29, 2019 at 12:57 PM Vinay Patil  wrote:
>
> Hi,
>
> I am not able to roll the files based on file size as the bulkFormat has 
> onCheckpointRollingPolicy.
>
> One way is to write CustomStreamingFileSink and provide RollingPolicy like 
> RowFormatBuilder. Is this the correct way to go ahead ?
>
> Another way is to write ParquetEncoder and use RowFormatBuilder.
>
> P.S. Curious to know Why was the RollingPolicy not exposed in case of 
> BulkFormat ?
>
> Regards,
> Vinay Patil
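A minimal sketch of the bulk-format setup being discussed, which implicitly uses the on-checkpoint rolling policy. The Avro schema, the path and the method of attaching the sink are placeholders; ParquetAvroWriters comes from the flink-parquet module.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class ParquetBulkSinkSketch {

    public static void attachSink(DataStream<GenericRecord> records, Schema schema) {
        StreamingFileSink<GenericRecord> sink = StreamingFileSink
                // Bulk formats roll on every checkpoint; size/time based rolling
                // is not available here for the reasons explained above.
                .forBulkFormat(
                        new Path("s3://my-bucket/parquet-output"),
                        ParquetAvroWriters.forGenericRecord(schema))
                .build();

        records.addSink(sink);
    }
}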


Re: Verifying correctness of StreamingFileSink (Kafka -> S3)

2019-10-16 Thread Kostas Kloudas
Hi Amran,

If you want to know which partition your input data come from,
you can always have a separate bucket for each partition.
As described in [1], you can extract the offset/partition/topic
information for an incoming record and based on this, decide the
appropriate bucket to put the record.

Cheers,
Kostas

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html

On Wed, Oct 16, 2019 at 4:00 AM amran dean  wrote:
>
> I am evaluating StreamingFileSink (Kafka 0.10.11) as a production-ready 
> alternative to a current Kafka -> S3 solution.
>
> Is there any way to verify the integrity of data written in S3? I'm confused 
> how the file names (e.g part-1-17) map to Kafka partitions, and further 
> unsure how to ensure that no Kafka records are lost (I know Flink guarantees 
> exactly-once, but this is more of a sanity check).
>
>
>
>
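A minimal sketch of the approach described in the reply above: carry the Kafka topic/partition/offset along with each record via a KafkaDeserializationSchema and use it in a BucketAssigner so that every Kafka partition gets its own bucket. It assumes a Flink version where KafkaDeserializationSchema is available (1.8+); all class names here are made up for illustration.

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class KafkaMetadataBucketing {

    // Record payload plus the Kafka metadata kept for verification.
    public static class EnrichedRecord {
        public String topic;
        public int partition;
        public long offset;
        public byte[] value;
    }

    // Deserializer that keeps topic/partition/offset next to the payload.
    public static class MetadataSchema implements KafkaDeserializationSchema<EnrichedRecord> {
        @Override
        public boolean isEndOfStream(EnrichedRecord next) {
            return false;
        }

        @Override
        public EnrichedRecord deserialize(ConsumerRecord<byte[], byte[]> record) {
            EnrichedRecord r = new EnrichedRecord();
            r.topic = record.topic();
            r.partition = record.partition();
            r.offset = record.offset();
            r.value = record.value();
            return r;
        }

        @Override
        public TypeInformation<EnrichedRecord> getProducedType() {
            return TypeInformation.of(new TypeHint<EnrichedRecord>() {});
        }
    }

    // One bucket per topic/partition, so files can be checked against Kafka offsets.
    public static class PartitionBucketAssigner implements BucketAssigner<EnrichedRecord, String> {
        @Override
        public String getBucketId(EnrichedRecord element, Context context) {
            return element.topic + "/partition_" + element.partition;
        }

        @Override
        public SimpleVersionedSerializer<String> getSerializer() {
            return SimpleVersionedStringSerializer.INSTANCE;
        }
    }
}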


Re: [PROPOSAL] Contribute Stateful Functions to Apache Flink

2019-10-14 Thread Kostas Kloudas
Hi all,

Big +1 for contributing Stateful Functions to Flink and as for the
main question at hand, I would vote for putting it in the main
repository.

I understand that this can couple the release cadence of Flink and
Stateful Functions, although I think the pros of having a "you break it,
you fix it" policy outweigh the cons of tying the release cadences.

Looking forward to the integration and the new usecases it may bring!

Cheers,
Kostas

On Mon, Oct 14, 2019 at 9:35 AM Dian Fu  wrote:
>
> Hi Stephan,
>
> Big +1 for adding stateful functions to Apache Flink! The use cases unlocked 
> with this feature are very interesting and promising.
>
> Regarding whether to place it into the Flink core repository, personally I 
> prefer to put it in the main repository. This feature introduces a new set of 
> APIs and it will support a new set of applications. It enriches the API stack 
> of Apache Flink. This is somewhat similar to the Table API & SQL, State 
> Processor API, CEP library, etc. If the applications supported by this 
> feature are important enough for Flink, it's more appropriate to put it 
> directly into the main repository.
>
> Regards,
> Dian
>
> > On Oct 13, 2019, at 10:47 AM, Hequn Cheng  wrote:
> >
> > Hi Stephan,
> >
> > Big +1 for adding this to Apache Flink!
> >
> > As for the problem of whether this should be added to the Flink main 
> > repository, from my side, I prefer to put it in the main repository. Not 
> > only Stateful Functions shares very close relations with the current Flink, 
> > but also other libs or modules in Flink can make use of it the other way 
> > round in the future. At that time the Flink API stack would also be changed 
> > a bit and this would be cool.
> >
> > Best, Hequn
> >
> > On Sat, Oct 12, 2019 at 9:16 PM Biao Liu  > > wrote:
> > Hi Stehpan,
> >
> > +1 for having Stateful Functions in Flink.
> >
> > Before discussing which repository it should belong to, I was wondering if we 
> > have reached an agreement on "splitting the flink repository" as Piotr 
> > mentioned or not. It seems that there was just no further discussion.
> > It's OK for me to add it to core repository. After all almost everything is 
> > in core repository now. But if we decide to split the core repository 
> > someday, I tend to create a separate repository for Stateful Functions. It 
> > might be good time to take the first step of splitting.
> >
> > Thanks,
> > Biao /'bɪ.aʊ/
> >
> >
> >
> > On Sat, 12 Oct 2019 at 19:31, Yu Li  > > wrote:
> > Hi Stephan,
> >
> > Big +1 for adding stateful functions to Flink. I believe a lot of user 
> > would be interested to try this out and I could imagine how this could 
> > contribute to reduce the TCO for business requiring both streaming 
> > processing and stateful functions.
> >
> > And my 2 cents is to put it into flink core repository since I could see a 
> > tight connection between this library and flink state.
> >
> > Best Regards,
> > Yu
> >
> >
> > On Sat, 12 Oct 2019 at 17:31, jincheng sun  > > wrote:
> > Hi Stephan,
> >
> > Big +1 for adding this great feature to Apache Flink.
> >
> > Regarding where we should place it, should we put it into the Flink core repository or 
> > create a separate repository? I prefer putting it into the main repository and 
> > look forward to the more detailed discussion for this decision.
> >
> > Best,
> > Jincheng
> >
> >
> > Jingsong Li wrote on Sat, Oct 12, 2019 at 11:32 AM:
> > Hi Stephan,
> >
> > big +1 for this contribution. It provides another user interface that is 
> > easy to use and popular at this time. These functions are hard for users 
> > to write in SQL/Table API, while using DataStream is too complex. (We've 
> > done some stateFun kind jobs using DataStream before). With statefun, it is 
> > very easy.
> >
> > I think it's also a good opportunity to exercise Flink's core capabilities. 
> > I looked at stateful-functions-flink briefly, it is very interesting. I 
> > think there are many other things Flink can improve. So I think it's a 
> > better thing to put it into Flink, and the improvement for it will be more 
> > natural in the future.
> >
> > Best,
> > Jingsong Lee
> >
> > On Fri, Oct 11, 2019 at 7:33 PM Dawid Wysakowicz  > > wrote:
> > Hi Stephan,
> >
> > I think this is a nice library, but what I like more about it is that it 
> > suggests exploring different use-cases. I think it definitely makes sense 
> > for the Flink community to explore more lightweight applications that 
> > reuses resources. Therefore I definitely think it is a good idea for Flink 
> > community to accept this contribution and help maintaining it.
> >
> > Personally I'd prefer to have it in a separate repository. There were a few 
> > discussions before where different people were suggesting to extract 
> > connectors and other libraries to separate repositories. Moreover I think 
> > it could serve as 

Re: StreamingFileSink rolling callback Inbox

2019-09-12 Thread Kostas Kloudas
Hi Anton,

First of all, there is this PR
https://github.com/apache/flink/pull/9581 that may be interesting to
you.

Second, I think you have to keep in mind that the hourly bucket
reporting will be per-subtask. So if you have parallelism of 4, each
of the 4 tasks will report individually that they are done with hour
e.g. 10, and it is up to the receiving end to know if it should wait
for more or not. This may be a problem for your stateful assigner
approach as the assigner cannot know by default which subtask it
belongs to. If, for example, you have parallelism of 1, then your
stateful assigner approach could work, although it suffers from the
problem you also mentioned, that it is not integrated with
checkpointing (so a part file may be "reverted") and that a file may
roll, but it does not mean that the previous is already written to the
FS.

Third, a solution could be that instead of having the job itself
pushing notifications that a part file has rolled (which may suffer
from the problem that a part file may roll but the FS takes some time
until it writes everything to disk), you could simply monitor the FS
directory where you are writing your buckets, and parse the part file
names in order to know that all subtasks have finished with hour X.
This can be done by another job which will also put notifications to
the SQS. I think that this will also solve your concern: "I’m also
thinking on how I should couple this with checkpointing mechanism as
ideally I’d like to not invoke this callback before checkpoint is
written."

Cheers,
Kostas

On Mon, Sep 9, 2019 at 12:40 PM Anton Parkhomenko  wrote:
>
> Hello,
>
> I’m writing a Flink job that reads heterogenius (one row contains several 
> types that need to be partitioned downstream) data from AWS Kinesis and 
> writes to S3 directory structure like s3://bucket/year/month/day/hour/type, 
> this all works great with StreamingFileSink in Flink 1.9, but problem is that 
> I need to immedietely (or “as soon as possible” rather) let know another 
> application to know when “hour” bucket has rolled (i.e. we’re 100% sure it 
> won’t write any more data for this hour). Another problem is that data can be 
> very skewed in types, e.g. one hour can contain 90% of rows with typeA, 30% 
> of rows with typeB and 1% of rows with typeC.
>
> My current plan is to:
>
> 1. Split the stream in windows using TumblingProcessingTimeWindows (I don’t 
> care about event time at all)
> 2. Assign every row its bucket in a windowing function
> 3. Write a stateful BucketAssigner that:
> 3.1. Keeps its last window in a mutable variable
> 3.2. Once we received a row with newer window sends a message to SQS and 
> increments the window
>
> My biggest concern now is about the 3rd point. For me, BucketAssigner looks like a 
> pure function of (Row, Time) -> Bucket and I'm not sure that introducing 
> state and side effects there would be reasonable. Is there any other way to 
> do it? I'm also thinking about how I should couple this with the checkpointing 
> mechanism as ideally I'd like to not invoke this callback before the checkpoint 
> is written.
>
> StreamingFileSink provides not much ways to extend it. I tried to 
> re-implement it for my purposes, but stumbled upon many private methods and 
> classes, so even though it looks possible, the end result probably will be 
> too ugly.
>
> To make things a little bit easier, I don’t care too much about delivery 
> semantics of those final SQS messages - if I get only ~99% of them - that’s 
> fine, if some of them will be duplicated - that’s also fine.
>
> Regards,
> Anton
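A minimal sketch of the "parse the part file names" idea from the reply above: given the paths that appear under the bucket directories, a separate monitoring job or script could extract the hour and the subtask index and decide when all subtasks have finished an hour. The path layout and the regex below are assumptions based on the layout described in this thread, not Flink internals.

import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PartFileNameParser {

    // (hour bucket, subtask index) extracted from a part-file path.
    public static class HourAndSubtask {
        public final String hour;
        public final int subtask;

        public HourAndSubtask(String hour, int subtask) {
            this.hour = hour;
            this.subtask = subtask;
        }
    }

    // Assumed layout: .../year/month/day/hour/type/part-<subtask>-<counter>
    private static final Pattern PART_FILE =
            Pattern.compile(".*/(\\d{4})/(\\d{2})/(\\d{2})/(\\d{2})/[^/]+/part-(\\d+)-\\d+$");

    public static Optional<HourAndSubtask> parse(String path) {
        Matcher m = PART_FILE.matcher(path);
        if (!m.matches()) {
            return Optional.empty();
        }
        String hour = m.group(1) + "-" + m.group(2) + "-" + m.group(3) + "T" + m.group(4);
        return Optional.of(new HourAndSubtask(hour, Integer.parseInt(m.group(5))));
    }

    public static void main(String[] args) {
        // Once every subtask (0..parallelism-1) has produced a part file for hour X+1,
        // hour X can be considered closed and a notification can be sent.
        parse("s3://bucket/2019/09/12/10/typeA/part-3-42")
                .ifPresent(h -> System.out.println(h.hour + " / subtask " + h.subtask));
    }
}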


Re: Will there be a Flink 1.9.1 release ?

2019-09-09 Thread Kostas Kloudas
Hi Debasish,

So far I am not aware of any concrete timeline for Flink 1.9.1 but
I think that Gordon and Kurt (cc'ed) who were the release-1.9
managers are the best to answer this question.

Cheers,
Kostas

On Mon, Sep 9, 2019 at 9:38 AM Debasish Ghosh  wrote:
>
> Hello -
>
> Is there a plan for a Flink 1.9.1 release in the short term ? We are using 
> Flink and Avro with Avrohugger generating Scala case classes from Avro 
> schema. Hence we need https://github.com/apache/flink/pull/9565 which has 
> been closed recently.
>
> regards.
>
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg


Re: Exception when trying to change StreamingFileSink S3 bucket

2019-09-05 Thread Kostas Kloudas
Hi Sidhartha,

Your explanation is correct.
If you stopped the job with a savepoint and then try to restore
from that savepoint, Flink will try to restore its state,
which, of course, refers to the old bucket.

But new data will go to the new bucket.

One solution is to restart your job from scratch, if you do not
care about your "old" state.

Cheers,
Kostas

On Thu, Sep 5, 2019 at 1:27 PM Fabian Hueske  wrote:
>
> Hi,
>
> Kostas (in CC) might be able to help.
>
> Best, Fabian
>
> On Wed, Sep 4, 2019 at 22:59, sidhartha saurav wrote:
>>
>> Hi,
>>
>> Can someone suggest a workaround so that we do not get this issue while 
>> changing the S3 bucket ?
>>
>> On Thu, Aug 22, 2019 at 4:24 PM sidhartha saurav  wrote:
>>>
>>> Hi,
>>>
>>> We are trying to change our StreamingFileSink S3 bucket, say from 
>>> s3://eu1/output_old to s3://eu2/output_new. When we do so we get an 
>>> exception and the taskmanger goes into a restart loop.
>>>
>>> We suspect that it tries to restore state and gets the bucketid from saved 
>>> state [ final BucketID bucketId = 
>>> recoveredState.getBucketId()]. Flink then tries to read output_old from eu2 
>>> and gets an AccessDeniedError. Rightly so as it has permission for 
>>> s3://eu2/output_new and not s3://eu2/output_old. We are not sure why is 
>>> Flink trying to access the old bucket and how to avoid this exception.
>>>
>>> Logs:
>>>
>>> > "S3Committer.java","line":"87","message":"Failed to commit after recovery 
>>> > output_old/2019-08-22/18/part-3-40134 with MPU ID 
>>> > 7adJKrKCqFJnFhI2agC8BiMnLdHUoaGrIfnhJ00ezgGINvguYJtGmjsp4P64.qkAiC0khB6me7ZuU.qWzC8jTcUvULym1lScNNfkgcoRP2tq4BDIb4.HyMSgAmkmbtj7.
>>> >  Checking if file was committed before...",
>>>
>>> > "Task.java","line":"910","message":"... switched from RUNNING to FAILED."
>>>
>>> > java.nio.file.AccessDeniedException: 
>>> > output_old/2019-08-22/18/part-3-40134: getObjectMetadata on 
>>> > output_old/2019-08-22/18/part-3-40134: 
>>> > org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
>>> >  Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 
>>> > Forbidden; Request ID: 79F1AEE53131FB66; S3 Extended Request ID: 
>>> > 8mXPS2r03aaQIp4b+cFSehTMVRleC6k5gAC6/KWmmwMxr3Gxr/jy4gX06ZHH/+P7SRT9uNxtA1U=),
>>> >  S3 Extended Request ID: 
>>> > 8mXPS2r03aaQIp4b+cFSehTMVRleC6k5gAC6/KWmmwMxr3Gxr/jy4gX06ZHH/+P7SRT9uNxtA1U=:403
>>> >  Forbidden
>>>
>>> flink-taskmanager at 
>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:218)
>>> flink-taskmanager at 
>>> org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.getObjectMetadata(HadoopS3AccessHelper.java:126)
>>> flink-taskmanager at 
>>> org.apache.flink.fs.s3.common.writer.S3Committer.commitAfterRecovery(S3Committer.java:92)
>>> flink-taskmanager at 
>>> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.commitRecoveredPendingFiles(Bucket.java:160)
>>> flink-taskmanager at 
>>> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.<init>(Bucket.java:128)
>>> flink-taskmanager at 
>>> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:396)
>>> flink-taskmanager at 
>>> org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:64)
>>> flink-taskmanager at 
>>> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:177)
>>> flink-taskmanager at 
>>> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:165)
>>> flink-taskmanager at 
>>> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:149)
>>> flink-taskmanager at 
>>> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:334)
>>> flink-taskmanager at 
>>> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>>> flink-taskmanager at 
>>> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>>> flink-taskmanager at 
>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>>> flink-taskmanager at 
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
>>> flink-taskmanager at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>>> flink-taskmanager at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>>> flink-taskmanager at 
>>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>> flink-taskmanager at java.lang.Thread.run(Thread.java:748)
>>>
>>>
>>> We are using Flink 1.8 and externalized checkpoint. The S3 bucket for 
>>> externalized checkpoint 

Re: BucketingSink - Could not invoke truncate while recovering from state

2019-08-27 Thread Kostas Kloudas
Hi Gyula,

Thanks for looking into it.
I did not dig into it but in the trace you posted there is the line:

Failed to TRUNCATE_FILE ... for
**DFSClient_NONMAPREDUCE_-1189574442_56** on 172.31.114.177 because
**DFSClient_NONMAPREDUCE_-1189574442_56 is already the current lease
holder**.

The client seems to be the same, so could it be that we are trying to
get the lease twice from the same task and the lease is not
"reentrant"?

Cheers,
Kostas

On Tue, Aug 27, 2019 at 4:53 PM Gyula Fóra  wrote:
>
> Hi all!
>
> I am gonna try to resurrect this thread as I think I have hit the same issue 
> with the StreamingFileSink: https://issues.apache.org/jira/browse/FLINK-13874
>
> I don't have a good answer but it seems that we try to truncate before we get 
> the lease (even though there is logic both in BucketingSink and 
> HadoopRecoverable... to wait before calling truncate).
>
> Does anyone have any idea?
>
> Cheers,
> Gyula
>
> On Sun, Feb 24, 2019 at 4:13 AM sohimankotia  wrote:
>>
>> Any help ?
>>
>>
>>
>> --
>> Sent from: 
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: [ANNOUNCE] Andrey Zagrebin becomes a Flink committer

2019-08-14 Thread Kostas Kloudas
Congratulations Andrey!
Well deserved!

Kostas

On Wed, Aug 14, 2019 at 4:04 PM Yun Tang  wrote:
>
> Congratulations Andrey.
>
> Best
> Yun Tang
> 
> From: Xintong Song 
> Sent: Wednesday, August 14, 2019 21:40
> To: Oytun Tez 
> Cc: Zili Chen ; Till Rohrmann ; 
> dev ; user 
> Subject: Re: [ANNOUNCE] Andrey Zagrebin becomes a Flink committer
>
> Congratulations Andery~!
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, Aug 14, 2019 at 3:31 PM Oytun Tez  wrote:
>
> Congratulations Andrey!
>
> I am glad the Flink committer team is growing at such a pace!
>
> ---
> Oytun Tez
>
> M O T A W O R D
> The World's Fastest Human Translation Platform.
> oy...@motaword.com — www.motaword.com
>
>
> On Wed, Aug 14, 2019 at 9:29 AM Zili Chen  wrote:
>
> Congratulations Andrey!
>
> Best,
> tison.
>
>
> Till Rohrmann wrote on Wed, Aug 14, 2019 at 9:26 PM:
>
> Hi everyone,
>
> I'm very happy to announce that Andrey Zagrebin accepted the offer of the 
> Flink PMC to become a committer of the Flink project.
>
> Andrey has been an active community member for more than 15 months. He has 
> helped shaping numerous features such as State TTL, FRocksDB release, Shuffle 
> service abstraction, FLIP-1, result partition management and various 
> fixes/improvements. He's also frequently helping out on the user@f.a.o 
> mailing lists.
>
> Congratulations Andrey!
>
> Best, Till
> (on behalf of the Flink PMC)


Re: [ANNOUNCE] Rong Rong becomes a Flink committer

2019-07-11 Thread Kostas Kloudas
Congratulations Rong!

On Thu, Jul 11, 2019 at 4:40 PM Jark Wu  wrote:

> Congratulations Rong Rong!
> Welcome on board!
>
> On Thu, 11 Jul 2019 at 22:25, Fabian Hueske  wrote:
>
>> Hi everyone,
>>
>> I'm very happy to announce that Rong Rong accepted the offer of the Flink
>> PMC to become a committer of the Flink project.
>>
>> Rong has been contributing to Flink for many years, mainly working on SQL
>> and Yarn security features. He's also frequently helping out on the
>> user@f.a.o mailing lists.
>>
>> Congratulations Rong!
>>
>> Best, Fabian
>> (on behalf of the Flink PMC)
>>
>

