Re: aws s3 configuring error for flink image

2021-09-08 Thread Chesnay Schepler
This is a limitation of the presto version; use 
flink-s3-fs-hadoop-1.11.3.jar instead.
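
For anyone hitting the same error: the StreamingFileSink needs a recoverable writer, which this presto filesystem version does not provide. A corrected image build might look like this — a sketch based on the Dockerfile quoted below, with the jar name/version assumed from this thread:

```dockerfile
# Sketch: swap the presto plugin for the hadoop one, which supports
# recoverable writers. Paths and versions are taken from this thread.
FROM flink:1.11.3-scala_2.12-java11
RUN mkdir -p ./plugins/flink-s3-fs-hadoop && \
    cp ./opt/flink-s3-fs-hadoop-1.11.3.jar ./plugins/flink-s3-fs-hadoop/
```

Note the plugin jar sits in its own subfolder under plugins/, as required by Flink's plugin loader.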


On 08/09/2021 20:39, Dhiru wrote:

I copied:

FROM flink:1.11.3-scala_2.12-java11
RUN mkdir ./plugins/flink-s3-fs-presto
RUN cp ./opt/flink-s3-fs-presto-1.11.3.jar ./plugins/flink-s3-fs-presto/

then started getting this error while trying to run on AWS EKS and access an S3 bucket:

2021-09-08 14:38:10
java.lang.UnsupportedOperationException: This s3 file system implementation does not support recoverable writers.
	at org.apache.flink.fs.s3.common.FlinkS3FileSystem.createRecoverableWriter(FlinkS3FileSystem.java:136)
	at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.createRecoverableWriter(PluginFileSystemFactory.java:134)
	at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
	at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:260)
	at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:396)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:260)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:506)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.base/java.lang.Thread.run(Unknown Source)


On Wednesday, September 8, 2021, 12:47:10 PM EDT, Chesnay Schepler 
 wrote:



you need to put the flink-s3-fs-hadoop/presto jar into a directory 
within the plugins directory, for example the final path should look 
like this:


/opt/flink/plugins/flink-s3-fs-hadoop/flink-s3-fs-hadoop-1.13.1.jar

Furthermore, you only need either the hadoop or presto jar, _not_ both 
of them.


See also:
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins 

https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/filesystems/plugins/ 
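
The required layout can be sketched with plain shell; a scratch directory stands in for /opt/flink here, and the jar name is the one from this thread:

```shell
# Use a temp dir as a stand-in for /opt/flink (illustrative only).
FLINK_HOME=$(mktemp -d)
mkdir -p "$FLINK_HOME/opt" "$FLINK_HOME/plugins"
touch "$FLINK_HOME/opt/flink-s3-fs-hadoop-1.13.1.jar"  # stand-in for the real jar

# Each filesystem plugin must live in its own subfolder under plugins/:
mkdir -p "$FLINK_HOME/plugins/flink-s3-fs-hadoop"
cp "$FLINK_HOME/opt/flink-s3-fs-hadoop-1.13.1.jar" "$FLINK_HOME/plugins/flink-s3-fs-hadoop/"

ls "$FLINK_HOME/plugins/flink-s3-fs-hadoop"
```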



On 08/09/2021 17:10, Dhiru wrote:
yes, I copied it to the plugins folder, but I'm not sure why I see the same jar in /opt as well by default


root@d852f125da1f:/opt/flink/plugins# ls
README.txt             flink-s3-fs-hadoop-1.13.1.jar  metrics-datadog   metrics-influx  metrics-prometheus  metrics-statsd
external-resource-gpu  flink-s3-fs-presto-1.13.1.jar  metrics-graphite  metrics-jmx     metrics-slf4j


I need help with this soon.

On Wednesday, September 8, 2021, 09:26:46 AM EDT, Dhiru 
  wrote:




yes, I copied it to the plugins folder, but I'm not sure why I see the same jar in /opt as well by default


root@d852f125da1f:/opt/flink/plugins# ls
README.txt             flink-s3-fs-hadoop-1.13.1.jar  metrics-datadog   metrics-influx  metrics-prometheus  metrics-statsd
external-resource-gpu  flink-s3-fs-presto-1.13.1.jar  metrics-graphite  metrics-jmx     metrics-slf4j



On Wednesday, September 8, 2021, 02:58:38 AM EDT, Martijn Visser 
  wrote:



Hi,

Have you copied the correct JAR [1] to the plugins directory?

Best regards,

Martijn

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/filesystems/s3.html 



On Wed, 8 Sept 2021 at 04:27, Dhiru wrote:


I need to configure AWS S3 and am getting this error:
org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
Could not find a file system implementation for scheme 's3'. The
scheme is directly supported by Flink through the following
plugins: flink-s3-fs-hadoop, flink-s3-fs-presto. Please ensure
that each plugin resides within its own subfolder within the
plugins directory.

Re: State processor API very slow reading a keyed state with RocksDB

2021-09-08 Thread Piotr Nowojski
Hi David,

I can confirm that I'm able to reproduce this behaviour. I've tried
profiling/flame graphs and was not able to make much sense of the
results. There are no IO/memory bottlenecks that I could notice; it
indeed looks like the job is stuck inside RocksDB itself. This might be
an issue with, for example, the memory configuration. Streaming jobs and
the State Processor API run in very different environments, as the latter
uses the DataSet API under the hood, so maybe that explains this? However,
I'm an expert in neither the DataSet API nor RocksDB, so it's hard for me
to make progress here.

Maybe someone else can help here?

Piotrek


On Wed, 8 Sep 2021 at 14:45, David Causse wrote:

> Hi,
>
> I'm investigating why a job we use to inspect a flink state is a lot
> slower than the bootstrap job used to generate it.
>
> I use RocksDB with a simple keyed value state mapping a string key to a
> long value. Generating the bootstrap state from a CSV file with 100M
> entries takes a couple of minutes over 12 slots spread over 3 TM (4Gb
> allowed). But another job that does the opposite (converts this state into
> a CSV file) takes several hours. I would have expected these two job
> runtimes to be in the same ballpark.
>
> I wrote a simple test case[1] to reproduce the problem. This program has 3
> jobs:
> - CreateState: generate a keyed state (string->long) using the state
> processor api
> - StreamJob: reads all the keys using a StreamingExecutionEnvironment
> - ReadState: reads all the keys using the state processor api
>
> Running with 30M keys and (12 slots/3TM with 4Gb each) CreateState &
> StreamJob are done in less than a minute.
> ReadState is much slower (> 30 minutes) on my system. The RocksDB state
> appears to be restored relatively quickly but after that the slots are
> performing at very different speeds. Some slots finish quickly but some
> others struggle to advance.
> Looking at the thread dumps I always see threads in
> org.rocksdb.RocksDB.get:
>
> "DataSource (at readKeyedState(ExistingSavepoint.java:314)
> (org.apache.flink.state.api.input.KeyedStateInputFormat)) (12/12)#0" Id=371
> RUNNABLE
> at org.rocksdb.RocksDB.get(Native Method)
> at org.rocksdb.RocksDB.get(RocksDB.java:2084)
> at
> org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:83)
> at org.wikimedia.flink.StateReader.readKey(ReadState.scala:38)
> at org.wikimedia.flink.StateReader.readKey(ReadState.scala:32)
> at
> org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.processElement(KeyedStateReaderOperator.java:76)
> at
> org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.processElement(KeyedStateReaderOperator.java:51)
> at
> org.apache.flink.state.api.input.KeyedStateInputFormat.nextRecord(KeyedStateInputFormat.java:228)
>
> It seems suspiciously slow to me and I'm wondering if I'm missing
> something in the way the state processor api works.
>
> Thanks for your help!
>
> David.
>
> 1: https://github.com/nomoa/rocksdb-state-processor-test
>


Re: Duplicate copies of job in Flink UI/API

2021-09-08 Thread Piotr Nowojski
Hi Peter,

Can you provide the relevant JobManager logs? And can you write down the
steps you took before the failure happened? Did this failure occur during
the Flink upgrade, or after the upgrade, etc.?

Best,
Piotrek

On Wed, 8 Sep 2021 at 16:11, Peter Westermann wrote:

> We recently upgraded from Flink 1.12.4 to 1.12.5 and are seeing some weird
> behavior after a change in jobmanager leadership: We’re seeing two copies
> of the same job, one of those is in SUSPENDED state and has a start time of
> zero. Here’s the output from the /jobs/overview endpoint:
>
> {
>   "jobs": [{
>     "jid": "2db4ee6397151a1109d1ca05188a4cbb",
>     "name": "analytics-flink-v1",
>     "state": "RUNNING",
>     "start-time": 1631106146284,
>     "end-time": -1,
>     "duration": 2954642,
>     "last-modification": 1631106152322,
>     "tasks": {
>       "total": 112,
>       "created": 0,
>       "scheduled": 0,
>       "deploying": 0,
>       "running": 112,
>       "finished": 0,
>       "canceling": 0,
>       "canceled": 0,
>       "failed": 0,
>       "reconciling": 0
>     }
>   }, {
>     "jid": "2db4ee6397151a1109d1ca05188a4cbb",
>     "name": "analytics-flink-v1",
>     "state": "SUSPENDED",
>     "start-time": 0,
>     "end-time": -1,
>     "duration": 1631105900760,
>     "last-modification": 0,
>     "tasks": {
>       "total": 0,
>       "created": 0,
>       "scheduled": 0,
>       "deploying": 0,
>       "running": 0,
>       "finished": 0,
>       "canceling": 0,
>       "canceled": 0,
>       "failed": 0,
>       "reconciling": 0
>     }
>   }]
> }
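>
> A quick way to spot such a ghost entry is to group the /jobs/overview
> payload by job id; a sketch in Python, using a trimmed copy of the
> output above:
>
> ```python
> import json
> from collections import defaultdict
>
> # Sample trimmed from the /jobs/overview output in this thread.
> payload = json.loads("""
> {"jobs": [
>   {"jid": "2db4ee6397151a1109d1ca05188a4cbb", "name": "analytics-flink-v1",
>    "state": "RUNNING", "start-time": 1631106146284},
>   {"jid": "2db4ee6397151a1109d1ca05188a4cbb", "name": "analytics-flink-v1",
>    "state": "SUSPENDED", "start-time": 0}
> ]}
> """)
>
> # Collect the states reported for each job id.
> by_jid = defaultdict(list)
> for job in payload["jobs"]:
>     by_jid[job["jid"]].append(job["state"])
>
> # Any jid listed more than once is a duplicate entry.
> duplicates = {jid: states for jid, states in by_jid.items() if len(states) > 1}
> print(duplicates)
> # → {'2db4ee6397151a1109d1ca05188a4cbb': ['RUNNING', 'SUSPENDED']}
> ```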
>
>
>
> Has anyone seen this behavior before?
>
>
>
> Thanks,
>
> Peter
>


Re: Allocation-preserving scheduling and task-local recovery

2021-09-08 Thread Xiang Zhang
We also have this configuration set, in case it makes any difference when
allocating tasks: cluster.evenly-spread-out-slots.
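
For reference, the settings discussed in this thread as a flink-conf.yaml fragment (the RocksDB local directory is an illustrative path, not the one from our deployment):

```yaml
# Keep a local copy of task state on each TaskManager for fast recovery.
state.backend.local-recovery: true
state.backend.rocksdb.localdir: /data/flink/rocksdb  # illustrative path
# Spread slots evenly across all available TaskManagers.
cluster.evenly-spread-out-slots: true
```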

On 2021/09/08 18:09:52, Xiang Zhang  wrote: 
> Hello,
> 
> We have an app running on Flink 1.10.2 deployed in standalone mode. We
> enabled task-local recovery by setting both *state.backend.local-recovery *and
> *state.backend.rocksdb.localdir*. The app has over 100 task managers and 2
> job managers (active and passive).
> 
> This is what we have observed. When we restarted a task manager, all tasks
> got canceled (due to the default failover configuration). These tasks were
> then redistributed among the task managers (e.g. some task managers have
> more slots in use than before the restart). This caused all task managers
> to download state from remote storage all over again.
> 
> The same thing happened when we restarted a job manager. The job manager
> failed over to the passive one successfully; however, all tasks were
> canceled and reallocated among the task managers again.
> 
> My understanding is that if task-local recovery is enabled, Flink will try
> to enable sticky assignment of tasks to the task managers they previously
> ran on. This doesn't seem to be the case. My question is how we can enable
> this allocation-preserving scheduling when Flink handles failures.
> 
> Thanks,
> Xiang
> 


Re: aws s3 configuring error for flink image

2021-09-08 Thread Dhiru
 I copied:

FROM flink:1.11.3-scala_2.12-java11
RUN mkdir ./plugins/flink-s3-fs-presto
RUN cp ./opt/flink-s3-fs-presto-1.11.3.jar ./plugins/flink-s3-fs-presto/

then started getting this error while trying to run on AWS EKS and access an S3 bucket:

2021-09-08 14:38:10
java.lang.UnsupportedOperationException: This s3 file system implementation does not support recoverable writers.
	at org.apache.flink.fs.s3.common.FlinkS3FileSystem.createRecoverableWriter(FlinkS3FileSystem.java:136)
	at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.createRecoverableWriter(PluginFileSystemFactory.java:134)
	at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
	at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:260)
	at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:396)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:260)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:506)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.base/java.lang.Thread.run(Unknown Source)


On Wednesday, September 8, 2021, 12:47:10 PM EDT, Chesnay Schepler 
 wrote:  
 
  you need to put the flink-s3-fs-hadoop/presto jar into a directory within the 
plugins directory, for example the final path should look like this:
  
  /opt/flink/plugins/flink-s3-fs-hadoop/flink-s3-fs-hadoop-1.13.1.jar
  
  Furthermore, you only need either the hadoop or presto jar, _not_ both of 
them.
  
  See also: 
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins
 
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/filesystems/plugins/
  
  On 08/09/2021 17:10, Dhiru wrote:
  
 
yes, I copied it to the plugins folder, but I'm not sure why I see the same jar in /opt as well by default

root@d852f125da1f:/opt/flink/plugins# ls
README.txt             flink-s3-fs-hadoop-1.13.1.jar  metrics-datadog   metrics-influx  metrics-prometheus  metrics-statsd
external-resource-gpu  flink-s3-fs-presto-1.13.1.jar  metrics-graphite  metrics-jmx     metrics-slf4j

I need help with this soon.

On Wednesday, September 8, 2021, 09:26:46 AM EDT, Dhiru wrote:

yes, I copied it to the plugins folder, but I'm not sure why I see the same jar in /opt as well by default

root@d852f125da1f:/opt/flink/plugins# ls
README.txt             flink-s3-fs-hadoop-1.13.1.jar  metrics-datadog   metrics-influx  metrics-prometheus  metrics-statsd
external-resource-gpu  flink-s3-fs-presto-1.13.1.jar  metrics-graphite  metrics-jmx     metrics-slf4j
  
   On Wednesday, September 8, 2021, 02:58:38 AM EDT, Martijn Visser 
 wrote:  
  
 Hi, 
  Have you copied the correct JAR [1] to the plugins directory? 
  Best regards, 
  Martijn 
  [1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/filesystems/s3.html
  
   On Wed, 8 Sept 2021 at 04:27, Dhiru  wrote:
  
 I need to configure AWS S3 and am getting this error:
org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a 
file system implementation for scheme 's3'. The scheme is directly supported by 
Flink through the following plugins: flink-s3-fs-hadoop, flink-s3-fs-presto. 
Please ensure that each plugin resides within its own subfolder within the 
plugins directory. See 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html for 
more information. If you want to use a Hadoop file system for that scheme, 
please add the scheme to the configuration fs.allowed-fallback-filesystems. For 
a full list of supported file systems, please see 
https://ci.apache.org/projects/flink/flink-

Allocation-preserving scheduling and task-local recovery

2021-09-08 Thread Xiang Zhang
Hello,

We have an app running on Flink 1.10.2 deployed in standalone mode. We
enabled task-local recovery by setting both *state.backend.local-recovery *and
*state.backend.rocksdb.localdir*. The app has over 100 task managers and 2
job managers (active and passive).

This is what we have observed. When we restarted a task manager, all tasks
got canceled (due to the default failover configuration). These tasks were
then redistributed among the task managers (e.g. some task managers have
more slots in use than before the restart). This caused all task managers
to download state from remote storage all over again.

The same thing happened when we restarted a job manager. The job manager
failed over to the passive one successfully; however, all tasks were
canceled and reallocated among the task managers again.

My understanding is that if task-local recovery is enabled, Flink will try
to enable sticky assignment of tasks to the task managers they previously
ran on. This doesn't seem to be the case. My question is how we can enable
this allocation-preserving scheduling when Flink handles failures.

Thanks,
Xiang


Re: aws s3 configuring error for flink image

2021-09-08 Thread Chesnay Schepler
you need to put the flink-s3-fs-hadoop/presto jar into a directory 
within the plugins directory, for example the final path should look 
like this:


/opt/flink/plugins/flink-s3-fs-hadoop/flink-s3-fs-hadoop-1.13.1.jar

Furthermore, you only need either the hadoop or presto jar, _not_ both 
of them.


See also:
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/filesystems/plugins/

On 08/09/2021 17:10, Dhiru wrote:
yes, I copied it to the plugins folder, but I'm not sure why I see the same jar in /opt as well by default


root@d852f125da1f:/opt/flink/plugins# ls
README.txt             flink-s3-fs-hadoop-1.13.1.jar  metrics-datadog   metrics-influx  metrics-prometheus  metrics-statsd
external-resource-gpu  flink-s3-fs-presto-1.13.1.jar  metrics-graphite  metrics-jmx     metrics-slf4j


I need help with this soon.

On Wednesday, September 8, 2021, 09:26:46 AM EDT, Dhiru 
 wrote:




yes, I copied it to the plugins folder, but I'm not sure why I see the same jar in /opt as well by default


root@d852f125da1f:/opt/flink/plugins# ls
README.txt             flink-s3-fs-hadoop-1.13.1.jar  metrics-datadog   metrics-influx  metrics-prometheus  metrics-statsd
external-resource-gpu  flink-s3-fs-presto-1.13.1.jar  metrics-graphite  metrics-jmx     metrics-slf4j



On Wednesday, September 8, 2021, 02:58:38 AM EDT, Martijn Visser 
 wrote:



Hi,

Have you copied the correct JAR [1] to the plugins directory?

Best regards,

Martijn

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/filesystems/s3.html 



On Wed, 8 Sept 2021 at 04:27, Dhiru wrote:


I need to configure AWS S3 and am getting this error:
org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
Could not find a file system implementation for scheme 's3'. The
scheme is directly supported by Flink through the following
plugins: flink-s3-fs-hadoop, flink-s3-fs-presto. Please ensure
that each plugin resides within its own subfolder within the
plugins directory. See
https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html

for more information. If you want to use a Hadoop file system for
that scheme, please add the scheme to the configuration
fs.allowed-fallback-filesystems. For a full list of supported file
systems, please see
https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
	at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:473)
	at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:389)
	at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:260)
	at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:396)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:260)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:506)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.base/java.lang.Thread.run(Unknown Source)
Image used: flink:1.11.3-scala_2.12-java11

Part of what I added to flink-conf.yaml:

# REQUIRED: set storage location for job metadata in remote storage
high-availability.storageDir: s3://msc-sandbox-flink/test
state.backend: filesystem
state.checkpoints.dir: s3://msc-sandbox-flink/

Re: aws s3 configuring error for flink image

2021-09-08 Thread Dhiru
 yes, I copied it to the plugins folder, but I'm not sure why I see the same jar in /opt as well by default

root@d852f125da1f:/opt/flink/plugins# ls
README.txt             flink-s3-fs-hadoop-1.13.1.jar  metrics-datadog   metrics-influx  metrics-prometheus  metrics-statsd
external-resource-gpu  flink-s3-fs-presto-1.13.1.jar  metrics-graphite  metrics-jmx     metrics-slf4j

I need help with this soon.

On Wednesday, September 8, 2021, 09:26:46 AM EDT, Dhiru wrote:

yes, I copied it to the plugins folder, but I'm not sure why I see the same jar in /opt as well by default

root@d852f125da1f:/opt/flink/plugins# ls
README.txt             flink-s3-fs-hadoop-1.13.1.jar  metrics-datadog   metrics-influx  metrics-prometheus  metrics-statsd
external-resource-gpu  flink-s3-fs-presto-1.13.1.jar  metrics-graphite  metrics-jmx     metrics-slf4j

On Wednesday, September 8, 2021, 02:58:38 AM EDT, Martijn Visser 
 wrote:  
 
 Hi,
Have you copied the correct JAR [1] to the plugins directory?
Best regards,
Martijn
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/filesystems/s3.html
On Wed, 8 Sept 2021 at 04:27, Dhiru  wrote:

 I need to configure AWS S3 and am getting this error:

org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3'. The scheme is directly supported by Flink through the following plugins: flink-s3-fs-hadoop, flink-s3-fs-presto. Please ensure that each plugin resides within its own subfolder within the plugins directory. See https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems. For a full list of supported file systems, please see https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
	at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:473)
	at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:389)
	at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:260)
	at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:396)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:260)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:506)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.base/java.lang.Thread.run(Unknown Source)
Image used: flink:1.11.3-scala_2.12-java11

Part of what I added to flink-conf.yaml:

# REQUIRED: set storage location for job metadata in remote storage
high-availability.storageDir: s3://msc-sandbox-flink/test
state.backend: filesystem
state.checkpoints.dir: s3://msc-sandbox-flink/test
state.backend.fs.checkpointdir: s3://msc-sandbox-flink/test
s3.path.style.access: true

I am trying to deploy the jobmanager and taskmanager using EKS; please let me know if I need to do anything more to add S3 support.




Duplicate copies of job in Flink UI/API

2021-09-08 Thread Peter Westermann
We recently upgraded from Flink 1.12.4 to 1.12.5 and are seeing some weird 
behavior after a change in jobmanager leadership: we’re seeing two copies of 
the same job, one of which is in SUSPENDED state and has a start time of zero. 
Here’s the output from the /jobs/overview endpoint:
{
  "jobs": [{
"jid": "2db4ee6397151a1109d1ca05188a4cbb",
"name": "analytics-flink-v1",
"state": "RUNNING",
"start-time": 1631106146284,
"end-time": -1,
"duration": 2954642,
"last-modification": 1631106152322,
"tasks": {
  "total": 112,
  "created": 0,
  "scheduled": 0,
  "deploying": 0,
  "running": 112,
  "finished": 0,
  "canceling": 0,
  "canceled": 0,
  "failed": 0,
  "reconciling": 0
}
  }, {
"jid": "2db4ee6397151a1109d1ca05188a4cbb",
"name": "analytics-flink-v1",
"state": "SUSPENDED",
"start-time": 0,
"end-time": -1,
"duration": 1631105900760,
"last-modification": 0,
"tasks": {
  "total": 0,
  "created": 0,
  "scheduled": 0,
  "deploying": 0,
  "running": 0,
  "finished": 0,
  "canceling": 0,
  "canceled": 0,
  "failed": 0,
  "reconciling": 0
}
  }]
}

Has anyone seen this behavior before?

Thanks,
Peter


Documentation for deep diving into flink (data-streaming) job restart process

2021-09-08 Thread Puneet Duggal
Hi,

For the past 2-3 days I have been looking for documentation that elaborates on how 
Flink takes care of restarting a data streaming job. I know all the restart 
and failover strategies, but I wanted to know how the different components (Job 
Manager, Task Manager, etc.) play a role when restarting a Flink data 
streaming job.

I am asking this because recently in production, when I restarted a task 
manager, all the jobs running on it disappeared instead of getting restarted. 
Within the Flink UI, I couldn't track those jobs under completed jobs either. The 
logs also couldn't provide me with good enough information.

Also, can anyone tell me the role of the /tmp/executionGraphStore folder 
on the Job Manager machine?

Thanks




State processor API very slow reading a keyed state with RocksDB

2021-09-08 Thread David Causse
Hi,

I'm investigating why a job we use to inspect a flink state is a lot slower
than the bootstrap job used to generate it.

I use RocksDB with a simple keyed value state mapping a string key to a
long value. Generating the bootstrap state from a CSV file with 100M
entries takes a couple of minutes over 12 slots spread over 3 TM (4Gb
allowed). But another job that does the opposite (converts this state into
a CSV file) takes several hours. I would have expected these two job
runtimes to be in the same ballpark.

I wrote a simple test case[1] to reproduce the problem. This program has 3
jobs:
- CreateState: generate a keyed state (string->long) using the state
processor api
- StreamJob: reads all the keys using a StreamingExecutionEnvironment
- ReadState: reads all the keys using the state processor api

Running with 30M keys and (12 slots/3TM with 4Gb each) CreateState &
StreamJob are done in less than a minute.
ReadState is much slower (> 30 minutes) on my system. The RocksDB state
appears to be restored relatively quickly but after that the slots are
performing at very different speeds. Some slots finish quickly but some
others struggle to advance.
Looking at the thread dumps I always see threads in org.rocksdb.RocksDB.get:

"DataSource (at readKeyedState(ExistingSavepoint.java:314)
(org.apache.flink.state.api.input.KeyedStateInputFormat)) (12/12)#0" Id=371
RUNNABLE
at org.rocksdb.RocksDB.get(Native Method)
at org.rocksdb.RocksDB.get(RocksDB.java:2084)
at
org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:83)
at org.wikimedia.flink.StateReader.readKey(ReadState.scala:38)
at org.wikimedia.flink.StateReader.readKey(ReadState.scala:32)
at
org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.processElement(KeyedStateReaderOperator.java:76)
at
org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.processElement(KeyedStateReaderOperator.java:51)
at
org.apache.flink.state.api.input.KeyedStateInputFormat.nextRecord(KeyedStateInputFormat.java:228)

It seems suspiciously slow to me and I'm wondering if I'm missing something
in the way the state processor api works.

Thanks for your help!

David.

1: https://github.com/nomoa/rocksdb-state-processor-test


Re: Required built-in function [plus] could not be found in any catalog.

2021-09-08 Thread Timo Walther

Hi,

did you try to use a different order? Core module first and then Hive 
module?


The compatibility layer should work sufficiently for regular Hive UDFs 
that don't aggregate data. Hive aggregation functions should work well 
in batch scenarios. However, for streaming pipelines the aggregate 
functions need to be able to consume updates (such as retractions in 
your case).


In summary: ideally, for simple operations such as SUM or COUNT, you should 
use the core functions instead of the Hive ones. Using Hive aggregate 
functions in streaming can lead to issues if the input operator is not 
insert-only.


Regards,
Timo
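
Module resolution order can be changed while keeping both modules loaded. On Flink 1.13+ this is expressible directly in SQL; the Hive version string below is illustrative, and on 1.12 the equivalent is unloading and reloading modules in the desired order via the Table API. A sketch:

```sql
-- Keep both modules, but resolve functions against core first,
-- so the built-in SUM/COUNT (and internal functions like 'plus') win.
LOAD MODULE hive WITH ('hive-version' = '2.3.6');
USE MODULES core, hive;
```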

On 08.09.21 06:47, vtygoss wrote:


Hi, Flink Community!


I met a problem using a Flink 1.12.0 standalone cluster with a Hive catalog.


scene 1:

- module: hive module

- execute sql: select sum(1) from xxx

- exception: *org.apache.flink.table.api.TableException: Required 
built-in function [plus] could not be found in any catalog.*



scene 2:

- module: hive module and core module

- execute sql: select sum(1)

- exception: *org.apache.flink.table.api.ValidationException: Could not 
find an implementation method 'retract' in class 'class 
org.apache.flink.table.functions.hive.HiveGenericUDAF' for function 
'sum' that matches the following signature:*


*void 
retract(org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer, 
java.lang.Integer)*



scene 3:

- module: core module

- execute sql: select sum(1)

- no exception, but hive udf is invalid.



So is there a solution that allows using Hive UDFs while avoiding these exceptions?


Thank you for any suggestions.


Best Regards!