Is Flink 1.15.3 planned in foreseeable future?

2022-10-13 Thread Maciek Próchniak

Hello,

I suppose the committers are heavily focused on 1.16, but are
there plans to get 1.15.3 out?


We've been affected by https://issues.apache.org/jira/browse/FLINK-28488 
and it's preventing us from using 1.15.x at this moment.



thanks,

maciek



Re: [DISCUSS] FLIP-265 Deprecate and remove Scala API support

2022-10-05 Thread Maciek Próchniak

Hi Martijn,

Could you please remind me what the conclusion of the discussion on
upgrading Scala to 2.12.15/16 was?
https://lists.apache.org/thread/hwksnsqyg7n3djymo7m1s7loymxxbc3t - I
couldn't find any follow-up vote.


If it's acceptable to break binary compatibility with such an upgrade,
would upgrading to JDK 17 before 2.0 then be doable?



thanks,

maciek


On 04.10.2022 18:21, Martijn Visser wrote:

Hi Yaroslav,

Thanks for the feedback, that's much appreciated! Regarding Java 17 as 
a prerequisite, we would have to break compatibility already since 
Scala 2.12.7 doesn't compile on Java 17 [1].


Given that we can only remove the Scala APIs with the next major Flink
(2.0) version, would that still impact you a lot? I do imagine that if
we get to a Flink 2.0 version there would be more breaking changes
involved anyway. The biggest consequence of deprecating support for Scala
in Flink 1.x would be that new APIs would only be available in Java, but
since these don't exist yet there would be no refactoring involved. I
can imagine that we might change something in an existing API, but
that would already come with certain compatibility guarantees (depending
on whether it's Public/PublicEvolving/Experimental). If a change did
happen there, I think it would be a smaller refactoring.


Best regards,

Martijn

[1] https://issues.apache.org/jira/browse/FLINK-25000

On Tue, Oct 4, 2022 at 10:58 AM Yaroslav Tkachenko wrote:


Hi Martijn,

As a Scala user, this change would affect me a lot and I'm not
looking forward to rewriting my codebase, and it's not even a very
large one :)

I'd like to suggest supporting Java 17 as a prerequisite
(https://issues.apache.org/jira/browse/FLINK-15736). Things like
switch expressions and records could simplify the migration
quite a bit. Would you consider adding it to the FLIP?
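To illustrate what I mean (just a sketch; the type names are made up, and
since full pattern matching for switch is still a preview feature in Java
17, this sticks to records, sealed interfaces and pattern-matching
instanceof):

// A Scala-style sealed hierarchy of case classes, expressed in Java 17.
sealed interface Event permits Click, Purchase {}
record Click(String userId) implements Event {}
record Purchase(String userId, long amountCents) implements Event {}

class EventDescriber {
    // Roughly what a Scala `match` over the hierarchy would become.
    static String describe(Event e) {
        if (e instanceof Click c) {
            return "click by " + c.userId();
        } else if (e instanceof Purchase p) {
            return "purchase of " + p.amountCents() + " cents by " + p.userId();
        }
        throw new IllegalStateException("unreachable - Event is sealed");
    }
}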

On Tue, Oct 4, 2022 at 10:50 AM Jing Ge wrote:

Hi Martijn,

Thanks for bringing this up. It is generally a great idea, so +1.

Both Scala extension projects mentioned in the FLIP are
still very young, and I don't think they will attract as many
Scala developers as Flink itself could, precisely because they are
external projects. It will be a big issue for users who have to rewrite
their large codebases. Those users should be aware of the
effort from now on; they had better not count on those Scala
extension projects and should prepare their migration plan
before Flink 2.0.

Best regards,
Jing


On Tue, Oct 4, 2022 at 1:59 PM Martijn Visser wrote:

Hi Marton,

You're making a good point. I originally wanted to include
the User mailing list to get their feedback but
forgot to do so. I'll do some more outreach via other
channels as well.

@Users of Flink, I've made a proposal to deprecate and
remove Scala API support in a future version of Flink.
Your feedback on this topic is very much appreciated.

Regarding the large Scala codebases for Flink, a potential
alternative could be to have a wrapper for all Java APIs
that makes them available as Scala APIs. However, this
still requires Scala maintainers, and I don't think that we
currently have those in our community. The easiest
solution for those users would be to use the Java APIs directly.
Yes, it would involve work, but we won't actually be able
to remove the Scala APIs until Flink 2.0, so there's still
time for that :)

Best regards,

Martijn

On Tue, Oct 4, 2022 at 1:26 AM Márton Balassi wrote:

Hi Martijn,

Thanks for compiling the FLIP. I agree with the sentiment that Scala
poses considerable maintenance overhead and that key improvements
(like 2.13 or 2.12.8 support) are hanging stale. With that said, before
we make this move we should attempt to understand the affected user base.
A quick Slack and user mailing list search does return quite a few
results for Scala (admittedly, a cursory look at them suggests that many
of them have to do with features missing in Scala that exist in Java, or
with Scala versions). I would love to see some polls on this
topic; we could also use
the Flink Twitter handle to ask the community about this.

I am aware of users having large existing Scala
codebases for Flink. This move would impose a very large effort on
them, as they would need to rewrite
much of their existing code. What are the alternatives
in your 

Re: Slow Tests in Flink 1.15

2022-09-09 Thread Maciek Próchniak

Hi,

we also had similar problems in Nussknacker recently (tests with fake
sources). My colleague found out it's due to the
ENABLE_CHECKPOINTS_AFTER_FINISH flag
(https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/checkpointing/#waiting-for-the-final-checkpoint-before-task-exit)
being set to true by default in 1.15. After the fake source ends, the job
waits for the next checkpoint to be triggered before finishing. In the end
we reduced the checkpoint interval in some places and disabled checkpoints
altogether in some other tests.
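A minimal sketch of both workarounds (assuming the Flink 1.15 config key
name; please check the docs of your exact version):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

class TestEnvSetup {
    static StreamExecutionEnvironment create() {
        Configuration conf = new Configuration();
        // Workaround 1: turn the final-checkpoint wait off entirely for tests.
        conf.setBoolean(
                "execution.checkpointing.checkpoints-after-tasks-finish.enabled", false);
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // Workaround 2: keep the flag on, but shorten the interval so a
        // finished source doesn't wait long for its final checkpoint.
        env.enableCheckpointing(100); // milliseconds
        return env;
    }
}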


maciek

On 09.09.2022 10:44, David Jost wrote:

Hey, sorry for not coming back to this earlier, but I was hoping to better 
isolate the problem for analysis.

Maybe for comparison to Alexey's case: we have four different pipelines at
ours, which are all built similarly. Though we use Kafka in the actual jobs,
the tests use fake sources and sinks. Only the tests which use the
MiniClusterWithClientResource are affected (we have one badly written test,
which runs the pipeline without the MiniCluster, and it is not affected).

I analysed it a bit and found that the job hangs for about 30s after
the last event has been pushed out of the sink. Summing up these 30s for
each (job) test accounts for the additional time used by all tests in the end.
So I would assume that there is some kind of wind-down timer or so, which
holds up everything.

I hope this is helpful somehow. I would love to find the source of this issue.
I was hoping to isolate the issue in an MWE, but have been unsuccessful so far.

NB: I ran the tests with both MiniClusterWithClientResource (with
adjustments for JUnit 5) and MiniClusterExtension, but there was no
noticeable difference.


On 7. Sep 2022, at 14:41, Alexey Trenikhun wrote:

The class contains a single test method, which runs a single job (the job
is quite complex), then waits for the job to be running, and after that
waits for data to be populated in the output topic; this doesn't happen
within 5 minutes (the test timeout). Tried under a debugger: a breakpoint
set in the Kafka record deserializer is hit, but very slowly, roughly
3 records per 5 minutes (the topic was pre-populated).

No Table/SQL API, only the DataStream API.
From: Chesnay Schepler
Sent: Wednesday, September 7, 2022 5:20 AM
To: Alexey Trenikhun; David Jost; Matthias Pohl
Cc: user@flink.apache.org
Subject: Re: Slow Tests in Flink 1.15
The test that got slow: how many test cases does it actually contain, and how many jobs does it actually run?

Are these tests using the table/sql API?

On 07/09/2022 14:15, Alexey Trenikhun wrote:

We are also observing an extreme slowdown (5+ minutes vs 15 seconds) in 1 of 2
integration tests. Both tests use Kafka. The slow test uses
org.apache.flink.runtime.minicluster.TestingMiniCluster and tests a
complete job, which consumes and produces Kafka messages. The unaffected
test extends org.apache.flink.test.util.AbstractTestBase, which uses
MiniClusterWithClientResource; this test is simpler and only produces Kafka
messages.

Thanks,
Alexey
From: Matthias Pohl via user
Sent: Tuesday, September 6, 2022 6:36 AM
To: David Jost
Cc: user@flink.apache.org
Subject: Re: Slow Tests in Flink 1.15
Hi David,

I guess you're referring to [1]. But as Chesnay already pointed out in the
previous thread: it would be helpful to get more insights into what exactly
your tests are executing (logs, code, ...). That would help identify the
cause.

Can you give us a more complete stacktrace so we can see what call in
Flink is waiting for something?

Does this happen to all of your tests?
Can you provide us with an example that we can try ourselves? If not,
can you describe the test structure (e.g., is it using a
MiniClusterResource)?

Matthias

[1] https://lists.apache.org/thread/yhhprwyf29kgypzzqdmjgft4qs25yyhk

On Mon, Sep 5, 2022 at 4:59 PM David Jost wrote:
Hi,

we were going to upgrade our application from Flink 1.14.4 to Flink 1.15.2
when we noticed that all our job tests using a MiniClusterWithClientResource
are multiple times slower in 1.15 than before in 1.14. I, unfortunately,
have not found any mention of this in the changelog or documentation. The
slowdown is rather extreme, and I hope to find a solution to this. I saw it
mentioned once on the mailing list, but there was no (public) outcome.

I would appreciate any help on this. Thank you in advance.

Best
  David


Kafka connector depending on Table API

2021-08-31 Thread Maciek Próchniak

Hello,

we are testing 1.14 RC0 and we discovered that we need to include
table-api as a dependency when using the Kafka connector - e.g. due to this
change:


https://github.com/apache/flink/blame/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java#L75

(don't know if it's the only place).

The table-api is marked as an optional dependency. I understand it's needed
when using Kafka with the Table API, but I guess for pure DataStream API
jobs it shouldn't be needed? Or is this intended, and we should just add
table-api as a provided dependency?



thanks,

maciek



Flink 1.11.4?

2021-04-12 Thread Maciek Próchniak

Hello,

I'd like to ask if there are any plans to release 1.11.4 - I understand
it will be the last bugfix release for the 1.11.x branch, as 1.13.0 is
"just round the corner"?


There are a few fixes we'd like to use - e.g. 
https://issues.apache.org/jira/browse/FLINK-9844, 
https://issues.apache.org/jira/browse/FLINK-21164



thanks,

maciek



Re: 回复: period batch job lead to OutOfMemoryError: Metaspace problem

2021-04-09 Thread Maciek Próchniak

Hi Arvid,

"You can still bundle it into your jar if you prefer it." - is it really 
the case with JDBC drivers? I think that if the driver is not on Flink 
main classpath (that is, in the lib folder) there is no way the class 
would be loaded by main classloader - regardless of parent/child 
classloader setting?


Those settings will help if the driver is both on Flink classpath and in 
user jar - I noticed now the documentation is slightly misleading 
suggesting otherwise, isn't it?
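To illustrate the leak mechanism I mean (a sketch, not Flink code):
java.sql.DriverManager lives on the system classpath and holds a static
reference to every registered driver, so a driver loaded by the per-job
classloader pins that classloader forever. A hedged workaround is to
deregister our own drivers on teardown:

import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Enumeration;

class DriverCleanup {
    // Deregister all JDBC drivers that were loaded by the given (job) classloader.
    static void deregisterJobDrivers(ClassLoader jobClassLoader) throws SQLException {
        Enumeration<Driver> drivers = DriverManager.getDrivers();
        while (drivers.hasMoreElements()) {
            Driver driver = drivers.nextElement();
            if (driver.getClass().getClassLoader() == jobClassLoader) {
                DriverManager.deregisterDriver(driver);
            }
        }
    }
}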



thanks,

maciek


On 09.04.2021 11:25, Arvid Heise wrote:

Hi,

What do you mean by light-weight way? Just to clarify: you copy the 
jar once in the lib folder and restart the cluster once (and put it 
into the lib/ for future clusters). Not sure how it would be more 
light-weight.


You can still bundle it into your jar if you prefer it. It just tends 
to be big but if it's easier for you to not touch the cluster, then 
just put everything into your jar.


On Fri, Apr 9, 2021 at 4:08 AM 太平洋 <495635...@qq.com> wrote:


I have tried to add
'classloader.parent-first-patterns.additional:
"ru.yandex.clickhouse"' to the Flink config, but the problem still exists.
Is there a lightweight way to put the ClickHouse JDBC driver in the Flink
lib/ folder?

-- Original Message --
From: "Maciek Próchniak" <m...@touk.pl>;
Sent: Friday, April 9, 2021, 3:24 AM
To: "太平洋" <495635...@qq.com>; "Arvid Heise" <ar...@apache.org>; "Yangze Guo" <karma...@gmail.com>;
Cc: "user" <user@flink.apache.org>; "guowei.mgw" <guowei@gmail.com>; "renqschn" <renqs...@gmail.com>;
Subject: Re: 回复: period batch job lead to OutOfMemoryError: Metaspace problem

Hi,

Did you put the ClickHouse JDBC driver on the Flink main classpath (in
the lib folder) and not in the user jar - as described here:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/debugging/debugging_classloading.html#unloading-of-dynamically-loaded-classes-in-user-code?

When we encountered Metaspace leaks recently, in quite a few cases
it turned out that the problem was a JDBC driver in the user
classloader, which was registered with DriverManager and caused a
classloader leak.


maciek


On 08.04.2021 11:42, 太平洋 wrote:

My application program looks like this. Does this structure have
some problem?

public class StreamingJob {
    public static void main(String[] args) throws Exception {
        int i = 0;
        while (i < 100) {
            try {
                StreamExecutionEnvironment env =
                        StreamExecutionEnvironment.getExecutionEnvironment();
                env.setRuntimeMode(RuntimeExecutionMode.BATCH);
                env.setParallelism(Parallelism);

                EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
                        .useBlinkPlanner().inStreamingMode().build();
                StreamTableEnvironment bsTableEnv =
                        StreamTableEnvironment.create(env, bsSettings);

                bsTableEnv.executeSql("CREATE TEMPORARY TABLE ");
                Table t = bsTableEnv.sqlQuery(query);

                DataStream points = bsTableEnv.toAppendStream(t, DataPoint.class);

                DataStream weightPoints = points.map();

                DataStream predictPoints = weightPoints.keyBy()
                        .reduce().map();

                // side output
                final OutputTag outPutPredict = new OutputTag("predict") {
                };

                SingleOutputStreamOperator mainDataStream = predictPoints
                        .process();

                DataStream exStream = mainDataStream.getSideOutput(outPutPredict);

                // write data to clickhouse
                String insertIntoCKSql = "xxx";
                mainDataStream.addSink(JdbcSink.sink(insertIntoCKSql,
                        new CkSinkBuilder(),
                        new JdbcExecutionOptions.Builder().withBatchSize(CkBatchSize).build(),
                        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                                .withDriverName(CkDriverName)
                                .withUrl(CkUrl).withUsername(CkUser)
                                .withPassword(CkPassword).build()));

                // write data to kafka
                FlinkKafkaProducer producer = new FlinkKafkaProducer<>();
                exStream.map().addSink(producer);

                env.execute("Prediction Program");
            } catch (Exception e) {
                e.printStackTrace();
            }
            i++;
            Thread.sleep(window * 1000);
        }
    }
}



-- Original Message --
From: "Arvid Heise" <ar...@apache.org>;
Sent: Thursday, April 8, 2021, 2:33 PM
To: "Yangze Guo" <karma...@gmail.com>;
Cc: "太平洋" <495635...@qq.com>; "user" <user@flink.apache.org>; "

Re: 回复: period batch job lead to OutOfMemoryError: Metaspace problem

2021-04-08 Thread Maciek Próchniak

Hi,

Did you put the ClickHouse JDBC driver on the Flink main classpath (in the
lib folder) and not in the user jar - as described here:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/debugging/debugging_classloading.html#unloading-of-dynamically-loaded-classes-in-user-code?


When we encountered Metaspace leaks recently, in quite a few cases it
turned out that the problem was a JDBC driver in the user classloader,
which was registered with DriverManager and caused a classloader leak.



maciek


On 08.04.2021 11:42, 太平洋 wrote:
My application program looks like this. Does this structure have some
problem?


public class StreamingJob {
    public static void main(String[] args) throws Exception {
        int i = 0;
        while (i < 100) {
            try {
                StreamExecutionEnvironment env =
                        StreamExecutionEnvironment.getExecutionEnvironment();
                env.setRuntimeMode(RuntimeExecutionMode.BATCH);
                env.setParallelism(Parallelism);

                EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
                        .useBlinkPlanner().inStreamingMode().build();
                StreamTableEnvironment bsTableEnv =
                        StreamTableEnvironment.create(env, bsSettings);

                bsTableEnv.executeSql("CREATE TEMPORARY TABLE ");
                Table t = bsTableEnv.sqlQuery(query);

                DataStream points = bsTableEnv.toAppendStream(t, DataPoint.class);

                DataStream weightPoints = points.map();

                DataStream predictPoints = weightPoints.keyBy()
                        .reduce().map();

                // side output
                final OutputTag outPutPredict = new OutputTag("predict") {
                };

                SingleOutputStreamOperator mainDataStream = predictPoints
                        .process();

                DataStream exStream = mainDataStream.getSideOutput(outPutPredict);

                // write data to clickhouse
                String insertIntoCKSql = "xxx";
                mainDataStream.addSink(JdbcSink.sink(insertIntoCKSql,
                        new CkSinkBuilder(),
                        new JdbcExecutionOptions.Builder().withBatchSize(CkBatchSize).build(),
                        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                                .withDriverName(CkDriverName)
                                .withUrl(CkUrl).withUsername(CkUser)
                                .withPassword(CkPassword).build()));

                // write data to kafka
                FlinkKafkaProducer producer = new FlinkKafkaProducer<>();
                exStream.map().addSink(producer);

                env.execute("Prediction Program");
            } catch (Exception e) {
                e.printStackTrace();
            }
            i++;
            Thread.sleep(window * 1000);
        }
    }
}



-- Original Message --
From: "Arvid Heise";
Sent: Thursday, April 8, 2021, 2:33 PM
To: "Yangze Guo";
Cc: "太平洋" <495635...@qq.com>; "user"; "guowei.mgw"; "renqschn";
Subject: Re: period batch job lead to OutOfMemoryError: Metaspace problem

Hi,

ChildFirstClassLoaders are created (more or less) per application jar,
and seeing so many looks like a classloader leak to me. I'd expect you
to see a new ChildFirstClassLoader popping up with each new job
submission.


Can you check what is referencing the ChildFirstClassLoader
transitively? Usually it's some thread that is lingering around
because some third-party library is leaking threads etc.


OneInputStreamTask is legit and just indicates that you have a job
running with 4 slots on that TM. It should not hold any dedicated
metaspace memory.


On Thu, Apr 8, 2021 at 4:52 AM Yangze Guo wrote:


I went through the JM & TM logs but could not find any valuable clue.
The exception is actually thrown by kafka-producer-network-thread.
Maybe @Qingsheng could also take a look?


Best,
Yangze Guo

On Thu, Apr 8, 2021 at 10:39 AM 太平洋 <495635...@qq.com> wrote:
>
> I have configured it to 512M, but the problem still exists. Now the
> memory size is still 256M.
> Attachments are the TM and JM logs.
>
> Looking forward to your reply.
>
> -- Original Message --
> From: "Yangze Guo" <karma...@gmail.com>;
> Sent: Tuesday, April 6, 2021, 6:35
> To: "太平洋" <495635...@qq.com>;
> Cc: "user" <user@flink.apache.org>; "guowei.mgw" <guowei@gmail.com>;
> Subject: Re: period batch job lead to OutOfMemoryError: Metaspace problem
>
> > I have tried this method, but the problem still exists.
> How much memory did you configure for it?
>
> > are 21 instances of "org.apache.flink.util.ChildFirstClassLoader" normal?
> Not quite sure about it. AFAIK, each job will have a classloader.
> Multiple tasks of the same job in the same TM will share the same
> classloader. The classloader will be removed if there is no more task
> running on the TM. Classloaders without references will finally be
> cleaned up by GC. Could you share the JM and TM logs for further analysis?
> I'll also involve @Guowei Ma in this thread.
>
>
> Best,
> Yangze Guo
>
> On Tue, Apr 6, 2021 at 6:05 PM 太平洋 <495635...@qq.com> wrote:
> >
> > I have tried this method, but the problem still 

Re: Flink does not cleanup some disk memory after submitting jar over rest

2021-04-08 Thread Maciek Próchniak

Hi,

don't know if this is the problem you're facing, but some time ago we
encountered two issues connected to the REST API and increased disk usage
after each submission:


https://issues.apache.org/jira/browse/FLINK-21164

https://issues.apache.org/jira/browse/FLINK-9844

- they're closed ATM, but only 1.12.2 contains the fixes.


maciek


On 08.04.2021 19:52, Great Info wrote:


I have deployed my own Flink setup in AWS ECS: one service for the
JobManager and one service for the TaskManagers. I am running one ECS
task for the JobManager and 3 ECS tasks for TaskManagers.


I have a kind of batch job which I upload using the Flink REST API every
day with new arguments; each time I submit it, disk usage increases by
~600MB. I have configured S3 for checkpoints. Also, I have
set *historyserver.archive.clean-expired-jobs* to true.


Since I am running on ECS, I am not able to find out why disk usage is
increasing on every jar upload and execution.


What are the Flink config params I should look at to make sure the
disk usage is not shooting up?




Re: Future of QueryableState

2021-03-10 Thread Maciek Próchniak

Hi Konstantin,

thanks for the detailed answer. I also thought about a CoFunction, but it
is a bit too heavy for us for the moment (each state would need an
additional Kafka producer/consumer).


Guess we'll use QueryableState for now and try to phase it out slowly...


thanks,

maciek


On 09.03.2021 17:42, Konstantin Knauf wrote:

Hi Maciek,

Thank you for reaching out. I'll try to answer your questions separately.

- nothing comparable. You already mention the State Processor API.
Besides that, I can only think of a side channel (CoFunction) that is
used to request a certain state that is then sent to a side output and
ultimately to a sink, e.g. Kafka State Request Topic -> Flink -> Kafka
State Response Topic. This puts the complexity into the Flink job,
though.
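For illustration, a hedged sketch of such a side channel (all type, state
and topic names here are hypothetical, not an established Flink API):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Input 1 updates keyed state; input 2 carries "state request" records for
// the same key space; answers go to a side output feeding a response topic.
public class StateQueryFunction
        extends KeyedCoProcessFunction<String, Long, String, Long> {

    public static final OutputTag<String> RESPONSES =
            new OutputTag<String>("state-responses") {};

    private transient ValueState<Long> sum;

    @Override
    public void open(Configuration parameters) {
        sum = getRuntimeContext().getState(
                new ValueStateDescriptor<>("sum", Long.class));
    }

    @Override
    public void processElement1(Long value, Context ctx, Collector<Long> out)
            throws Exception {
        Long current = sum.value();
        sum.update(current == null ? value : current + value);
        out.collect(value);
    }

    @Override
    public void processElement2(String requestId, Context ctx, Collector<Long> out)
            throws Exception {
        // Answer the state request for the current key via the side output.
        ctx.output(RESPONSES, requestId + ":" + ctx.getCurrentKey() + "=" + sum.value());
    }
}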


- I think it is a combination of both. Queryable State works well
within its limitations. In the case of the RocksDB state backend this is
mainly the availability of the job and the fact that you might read
"uncommitted" state updates. In the case of the heap-backed state backends
there are also synchronization issues, e.g. you might read stale
values. You also mention the fact that queryable state has been an
afterthought when it comes to more recent deployment options. I am not
aware of any committer who currently has the time to work on this to
the degree that would be required. So we thought it would be more
fair and realistic to mark Queryable State as "approaching end of
life", in the sense that there is no active development on that
component anymore.


Best,

Konstantin

On Tue, Mar 9, 2021 at 7:08 AM Maciek Próchniak <m...@touk.pl> wrote:


Hello,


We are using QueryableState in some of Nussknacker deployments as
a nice
addition, allowing end users to peek inside job state for a given key
(we mostly use custom operators).


Judging by mailing list and feature radar proposition by Stephan:

https://github.com/StephanEwen/flink-web/blob/feature_radar/img/flink_feature_radar.svg



this feature is not widely used/supported. I'd like to ask:

- are there any alternative ways of accessing state during job
execution? State API is very nice, but it operates on checkpoints and
loading whole state to lookup one key seems a bit heavy?

- are there any inherent problems in QueryableState design (e.g. it's
not feasible to use it in K8 settings, performance considerations) or
just lack of interest/support (in that case we may offer some help)?


thanks,

maciek



--

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk



Future of QueryableState

2021-03-08 Thread Maciek Próchniak

Hello,


We are using QueryableState in some of Nussknacker deployments as a nice 
addition, allowing end users to peek inside job state for a given key 
(we mostly use custom operators).



Judging by mailing list and feature radar proposition by Stephan: 
https://github.com/StephanEwen/flink-web/blob/feature_radar/img/flink_feature_radar.svg 



this feature is not widely used/supported. I'd like to ask:

- are there any alternative ways of accessing state during job
execution? The State Processor API is very nice, but it operates on
checkpoints, and loading the whole state to look up one key seems a bit heavy?


- are there any inherent problems in the QueryableState design (e.g. it's
not feasible to use it in Kubernetes settings, performance considerations),
or is it just lack of interest/support (in which case we may offer some help)?



thanks,

maciek



Re: JobManager seems to be leaking temporary jar files

2021-01-27 Thread Maciek Próchniak

Hi Chesnay,

thanks for the reply. I wonder if FLINK-21164 will help without FLINK-9844 -
if the jar file is not closed, won't it fail to be deleted?


As for FLINK-9844 - I understand that having code like

if (userClassLoader instanceof Closeable) { ((Closeable)
userClassLoader).close() }


is too much of a "dirty trick" to be considered?


thanks,

maciek


On 27.01.2021 13:00, Chesnay Schepler wrote:
The problem of submitted jar files not being closed is a known one: 
https://issues.apache.org/jira/browse/FLINK-9844

IIRC it's not exactly trivial to fix since class-loading is involved.
It's not strictly related to the REST API; it also occurs in the CLI 
but is less noticeable since jars are usually not deleted.


As for the issue with deleteExtractedLibraries, Maciek is generally on
a good track.
The explicit delete call is indeed missing. The best place to put it is
probably JarRunHandler#handleRequest, after the job was run.

A similar issue also exists in the JarPlanHandler.

I've opened https://issues.apache.org/jira/browse/FLINK-21164 to fix 
this issue.
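The shape of the fix, paraphrased (the real patch in FLINK-21164 will
differ in the surrounding handler code):

import org.apache.flink.client.program.PackagedProgram;

class JarRunCleanupSketch {
    static void runAndCleanUp(PackagedProgram program, Runnable runJob) {
        try {
            runJob.run(); // submit/execute the job plan
        } finally {
            // Delete the temp jars extracted from the uploaded jar's lib/ directory.
            program.deleteExtractedLibraries();
        }
    }
}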


On 1/26/2021 12:21 PM, Maciek Próchniak wrote:


Hi Matthias,

I think the problem lies somewhere in JarRunHandler, as this is the
place where the files are created.


I think these are not the files that are managed via the BlobService, as
they are not stored in the BlobService folders (I made an experiment,
changing the default BlobServer folders).


It seems to me that CliFrontend deletes those files explicitly:

https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java#L250

whereas I couldn't find such an invocation in JarRunHandler (not
deleting those files does not fully explain the leak on the heap though...)



thanks,

maciek

On 26.01.2021 11:16, Matthias Pohl wrote:

Hi Maciek,
my understanding is that the jars in the JobManager should be 
cleaned up after the job is terminated (I assume that your jobs 
successfully finished). The jars are managed by the BlobService. The 
dispatcher will trigger the jobCleanup in [1] after job termination. 
Are there any suspicious log messages that might indicate an issue?

I'm adding Chesnay to this thread as he might have more insights here.

[1] 
https://github.com/apache/flink/blob/2c4e0ab921ccfaf003073ee50faeae4d4e4f4c93/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L797


On Mon, Jan 25, 2021 at 8:37 PM Maciek Próchniak <m...@touk.pl> wrote:


Hello,

in our setup we have:

- Flink 1.11.2

- job submission via REST API (first we upload jar, then we submit
multiple jobs with it)

- additional jars embedded in lib directory of main jar (this is
crucial
part)

When we submit jobs this way, Flink creates new temp jar files via
PackagedProgram.extractContainedLibraries method.

We observe that they are not removed after job finishes - it
seems that
PackagedProgram.deleteExtractedLibraries is not invoked when
using REST
API.

What's more, it seems that those jars remain open in JobManager
process.
We observe that when we delete them manually via scripts, the
disk space
is not reclaimed until process is restarted, we also see via
heap dump
inspection that java.util.zip.ZipFile$Source  objects remain,
pointing
to those files. This is quite a problem for us, as we submit
quite a few
jobs, and after a while we ran out of either heap or disk space on
JobManager process/host. Unfortunately, I cannot so far find
where this
leak would happen...

Does anybody have some pointers where we can search? Or how to
fix this
behaviour?


thanks,

maciek





Re: JobManager seems to be leaking temporary jar files

2021-01-26 Thread Maciek Próchniak

Hi Matthias,

I think the problem lies somewhere in JarRunHandler, as this is the
place where the files are created.


I think these are not the files that are managed via the BlobService, as
they are not stored in the BlobService folders (I made an experiment,
changing the default BlobServer folders).


It seems to me that CliFrontend deletes those files explicitly:

https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java#L250

whereas I couldn't find such an invocation in JarRunHandler (not deleting
those files does not fully explain the leak on the heap though...)



thanks,

maciek

On 26.01.2021 11:16, Matthias Pohl wrote:

Hi Maciek,
my understanding is that the jars in the JobManager should be cleaned 
up after the job is terminated (I assume that your jobs successfully 
finished). The jars are managed by the BlobService. The dispatcher 
will trigger the jobCleanup in [1] after job termination. Are there 
any suspicious log messages that might indicate an issue?

I'm adding Chesnay to this thread as he might have more insights here.

[1] 
https://github.com/apache/flink/blob/2c4e0ab921ccfaf003073ee50faeae4d4e4f4c93/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L797


On Mon, Jan 25, 2021 at 8:37 PM Maciek Próchniak <m...@touk.pl> wrote:


Hello,

in our setup we have:

- Flink 1.11.2

- job submission via REST API (first we upload jar, then we submit
multiple jobs with it)

- additional jars embedded in lib directory of main jar (this is
crucial
part)

When we submit jobs this way, Flink creates new temp jar files via
PackagedProgram.extractContainedLibraries method.

We observe that they are not removed after job finishes - it seems
that
PackagedProgram.deleteExtractedLibraries is not invoked when using
REST
API.

What's more, it seems that those jars remain open in JobManager
process.
We observe that when we delete them manually via scripts, the disk
space
is not reclaimed until process is restarted, we also see via heap
dump
inspection that java.util.zip.ZipFile$Source  objects remain,
pointing
to those files. This is quite a problem for us, as we submit quite
a few
jobs, and after a while we ran out of either heap or disk space on
JobManager process/host. Unfortunately, I cannot so far find where
this
leak would happen...

Does anybody have some pointers where we can search? Or how to fix
this
behaviour?


thanks,

maciek



JobManager seems to be leaking temporary jar files

2021-01-25 Thread Maciek Próchniak

Hello,

in our setup we have:

- Flink 1.11.2

- job submission via REST API (first we upload jar, then we submit 
multiple jobs with it)


- additional jars embedded in lib directory of main jar (this is crucial 
part)


When we submit jobs this way, Flink creates new temp jar files via 
PackagedProgram.extractContainedLibraries method.


We observe that they are not removed after the job finishes - it seems that
PackagedProgram.deleteExtractedLibraries is not invoked when using the REST
API.


What's more, it seems that those jars remain open in the JobManager process.
We observe that when we delete them manually via scripts, the disk space
is not reclaimed until the process is restarted; we also see via heap dump
inspection that java.util.zip.ZipFile$Source objects remain, pointing
to those files. This is quite a problem for us, as we submit quite a few
jobs, and after a while we run out of either heap or disk space on the
JobManager process/host. Unfortunately, I cannot so far find where this
leak would happen...


Does anybody have some pointers where we can search? Or how to fix this
behaviour?



thanks,

maciek




Re: Partitioned tables in SQL client configuration.

2020-12-03 Thread Maciek Próchniak

Hi Jark,

thanks for the answer. I'm a bit puzzled, because in my YAML I'm using
"connector: filesystem" (not connector.type). I don't think I end up using

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connect.html#file-system-connector
- this connector, as partitioning and the ORC format are handled correctly.



It's also not clear to me what the "non-legacy" connector for reading
files directly from the filesystem (no Hive) is. I don't see any
implementation of DynamicTableSourceFactory which would do this.


I assumed that using the DDL I wrote below also gives me
FileSystemTableFactory - am I wrong?



thanks,

maciek



On 03.12.2020 16:26, Jark Wu wrote:
Only legacy connectors (`connector.type=kafka` instead of 
`connector=kafka`) are supported in the YAML at the moment. You can 
use regular DDL instead. There is a similar discussion in 
https://issues.apache.org/jira/browse/FLINK-20260
these days.


Best,
Jark

On Thu, 3 Dec 2020 at 00:52, Till Rohrmann <trohrm...@apache.org> wrote:


Hi Maciek,

I am pulling in Timo who might help you with this problem.

Cheers,
Till

On Tue, Dec 1, 2020 at 6:51 PM Maciek Próchniak <m...@touk.pl> wrote:

Hello,

I try to configure SQL Client to query partitioned ORC data on
local
filesystem. I have directory structure like that:

/tmp/table1/startdate=2020-11-28

/tmp/table1/startdate=2020-11-27

etc.


If I run SQL Client session and create table by hand:

create table tst (column1 string, startdate string)
partitioned by
(startdate) with ('connector'='filesystem', 'format'='orc',
'path'='/tmp/table1');

everything runs fine:

explain select * from tst where startdate='2020-11-27'

shows only one partition in 'readPartitions'


However, I struggle to configure table in .yaml config.

I tried like this (after some struggle, as "partition.keys"
setting
doesn't seem to be documented...) :

tables:
   - name: tst2
 type: source-table
 connector: filesystem
 path: "/tmp/table1"
 format: orc
 partition.keys:
   - name: startdate
 schema:
   - name: column1
 data-type: string
   - name: startdate
 data-type: string

and it more or less works - queries are executed properly.
However,
partitions are not pruned:

explain select * from tst2 where startdate='2020-11-27'

shows all partitions in 'readPartitions'


Any idea what can be wrong? I'm using Flink 1.11.2


thanks,

maciek




Partitioned tables in SQL client configuration.

2020-12-01 Thread Maciek Próchniak

Hello,

I'm trying to configure the SQL Client to query partitioned ORC data on a
local filesystem. I have a directory structure like this:


/tmp/table1/startdate=2020-11-28

/tmp/table1/startdate=2020-11-27

etc.


If I run SQL Client session and create table by hand:

create table tst (column1 string, startdate string) partitioned by 
(startdate) with ('connector'='filesystem', 'format'='orc', 
'path'='/tmp/table1');


everything runs fine:

explain select * from tst where startdate='2020-11-27'

shows only one partition in 'readPartitions'


However, I struggle to configure the table in the .yaml config.

I tried it like this (after some struggle, as the "partition.keys" setting
doesn't seem to be documented...):


tables:
  - name: tst2
    type: source-table
    connector: filesystem
    path: "/tmp/table1"
    format: orc
    partition.keys:
  - name: startdate
    schema:
  - name: column1
    data-type: string
  - name: startdate
    data-type: string

and it more or less works - queries are executed properly. However, 
partitions are not pruned:


explain select * from tst2 where startdate='2020-11-27'

shows all partitions in 'readPartitions'


Any idea what can be wrong? I'm using Flink 1.11.2


thanks,

maciek




Re: Conversion of Table (Blink/batch) to DataStream

2020-04-05 Thread Maciek Próchniak

Hi Jark,

thanks for the quick answer - I strongly suspected there was a hack like
that somewhere, but I couldn't find it easily in the maze of old and new
Scala and Java APIs :D


For my current experiments it's OK; I'm sure in future releases everything
will be cleaned up :)



best,

maciek



On 05/04/2020 06:04, Jark Wu wrote:

Hi Maciek,

This will be supported in the future.
Currently, you can create a `StreamTableEnvironmentImpl` by yourself
using the constructor (the constructor doesn't restrict batch mode).

The SQL CLI also does it the same way [1] (even though it's a hack).

Best,
Jark

[1]: 
https://github.com/apache/flink/blob/master/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java#L419


On Sat, 4 Apr 2020 at 15:42, Maciek Próchniak <m...@touk.pl> wrote:


Hello,

I'm playing around with Table/SQL API (Flink 1.9/1.10) and I was
wondering how I can do the following:

1. read batch data (e.g. from files)

2. sort them using Table/SQL SortOperator

3. perform further operations using "normal" DataStream API
(treating my
batch as finite stream) - to reuse the code I have developed for
stream
processing cases.


Now, to perform step 2. I understand I should use Blink planner in
batch
mode, but then - although there is StreamExecutionEnvironment
underneath
- there seems to be no easy

(or at least documented ;)) way of going from Table to DataStream.

The toAppendStream/toRetractStream are restricted to stream mode,
and if
I use it I cannot use SortOperator easily.

Of course, I can write results to some external output like files,
but
I'd like to avoid that...

Is there any nice way to do this? And if not - are there plans to
make
it possible in the future?


thanks,

maciek


ps. the new Table/SQL stuff is really, really cool!



Re: Dynamic Flink SQL

2020-04-04 Thread Maciek Próchniak

Hi Krzysiek,

the idea is quite interesting - although maintaining the coordination
needed to handle checkpoints would probably be pretty tricky. Did you
figure out how to handle proper distribution of tasks between TMs? As far
as I understand, you have to guarantee that all sources reading from the
cache are on the same TM as the sinks writing data from Kafka? Or are you
thinking about some distributed cache?


As for your original question - we are also looking for solutions/ideas
for this problem in Nussknacker. We have a similar problem, though with
different constraints (on premise, not having to care too much about
bandwidth), and we went with "one job per scenario". It works OK, but the
biggest problem for me is that it does not scale with the number of
jobs: a Flink job is quite a heavy entity - all the threads, classloaders
etc. Having more than a few dozen jobs is also not so easy to handle
on the JobManager side - especially when it's restarted etc. I guess your
idea would also suffer from this problem?



thanks,

maciek



On 27/03/2020 10:18, Krzysztof Zarzycki wrote:

I want to do a somewhat different hacky PoC:
* I will write a sink that caches the results in "JVM global" memory.
Then I will write a source that reads this cache (a sketch follows below).
* I will launch one job that reads from the Kafka source, shuffles the
data to the desired partitioning and then sinks to that cache.
* Then I will launch multiple jobs (DataStream-based or Flink SQL-based)
that use the source to read the data out of the cache and then
reinterpret it as a keyed stream [1].
* Using JVM-global memory is necessary because AFAIK the jobs use
different classloaders. The class of the cached object also needs to be
available in the parent classloader, i.e. on the cluster's classpath.
This is just to prove the idea, and the performance and usefulness of it.
All the problems of checkpointing this data I will leave for later.
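The hand-off sketch I mean (class names are hypothetical; the holder class
must sit on the cluster classpath, e.g. in lib/, because each job gets its
own user classloader - exchanging byte[] keeps only JDK types on the
shared path):

import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public final class JvmGlobalCache {
    // "JVM global" buffer shared by the producer and consumer jobs.
    public static final ConcurrentLinkedQueue<byte[]> QUEUE = new ConcurrentLinkedQueue<>();

    /** Sink side: the producer job serializes records and drops them here. */
    public static class CacheSink implements SinkFunction<byte[]> {
        @Override
        public void invoke(byte[] value, Context context) {
            QUEUE.add(value);
        }
    }
}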


I'm very interested in your comments, community, about this idea
and its later productization.

Thanks!

Answering your comments:

Unless you need reprocessing for newly added rules, I'd probably
just cancel with a savepoint and restart the application with the
new rules. Of course, it depends on the rules themselves and how
much state they require whether a restart is viable. That's up to a PoC.

No, I don't need reprocessing (yet). The rule starts processing the
data from the moment it is defined.
Cancellation with a savepoint was considered, but the number of new
rules defined/changed daily might be large enough that it would
generate too much downtime. There is a lot of state kept in those
rules, making the restart heavy. What's worse, that would be
cross-tenant downtime, unless the job were somehow per team/tenant.
Therefore we rejected this option.
BTW, the current design of our system is similar to the one from the
blog series by Alexander Fedulov about the dynamic rules pattern [2]
that he's just publishing.



They will consume the same high-intensity source(s), therefore
I want to optimize for that by consuming the messages in Flink
only once.

That's why I proposed to run one big query instead of 500 small
ones. Have a PoC where you add two of your rules manually to a
Table and see what the optimized logical plan looks like. I'd bet
that the source is only tapped once.


I can do that PoC, no problem. But AFAIK it will only work with the 
"restart with savepoint" pattern discussed above.



[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream

[2] https://flink.apache.org/news/2020/03/24/demo-fraud-detection-2.html



On Wed, Mar 25, 2020 at 6:15 PM Krzysztof Zarzycki
mailto:k.zarzy...@gmail.com>> wrote:

Hello Arvid,
Thanks for joining to the thread!
First, did you take into consideration that I would like to
dynamically add queries on the same source? That means: first
define one query, later in the day add another one, then another,
and so on. A week later, kill one of those, start yet
another one, etc. There will be hundreds of these queries
running at once, but the set of queries changes several times a
day.
They will consume the same high-intensity source(s), therefore
I want to optimize for that by consuming the messages in Flink
only once.

Regarding the temporary tables, AFAIK they are only
metadata (let's say Kafka topic details) stored in the
scope of a SQL session. Therefore multiple queries against
that temp table will behave the same way as querying a normal
table, that is, they will read the data source multiple times.

It looks like the feature I want or could use is described by
FLIP-36 on Interactive Programming, more
precisely caching the stream table [1].
While 

Conversion of Table (Blink/batch) to DataStream

2020-04-04 Thread Maciek Próchniak

Hello,

I'm playing around with Table/SQL API (Flink 1.9/1.10) and I was 
wondering how I can do the following:


1. read batch data (e.g. from files)

2. sort them using Table/SQL SortOperator

3. perform further operations using the "normal" DataStream API (treating
my batch as a finite stream) - to reuse the code I have developed for
stream processing cases.



Now, to perform step 2, I understand I should use the Blink planner in
batch mode, but then - although there is a StreamExecutionEnvironment
underneath - there seems to be no easy


(or at least documented ;)) way of going from Table to DataStream.

The toAppendStream/toRetractStream methods are restricted to stream mode,
and if I use them I cannot use the SortOperator easily.


Of course, I can write results to some external output like files, but 
I'd like to avoid that...


Is there any nice way to do this? And if not - are there plans to make 
it possible in the future?



thanks,

maciek


ps. the new Table/SQL stuff is really, really cool!



queryable state API

2018-02-01 Thread Maciek Próchniak

Hello,

Currently (1.4), to be able to use queryable state, the client has to know
the IP of a (working) task manager and the port. This is a bit awkward -
it forces external services to know details of the Flink cluster. It's
even more complex when we define a port range for the queryable state
proxy and we're not sure which port is chosen... In former versions it was
possible to use only the job manager address - I understand this was
changed for performance reasons.


Are there plans to make using queryable state a bit easier? E.g. to be
able to get the list of task managers with their respective queryable
state proxies via the JobManager REST API? It would be great if external
services could communicate with a Flink cluster knowing only the job
manager addresses...
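For reference, what the client code looks like today (a sketch against the
1.4 API; the host, job id, key and state name are placeholders - the point
is that the proxy address must be a live task manager, which is exactly
the coupling I'd like to avoid):

import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.queryablestate.client.QueryableStateClient;

class QueryableStateExample {
    public static void main(String[] args) throws Exception {
        // 9069 is the default queryable state proxy port.
        QueryableStateClient client =
                new QueryableStateClient("some-taskmanager-host", 9069);
        CompletableFuture<ValueState<Long>> result = client.getKvState(
                JobID.fromHexString(args[0]),
                "my-queryable-state",
                "some-key",
                BasicTypeInfo.STRING_TYPE_INFO,
                new ValueStateDescriptor<>("my-queryable-state", Long.class));
        System.out.println(result.get().value());
    }
}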


thanks,

maciek



Re: flowable <-> flink integration

2018-01-18 Thread Maciek Próchniak

Hi Martin,


I did some Activiti development, so your mail caught my attention :)

I don't think I understand what you are trying to achieve - where is the
process you're simulating, where is the simulation running, and where is
the place for Flink? Do you want to invoke Flink (a batch job, I suppose?)
from a Flowable process? Or do you want to run simulations of a BPMN
process as a Flink job?



thanks,

maciek


On 16/01/2018 22:29, Martin Grofčík wrote:

Hi,

I want to implement a Flowable (BPMN platform - www.flowable.org) <->
Flink integration module. The
motivation is to execute process simulations with Flink (a simple
simulation experiment example:
https://gromar01.wordpress.com/2017/11/07/will-we-meet-our-kpis/). I
was able to create



Flink provides REST API through which I can easily create a job and 
monitor its execution.

[attachment: wordCountProcess.PNG]



at the end I can encapsulate the whole process into one task (e.g.
"Execute Flink job") which will do the same in Java code.
In fact I have no experience with Flink; that's why I can only imagine
process steps to:

1. create a Flink job
2. monitor its state

Question 1:
Can you propose other useful process steps (e.g. to download
results, upload datasets, ...)?

(Please provide a link showing how I can proceed with their implementation.)

Question 2:
The problem with the process is that it is always polling the job state.
I would prefer to add a hook at the end of Flink job execution that calls
the Flowable REST API to notify the process instance about job finished
(failed) events.
The way I have found is to reimplement the REST endpoint
org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler
so that it calls the Flowable REST API at the end of Flink job execution.
What I would prefer is to make something like a wrapper around the main
class that executes the Flowable REST call at the end.

Can you provide me a hint on how to implement this wrapper, please?
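Roughly the kind of wrapper I mean (all class names and the endpoint URL
are hypothetical; this assumes the job is submitted in blocking/attached
mode, so that main() returns only after the job finishes):

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class NotifyingMain {
    public static void main(String[] args) throws Exception {
        try {
            WordCountJob.main(args); // the wrapped Flink job's main class
            notifyFlowable("job-finished");
        } catch (Exception e) {
            notifyFlowable("job-failed");
            throw e;
        }
    }

    private static void notifyFlowable(String event) throws Exception {
        // Hypothetical Flowable REST endpoint - adjust to the real API.
        URL url = new URL("http://flowable-host:8080/flowable-rest/runtime/events");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type", "application/json");
        try (OutputStream os = conn.getOutputStream()) {
            os.write(("{\"event\":\"" + event + "\"}").getBytes(StandardCharsets.UTF_8));
        }
        conn.getResponseCode(); // force the request; response body ignored here
    }
}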

Thank you in advance for the answer.

Regards
Martin




Re: Checkpoint was declined (tasks not ready)

2017-10-23 Thread Maciek Próchniak

it seems that one of the operators is stuck during recovery:

prio=5 os_prio=0 tid=0x7f634bb31000 nid=0xd5e in Object.wait()
[0x7f63f13cc000]

java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Object.java:502)
at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.addAsyncBufferEntry(AsyncWaitOperator.java:406)
at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.processElement(AsyncWaitOperator.java:228)
at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.open(AsyncWaitOperator.java:174)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
- locked <0x00037ae51a38> (a java.lang.Object)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:745)
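FWIW, the buffer it blocks on (addAsyncBufferEntry waits for a free slot
while recovered elements are re-emitted in open()) is bounded by the
capacity passed when the async operator is defined - a sketch with
placeholder types:

import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;

class AsyncKnobs {
    static DataStream<String> withAsyncLookup(
            DataStream<String> input, AsyncFunction<String, String> asyncLookup) {
        return AsyncDataStream.orderedWait(
                input, asyncLookup,
                30_000, TimeUnit.MILLISECONDS, // timeout per async request
                100);                          // capacity: max buffered in-flight elements
    }
}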


On 23/10/2017 13:54, Maciek Próchniak wrote:


we also have similar problem - it happens really often when we invoke 
async operators (ordered one). But we also observe that job is not 
starting properly - we don't process any data when such problems appear


we'll keep you posted if we manage to find exact cause...


thanks,
maciek

On 09/10/2017 12:10, Karthik Deivasigamani wrote:

Hi Stephan,
Once the job restarts due to an async io operator timeout we 
notice that its checkpoints never succeed again.  But the job is 
running fine and is processing data.

~
Karthik


On Mon, Oct 9, 2017 at 3:19 PM, Stephan Ewen <se...@apache.org> wrote:


As long as this does not appear all the time, but only once in a
while, it should not be a problem.
It simply means that this particular checkpoint could not be
triggered, because some sources were not ready yet.

It should try another checkpoint and then be okay.


On Fri, Oct 6, 2017 at 4:53 PM, Karthik Deivasigamani
<karthi...@gmail.com> wrote:

We are using Flink 1.3.1 in Standalone mode with a HA job
manager setup.
~
Karthik

On Fri, Oct 6, 2017 at 8:22 PM, Karthik Deivasigamani
<karthi...@gmail.com> wrote:

Hi,
I'm noticing a weird issue with our flink streaming
job. We use async io operator which makes a HTTP call and
in certain cases when the async task times out, it throws
an exception and causing the job to restart.

java.lang.Exception: An async function call terminated with an 
exception. Failing the AsyncWaitOperator.
at 
org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:136)
at 
org.apache.flink.streaming.api.operators.async.Emitter.run(Emitter.java:83)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.ExecutionException: 
java.util.concurrent.TimeoutException: Async function call has timed out.
at 
org.apache.flink.runtime.concurrent.impl.FlinkFuture.get(FlinkFuture.java:110)


After the job restarts(we have a fixed restart strategy)
we notice that the checkpoints start failing continuously
with this message :
Checkpoint was declined (tasks not ready)


But we see the job is running, its processing data, the
accumulators we have are getting incremented etc but
checkpointing fails with tasks not ready message.

Wanted to reach out to the community to see if anyone
else has experienced this issue before?
~
Karthik










Re: Checkpoint was declined (tasks not ready)

2017-10-23 Thread Maciek Próchniak
we also have a similar problem - it happens really often when we invoke
async operators (the ordered one). But we also observe that the job is not
starting properly - we don't process any data when such problems appear.


we'll keep you posted if we manage to find exact cause...


thanks,
maciek

On 09/10/2017 12:10, Karthik Deivasigamani wrote:

Hi Stephan,
Once the job restarts due to an async io operator timeout we 
notice that its checkpoints never succeed again. But the job is 
running fine and is processing data.

~
Karthik


On Mon, Oct 9, 2017 at 3:19 PM, Stephan Ewen wrote:


As long as this does not appear all the time, but only once in a
while, it should not be a problem.
It simply means that this particular checkpoint could not be
triggered, because some sources were not ready yet.

It should try another checkpoint and then be okay.


On Fri, Oct 6, 2017 at 4:53 PM, Karthik Deivasigamani wrote:

We are using Flink 1.3.1 in Standalone mode with a HA job
manager setup.
~
Karthik

On Fri, Oct 6, 2017 at 8:22 PM, Karthik Deivasigamani wrote:

Hi,
I'm noticing a weird issue with our flink streaming
job. We use async io operator which makes a HTTP call and
in certain cases when the async task times out, it throws
an exception and causing the job to restart.

java.lang.Exception: An async function call terminated with an 
exception. Failing the AsyncWaitOperator.
at 
org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:136)
at 
org.apache.flink.streaming.api.operators.async.Emitter.run(Emitter.java:83)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.ExecutionException: 
java.util.concurrent.TimeoutException: Async function call has timed out.
at 
org.apache.flink.runtime.concurrent.impl.FlinkFuture.get(FlinkFuture.java:110)


After the job restarts(we have a fixed restart strategy)
we notice that the checkpoints start failing continuously
with this message :
Checkpoint was declined (tasks not ready)


But we see the job is running, its processing data, the
accumulators we have are getting incremented etc but
checkpointing fails with tasks not ready message.

Wanted to reach out to the community to see if anyone else
has experienced this issue before?
~
Karthik








Re: ANN: TouK Nussknacker - designing Flink processes with GUI

2017-09-04 Thread Maciek Próchniak

Hi Flavio,

we have rather modest goals - currently we don't plan to handle batch
(although in theory it could be done). We also aren't even thinking about
visualizing and editing existing programs - in general this could be
pretty difficult.


We only let users define pretty simple diagrams (in particular, they have
to be trees, not DAGs) so that we can handle them properly (although we
do have a mechanism to plug in custom Flink code - it just has to be
coded separately as part of the model).



thanks,

maciek


On 04/09/2017 15:43, Flavio Pompermaier wrote:
Thanks for sharing this nice tool, Maciek. Does it handle both batch
and streaming? Is it also able to visualize an existing Flink program?


Best,
Flavio

On Mon, Sep 4, 2017 at 3:03 PM, Maciek Próchniak <m...@touk.pl> wrote:


Hello,

we would like to announce availability of TouK Nussknacker - tool
for creating Flink processes with GUI.


It's our attempt to bring Flink a bit closer to analysts and
business people - by letting them design processes with MS
Visio-like tool ;)

Of course not every Flink process can be created by drawing
diagrams, but we wanted to make it possible for analyst to design
process that does filtering, enrichment or simple aggregations.

Our idea is pretty simple:

- first, data model and additional services have to be developed
and packaged into jar (this is the jar that is used by Flink jobs)
- this is the part where coding skills are required

- users can create new processes by drawing diagrams - the nodes
in toolbox are based on the data model. This includes e.g.
creating filters with SPEL (Spring Expression Language) expressions.

- the process is deployed to Flink cluster as jar with model and
diagram serialized as JSON

We make it easier to work with our diagrams by integration with
Grafana (each process registers quite detailed metrics about it's
behaviour), letting users test their processes via GUI with Flink
mini-cluster and various other goodies (subprocesses, basic code
completion, export to PDF etc.)


So far we have one quite large (>30 processes, >100k/s total
throughput in all processes) production deployment and a few more
in progress. Our main use case so far is RTM (Real Time Marketing)
and fraud detection in telco, banking and similar industries.

We think that our tool can be of interest mainly for more
traditional enterprises, that do not have large development teams,
but have competent analysts in their BI departments.


Nussknacker is released under ASL 2.0 (however we use React
heavily so beware of patents clause...)

Of course, we'd like to hear your opinions.


The code is here: https://github.com/touk/nussknacker

The quickstart is here:
https://touk.github.io/nussknacker/Quickstart.html

And our blog post is here:

https://touk.pl/blog/2017/09/04/touk-nussknacker-using-apache-flink-made-easier-for-analysts-and-business/

I'll also be talking about Nussknacker next week at Flink Forward -
https://berlin.flink-forward.org/kb_sessions/touk-nussknacker-creating-flink-jobs-with-gui/
- hope to see you there :)


thanks,

maciek próchniak

TouK







ANN: TouK Nussknacker - designing Flink processes with GUI

2017-09-04 Thread Maciek Próchniak

Hello,

we would like to announce availability of TouK Nussknacker - tool for 
creating Flink processes with GUI.



It's our attempt to bring Flink a bit closer to analysts and business 
people - by letting them design processes with MS Visio-like tool ;)


Of course, not every Flink process can be created by drawing diagrams,
but we wanted to make it possible for analysts to design processes that
do filtering, enrichment or simple aggregations.


Our idea is pretty simple:

- first, the data model and additional services have to be developed and 
packaged into a jar (this is the jar that is used by Flink jobs) - this is 
the part where coding skills are required


- users can create new processes by drawing diagrams - the nodes in the 
toolbox are based on the data model. This includes e.g. creating filters 
with SpEL (Spring Expression Language) expressions (example below).


- the process is deployed to the Flink cluster as a jar with the model and 
the diagram serialized as JSON


We make it easier to work with our diagrams by integration with Grafana 
(each process registers quite detailed metrics about its behaviour), 
letting users test their processes via GUI with the Flink mini-cluster and 
various other goodies (subprocesses, basic code completion, export to 
PDF etc.)



So far we have one quite large (>30 processes, >100k/s total throughput 
in all processes) production deployment and a few more in progress. Our 
main use case so far is RTM (Real Time Marketing) and fraud detection in 
telco, banking and similar industries.


We think that our tool can be of interest mainly to more traditional 
enterprises that do not have large development teams, but have 
competent analysts in their BI departments.



Nussknacker is released under ASL 2.0 (however, we use React heavily, so 
beware of the patents clause...)


Of course, we'd like to hear your opinions.


The code is here: https://github.com/touk/nussknacker

The quickstart is here: https://touk.github.io/nussknacker/Quickstart.html

And our blog post is here: 
https://touk.pl/blog/2017/09/04/touk-nussknacker-using-apache-flink-made-easier-for-analysts-and-business/


I'll also be talking about Nussknacker next week at Flink Forward - 
https://berlin.flink-forward.org/kb_sessions/touk-nussknacker-creating-flink-jobs-with-gui/ 
- hope to see you there :)



thanks,

maciek próchniak

TouK



Re: Running job in "dry mode"?

2017-06-07 Thread Maciek Próchniak



On 07/06/2017 10:27, Maciek Próchniak wrote:




On 07/06/2017 10:07, Tzu-Li (Gordon) Tai wrote:

Hi Maciek,

Is there any particular reason why you do not wish to start running 
the Kafka sources on the test run?
Otherwise, it would be perfectly fine to start the test job for 
testing to see if everything works, and keep that savepoint 
eventually for the non-dry run.


well, I want to make sure I don't interfere with the currently running 
production process. While I could use a different consumer, I certainly 
don't want to have events emitted. Although it may work if I have some 
dummy sinks... I'll think about that...
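
For what it's worth, a minimal sketch of the dummy-sink idea in Scala - 
assuming the job is built against a sink parameter, so the dry run can swap 
the real Kafka producer for Flink's DiscardingSink (buildPipeline is a 
hypothetical helper, not Flink API):

import org.apache.flink.streaming.api.functions.sink.{DiscardingSink, SinkFunction}
import org.apache.flink.streaming.api.scala._

// hypothetical helper: the same pipeline, parameterized by its sink
def buildPipeline(env: StreamExecutionEnvironment, sink: SinkFunction[String]): Unit = {
  val events = env.fromElements("event-1", "event-2") // stand-in for the real source
  events.map(_.toUpperCase).addSink(sink)
}

val env = StreamExecutionEnvironment.getExecutionEnvironment
buildPipeline(env, new DiscardingSink[String]) // dry run: nothing is emitted
env.execute("dry-run")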


Also, what our integration tests for migrating across Flink versions 
typically do is have some dummy collection source (`fromElements`) 
for the test job.
yes, I also tried that. Unfortunately I encountered a problem: when I 
replace the kafka source with a collection source, the state becomes 
incompatible, because the collection source has a different state than the 
kafka one...


thanks,
maciek


Cheers,
Gordon

On 7 June 2017 at 7:34:25 AM, Maciek Próchniak (m...@touk.pl 
<mailto:m...@touk.pl>) wrote:



Hello,

I'd like to be able to see if a new version of my job is compatible with
the old one.

I can make a savepoint and run the new version from that, but I'd like
to be able to do it without actually starting sources and so on - so
that e.g. it won't start to read from my kafka topics.

Of course I can do it by substituting configuration values, or running
without network access - but this seems a bit mundane and error-prone.

Do you know about any ways to achieve this?

thanks,

maciek







Running job in "dry mode"?

2017-06-06 Thread Maciek Próchniak

Hello,

I'd like to be able to see if a new version of my job is compatible with 
the old one.


I can make a savepoint and run the new version from that, but I'd like to be 
able to do it without actually starting sources and so on - so that e.g. 
it won't start to read from my kafka topics.


Of course I can do it by substituting configuration values, or running 
without network access - but this seems a bit mundane and error-prone.


Do you know about any ways to achieve this?

thanks,

maciek



Re: Graphite reporter recover from broken pipe

2017-02-01 Thread Maciek Próchniak
Starting with Flink 1.2 it's possible to use UDP transport for Graphite 
- I think it can be a good workaround if you can listen on a UDP port on 
your Graphite installation.
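
For reference, a flink-conf.yaml sketch of what I mean - key names as in 
the Flink 1.2 metrics documentation (please double-check them; "grph" is an 
arbitrary reporter name and the host is a placeholder):

metrics.reporters: grph
metrics.reporter.grph.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.grph.host: graphite.example.com
metrics.reporter.grph.port: 2003
metrics.reporter.grph.protocol: UDP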


thanks,

maciek


On 01/02/2017 13:22, Philipp Bussche wrote:

Hi there,
after moving my graphite service to another host, my task manager does not
recover monitoring and continues to complain about a broken pipe issue. It
sounds a bit like this one:
https://github.com/dropwizard/metrics/pull/1036
What do I need to do to update dropwizard to a 3.1.x version to incorporate
this change?
Is this version maybe already part of Flink 1.2?

Thanks
Philipp



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Graphite-reporter-recover-from-broken-pipe-tp11391.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.





accessing flink HA cluster with scala shell/zeppelin notebook

2017-01-22 Thread Maciek Próchniak

Hi,

I have a standalone Flink cluster configured with HA (i.e. with 
ZooKeeper recovery). How should I access it remotely, e.g. with a Zeppelin 
notebook or the Scala shell?


There are settings for host/port, but with HA they are not fixed 
- if I check which host and port the *current leader* uses and set that, I 
get an exception on the job manager:


20:36:38.237 [flink-akka.actor.default-dispatcher-22704] WARN 
o.a.f.runtime.jobmanager.JobManager - Discard message 
LeaderSessionMessage(null,SubmitJob(JobGraph(jobId: 
02a3a43464b2a750e04855d4c0b6fecb),EXECUTION_RESULT_AND_STATE_CHANGES)) 
because the expected leader session ID 
Some(1a4e9d39-2c59-45bb-b81c-d867bec1958d) did not equal the received 
leader session ID None.


- I guess it's reasonable behaviour, since I should use the appropriate 
LeaderRetrievalService and so on. But apparently there's no such 
possibility in the Scala Flink shell?


Is it a missing feature? I can prepare a patch, but I'm not sure how I would 
hook the behaviour of ClusterClient into FlinkILoop?


thanks,

maciek



Re: ContinuousFileMonitoringFunction - deleting file after processing

2016-11-26 Thread Maciek Próchniak

Hi Kostas,

I didn't see any discussion on the dev mailing list, so I'd like to share 
our problems/solutions (we had a busy month... ;)

1. we refactored ContinuousFileMonitoringFunction so that the state includes 
not only lastModificationTime, but also the list of files that have exactly 
this modification time. This way we're sure that we don't lose any 
files that appear later with the same modification time. It turned out that 
for the local file system this is quite important, as the modification time 
in Java can have one-second resolution (see e.g. 
http://stackoverflow.com/questions/24804618/get-file-mtime-with-millisecond-resolution-from-java 
- we learned it the hard way...)
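
A Scala sketch of the idea (the names are illustrative, not the actual 
Flink ones):

// keep the max modification time seen so far, plus the set of files that
// share exactly that timestamp, so same-millisecond arrivals are not lost
case class MonitoringState(maxModTime: Long, filesAtMaxModTime: Set[String])

def shouldProcess(s: MonitoringState, path: String, modTime: Long): Boolean =
  modTime > s.maxModTime ||
    (modTime == s.maxModTime && !s.filesAtMaxModTime.contains(path))

def updated(s: MonitoringState, path: String, modTime: Long): MonitoringState =
  if (modTime > s.maxModTime) MonitoringState(modTime, Set(path))
  else if (modTime == s.maxModTime) s.copy(filesAtMaxModTime = s.filesAtMaxModTime + path)
  else s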


2. we are able to safely delete files in the following way:
- in ContinuousFileReaderOperator we emit an additional marker event 
after the end of each split

- the split contains information about how many splits are in the file
- we added an additional operator of parallelism 1 after 
ContinuousFileReaderOperator which tracks the marker events, so that it 
knows when all splits from a file have been processed, and deletes finished 
files after the appropriate checkpoints have been completed (see the sketch 
below).
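
A rough Scala sketch of the deletion part, assuming the pre-1.2 
Checkpointed interface plus CheckpointListener (package locations changed 
across Flink versions, and the real implementation would live inside the 
tracking operator itself):

import java.nio.file.{Files, Paths}
import scala.collection.mutable
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.runtime.state.CheckpointListener
import org.apache.flink.streaming.api.checkpoint.Checkpointed

class DeleteAfterCheckpoint extends RichMapFunction[String, String]
    with Checkpointed[List[String]] with CheckpointListener {

  // files fully read since the last snapshot
  private var finishedSinceSnapshot: List[String] = Nil
  // files per checkpoint id, waiting for that checkpoint to be confirmed
  private val awaitingConfirmation = mutable.Map.empty[Long, List[String]]

  override def map(finishedFile: String): String = {
    finishedSinceSnapshot = finishedFile :: finishedSinceSnapshot
    finishedFile
  }

  override def snapshotState(checkpointId: Long, timestamp: Long): List[String] = {
    awaitingConfirmation(checkpointId) = finishedSinceSnapshot
    finishedSinceSnapshot = Nil
    awaitingConfirmation.values.flatten.toList // restored after a failure
  }

  override def restoreState(state: List[String]): Unit =
    finishedSinceSnapshot = state

  override def notifyCheckpointComplete(checkpointId: Long): Unit =
    awaitingConfirmation.remove(checkpointId)
      .foreach(_.foreach(f => Files.deleteIfExists(Paths.get(f))))
}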


If you & other committers find these ideas ok, I can prepare jiras and 
pull requests. While the first point is pretty straightforward IMHO, I'd 
like to get some feedback on the second one.


thanks,
maciek

On 18/10/2016 11:52, Kostas Kloudas wrote:

Hi Maciek,

I agree with you that 1ms is often too long :P

This is the reason why I will open a discussion to have
all the ideas/ requirements / shortcomings in a single place.
This way the community can track and influence what
is coming next.

Hopefully I will do it in the afternoon and I will send you
the discussion thread.

Cheers,
Kostas


On Oct 18, 2016, at 11:43 AM, Maciek Próchniak <m...@touk.pl> wrote:

Hi Kostas,

thanks for quick answer.

I wouldn't dare to delete files in the InputFormat if they were split and 
processed in parallel...

As for using notifyCheckpointComplete - thanks for the suggestion, it looks 
pretty interesting, I'll try it out. Although I wonder a bit if relying only 
on the modification timestamp is enough - many things may happen in one ms :)

thanks,

maciek


On 18/10/2016 11:14, Kostas Kloudas wrote:

Hi Maciek,

Just a follow-up on the previous email: given that splits are read in 
parallel, when the ContinuousFileMonitoringFunction forwards the last 
split, it does not mean that the final split is going to be processed 
last. If the node it gets assigned to is fast enough, then it may be 
processed faster than others.

This assumption only holds if you have a parallelism of 1.

Cheers,
Kostas


On Oct 18, 2016, at 11:05 AM, Kostas Kloudas <k.klou...@data-artisans.com> 
wrote:

Hi Maciek,

Currently this functionality is not supported, but this seems like a good 
addition.
Actually, given that the feature is rather new, we were thinking of opening 
a discussion on the dev mailing list in order to

i) discuss some current limitations of the Continuous File Processing source
ii) see how people use it and adjust our features accordingly

I will let you know as soon as I open this thread.

By the way, for your use case, we should probably have a callback in 
notifyCheckpointComplete()
that will inform the source that a given checkpoint was successfully 
performed, and then
we can purge the already processed files. This can be a good solution.

Thanks,
Kostas


On Oct 18, 2016, at 9:40 AM, Maciek Próchniak <m...@touk.pl> wrote:

Hi,

we want to monitor an hdfs (or local) directory, read csv files that appear 
and after successful processing - delete them (mainly not to run out of disk 
space...)

I'm not quite sure how to achieve this with the current implementation. 
Previously, when we read binary data (unsplittable files), we made a small 
hack and deleted them

in our FileInputFormat - but now we want to use splits, and detecting which 
split is 'the last one' is no longer so obvious - of course it's also 
problematic when it comes to checkpointing...

So my question is - is there an idiomatic way of deleting processed files?


thanks,

maciek







Re: window-like use case

2016-11-04 Thread Maciek Próchniak

Hi Aljoscha,

I know it's tricky...

A few weeks ago we decided to implement it without windows, using just a 
stateful operator and some queues/maps per key as state - so yeah, we 
tried to imagine how to do this in plain Java and one stream ;)


We also process watermarks to evict old events. Fortunately, our streams 
are not that big and we can keep all "recent" events in state - without 
preaggregation.


Currently we're waiting for some feedback from our client on the results - 
if it's ok, we'll stick with that, otherwise we'll have to look into it 
deeper...


thanks,

maciek


On 25/10/2016 16:41, Aljoscha Krettek wrote:

Hi Maciek,
cases like this, where you essentially want to evict elements that are 
older than a certain threshold while keeping a count of those elements 
that are not older than that threshold tend to be quite tricky.


In order to start thinking about this: how would you implement this 
case in a non-parallel way, in plain Java? You have the stream of 
incoming events, they possibly have timestamps, they are possibly not 
ordered by that timestamp (this depends on your use case). Now, what 
are the algorithms/data structures that could be used for computing 
the result that you require?


Cheers,
Aljoscha

On Fri, 23 Sep 2016 at 10:50 Claudia Wegmann <c.wegm...@kasasi.de 
<mailto:c.wegm...@kasasi.de>> wrote:


Hey,

I'm no expert at all, but for me this sounds like a use case for
Complex Event Processing (CEP). I don't know if you're aware of
Flink's CEP library [1, 2]? Maybe that solves your problem of
multiple firings. But best to wait for the experts to answer your
questions on handling state and firing windows :)

Best,
Claudia

[1]: https://flink.apache.org/news/2016/04/06/cep-monitoring.html
[2]:
https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/cep.html


-Original Message-
From: Maciek Próchniak [mailto:m...@touk.pl <mailto:m...@touk.pl>]
Sent: Friday, 23 September 2016 10:36
To: user@flink.apache.org <mailto:user@flink.apache.org>
Subject: window-like use case

Hi,

in our project we're dealing with a stream of billing events. Each
has a customerId and a charge amount. We want to have a process that
will trigger an event (alarm) when the sum of charges for a customer
during the last 4 hours exceeds a certain threshold, say 10.
The triggered event should contain data from the last billing event
(the one that triggered the alarm).

On one hand we can implement it as custom state - we'd save
charges (or some precomputed aggregates) from the last 4 hours and
trigger an event when a new one arrives.
OTOH we've been thinking if we can make it easier by using flink
windows.
We tried to model our situation as sliding windows (of length
4 hours, slide 1h), with some precomputed aggregate and a
custom trigger that fires on an element when the threshold is exceeded.
It kinda works, except for the fact that the state is unnecessarily
large, the custom trigger is a bit hacky and (worst of all) when an
event with charge amount e.g. 20 arrives, the trigger fires in all
slides and we have duplicated events.
That's why we currently think about implementing it with custom state...

Do you have any other ideas/recommendations on how we can handle such
a requirement?

thanks,
maciek





Re: ContinuousFileMonitoringFunction - deleting file after processing

2016-10-18 Thread Maciek Próchniak

Hi Kostas,

thanks for quick answer.

I wouldn't dare to delete files in the InputFormat if they were split and 
processed in parallel...


As for using notifyCheckpointComplete - thanks for the suggestion, it looks 
pretty interesting, I'll try it out. Although I wonder a bit if 
relying only on the modification timestamp is enough - many things may 
happen in one ms :)


thanks,

maciek


On 18/10/2016 11:14, Kostas Kloudas wrote:

Hi Maciek,

Just a follow-up on the previous email: given that splits are read in 
parallel, when the ContinuousFileMonitoringFunction forwards the last 
split, it does not mean that the final split is going to be processed 
last. If the node it gets assigned to is fast enough, then it may be 
processed faster than others.

This assumption only holds if you have a parallelism of 1.

Cheers,
Kostas


On Oct 18, 2016, at 11:05 AM, Kostas Kloudas <k.klou...@data-artisans.com> 
wrote:

Hi Maciek,

Currently this functionality is not supported, but this seems like a good 
addition.
Actually, given that the feature is rather new, we were thinking of opening 
a discussion on the dev mailing list in order to

i) discuss some current limitations of the Continuous File Processing source
ii) see how people use it and adjust our features accordingly

I will let you know as soon as I open this thread.

By the way, for your use case, we should probably have a callback in 
notifyCheckpointComplete()
that will inform the source that a given checkpoint was successfully 
performed, and then
we can purge the already processed files. This can be a good solution.

Thanks,
Kostas


On Oct 18, 2016, at 9:40 AM, Maciek Próchniak <m...@touk.pl> wrote:

Hi,

we want to monitor an hdfs (or local) directory, read csv files that appear 
and after successful processing - delete them (mainly not to run out of disk 
space...)

I'm not quite sure how to achieve this with the current implementation. 
Previously, when we read binary data (unsplittable files), we made a small 
hack and deleted them

in our FileInputFormat - but now we want to use splits, and detecting which 
split is 'the last one' is no longer so obvious - of course it's also 
problematic when it comes to checkpointing...

So my question is - is there an idiomatic way of deleting processed files?


thanks,

maciek







ContinuousFileMonitoringFunction - deleting file after processing

2016-10-18 Thread Maciek Próchniak

Hi,

we want to monitor an hdfs (or local) directory, read csv files that appear 
and after successful processing - delete them (mainly not to run out of 
disk space...)


I'm not quite sure how to achieve this with the current implementation. 
Previously, when we read binary data (unsplittable files), we made a small 
hack and deleted them


in our FileInputFormat - but now we want to use splits, and detecting 
which split is 'the last one' is no longer so obvious - of course it's 
also problematic when it comes to checkpointing...


So my question is - is there an idiomatic way of deleting processed files?


thanks,

maciek



window-like use case

2016-09-23 Thread Maciek Próchniak

Hi,

in our project we're dealing with a stream of billing events. Each has a 
customerId and a charge amount.
We want to have a process that will trigger an event (alarm) when the sum 
of charges for a customer during the last 4 hours exceeds a certain 
threshold, say 10.
The triggered event should contain data from the last billing event (the 
one that triggered the alarm).


On one hand we can implement it as custom state - we'd save charges (or 
some precomputed aggregates) from the last 4 hours and trigger an event 
when a new one arrives.

OTOH we've been thinking if we can make it easier by using flink windows.
We tried to model our situation as sliding windows (of length 4 hours, 
slide 1h), with some precomputed aggregate and a custom trigger 
that fires on an element when the threshold is exceeded.
It kinda works, except for the fact that the state is unnecessarily large, 
the custom trigger is a bit hacky and (worst of all) when an event with 
charge amount e.g. 20 arrives, the trigger fires in all slides and we have 
duplicated events.

That's why we currently think about implementing it with custom state...

Do you have any other ideas/recommendations on how we can handle such 
a requirement?


thanks,
maciek


Re: Enriching events with data from external http resources

2016-08-17 Thread Maciek Próchniak

Hi Ufuk,

thanks for info - this is good news :)

maciek


On 16/08/2016 12:16, Ufuk Celebi wrote:

On Mon, Aug 15, 2016 at 8:52 PM, Maciek Próchniak <m...@touk.pl> wrote:

I know it's not really desired way of using flink and that it would be
better to keep data as state inside stream and have it updated by some join
operator, but for us it's a bit of overkill - what's more, we have many (not
so large) streams, it would be not really feasible to keep all of state
(which is the same) in each of them.

Hey Maciek! The points you raise all make sense and there is work in
progress to provide better support for these use cases:
https://issues.apache.org/jira/browse/FLINK-4391

The general idea would be to have something like a multi-threaded flat
map function that dispatches the requests to a thread pool (it's like
"virtually" increasing the parallelism, as you do now). This is pretty
straightforward to implement if you don't need to worry about fault
tolerance for now. Integrating this with checkpointing is a little
more involved and will be addressed as part of the linked issue.
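
FLINK-4391 eventually landed as Flink's Async I/O (AsyncDataStream /
AsyncFunction, available from Flink 1.2 on). A minimal Scala sketch against
the later-1.x Scala API - signatures changed between versions, and
httpLookup stands in for the real non-blocking client call:

import java.util.concurrent.TimeUnit
import scala.concurrent.{ExecutionContext, Future}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}

class HttpEnricher extends AsyncFunction[String, (String, String)] {
  implicit lazy val ec: ExecutionContext = ExecutionContext.global

  // stand-in for the actual asynchronous HTTP client call
  private def httpLookup(key: String): Future[String] = Future("enriched-" + key)

  override def asyncInvoke(key: String, resultFuture: ResultFuture[(String, String)]): Unit =
    httpLookup(key).foreach(v => resultFuture.complete(Iterable((key, v))))
}

// usage: at most 100 requests in flight, with a 1 second timeout
// val enriched = AsyncDataStream.unorderedWait(input, new HttpEnricher, 1, TimeUnit.SECONDS, 100)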





Enriching events with data from external http resources

2016-08-15 Thread Maciek Próchniak

Hi,

Our data streams do some filtering based on data from external http 
resources (not maintained by us; they're really fast, with redis as storage).


So far we did that by just invoking some http client synchronously in 
map/flatMap operations. It works without errors but it seems somehow 
inefficient to have to use the degree of parallelism just to wait for a 
blocking http call - especially when you think about all the recent 
developments of fast async clients and so on. I was wondering if there 
is some way of invoking an http (or other external) service in a 
non-blocking way.


I know it's not really the desired way of using flink and that it would be 
better to keep data as state inside the stream and have it updated by some 
join operator, but for us it's a bit of an overkill - what's more, we have 
many (not so large) streams, and it would not really be feasible to keep 
all of the state (which is the same) in each of them.


Are there any patterns/ways - existing or planned - of dealing with such 
a situation?


thanks,

maciek



Re: checkpoints not being removed from HDFS

2016-05-13 Thread Maciek Próchniak

Hi Ufuk,

It seems I messed it up a bit :)
I cannot comment on the jira, since it's temporarily locked...

1. org.apache.hadoop.fs.PathIsNotEmptyDirectoryException:
`/flink/checkpoints_test/570d6e67d571c109daab468e5678402b/chk-62 is non
empty': Directory is not empty - this seems to be expected behaviour, as 
AbstractFileStateHandle.discardState() shows:


// send a call to delete the checkpoint directory containing the file. This will
// fail (and be ignored) when some files still exist
try {
    getFileSystem().delete(filePath.getParent(), false);
} catch (IOException ignored) {}

- so this is working as expected, although it causes a lot of garbage in 
hdfs logs...


2. The problem with non-discarded checkpoints seems to be related to 
periods when we don't have any traffic (during the night).

At that point many checkpoints "expire before completing":
2016-05-13 00:00:10,585 [Checkpoint Timer] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
Triggering checkpoint 199 @ 1463090410585
2016-05-13 00:10:10,585 [Checkpoint Timer] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
Checkpoint 199 expired before completing.
2016-05-13 00:25:14,650 [flink-akka.actor.default-dispatcher-280300] 
WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
Received late message for now expired checkpoint attempt 199


When checkpoints manage to complete, they take very long to do so:
2016-05-13 00:25:19,071 [flink-akka.actor.default-dispatcher-280176] 
INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
Completed checkpoint 201 (in 308472 ms)


- this is happening when no new messages arrive (we have a simple process 
like kafka->keyBy->custom state aggregation->kafka, with the EventTime time 
characteristic).
I think I messed sth up with eventTime & generating watermarks - I'll 
have to check it.
With RocksDB I made checkpoints at much larger intervals, so probably 
that's why I hadn't noticed the disk getting full.

OTOH - shouldn't expired checkpoints be cleaned up automatically?


Sorry for the confusion and thanks for the help

thanks,
maciek


On 12/05/2016 21:28, Maciek Próchniak wrote:

thanks,
I'll try to reproduce it in some test by myself...

maciek

On 12/05/2016 18:39, Ufuk Celebi wrote:

The issue is here: https://issues.apache.org/jira/browse/FLINK-3902

(My "explanation" before dosn't make sense actually and I don't see a
reason why this should be related to having many state handles.)

On Thu, May 12, 2016 at 3:54 PM, Ufuk Celebi <u...@apache.org> wrote:

Hey Maciek,

thanks for reporting this. Having files linger around looks like a 
bug to me.


The idea behind having the recursive flag set to false in the
AbstractFileStateHandle.discardState() call is that the
FileStateHandle is actually just a single file and not a directory.
The second call trying to delete the parent directory only succeeds
when all other files in that directory have been deleted as well. I
think this is what sometimes fails with many state handles. For
RocksDB there is only a single state handle, which works well.

I will open an issue for this and try to reproduce it reliably and 
then fix it.


– Ufuk


On Thu, May 12, 2016 at 10:28 AM, Maciek Próchniak <m...@touk.pl> wrote:

Hi,

we have a streaming job with quite large state (a few GB), we're using
FSStateBackend and we're storing checkpoints in hdfs.
What we observe is that very often old checkpoints are not discarded 
properly.

In hadoop logs I can see:

2016-05-10 12:21:06,559 INFO BlockStateChange: BLOCK* addToInvalidates:
blk_1084791727_11053122 10.10.113.10:50010
2016-05-10 12:21:06,559 INFO org.apache.hadoop.ipc.Server: IPC Server
handler 9 on 8020, call
org.apache.hadoop.hdfs.protocol.ClientProtocol.delete from 10.10.113.9:49233
Call#12337 Retry#0
org.apache.hadoop.fs.PathIsNotEmptyDirectoryException:
`/flink/checkpoints_test/570d6e67d571c109daab468e5678402b/chk-62 is non
empty': Directory is not empty
 at org.apache.hadoop.hdfs.server.namenode.FSDirDeleteOp.delete(FSDirDeleteOp.java:85)
 at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.delete(FSNamesystem.java:3712)



While on flink side (jobmanager log) we don't see any problems:
2016-05-10 12:20:22,636 [Checkpoint Timer] INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
checkpoint 62 @ 1462875622636
2016-05-10 12:20:32,507 [flink-akka.actor.default-dispatcher-240088] INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
checkpoint 62 (in 9843 ms)
2016-05-10 12:20:52,637 [Checkpoint Timer] INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
checkpoint 63 @ 1462875652637
2016-05-10 12:21:06,563 [flink-akka.actor.default-dispatcher-240028] INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
checkpoint 63 (in 13909 ms)
2016-05-10 12:21:22,636 [Checkpoint Timer] INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
checkpoint 64 @ 1462875682636

checkpoints not being removed from HDFS

2016-05-12 Thread Maciek Próchniak

Hi,

we have a streaming job with quite large state (a few GB), we're using 
FSStateBackend and we're storing checkpoints in hdfs.
What we observe is that very often old checkpoints are not discarded 
properly. In hadoop logs I can see:


2016-05-10 12:21:06,559 INFO BlockStateChange: BLOCK* addToInvalidates: 
blk_1084791727_11053122 10.10.113.10:50010
2016-05-10 12:21:06,559 INFO org.apache.hadoop.ipc.Server: IPC Server 
handler 9 on 8020, call 
org.apache.hadoop.hdfs.protocol.ClientProtocol.delete from 
10.10.113.9:49233 Call#12337 Retry#0
org.apache.hadoop.fs.PathIsNotEmptyDirectoryException: 
`/flink/checkpoints_test/570d6e67d571c109daab468e5678402b/chk-62 is non 
empty': Directory is not empty
at 
org.apache.hadoop.hdfs.server.namenode.FSDirDeleteOp.delete(FSDirDeleteOp.java:85)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.delete(FSNamesystem.java:3712)


While on flink side (jobmanager log) we don't see any problems:
2016-05-10 12:20:22,636 [Checkpoint Timer] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
Triggering checkpoint 62 @ 1462875622636
2016-05-10 12:20:32,507 [flink-akka.actor.default-dispatcher-240088] 
INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
Completed checkpoint 62 (in 9843 ms)
2016-05-10 12:20:52,637 [Checkpoint Timer] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
Triggering checkpoint 63 @ 1462875652637
2016-05-10 12:21:06,563 [flink-akka.actor.default-dispatcher-240028] 
INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
Completed checkpoint 63 (in 13909 ms)
2016-05-10 12:21:22,636 [Checkpoint Timer] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
Triggering checkpoint 64 @ 1462875682636


I see in the code that delete operations in flink are done with the 
recursive flag set to false - but I'm not sure why the contents are not 
being deleted before?

When we were using the RocksDB backend we didn't encounter such a situation.
We're using flink 1.0.1 and hdfs 2.7.2.

Does anybody have any idea why this could be happening?

thanks,
maciek





Re: Threads waiting on LocalBufferPool

2016-04-22 Thread Maciek Próchniak



On 21/04/2016 16:46, Aljoscha Krettek wrote:

Hi,
I would be very happy about improvements to our RocksDB performance. 
What are the RocksDB Java benchmarks that you are running? In Flink, 
we also have to serialize/deserialize every time we access 
RocksDB using our TypeSerializer. Maybe this is causing the slowdown.



Hi Aljoscha,

I'm using benchmark from:
https://github.com/facebook/rocksdb/blob/master/java/jdb_bench.sh

My value is a pretty simple scala case class - around 12 fields with 
Int/Long/String values - I think serialization shouldn't be a big 
problem. However I think I'll have to do more comprehensive tests to be 
sure I'm comparing apples to apples - hope to find time during the weekend 
for that :)


thanks,
maciek

By the way, what is the type of value stored in the RocksDB state? 
Maybe the TypeSerializer for that value is very slow.


Cheers,
Aljoscha

On Thu, 21 Apr 2016 at 16:41 Maciek Próchniak <m...@touk.pl 
<mailto:m...@touk.pl>> wrote:


Well...
I found some time to look at rocksDB performance.

It takes around 0.4ms to look up the value state and 0.12ms to update -
these are means; the 95th percentile was > 1ms for get... When I set
additional options:
  .setIncreaseParallelism(8)
  .setMaxOpenFiles(-1)
.setCompressionType(CompressionType.SNAPPY_COMPRESSION)

I managed to get
0.05ms for update and 0.2ms for get - but it still seems pretty
bad compared to the standard rocksdb java benchmarks that I ran on the
same machine:
fillseq  : 1.23238 micros/op;   89.8 MB/s; 100 ops
done;  1 / 1 task(s) finished.
readrandom   : 9.25380 micros/op;   12.0 MB/s; 100 /
100 found;  1 / 1 task(s) finished.
fillrandom   : 4.46839 micros/op;   24.8 MB/s; 100 ops
done;  1 / 1 task(s) finished.

guess I'll have to look at it a bit more...

thanks anyway,
maciek



On 21/04/2016 08:41, Maciek Próchniak wrote:

Hi Ufuk,

thanks for quick reply.
Actually I had a little time to try both things.
1) helped only temporarily - it just took a bit longer to
saturate the pool. After a few minutes, periodically all kafka
threads were waiting for the bufferPool.
2) This seemed to help. I also reduced the checkpoint interval - on
rocks we had 5min, now I tried 30s.

I attach throughput metrics - the former (around 18) is with
increased heap & buffers, the latter (around 22) is with
FileSystemStateBackend.
My state is a few GB large - during the test it reached around
2-3GB. I must admit I was quite impressed that checkpointing to
HDFS using FileSystem took only about 6-7s (with occasional
spikes to 12-13s, which can be seen on the metrics - I didn't check
if it was caused by hdfs or sth else).

Now I looked at the logs from 18 and it seems like checkpointing rocksdb
took around 2-3 minutes:
2016-04-20 17:47:33,439 [Checkpoint Timer] INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
Triggering checkpoint 6 @ 1461167253439
2016-04-20 17:49:54,196 [flink-akka.actor.default-dispatcher-147]
INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
Completed checkpoint 6 (in 140588 ms)
- however I don't see any threads dumping state in threadStacks...

I guess I'll have to add some metrics around state invocations to
see where the problem with rocksDB is... I'll write if I find
anything, but that won't be today I think...

Btw - I was looking at the FS state and I wonder whether it would be
feasible to make a variant of this state using an immutable map
(probably some scala one) to be able to do async checkpoints.
Then the synchronous part would be essentially free - just taking the
state map reference and materializing it asynchronously.
Of course, that would work only for immutable state - but this is
often the case when writing in scala. WDYT?

thanks,
maciek




On 20/04/2016 16:28, Ufuk Celebi wrote:

Could be different things actually, including the parts of the
network
you mentioned.

1)

Regarding the TM config:
- It can help to increase the number of network buffers (you can go
ahead and give it 4 GB, e.g. 134217728 buffers a 32 KB)
- In general, you have way more memory available than you actually
give to Flink. I would increase the 20 GB heap size.

As a first step you could address these two points and re-run
your job.

2)

As a follow-up you could also work with the FileSystemStateBackend,
which keeps state in memory (on-heap) and writes checkpoints to
files.
This would help in checking how much RocksDB is slowing things
down.


I'm curious about the results. Do you think you will have time
to try this?

– Ufuk


On Wed, Apr 20, 2016 at 3:45 PM, Maciek Próchniak <m...@touk.pl>
<mailto:m...@touk.pl> wrote:

Hi,

Re: Threads waiting on LocalBufferPool

2016-04-21 Thread Maciek Próchniak

Well...
I found some time to look at rocksDB performance.

It takes around 0.4ms to look up the value state and 0.12ms to update - these 
are means; the 95th percentile was > 1ms for get... When I set additional 
options:

  .setIncreaseParallelism(8)
  .setMaxOpenFiles(-1)
  .setCompressionType(CompressionType.SNAPPY_COMPRESSION)

I managed to get
0.05ms for update and 0.2ms for get - but it still seems pretty bad 
compared to the standard rocksdb java benchmarks that I ran on the same 
machine:
fillseq  : 1.23238 micros/op;   89.8 MB/s; 100 ops 
done;  1 / 1 task(s) finished.
readrandom   : 9.25380 micros/op;   12.0 MB/s; 100 / 100 
found;  1 / 1 task(s) finished.
fillrandom   : 4.46839 micros/op;   24.8 MB/s; 100 ops 
done;  1 / 1 task(s) finished.
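
For reference, this is roughly how such options are set on a raw RocksDB 
Options object via the JNI API (the path below is just a placeholder):

import org.rocksdb.{CompressionType, Options, RocksDB}

RocksDB.loadLibrary()
val options = new Options()
  .setCreateIfMissing(true)
  .setIncreaseParallelism(8)
  .setMaxOpenFiles(-1)
  .setCompressionType(CompressionType.SNAPPY_COMPRESSION)
val db = RocksDB.open(options, "/tmp/rocksdb-bench")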


guess I'll have to look at it a bit more...

thanks anyway,
maciek


On 21/04/2016 08:41, Maciek Próchniak wrote:

Hi Ufuk,

thanks for quick reply.
Actually I had a little time to try both things.
1) helped only temporarily - it just took a bit longer to saturate the 
pool. After a few minutes, periodically all kafka threads were waiting 
for the bufferPool.
2) This seemed to help. I also reduced the checkpoint interval - on rocks 
we had 5min, now I tried 30s.


I attach throughput metrics - the former (around 18) is with increased 
heap & buffers, the latter (around 22) is with FileSystemStateBackend.
My state is a few GB large - during the test it reached around 2-3GB. I 
must admit I was quite impressed that checkpointing to HDFS using 
FileSystem took only about 6-7s (with occasional spikes to 12-13s, 
which can be seen on the metrics - I didn't check if it was caused by hdfs 
or sth else).


Now I looked at the logs from 18 and it seems like checkpointing rocksdb 
took around 2-3 minutes:
2016-04-20 17:47:33,439 [Checkpoint Timer] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
Triggering checkpoint 6 @ 1461167253439
2016-04-20 17:49:54,196 [flink-akka.actor.default-dispatcher-147] 
INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
Completed checkpoint 6 (in 140588 ms)

- however I don't see any threads dumping state in threadStacks...

I guess I'll have to add some metrics around state invocations to see 
where the problem with rocksDB is... I'll write if I find anything, 
but that won't be today I think...


Btw - I was looking at the FS state and I wonder whether it would be 
feasible to make a variant of this state using an immutable map (probably 
some scala one) to be able to do async checkpoints.
Then the synchronous part would be essentially free - just taking the 
state map reference and materializing it asynchronously.
Of course, that would work only for immutable state - but this is often 
the case when writing in scala. WDYT?


thanks,
maciek




On 20/04/2016 16:28, Ufuk Celebi wrote:

Could be different things actually, including the parts of the network
you mentioned.

1)

Regarding the TM config:
- It can help to increase the number of network buffers (you can go
ahead and give it 4 GB, e.g. 134217728 buffers a 32 KB)
- In general, you have way more memory available than you actually
give to Flink. I would increase the 20 GB heap size.

As a first step you could address these two points and re-run your job.

2)

As a follow-up you could also work with the FileSystemStateBackend,
which keeps state in memory (on-heap) and writes checkpoints to files.
This would help in checking how much RocksDB is slowing things down.


I'm curious about the results. Do you think you will have time to try 
this?


– Ufuk


On Wed, Apr 20, 2016 at 3:45 PM, Maciek Próchniak <m...@touk.pl> wrote:

Hi,
I'm running my flink job on one rather large machine (20 cores with
hyperthreading, 120GB RAM). Task manager has 20GB heap allocated.
It does more or less:
read csv from kafka -> keyBy one of the fields -> some custom state
processing.
Kafka topic has 24 partitions, so my parallelism is also 24

After some tweaks and upgrading to 1.0.2-rc3 (as I use RocksDB state
backend) I reached a point when throughput is ~120-150k/s.
On the same kafka and machine I reached > 500k/s with a simple 
filtering job,
so I wanted to see what's the bottleneck.

It turns out that quite often all of the kafka threads are stuck waiting 
for a buffer from the pool:
"Thread-6695" #7466 daemon prio=5 os_prio=0 tid=0x7f77fd80d000
nid=0x8118 in Object.wait() [0x7f7ad54d9000]
java.lang.Thread.State: TIMED_WAITING (on object monitor)
 at java.lang.Object.wait(Native Method)
 at
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163) 


 - locked <0x0002eade3890> (a java.util.ArrayDeque)
 at
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133) 


 at
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92) 


Re: Threads waiting on LocalBufferPool

2016-04-21 Thread Maciek Próchniak

Hi Ufuk,

thanks for quick reply.
Actually I had a little time to try both things.
1) helped only temporarily - it just took a bit longer to saturate the 
pool. After a few minutes, periodically all kafka threads were waiting for 
the bufferPool.
2) This seemed to help. I also reduced the checkpoint interval - on rocks we 
had 5min, now I tried 30s.
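
Concretely, the switch for point 2 looked more or less like this (a Scala 
sketch; the HDFS path is a placeholder):

import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"))
env.enableCheckpointing(30 * 1000) // 30s interval instead of the 5min used with RocksDB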


I attach throughput metrics - the former (around 18) is with increased 
heap & buffers, the latter (around 22) is with FileSystemStateBackend.
My state is a few GB large - during the test it reached around 2-3GB. I 
must admit I was quite impressed that checkpointing to HDFS using 
FileSystem took only about 6-7s (with occasional spikes to 12-13s, which 
can be seen on the metrics - I didn't check if it was caused by hdfs or sth else).


Now I looked at the logs from 18 and it seems like checkpointing rocksdb 
took around 2-3 minutes:
2016-04-20 17:47:33,439 [Checkpoint Timer] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
Triggering checkpoint 6 @ 1461167253439
2016-04-20 17:49:54,196 [flink-akka.actor.default-dispatcher-147] INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
checkpoint 6 (in 140588 ms)

- however I don't see any threads dumping state in threadStacks...

I guess I'll have to add some metrics around state invocations to see 
where the problem with rocksDB is... I'll write if I find anything, but 
that won't be today I think...


Btw - I was looking at the FS state and I wonder whether it would be 
feasible to make a variant of this state using an immutable map (probably 
some scala one) to be able to do async checkpoints.
Then the synchronous part would be essentially free - just taking the state 
map reference and materializing it asynchronously.
Of course, that would work only for immutable state - but this is often 
the case when writing in scala. WDYT?
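
What I have in mind, as a plain Scala sketch (not tied to Flink's state 
interfaces):

import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

class CopyOnWriteState[K, V] {
  @volatile private var state: Map[K, V] = Map.empty
  private implicit val ec: ExecutionContext =
    ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())

  def update(key: K, value: V): Unit = state = state.updated(key, value)

  // synchronous part of a checkpoint: just capture the current reference;
  // asynchronous part: materialize/serialize without blocking further updates
  def snapshot(serialize: Map[K, V] => Array[Byte]): Future[Array[Byte]] = {
    val captured = state
    Future(serialize(captured))
  }
}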


thanks,
maciek




On 20/04/2016 16:28, Ufuk Celebi wrote:

Could be different things actually, including the parts of the network
you mentioned.

1)

Regarding the TM config:
- It can help to increase the number of network buffers (you can go
ahead and give it 4 GB, e.g. 134217728 buffers a 32 KB)
- In general, you have way more memory available than you actually
give to Flink. I would increase the 20 GB heap size.

As a first step you could address these two points and re-run your job.

2)

As a follow-up you could also work with the FileSystemStateBackend,
which keeps state in memory (on-heap) and writes checkpoints to files.
This would help in checking how much RocksDB is slowing things down.


I'm curious about the results. Do you think you will have time to try this?

– Ufuk


On Wed, Apr 20, 2016 at 3:45 PM, Maciek Próchniak <m...@touk.pl> wrote:

Hi,
I'm running my flink job on one rather large machine (20 cores with
hyperthreading, 120GB RAM). Task manager has 20GB heap allocated.
It does more or less:
read csv from kafka -> keyBy one of the fields -> some custom state
processing.
Kafka topic has 24 partitions, so my parallelism is also 24

After some tweaks and upgrading to 1.0.2-rc3 (as I use RocksDB state
backend) I reached a point when throughput is ~120-150k/s.
On the same kafka and machine I reached > 500k/s with a simple filtering job,
so I wanted to see what's the bottleneck.

It turns out that quite often all of the kafka threads are stuck waiting for
a buffer from the pool:
"Thread-6695" #7466 daemon prio=5 os_prio=0 tid=0x7f77fd80d000
nid=0x8118 in Object.wait() [0x7f7ad54d9000]
java.lang.Thread.State: TIMED_WAITING (on object monitor)
 at java.lang.Object.wait(Native Method)
 at
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
 - locked <0x0002eade3890> (a java.util.ArrayDeque)
 at
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
 at
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
 - locked <0x0002eb73cbd0> (a
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
 at
org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:86)
 at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:78)
 at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39)
 at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
 at
org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:541)
 at
org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:541)
 at scala.collection.immutable.List.foreach(List.scala:381)
 at
org.apache.flink.streaming.api.scala.DataStream$$anon$6.flatMap(DataStream.scala:541)

Re: streaming job reading from kafka stuck while cancelling

2016-03-09 Thread Maciek Próchniak

Thanks,

that makes sense...
Guess I'll try some dirty workaround for now by interrupting the consumer 
thread if it doesn't finish after some time...


maciek

On 09/03/2016 14:42, Stephan Ewen wrote:

Here is the Jira issue: https://issues.apache.org/jira/browse/FLINK-3595

On Wed, Mar 9, 2016 at 2:06 PM, Stephan Ewen <se...@apache.org 
<mailto:se...@apache.org>> wrote:


Hi!

Thanks for the debugging this, I think there is in fact an issue
in the 0.9 consumer.

I'll open a ticket for it, will try to fix that as soon as possible...

Stephan


On Wed, Mar 9, 2016 at 1:59 PM, Maciek Próchniak <m...@touk.pl
<mailto:m...@touk.pl>> wrote:

Hi,

from time to time, when we cancel streaming jobs (or they fail
for some reason), we encounter:

2016-03-09 10:25:29,799 [Canceler for Source: read objects
from topic: (...) ' did not react to cancelling signal, but is
stuck in method:
 java.lang.Object.wait(Native Method)
java.lang.Thread.join(Thread.java:1253)

org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.run(FlinkKafkaConsumer09.java:329)

org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)

org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)

org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
java.lang.Thread.run(Thread.java:745)


Now, relevant stacktrace is this:

"Thread-626104" #1244254 daemon prio=5 os_prio=0 tid=... 
nid=0x2e96 in Object.wait() [0x7f2bac847000]

   java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at

org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
- locked <0x00041ae00180> (a java.util.ArrayDeque)
at

org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
at

org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
- locked <0x0004be0002f0> (a

org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
at

org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:86)
at

org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:78)
at

org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39)
at

org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at

org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:541)
at

org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:541)
at scala.collection.immutable.List.foreach(List.scala:381)
at

org.apache.flink.streaming.api.scala.DataStream$$anon$6.flatMap(DataStream.scala:541)
at

org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:48)
at

org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
at

org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
at

org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:38)
at

org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
at

org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
at

org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:38)
at

org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
at

org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
at

org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
at

org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)

streaming job reading from kafka stuck while cancelling

2016-03-09 Thread Maciek Próchniak

Hi,

from time to time, when we cancel streaming jobs (or they fail for 
some reason), we encounter:


2016-03-09 10:25:29,799 [Canceler for Source: read objects from topic: 
(...) ' did not react to cancelling signal, but is stuck in method:

 java.lang.Object.wait(Native Method)
java.lang.Thread.join(Thread.java:1253)
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.run(FlinkKafkaConsumer09.java:329)
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
java.lang.Thread.run(Thread.java:745)


Now, relevant stacktrace is this:

"Thread-626104" #1244254 daemon prio=5 os_prio=0 tid=...  nid=0x2e96 in 
Object.wait() [0x7f2bac847000]

   java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)

- locked <0x00041ae00180> (a java.util.ArrayDeque)
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
- locked <0x0004be0002f0> (a 
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
at 
org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:86)
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:78)
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39)
at 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at 
org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:541)
at 
org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:541)

at scala.collection.immutable.List.foreach(List.scala:381)
at 
org.apache.flink.streaming.api.scala.DataStream$$anon$6.flatMap(DataStream.scala:541)
at 
org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:48)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
at 
org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:38)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
at 
org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:38)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
at 
org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:318)

- locked <0x00041ae001c8> (a java.lang.Object)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:473)

- locked <0x00041ae001c8> (a java.lang.Object)

and also:
"OutputFlusher" #1244231 daemon prio=5 os_prio=0 tid=0x7f2a39d4e800 
nid=0x2e7d waiting for monitor entry [0x7f2a3e5e4000]

   java.lang.Thread.State: BLOCKED (on object monitor)
at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.flush(RecordWriter.java:172)
- waiting to lock <0x0004be0002f0> (a 
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
at 
org.apache.flink.streaming.runtime.io.StreamRecordWriter$OutputFlusher.run(StreamRecordWriter.java:176)




Re: rebalance of streaming job after taskManager restart

2016-03-08 Thread Maciek Próchniak

Hi,

thanks for the quick answer - yes, it does what I want to accomplish,
but I was hoping for some "easier" solution.
Are there any plans for a "restart" button/command or sth similar? I mean, 
the whole process of restarting is ready as I understand - it's 
triggered when a task manager dies.


thanks,
maciek

On 08/03/2016 16:03, Aljoscha Krettek wrote:

Hi,
I think what you can do is make a savepoint of your program, then cancel it and 
restart it from the savepoint. This should make Flink redistribute it on all 
TaskManagers.

See 
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/savepoints.html
and
https://ci.apache.org/projects/flink/flink-docs-master/apis/cli.html#savepoints
for documentation about savepoints.

The steps to follow should be:

bin/flink savepoint <jobID>

this will print a savepoint path that you will need later.

bin/flink cancel <jobID>

bin/flink run -s <savepointPath> …

The last command is your usual run command but with the additional “-s” 
parameter to continue from a savepoint.

I hope that helps.

Cheers,
Aljoscha

On 08 Mar 2016, at 15:48, Maciek Próchniak <m...@touk.pl> wrote:

Hi,

we have a streaming job with parallelism 2 and two task managers. The job is 
occupying one slot on each task manager. When I stop manager2 the job is 
restarted and it runs on manager1 - occupying two of its slots.
How can I trigger a restart (or other similar process) that will cause the 
job to be balanced among the task managers?

thanks,
maciek






rebalance of streaming job after taskManager restart

2016-03-08 Thread Maciek Próchniak

Hi,

we have a streaming job with parallelism 2 and two task managers. The job 
is occupying one slot on each task manager. When I stop manager2 the job 
is restarted and it runs on manager1 - occupying two of its slots.
How can I trigger a restart (or other similar process) that will cause the 
job to be balanced among the task managers?


thanks,
maciek