Re: Reducing Checkpoint Count for Chain Operator

2023-02-02 Thread Talat Uyarer via user
Hi Schwalbe,


>- There is no way to have only one file unless you lower the
>parallelism to 1 (= only one subtask)
>
>
Even with single parallelism there are multiple checkpoint files for
chained operators.


>- So which files do you see: 1 “_metadata” + multiple data files (or
>just one)?
>
>
Yes per checkpoint we have a folder with a checkpoint number. That folder
has one metadata file and one file per operator with vertex uuid.


>- The idea of having multiple files is to allow multiple threads to be
>able to stare checkpoints at the same time, and when restarting from a
>checkpoint to consume from more files potentially distributed to multiple
>physical hard driver (more I/O capacity)
>
>
Yes I am well aware of why we have multiple files for operators. But having
a file per operator which is running the same thread in the operator chain
is redundant and increases checkpoint size. I believe the operator chain
driver could handle checkpointing all at once. It would reduce the total
size of the checkpoint. Because all chain operators use the same memory. If
objectreuse is enabled. then they use exact same objects.

Still (out of curiosity) why would you want to have everything in a single
> file?


I dont want to have single files for checkpointing. I want one file per
operator chain group. rather than having multiple files per operator in the
chain. When you have huge parallelism checkpoint size can hit huge numbers
such as GBs. And because of the size of the checkpoint we can not do
frequent checkpointing as much as we want. Chaining Operators are a really
good optimization in terms of memory usage. However it is still lacking in
terms of checkpointing. Today we are using Dataflow. Dataflow has similar
behavior with checkpoint support they call pipeline fusion. [1]

[1]
https://cloud.google.com/dataflow/docs/pipeline-lifecycle#fusion_optimization

Thanks

On Thu, Feb 2, 2023 at 9:25 AM Schwalbe Matthias <
matthias.schwa...@viseca.ch> wrote:

> Hi Talat Uyarer,
>
>
>
>- There is no way to have only one file unless you lower the
>parallelism to 1 (= only one subtask)
>- So which files do you see: 1 “_metadata” + multiple data files (or
>just one)?
>- The idea of having multiple files is to allow multiple threads to be
>able to stare checkpoints at the same time, and when restarting from a
>checkpoint to consume from more files potentially distributed to multiple
>physical hard driver (more I/O capacity)
>- So in general it is good to have multiple files
>
>
>
> Still (out of curiosity) why would you want to have everything in a single
> file?
>
>
>
> Sincere greetings
>
>
>
> Thias
>
>
>
>
>
> *From:* Talat Uyarer 
> *Sent:* Thursday, February 2, 2023 5:57 PM
> *To:* Schwalbe Matthias 
> *Cc:* Kishore Pola ; weijie guo <
> guoweijieres...@gmail.com>; user@flink.apache.org
> *Subject:* Re: Reducing Checkpoint Count for Chain Operator
>
>
>
> ⚠*EXTERNAL MESSAGE – **CAUTION: Think Before You Click *⚠
>
>
>
> Hi Schwalbe, weijie,
>
>
>
> Thanks for your reply.
>
>
>
>
>- Each state primitive/per subtask stores state into a separate file
>
>
>
> In this picture You can see Operator Chain
> https://nightlies.apache.org/flink/flink-docs-master/fig/tasks_chains.svg
> 
>
>
>
> Source and Map are in the same chain. Today Flink creates two files for
> that operator chain. When we have OperatorChain, All subtasks are running
> in the same machine, same thread for memory optimization.  However Flink
> creates separate files per subtasks. Our question is whether there is a way
> to have one file not multiple files.
>
>
>
> Thanks
>
>
>
>
>
>
>
> On Wed, Feb 1, 2023 at 11:50 PM Schwalbe Matthias <
> matthias.schwa...@viseca.ch> wrote:
>
> Hi Kishore,
>
>
>
>
>
> Having followed this thread for a while it is still quite a bit of
> confusion of concepts and in order to help resolve your original we would
> need to know,
>
>- *what makes your observation a problem to be solved?*
>- You write, you have no shuffling, does that mean you don’t use any
>keyBy(), or rebalance()?
>- How do you determine that there are 7 checkpoint, one for each
>operator?
>- In general please relate a bit more details about how you configure
>state primitives: kinds/also operator state?/on all operators/etc.
>
>
>
> In general (as Weijie told) checkpointing works like that (simplified):
>
>- Jobmanager creates checkpoint mark/barrier in a configured interval
>- For synchronous checkpointing this flows along with the events
>through the chain of tasks
>- For asynchronous checkpointing, the checkpointing 

Re: Standalone cluster memory configuration

2023-02-02 Thread Hang Ruan
Hi, Theodor,

The description in
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/config/#memory-configuration
map help you to config the memory for flink.

> Flink tries to shield users as much as possible from the complexity of
> configuring the JVM for data-intensive processing. In most cases, users
> should only need to set the values taskmanager.memory.process.size or
> taskmanager.memory.flink.size (depending on how the setup), and possibly
> adjusting the ratio of JVM heap and Managed Memory via
> taskmanager.memory.managed.fraction. The other options below can be used
> for performance tuning and fixing memory related errors.
>
I think maybe you should set the taskmanager.memory.process.size or
taskmanager.memory.flink.size, or increase the memory of the container.
Hope this helps!

Best,
Hang

Theodor Wübker  于2023年2月2日周四 23:37写道:

> Hello everyone,
>
> I have a Standalone Custer running in a docker-swarm with a very simple
> docker-compose configuration [3].  When I run my job there with a
> parallelism greater than one, I get an out of memory error. Nothing out of
> the ordinary, so I wanted to increase the JVM heap. I did that by setting
> ‘taskmanager.memory.task.heap.size’ according to [1]. However the
> taskmanager would not start, throwing an Exception saying that this
> configuration clashes with the configured total process memory - even
> though I had not configured that at all. Due to this, I could also not set
> the total Flink memory.
> Now I wonder, why did the TM tell me that the total process memory is
> already configured? Also, in [2]  I read that the cluster should not even
> start when neither total Flink memory nor total process memory are
> specified - which, as you can see in my configuration, I have not done [3].
>
> Maybe someone can enlighten me, why it looks like I can’t configure the
> memory properly? Thanks :)
>
> -Theo
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/mem_setup_tm/#configure-heap-and-managed-memory
>
> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/mem_setup/#configure-total-memory
>
> [3] The compose configuration:
>
>   jobmanager:
> image: flink:1.16.0
> command: jobmanager
> environment:
>   - TASK_MANAGER_NUMBER_OF_TASK_SLOTS=2
>   - |
> FLINK_PROPERTIES=
> jobmanager.rpc.address: jobmanager
>
>
>   taskmanager-01:
> image: flink:1.16.0
> depends_on:
>   - jobmanager
> command: taskmanager
> environment:
>   - |
> FLINK_PROPERTIES=
> jobmanager.rpc.address: jobmanager
> taskmanager.numberOfTaskSlots: 2
>


flink提交jar包时报连接不上BlobServer

2023-02-02 Thread 孙冬燕
2023-02-02 19:21:21,288 WARN 
org.apache.flink.client.deployment.application.DetachedApplicationRunner [] - 
Could not execute application: 
org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Failed to execute job 'Flink Streaming Job'.
 at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
 ~[flink-dist-1.16.0.jar:1.16.0]
 at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
 ~[flink-dist-1.16.0.jar:1.16.0]
 at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98) 
~[flink-dist-1.16.0.jar:1.16.0]
 at 
org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
 ~[flink-dist-1.16.0.jar:1.16.0]
 at 
org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
 ~[flink-dist-1.16.0.jar:1.16.0]
 at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:107)
 ~[flink-dist-1.16.0.jar:1.16.0]
 at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
 [?:1.8.0_66]
 at java.lang.Thread.run(Thread.java:756) [?:1.8.0_66]
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'Flink 
Streaming Job'.
 at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2203)
 ~[flink-dist-1.16.0.jar:1.16.0]
 at 
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:206)
 ~[flink-dist-1.16.0.jar:1.16.0]
 at 
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:144)
 ~[flink-dist-1.16.0.jar:1.16.0]
 at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2049)
 ~[flink-dist-1.16.0.jar:1.16.0]
 at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2027)
 ~[flink-dist-1.16.0.jar:1.16.0]
 at 
me.ele.arch.emonitor.blink.DataStreamApplication.main(DataStreamApplication.java:48)
 ~[?:?]
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_66]
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
~[?:1.8.0_66]
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_66]
 at java.lang.reflect.Method.invoke(Method.java:497) ~[?:1.8.0_66]
 at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
 ~[flink-dist-1.16.0.jar:1.16.0]
 ... 7 more
Caused by: org.apache.flink.util.FlinkException: Could not upload job files.
 at 
org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:86)
 ~[flink-dist-1.16.0.jar:1.16.0]
 at 
org.apache.flink.runtime.client.ClientUtils.extractAndUploadJobGraphFiles(ClientUtils.java:62)
 ~[flink-dist-1.16.0.jar:1.16.0]
 at 
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor.lambda$submitJob$6(EmbeddedExecutor.java:177)
 ~[flink-dist-1.16.0.jar:1.16.0]
 at 
java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:981)
 ~[?:1.8.0_66]
 at 
java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2124) 
~[?:1.8.0_66]
 at 
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor.submitJob(EmbeddedExecutor.java:174)
 ~[flink-dist-1.16.0.jar:1.16.0]
 at 
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor.submitAndGetJobClientFuture(EmbeddedExecutor.java:134)
 ~[flink-dist-1.16.0.jar:1.16.0]
 at 
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor.execute(EmbeddedExecutor.java:104)
 ~[flink-dist-1.16.0.jar:1.16.0]
 at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2188)
 ~[flink-dist-1.16.0.jar:1.16.0]
 at 
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:206)
 ~[flink-dist-1.16.0.jar:1.16.0]
 at 
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:144)
 ~[flink-dist-1.16.0.jar:1.16.0]
 at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2049)
 ~[flink-dist-1.16.0.jar:1.16.0]
 at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2027)
 ~[flink-dist-1.16.0.jar:1.16.0]
 at 
me.ele.arch.emonitor.blink.DataStreamApplication.main(DataStreamApplication.java:48)
 ~[?:?]
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_66]
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
~[?:1.8.0_66]
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_66]
 at 

RE: Reducing Checkpoint Count for Chain Operator

2023-02-02 Thread Schwalbe Matthias
Hi Talat Uyarer,


  *   There is no way to have only one file unless you lower the parallelism to 
1 (= only one subtask)
  *   So which files do you see: 1 “_metadata” + multiple data files (or just 
one)?
  *   The idea of having multiple files is to allow multiple threads to be able 
to stare checkpoints at the same time, and when restarting from a checkpoint to 
consume from more files potentially distributed to multiple physical hard 
driver (more I/O capacity)
  *   So in general it is good to have multiple files

Still (out of curiosity) why would you want to have everything in a single file?

Sincere greetings

Thias


From: Talat Uyarer 
Sent: Thursday, February 2, 2023 5:57 PM
To: Schwalbe Matthias 
Cc: Kishore Pola ; weijie guo 
; user@flink.apache.org
Subject: Re: Reducing Checkpoint Count for Chain Operator

⚠EXTERNAL MESSAGE – CAUTION: Think Before You Click ⚠


Hi Schwalbe, weijie,

Thanks for your reply.


  *   Each state primitive/per subtask stores state into a separate file

In this picture You can see Operator Chain 
https://nightlies.apache.org/flink/flink-docs-master/fig/tasks_chains.svg

Source and Map are in the same chain. Today Flink creates two files for that 
operator chain. When we have OperatorChain, All subtasks are running in the 
same machine, same thread for memory optimization.  However Flink creates 
separate files per subtasks. Our question is whether there is a way to have one 
file not multiple files.

Thanks



On Wed, Feb 1, 2023 at 11:50 PM Schwalbe Matthias 
mailto:matthias.schwa...@viseca.ch>> wrote:
Hi Kishore,


Having followed this thread for a while it is still quite a bit of confusion of 
concepts and in order to help resolve your original we would need to know,

  *   what makes your observation a problem to be solved?
  *   You write, you have no shuffling, does that mean you don’t use any 
keyBy(), or rebalance()?
  *   How do you determine that there are 7 checkpoint, one for each operator?
  *   In general please relate a bit more details about how you configure state 
primitives: kinds/also operator state?/on all operators/etc.

In general (as Weijie told) checkpointing works like that (simplified):

  *   Jobmanager creates checkpoint mark/barrier in a configured interval
  *   For synchronous checkpointing this flows along with the events through 
the chain of tasks
  *   For asynchronous checkpointing, the checkpointing marker is directly sent 
to the subtasks
  *   A single checkpoint looks like that:

 *   Each state primitive/per subtask stores state into a separate file
 *   At the end jobmager writes a “_metadata” file for the checkpoint 
metadata and for state that is too small to end up in a separate file
 *   i.e. each checkpoint generates only one checkpoint (multiple files) 
not 7

Hope we shed a little light on this

Best regards

Thias



From: Kishore Pola mailto:kishore.p...@hotmail.com>>
Sent: Thursday, February 2, 2023 4:12 AM
To: weijie guo mailto:guoweijieres...@gmail.com>>; 
Talat Uyarer mailto:tuya...@paloaltonetworks.com>>
Cc: user@flink.apache.org
Subject: Re: Reducing Checkpoint Count for Chain Operator

Hi Weijie,

In our case we do have 7 operators. All the 7 operators are getting executed as 
one chain within a single StreamTask. As checkpoint barrier is passing through 
all the operators, there are 7 checkpoints being stored. So our checkpoint size 
is up by 7 times. We are investigating to see if we can checkpoint the start 
operator (kafka source) or end operator (BQ sink), we are good and check point 
size comes down. Hence the question, when the operators are executed in the 
same StreamTask as one chain, is it possible to checkpoint at operator chain or 
single operator level?

Thanks,
Kishore


From: weijie guo mailto:guoweijieres...@gmail.com>>
Sent: Wednesday, February 1, 2023 6:59 PM
To: Talat Uyarer 
mailto:tuya...@paloaltonetworks.com>>
Cc: user@flink.apache.org 
mailto:user@flink.apache.org>>
Subject: Re: Reducing Checkpoint Count for Chain Operator

Hi Talat,

In Flink, a checkpoint barrier will be injected from source, and then pass 
through all operators in turn. Each stateful operator will do checkpoint in 
this process, the state is managed at operator granularity, not operator chain. 
So what is the significance of checkpoint based on the granularity of operator 
chain?


Best regards,

Weijie


Talat Uyarer 
mailto:tuya...@paloaltonetworks.com>> 
于2023年2月2日周四 02:20写道:
Hi Weijie,

Thanks for replying back.

Our job is  a streaming job. The OperatorChain contains all operators that are 
executed as one chain within a single StreamTask. But each operator creates 
their own checkpoint at checkpointing time . Rather than creating a checkpoint 
per operator in checkpointing time. Can I have one checkpoint per 
OperatorChain? This is my question.

Thanks

On Wed, Feb 1, 2023 at 1:02 AM weijie guo 

Clear global state

2023-02-02 Thread Dario Heinisch

Hey all,

Is it somehow possible to hook into all states in a current Job and 
clear them all at once? Currently the way I do it is just to stop the 
job and then restarting it.


Was wonderding if there is a way where I can do it without restarting 
the job. I know about adding TTL to states but since I know that the job 
only needs the state in all operators for specific timeframe, I feel 
like just doing globally all at once would be more appropiate.


Best regards,

Dario



Re: Reducing Checkpoint Count for Chain Operator

2023-02-02 Thread Talat Uyarer via user
Hi Schwalbe, weijie,

Thanks for your reply.


>- Each state primitive/per subtask stores state into a separate file
>
>
In this picture You can see Operator Chain
https://nightlies.apache.org/flink/flink-docs-master/fig/tasks_chains.svg

Source and Map are in the same chain. Today Flink creates two files for
that operator chain. When we have OperatorChain, All subtasks are running
in the same machine, same thread for memory optimization.  However Flink
creates separate files per subtasks. Our question is whether there is a way
to have one file not multiple files.

Thanks



On Wed, Feb 1, 2023 at 11:50 PM Schwalbe Matthias <
matthias.schwa...@viseca.ch> wrote:

> Hi Kishore,
>
>
>
>
>
> Having followed this thread for a while it is still quite a bit of
> confusion of concepts and in order to help resolve your original we would
> need to know,
>
>- *what makes your observation a problem to be solved?*
>- You write, you have no shuffling, does that mean you don’t use any
>keyBy(), or rebalance()?
>- How do you determine that there are 7 checkpoint, one for each
>operator?
>- In general please relate a bit more details about how you configure
>state primitives: kinds/also operator state?/on all operators/etc.
>
>
>
> In general (as Weijie told) checkpointing works like that (simplified):
>
>- Jobmanager creates checkpoint mark/barrier in a configured interval
>- For synchronous checkpointing this flows along with the events
>through the chain of tasks
>- For asynchronous checkpointing, the checkpointing marker is directly
>sent to the subtasks
>- A single checkpoint looks like that:
>   - Each state primitive/per subtask stores state into a separate file
>   - At the end jobmager writes a “_metadata” file for the checkpoint
>   metadata and for state that is too small to end up in a separate file
>   - i.e. each checkpoint generates only one checkpoint (multiple
>   files) not 7
>
>
>
> Hope we shed a little light on this
>
>
>
> Best regards
>
>
>
> Thias
>
>
>
>
>
>
>
> *From:* Kishore Pola 
> *Sent:* Thursday, February 2, 2023 4:12 AM
> *To:* weijie guo ; Talat Uyarer <
> tuya...@paloaltonetworks.com>
> *Cc:* user@flink.apache.org
> *Subject:* Re: Reducing Checkpoint Count for Chain Operator
>
>
>
> Hi Weijie,
>
>
>
> In our case we do have 7 operators. All the 7 operators are getting
> executed as one chain within a single StreamTask. As checkpoint barrier is
> passing through all the operators, there are 7 checkpoints being stored. So
> our checkpoint size is up by 7 times. We are investigating to see if we can
> checkpoint the start operator (kafka source) or end operator (BQ sink), we
> are good and check point size comes down. Hence the question, when the
> operators are executed in the same StreamTask as one chain, is it possible
> to checkpoint at operator chain or single operator level?
>
>
>
> Thanks,
>
> Kishore
>
>
> --
>
> *From:* weijie guo 
> *Sent:* Wednesday, February 1, 2023 6:59 PM
> *To:* Talat Uyarer 
> *Cc:* user@flink.apache.org 
> *Subject:* Re: Reducing Checkpoint Count for Chain Operator
>
>
>
> Hi Talat,
>
>
>
> In Flink, a checkpoint barrier will be injected from source, and then pass
> through all operators in turn. Each stateful operator will do checkpoint in
> this process, the state is managed at operator granularity, not operator
> chain. So what is the significance of checkpoint based on the granularity
> of operator chain?
>
>
>
> Best regards,
>
> Weijie
>
>
>
>
>
> Talat Uyarer  于2023年2月2日周四 02:20写道:
>
> Hi Weijie,
>
>
>
> Thanks for replying back.
>
>
>
> Our job is  a streaming job. The OperatorChain contains all operators that
> are executed as one chain within a single StreamTask. But each
> operator creates their own checkpoint at checkpointing time . Rather than
> creating a checkpoint per operator in checkpointing time. Can I have one
> checkpoint per OperatorChain? This is my question.
>
>
>
> Thanks
>
>
>
> On Wed, Feb 1, 2023 at 1:02 AM weijie guo 
> wrote:
>
> Hi Talat,
>
>
>
> Can you elaborate on what it means to create one checkpoint object per
> chain operator more than all operators? If you mean to do checkpoint
> independently for each task, this is not supported.
>
>
>
> Best regards,
>
> Weijie
>
>
>
>
>
> Talat Uyarer via user  于2023年2月1日周三 15:34写道:
>
> Hi,
>
>
>
> We have a job that is reading from kafka and writing some endpoints. The
> job does not have any shuffling steps.  I implement it with multiple
> steps.  Flink chained those operators in one operator in submission time.
> However I see all operators are doing checkpointing.
>
>
>
> Is there any way to create one checkpoint object per chain operator rather
> than all operators ?
>
>
>
> Thanks
>
> Diese Nachricht ist ausschliesslich für den Adressaten bestimmt und
> beinhaltet unter Umständen vertrauliche Mitteilungen. Da die
> Vertraulichkeit von e-Mail-Nachrichten nicht gewährleistet 

Standalone cluster memory configuration

2023-02-02 Thread Theodor Wübker
Hello everyone,

I have a Standalone Custer running in a docker-swarm with a very simple 
docker-compose configuration [3].  When I run my job there with a parallelism 
greater than one, I get an out of memory error. Nothing out of the ordinary, so 
I wanted to increase the JVM heap. I did that by setting 
‘taskmanager.memory.task.heap.size’ according to [1]. However the taskmanager 
would not start, throwing an Exception saying that this configuration clashes 
with the configured total process memory - even though I had not configured 
that at all. Due to this, I could also not set the total Flink memory. 
Now I wonder, why did the TM tell me that the total process memory is already 
configured? Also, in [2]  I read that the cluster should not even start when 
neither total Flink memory nor total process memory are specified - which, as 
you can see in my configuration, I have not done [3]. 

Maybe someone can enlighten me, why it looks like I can’t configure the memory 
properly? Thanks :)

-Theo

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/mem_setup_tm/#configure-heap-and-managed-memory
 


[2] 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/mem_setup/#configure-total-memory
 


[3] The compose configuration: 

  jobmanager:
image: flink:1.16.0
command: jobmanager
environment:
  - TASK_MANAGER_NUMBER_OF_TASK_SLOTS=2
  - |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager


  taskmanager-01:
image: flink:1.16.0
depends_on:
  - jobmanager
command: taskmanager
environment:
  - |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
taskmanager.numberOfTaskSlots: 2

smime.p7s
Description: S/MIME cryptographic signature


Re: beam + flink + k8

2023-02-02 Thread P Singh
Hi Jan,

localhost:5 is not open i got this error.. I have a question if I have
deployed with the same configuration on GKE and port-forward job manager to
localhost.. can't I port forward the task manager to localhost:5. I
don't know the IP of the running pod on GKE. That's why I'm getting this
issue. please suggest.

nc: connectx to localhost port 5 (tcp) failed: Connection refused
nc: connectx to localhost port 5 (tcp) failed: Connection refused

On Thu, 2 Feb 2023 at 17:50, Jan Lukavský  wrote:

> That would suggest it is uploading (you can verify that using jnettop).
> But I'd leave this open for others to answer, because now it is purely
> Flink (not Beam) question.
>
> Best,
>
>  Jan
> On 2/2/23 10:46, bigdatadeveloper8 wrote:
>
> Hi Jan,
>
> Job manager is configured and working.. when I submit python Job to flink
> it's not showing or flink UI or simply hangs without any error.
>
>
>
> Sent from my Galaxy
>
>
>  Original message 
> From: Jan Lukavský  
> Date: 02/02/2023 15:07 (GMT+05:30)
> To: user@flink.apache.org
> Subject: Re: beam + flink + k8
>
> I'm not sure how exactly minikube exposes the jobmanager, but in GKE you
> likely need to port-forward it, e.g.
>
>  $ kubectl port-forward svc/flink-jobmanager 8081:8081
>
> This should make jobmanager accessible via localhost:8081. For production
> cases you might want to use a different approach, like Flink operator, etc.
>
> Best,
>
>  Jan
> On 2/1/23 17:08, P Singh wrote:
>
> Hi Jan,
>
> Thanks for the reply, I was able to submit the job to flink but it's
> failing due to an OOM issue so I am moving to the GKE. I got the flink UI
> there but submitted a job not appearing on flink UI. I am using the same
> script which I shared with you.. Do I need to make some changes for Google
> Kubernetes Environment?
>
> On Tue, 31 Jan 2023 at 20:20, Jan Lukavský  wrote:
>
>> The script looks good to me, did you run the SDK harness? External
>> environment needs the SDK harness to be run externally, see [1]. Generally,
>> the best option is DOCKER, but that usually does not work in k8s. For this,
>> you might try PROCESS environment and build your own docker image for
>> flink, which will contain the Beam harness, e.g. [2]. You will need to pass
>> the environment config using --environment_config={"command":
>> "/opt/apache/beam/boot"}.
>>
>> From the screenshot it seems, that the Flink UI is accessible, so this is
>> the only option that comes to my mind. Did you check logs of the Flink
>> jobmanager pod?
>>
>>  Jan
>>
>> [1] https://beam.apache.org/documentation/runtime/sdk-harness-config/
>>
>> [2]
>> https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/env/docker/flink/Dockerfile
>> On 1/31/23 13:33, P Singh wrote:
>>
>> HI Jan,
>>
>> Thanks for your reply, please find attached script, I am newbie with
>> flink and minikube though i am trying to connect them by script from local
>> machine as suggested by flink kubernetes documents link
>> 
>>
>> I have changed the log level to ERROR but didn't find much... Can you
>> please help me out how to run the script from inside the pod.
>>
>> On Tue, 31 Jan 2023 at 15:40, Jan Lukavský  wrote:
>>
>>> Hi,
>>>
>>> can you please share the also the script itself? I'd say that the
>>> problem is that the flink jobmanager is not accessible through
>>> localhost:8081, because it runs inside the minikube. You need to expose it
>>> outside of the minikube via [1], or run the script from pod inside the
>>> minikube and access job manager via flink-jobmanager:8081. I'm surprised
>>> that the log didn't make this more obvious, though. Is it possible that you
>>> changed the default log level to ERROR? Can you try DEBUG or similar?
>>>
>>>  Jan
>>>
>>> [1] https://minikube.sigs.k8s.io/docs/handbook/accessing/
>>> On 1/30/23 18:36, P Singh wrote:
>>>
>>> Hi Jan,
>>>
>>> Yeah I am using minikube and beam image with python 3.10.
>>>
>>> Please find the attached screenshots.
>>>
>>>
>>>
>>> On Mon, 30 Jan 2023 at 21:22, Jan Lukavský  wrote:
>>>
 Hi,

 can you please share the command-line and complete output of the
 script?
 Are you using minikube? Can you share list of your running pods?

   Jan

 On 1/30/23 14:25, P Singh wrote:
 > Hi Team,
 >
 > I am trying to run beam job on top of flink on my local machine
 > (kubernetes).
 >
 >  I have flink 1.14 and beam 2.43 images both running but when i
 submit
 > the job it's not reaching to the flink cluster and getting failed
 with
 > below error.
 >
 > ERROR:apache_beam.utils.subprocess_server:Starting job service with
 > ['java', '-jar',
 >
 '/Users/spsingh/.apache_beam/cache/jars/beam-runners-flink-1.14-job-server-2.43.0.jar',

 > '--flink-master', 'http://localhost:8081', '--artifacts-dir',
 

Re: beam + flink + k8

2023-02-02 Thread Jan Lukavský
That would suggest it is uploading (you can verify that using jnettop). 
But I'd leave this open for others to answer, because now it is purely 
Flink (not Beam) question.


Best,

 Jan

On 2/2/23 10:46, bigdatadeveloper8 wrote:

Hi Jan,

Job manager is configured and working.. when I submit python Job to 
flink it's not showing or flink UI or simply hangs without any error.




Sent from my Galaxy


 Original message 
From: Jan Lukavský 
Date: 02/02/2023 15:07 (GMT+05:30)
To: user@flink.apache.org
Subject: Re: beam + flink + k8

I'm not sure how exactly minikube exposes the jobmanager, but in GKE 
you likely need to port-forward it, e.g.


 $ kubectl port-forward svc/flink-jobmanager 8081:8081

This should make jobmanager accessible via localhost:8081. For 
production cases you might want to use a different approach, like 
Flink operator, etc.


Best,

 Jan

On 2/1/23 17:08, P Singh wrote:

Hi Jan,

Thanks for the reply, I was able to submit the job to flink but it's 
failing due to an OOM issue so I am moving to the GKE. I got the 
flink UI there but submitted a job not appearing on flink UI. I am 
using the same script which I shared with you.. Do I need to make 
some changes for Google Kubernetes Environment?


On Tue, 31 Jan 2023 at 20:20, Jan Lukavský  wrote:

The script looks good to me, did you run the SDK harness?
External environment needs the SDK harness to be run externally,
see [1]. Generally, the best option is DOCKER, but that usually
does not work in k8s. For this, you might try PROCESS environment
and build your own docker image for flink, which will contain the
Beam harness, e.g. [2]. You will need to pass the environment
config using --environment_config={"command":
"/opt/apache/beam/boot"}.

From the screenshot it seems, that the Flink UI is accessible, so
this is the only option that comes to my mind. Did you check logs
of the Flink jobmanager pod?

 Jan

[1] https://beam.apache.org/documentation/runtime/sdk-harness-config/

[2]

https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/env/docker/flink/Dockerfile

On 1/31/23 13:33, P Singh wrote:

HI Jan,

Thanks for your reply, please find attached script, I am newbie
with flink and minikube though i am trying to connect them by
script from local machine as suggested by flink kubernetes
documents link



I have changed the log level to ERROR but didn't find much...
Can you please help me out how to run the script from inside the
pod.

On Tue, 31 Jan 2023 at 15:40, Jan Lukavský  wrote:

Hi,

can you please share the also the script itself? I'd say
that the problem is that the flink jobmanager is not
accessible through localhost:8081, because it runs inside
the minikube. You need to expose it outside of the minikube
via [1], or run the script from pod inside the minikube and
access job manager via flink-jobmanager:8081. I'm surprised
that the log didn't make this more obvious, though. Is it
possible that you changed the default log level to ERROR?
Can you try DEBUG or similar?

 Jan

[1] https://minikube.sigs.k8s.io/docs/handbook/accessing/

On 1/30/23 18:36, P Singh wrote:

Hi Jan,

Yeah I am using minikube and beam image with python 3.10.

Please find the attached screenshots.



On Mon, 30 Jan 2023 at 21:22, Jan Lukavský
 wrote:

Hi,

can you please share the command-line and complete
output of the script?
Are you using minikube? Can you share list of your
running pods?

  Jan

On 1/30/23 14:25, P Singh wrote:
> Hi Team,
>
> I am trying to run beam job on top of flink on my
local machine
> (kubernetes).
>
>  I have flink 1.14 and beam 2.43 images both running
but when i submit
> the job it's not reaching to the flink cluster and
getting failed with
> below error.
>
> ERROR:apache_beam.utils.subprocess_server:Starting
job service with
> ['java', '-jar',
>

'/Users/spsingh/.apache_beam/cache/jars/beam-runners-flink-1.14-job-server-2.43.0.jar',

> '--flink-master', 'http://localhost:8081',
'--artifacts-dir',
>

'/var/folders/n3/dqblsr792yj4kfs7xlfmdj54gr/T/beam-tempvphhje07/artifacts6kjt60ch',

> '--job-port', '57882', '--artifact-port', '0',
'--expansion-port', '0']
> ERROR:apache_beam.utils.subprocess_server:Error
bringing up service
> 

Re: Non-temporal watermarks

2023-02-02 Thread James Sandys-Lumsdaine
I can describe a use that has been successful for me. We have a Flink workflow 
that calculates reports over many days and have it currently set up to 
recompute the last 10 days or so when recovering this "deep history" from our 
databases and then switches over to live flow to process all subsequent update 
events. I wrote this before the days of the HyrbidSource so it is literally a 
JDBC data source that queries state for the last 10 days and that stream is 
merged with a "live" stream from a db poller or Kafka stream.

In answer to your question, during recovery I have all state for the old 
business days sent with a timestamp of that business date e.g. new 
DateTime(2023, 1, 15, 0, 0, 0, UTC).getMillis() for any data associated with 
the 15th Jan 2023. Once the data source has emitted all the state for that 
date, it then emits a watermark with exactly the same timestamp as it is 
communicating downstream that all the data has been sent for that date. Then 
moves onto the next date emitting that state.

When my system starts up it records the current datetime and treats all data 
retrieved before that timestamp as being recovered state, and all data 
receieved from the live pollers/Kafka to be after that cut-off point. The live 
sources emit objects timestamped with the current time and periodically emit a 
watermark to make forward progress. I'm simplifying here but you get the point.

This pattern is useful for me because my keyed process functions are able to 
register timers to process all the data for an historic date at once - it won't 
need to fire on each message received or try to compute with missing data, but 
instead runs once all the data has been received for a date from all the 
sources. (The time is only triggered when the watermark is reached and that 
required all sources to have reached at least that point in the recovery). Once 
we have reached the startup datetime watermark the system seamlessly flips into 
live processing mode. The watermarks still trigger my timers but now we are 
processing the last ~1 minute of batched data.

So logically the meaning of a timestamp and watermark in my system always 
represents a forward moving moment in time - it is just that it means an 
historic date for data during recovery from the databases and then a current 
timestamp when the system is processing live data.

Hope that gives you some ideas and help.

James.

From: Gen Luo 
Sent: 02 February 2023 09:52
To: Jan Lukavský 
Cc: user@flink.apache.org 
Subject: Re: Non-temporal watermarks

Hi,

This is an interesting topic. I suppose the watermark is defined based on the 
event time since it's mainly used, or designed, for the event time processing. 
Flink provides the event time processing mechanism because it's widely needed. 
Every event has its event time and we usually need to group or order by the 
event time. On the other hand, this also means that we can process events from 
different sources as the event time is naturally of the same scale.

However, just as you say, technically speaking the event timestamp can be 
replaced with any other meaningful number (or event a comparable), and the 
(event time) watermark should change accordingly. If we promise this field and 
its watermark of all sources are of the same scale, we can process the 
data/event from the sources together with it just like the event time. As the 
event time processing and event time timer service doesn't rely on the actual 
time point or duration, I suppose this can be implemented by defining it as the 
event time, if it contains only positive numbers.


On Thu, Feb 2, 2023 at 5:18 PM Jan Lukavský 
mailto:je...@seznam.cz>> wrote:
Hi,

I will not speak about details related to Flink specifically, the
concept of watermarks is more abstract, so I'll leave implementation
details aside.

Speaking generally, yes, there is a set of requirements that must be met
in order to be able to generate a system that uses watermarks.

The primary question is what are watermarks used for? The answer is - we
need watermarks to be able to define a partially stable order of
_events_. Event is an immutable piece of data that can be _observed_
(i.e. processed) with various consumer-dependent delays (two consumers
of the event can see the event at different processing times), or a
specific (local) timestamp. Generally an event tells us that something,
somewhere happened at given local timestamp.

Watermarks create markers in processing time of each observer, so that
the observer is able to tell if two events (e.g. event "close
time-window T1" and "new data with timestamp T2 arrived") can be ordered
(that is being able to tell which one is - globally! - preceding the other).

Having said that - there is a general algebra for "timestamps" - and
therefore watermarks. A timestamp can be any object that defines the
following operations:

  - a less-than relation <, i.e. t1 < t2: bool, this relation needs to
be 

Re: Non-temporal watermarks

2023-02-02 Thread Gen Luo
Hi,

This is an interesting topic. I suppose the watermark is defined based on
the event time since it's mainly used, or designed, for the event time
processing. Flink provides the event time processing mechanism because it's
widely needed. Every event has its event time and we usually need to group
or order by the event time. On the other hand, this also means that we can
process events from different sources as the event time is naturally of the
same scale.

However, just as you say, technically speaking the event timestamp can be
replaced with any other meaningful number (or event a comparable), and the
(event time) watermark should change accordingly. If we promise this field
and its watermark of all sources are of the same scale, we can process the
data/event from the sources together with it just like the event time. As
the event time processing and event time timer service doesn't rely on the
actual time point or duration, I suppose this can be implemented by
defining it as the event time, if it contains only positive numbers.


On Thu, Feb 2, 2023 at 5:18 PM Jan Lukavský  wrote:

> Hi,
>
> I will not speak about details related to Flink specifically, the
> concept of watermarks is more abstract, so I'll leave implementation
> details aside.
>
> Speaking generally, yes, there is a set of requirements that must be met
> in order to be able to generate a system that uses watermarks.
>
> The primary question is what are watermarks used for? The answer is - we
> need watermarks to be able to define a partially stable order of
> _events_. Event is an immutable piece of data that can be _observed_
> (i.e. processed) with various consumer-dependent delays (two consumers
> of the event can see the event at different processing times), or a
> specific (local) timestamp. Generally an event tells us that something,
> somewhere happened at given local timestamp.
>
> Watermarks create markers in processing time of each observer, so that
> the observer is able to tell if two events (e.g. event "close
> time-window T1" and "new data with timestamp T2 arrived") can be ordered
> (that is being able to tell which one is - globally! - preceding the
> other).
>
> Having said that - there is a general algebra for "timestamps" - and
> therefore watermarks. A timestamp can be any object that defines the
> following operations:
>
>   - a less-than relation <, i.e. t1 < t2: bool, this relation needs to
> be a antisymmetric, so t1 < t2 implies not t2 < t1
>
>   - a function min_timestamp_following(timestamp t1, duration):
> timestamp t2, that returns the minimal timestamp, for which t1 +
> duration < t2 (this function is actually a definition of duration)
>
> These two conditions allows to construct a working streaming processing
> system, which means there should be no problem using different
> "timestamps", provided we know how to construct the above.
>
> Using "a different number" for timestamps and watermarks seems valid in
> this sense, provided you are fine with the implicit definition of
> duration, that is currently defined as simple t2 - t1.
>
> I tried to explain why it is not good to expect that two events can be
> globally ordered and what is the actual role of watermarks in this in a
> twitter thread [1], if anyone interested.
>
> Best,
>
>   Jan
>
> [1]
>
> https://twitter.com/janl_apache/status/1478757956263071745?s=20=cMXfPHS8EjPrbF8jys43BQ
>
> On 2/2/23 00:18, Yaroslav Tkachenko wrote:
> > Hey everyone,
> >
> > I'm wondering if anyone has done any experiments trying to use
> > non-temporal watermarks? For example, a dataset may contain some kind
> > of virtual timestamp / version field that behaves just like a regular
> > timestamp (monotonically increasing, etc.), but has a different scale
> > / range.
> >
> > As far as I can see Flink assumes that the values used for event times
> > and watermark generation are actually timestamps and the Table API
> > requires you to define watermarks on TIMESTAMP columns.
> >
> > Practically speaking timestamp is just a number, so if I have a
> > "timeline" that consists of 1000 monotonically increasing integers,
> > for example, the concepts like late-arriving
> > data, bounded-out-of-orderness, etc. still work.
> >
> > Thanks for sharing any thoughts you might have on this topic!
>


Re: beam + flink + k8

2023-02-02 Thread bigdatadeveloper8
Hi Jan,
Job manager is configured and working.. when I submit python Job to flink it's 
not showing or flink UI or simply hangs without any error.


Sent from my Galaxy

 Original message From: Jan Lukavský  Date: 
02/02/2023  15:07  (GMT+05:30) To: user@flink.apache.org Subject: Re: beam + 
flink + k8 

I'm not sure how exactly minikube exposes the jobmanager, but in
  GKE you likely need to port-forward it, e.g.

  

   $ kubectl port-forward svc/flink-jobmanager 8081:8081
This should make jobmanager accessible via localhost:8081. For
  production cases you might want to use a different approach, like
  Flink operator, etc.
Best,
 Jan


On 2/1/23 17:08, P Singh wrote:



  
  
Hi Jan,



Thanks for the reply, I was able to submit the job to flink
  but it's failing due to an OOM issue so I am moving to the
  GKE. I got the flink UI there but submitted a job not
  appearing on flink UI. I am using the same script which I
  shared with you.. Do I need to make some changes for Google
  Kubernetes Environment?


  
  

  
On Tue, 31 Jan 2023 at 20:20,
  Jan Lukavský 
  wrote:



  
The script looks good to me, did you run the SDK harness?
  External environment needs the SDK harness to be run
  externally, see [1]. Generally, the best option is DOCKER,
  but that usually does not work in k8s. For this, you might
  try PROCESS environment and build your own docker image
  for flink, which will contain the Beam harness, e.g. [2].
  You will need to pass the environment config using
  --environment_config={"command": "/opt/apache/beam/boot"}.
From the screenshot it seems, that the Flink UI is
  accessible, so this is the only option that comes to my
  mind. Did you check logs of the Flink jobmanager pod?
 Jan


[1] 
https://beam.apache.org/documentation/runtime/sdk-harness-config/
[2]
  
https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/env/docker/flink/Dockerfile


On 1/31/23 13:33, P Singh wrote:



  
HI Jan,



Thanks for your reply, please find attached script,
  I am newbie with flink and minikube though i am trying
  to connect them by script from local machine as
  suggested by flink kubernetes documents link



I have changed the log level to ERROR but didn't
  find much... Can you please help me out how to run the
  script from inside the pod.


  
  

  
On Tue, 31 Jan 2023 at
  15:40, Jan Lukavský 
  wrote:



  
Hi,
can you please share the also the script itself?
  I'd say that the problem is that the flink
  jobmanager is not accessible through
  localhost:8081, because it runs inside the
  minikube. You need to expose it outside of the
  minikube via [1], or run the script from pod
  inside the minikube and access job manager via
  flink-jobmanager:8081. I'm surprised that the log
  didn't make this more obvious, though. Is it
  possible that you changed the default log level to
  ERROR? Can you try DEBUG or similar?


 Jan


[1] https://minikube.sigs.k8s.io/docs/handbook/accessing/


On 1/30/23 18:36, P Singh wrote:



  
Hi Jan,



Yeah I am using minikube and beam image
  with python 3.10. 





Please find the attached screenshots.








  
  

  
On Mon, 30 Jan
  2023 at 21:22, Jan Lukavský 
  wrote:


Hi,

  


Re: beam + flink + k8

2023-02-02 Thread Jan Lukavský
I'm not sure how exactly minikube exposes the jobmanager, but in GKE you 
likely need to port-forward it, e.g.


 $ kubectl port-forward svc/flink-jobmanager 8081:8081

This should make jobmanager accessible via localhost:8081. For 
production cases you might want to use a different approach, like Flink 
operator, etc.


Best,

 Jan

On 2/1/23 17:08, P Singh wrote:

Hi Jan,

Thanks for the reply, I was able to submit the job to flink but it's 
failing due to an OOM issue so I am moving to the GKE. I got the flink 
UI there but submitted a job not appearing on flink UI. I am using the 
same script which I shared with you.. Do I need to make some changes 
for Google Kubernetes Environment?


On Tue, 31 Jan 2023 at 20:20, Jan Lukavský  wrote:

The script looks good to me, did you run the SDK harness? External
environment needs the SDK harness to be run externally, see [1].
Generally, the best option is DOCKER, but that usually does not
work in k8s. For this, you might try PROCESS environment and build
your own docker image for flink, which will contain the Beam
harness, e.g. [2]. You will need to pass the environment config
using --environment_config={"command": "/opt/apache/beam/boot"}.

From the screenshot it seems, that the Flink UI is accessible, so
this is the only option that comes to my mind. Did you check logs
of the Flink jobmanager pod?

 Jan

[1] https://beam.apache.org/documentation/runtime/sdk-harness-config/

[2]

https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/env/docker/flink/Dockerfile

On 1/31/23 13:33, P Singh wrote:

HI Jan,

Thanks for your reply, please find attached script, I am newbie
with flink and minikube though i am trying to connect them by
script from local machine as suggested by flink kubernetes
documents link



I have changed the log level to ERROR but didn't find much... Can
you please help me out how to run the script from inside the pod.

On Tue, 31 Jan 2023 at 15:40, Jan Lukavský  wrote:

Hi,

can you please share the also the script itself? I'd say that
the problem is that the flink jobmanager is not accessible
through localhost:8081, because it runs inside the minikube.
You need to expose it outside of the minikube via [1], or run
the script from pod inside the minikube and access job
manager via flink-jobmanager:8081. I'm surprised that the log
didn't make this more obvious, though. Is it possible that
you changed the default log level to ERROR? Can you try DEBUG
or similar?

 Jan

[1] https://minikube.sigs.k8s.io/docs/handbook/accessing/

On 1/30/23 18:36, P Singh wrote:

Hi Jan,

Yeah I am using minikube and beam image with python 3.10.

Please find the attached screenshots.



On Mon, 30 Jan 2023 at 21:22, Jan Lukavský 
wrote:

Hi,

can you please share the command-line and complete
output of the script?
Are you using minikube? Can you share list of your
running pods?

  Jan

On 1/30/23 14:25, P Singh wrote:
> Hi Team,
>
> I am trying to run beam job on top of flink on my
local machine
> (kubernetes).
>
>  I have flink 1.14 and beam 2.43 images both running
but when i submit
> the job it's not reaching to the flink cluster and
getting failed with
> below error.
>
> ERROR:apache_beam.utils.subprocess_server:Starting job
service with
> ['java', '-jar',
>

'/Users/spsingh/.apache_beam/cache/jars/beam-runners-flink-1.14-job-server-2.43.0.jar',

> '--flink-master', 'http://localhost:8081',
'--artifacts-dir',
>

'/var/folders/n3/dqblsr792yj4kfs7xlfmdj54gr/T/beam-tempvphhje07/artifacts6kjt60ch',

> '--job-port', '57882', '--artifact-port', '0',
'--expansion-port', '0']
> ERROR:apache_beam.utils.subprocess_server:Error
bringing up service
> Traceback (most recent call last):
>   File
>

"/Users/flink_deploy/flink_env/lib/python3.10/site-packages/apache_beam/utils/subprocess_server.py",

> line 88, in start
>     raise RuntimeError(
> RuntimeError: Service failed to start up with error 1
>
> Any help would be appreciated.


Re: Non-temporal watermarks

2023-02-02 Thread Jan Lukavský

Hi,

I will not speak about details related to Flink specifically, the 
concept of watermarks is more abstract, so I'll leave implementation 
details aside.


Speaking generally, yes, there is a set of requirements that must be met 
in order to be able to generate a system that uses watermarks.


The primary question is what are watermarks used for? The answer is - we 
need watermarks to be able to define a partially stable order of 
_events_. Event is an immutable piece of data that can be _observed_ 
(i.e. processed) with various consumer-dependent delays (two consumers 
of the event can see the event at different processing times), or a 
specific (local) timestamp. Generally an event tells us that something, 
somewhere happened at given local timestamp.


Watermarks create markers in processing time of each observer, so that 
the observer is able to tell if two events (e.g. event "close 
time-window T1" and "new data with timestamp T2 arrived") can be ordered 
(that is being able to tell which one is - globally! - preceding the other).


Having said that - there is a general algebra for "timestamps" - and 
therefore watermarks. A timestamp can be any object that defines the 
following operations:


 - a less-than relation <, i.e. t1 < t2: bool, this relation needs to 
be a antisymmetric, so t1 < t2 implies not t2 < t1


 - a function min_timestamp_following(timestamp t1, duration): 
timestamp t2, that returns the minimal timestamp, for which t1 + 
duration < t2 (this function is actually a definition of duration)


These two conditions allows to construct a working streaming processing 
system, which means there should be no problem using different 
"timestamps", provided we know how to construct the above.


Using "a different number" for timestamps and watermarks seems valid in 
this sense, provided you are fine with the implicit definition of 
duration, that is currently defined as simple t2 - t1.


I tried to explain why it is not good to expect that two events can be 
globally ordered and what is the actual role of watermarks in this in a 
twitter thread [1], if anyone interested.


Best,

 Jan

[1] 
https://twitter.com/janl_apache/status/1478757956263071745?s=20=cMXfPHS8EjPrbF8jys43BQ


On 2/2/23 00:18, Yaroslav Tkachenko wrote:

Hey everyone,

I'm wondering if anyone has done any experiments trying to use 
non-temporal watermarks? For example, a dataset may contain some kind 
of virtual timestamp / version field that behaves just like a regular 
timestamp (monotonically increasing, etc.), but has a different scale 
/ range.


As far as I can see Flink assumes that the values used for event times 
and watermark generation are actually timestamps and the Table API 
requires you to define watermarks on TIMESTAMP columns.


Practically speaking timestamp is just a number, so if I have a 
"timeline" that consists of 1000 monotonically increasing integers, 
for example, the concepts like late-arriving 
data, bounded-out-of-orderness, etc. still work.


Thanks for sharing any thoughts you might have on this topic!


Re: Flink消费消息队列写入HDFS

2023-02-02 Thread weijie guo
你好,可以使用FileSink,这个是基于新的sink API的。

Best regards,

Weijie


Howie Yang  于2023年2月2日周四 16:28写道:

> Hey,
>
>
> 最近想把消费日志写入到HDFS中,找这块的connector发现大部分都停留在使用 BucketingSink 的方式,这个好像是老版本的api了,
> 这块官方推荐的最新的方式是什么呢?
>
>
>
>
>
>
>
>
>
>
> --
>
> Best,
> Howie


What is the state of Scala wrappers?

2023-02-02 Thread Erwan Loisant
Hi,

Back in October, the Flink team announced that the Scala API was to be 
deprecated them removed. Which I think is perfectly fine, having third party 
develop Scala wrappers is a good approach.

With the announce I expected those wrapper projects to get steam, however both 
projects linked in the announcement (https://github.com/findify/flink-adt and 
https://github.com/ariskk/flink4s) don't seem much maintained, and are stuck on 
Flink 1.15.

Any team here using Flink with Scala are moving away from the official Scala 
API? Maybe there is a project that I'm missing that is getting more attentions 
than the 2 linked aboved?

Thank you!
Erwan

Flink消费消息队列写入HDFS

2023-02-02 Thread Howie Yang
Hey,


最近想把消费日志写入到HDFS中,找这块的connector发现大部分都停留在使用 BucketingSink 的方式,这个好像是老版本的api了,
这块官方推荐的最新的方式是什么呢?










--

Best,
Howie