Testing Flink job with bounded input

2024-01-18 Thread Jan Lukavský

Hi,

I have a question about how to correctly set up a test that will read 
input from a locally provided collection in bounded mode and provide 
outputs at the end of the computation. My test case looks something like 
the following:


String[] lines = ...;
try (StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment()) {
  env.setRuntimeMode(RuntimeExecutionMode.BATCH);
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  DataStream<String> input = env.fromCollection(Arrays.asList(lines));
  // the following includes keyBy().window().aggregate()
  DataStream<List<String>> output = TopWords.getTopWords(input, 5);
  // sink that collects outputs to (static) list
  SinkFunction<List<String>> sink = TestSink.create();
  output.addSink(sink);
  env.execute();
  List<String> result = TestSink.outputs(sink).stream()
      .flatMap(Streams::stream)
      .collect(Collectors.toList());
  // result is empty
}

When I remove any window() and aggregations, I can see outputs at the 
sink. I tried creating a manual trigger for the window; I can see the 
onElement() methods called, but onEventTime() is never called. Also, the 
FromElementsFunction never explicitly emits the max watermark at the end 
of reading the input. It seems that the Flink runtime also does not emit 
the final watermark (I cannot see any traces of watermark emission in 
the logs). It seems that I'm doing something obviously wrong, I just 
cannot figure out how to emit the final watermark. Adding 
autoWatermarkInterval did not help either.
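For completeness, a minimal sketch of explicitly assigning event 
timestamps and watermarks to the test input before the 
keyBy().window() (parseTimestamp is a hypothetical extractor, not part 
of the test above):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;

// Give each element an event timestamp so event-time windows can fire;
// forMonotonousTimestamps generates watermarks that follow the
// assigned timestamps.
DataStream<String> withTimestamps = input.assignTimestampsAndWatermarks(
    WatermarkStrategy.<String>forMonotonousTimestamps()
        .withTimestampAssigner(
            (line, previousTimestamp) -> parseTimestamp(line)));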


The test seems to behave the same for Flink versions 1.16.3 and 1.18.1.

Thanks in advance for any pointers.

 Jan


Re: Flink TCP custom source - secured server socket

2023-07-02 Thread Jan Lukavský

Hi,

with a plain TCP socket, there is no 'state' you can return to when 
restoring from a checkpoint. All you can do is reopen the socket. You 
would have to ensure fault-tolerance via some (custom) higher-level 
protocol you implement, including persistent storage, replication, etc. 
You will soon find you are actually implementing a commit log like Apache 
Kafka, so it might be better to use Kafka directly. If your clients need 
to send data over a plain TCP socket, you can receive it over TCP (with a 
custom TCP server application), store it in Kafka, and use Kafka as the 
data source for your Flink application.
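A minimal single-threaded sketch of such a TCP-to-Kafka bridge (the 
topic name, ports, and line-based framing are assumptions for 
illustration):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Accepts plain-TCP clients and forwards each received line to Kafka,
// which then provides the replayable, fault-tolerant input Flink needs.
public class TcpToKafkaBridge {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        ServerSocket server = new ServerSocket(9999)) {
      while (true) {
        try (Socket client = server.accept();
            BufferedReader in = new BufferedReader(
                new InputStreamReader(client.getInputStream()))) {
          String line;
          while ((line = in.readLine()) != null) {
            producer.send(new ProducerRecord<>("tcp-ingress", line));
          }
        }
      }
    }
  }
}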


Best,

 Jan

On 7/1/23 05:20, Kamal Mittal wrote:


Hello,

Thanks for your suggestion but please confirm below.

Is it the case that a TCP socket source job can't be restored from the 
last checkpoint?


Rgds,

Kamal

From: Jan Lukavský
Sent: 29 June 2023 06:18 PM
To: user@flink.apache.org
Subject: Re: Flink TCP custom source - secured server socket

> ... a state backward in (processing) time ...
(of course not processing, I meant to say event time)

On 6/29/23 14:45, Jan Lukavský wrote:

Hi Kamal,

you probably have several options:

 a) bundle your private key and certificate into your Flink
application's jar (not recommended; your service's private key
will have to be not exactly "private")
 b) create a service which provides a certificate for your
service at runtime (e.g. ACME-based or similar)

I have a different note, though. Flink (or, for that matter, any
streaming engine; I'm more focused on Apache Beam) heavily relies
on the ability of sources to restore a state backward in
(processing) time. That is definitely not the case for a plain TCP
socket. It is likely you will experience data-loss issues with
this solution (regardless of security). This might be okay for
you, I just felt it would be good to stress this.

Best,

 Jan

On 6/29/23 12:53, Kamal Mittal via user wrote:

Hello Community,

I have created a custom TCP stream source and am reading data
from it.

But this TCP connection needs to be secured, i.e. SSL-based; the
query is how to configure/provide certificates via Flink for a
client-server secured TCP connection?

Rgds,

Kamal


Re: Flink TCP custom source - secured server socket

2023-06-29 Thread Jan Lukavský

> ... a state backward in (processing) time ...
(of course not processing, I meant to say event time)

On 6/29/23 14:45, Jan Lukavský wrote:


Hi Kamal,

you probably have several options:

 a) bundle your private key and certificate into your Flink 
application's jar (not recommended; your service's private key will 
have to be not exactly "private")
 b) create a service which provides a certificate for your service 
at runtime (e.g. ACME-based or similar)


I have a different note, though. Flink (or, for that matter, any 
streaming engine; I'm more focused on Apache Beam) heavily relies on 
the ability of sources to restore a state backward in (processing) 
time. That is definitely not the case for a plain TCP socket. It is 
likely you will experience data-loss issues with this solution 
(regardless of security). This might be okay for you, I just felt it 
would be good to stress this.


Best,

 Jan

On 6/29/23 12:53, Kamal Mittal via user wrote:


Hello Community,

I have created a custom TCP stream source and am reading data from it.


But this TCP connection needs to be secured, i.e. SSL-based; the query 
is how to configure/provide certificates via Flink for a client-server 
secured TCP connection?


Rgds,

Kamal


Re: Flink TCP custom source - secured server socket

2023-06-29 Thread Jan Lukavský

Hi Kamal,

you probably have several options:

 a) bundle your private key and certificate into your Flink 
application's jar (not recommended; your service's private key will have 
to be not exactly "private")
 b) create a service which provides a certificate for your service 
at runtime (e.g. ACME-based or similar)
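For option a), a minimal sketch of opening the custom source's server 
socket over TLS from a keystore bundled with the job (the keystore path 
and password are placeholders):

import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.KeyStore;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLServerSocket;

public class SecureSocketFactory {
  // Builds an SSLContext from a bundled JKS keystore and opens a TLS
  // server socket for the custom source to accept clients on.
  public static SSLServerSocket openServerSocket(int port) throws Exception {
    KeyStore ks = KeyStore.getInstance("JKS");
    try (InputStream in =
        Files.newInputStream(Paths.get("/opt/keys/keystore.jks"))) {
      ks.load(in, "changeit".toCharArray());
    }
    KeyManagerFactory kmf = KeyManagerFactory.getInstance(
        KeyManagerFactory.getDefaultAlgorithm());
    kmf.init(ks, "changeit".toCharArray());
    SSLContext ctx = SSLContext.getInstance("TLS");
    ctx.init(kmf.getKeyManagers(), null, null);
    return (SSLServerSocket)
        ctx.getServerSocketFactory().createServerSocket(port);
  }
}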


I have a different note, though. Flink (or, for that matter, any 
streaming engine; I'm more focused on Apache Beam) heavily relies on the 
ability of sources to restore a state backward in (processing) time. 
That is definitely not the case for a plain TCP socket. It is likely you 
will experience data-loss issues with this solution (regardless of 
security). This might be okay for you, I just felt it would be good to 
stress this.


Best,

 Jan

On 6/29/23 12:53, Kamal Mittal via user wrote:


Hello Community,

I have created a custom TCP stream source and am reading data from it.


But this TCP connection needs to be secured, i.e. SSL-based; the query 
is how to configure/provide certificates via Flink for a client-server 
secured TCP connection?


Rgds,

Kamal


Re: Watermark in global commit

2023-02-14 Thread Jan Lukavský

Hi,

I'm not an expert on Flink specifically, but your problem might be easier 
to solve when broken down into two steps: first create a "stable" input 
for downstream processing, which might include a specific watermark. In 
Flink, the "stability" of input for downstream processing is ensured by 
a checkpoint. You would therefore need to wait for a checkpoint, 
buffering intermediate data in state (and produce a particular 
watermark as a data element, because watermarks in general need not be 
'stable'). Once a checkpoint is completed, you would flush the buffer 
to the downstream operators; one would create the Parquet files, the 
other would take whatever action needs to be taken based on the 
watermark. The checkpoint ensures that the two tasks will be eventually 
consistent (if this is sufficient for your case).
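A simplified sketch of such a buffering operator (this version keeps a 
single buffer, so strictly speaking it may also release elements that 
arrived after the checkpoint barrier; a real implementation would track 
one buffer per checkpoint id):

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

// Holds elements back until the checkpoint containing them completes,
// so downstream operators only ever see "stable" input.
public class StableInputOperator<T> extends AbstractStreamOperator<T>
    implements OneInputStreamOperator<T, T> {

  private final TypeInformation<T> type;
  private final List<T> pending = new ArrayList<>();
  private transient ListState<T> buffer;

  public StableInputOperator(TypeInformation<T> type) {
    this.type = type;
  }

  @Override
  public void initializeState(StateInitializationContext context)
      throws Exception {
    super.initializeState(context);
    buffer = context.getOperatorStateStore()
        .getListState(new ListStateDescriptor<>("buffer", type));
    for (T element : buffer.get()) {
      pending.add(element);
    }
  }

  @Override
  public void processElement(StreamRecord<T> record) throws Exception {
    // Buffer instead of forwarding; the buffer is part of operator state.
    pending.add(record.getValue());
    buffer.add(record.getValue());
  }

  @Override
  public void notifyCheckpointComplete(long checkpointId) throws Exception {
    super.notifyCheckpointComplete(checkpointId);
    // The buffered elements are now covered by a completed checkpoint.
    for (T element : pending) {
      output.collect(new StreamRecord<>(element));
    }
    pending.clear();
    buffer.clear();
  }
}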


In Apache Beam, we call this a transform that '@RequiresStableInput' 
[1]; the implementation in Flink is as described above.


 Jan

[1] 
https://beam.apache.org/releases/javadoc/2.44.0/org/apache/beam/sdk/transforms/DoFn.RequiresStableInput.html


On 2/14/23 13:23, Tobias Fröhlich wrote:

Dear flink team,

I am facing the following problem: I need to write events to Parquet 
files using the FileSink. Subsequently, I want to do something else in a global 
commit where I need the corresponding watermark. However, the 
org.apache.flink.connector.file.sink.FileSink forces the type of the 
committables to be org.apache.flink.connector.file.sink.FileSinkCommittable, 
which cannot carry watermarks.

Details:

As far as I understand the idea of a two-phase commit with a global committer, 
the committables are used for passing information from the writer to the global 
committer. This is done by implementing two methods in the writer and the 
committer, respectively:

  1. Collection<CommT> TwoPhaseCommittingSink.PrecommittingSinkWriter::prepareCommit(), 
which returns a collection of committables of some type CommT, and
  2. void Committer::commit(Collection<CommitRequest<CommT>>), which uses this 
collection.

In general, the type CommT can be chosen arbitrarily. So, if the watermark is needed 
in the global commit, it is possible to use a customized object that contains a field 
for the watermark (see the sketch below). However, if the class 
org.apache.flink.connector.file.sink.FileSink is used, the type of the 
committables is always org.apache.flink.connector.file.sink.FileSinkCommittable, which 
does not have a field that can be used for the watermark.
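For illustration, a sketch of such a customized committable for a sink 
that is free to choose its CommT (hypothetical; as explained above, 
FileSink does not allow this):

// A committable that carries the watermark alongside the payload; the
// writer would return these from prepareCommit() and the global
// committer would read getWatermark() in commit().
public class WatermarkedCommittable<T> {
  private final T payload;
  private final long watermark;

  public WatermarkedCommittable(T payload, long watermark) {
    this.payload = payload;
    this.watermark = watermark;
  }

  public T getPayload() { return payload; }
  public long getWatermark() { return watermark; }
}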

The only solution I found was forking the Flink source code and augmenting 
it in the following way:

   1. adding a field to FileSinkCommittable ("private long watermark;" with 
getter and setter)
   2. changing the FileSinkCommittableSerializer accordingly (this makes it 
necessary to define a new version)
   3. in FileWriter::prepareCommit(), adding a loop over all committables to set 
the watermark


Am I missing something? Is there an easier way to get the watermarks from the 
writer to the global committer? If not, is it justified to propose a feature 
request?

Best regards and thanks in advance
Tobias Fröhlich



Re: beam + flink + k8

2023-02-02 Thread Jan Lukavský
That would suggest it is uploading (you can verify that using jnettop). 
But I'd leave this open for others to answer, because it is now purely a 
Flink (not Beam) question.


Best,

 Jan

On 2/2/23 10:46, bigdatadeveloper8 wrote:

Hi Jan,

The job manager is configured and working. When I submit a Python job to 
Flink, it does not show in the Flink UI, or it simply hangs without any error.




Sent from my Galaxy


 Original message 
From: Jan Lukavský 
Date: 02/02/2023 15:07 (GMT+05:30)
To: user@flink.apache.org
Subject: Re: beam + flink + k8

I'm not sure how exactly minikube exposes the jobmanager, but in GKE 
you likely need to port-forward it, e.g.


 $ kubectl port-forward svc/flink-jobmanager 8081:8081

This should make jobmanager accessible via localhost:8081. For 
production cases you might want to use a different approach, like 
Flink operator, etc.


Best,

 Jan

On 2/1/23 17:08, P Singh wrote:

Hi Jan,

Thanks for the reply. I was able to submit the job to Flink, but it's 
failing due to an OOM issue, so I am moving to GKE. I got the 
Flink UI there, but the submitted job is not appearing in the Flink UI. 
I am using the same script which I shared with you. Do I need to make 
some changes for the Google Kubernetes environment?


On Tue, 31 Jan 2023 at 20:20, Jan Lukavský  wrote:

The script looks good to me, did you run the SDK harness?
External environment needs the SDK harness to be run externally,
see [1]. Generally, the best option is DOCKER, but that usually
does not work in k8s. For this, you might try PROCESS environment
and build your own docker image for flink, which will contain the
Beam harness, e.g. [2]. You will need to pass the environment
config using --environment_config={"command":
"/opt/apache/beam/boot"}.

From the screenshot it seems that the Flink UI is accessible, so
this is the only option that comes to my mind. Did you check the logs
of the Flink jobmanager pod?

 Jan

[1] https://beam.apache.org/documentation/runtime/sdk-harness-config/

[2]

https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/env/docker/flink/Dockerfile

On 1/31/23 13:33, P Singh wrote:

Hi Jan,

Thanks for your reply; please find the attached script. I am a newbie
with Flink and minikube, though I am trying to connect them by a
script from my local machine, as suggested by the Flink Kubernetes
documentation link

<https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/>

I have changed the log level to ERROR but didn't find much...
Can you please help me out with how to run the script from inside the
pod?

On Tue, 31 Jan 2023 at 15:40, Jan Lukavský  wrote:

Hi,

can you please also share the script itself? I'd say
that the problem is that the Flink jobmanager is not
accessible through localhost:8081, because it runs inside
the minikube. You need to expose it outside of the minikube
via [1], or run the script from a pod inside the minikube and
access the job manager via flink-jobmanager:8081. I'm surprised
that the log didn't make this more obvious, though. Is it
possible that you changed the default log level to ERROR?
Can you try DEBUG or similar?

 Jan

[1] https://minikube.sigs.k8s.io/docs/handbook/accessing/

On 1/30/23 18:36, P Singh wrote:

Hi Jan,

Yeah I am using minikube and beam image with python 3.10.

Please find the attached screenshots.



On Mon, 30 Jan 2023 at 21:22, Jan Lukavský
 wrote:

Hi,

can you please share the command line and the complete
output of the script?
Are you using minikube? Can you share a list of your
running pods?

  Jan

On 1/30/23 14:25, P Singh wrote:
> Hi Team,
>
> I am trying to run beam job on top of flink on my
local machine
> (kubernetes).
>
>  I have flink 1.14 and beam 2.43 images both running
but when i submit
> the job it's not reaching to the flink cluster and
getting failed with
> below error.
>
> ERROR:apache_beam.utils.subprocess_server:Starting
job service with
> ['java', '-jar',
>

'/Users/spsingh/.apache_beam/cache/jars/beam-runners-flink-1.14-job-server-2.43.0.jar',

> '--flink-master', 'http://localhost:8081',
'--artifacts-dir',
>

'/var/folders/n3/dqblsr792yj4kfs7xlfmdj54gr/T/beam-tempvphhje07/artifacts6kjt60ch',

> '--job-port', '57882', '--artifact-port', '0',
'--expansion-port', '0']
> ERROR

Re: beam + flink + k8

2023-02-02 Thread Jan Lukavský
I'm not sure how exactly minikube exposes the jobmanager, but in GKE you 
likely need to port-forward it, e.g.


 $ kubectl port-forward svc/flink-jobmanager 8081:8081

This should make jobmanager accessible via localhost:8081. For 
production cases you might want to use a different approach, like Flink 
operator, etc.


Best,

 Jan

On 2/1/23 17:08, P Singh wrote:

Hi Jan,

Thanks for the reply. I was able to submit the job to Flink, but it's 
failing due to an OOM issue, so I am moving to GKE. I got the Flink 
UI there, but the submitted job is not appearing in the Flink UI. I am 
using the same script which I shared with you. Do I need to make some 
changes for the Google Kubernetes environment?


On Tue, 31 Jan 2023 at 20:20, Jan Lukavský  wrote:

The script looks good to me, did you run the SDK harness? External
environment needs the SDK harness to be run externally, see [1].
Generally, the best option is DOCKER, but that usually does not
work in k8s. For this, you might try PROCESS environment and build
your own docker image for flink, which will contain the Beam
harness, e.g. [2]. You will need to pass the environment config
using --environment_config={"command": "/opt/apache/beam/boot"}.

From the screenshot it seems that the Flink UI is accessible, so
this is the only option that comes to my mind. Did you check the logs
of the Flink jobmanager pod?

 Jan

[1] https://beam.apache.org/documentation/runtime/sdk-harness-config/

[2]

https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/env/docker/flink/Dockerfile

On 1/31/23 13:33, P Singh wrote:

Hi Jan,

Thanks for your reply; please find the attached script. I am a newbie
with Flink and minikube, though I am trying to connect them by a
script from my local machine, as suggested by the Flink Kubernetes
documentation link

<https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/>

I have changed the log level to ERROR but didn't find much... Can
you please help me out with how to run the script from inside the pod?

On Tue, 31 Jan 2023 at 15:40, Jan Lukavský  wrote:

Hi,

can you please also share the script itself? I'd say that
the problem is that the Flink jobmanager is not accessible
through localhost:8081, because it runs inside the minikube.
You need to expose it outside of the minikube via [1], or run
the script from a pod inside the minikube and access the job
manager via flink-jobmanager:8081. I'm surprised that the log
didn't make this more obvious, though. Is it possible that
you changed the default log level to ERROR? Can you try DEBUG
or similar?

 Jan

[1] https://minikube.sigs.k8s.io/docs/handbook/accessing/

On 1/30/23 18:36, P Singh wrote:

Hi Jan,

Yeah I am using minikube and beam image with python 3.10.

Please find the attached screenshots.



On Mon, 30 Jan 2023 at 21:22, Jan Lukavský 
wrote:

Hi,

can you please share the command-line and complete
output of the script?
Are you using minikube? Can you share list of your
running pods?

  Jan

On 1/30/23 14:25, P Singh wrote:
> Hi Team,
>
> I am trying to run beam job on top of flink on my
local machine
> (kubernetes).
>
>  I have flink 1.14 and beam 2.43 images both running
but when i submit
> the job it's not reaching to the flink cluster and
getting failed with
> below error.
>
> ERROR:apache_beam.utils.subprocess_server:Starting job
service with
> ['java', '-jar',
>

'/Users/spsingh/.apache_beam/cache/jars/beam-runners-flink-1.14-job-server-2.43.0.jar',

> '--flink-master', 'http://localhost:8081',
'--artifacts-dir',
>

'/var/folders/n3/dqblsr792yj4kfs7xlfmdj54gr/T/beam-tempvphhje07/artifacts6kjt60ch',

> '--job-port', '57882', '--artifact-port', '0',
'--expansion-port', '0']
> ERROR:apache_beam.utils.subprocess_server:Error
bringing up service
> Traceback (most recent call last):
>   File
>

"/Users/flink_deploy/flink_env/lib/python3.10/site-packages/apache_beam/utils/subprocess_server.py",

> line 88, in start
>     raise RuntimeError(
> RuntimeError: Service failed to start up with error 1
>
> Any help would be appreciated.


Re: Non-temporal watermarks

2023-02-02 Thread Jan Lukavský

Hi,

I will not speak about details related to Flink specifically; the 
concept of watermarks is more abstract, so I'll leave implementation 
details aside.


Speaking generally, yes, there is a set of requirements that must be met 
in order to be able to build a system that uses watermarks.


The primary question is: what are watermarks used for? The answer is that we 
need watermarks to be able to define a partially stable order of 
_events_. An event is an immutable piece of data that can be _observed_ 
(i.e. processed) with various consumer-dependent delays (two consumers 
of the event can see it at different processing times), and that carries a 
specific (local) timestamp. Generally, an event tells us that something, 
somewhere happened at a given local timestamp.


Watermarks create markers in the processing time of each observer, so that 
the observer is able to tell whether two events (e.g. the event "close 
time-window T1" and "new data with timestamp T2 arrived") can be ordered 
(that is, being able to tell which one - globally! - precedes the other).


Having said that, there is a general algebra for "timestamps", and 
therefore watermarks. A timestamp can be any object that defines the 
following operations:


 - a less-than relation <, i.e. t1 < t2: bool; this relation needs to 
be asymmetric, so t1 < t2 implies not t2 < t1

 - a function min_timestamp_following(timestamp t1, duration): 
timestamp t2, that returns the minimal timestamp t2 for which t1 + 
duration < t2 (this function is effectively the definition of duration)


These two conditions allow one to construct a working stream processing 
system, which means there should be no problem using different 
"timestamps", provided we know how to construct the above.


Using "a different number" for timestamps and watermarks seems valid in 
this sense, provided you are fine with the implicit definition of 
duration, that is currently defined as simple t2 - t1.
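As a concrete illustration, a minimal sketch of this algebra over plain 
longs (the interface is illustrative, not an existing Flink or Beam API):

// The two operations a "timestamp" type must provide: an asymmetric
// less-than relation and the minimal timestamp following t1 by more
// than the given duration.
public interface TimestampAlgebra<T> {
  boolean lessThan(T t1, T t2);
  T minTimestampFollowing(T t1, long duration);
}

// The usual event-time instance: timestamps are epoch millis and
// duration is plain subtraction, so the minimal t2 with
// t1 + duration < t2 is t1 + duration + 1.
class LongTimestamps implements TimestampAlgebra<Long> {
  @Override
  public boolean lessThan(Long t1, Long t2) {
    return t1 < t2;
  }

  @Override
  public Long minTimestampFollowing(Long t1, long duration) {
    return t1 + duration + 1;
  }
}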


I tried to explain why it is not good to expect that two events can be 
globally ordered, and what the actual role of watermarks is, in a 
twitter thread [1], if anyone is interested.


Best,

 Jan

[1] 
https://twitter.com/janl_apache/status/1478757956263071745?s=20=cMXfPHS8EjPrbF8jys43BQ


On 2/2/23 00:18, Yaroslav Tkachenko wrote:

Hey everyone,

I'm wondering if anyone has done any experiments trying to use 
non-temporal watermarks? For example, a dataset may contain some kind 
of virtual timestamp / version field that behaves just like a regular 
timestamp (monotonically increasing, etc.), but has a different scale 
/ range.


As far as I can see, Flink assumes that the values used for event times 
and watermark generation are actually timestamps, and the Table API 
requires you to define watermarks on TIMESTAMP columns.


Practically speaking, a timestamp is just a number, so if I have a 
"timeline" that consists of 1000 monotonically increasing integers, 
for example, concepts like late-arriving 
data, bounded out-of-orderness, etc. still work.


Thanks for sharing any thoughts you might have on this topic!


Re: beam + flink + k8

2023-01-31 Thread Jan Lukavský
The script looks good to me, did you run the SDK harness? External 
environment needs the SDK harness to be run externally, see [1]. 
Generally, the best option is DOCKER, but that usually does not work in 
k8s. For this, you might try PROCESS environment and build your own 
docker image for flink, which will contain the Beam harness, e.g. [2]. 
You will need to pass the environment config using 
--environment_config={"command": "/opt/apache/beam/boot"}.


From the screenshot it seems that the Flink UI is accessible, so this 
is the only option that comes to my mind. Did you check the logs of the 
Flink jobmanager pod?
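For reference, a sketch of how these options look when set 
programmatically with the Beam Java SDK (equivalent to the Python flags 
above; the values are placeholders):

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PortablePipelineOptions;

// Configure a PROCESS environment that executes the Beam harness boot
// script baked into the custom Flink image.
FlinkPipelineOptions options =
    PipelineOptionsFactory.as(FlinkPipelineOptions.class);
options.setRunner(FlinkRunner.class);
options.setFlinkMaster("localhost:8081");
PortablePipelineOptions portable = options.as(PortablePipelineOptions.class);
portable.setDefaultEnvironmentType("PROCESS");
portable.setDefaultEnvironmentConfig(
    "{\"command\": \"/opt/apache/beam/boot\"}");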


 Jan

[1] https://beam.apache.org/documentation/runtime/sdk-harness-config/

[2] 
https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/env/docker/flink/Dockerfile


On 1/31/23 13:33, P Singh wrote:

Hi Jan,

Thanks for your reply; please find the attached script. I am a newbie with 
Flink and minikube, though I am trying to connect them by a script from 
my local machine, as suggested by the Flink Kubernetes documentation link 
<https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/>


I have changed the log level to ERROR but didn't find much... Can you 
please help me out with how to run the script from inside the pod?


On Tue, 31 Jan 2023 at 15:40, Jan Lukavský  wrote:

Hi,

can you please also share the script itself? I'd say that the
problem is that the Flink jobmanager is not accessible through
localhost:8081, because it runs inside the minikube. You need to
expose it outside of the minikube via [1], or run the script from a
pod inside the minikube and access the job manager via
flink-jobmanager:8081. I'm surprised that the log didn't make this
more obvious, though. Is it possible that you changed the default
log level to ERROR? Can you try DEBUG or similar?

 Jan

[1] https://minikube.sigs.k8s.io/docs/handbook/accessing/

On 1/30/23 18:36, P Singh wrote:

Hi Jan,

Yeah I am using minikube and beam image with python 3.10.

Please find the attached screenshots.



On Mon, 30 Jan 2023 at 21:22, Jan Lukavský  wrote:

Hi,

can you please share the command-line and complete output of
the script?
Are you using minikube? Can you share list of your running pods?

  Jan

On 1/30/23 14:25, P Singh wrote:
> Hi Team,
>
> I am trying to run beam job on top of flink on my local
machine
> (kubernetes).
>
>  I have flink 1.14 and beam 2.43 images both running but
when i submit
> the job it's not reaching to the flink cluster and getting
failed with
> below error.
>
> ERROR:apache_beam.utils.subprocess_server:Starting job
service with
> ['java', '-jar',
>

'/Users/spsingh/.apache_beam/cache/jars/beam-runners-flink-1.14-job-server-2.43.0.jar',

> '--flink-master', 'http://localhost:8081', '--artifacts-dir',
>

'/var/folders/n3/dqblsr792yj4kfs7xlfmdj54gr/T/beam-tempvphhje07/artifacts6kjt60ch',

> '--job-port', '57882', '--artifact-port', '0',
'--expansion-port', '0']
> ERROR:apache_beam.utils.subprocess_server:Error bringing up
service
> Traceback (most recent call last):
>   File
>

"/Users/flink_deploy/flink_env/lib/python3.10/site-packages/apache_beam/utils/subprocess_server.py",

> line 88, in start
>     raise RuntimeError(
> RuntimeError: Service failed to start up with error 1
>
> Any help would be appreciated.


Re: beam + flink + k8

2023-01-31 Thread Jan Lukavský

Hi,

can you please also share the script itself? I'd say that the 
problem is that the Flink jobmanager is not accessible through 
localhost:8081, because it runs inside the minikube. You need to expose 
it outside of the minikube via [1], or run the script from a pod inside 
the minikube and access the job manager via flink-jobmanager:8081. I'm 
surprised that the log didn't make this more obvious, though. Is it 
possible that you changed the default log level to ERROR? Can you try 
DEBUG or similar?


 Jan

[1] https://minikube.sigs.k8s.io/docs/handbook/accessing/

On 1/30/23 18:36, P Singh wrote:

Hi Jan,

Yeah I am using minikube and beam image with python 3.10.

Please find the attached screenshots.



On Mon, 30 Jan 2023 at 21:22, Jan Lukavský  wrote:

Hi,

can you please share the command line and the complete output of the
script?
Are you using minikube? Can you share a list of your running pods?

  Jan

On 1/30/23 14:25, P Singh wrote:
> Hi Team,
>
> I am trying to run beam job on top of flink on my local machine
> (kubernetes).
>
>  I have flink 1.14 and beam 2.43 images both running but when i
submit
> the job it's not reaching to the flink cluster and getting
failed with
> below error.
>
> ERROR:apache_beam.utils.subprocess_server:Starting job service with
> ['java', '-jar',
>

'/Users/spsingh/.apache_beam/cache/jars/beam-runners-flink-1.14-job-server-2.43.0.jar',

> '--flink-master', 'http://localhost:8081', '--artifacts-dir',
>

'/var/folders/n3/dqblsr792yj4kfs7xlfmdj54gr/T/beam-tempvphhje07/artifacts6kjt60ch',

> '--job-port', '57882', '--artifact-port', '0',
'--expansion-port', '0']
> ERROR:apache_beam.utils.subprocess_server:Error bringing up service
> Traceback (most recent call last):
>   File
>

"/Users/flink_deploy/flink_env/lib/python3.10/site-packages/apache_beam/utils/subprocess_server.py",

> line 88, in start
>     raise RuntimeError(
> RuntimeError: Service failed to start up with error 1
>
> Any help would be appreciated.


Re: beam + flink + k8

2023-01-30 Thread Jan Lukavský

Hi,

can you please share the command line and the complete output of the script? 
Are you using minikube? Can you share a list of your running pods?


 Jan

On 1/30/23 14:25, P Singh wrote:

Hi Team,

I am trying to run a Beam job on top of Flink on my local machine 
(Kubernetes).


I have Flink 1.14 and Beam 2.43 images both running, but when I submit 
the job it's not reaching the Flink cluster and fails with the 
error below.


ERROR:apache_beam.utils.subprocess_server:Starting job service with 
['java', '-jar', 
'/Users/spsingh/.apache_beam/cache/jars/beam-runners-flink-1.14-job-server-2.43.0.jar', 
'--flink-master', 'http://localhost:8081', '--artifacts-dir', 
'/var/folders/n3/dqblsr792yj4kfs7xlfmdj54gr/T/beam-tempvphhje07/artifacts6kjt60ch', 
'--job-port', '57882', '--artifact-port', '0', '--expansion-port', '0']

ERROR:apache_beam.utils.subprocess_server:Error bringing up service
Traceback (most recent call last):
  File 
"/Users/flink_deploy/flink_env/lib/python3.10/site-packages/apache_beam/utils/subprocess_server.py", 
line 88, in start

    raise RuntimeError(
RuntimeError: Service failed to start up with error 1

Any help would be appreciated.


Re: Flink task stuck - MapPartition WAITING on java.util.concurrent.CompletableFuture$Signaller

2022-06-02 Thread Jan Lukavský

-user@flink, as this looks like a purely Beam issue

Could you please elaborate on what "stuck" means? Does the 
watermark stop progressing? Does that happen at any specific instant 
(e.g. end of window, or end of window + allowed lateness)?


On 6/1/22 15:43, Gorjan Todorovski wrote:

Hi Jan,

I had not checked the harness log. I have now checked it (the Apache Beam 
worker log) and found this, but am currently not sure what it means:


2022/06/01 13:34:40 Python exited: 
2022/06/01 13:34:41 Python exited: 
Exception in thread read_grpc_client_inputs:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/threading.py", line 926, in 
_bootstrap_inner

    self.run()
  File "/usr/local/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File 
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py", 
line 587, in 

    target=lambda: self._read_inputs(elements_iterator),
  File 
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py", 
line 570, in _read_inputs

    for elements in elements_iterator:
  File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 
416, in __next__

    return self._next()
  File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 
803, in _next

    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of 
RPC that terminated with:

status = StatusCode.CANCELLED
details = "Multiplexer hanging up"
debug_error_string = 
"{"created":"@1654090485.252525992","description":"Error received from 
peer ipv4:127.0.0.1:44439 
<http://127.0.0.1:44439>","file":"src/core/lib/surface/call.cc","file_line":1062,"grpc_message":"Multiplexer 
hanging up","grpc_status":1}"

>

2022/06/01 13:34:45 Python exited: 
2022/06/01 13:34:46 Python exited: 
2022/06/01 13:34:46 Python exited: 
2022/06/01 13:34:47 Python exited: 
Starting worker with command ['/opt/apache/beam/boot', '--id=3-1', 
'--logging_endpoint=localhost:44267', 
'--artifact_endpoint=localhost:36413', 
'--provision_endpoint=localhost:42179', 
'--control_endpoint=localhost:38825']
Starting worker with command ['/opt/apache/beam/boot', '--id=3-3', 
'--logging_endpoint=localhost:38683', 
'--artifact_endpoint=localhost:44867', 
'--provision_endpoint=localhost:34833', 
'--control_endpoint=localhost:44351']
Starting worker with command ['/opt/apache/beam/boot', '--id=3-2', 
'--logging_endpoint=localhost:35391', 
'--artifact_endpoint=localhost:46571', 
'--provision_endpoint=localhost:44073', 
'--control_endpoint=localhost:44133']

Starting work...

On Wed, Jun 1, 2022 at 11:21 AM Jan Lukavský  wrote:

Hi Gorjan,

+user@beam <mailto:u...@beam.apache.org>

The trace you posted is just waiting for a bundle to finish in the
SDK harness. I would suspect there is a problem visible in the logs
of the harness. Did you look for possible errors there?
 Jan

On 5/31/22 13:54, Gorjan Todorovski wrote:

Hi,

I am running a TensorFlow Extended (TFX) pipeline which uses
Apache Beam for data processing which in turn has a Flink Runner
(Basically a batch job on a Flink Session Cluster on Kubernetes)
version 1.13.6, but the job (for gathering stats) gets stuck.

There is nothing significant in the Job Manager or Task Manager
logs. The only thing that possibly might tell why the task is
stuck seems to be a thread dump:

"MapPartition (MapPartition at [14]{TFXIORead[train],
GenerateStatistics[train]}) (1/32)#0" Id=188 WAITING on
java.util.concurrent.CompletableFuture$Signaller@6f078632
at sun.misc.Unsafe.park(Native Method)
- waiting on
java.util.concurrent.CompletableFuture$Signaller@6f078632
at
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at

java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
at
java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
at

java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
at
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60)
at

org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:504)
...
I use a parallelism of 32. Task managers are set up so each TM runs
in one container with 1 CPU and a total process memory set to 20
GB. Each TM runs 1 task slot.
This is failing with ~100 files with a total size of about 100
GB. If I run the pipeline with a smaller number of files to
process, it runs ok.
I need Flink to be able t

Re: Flink task stuck - MapPartition WAITING on java.util.concurrent.CompletableFuture$Signaller

2022-06-01 Thread Jan Lukavský

Hi Gorjan,

+user@beam 

The trace you posted is just waiting for a bundle to finish in the SDK 
harness. I would suspect there is a problem visible in the logs of the 
harness. Did you look for possible errors there?


 Jan

On 5/31/22 13:54, Gorjan Todorovski wrote:

Hi,

I am running a TensorFlow Extended (TFX) pipeline which uses Apache 
Beam for data processing which in turn has a Flink Runner (Basically a 
batch job on a Flink Session Cluster on Kubernetes) version 1.13.6, 
but the job (for gathering stats) gets stuck.


There is nothing significant in the Job Manager or Task Manager logs. 
The only thing that possibly might tell why the task is stuck seems to 
be a thread dump:


"MapPartition (MapPartition at [14]{TFXIORead[train], 
GenerateStatistics[train]}) (1/32)#0" Id=188 WAITING on 
java.util.concurrent.CompletableFuture$Signaller@6f078632

at sun.misc.Unsafe.park(Native Method)
- waiting on java.util.concurrent.CompletableFuture$Signaller@6f078632
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
at 
java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
at 
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)

at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60)
at 
org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:504)

...
I use a parallelism of 32. Task managers are set up so each TM runs in 
one container with 1 CPU and a total process memory set to 20 GB. Each 
TM runs 1 task slot.
This is failing with ~100 files with a total size of about 100 GB. If 
I run the pipeline with a smaller number of files to process, it runs OK.
I need Flink to be able to process different amounts of data, as it is 
able to scale by automatically adding pods depending on the parallelism 
setting for the specific job (I set the parallelism to 
max(number of files, 32)).

Thanks,
Gorjan

Re: Beam with Flink runner - Issues when writing to S3 in Parquet Format

2021-09-14 Thread Jan Lukavský

Hi Sandeep,
a few questions:
 a) which state backend do you use for Flink?
 b) what is your checkpointingInterval set for FlinkRunner?
 c) how much data is there in your input Kafka topic(s)?

FileIO has to buffer all elements per window (by default) into state, so 
this might create high pressure on the state backend and/or heap, which 
could result in suboptimal performance. Due to the "connection loss" and 
timeout exceptions you describe, I'd suppose there might be a lot of GC 
pressure.
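If heap state turns out to be the bottleneck, one hedged option is 
moving state to RocksDB. A sketch with the plain Flink 1.12 API (with 
Beam's FlinkRunner the same is typically configured via the cluster's 
flink-conf.yaml; the checkpoint URI is a placeholder):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbBackendSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
    // Keep window/FileIO buffers in RocksDB instead of on the JVM heap,
    // with incremental checkpoints to bound checkpoint sizes.
    env.setStateBackend(
        new RocksDBStateBackend("s3://bucket/checkpoints", true));
    env.enableCheckpointing(60_000); // checkpoint every minute
  }
}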


 Jan

On 9/14/21 5:20 PM, Kathula, Sandeep wrote:


Hi,

We have a simple Beam application which reads from Kafka, converts to 
Parquet and writes to S3 with the Flink runner (Beam 2.29 and Flink 1.12). 
We have a fixed window of 5 minutes after conversion to 
PCollection<GenericRecord>, and then write to S3. We have around 320 
columns in our data. Our intention is to write large files of size 
128 MB or more so that it won't have a small-file problem when reading 
back from Hive. But from what we observed, it is taking too much memory 
to write to S3 (giving 8 GB to the heap is not enough to write 50 
MB files; it goes OOM). When I increase heap memory to 32 GB, 
it takes a lot of time to write records to S3.

For instance it takes:

20 MB file - 30 sec

50 MB file - 1 min 16 sec

75 MB file - 2 min 15 sec

83 MB file - 2 min 40 sec

Code block to write to S3:

PCollection<GenericRecord> parquetRecord = ...;

parquetRecord.apply(FileIO.<GenericRecord>write()
    .via(ParquetIO.sink(getOutput_schema()))
    .to(outputPath.isEmpty() ? outputPath() : outputPath)
    .withNumShards(5)
    .withNaming(new CustomFileNaming("snappy.parquet")));

We are also getting different exceptions like:

 1. UserCodeException:

Caused by: org.apache.beam.sdk.util.UserCodeException: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator


at 
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)


at 
com.intuit.data.platform.process.thrivev2.parquet.ParquetWriter$DoFnInvoker.invokeProcessElement(Unknown 
Source)


at 
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)


at 
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)


at 
org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)


at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)


at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)


at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)


at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)


at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)


at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)


at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)


at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)


at 
org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)


at 
org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)


at 
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)


at 
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)


at 
com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener.lambda$processElement$0(JsonFlattener.java:36)


at java.lang.Iterable.forEach(Iterable.java:75)

at 
com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener.processElement(JsonFlattener.java:34)


at 
com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener$DoFnInvoker.invokeProcessElement(Unknown 
Source)


at 
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)


at 
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)


at 
org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)


at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)


at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)


at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)


at 

Re: Flink Kafka Partition Discovery Never Forgets Deleted Partitions

2021-09-14 Thread Jan Lukavský

On 9/14/21 3:57 PM, David Morávek wrote:

Hi Jan,

Notion of completeness is just one part of the problem. The second 
part is that once you remove the Kafka topic, you are no longer able 
to replay the data in case of failure.


So you basically need a following workflow to ensure correctness:

1) Wait until there are no more elements in the topic (this can be 
done by checking watermark for that partition as you're suggesting)

2) Take a checkpoint N
3) Delete the topic (this effectively makes all the checkpoints < N 
invalid)

Agree.


If you switch order of 2) and 3) you have no way to recover from failure.

Also, for this to work properly, the actual topic deletion would need to be 
performed by Flink (not by the 3rd-party system, as suggested in the 
original question) in the second phase of 2PC (when you're sure that 
you've successfully taken a checkpoint that has seen all the data).


Agree, the deletion would have to be preceded by something like a 
partition drain. What is needed is the watermark reaching the end of the 
global window (+inf) and a checkpoint. After that, the source can be 
removed, and what happens with it is of no concern any more. That applies 
to all sources in general. I don't know the implementation details, but 
it seems that the topic would have to be somehow marked as "draining"; it 
would then be the responsibility of the reader to shift the watermark 
belonging to partitions of that topic to +inf. It would then be the 
responsibility of Flink to verify that such a source is removed only after 
a checkpoint is taken. Otherwise there would be a risk of data loss.


This definitely looks like quite complex process.



Best,
D.

On Tue, Sep 14, 2021 at 3:44 PM Jan Lukavský <mailto:je...@seznam.cz>> wrote:


Hi,

just out of curiosity, would this problem be solvable by the
ability to remove partitions that declare they do not contain more
data (watermark reaching end of global window)? There is probably
another problem in that a topic can be recreated after being
deleted, which could result in the watermark moving back in time,
but this problem might be

  Jan

On 9/14/21 3:08 PM, Fabian Paul wrote:
> Hi Constantinos,
>
> I agree with David that it is not easily possible to remove a
partition while a Flink job is running. Imagine the following
scenario:
>
> Your Flink job initially works on 2 partitions belonging to two
different topics and you have checkpointing enabled to guarantee
> exactly-once delivery. It implies that on every checkpoint the
offsets of the Kafka topic are stored in a Flink checkpoint to recover
> from them in case of a failure.
> Now you trigger the removal of one of the topics and the
discovery detects that one of the partitions was removed. If the
pipeline
> now fails before the next checkpoint was taken Flink will try to
recover from the previous checkpoint which is invalid by now because
> the partition is not available anymore.
>
> Only if you do not care about losing data it is possible to
simply ignore the removed partition.
>
> Best,
> Fabian



Re: Flink Kafka Partition Discovery Never Forgets Deleted Partitions

2021-09-14 Thread Jan Lukavský

Hi,

just out of curiosity, would this problem be solvable by the ability to 
remove partitions that declare they do not contain more data 
(watermark reaching end of global window)? There is probably another 
problem in that a topic can be recreated after being deleted, which 
could result in the watermark moving back in time, but this problem might be 
there already.


 Jan

On 9/14/21 3:08 PM, Fabian Paul wrote:

Hi Constantinos,

I agree with David that it is not easily possible to remove a partition while a 
Flink job is running. Imagine the following scenario:

Your Flink job initially works on 2 partitions belonging to two different 
topics and you have checkpointing enabled to guarantee
exactly-once delivery. It implies that on every checkpoint the offsets of the 
Kafka topic are stored in a Flink checkpoint to recover
from them in case of a failure.
Now you trigger the removal of one of the topics and the discovery detects that 
one of the partitions was removed. If the pipeline
now fails before the next checkpoint was taken Flink will try to recover from 
the previous checkpoint which is invalid by now because
the partition is not available anymore.

Only if you do not care about losing data is it possible to simply ignore the 
removed partition.

Best,
Fabian


Re: Running Beam on a native Kubernetes Flink cluster

2021-08-15 Thread Jan Lukavský

Hi Gorjan,

the address of localhost is hard-coded in the Python worker pool (see 
[1]). There should be no need to set up a load-balancer for the 
worker_pool; if you have it as another container in each TM pod, it 
should suffice to replace {beam_sdk_url} with 'localhost'. Each TM will 
then have its own worker_pool, which should be just fine.


Best,

 Jan

[1] 
https://github.com/apache/beam/blob/af59192ac95a564ca3d29b6385a6d1448f5a407d/sdks/python/apache_beam/runners/worker/worker_pool_main.py#L81


On 8/14/21 4:37 PM, Gorjan Todorovski wrote:

Hi!

I need help implementing a native Kubernetes Flink cluster that needs 
to run batch jobs (run by TensorFlow Extended), but I am not sure I am 
configuring it right, as I have issues running jobs on more than one 
task manager, while jobs run fine if there is only one TM.


I use the following parameters for the job:

"--runner=FlinkRunner",
"--parallelism=4",
f"--flink_master={flink_url}:8081",
"--environment_type=EXTERNAL",
f"--environment_config={beam_sdk_url}:5",
"--flink_submit_uber_jar",
"--worker_harness_container_image=none",


I have configured the Beam workers to run as side-cars to the TM 
containers. I do this by configuring the task manager template for the 
pods via:

kubernetes.pod-template-file.taskmanager

It points to a template file with the contents:

kind: Pod
metadata:
  name: taskmanager-pod-template
spec:
  #hostNetwork: true
  containers:
    - name: flink-main-container
      #image: apache/flink:scala_2.12
      env:
        - name: AWS_REGION
          value: "eu-central-1"
        - name: S3_VERIFY_SSL
          value: "0"
        - name: PYTHONPATH
          value: "/data/flink/src"
      args: ["taskmanager"]
      ports:
        - containerPort: 6122 #22
          name: rpc
        - containerPort: 6125
          name: query-state
      livenessProbe:
        tcpSocket:
          port: 6122 #22
        initialDelaySeconds: 30
        periodSeconds: 60
    - name: beam-worker-pool
      env:
        - name: PYTHONPATH
          value: "/data/flink/src"
        - name: AWS_REGION
          value: "eu-central-1"
        - name: S3_VERIFY_SSL
          value: "0"
      image: 848221505146.dkr.ecr.eu-central-1.amazonaws.com/flink-workers
      imagePullPolicy: Always
      args: ["--worker_pool"]
      ports:
        - containerPort: 5
          name: pool
      livenessProbe:
        tcpSocket:
          port: 5
        initialDelaySeconds: 30
        periodSeconds: 60


I have also created a Kubernetes load balancer for the task managers, 
so clients can connect on port 5. So I use that address when 
configuring:

f"--environment_config={beam_sdk_url}:5",

The problem is that, as it looks, the Beam SDK harness on one task 
manager wants to connect to the endpoint running on the other task 
manager, but looks for it on localhost.

Log from beam-worker-pool on TM 2:

2021/08/11 09:43:16 Failed to obtain provisioning information: failed 
to dial server at localhost:33705 caused by: context deadline exceeded

The provision endpoint on TM 1 is the one actually listening on 
port 33705, while this is looking for it on localhost, so it cannot 
connect to it.


Showing how I test this:

TM 1:
$ kubectl logs my-first-flink-cluster-taskmanager-1-1 -c beam-worker-pool
2021/08/12 09:10:34 Starting worker pool 1: python -m 
apache_beam.runners.worker.worker_pool_main --service_port=5 
--container_executable=/opt/apache/beam/boot
Starting worker with command ['/opt/apache/beam/boot', '--id=1-1', 
'--logging_endpoint=localhost:33383', 
'--artifact_endpoint=localhost:43477', 
'--provision_endpoint=localhost:40983', 
'--control_endpoint=localhost:34793']
2021/08/12 09:13:05 Failed to obtain provisioning information: failed 
to dial server at localhost:40983 caused by: context deadline exceeded

TM 2:
$ kubectl logs my-first-flink-cluster-taskmanager-1-2 -c beam-worker-pool
2021/08/12 09:10:33 Starting worker pool 1: python -m 
apache_beam.runners.worker.worker_pool_main --service_port=5 
--container_executable=/opt/apache/beam/boot
Starting worker with command ['/opt/apache/beam/boot', '--id=1-1', 
'--logging_endpoint=localhost:40497', 
'--artifact_endpoint=localhost:36245', 
'--provision_endpoint=localhost:32907', 
'--control_endpoint=localhost:46083']
2021/08/12 09:13:09 Failed to obtain provisioning information: failed 
to dial server at localhost:32907 caused by: context deadline exceeded

Testing:
TM 1:
$ kubectl exec -it my-first-flink-cluster-taskmanager-1-1 -c 
beam-worker-pool -- bash
root@my-first-flink-cluster-taskmanager-1-1:/# curl localhost:40983
curl: (7) Failed to connect to localhost port 40983: Connection refused
root@my-first-flink-cluster-taskmanager-1-1:/# curl localhost:32907
Warning: Binary output can mess up your terminal. Use "--output -" to ...

TM 2:
root@my-first-flink-cluster-taskmanager-1-2:/# curl localhost:32907
curl: (7) Failed to connect to localhost port 32907: Connection refused
root@my-first-flink-cluster-taskmanager-1-2:/# curl localhost:40983
Warning: Binary output can

Re: Delay data elements in pipeline by X minutes

2021-07-19 Thread Jan Lukavský
I don't want to speak for Apache Flink (I'm using it via Apache Beam 
only), but generally speaking, each key will have to be held in state up 
to the moment when it can be garbage collected. This moment is defined 
(at least in the Apache Beam case) as the timestamp of the end of the 
window + allowed lateness. So, in the case of the global window, it is 
(practically) forever in the future, yes.

You can clean the state manually, though. If you use the UUID (or 
similar) approach, you would set a timer for the 15-minute 
(relative) interval, and after you emit the data you can clear the 
timer and the value state, which should clear the complete state of the 
window (please someone correct me if I'm wrong).
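A minimal sketch of that manual cleanup as a keyed process function 
(processing-time delay; the names are illustrative):

import java.util.UUID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Delays each element by 15 minutes of processing time and clears the
// per-key state after emitting, so random keys do not accumulate.
public class DelayFunction<T> extends KeyedProcessFunction<UUID, T, T> {

  private static final long DELAY_MS = 15 * 60 * 1000;

  private final Class<T> clazz;
  private transient ValueState<T> element;

  public DelayFunction(Class<T> clazz) {
    this.clazz = clazz;
  }

  @Override
  public void open(Configuration parameters) {
    element = getRuntimeContext().getState(
        new ValueStateDescriptor<>("delayed", clazz));
  }

  @Override
  public void processElement(T value, Context ctx, Collector<T> out)
      throws Exception {
    element.update(value);
    ctx.timerService().registerProcessingTimeTimer(
        ctx.timerService().currentProcessingTime() + DELAY_MS);
  }

  @Override
  public void onTimer(long timestamp, OnTimerContext ctx, Collector<T> out)
      throws Exception {
    out.collect(element.value());
    element.clear(); // frees the state of this (one-off) key
  }
}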


An alternative approach would be to use session windows and a 
GroupByKey-like operation, which would hold and emit the element at the 
end of the session, which is exactly what you need. The state of the 
session window will be cleared in this case as well.


 Jan

On 7/19/21 2:00 PM, Dario Heinisch wrote:


Hey Jan,

No, it isn't a logical constraint. The reason is that there are different 
kinds of users: some who pay for live data, while others want a cheaper 
version where the data is delayed.

But what happens if I add a random key (let's say a UUID)? Isn't that 
bad for performance? Then for every object that is being processed I 
would have a state which is only used once, but I assume Flink 
wouldn't clean that state up, would it? What happens to the 
ValueState? Is it still kept in memory? Because I thought that 
for every key Flink encounters it would keep a state.

But I think this could be solved with a TTL: 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#state-time-to-live-ttl, 
guess I will test that at some point this week! :)


For reference, this would be the code:

[...]
.keyBy(t -> UUID.randomUUID())
.process(new DelayedProcessor<>(NAME, CLAZZ))

public abstract class Timestamper {
    public abstract long executedAt();
}

public class DelayedProcessor<T extends Timestamper>
        extends KeyedProcessFunction<UUID, T, T>
        implements ResultTypeQueryable<T> {

    private final String stateName;
    private final Class<T> clazz;

    private ValueState<T> state;

    private static long TIMEOUT = TimeUnit.MINUTES.toMillis(15);

    public DelayedProcessor(String stateName, Class<T> clazz) {
        this.stateName = stateName;
        this.clazz = clazz;
    }

    @Override
    public void open(Configuration parameters) {

        StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.minutes(15))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
            .build();

        ValueStateDescriptor<T> desc =
            new ValueStateDescriptor<>(stateName, clazz);

        desc.enableTimeToLive(ttlConfig);
        state = getRuntimeContext().getState(desc);
    }

    @Override
    public void processElement(T t, Context ctx, Collector<T> collector)
            throws Exception {

        this.state.update(t);

        long now = System.currentTimeMillis();

        long timeout = (now + TIMEOUT) - t.executedAt();

        ctx.timerService().registerEventTimeTimer(timeout);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
            Collector<T> out) throws Exception {

        out.collect(this.state.value());
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeInformation.of(clazz);
    }
}


Best regards,

Dario



On 18.07.21 19:12, Jan Lukavský wrote:


Hi Dario,

out of curiosity, could you briefly describe the driving use-case? 
What is the (logical) constraint that drives the requirement? I'd 
guess that it could be related to waiting for some (external) 
condition? Or maybe related to late data? I think that there might be 
better approaches than (unconditionally) delaying data in the pipeline. 
On the other hand, if that is really the best approach, then adding a 
random key to create a keyed stream should work in all cases, right?


 Jan

On 7/18/21 3:52 PM, Dario Heinisch wrote:


Hey Kiran,

Yeah, I was thinking of another solution, so I have one PostgreSQL sink 
& one Kafka sink.


So I can just process the data in real time and insert it into the 
DB. Then I would just select the latest row where created_at >= 
NOW() - interval '15 minutes', and for any Kafka consumer I would 
just do:


let msg = get_next_kafka_msg();
let diff = created_at + 15min - now();
if diff > 0 {
    sleep(diff)
}
// do something
// 
kafka_commit();

And then run some cron job to delete obsolete rows from the db which 
are not required anymore.


Best regards

Dario

On 18.07.21 15:29, Kiran Japannavar wrote:

Hi Dario,

Did you explore other options? If your use case (apart from 
delaying sink writes) can be solved via Spark Streaming, then maybe 
Spark Streaming with a micro-batch of 15 mins would help.




On Sat, Jul 1

Re: Delay data elements in pipeline by X minutes

2021-07-18 Thread Jan Lukavský

Hi Dario,

out of curiosity, could you briefly describe the driving use-case? What 
is the (logical) constraint that drives the requirement? I'd guess 
that it could be related to waiting for some (external) condition? Or 
maybe related to late data? I think that there might be better 
approaches than (unconditionally) delaying data in the pipeline. On the 
other hand, if that is really the best approach, then adding a random 
key to create a keyed stream should work in all cases, right?


 Jan

On 7/18/21 3:52 PM, Dario Heinisch wrote:


Hey Kiran,

Yeah, I was thinking of another solution, so I have one PostgreSQL sink & 
one Kafka sink.


So I can just process the data in real time and insert it into the DB. 
Then I would just select the latest row where created_at >= NOW() - 
interval '15 minutes', and for any Kafka consumer I would just do:


let msg = get_next_kafka_msg();
let diff = created_at + 15min - now();
if diff > 0 {
    sleep(diff)
}
// do something
// 
kafka_commit();

And then run some cron job to delete obsolete rows from the db which 
are not required anymore.


Best regards

Dario

On 18.07.21 15:29, Kiran Japannavar wrote:

Hi Dario,

Did you explore other options? If your use case (apart from delaying 
sink writes) can be solved via Spark Streaming, then maybe Spark 
Streaming with a micro-batch of 15 mins would help.




On Sat, Jul 17, 2021 at 10:17 PM Dario Heinisch 
mailto:dario.heini...@gmail.com>> wrote:


Hey there,

Hope all is well!

I would like to delay data by 15 minutes before it arrives at my
sinks:

stream()
.map()
[]
.
.print()

I tried implementing my own ProcessFunction, where Timestamper is a
custom interface:

public abstract class Timestamper {
    public abstract long executedAt();
}

public class DelayedProcessor<T extends Timestamper>
        extends ProcessFunction<T, T> {

    private final String stateName;
    private final Class<T> clazz;

    // TODO: Should we use ListState, as this is preferred for
    //  serialization, or should we use ValueState, although this
    //  may impact serialization?
    private ListState<T> state;

    private static long TIMEOUT = TimeUnit.MINUTES.toMillis(15);

    public DelayedProcessor(String stateName, Class<T> clazz) {
        this.stateName = stateName;
        this.clazz = clazz;
    }

    @Override
    public void open(Configuration parameters) {
        state = getRuntimeContext().getListState(
            new ListStateDescriptor<>(stateName, clazz));
    }

    @Override
    public void processElement(T t, Context ctx, Collector<T> collector)
            throws Exception {
        this.state.add(t);
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + TIMEOUT);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
            Collector<T> out) throws Exception {
        List<T> list = new ArrayList<>();
        this.state.get().forEach(list::add);

        val now = System.currentTimeMillis();

        list = list.stream().filter(v -> {

            if (v.executedAt() + TIMEOUT <= now) {
                out.collect(v);
                return false;
            }

            return true;

        }).collect(Collectors.toList());

        this.state.update(list);
    }
}

Unfortunately, this can only be used on a keyed stream, which may not
always be the case for me.

One possible solution would be to use:

.windowAll(SlidingEventTimeWindows.of(Time.minutes(15),
Time.seconds(1)))

and then always just take the value with the lowest timestamp, but this
seems very bad performance-wise and the state would be very large.

Does anyone have a solution for me or can point me in the right
direction?

Best regards,

Dario



Re: Does the Kafka source perform retractions on Key?

2021-02-24 Thread Jan Lukavský

Hi Rex,

If I understand correctly, you are concerned about the behavior of the 
Kafka source in the case of a compacted topic, right? If that is the case, 
then this is not directly related to Flink; Flink will expose the behavior 
defined by Kafka. You can read about it for instance here [1]. TL;DR - 
your pipeline is guaranteed to see every record written to the topic (every 
single update, be it later "overwritten" or not) if it processes the 
record with latency at most 'delete.retention.ms'. This is configurable 
per topic - default 24 hours. If you want to reprocess the data later, 
your consumer might see only the resulting compacted ("retracted") stream, 
and not every record actually written to the topic.


 Jan

[1] 
https://medium.com/swlh/introduction-to-topic-log-compaction-in-apache-kafka-3e4d4afd2262
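
For reference, a hypothetical sketch of how such a compacted topic could
be configured with Kafka's AdminClient (topic name, partition count,
replication factor and 'props' are made-up assumptions):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

Map<String, String> configs = new HashMap<>();
configs.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
// the per-topic knob mentioned above; 24 hours is the default
configs.put(TopicConfig.DELETE_RETENTION_MS_CONFIG, String.valueOf(24L * 60 * 60 * 1000));
NewTopic topic = new NewTopic("user-state", 4, (short) 3).configs(configs);
try (AdminClient admin = AdminClient.create(props)) { // props: connection properties, assumed
    admin.createTopics(Collections.singletonList(topic)).all().get();
}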


On 2/24/21 3:14 AM, Rex Fenley wrote:
Apologies, forgot to finish. If the Kafka source performs its own 
retractions of old data on key (user_id) for every append it receives, 
it should resolve this discrepancy I think.


Again, is this true? Anything else I'm missing?

Thanks!


On Tue, Feb 23, 2021 at 6:12 PM Rex Fenley wrote:


Hi,

I'm concerned about the impacts of Kafka's compactions when
sending data between running flink jobs.

For example, one job produces retract stream records in sequence of
(false, (user_id: 1, state: "california") -- retract
(true, (user_id: 1, state: "ohio")) -- append
Which is consumed by Kafka and keyed by user_id, this could end up
compacting to just
(true, (user_id: 1, state: "ohio")) -- append
If some other downstream Flink job has a filter on state ==
"california" and reads from the Kafka stream, I assume it will
miss the retract message altogether and produce incorrect results.

Is this true? How do we prevent this from happening? We need to
use compaction since all our jobs are based on CDC and we can't
just drop data after x number of days.

Thanks

-- 

Rex Fenley | Software Engineer - Mobile and Backend

Remind.com | BLOG | FOLLOW US | LIKE US




--

Rex Fenley | Software Engineer - Mobile and Backend

Remind.com | BLOG | FOLLOW US | LIKE US





Re: Random Task executor shutdown (java.lang.OutOfMemoryError: Metaspace)

2020-11-16 Thread Jan Lukavský
The exclusions should not have any impact on that, because what defines 
which classloader will load which class is not the presence of a 
particular class in a specific jar, but the configuration of the 
parent-first-patterns [1].


If you don't use any flink internal imports, then it still might be the 
case that a class in one of the packages defined by the 
parent-first-patterns holds a reference to your user-code classes, which 
would cause the leak. I'd recommend to inspect the heap dump after 
several restarts of the application and look for references to Class 
objects from the root set.


Jan

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#class-loading
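
For reference, a hypothetical flink-conf.yaml excerpt (the package
prefix is made up) extending the pattern list described in [1]:

# classes matching these prefixes are loaded by the parent (TaskManager)
# classloader; everything else is loaded child-first from the user jar
classloader.parent-first-patterns.additional: com.example.shared.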


On 11/16/20 5:34 PM, Flavio Pompermaier wrote:
I've tried to remove all possible imports of classes not contained in 
the fat jar but I still face the same problem.
I've also tried to reduce as much as possible the excludes in the shade 
section of the maven plugin (I took the one at [1]), so now I exclude 
only a few dependencies... could it be that I should include org.slf4j:* 
if I use a static import of it?



    
<excludes>
  <exclude>com.google.code.findbugs:jsr305</exclude>
  <exclude>org.slf4j:*</exclude>
  <exclude>log4j:*</exclude>
</excludes>


[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/project-configuration.html#appendix-template-for-building-a-jar-with-dependencies


On Mon, Nov 16, 2020 at 3:29 PM Jan Lukavský wrote:


Yes, that could definitely cause this. You should probably avoid
using these flink-internal shaded classes and ship your own
versions (not shaded).

Best,

 Jan

On 11/16/20 3:22 PM, Flavio Pompermaier wrote:

Thank you Jan for your valuable feedback.
Could it be that I should not import shaded-jackson classes
in my user code? For example
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper?

Best,
Flavio

On Mon, Nov 16, 2020 at 3:15 PM Jan Lukavský wrote:

Hi Flavio,

when I encountered a quite similar problem to the one you describe,
it was related to a static storage located in a class that was
loaded "parent-first". In my case it was in
java.lang.ClassValue, but it might (and probably will be)
different in your case. The problem is that if user-code
registers something in some (static) storage located in a class
loaded with the parent (TaskManager) classloader, then its
associated classes will never be GC'd and Metaspace will
grow. A good starting point would be not to focus on the biggest
consumers of heap (in general), but to look at where the 15k
objects of type Class are referenced from. That might help
you figure this out. I'm not sure if there is something that
can be done in general to prevent this type of leaks. That
would probably be a question for the dev@ mailing list.

Best,

 Jan

On 11/16/20 2:27 PM, Flavio Pompermaier wrote:

Hello everybody,
I was writing this email when a similar thread on this
mailing list appeared..
The difference is that the other problem seems to be related
to Flink 1.10 on YARN and does not output anything helpful
in debugging the cause of the problem.

Indeed, in my use case I use Flink 1.11.0 and Flink on a
standalone session cluster (the job is submitted to the
cluster using the CLI client).
The problem arises when I submit the same job about 20
times (this number unfortunately is not deterministic and
can change a little bit). The error reported by the Task
Executor is related to the ever growing Metaspace..the error
seems to be pretty detailed [1].

I found the same issue in some previous threads on this
mailing list and I've tried to figure out the cause of
the problem. The issue is that looking at the objects
allocated I don't really get an idea of the source of the
problem, because the types of objects that are consuming the
memory are of general purpose (i.e. Bytes, Integers and
Strings)... these are my "top" memory consumers if looking at
the output of jmap -histo <pid>:

At run 0:

 num     #instances         #bytes  class name (module)
---
   1:         46238       13224056  [B (java.base@11.0.9.1)
   2:          3736        6536672  [I (java.base@11.0.9.1)
   3:         38081         913944  java.lang.String (java.base@11.0.9.1)
   4:            26         852384  [Lakka.dispatch.forkjoin.ForkJoinTask;
   5:          7146         844984  java.lang.Class (java.base@11

Re: Random Task executor shutdown (java.lang.OutOfMemoryError: Metaspace)

2020-11-16 Thread Jan Lukavský
Yes, that could definitely cause this. You should probably avoid using 
these flink-internal shaded classes and ship your own versions (not shaded).


Best,

 Jan

On 11/16/20 3:22 PM, Flavio Pompermaier wrote:

Thank you Jan for your valuable feedback.
Could it be that I should not import shaded-jackson classes in my
user code? For example
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper?

Best,
Flavio

On Mon, Nov 16, 2020 at 3:15 PM Jan Lukavský wrote:


Hi Flavio,

when I encountered a quite similar problem to the one you describe, it was
related to a static storage located in a class that was loaded
"parent-first". In my case it was in java.lang.ClassValue,
but it might (and probably will be) different in your case. The
problem is that if user-code registers something in some (static)
storage located in a class loaded with the parent (TaskManager)
classloader, then its associated classes will never be GC'd and
Metaspace will grow. A good starting point would be not to focus
on the biggest consumers of heap (in general), but to look at where
the 15k objects of type Class are referenced from. That might help
you figure this out. I'm not sure if there is something that can
be done in general to prevent this type of leaks. That would
probably be a question for the dev@ mailing list.

Best,

 Jan

On 11/16/20 2:27 PM, Flavio Pompermaier wrote:

Hello everybody,
I was writing this email when a similar thread on this mailing
list appeared..
The difference is that the other problem seems to be related
to Flink 1.10 on YARN and does not output anything helpful in
debugging the cause of the problem.

Indeed, in my use case I use Flink 1.11.0 and Flink on a
standalone session cluster (the job is submitted to the cluster
using the CLI client).
The problem arises when I submit the same job about 20 times
(this number unfortunately is not deterministic and can change a
little bit). The error reported by the Task Executor is related
to the ever growing Metaspace..the error seems to be pretty
detailed [1].

I found the same issue in some previous threads on this mailing
list and I've tried to figure out the cause of the problem.
The issue is that looking at the objects allocated I don't really
get an idea of the source of the problem, because the types of
objects that are consuming the memory are of general purpose
(i.e. Bytes, Integers and Strings)... these are my "top" memory
consumers if looking at the output of jmap -histo <pid>:

At run 0:

 num     #instances         #bytes  class name (module)
---
   1:         46238       13224056  [B (java.base@11.0.9.1)
   2:          3736        6536672  [I (java.base@11.0.9.1)
   3:         38081         913944  java.lang.String (java.base@11.0.9.1)
   4:            26         852384  [Lakka.dispatch.forkjoin.ForkJoinTask;
   5:          7146         844984  java.lang.Class (java.base@11.0.9.1)

At run 1:

   1:         77.608       25.317.496  [B (java.base@11.0.9.1)
   2:          7.004        9.088.360  [I (java.base@11.0.9.1)
   3:         15.814        1.887.256  java.lang.Class (java.base@11.0.9.1)
   4:         67.381        1.617.144  java.lang.String (java.base@11.0.9.1)
   5:          3.906        1.422.960  [Ljava.util.HashMap$Node; (java.base@11.0.9.1)

At run 6:

   1:         81.408       25.375.400  [B (java.base@11.0.9.1)
   2:         12.479        7.249.392  [I (java.base@11.0.9.1)
   3:         29.090        3.496.168  java.lang.Class (java.base@11.0.9.1)
   4:          4.347        2.813.416  [Ljava.util.HashMap$Node; (java.base@11.0.9.1)
   5:         71.584        1.718.016  java.lang.String (java.base@11.0.9.1)

At run 8:

   1:        985.979      127.193.256  [B (java.base@11.0.9.1)
   2:         35.400       13.702.112  [I (java.base@11.0.9.1)
   3:        260.387        6.249.288  java.lang.String (java.base@11.0.9.1)
   4:        148.836        5.953.440  java.util.HashMap$KeyIterator (java.base@11.0.9.1)

Re: Random Task executor shutdown (java.lang.OutOfMemoryError: Metaspace)

2020-11-16 Thread Jan Lukavský

Hi Flavio,

when I encountered a quite similar problem to the one you describe, it was 
related to a static storage located in a class that was loaded 
"parent-first". In my case it was in java.lang.ClassValue, but it 
might (and probably will be) different in your case. The problem is that 
if user-code registers something in some (static) storage located in a 
class loaded with the parent (TaskManager) classloader, then its associated 
classes will never be GC'd and Metaspace will grow. A good starting 
point would be not to focus on the biggest consumers of heap (in general), 
but to look at where the 15k objects of type Class are referenced from. 
That might help you figure this out. I'm not sure if there is something 
that can be done in general to prevent this type of leaks. That would 
probably be a question for the dev@ mailing list.


Best,

 Jan
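
The histograms quoted below come from jmap; for the reference-chain
analysis suggested above, a full heap dump can be taken the same way
(the pid is illustrative):

jmap -histo 12345 | head -20                  # class histogram, as below
jmap -dump:live,format=b,file=tm.hprof 12345  # dump for MAT/VisualVM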

On 11/16/20 2:27 PM, Flavio Pompermaier wrote:

Hello everybody,
I was writing this email when a similar thread on this mailing list 
appeared..
The difference is that the other problem seems to be related 
to Flink 1.10 on YARN and does not output anything helpful in 
debugging the cause of the problem.

Indeed, in my use case I use Flink 1.11.0 and Flink on a standalone 
session cluster (the job is submitted to the cluster using the CLI 
client).
The problem arises when I submit the same job about 20 times (this 
number unfortunately is not deterministic and can change a little 
bit). The error reported by the Task Executor is related to the ever 
growing Metaspace..the error seems to be pretty detailed [1].


I found the same issue in some previous threads on this mailing list 
and I've tried to figure out the cause of the problem. The issue is 
that looking at the objects allocated I don't really get an idea of 
the source of the problem, because the types of objects that are 
consuming the memory are of general purpose (i.e. Bytes, Integers and 
Strings)... these are my "top" memory consumers if looking at the 
output of jmap -histo <pid>:


At run 0:

 num     #instances         #bytes  class name (module)
---
   1:         46238       13224056  [B (java.base@11.0.9.1)
   2:          3736        6536672  [I (java.base@11.0.9.1)
   3:         38081         913944  java.lang.String (java.base@11.0.9.1)
   4:            26         852384  [Lakka.dispatch.forkjoin.ForkJoinTask;
   5:          7146         844984  java.lang.Class (java.base@11.0.9.1)

At run 1:

   1:         77.608       25.317.496  [B (java.base@11.0.9.1)
   2:          7.004        9.088.360  [I (java.base@11.0.9.1)
   3:         15.814        1.887.256  java.lang.Class (java.base@11.0.9.1)
   4:         67.381        1.617.144  java.lang.String (java.base@11.0.9.1)
   5:          3.906        1.422.960  [Ljava.util.HashMap$Node; (java.base@11.0.9.1)

At run 6:

   1:         81.408       25.375.400  [B (java.base@11.0.9.1)
   2:         12.479        7.249.392  [I (java.base@11.0.9.1)
   3:         29.090        3.496.168  java.lang.Class (java.base@11.0.9.1)
   4:          4.347        2.813.416  [Ljava.util.HashMap$Node; (java.base@11.0.9.1)
   5:         71.584        1.718.016  java.lang.String (java.base@11.0.9.1)

At run 8:

   1:        985.979      127.193.256  [B (java.base@11.0.9.1)
   2:         35.400       13.702.112  [I (java.base@11.0.9.1)
   3:        260.387        6.249.288  java.lang.String (java.base@11.0.9.1)
   4:        148.836        5.953.440  java.util.HashMap$KeyIterator (java.base@11.0.9.1)
   5:         17.641        5.222.344  [Ljava.util.HashMap$Node; (java.base@11.0.9.1)


Thanks in advance for any help,
Flavio

[1] 
--
java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory 
error has occurred. This can mean two things: either the job requires 
a larger size of JVM metaspace to load classes or there is a class 
loading leak. In the first case 
'taskmanager.memory.jvm-metaspace.size' configuration option should be 
increased. If the error persists (usually in cluster after several job 
(re-)submissions) then there is probably a class loading leak in user 
code or some of its dependencies which has to be investigated and 
fixed. The task executor has to be shutdown...

        at java.lang.ClassLoader.defineClass1(Native Method) ~[?:?]
        at 

Re: Streaming data to parquet

2020-09-14 Thread Jan Lukavský

Hi,

I'd like to mention another approach, which might not be as "flinkish", 
but removes the source of issues which arise when writing bulk files. 
The actual cause of issues here is that when creating bulk output, the 
most efficient option is to have a _reversed flow of commit_. That is to 
say - contrary to Flink's checkpoint barrier flowing from sources to 
sinks - the optimal performance in the bulk case is to let the sink commit 
the source once it finishes the bulk write (with whatever period). This is 
currently impossible to achieve with Flink, but what works best for me 
is to use Flink sinks to write a streaming commit log (e.g. Kafka) 
and then have independent processes (Kafka consumers or equivalent) 
read the output topics, pack them and push them to the bulk store; once the 
write is finished, the Kafka topic is committed. It requires deploying an 
additional application, but that is low overhead in deployments like k8s.


Moreover, this solves the dilemma between quick commits (for real-time 
data) and large files, because one can read data from both the streaming 
(truly real-time) source and do a union with batch data stored in the bulk 
store. Both these techniques are implemented in [1] (disclaimer: I'm one 
of the core developers of that platform).


Jan

[1] https://github.com/O2-Czech-Republic/proxima-platform
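
A minimal sketch of such a packing consumer (hypothetical; topic name,
batch size, connection properties and writeBulkFile() are assumptions) -
the point being that the offset commit happens only after the bulk write
has finished:

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "bulk-packer");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList("output-topic"));
    List<byte[]> buffer = new ArrayList<>();
    while (true) {
        for (ConsumerRecord<byte[], byte[]> r : consumer.poll(Duration.ofSeconds(1))) {
            buffer.add(r.value());
        }
        if (buffer.size() >= 10_000) { // made-up batch size
            writeBulkFile(buffer);     // e.g. pack to parquet and push to the bulk store
            consumer.commitSync();     // commit only after the write has succeeded
            buffer.clear();
        }
    }
}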

On 9/14/20 2:03 PM, Arvid Heise wrote:

Hi Kumar,

for late events, I have seen two approaches:

* Initial compaction every day, repeated compaction after two days, 
and after 1 week.
* Using something like delta lake [1], which is a set of specially 
structured parquet files. Usually you also compact them after some 
time (e.g. 1 week in your case), but you can query them efficiently in 
the meantime.


However, I'm not aware of any out-of-the-box delta lake solution for 
Flink. This might be something that we could put on the community 
agenda if there is general interest.


[1] 
https://slacker.ro/2019/08/21/diving-into-delta-lake-unpacking-the-transaction-log/


On Fri, Sep 11, 2020 at 5:16 PM Senthil Kumar wrote:


Hello Ayush,

I am interested in knowing about your “really simple” implementation.

So assuming the streaming parquet output goes to an S3 bucket:
Initial (partitioned by event time)

Do you write another Flink batch application (step 2) which
partitions the data from Initial in larger “event time” chunks
and writes it out to another S3 bucket?

In our case, we are getting straggling records with event times
which might be up to 1 week old.

One approach is to simply write the batch job after 1 week, but
then we lose the ability to query the recent data in an efficient
fashion.

I would appreciate any ideas etc.

Cheers

Kumar

*From: *Ayush Verma
*Date: *Friday, September 11, 2020 at 8:14 AM
*To: *Robert Metzger
*Cc: *Marek Maj, user@flink.apache.org
*Subject: *Re: Streaming data to parquet

Hi,

Looking at the problem broadly, file size is directly tied up with
how often you commit. No matter which system you use, this
variable will always be there. If you commit frequently, you will
be close to realtime, but you will have numerous small files. If
you commit after long intervals, you will have larger files, but
this is as good as a "batch world". We solved this problem at my
company by having 2 systems. One to commit the files at small
intervals, thus bringing data into durable storage reliably, and
one to roll up these small files. It's actually really simple to
implement this if you don't try to do it in a single job.

Best

Ayush

On Fri, Sep 11, 2020 at 2:22 PM Robert Metzger wrote:

Hi Marek,

what you are describing is a known problem in Flink. There are
some thoughts on how to address this in
https://issues.apache.org/jira/browse/FLINK-11499
and https://issues.apache.org/jira/browse/FLINK-17505

Maybe some ideas there help you already for your current
problem (use long checkpoint intervals).

A related idea to (2) is to write your data with the Avro
format, and then regularly use a batch job 

Re: k8s job cluster using StatefulSet

2020-08-14 Thread Jan Lukavský

Hi Alexey,

I'm using StatefulSet for JM exactly as you describe (Deployment for TM 
is just fine). The main advantage is that you don't need distributed 
storage for JM fault tolerance, because you can use a persistent volume 
mount (provided your cloud provider offers it as a fault-tolerant 
volume). You still need zookeeper (AFAIK), but that might be a feature 
request, as it is not actually necessary in this case. If this could be 
incorporated in the mentioned Flink Operators, that would be great. :-)


Best,

 Jan
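
A hypothetical sketch of such a JobManager StatefulSet (all names and
sizes are made up); the persistent volume claim holds the HA metadata:

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: flink-jobmanager
spec:
  serviceName: flink-jobmanager   # assumes a matching headless service
  replicas: 1
  selector:
    matchLabels:
      app: flink-jobmanager
  template:
    metadata:
      labels:
        app: flink-jobmanager
    spec:
      containers:
        - name: jobmanager
          image: flink:1.11
          args: ["jobmanager"]
          volumeMounts:
            - name: ha-storage
              mountPath: /flink/ha
  volumeClaimTemplates:
    - metadata:
        name: ha-storage
      spec:
        accessModes: ["ReadWriteOnce"]
        resources:
          requests:
            storage: 1Gi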

On 8/14/20 5:09 AM, Yang Wang wrote:

Hi Alexey,

Actually, StatefulSets could also be used to start the JobManager and 
TaskManager.


So why do we suggest using Deployment in the Flink documentation?
* StatefulSets require the user to have a persistent volume in the K8s
  cluster. However, that is not always available, especially for
  unmanaged (self-built) K8s clusters.
* Flink uses Zookeeper and distributed storage (S3, GFS, etc.) for
  fault tolerance. If you start multiple JobManagers, the leader
  election and leader retrieval will be done via Zookeeper, and the
  meta information will be stored in Zookeeper as well. So it is
  unnecessary to have StatefulSet do more things.
* The local data of JobManager and TaskManager is ephemeral. It can
  be discarded after a crash.



Best,
Yang




Arvid Heise wrote on Thursday, August 13, 2020 at 4:38 PM:


Hi Alexey,

I don't see any issue in using stateful sets immediately.

I'd recommend using one of the K8s operators or Ververica's
community edition [1] though if you start with a new setup as they
may solve even more issues that you might experience in the future.

[1] https://www.ververica.com/getting-started

On Mon, Aug 10, 2020 at 11:22 PM Alexey Trenikhun wrote:

Hello,
Flink documentation suggests to use Deployments to deploy the JM
and TM for a kubernetes job cluster. Are there any known potential
issues with using StatefulSets instead? StatefulSet seems to provide
uniqueness for the JM during upgrade/rollback, while with
Deployments there could be multiple JM pods (e.g. 1 terminating and 1
running).

Thanks,
Alexey



-- 

Arvid Heise | Senior Java Developer

Follow us @VervericaData

Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
Ververica GmbH | Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng



Re: POJO serialization vs immutability

2019-10-07 Thread Jan Lukavský
Having said that - the same logic applies to using POJOs as keys in 
grouping operations, which heavily rely on hashCode() and equals(). That 
might suggest that using mutable objects is not the best option there 
either. But that might be a very subjective claim.


Jan
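
A hypothetical two-line illustration of the difference ("events" and
getUserId() are made up):

// keying by the whole mutable POJO ties routing to its hashCode():
events.keyBy(e -> e);              // fragile if e is mutated downstream
// keying by an immutable field sidesteps the question entirely:
events.keyBy(e -> e.getUserId());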

On 10/7/19 3:13 PM, Jan Lukavský wrote:


Exactly. And that's why it is good for mutable data, because they are 
not suited for keys either.


Jan

On 10/7/19 2:58 PM, Chesnay Schepler wrote:
The default hashCode implementation is effectively random and not 
suited for keys as they may not be routed to the same instance.


On 07/10/2019 14:54, Jan Lukavský wrote:


Hi Stephen,

I found a very nice article [1], which might help you solve the 
issues you are concerned about. The elegant solution to this problem 
might be summarized as "do not implement equals() and hashCode() for 
POJO types, use Object's default implementation". I'm not 100% sure 
that this will not have any negative impacts on some other Flink 
components, but I _suppose_ it should not (someone might correct me 
if I'm wrong).


Jan

[1] http://web.mit.edu/6.031/www/sp17/classes/15-equality/

On 10/7/19 1:37 PM, Chesnay Schepler wrote:


This question should only be relevant for cases where POJOs are 
used as keys, in which case they /must not/ return a class-constant 
nor effectively-random value, as this would break the hash 
partitioning.


This is somewhat alluded to in the keyBy() documentation 
<https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/#datastream-transformations>, 
but could be clarified.


It is in any case heavily discouraged to modify objects after they 
have been emitted from a function; the mutability of POJOs is hence 
usually not a problem.


On 02/10/2019 14:17, Stephen Connolly wrote:
I notice 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#rules-for-pojo-types 
says that all non-transient fields need a setter.


That means that the fields cannot be final.

That means that the hashCode() should probably just return a 
constant value (otherwise an object could be mutated and then lost 
from a hash-based collection).


Is it really the case that we have to either register a serializer 
or abandon immutability and consequently force hashCode to be a 
constant value?


What are the recommended implementation patterns for the POJOs 
used in a topology?


Thanks

-Stephen







Re: POJO serialization vs immutability

2019-10-07 Thread Jan Lukavský
Exactly. And that's why it is good for mutable data, because they are 
not suited for keys either.


Jan

On 10/7/19 2:58 PM, Chesnay Schepler wrote:
The default hashCode implementation is effectively random and not 
suited for keys as they may not be routed to the same instance.


On 07/10/2019 14:54, Jan Lukavský wrote:


Hi Stephen,

I found a very nice article [1], which might help you solve the 
issues you are concerned about. The elegant solution to this problem 
might be summarized as "do not implement equals() and hashCode() for 
POJO types, use Object's default implementation". I'm not 100% sure 
that this will not have any negative impacts on some other Flink 
components, but I _suppose_ it should not (someone might correct me 
if I'm wrong).


Jan

[1] http://web.mit.edu/6.031/www/sp17/classes/15-equality/

On 10/7/19 1:37 PM, Chesnay Schepler wrote:


This question should only be relevant for cases where POJOs are used 
as keys, in which case they /must not/ return a class-constant nor 
effectively-random value, as this would break the hash partitioning.


This is somewhat alluded to in the keyBy() documentation 
<https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/#datastream-transformations>, 
but could be clarified.


It is in any case heavily discouraged to modify objects after they 
have been emitted from a function; the mutability of POJOs is hence 
usually not a problem.


On 02/10/2019 14:17, Stephen Connolly wrote:
I notice 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#rules-for-pojo-types 
says that all non-transient fields need a setter.


That means that the fields cannot be final.

That means that the hashCode() should probably just return a 
constant value (otherwise an object could be mutated and then lost 
from a hash-based collection).


Is it really the case that we have to either register a serializer 
or abandon immutability and consequently force hashCode to be a 
constant value?


What are the recommended implementation patterns for the POJOs used 
in a topology?


Thanks

-Stephen







Re: POJO serialization vs immutability

2019-10-07 Thread Jan Lukavský

Hi Stephen,

I found a very nice article [1], which might help you solve the issues 
you are concerned about. The elegant solution to this problem might be 
summarized as "do not implement equals() and hashCode() for POJO types, 
use Object's default implementation". I'm not 100% sure that this will 
not have any negative impacts on some other Flink components, but I 
_suppose_ it should not (someone might correct me if I'm wrong).


Jan

[1] http://web.mit.edu/6.031/www/sp17/classes/15-equality/
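
A minimal sketch of a POJO following those rules (names are made up):
non-final fields with getters and setters, a public no-arg constructor,
and Object's default equals()/hashCode() left untouched:

public class SensorReading {
    private String sensorId;
    private double value;

    public SensorReading() {} // no-arg constructor required by the POJO rules

    public String getSensorId() { return sensorId; }
    public void setSensorId(String sensorId) { this.sensorId = sensorId; }
    public double getValue() { return value; }
    public void setValue(double value) { this.value = value; }
    // equals()/hashCode() deliberately not overridden, per [1]
}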

On 10/7/19 1:37 PM, Chesnay Schepler wrote:


This question should only be relevant for cases where POJOs are used 
as keys, in which case they /must not/ return a class-constant nor 
effectively-random value, as this would break the hash partitioning.


This is somewhat alluded to in the keyBy() documentation,
but could be clarified.


It is in any case heavily discouraged to modify objects after they 
have been emitted from a function; the mutability of POJOs is hence 
usually not a problem.


On 02/10/2019 14:17, Stephen Connolly wrote:
I notice 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#rules-for-pojo-types 
says that all non-transient fields need a setter.


That means that the fields cannot be final.

That means that the hashCode() should probably just return a constant 
value (otherwise an object could be mutated and then lost from a 
hash-based collection).


Is it really the case that we have to either register a serializer or 
abandon immutability and consequently force hashCode to be a constant 
value?


What are the recommended implementation patterns for the POJOs used 
in a topology?


Thanks

-Stephen





Re: [SURVEY] What is the most subtle/hard to catch bug that people have seen?

2019-10-01 Thread Jan Lukavský

Hi,

I'd add another one regarding Java hashCode() and its practical 
usability for distributed systems [1], although practically all (Java 
based) data processing systems rely on it.


One bug directly related to this that I once saw was that using an Enum 
inside another object used as a partitioning key results in really 
hard-to-debug bugs. Mostly because during local testing everything works 
just fine; the problem arises only when multiple JVMs are involved. This 
is caused by the fact that hashCode() of an Enum is derived from its 
associated memory position.


Jan

[1] 
https://martin.kleppmann.com/2012/06/18/java-hashcode-unsafe-for-distributed-systems.html
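
A hypothetical illustration of the pitfall (class and field names are
made up): Enum.hashCode() is final and inherits Object's identity hash,
so the same constant generally hashes differently in different JVMs:

enum State { CALIFORNIA, OHIO }

class UserKey {
    final long userId;
    final State state;

    UserKey(long userId, State state) {
        this.userId = userId;
        this.state = state;
    }

    @Override
    public int hashCode() {
        // JVM-dependent: state.hashCode() differs across task managers;
        // a stable alternative would be state.name().hashCode()
        return 31 * Long.hashCode(userId) + state.hashCode();
    }
}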


On 10/1/19 11:45 AM, Piotr Nowojski wrote:

Hi,

Are you asking about bugs in Flink, in libraries that Flink is using or bugs in 
applications that were using Flink? From my perspective/what I have seen:

The most problematic bugs while developing features for Flink:

Deadlocks & data losses caused by concurrency issues in the network stack 
after changing some trivial things in new data notifications.
Data visibility issues for concurrent writes/reads when implementing the S3 
connector.

The most problematic bug/type of bugs in the Dependencies:

Deadlocks in the external connector (for example 
https://issues.apache.org/jira/browse/KAFKA-6132). Integration with external 
systems is always difficult. If you add concurrency issues to the mix…

The most problematic bug in the Flink application:

Being unaware that, for some reason, some code unknown to me was 
interrupting (SIGINT) threads spawned by a custom SourceFunction that were 
emitting the data when the job was back pressured. This was causing record 
serialisation, very rarely, to be interrupted in the middle, showing up on the 
downstream receiver as deserialisation errors.

Piotrek


On 1 Oct 2019, at 04:18, Konstantinos Kallas  
wrote:

Hi everyone.

I wanted to ask Flink users what are the most subtle Flink bugs that
people have witnessed. The cause of the bugs could be anything (e.g.
wrong assumptions on data, parallelism of non-parallel operator, simple
mistakes).

We are developing a testing framework for Flink and it would be
interesting to have examples of difficult to spot bugs to evaluate our
testing framework on.

Thanks,
Konstantinos Kallas