Re: Very long launch of the Flink application in BATCH mode

2023-06-28 Thread Vladislav Keda
Hi Shammon,

When I set log.level=DEBUG, I see no further logs except:
*2023-06-21 14:51:30,921 DEBUG
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
Trigger heartbeat request.*

The job freezes during stream graph generation. In STREAMING mode the same
job starts quickly, without any of these problems.
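
For what it's worth, a minimal way to confirm that the time is really spent
in stream graph generation (rather than in submission or scheduling) is to
log timestamps around the graph build in the main method. A sketch, assuming
a DataStream job where env is the StreamExecutionEnvironment; note that
getStreamGraph(boolean) is internal API in 1.15, and the job name below is
hypothetical:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;

// ... build the pipeline on env (collection source, JDBC lookups, Kafka sink) ...

long t0 = System.currentTimeMillis();
// false = keep the transformations, so env.execute() still works afterwards
StreamGraph streamGraph = env.getStreamGraph(false);
System.out.printf("Stream graph generated in %d ms%n",
        System.currentTimeMillis() - t0);

env.execute("my-batch-job"); // hypothetical job name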

On Wed, Jun 28, 2023 at 06:44, Shammon FY wrote:

> Hi Brendan,
>
> I think you first need to confirm at which stage the job is blocked: is the
> client still submitting the job, is the resourcemanager scheduling it, or
> are the tasks launching in the TMs? You may need to provide more
> information to help us figure out the issue.
>
> Best,
> Shammon FY
>
> On Tuesday, June 27, 2023, Weihua Hu  wrote:
>
>> Hi, Brendan
>>
>> Judging from the log, it looks like it is still invoking your main method.
>> You can add more logs in the main method to figure out which part takes
>> too long.
>>
>> Best,
>> Weihua
>>
>>
>> On Tue, Jun 27, 2023 at 5:06 AM Brendan Cortez <
>> brendan.cortez...@gmail.com> wrote:
>>
>>> No, I'm using a collection source + 20 identical JDBC lookups + a Kafka sink.
>>>
>>> On Mon, 26 Jun 2023 at 19:17, Yaroslav Tkachenko 
>>> wrote:
>>>
 Hey Brendan,

 Do you use a file source by any chance?

 On Mon, Jun 26, 2023 at 4:31 AM Brendan Cortez <
 brendan.cortez...@gmail.com> wrote:

> Hi all!
>
> I'm trying to submit a Flink job in Application Mode on a Kubernetes
> cluster.
>
> I see some problems when an application has a large number of operators
> (more than 20 identical operators) - it freezes for ~6 minutes after
> *2023-06-21 15:46:45,082 WARN
> org.apache.flink.connector.kafka.sink.KafkaSinkBuilder [] - Property
> [transaction.timeout.ms] not specified. Setting it to PT1H*
> and until
> *2023-06-21 15:53:20,002 INFO
> org.apache.flink.streaming.api.graph.StreamGraphGenerator [] - Disabled
> Checkpointing. Checkpointing is not supported and not needed when
> executing jobs in BATCH mode.*
> (logs in attachment)
>
> When I set log.level=DEBUG, I see only this message every 10 seconds:
> *2023-06-21 14:51:30,921 DEBUG
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Trigger heartbeat request.*
>
> Could you please help me understand the cause of this problem and how
> to fix it? I am using Flink 1.15.3.
>
> Thank you in advance!
>
> Best regards,
> Brendan Cortez.
>



Flink Kubernetes Application freezes in BATCH mode

2023-06-22 Thread Vladislav Keda
Hi all!

I'm trying to submit a Flink job in Application Mode on a Kubernetes
cluster. I see some problems when an application has a large number of
operators (more than 20 identical operators) - it freezes for ~6 minutes
after
*2023-06-21 15:46:45,082 WARN
org.apache.flink.connector.kafka.sink.KafkaSinkBuilder [] - Property
[transaction.timeout.ms] not specified. Setting it to PT1H*
and until
*2023-06-21 15:53:20,002 INFO
org.apache.flink.streaming.api.graph.StreamGraphGenerator [] - Disabled
Checkpointing. Checkpointing is not supported and not needed when executing
jobs in BATCH mode.*
(logs in attachment)
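
As an aside, the transaction timeout warning itself is benign (the builder
is only noting that it applies the PT1H default) and is not what causes the
freeze. For reference, a minimal sketch of setting the property explicitly
on the sink builder; the broker address and topic name are hypothetical:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

// Set transaction.timeout.ms explicitly instead of relying on the PT1H default.
KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("kafka:9092")               // hypothetical broker
        .setProperty("transaction.timeout.ms", "900000") // 15 minutes
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("output-topic")                // hypothetical topic
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();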

When I set log.level=DEBUG, I see only this message every 10 seconds:
*2023-06-21 14:51:30,921 DEBUG
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
Trigger heartbeat request.*

Could you please help me understand the cause of this problem and how to
fix it?

Thank you in advance!

Best regards,
Vladislav Keda.


flink-k8s-app.log
Description: Binary data


Re: Default Log4j properties in Native Kubernetes

2023-06-20 Thread Vladislav Keda
Hi all again!

Please let me know if anyone can answer my question, thanks.

---

Best Regards,
Vladislav Keda

On Fri, Jun 16, 2023 at 16:12, Vladislav Keda <
vladislav.k...@glowbyteconsulting.com> wrote:

> Hi all!
>
> Is it possible to change Flink's *log4j-console.properties* in Native
> Kubernetes (for example in Kubernetes Application mode) without rebuilding
> the application Docker image?
>
> I was trying to inject a .sh script call (in the attachment) before
> /docker-entrypoint.sh, but this workaround did not work (Kubernetes gives
> me an error saying the log4j* files cannot be overwritten because a
> ConfigMap is mounted over them).
>
> Is there another way to change log4j* files?
>
> Thank you very much in advance!
>
> Best Regards,
> Vladislav Keda
>


Default Log4j properties in Native Kubernetes

2023-06-16 Thread Vladislav Keda
Hi all!

Is it possible to change Flink's *log4j-console.properties* in Native
Kubernetes (for example in Kubernetes Application mode) without rebuilding
the application Docker image?

I was trying to inject a .sh script call (in the attachment) before
/docker-entrypoint.sh, but this workaround did not work (Kubernetes gives
me an error saying the log4j* files cannot be overwritten because a
ConfigMap is mounted over them).

Is there another way to change log4j* files?

Thank you very much in advance!

Best Regards,
Vladislav Keda
#!/usr/bin/env bash
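# Attachment: script injected before docker-entrypoint.sh. For each
# LOG4J_*_PROPERTIES variable handled below, if that variable is set in the
# container environment, its contents replace the matching properties file
# under ${FLINK_HOME}/conf.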

# shellcheck disable=SC2034
LOG4J_CLI_PROPERTIES_PATH="${FLINK_HOME}/conf/log4j-cli.properties"
# shellcheck disable=SC2034
LOG4J_CONSOLE_PROPERTIES_PATH="${FLINK_HOME}/conf/log4j-console.properties"
# shellcheck disable=SC2034
LOG4J_SESSION_PROPERTIES_PATH="${FLINK_HOME}/conf/log4j-session.properties"
# shellcheck disable=SC2034
LOG4J_PROPERTIES_PATH="${FLINK_HOME}/conf/log4j.properties"

override_properties() {
  local properties_var=$1
  local properties_path_var="${properties_var}_PATH"

  # Indirect expansion: read the variables whose names are stored in
  # properties_var / properties_path_var.
  local content="${!properties_var}"
  local path="${!properties_path_var}"

  if [ -n "${content}" ]; then
    echo "$0: ${properties_var} env variable is set. Overwriting ${path}"
    echo "${content}" > "${path}"
  else
    echo "$0: ${properties_var} env variable is not set. Using Flink's ${path}"
  fi
}

override_properties "LOG4J_CLI_PROPERTIES"
override_properties "LOG4J_CONSOLE_PROPERTIES"
override_properties "LOG4J_SESSION_PROPERTIES"
override_properties "LOG4J_PROPERTIES"


Flink Interval Joins

2023-01-10 Thread Vladislav Keda
Hi,

I was trying to write an alternative to a query like this (for example):

SELECT ...
FROM t1
LEFT JOIN t2
  ON t1.fk = t2.key
  AND t1.proctime BETWEEN t2.proctime - INTERVAL ...
                      AND t2.proctime + INTERVAL ...

using the DataStream API, but I found that the interval join (in the
DataStream API) does not support left/full outer joins.

So I'm trying to understand the difference between the interval join in the
DataStream API and in the SQL API, in order to write my own CoProcessFunction
implementation. If, for the left-join case, I register timers for messages
that are still waiting for a match, won't that cause problems with waiting
for all registered timers to fire during checkpoints?

P.S. Maybe you can point me to the internals of the SQL interval join
implementation in Java/Scala.
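
For reference, the SQL planner's implementation lives around the
org.apache.flink.table.runtime.operators.join.interval package (class names
vary by version). Below is a rough sketch of how a left interval join can be
built on a KeyedCoProcessFunction with event-time timers. LeftRow and
RightRow are hypothetical POJOs with a public long ts field, bounds are
inclusive, and state cleanup is simplified - this illustrates the timer
bookkeeping, not the actual SQL operator:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

// Emits (left, right) for every match where
// right.ts BETWEEN left.ts - lowerBound AND left.ts + upperBound,
// and (left, null) once a left row's window has passed without any match.
// Both inputs are assumed keyed by the join key (String here).
public class LeftIntervalJoin
        extends KeyedCoProcessFunction<String, LeftRow, RightRow, Tuple2<LeftRow, RightRow>> {

    private final long lowerBound;
    private final long upperBound;

    // left rows bucketed by timestamp, each with a "matched at least once" flag
    private transient MapState<Long, List<Tuple2<LeftRow, Boolean>>> leftState;
    private transient MapState<Long, List<RightRow>> rightState;

    public LeftIntervalJoin(long lowerBound, long upperBound) {
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
    }

    @Override
    public void open(Configuration parameters) {
        leftState = getRuntimeContext().getMapState(new MapStateDescriptor<>(
                "left", Types.LONG,
                TypeInformation.of(new TypeHint<List<Tuple2<LeftRow, Boolean>>>() {})));
        rightState = getRuntimeContext().getMapState(new MapStateDescriptor<>(
                "right", Types.LONG,
                TypeInformation.of(new TypeHint<List<RightRow>>() {})));
    }

    @Override
    public void processElement1(LeftRow left, Context ctx,
            Collector<Tuple2<LeftRow, RightRow>> out) throws Exception {
        boolean matched = false;
        for (Map.Entry<Long, List<RightRow>> e : rightState.entries()) {
            if (e.getKey() >= left.ts - lowerBound && e.getKey() <= left.ts + upperBound) {
                for (RightRow right : e.getValue()) {
                    out.collect(Tuple2.of(left, right));
                    matched = true;
                }
            }
        }
        List<Tuple2<LeftRow, Boolean>> bucket = leftState.get(left.ts);
        if (bucket == null) bucket = new ArrayList<>();
        bucket.add(Tuple2.of(left, matched));
        leftState.put(left.ts, bucket);
        // once the watermark passes left.ts + upperBound, no right row can match any more
        ctx.timerService().registerEventTimeTimer(left.ts + upperBound);
    }

    @Override
    public void processElement2(RightRow right, Context ctx,
            Collector<Tuple2<LeftRow, RightRow>> out) throws Exception {
        for (Map.Entry<Long, List<Tuple2<LeftRow, Boolean>>> e : leftState.entries()) {
            if (right.ts >= e.getKey() - lowerBound && right.ts <= e.getKey() + upperBound) {
                for (Tuple2<LeftRow, Boolean> l : e.getValue()) {
                    out.collect(Tuple2.of(l.f0, right));
                    l.f1 = true; // matched, so no null-padded row for this left row
                }
                leftState.put(e.getKey(), e.getValue()); // write back for RocksDB backends
            }
        }
        List<RightRow> bucket = rightState.get(right.ts);
        if (bucket == null) bucket = new ArrayList<>();
        bucket.add(right);
        rightState.put(right.ts, bucket);
        // after right.ts + lowerBound no future left row can reach back to this right row
        ctx.timerService().registerEventTimeTimer(right.ts + lowerBound);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
            Collector<Tuple2<LeftRow, RightRow>> out) throws Exception {
        List<Tuple2<LeftRow, Boolean>> expired = leftState.get(timestamp - upperBound);
        if (expired != null) {
            for (Tuple2<LeftRow, Boolean> l : expired) {
                if (!l.f1) out.collect(Tuple2.of(l.f0, null)); // left row without a match
            }
            leftState.remove(timestamp - upperBound);
        }
        rightState.remove(timestamp - lowerBound); // expired right rows only need cleanup
    }
}

As far as checkpointing goes, timers registered this way are part of the
keyed state: they are snapshotted together with a checkpoint rather than
waited on, so many pending timers grow the state size but do not by
themselves block checkpointing.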

---

Best Regards,
Vladislav Keda


Re: Resources configuration on Kubernetes Session Cluster

2022-08-05 Thread Vladislav Keda
Thanks, Gyula!

Best Regards,
Vladislav Keda.


On Tue, Aug 2, 2022 at 12:41, Gyula Fóra wrote:

> Hi Vladislav!
>
> I am afraid there is no way to specify resources independently for jobs
> within a session cluster currently in Flink.
>
> For this I suggest using the Application Mode instead where each job can
> have its own resources.
>
> In any case you should check out the Flink Kubernetes Operator -
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/
> It allows you to manage session clusters, session jobs and application
> deployments very conveniently :)
>
> Cheers,
> Gyula
>
> On Tue, Aug 2, 2022 at 11:17 AM Vladislav Keda <
> vladislav.k...@glowbyteconsulting.com> wrote:
>
>> Hi,
>>
>> I'm trying to specify different TaskManager resources for different Flink
>> jobs on a Kubernetes Session Cluster. Can you help me find a way to do
>> that?
>>
>> I use these options, but Flink picks them up only when I start a new
>> Kubernetes Session deployment:
>> Memory:
>> jobmanager.memory.process.size
>> <https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/config/#jobmanager-memory-process-size>
>> , taskmanager.memory.process.size
>> <https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/config/#taskmanager-memory-process-size>
>> CPU:
>> kubernetes.jobmanager.cpu
>> <https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/config/#kubernetes-jobmanager-cpu>
>> , kubernetes.taskmanager.cpu
>> <https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/config/#kubernetes-taskmanager-cpu>
>>
>> *FYI*
>> I deploy Flink jobs on the cluster and set job-specific configuration
>> parameters using
>> *org.apache.flink.client.program.rest.RestClusterClient*
>>
>> Flink version - 1.13.6.
>>
>> ---
>>
>> Best Regards,
>> Vladislav Keda
>>
>


Resources configuration on Kubernetes Session Cluster

2022-08-02 Thread Vladislav Keda
Hi,

I'm trying to specify different TaskManager resources for different Flink
jobs on a Kubernetes Session Cluster. Can you help me find a way to do
that?

I use these options, but Flink picks them up only when I start a new
Kubernetes Session deployment:
Memory:
jobmanager.memory.process.size
<https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/config/#jobmanager-memory-process-size>
, taskmanager.memory.process.size
<https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/config/#taskmanager-memory-process-size>
CPU:
kubernetes.jobmanager.cpu
<https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/config/#kubernetes-jobmanager-cpu>
, kubernetes.taskmanager.cpu
<https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/config/#kubernetes-taskmanager-cpu>

*FYI*
I deploy Flink jobs on the cluster and set job-specific configuration
parameters using *org.apache.flink.client.program.rest.RestClusterClient*

Flink version - 1.13.6.

---

Best Regards,
Vladislav Keda


Re: Creating Flink SQL Row with named fields

2022-02-04 Thread Vladislav Keda
Thanks a lot!

---

Best Regards,
Vladislav Keda


On Thu, Feb 3, 2022 at 12:13, Francesco Guardiani wrote:

> Hi,
>
> Unfortunately at the moment, creating a row with named fields is not
> possible from the ROW constructor.
>
> One solution could be to wrap it in a cast, like: CAST((f0 + 12, 'Hello
> world') AS ROW<...>)
> Or you could create a UDF and use the @DataTypeHint to define the row
> return type, with named fields.
>
> Feel free to open an issue about that
>
> FG
>
> On Wed, Feb 2, 2022 at 5:18 PM Vladislav Keda <
> vladislav.k...@glowbyteconsulting.com> wrote:
>
>> Hi,
>>
>> I'm trying to create Row(..) using Flink SQL, but I can't assign names to
>> its fields.
>>
>>
>> *For example:*
>> Input table1 structure: *(id INT, some_name STRING)*
>> Query: *select *, ROW(id, some_name) as row1 from table1*
>> Output result structure:
>> *(id INT, some_name STRING, row1 ROW (EXPR$0 INT, EXPR$1 STRING))*
>>
>> *Each nested field gets a generated name like EXPR$0, which does not
>> satisfy me.*
>>
>> *If I write, for example:*
>> Input table1 structure: *(id INT, some_name STRING)*
>> Query: *select *, ROW(id as nested_id, some_name as nested_some_name)
>> as row1 from table1*
>>
>>
>> *I will get an exception like:*
>>
>> *Caused by: org.apache.flink.table.api.SqlParserException: SQL parse
>> failed. Encountered "as" at line 1, column 20. Was expecting one of:
>> ")" ... "," ...
>> at org.apache.flink.table.planner.parse.CalciteParser.parse(CalciteParser.java:56)
>> at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:98)
>> at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:704)
>> at ru.glowbyte.streaming.core.operators.internal.sql.SqlDrivenOperator.sqlQuery(SqlDrivenOperator.java:159)
>> ... 59 more*
>>
>> How can I set the name for the field?
>>
>> Flink version - 1.13.3.
>>
>> ---
>>
>> Best Regards,
>> Vladislav Keda
>>
>
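
For reference, a sketch of the UDF route Francesco mentions, using
@DataTypeHint to give the nested fields explicit names (the function name
and field names here are illustrative):

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;

// Wraps two columns into a ROW whose fields carry explicit names.
public class NamedRow extends ScalarFunction {
    @DataTypeHint("ROW<nested_id INT, nested_some_name STRING>")
    public Row eval(Integer id, String someName) {
        return Row.of(id, someName);
    }
}

After registering it with
tableEnv.createTemporarySystemFunction("NAMED_ROW", NamedRow.class), the
query becomes: select *, NAMED_ROW(id, some_name) as row1 from table1.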


Creating Flink SQL Row with named fields

2022-02-02 Thread Vladislav Keda
Hi,

I'm trying to create Row(..) using Flink SQL, but I can't assign names to
its fields.


*For example:*
Input table1 structure: *(id INT, some_name STRING)*
Query: *select *, ROW(id, some_name) as row1 from table1*
Output result structure:
*(id INT, some_name STRING, row1 ROW (EXPR$0 INT, EXPR$1 STRING))*

*Each nested field gets a generated name like EXPR$0, which does not
satisfy me.*

*If I write, for example:*
Input table1 structure: *(id INT, some_name STRING)*
Query: *select *, ROW(id as nested_id, some_name as nested_some_name) as
row1 from table1*


*I will get an exception like:*

*Caused by: org.apache.flink.table.api.SqlParserException: SQL parse
failed. Encountered "as" at line 1, column 20. Was expecting one of:
")" ... "," ...
at org.apache.flink.table.planner.parse.CalciteParser.parse(CalciteParser.java:56)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:98)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:704)
at ru.glowbyte.streaming.core.operators.internal.sql.SqlDrivenOperator.sqlQuery(SqlDrivenOperator.java:159)
... 59 more*

How can I set the name for the field?

Flink version - 1.13.3.

---

Best Regards,
Vladislav Keda


Re: RestClusterClient locks file after calling `submitJob(JobGraph)` method on Windows OS

2020-09-13 Thread Vladislav Keda
Thanks, I missed the fact that this issue had already been filed.

On Sun, Sep 13, 2020 at 14:58, Chesnay Schepler wrote:

> This is a known issue: https://issues.apache.org/jira/browse/FLINK-9844
>
> On 9/11/2020 8:34 PM, Vladislav Keda wrote:
>
> Hi Flink Community,
>
> I was trying to submit a Flink job on a standalone cluster
> using RestClusterClient. After waiting for job submission, I got the JobID
> correctly and tried to delete the source jar file. But then I got
> the exception:
>
> java.nio.file.FileSystemException: /path/to/jar: The process cannot access
> the file because it is being used by another process.
>
> at sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:86)
> at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
> at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:102)
> at sun.nio.fs.WindowsFileSystemProvider.implDelete(WindowsFileSystemProvider.java:269)
> at sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103)
> at java.nio.file.Files.delete(Files.java:1126)
> at ru.glowbyte.streaming.manager.TestApp.main(TestApp.java:57)
>
>
> See the program code below:
>
>
> public static void main(String[] args) {
>     final Configuration flinkConf = new Configuration();
>     flinkConf.set(RestOptions.ADDRESS, "localhost");
>     flinkConf.set(RestOptions.PORT, 8081);
>
>     final File jarFile = new File("/path/to/jar");
>
>     try {
>         final RestClusterClient<StandaloneClusterId> client = new
>                 RestClusterClient<>(flinkConf, StandaloneClusterId.getInstance());
>
>         final PackagedProgram packagedProgram =
>                 PackagedProgram.newBuilder()
>                         .setJarFile(jarFile)
>                         .setConfiguration(flinkConf)
>                         .build();
>
>         final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(
>                 packagedProgram,
>                 flinkConf,
>                 1,
>                 true);
>
>         final DetachedJobExecutionResult jobExecutionResult =
>                 client.submitJob(jobGraph)
>                         .thenApply(DetachedJobExecutionResult::new)
>                         .get();
>
>         System.out.println(jobExecutionResult.getJobID());
>     } catch (Exception ex) {
>         ex.printStackTrace();
>         System.exit(1);
>     }
>
>     try {
>         // fails on Windows: the process cannot access the file
>         Files.delete(jarFile.toPath());
>     } catch (IOException ex) {
>         ex.printStackTrace();
>         System.exit(1);
>     }
> }
>
> I run this code on Windows. I think that after calling the
> `submitJob(JobGraph)` method, the jar file remains open.
>
> Is it a bug or not? Maybe I'm doing something wrong?
>
> ---
>
> Kind Regards
> Vladislav Keda
>
>
>


RestClusterClient locks file after calling `submitJob(JobGraph)` method on Windows OS

2020-09-11 Thread Vladislav Keda
Hi Flink Community,

I was trying to submit a Flink job on a standalone cluster
using RestClusterClient. After waiting for job submission, I got the JobID
correctly and tried to delete the source jar file. But then I got
the exception:

java.nio.file.FileSystemException: /path/to/jar: The process cannot access
the file because it is being used by another process.

at sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:86)
at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:102)
at sun.nio.fs.WindowsFileSystemProvider.implDelete(WindowsFileSystemProvider.java:269)
at sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103)
at java.nio.file.Files.delete(Files.java:1126)
at ru.glowbyte.streaming.manager.TestApp.main(TestApp.java:57)


See the program code below:


import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.core.execution.DetachedJobExecutionResult;
import org.apache.flink.runtime.jobgraph.JobGraph;

public class TestApp {

    public static void main(String[] args) {
        final Configuration flinkConf = new Configuration();
        flinkConf.set(RestOptions.ADDRESS, "localhost");
        flinkConf.set(RestOptions.PORT, 8081);

        final File jarFile = new File("/path/to/jar");

        try {
            final RestClusterClient<StandaloneClusterId> client = new
                    RestClusterClient<>(flinkConf, StandaloneClusterId.getInstance());

            final PackagedProgram packagedProgram =
                    PackagedProgram.newBuilder()
                            .setJarFile(jarFile)
                            .setConfiguration(flinkConf)
                            .build();

            final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(
                    packagedProgram,
                    flinkConf,
                    1,     // default parallelism
                    true); // suppress stdout while building the job graph

            final DetachedJobExecutionResult jobExecutionResult =
                    client.submitJob(jobGraph)
                            .thenApply(DetachedJobExecutionResult::new)
                            .get();

            System.out.println(jobExecutionResult.getJobID());
        } catch (Exception ex) {
            ex.printStackTrace();
            System.exit(1);
        }

        try {
            // fails on Windows: the process cannot access the file
            Files.delete(jarFile.toPath());
        } catch (IOException ex) {
            ex.printStackTrace();
            System.exit(1);
        }
    }
}

I run this code on Windows. I think that after calling the
`submitJob(JobGraph)` method, the jar file remains open.

Is it a bug or not? Maybe I'm doing something wrong?
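
For reference, a possible workaround until FLINK-9844 (linked in the reply
above) is resolved: the lock is typically held by the user-code classloader
that PackagedProgram creates for the jar. A sketch that would continue the
main method above, assuming packagedProgram is still in scope; whether
closing the classloader actually releases the file handle should be
verified on your setup:

import java.net.URLClassLoader;

// After submission, try to release the jar before deleting it. Flink's
// user-code classloader extends URLClassLoader, so closing it should
// release the open jar handle on Windows.
ClassLoader userCodeClassLoader = packagedProgram.getUserCodeClassLoader();
if (userCodeClassLoader instanceof URLClassLoader) {
    ((URLClassLoader) userCodeClassLoader).close();
}
Files.delete(jarFile.toPath());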

---

Kind Regards
Vladislav Keda