Re: rocksdb max open file descriptor issue crashed application

2020-02-11 Thread Apoorv Upadhyay
Hi,

Below is the error I am getting:

2020-02-08 05:40:24,543 INFO  org.apache.flink.runtime.taskmanager.Task
   - order-steamBy-api-order-ip (3/6)
(34c7b05d5a75dbbcc5718acf6b18) switched from RUNNING to CANCELING.
2020-02-08 05:40:24,543 INFO  org.apache.flink.runtime.taskmanager.Task
   - Triggering cancellation of task code
order-steamBy-api-order-ip (3/6) (34c7b05d5a75dbbcc5718acf6b18).
2020-02-08 05:40:24,543 ERROR
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder  -
Caught unexpected exception.
java.io.IOException: Error while opening RocksDB instance.
at
org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:74)
at
org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.openDB(AbstractRocksDBRestoreOperation.java:131)
at
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:214)
at
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:188)
at
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:162)
at
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:148)
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:268)
at
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:520)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:740)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:291)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.rocksdb.RocksDBException: While open directory:
/hadoop/yarn/local/usercache/flink/appcache/application_1580464300238_0045/flink-io-d947dea6-270b-44c0-94ca-4a49dbf02f52/job_97167effbb11a8e9ffcba36be7e4da80_op_CoStreamFlatMap_51abbbda2947171827fd9e53509c2fb4__4_6__uuid_3f8c7b20-6d17-43ad-a016-8d08f7ed9d50/db:
Too many open files
at org.rocksdb.RocksDB.open(Native Method)
at org.rocksdb.RocksDB.open(RocksDB.java:286)
at
org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:66)
... 17 more
2020-02-08 05:40:24,544 INFO  org.apache.flink.runtime.taskmanager.Task
   - order-status-mapping-join (4/6)
(4409b4e2d93f0441100f0f1575a1dcb9) switched from CANCELING to CANCELED.
2020-02-08 05:40:24,544 INFO  org.apache.flink.runtime.taskmanager.Task
   - Freeing task resources for order-status-mapping-join (4/6)
(4409b4e2d93f0441100f0f1575a1dcb9).
2020-02-08 05:40:24,543 ERROR
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder  -
Caught unexpected exception.
java.io.IOException: Error while opening RocksDB instance.
at
org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:74)
at
org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.openDB(AbstractRocksDBRestoreOperation.java:131)
at
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:214)
at
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:188)
at
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:162)
at
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:148)
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:268)
at
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:520)
at

Re: [VOTE] Release Flink Python API(PyFlink) 1.9.2 to PyPI, release candidate #1

2020-02-11 Thread Becket Qin
+1 (binding)

- verified signature
- Ran word count example successfully.

Thanks,

Jiangjie (Becket) Qin

On Wed, Feb 12, 2020 at 1:29 PM Jark Wu  wrote:

> +1
>
> - checked/verified signatures and hashes
> - Pip installed the package successfully: pip install
> apache-flink-1.9.2.tar.gz
> - Run word count example successfully through the documentation [1].
>
> Best,
> Jark
>
> [1]:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/tutorials/python_table_api.html
>
> On Tue, 11 Feb 2020 at 22:00, Hequn Cheng  wrote:
>
> > +1 (non-binding)
> >
> > - Check signature and checksum.
> > - Install package successfully with Pip under Python 3.7.4.
> > - Run wordcount example successfully under Python 3.7.4.
> >
> > Best, Hequn
> >
> > On Tue, Feb 11, 2020 at 12:17 PM Dian Fu  wrote:
> >
> > > +1 (non-binding)
> > >
> > > - Verified the signature and checksum
> > > - Pip installed the package successfully: pip install
> > > apache-flink-1.9.2.tar.gz
> > > - Run word count example successfully.
> > >
> > > Regards,
> > > Dian
> > >
> > > On Feb 11, 2020, at 11:44 AM, jincheng sun wrote:
> > >
> > >
> > > +1 (binding)
> > >
> > > - Install the PyFlink by `pip install` [SUCCESS]
> > > - Run word_count in both command line and IDE [SUCCESS]
> > >
> > > Best,
> > > Jincheng
> > >
> > >
> > >
> > > Wei Zhong wrote on Tue, Feb 11, 2020 at 11:17 AM:
> > >
> > >> Hi,
> > >>
> > >> Thanks for driving this, Jincheng.
> > >>
> > >> +1 (non-binding)
> > >>
> > >> - Verified signatures and checksums.
> > >> - Verified README.md and setup.py.
> > >> - Run `pip install apache-flink-1.9.2.tar.gz` in Python 2.7.15 and
> > Python
> > >> 3.7.5 successfully.
> > >> - Start local pyflink shell in Python 2.7.15 and Python 3.7.5 via
> > >> `pyflink-shell.sh local` and try the examples in the help message, run
> > well
> > >> and no exception.
> > >> - Try a word count example in IDE with Python 2.7.15 and Python 3.7.5,
> > >> run well and no exception.
> > >>
> > >> Best,
> > >> Wei
> > >>
> > >>
> > >> On Feb 10, 2020, at 19:12, jincheng sun wrote:
> > >>
> > >> Hi everyone,
> > >>
> > >> Please review and vote on the release candidate #1 for the PyFlink
> > >> version 1.9.2, as follows:
> > >>
> > >> [ ] +1, Approve the release
> > >> [ ] -1, Do not approve the release (please provide specific comments)
> > >>
> > >> The complete staging area is available for your review, which
> includes:
> > >>
> > >> * the official Apache binary convenience releases to be deployed to
> > >> dist.apache.org [1], which are signed with the key with fingerprint
> > >> 8FEA1EE9D0048C0CCC70B7573211B0703B79EA0E [2] and built from source
> code
> > [3].
> > >>
> > >> The vote will be open for at least 72 hours. It is adopted by majority
> > >> approval, with at least 3 PMC affirmative votes.
> > >>
> > >> Thanks,
> > >> Jincheng
> > >>
> > >> [1] https://dist.apache.org/repos/dist/dev/flink/flink-1.9.2-rc1/
> > >> [2] https://dist.apache.org/repos/dist/release/flink/KEYS
> > >> [3] https://github.com/apache/flink/tree/release-1.9.2
> > >>
> > >>
> > >
> >
>


Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-02-11 Thread Salva Alcántara
I am working on a `CoProcessFunction` that uses a third party library for
detecting certain patterns of events based on some rules. So, in the end,
the `ProcessElement1` method is basically forwarding the events to this
library and registering a callback so that, when a match is detected, the
CoProcessFunction can emit an output event. For achieving this, the callback
relies on a reference to the `out: Collector[T]` parameter in
`ProcessElement1`.

Having said that, I am not sure whether this use case is well-supported by
Flink, since:

1. There might be multiple threads spawned by the third-party library (let's
say I have no control over how many threads are spawned; this is decided
by the library)
2. I am not sure whether `out` might be recreated by Flink at some point,
invalidating the references held by the callbacks and making them crash

So far I have not observed any issues, but I have only run my program at
small scale. It would be great to hear from the experts whether my approach
is valid or not.

PS: Also posted in
https://stackoverflow.com/questions/60181678/using-multithreaded-library-within-processfunction-with-callbacks-relying-on-the
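
For what it is worth, below is a minimal Java sketch of the buffering pattern described
above, under the assumption that the library invokes its callback on its own threads:
because the Collector passed into processElement1 is not meant to be used from threads
other than the task thread, the callback only appends matches to a thread-safe queue,
and the queue is drained and emitted from processElement1/processElement2. PatternLibrary,
Match, Event and Rule are hypothetical stand-ins for the third-party API and the actual
event/rule types.

import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

public class PatternDetectingFunction extends CoProcessFunction<Event, Rule, String> {

    // Filled by the library's callback threads; drained only from the task thread.
    private transient ConcurrentLinkedQueue<String> pendingMatches;
    private transient PatternLibrary library; // hypothetical third-party library

    @Override
    public void open(Configuration parameters) {
        pendingMatches = new ConcurrentLinkedQueue<>();
        library = new PatternLibrary();
        // The callback may run on library threads, so it must not touch the Collector.
        library.onMatch((Match m) -> pendingMatches.add(m.toString()));
    }

    @Override
    public void processElement1(Event event, Context ctx, Collector<String> out) {
        library.feed(event);  // forward the event to the rule engine
        drain(out);           // emit matches, but only from the task thread
    }

    @Override
    public void processElement2(Rule rule, Context ctx, Collector<String> out) {
        library.addRule(rule);
        drain(out);
    }

    private void drain(Collector<String> out) {
        String match;
        while ((match = pendingMatches.poll()) != null) {
            out.collect(match);
        }
    }
}

One caveat of this buffering approach: a match produced between elements is only emitted
when the next element arrives, so a periodic processing-time timer may be needed to flush
the queue for quiet keys.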



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: [VOTE] Release Flink Python API(PyFlink) 1.9.2 to PyPI, release candidate #1

2020-02-11 Thread Jark Wu
+1

- checked/verified signatures and hashes
- Pip installed the package successfully: pip install
apache-flink-1.9.2.tar.gz
- Run word count example successfully through the documentation [1].

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/tutorials/python_table_api.html

On Tue, 11 Feb 2020 at 22:00, Hequn Cheng  wrote:

> +1 (non-binding)
>
> - Check signature and checksum.
> - Install package successfully with Pip under Python 3.7.4.
> - Run wordcount example successfully under Python 3.7.4.
>
> Best, Hequn
>
> On Tue, Feb 11, 2020 at 12:17 PM Dian Fu  wrote:
>
> > +1 (non-binding)
> >
> > - Verified the signature and checksum
> > - Pip installed the package successfully: pip install
> > apache-flink-1.9.2.tar.gz
> > - Run word count example successfully.
> >
> > Regards,
> > Dian
> >
> > On Feb 11, 2020, at 11:44 AM, jincheng sun wrote:
> >
> >
> > +1 (binding)
> >
> > - Install the PyFlink by `pip install` [SUCCESS]
> > - Run word_count in both command line and IDE [SUCCESS]
> >
> > Best,
> > Jincheng
> >
> >
> >
> > Wei Zhong wrote on Tue, Feb 11, 2020 at 11:17 AM:
> >
> >> Hi,
> >>
> >> Thanks for driving this, Jincheng.
> >>
> >> +1 (non-binding)
> >>
> >> - Verified signatures and checksums.
> >> - Verified README.md and setup.py.
> >> - Run `pip install apache-flink-1.9.2.tar.gz` in Python 2.7.15 and
> Python
> >> 3.7.5 successfully.
> >> - Start local pyflink shell in Python 2.7.15 and Python 3.7.5 via
> >> `pyflink-shell.sh local` and try the examples in the help message, run
> well
> >> and no exception.
> >> - Try a word count example in IDE with Python 2.7.15 and Python 3.7.5,
> >> run well and no exception.
> >>
> >> Best,
> >> Wei
> >>
> >>
> >> On Feb 10, 2020, at 19:12, jincheng sun wrote:
> >>
> >> Hi everyone,
> >>
> >> Please review and vote on the release candidate #1 for the PyFlink
> >> version 1.9.2, as follows:
> >>
> >> [ ] +1, Approve the release
> >> [ ] -1, Do not approve the release (please provide specific comments)
> >>
> >> The complete staging area is available for your review, which includes:
> >>
> >> * the official Apache binary convenience releases to be deployed to
> >> dist.apache.org [1], which are signed with the key with fingerprint
> >> 8FEA1EE9D0048C0CCC70B7573211B0703B79EA0E [2] and built from source code
> [3].
> >>
> >> The vote will be open for at least 72 hours. It is adopted by majority
> >> approval, with at least 3 PMC affirmative votes.
> >>
> >> Thanks,
> >> Jincheng
> >>
> >> [1] https://dist.apache.org/repos/dist/dev/flink/flink-1.9.2-rc1/
> >> [2] https://dist.apache.org/repos/dist/release/flink/KEYS
> >> [3] https://github.com/apache/flink/tree/release-1.9.2
> >>
> >>
> >
>


Dedup all data in stream

2020-02-11 Thread Akshay Shinde
Hi Community 

In our Flink job, the source creates its own stream that emits N objects every
2 minutes. In the process function, for each object from the generated source
stream we perform some operation that we expect to finish within 2 minutes.

Every 2 minutes the source generates the same N objects for the stream, which the
process function will process. But in some cases the process function takes longer,
around 10 minutes. In that case the stream will hold 5 sets of the N objects,
because the source keeps adding N objects every 2 minutes while the process
function is still busy. The problem is that we don't want to process these
objects 5 times; we want to process only the latest N objects, once.

This lag from the process function can vary, and it results in a backlog between
the source and the process function during job execution.


Thanks in advance !!!

Flink complaining when trying to write to s3 in Parquet format

2020-02-11 Thread Fatima Omer
I have a Java app that uses a Flink SQL query to perform aggregations
on a data stream read from Kafka. Attached is the Java file for
reference.

The query results are being written to S3. I can write successfully in JSON
format, but when I try to use Parquet format, Flink complains that min_ts is
an optional group. I have verified that min_ts can never be null in our
scheme of things.

Would appreciate help on this. Thanks!

Stack trace:

Caused by: org.apache.parquet.schema.InvalidSchemaException: Cannot
write a schema with an empty group: optional group min_ts {

}

at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:27)

at org.apache.parquet.schema.GroupType.accept(GroupType.java:255)

at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:31)

at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:37)

at org.apache.parquet.schema.MessageType.accept(MessageType.java:55)

at 
org.apache.parquet.schema.TypeUtil.checkValidWriteSchema(TypeUtil.java:23)

at 
org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:233)

at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:280)

at 
org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:530)

at 
com.dlvr.pipeline.falcon.sink.ParquetWriterSink.createAvroParquetWriter(ParquetWriterSink.java:37)

at 
com.dlvr.pipeline.falcon.sink.ParquetWriterSink.lambda$forReflectRecord$3c375096$1(ParquetWriterSink.java:48)

at 
org.apache.flink.formats.parquet.ParquetWriterFactory.create(ParquetWriterFactory.java:57)

at 
org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter$Factory.openNew(BulkPartWriter.java:103)

at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:222)

at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:212)

at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:268)

at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:370)

at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)

at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)

... 50 more


StatsPipeline.java
Description: Binary data


Re: Aggregation for last n seconds for each event

2020-02-11 Thread Fanbin Bu
Can you do
RANGE BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW?

On Tue, Feb 11, 2020 at 12:15 PM oleg  wrote:

> Hi Community,
>
> I do streaming in event time and I want to preserve ordering and late
> events. I have a use case where I need to fire an aggregation function
> for events of last n seconds(time units in general) for every incoming
> event.
>
> It seems to me that windowing is not suitable since it may be expressed
> either in time or in events count, not "last n seconds for each single
> event".
>
> Is there an idiomatic way to do this? Any examples or help are
> appreciated. Thanks in advance.
>
>
> Best regards,
>
> Oleg Bonar
>
>


Re: Exactly once semantics for hdfs sink

2020-02-11 Thread Vishwas Siravara
Hi Khachatryan,
Thanks for your reply. Can you help me understand how it works with HDFS
specifically? Even a link to a document would help.


Best,
Vishwas

On Mon, Feb 10, 2020 at 10:32 AM Khachatryan Roman <
khachatryan.ro...@gmail.com> wrote:

> Hi Vishwas,
>
> Yes, Streaming File Sink does support exactly-once semantics and can be
> used with HDFS.
>
> Regards,
> Roman
>
>
> On Mon, Feb 10, 2020 at 5:20 PM Vishwas Siravara 
> wrote:
>
>> Hi all,
>> I want to use the StreamingFileSink for writing data to HDFS. Can I
>> achieve exactly-once semantics with this sink?
>>
>>
>> Best,
>> HW.
>>
>
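
As a rough illustration of what Roman describes, here is a minimal Java sketch of a
StreamingFileSink writing row-encoded strings to HDFS (assuming Flink 1.9+). The
exactly-once behaviour comes from checkpointing: in-progress part files are only
finalized when a checkpoint completes, and a restore rolls back to the last completed
checkpoint. The HDFS path and the socket source below are placeholders.

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class HdfsExactlyOnceSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing must be enabled, otherwise part files are never committed.
        env.enableCheckpointing(60_000);

        DataStream<String> records = env.socketTextStream("localhost", 9999); // placeholder source

        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path("hdfs:///tmp/flink-output"),           // placeholder HDFS path
                              new SimpleStringEncoder<String>("UTF-8"))
                .build();

        records.addSink(sink);
        env.execute("hdfs-exactly-once-sketch");
    }
}

Bulk formats (forBulkFormat, e.g. Parquet) follow the same commit-on-checkpoint scheme,
with the difference that they roll a new part file on every checkpoint.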


Flink job fails with org.apache.kafka.common.KafkaException: Failed to construct kafka consumer

2020-02-11 Thread John Smith
Just wondering, is this on the client side in the Flink job? I rebooted the
task and the job deployed correctly on another node.

Is there a specific ulimit that we should set for Flink task nodes?

org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:799)
at
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:650)
at
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:630)
at
org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
at
org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:504)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: java.io.IOException: Too
many open files
at org.apache.kafka.common.network.Selector.<init>(Selector.java:154)
at org.apache.kafka.common.network.Selector.<init>(Selector.java:188)
at org.apache.kafka.common.network.Selector.<init>(Selector.java:192)
at
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:722)
... 11 more
Caused by: java.io.IOException: Too many open files
at sun.nio.ch.IOUtil.makePipe(Native Method)
at sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:65)
at
sun.nio.ch.EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36)
at java.nio.channels.Selector.open(Selector.java:227)
at org.apache.kafka.common.network.Selector.<init>(Selector.java:152)
... 14 more


Aggregation for last n seconds for each event

2020-02-11 Thread oleg

Hi Community,

I do streaming in event time and I want to preserve ordering and late 
events. I have a use case where I need to fire an aggregation function 
for events of last n seconds(time units in general) for every incoming 
event.


It seems to me that windowing is not suitable since it may be expressed 
either in time or in events count, not "last n seconds for each single 
event".


Is there an idiomatic way to do this? Any examples or help are 
appreciated. Thanks in advance.



Best regards,

Oleg Bonar
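
One way to express "fire an aggregation over the last n seconds for every incoming
event" without a window is sketched below in Java as a KeyedProcessFunction that buffers
recent events in keyed state and prunes them lazily on each element. The Event type with
getTimestamp() is a hypothetical input; the aggregation here is just a count and state is
never cleaned up for idle keys, so treat it as a sketch rather than a complete solution.

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class LastNSecondsCount
        extends KeyedProcessFunction<String, Event, Tuple2<String, Long>> {

    private final long windowMillis;
    private transient ListState<Event> buffer;

    public LastNSecondsCount(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(
                new ListStateDescriptor<>("recent-events", Event.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Tuple2<String, Long>> out)
            throws Exception {
        long cutoff = event.getTimestamp() - windowMillis;

        // Keep only the events whose timestamp lies within the last n seconds
        // relative to the current event, then fire the aggregation (a count here).
        List<Event> recent = new ArrayList<>();
        for (Event e : buffer.get()) {
            if (e.getTimestamp() >= cutoff) {
                recent.add(e);
            }
        }
        recent.add(event);
        buffer.update(recent);

        out.collect(Tuple2.of(ctx.getCurrentKey(), (long) recent.size()));
    }
}

The SQL counterpart is an OVER window with RANGE BETWEEN INTERVAL 'n' SECOND PRECEDING
AND CURRENT ROW, as suggested elsewhere in this thread.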



Re: rocksdb max open file descriptor issue crashed application

2020-02-11 Thread Congxian Qiu
Hi
From the given description, you use RocksDBStateBackend and normally have around
20k files open on one machine, and the app suddenly opened 35k files and then
crashed.
Could you please share which files are open, and what the exception is (the full
taskmanager.log would be helpful)?

Best,
Congxian


ApoorvK wrote on Tue, Feb 11, 2020 at 5:22 PM:

> The Flink app is crashing due to a "too many open files" issue. Currently the app
> has 300 operators and the state size is 60 GB. The app suddenly opens around 35k
> files, which was 20k a few weeks ago, hence the crash. I have updated the machine
> limit as well as the YARN limit to 60k, hoping it will not crash again.
> Please suggest if there is any alternative solution for this.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
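
Besides raising the OS and YARN limits, another knob worth knowing about is RocksDB's own
max_open_files setting. Below is a sketch (assuming the RocksDB state backend and the
OptionsFactory API available in Flink 1.9/1.10) that caps how many files each RocksDB
instance keeps open; the cap trades some read performance for file descriptors, and the
checkpoint URI and the value 1024 are placeholders.

import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

public class LimitedOpenFilesBackend {

    public static RocksDBStateBackend create(String checkpointUri) throws Exception {
        RocksDBStateBackend backend = new RocksDBStateBackend(checkpointUri, true); // incremental

        backend.setOptions(new OptionsFactory() {
            @Override
            public DBOptions createDBOptions(DBOptions currentOptions) {
                // -1 means "keep all files open"; a positive value caps the
                // number of open files per RocksDB instance.
                return currentOptions.setMaxOpenFiles(1024);
            }

            @Override
            public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
                return currentOptions;
            }
        });

        return backend;
    }
}

With 300 operators, each task slot owns its own RocksDB instance per keyed operator, so
even a moderate per-instance cap multiplies out quickly and should be chosen with the
total operator count in mind.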


Re: [VOTE] Release Flink Python API(PyFlink) 1.9.2 to PyPI, release candidate #1

2020-02-11 Thread Hequn Cheng
+1 (non-binding)

- Check signature and checksum.
- Install package successfully with Pip under Python 3.7.4.
- Run wordcount example successfully under Python 3.7.4.

Best, Hequn

On Tue, Feb 11, 2020 at 12:17 PM Dian Fu  wrote:

> +1 (non-binding)
>
> - Verified the signature and checksum
> - Pip installed the package successfully: pip install
> apache-flink-1.9.2.tar.gz
> - Run word count example successfully.
>
> Regards,
> Dian
>
> On Feb 11, 2020, at 11:44 AM, jincheng sun wrote:
>
>
> +1 (binding)
>
> - Install the PyFlink by `pip install` [SUCCESS]
> - Run word_count in both command line and IDE [SUCCESS]
>
> Best,
> Jincheng
>
>
>
> Wei Zhong wrote on Tue, Feb 11, 2020 at 11:17 AM:
>
>> Hi,
>>
>> Thanks for driving this, Jincheng.
>>
>> +1 (non-binding)
>>
>> - Verified signatures and checksums.
>> - Verified README.md and setup.py.
>> - Run `pip install apache-flink-1.9.2.tar.gz` in Python 2.7.15 and Python
>> 3.7.5 successfully.
>> - Start local pyflink shell in Python 2.7.15 and Python 3.7.5 via
>> `pyflink-shell.sh local` and try the examples in the help message, run well
>> and no exception.
>> - Try a word count example in IDE with Python 2.7.15 and Python 3.7.5,
>> run well and no exception.
>>
>> Best,
>> Wei
>>
>>
>> On Feb 10, 2020, at 19:12, jincheng sun wrote:
>>
>> Hi everyone,
>>
>> Please review and vote on the release candidate #1 for the PyFlink
>> version 1.9.2, as follows:
>>
>> [ ] +1, Approve the release
>> [ ] -1, Do not approve the release (please provide specific comments)
>>
>> The complete staging area is available for your review, which includes:
>>
>> * the official Apache binary convenience releases to be deployed to
>> dist.apache.org [1], which are signed with the key with fingerprint
>> 8FEA1EE9D0048C0CCC70B7573211B0703B79EA0E [2] and built from source code [3].
>>
>> The vote will be open for at least 72 hours. It is adopted by majority
>> approval, with at least 3 PMC affirmative votes.
>>
>> Thanks,
>> Jincheng
>>
>> [1] https://dist.apache.org/repos/dist/dev/flink/flink-1.9.2-rc1/
>> [2] https://dist.apache.org/repos/dist/release/flink/KEYS
>> [3] https://github.com/apache/flink/tree/release-1.9.2
>>
>>
>


Re: Rescaling a running topology

2020-02-11 Thread Andrey Zagrebin
Hi Stephen,

I am sorry that you had this experience with the rescale API.
Unfortunately, the rescale API was always experimental and had some flaws.
Recently, the Flink community decided to disable it temporarily with the 1.9
release, see more explanation here [1].

I would advise the manual rescaling (path 1 in your original message).
Technically, the rescale operation skips some steps like job master startup
and uploading job artefacts, but then it still does essentially the same thing as the
manual workflow:
1. take savepoint
2. redeploy tasks with the new parallelism from that savepoint
So practically, there should not be a big difference but it depends on the
job, of course, whether the rescale operation is faster or not.

Thanks,
Andrey

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DISCUSS-Temporarily-remove-support-for-job-rescaling-via-CLI-action-quot-modify-quot-td27447.html

On Fri, Feb 7, 2020 at 1:31 PM Stephen Connolly <
stephen.alan.conno...@gmail.com> wrote:

> Ooooh more fun... If I rescale down a job, the job's config at
> jobs/{jobid}/config does not reflect the new parallelism (there may not
> even be any way to detect such a parallelism change)... but more critically
> the job is now unstoppable and seems to end up stuck in the CANCELLING
> state for some time (I gave up waiting)
>
> On Fri, 7 Feb 2020 at 11:54, Stephen Connolly <
> stephen.alan.conno...@gmail.com> wrote:
>
>> And now the job is stuck in a suspended state and I seem to have no way
>> to get it out of that state again!
>>
>> On Fri, 7 Feb 2020 at 11:50, Stephen Connolly <
>> stephen.alan.conno...@gmail.com> wrote:
>>
>>> The plot thickens... I was able to rescale down... just not back up
>>> again!!!
>>>
>>> root@flink-jobmanager-64bcfdf799-jhs7p:/opt/flink# bin/flink list -m
>>> localhost:8081
>>> Waiting for response...
>>> -- Running/Restarting Jobs ---
>>> 07.02.2020 11:26:33 : ebc20a700c334f61ea03ecdf3d8939ca : Test topology
>>> (RUNNING)
>>> --
>>> No scheduled jobs.
>>> root@flink-jobmanager-64bcfdf799-jhs7p:/opt/flink# bin/flink modify -m
>>> localhost:8081 ebc20a700c334f61ea03ecdf3d8939ca -p 1
>>> Modify job ebc20a700c334f61ea03ecdf3d8939ca.
>>> Rescaled job ebc20a700c334f61ea03ecdf3d8939ca. Its new parallelism is 1.
>>> root@flink-jobmanager-64bcfdf799-jhs7p:/opt/flink# bin/flink modify -m
>>> localhost:8081 ebc20a700c334f61ea03ecdf3d8939ca -p 2
>>> Modify job ebc20a700c334f61ea03ecdf3d8939ca.
>>>
>>> 
>>>  The program finished with the following exception:
>>>
>>> org.apache.flink.util.FlinkException: Could not rescale job
>>> ebc20a700c334f61ea03ecdf3d8939ca.
>>> at
>>> org.apache.flink.client.cli.CliFrontend.lambda$modify$10(CliFrontend.java:799)
>>> at
>>> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:985)
>>> at org.apache.flink.client.cli.CliFrontend.modify(CliFrontend.java:790)
>>> at
>>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1068)
>>> at
>>> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
>>> at
>>> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
>>> Caused by: java.util.concurrent.CompletionException:
>>> java.lang.IllegalStateException: Suspend needs to happen atomically
>>> at
>>> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
>>> at
>>> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
>>> at
>>> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:975)
>>> at
>>> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
>>> at
>>> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
>>> at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:392)
>>> at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:185)
>>> at
>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>> at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
>>> at
>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>>> at
>>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>>> at akka.actor.Actor.aroundReceive(Actor.scala:502)
>>> at akka.actor.Actor.aroundReceive$(Actor.scala:500)
>>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>> at 

Re: FlinkCEP questions - architecture

2020-02-11 Thread Arvid Heise
Hi Juergen,

1) yes, you are using a changelog of events. If you need more information,
you could search for change data capture architecture.

For all CEP questions, I'm pulling in Kostas.

12) It depends on the format in which the data is exported. If you use a format
with schema evolution (e.g. Avro), then schema changes will be handled
gracefully.

Best,

Arvid

On Mon, Feb 10, 2020 at 6:35 PM Juergen Donnerstag <
juergen.donners...@gmail.com> wrote:

> Hi,
>
> we're in very early stages evaluating options. I'm not a Flink expert, but
> did read some of the docs and watched videos. Could you please help me
> understand if and how certain of our reqs are covered by Flink (CEP). Is
> this mailing list the right channel for such questions?
>
> 1) We receive files every day, which are exports from some database
> tables, containing ONLY changes from that day. Most tables have modify-cols.
> Even though they are files, because they contain only changes, I believe
> the file records should be considered events in Flink terminology. Is that
> assumption correct?
>
> 2) The records within the DB export files are NOT in chronological order, and
> we cannot change the export. Our use case is a "complex event processing"
> case (FlinkCEP) with rules like "KeyBy(someKey) If first A, then B, then C
> within 30 days, then do something". Does that work with FlinkCEP despite
> the events/records are not in chrono order within the file? The files are
> 100MB to 20GB in size. Do I need to sort the files first before CEP
> processing?
>
> 3) Occasionally some crazy people manually "correct" DB records within
> the database and manually trigger a re-export of ALL of the changes for
> that respective day (e.g. last week's Tuesday). Consequently we receive a
> correction file. Same filename but "_1" appended. All filenames include the
> date (of the original export). What are the options to handle that case
> (besides telling the DB admins not to, which we did already). Regular
> checkpoints and re-process all files since then?  What happens to the CEP
> state? Will it be checkpointed as well?
>
> 4) Our CEP rules span up to 180 days (resp. 6 months). Is that a problem?
>
> 5) We also have CEP rules that must fire if after a start sequence
> matched, the remaining sequence did NOT within a configured window. E.g. If
> A, then B, but C did not occur within 30 days since A. Is that supported by
> FlinkCEP? I couldn't find a working example.
>
> 6) We expect 30-40 CEP rules. How can we estimate the required storage
> size for the temporary CEP state? Is there some sort of formula
> considering number of rules, number of records per file or day, record
> size, window, number of records matched per sequence, number of keyBy
> grouping keys, ...
>
> 7) I can imagine that for debugging reasons it'd be good if we were able
> to query the temporary CEP state. What is the (CEP) schema used to persist
> the CEP state and how can we query it? And does such query work on the
> whole cluster or only per node (e.g. because of shuffle and nodes
> responsible only for a portion of the events).
>
> 8) I understand state is stored per node. What happens if I want to add or
> remove nodes? Will the state still be found, despite it being stored on
> another node? I read that I need to be equally careful when changing rules?
> Or is that a different issue?
>
> 9) How does garbage collection of temp CEP state work, or will it stay
> forever?  For tracing/investigation reasons I can imagine that purging it
> at the earliest possible time is not always the best option. Maybe after
> 30 days or so.
>
> 10) Are there strategies to minimize temp CEP state? In SQL queries you
>  filter first on the "smallest" attributes. CEP rules form a sequence.
> Hence that approach will not work. Is that an issue at all? What are
> practical limits on the CEP temp state storage engine?
>
> 11) Occasionally we need to process about 200 files at once. Can I speed
> things up by processing all files in parallel on multiple nodes, despite
> their sequence (CEP use case)? This would only work if FlinkCEP in step 1
> simply filters on all relevant events of a sequence, updates state, and in
> a step 2 - after the files are processed - evaluates the updated state if
> that meets the sequences.
>
> 12) Schema changes in the input files: Occasionally the DB source system
> schema is changed, and not always in a backwards-compatible way (new fields
> inserted in the middle), and the export will then have the new field in the
> middle as well. This means that starting from a specific (file) date, I need to
> consider a different schema. This must also be handled when re-running
> files for the last month, because of corrections provided. And if the file
> format has changed somewhere in the middle ...
>
> thanks a lot for your time and your help
> Juergen
>
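
For the rule shape in question 2 ("if first A, then B, then C within 30 days" per key), a
minimal FlinkCEP sketch in Java could look like the following. DbChange with
getKey()/getType() is a hypothetical event type for the exported records, and the stream
is assumed to carry event-time timestamps and watermarks so that the 30-day window is
interpreted in event time.

import java.util.List;
import java.util.Map;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

public class AbcWithin30Days {

    public static DataStream<String> detect(DataStream<DbChange> changes) {
        Pattern<DbChange, ?> pattern = Pattern.<DbChange>begin("a")
                .where(new SimpleCondition<DbChange>() {
                    @Override
                    public boolean filter(DbChange e) { return "A".equals(e.getType()); }
                })
                .followedBy("b")
                .where(new SimpleCondition<DbChange>() {
                    @Override
                    public boolean filter(DbChange e) { return "B".equals(e.getType()); }
                })
                .followedBy("c")
                .where(new SimpleCondition<DbChange>() {
                    @Override
                    public boolean filter(DbChange e) { return "C".equals(e.getType()); }
                })
                .within(Time.days(30));

        PatternStream<DbChange> matches =
                CEP.pattern(changes.keyBy(DbChange::getKey), pattern);

        return matches.select(new PatternSelectFunction<DbChange, String>() {
            @Override
            public String select(Map<String, List<DbChange>> match) {
                return "matched A->B->C for key " + match.get("a").get(0).getKey();
            }
        });
    }
}

Note that the negative rule from question 5 ("C did NOT occur within 30 days") is a
different construct; as far as I know, notFollowedBy() cannot be the last part of a
pattern, so that case is usually expressed via the timed-out partial matches (timeout
side output) instead.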


Re:Re: Flink DataTypes json parse exception

2020-02-11 Thread sunfulin
Hi, 
I am using the latest Flink 1.10 RC. When I run the same code using Flink
1.8.2, there is no problem, but with 1.10 the issue occurs.
I am confused about the reason.











At 2020-02-11 18:33:50, "Timo Walther"  wrote:
>Hi,
>
>from which Flink version are you upgrading? There were some changes in 
>1.9 for how to parse timestamps in JSON format.
>
>Your error might be related to those changes:
>
>https://issues.apache.org/jira/browse/FLINK-11727
>
>I hope this helps.
>
>Timo
>
>
>On 07.02.20 07:57, sunfulin wrote:
>> Hi, guys
>> When upgrading to Flink 1.10 rc0, I am confused by the new DataTypes schema 
>> definition.
>> I am reading and consuming records from kafka with json schema like   
>> {"user_id":1866071598998,"recv_time":1547011832729}. and the code snippet is 
>> :
>> 
>> 
>> 
>> .withSchema(
>>  new Schema()
>>  // eventTime
>>  .field("rowtime", 
>> DataTypes.TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class)).rowtime(
>>  new Rowtime()
>>  .timestampsFromField("recv_time")
>>  .watermarksPeriodicBounded(1000)
>>  )
>>  .field("user_id", DataTypes.STRING())
>> 
>> 
>> 
>> 
>> 
>> 
>> But I am running into an issue and got an exception like the following:
>> 
>> 
>> Caused by: java.time.format.DateTimeParseException: Text '1549705104542' 
>> could not be parsed at index 0
>> at 
>> java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
>> at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
>> at 
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.convertToLocalDateTime(JsonRowDeserializationSchema.java:366)
>> at 
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
>> at 
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
>> at 
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
>> at 
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
>> at 
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
>> 


Re: Flink DataTypes json parse exception

2020-02-11 Thread Timo Walther

Hi,

from which Flink version are you upgrading? There were some changes in 
1.9 for how to parse timestamps in JSON format.


Your error might be related to those changes:

https://issues.apache.org/jira/browse/FLINK-11727

I hope this helps.

Timo


On 07.02.20 07:57, sunfulin wrote:

Hi, guys
When upgrading to Flink 1.10 rc0, I am confused by the new DataTypes schema 
definition.
I am reading and consuming records from kafka with json schema like   
{"user_id":1866071598998,"recv_time":1547011832729}. and the code snippet is :



.withSchema(
 new Schema()
 // eventTime
 .field("rowtime", 
DataTypes.TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class)).rowtime(
 new Rowtime()
 .timestampsFromField("recv_time")
 .watermarksPeriodicBounded(1000)
 )
 .field("user_id", DataTypes.STRING())






But I am running into an issue and got an exception like the following:


Caused by: java.time.format.DateTimeParseException: Text '1549705104542' could 
not be parsed at index 0
at 
java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.convertToLocalDateTime(JsonRowDeserializationSchema.java:366)
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)





Re: Flink 1.10 on MapR secure cluster with high availability

2020-02-11 Thread Arvid Heise
Hi Maxim,

in general, we have used shading in the past to avoid dependency hell for
users. We are in the process of replacing more and more relocations with
plugins that have a different classloader. However, I can't tell you whether that
will also work for ZooKeeper or when that would happen.

If you manage to find out how Hadoop is solving that, I'd appreciate if you
could let us know.

Best,

Arvid

On Fri, Feb 7, 2020 at 5:02 PM Maxim Parkachov 
wrote:

> Hi Chesnay,
>
> I managed to re-compile with MapR zookeeper and can confirm that it works
> with HA as well.
> Still I find it strange that HA uses the shaded version of ZooKeeper instead
> of the version from the classpath, as is done for Hadoop.
>
> Thanks,
> Maxim.
>
> On Wed, Feb 5, 2020 at 3:43 PM Chesnay Schepler 
> wrote:
>
>> No, since a) HA will never use classes from the user-jar and b) zookeeper
>> is relocated to a different package (to avoid conflicts) and hence any
>> replacement has to follow the same relocation convention.
>>
>> On 05/02/2020 15:38, Maxim Parkachov wrote:
>>
>> Hi Chesnay,
>>
>> thanks for advise. Will it work if I include MapR specific zookeeper in
>> job dependencies and still use out-of-box Flink binary distribution ?
>>
>> Regards,
>> Maxim.
>>
>> On Wed, Feb 5, 2020 at 3:25 PM Chesnay Schepler 
>> wrote:
>>
>>> You must rebuild Flink while overriding zookeeper.version property to
>>> match your MapR setup.
>>> For example: mvn clean package -Dzookeeper.version=3.4.5-mapr-1604
>>> Note that you will also have to configure the MapR repository in your
>>> local setup as described here.
>>>
>>> On 05/02/2020 15:12, Maxim Parkachov wrote:
>>>
>>> Hi everyone,
>>>
>>> I have already written about issue with Flink 1.9 on secure MapR cluster
>>> and high availability. The issue was resolved with custom compiled Flink
>>> with vendor mapr repositories enabled. The history could be found
>>> https://www.mail-archive.com/user@flink.apache.org/msg28235.html
>>>
>>> Unfortunately, in current 1.10 RC vendor repositories were removed and
>>> I'm failing to get working configuration.  Current situation with 1.10 RC
>>> and secure MapR cluster:
>>>
>>> 1. Without HA, Flink uses class path provided zookeeper jar (mapr
>>> specific) and everything works fine.
>>>
>>> 2. With HA enabled, Flink uses shaded zookeeper  
>>> (org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn)
>>> which  doesn't have MapR specific changes and fails to authenticate.
>>>
>>> I would really appreciate any help in resolving this issue.I'm ready to
>>> provide any required details.
>>>
>>> Regards,
>>> Maxim.
>>>
>>>
>>>
>>


Re: Question: Determining Total Recovery Time

2020-02-11 Thread Morgan Geldenhuys

Thanks for the advice, I will look into it.

Had a quick think about another simple solution but we would need a hook 
into the checkpoint process from the task/operator perspective, which I 
haven't looked into yet. It would work like this:


- The sink operators (?) would keep a local copy of the last message 
processed (or a digest?), the current timestamp, and a boolean value 
indicating whether or not the system is in recovery.
- While not in recovery, update the local copy and timestamp with each 
new event processed.
- When a failure is detected and the taskmanagers are notified to 
rollback, we use the hook into this process to switch the boolean value 
to true.
- While true, it compares each new message with the last one processed 
before the recovery process was initiated.
- When a match is found, the difference between the previous and current 
timestamp is calculated and outputted as a custom metric and the boolean 
is reset to false.


From here, the mean total recovery time could be calculated across the 
operators. Not sure how it would impact performance, but I doubt it 
would be significant. We would need to ensure exactly-once so that the 
message would be guaranteed to be seen again. Thoughts?
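
For reference, a rough Java sketch of the bookkeeping described above. Instead of a
dedicated hook into the rollback, it uses CheckpointedFunction: the last processed record
is kept in operator state, and context.isRestored() in initializeState() plays the role
of the "in recovery" flag, so the measured time starts at the task restore rather than at
the failure instant. The metric name and the use of String records are made up for
illustration, and replay of the input (e.g. exactly-once from Kafka offsets) is assumed
so that the remembered record is seen again.

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class RecoveryTimeMeasuringSink extends RichSinkFunction<String>
        implements CheckpointedFunction {

    private transient ListState<String> checkpointedLast; // survives the restore
    private String lastProcessed;
    private boolean inRecovery;
    private long recoveryStartMillis;
    private long lastRecoveryTimeMillis;

    @Override
    public void open(Configuration parameters) {
        getRuntimeContext().getMetricGroup()
                .gauge("totalRecoveryTimeMs", (Gauge<Long>) () -> lastRecoveryTimeMillis);
    }

    @Override
    public void invoke(String value, Context context) {
        if (inRecovery && value.equals(lastProcessed)) {
            // Caught up to the last record seen before the failure.
            lastRecoveryTimeMillis = System.currentTimeMillis() - recoveryStartMillis;
            inRecovery = false;
        }
        lastProcessed = value;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedLast.clear();
        if (lastProcessed != null) {
            checkpointedLast.add(lastProcessed);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedLast = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("last-processed", String.class));
        if (context.isRestored()) {
            for (String s : checkpointedLast.get()) {
                lastProcessed = s;
            }
            // We are restoring after a failure: start the clock here.
            inRecovery = true;
            recoveryStartMillis = System.currentTimeMillis();
        }
    }
}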


On 11.02.20 08:57, Arvid Heise wrote:

Hi Morgan,

as Timo pointed out, there is no general solution, but in your 
setting, you could look at the consumer lag of the input topic after a 
crash. Lag would spike until all tasks restarted and reprocessing 
begins. Offsets are only committed on checkpoints though by default.


Best,

Arvid

On Tue, Feb 4, 2020 at 12:32 PM Timo Walther > wrote:


Hi Morgan,

as far as I know this is not possible mostly because measuring
"till the
point when the system catches up to the last message" is very
pipeline/connector dependent. Some sources might need to read from
the
very beginning, some just continue from the latest checkpointed
offset.

Measuring things like that (e.g. for experiments) might require collecting
your own metrics as part of your pipeline definition.

Regards,
Timo


On 03.02.20 12:20, Morgan Geldenhuys wrote:
> Community,
>
> I am interested in determining the total time to recover for a
Flink
> application after experiencing a partial failure. Let's assume a
> pipeline consisting of Kafka -> Flink -> Kafka with Exactly-Once
> guarantees enabled.
>
> Taking a look at the documentation
>

(https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/metrics.html),

> one of the metrics which can be gathered is /recoveryTime/.
However, as
> far as I can tell this is only the time taken for the system to
go from
> an inconsistent state back into a consistent state, i.e.
restarting the
> job. Is there any way of measuring the amount of time taken from
the
> point when the failure occurred till the point when the system
catches
> up to the last message that was processed before the outage?
>
> Thank you very much in advance!
>
> Regards,
> Morgan.





Re: NoClassDefFoundError when an exception is throw inside invoke in a Custom Sink

2020-02-11 Thread Arvid Heise
Hi David,

this seems to be a bug in our s3 plugin. The joda dependency should be
bundled there.

Are you using s3 as a plugin by any chance? Which flink version are you
using?

If you are using s3 as a plugin, you could put joda in your plugin folder
like this

flink-dist
├── conf
├── lib
...
└── plugins
└── s3
├── joda.jar
└── flink-s3-fs-hadoop.jar

If flink-s3-fs-hadoop.jar is in lib, you could try adding joda into that.

Adding joda to your user code will unfortunately not work.

Best,

Arvid

On Thu, Feb 6, 2020 at 11:16 PM David Magalhães 
wrote:

> Hi Andrey, thanks for your reply.
>
> The class is in the jar created with `*sbt assembly*` that is
> submitted to Flink to start a job.
>
> unzip -l target/jar/myapp-0.0.1-SNAPSHOT.jar | grep DateTimeParserBucket
>  1649  05-27-2016 10:24
> org/joda/time/format/DateTimeParserBucket$SavedField.class
>  1984  05-27-2016 10:24
> org/joda/time/format/DateTimeParserBucket$SavedState.class
>  8651  05-27-2016 10:24
> org/joda/time/format/DateTimeParserBucket.class
>
> Shouldn't this be enough?
>
> I think it uses it when nothing goes wrong, but as soon as there are some
> exceptions, it looks like it "forgets" it.
>
> Like I said before, this is kind of intermittent.
>
> Thanks,
> David
>
> On Thu, Feb 6, 2020 at 5:46 PM Andrey Zagrebin 
> wrote:
>
>> Hi David,
>>
>> This looks like a problem with resolution of maven dependencies or
>> something.
>> The custom WindowParquetGenericRecordListFileSink probably transitively
>> depends on org/joda/time/format/DateTimeParserBucket
>> and it is missing on the runtime classpath of Flink.
>>
>> Best,
>> Andrey
>>
>> On Wed, Feb 5, 2020 at 5:22 PM David Magalhães 
>> wrote:
>>
>>> I'm implementing an exponential backoff inside a custom sink that uses
>>> an AvroParquetWriter to write to S3. I've changed the number of attempts to
>>> 0 inside the core-site.xml, and I'm capturing the timeout exception, doing
>>> a Thread.sleep for X seconds. This is working as intended, and when S3 is
>>> offline, it waits until it is online.
>>>
>>> I also want to test that the back pressure and the checkpoints are
>>> working as intended, and for the first one, I can see the back pressure in
>>> Flink UI going up, and recover as expected and not reading more data from
>>> Kafka.
>>>
>>> For the checkpoints, I've added a random exception inside the sink invoke
>>> function (1 in 100, to simulate that a problem has happened and it needs
>>> to recover from the last good checkpoint), but something strange happens.
>>> Sometimes I can see the job being canceled and created again, and running
>>> fine; other times, after being created and canceled a number of times,
>>> it gives a *NoClassDefFoundError*, and it will keep giving that
>>> forever.
>>>
>>> Do you guys have any thoughts?
>>>
>>> org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught
>>> exception while processing timer.
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
>>> at
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>> at
>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>> at
>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: TimerException{java.lang.NoClassDefFoundError:
>>> org/joda/time/format/DateTimeParserBucket}
>>> at
>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
>>> ... 7 more
>>> Caused by: java.lang.NoClassDefFoundError:
>>> org/joda/time/format/DateTimeParserBucket
>>> at
>>> org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:825)
>>> at com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:193)
>>> at
>>> com.amazonaws.services.s3.internal.ServiceUtils.parseRfc822Date(ServiceUtils.java:78)
>>> at
>>> com.amazonaws.services.s3.internal.AbstractS3ResponseHandler.populateObjectMetadata(AbstractS3ResponseHandler.java:115)
>>> at
>>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:32)
>>> at
>>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:25)
>>> at
>>> com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:1072)

rocksdb max open file descriptor issue crashed application

2020-02-11 Thread ApoorvK
The Flink app is crashing due to a "too many open files" issue. Currently the app
has 300 operators and the state size is 60 GB. The app suddenly opens around 35k
files, which was 20k a few weeks ago, hence the crash. I have updated the machine
limit as well as the YARN limit to 60k, hoping it will not crash again.
Please suggest if there is any alternative solution for this.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/