Re: Read parquet data from S3 with Flink 1.12

2021-12-20 Thread Alexandre Montecucco
Hello,

I also face the same issue as documented in a previous mail from the
mailing list [1]
Basically when using flink-parquet, I get:

>  java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration

I have no idea what I need to do to fix this and could not find anything
from the doc. I tried importing various hadoop libraries, but it always
causes yet another issue.

I think this might be the root cause of my problem.

Best,
Alex

[1] https://lists.apache.org/thread/796m8tww4gqykqm1szb3y5m7t6scgho2
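
A quick way to narrow down this kind of error is to check, from inside the job's main method, whether the class is visible to the classloader at all. The following is only a diagnostic sketch; the class name `ClasspathCheck` and its helper method are hypothetical and not part of Flink or Hadoop.

```java
// Diagnostic sketch (hypothetical helper, not a Flink API):
// reports whether a class is visible to the classloader that loaded this helper.
final class ClasspathCheck {

    /** Returns true if the named class can be located without initializing it. */
    static boolean isClassPresent(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // The class named in the exception above.
        String hadoopConf = "org.apache.hadoop.conf.Configuration";
        System.out.println(hadoopConf + " present: " + isClassPresent(hadoopConf));
    }
}
```

If this prints `false` in the environment where the job runs, the Hadoop classes are missing from that classpath rather than being shadowed by a conflicting version.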

On Mon, Dec 20, 2021 at 4:23 PM Alexandre Montecucco <
alexandre.montecu...@grabtaxi.com> wrote:

> Hello Piotrek,
> Thank you for the help.
> Regarding the S3 issue I have followed the documentation for the plugins.
> Many of our other apps are using S3 through the Hadoop Fs Flink plugin.
> Also, in this case, just reading regular plain text file works, I only
> have an issue when using Parquet.
>
> I tried switching to Flink 1.14, however I am stumbling upon other
> blockers.
> To give more context, I am trying to build a Flink savepoint for cold
> start data. So I am using the Flink State Processor API. But:
>  -  Flink State Processor API is using the DataSet api which is now marked
> as deprecated (Legacy)
>  - the doc you shared regarding reading from Parquet uses the DataStream
> API
>  - the Flink State Processor API doc [1] states there is interoperability
> of DataSet and Table API (but the link is now broken; it was last
> correct in Flink 1.12 [2])
>
> Given that we can convert from DataStream to Table API, I was thinking I
> could then convert from Table to DataSet API (though very cumbersome and
> unsure if any performance / memory impact).
> But for the Table to DataSet conversion, the doc is using a
> BatchTableEnvironment class, which does not seem to exist in Flink 1.14
> anymore.
>
> Any recommendations or anything I might have missed?
>
> Thank you.
>
> Best,
> Alex
>
>
> [1] 
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/libs/state_processor_api/#state-processor-api
>
> 
>
> [2]
> https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/common.html#integration-with-datastream-and-dataset-api
> [3]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/formats/parquet/
>
>
> On Fri, Dec 17, 2021 at 8:53 PM Piotr Nowojski 
> wrote:
>
>> Hi,
>>
>> Reading in the DataStream API (that's what I'm assuming you are doing) from
>> Parquet files is officially supported and documented only since 1.14 [1].
>> Before that it was only supported for the Table API. As far as I can tell,
>> the basic classes (`FileSource` and `ParquetColumnarRowInputFormat`) have
>> already been in the code base since 1.12.x. I don't know how stable it was
>> and how well it was working. I would suggest upgrading to Flink 1.14.1. As
>> a last resort you can try using at the very least the latest version of the
>> 1.12.x branch as documented by the 1.14 version, but I cannot guarantee that
>> it will work.
>>
>> Regarding the S3 issue, have you followed the documentation? [2][3]
>>
>> Best,
>> Piotrek
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/formats/parquet/
>> [2]
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins
>> [3]
>> https://nightlies.apache.org/flink/flink-docs-release-1.12/deployment/filesystems/s3.html
>>
>>
>> On Fri, Dec 17, 2021 at 10:10, Alexandre Montecucco <
>> alexandre.montecu...@grabtaxi.com> wrote:
>>
>>> Hello everyone,
>>> I am struggling to read Parquet files from S3 with Flink Streaming
>>> 1.12.2.
>>> I had some difficulty simply reading from local parquet files. I finally
>>> managed that part, though the solution feels dirty:
>>> - I use the readFile function + ParquetInputFormat abstract class (that
>>> is protected) (as I could not find a way to use the public
>>> ParquetRowInputFormat).
>>> - the open function, in ParquetInputFormat is
>>> using org.apache.hadoop.conf.Configuration. I am not sure which import to
>>> add. It seems the flink-parquet library is importing the dependency from
>>> hadoop-common but the dep is marked as provided. The doc only shows usage
>>> of flink-parquet from Flink SQL. So I am under the impression that this
>>> might not work in the streaming case without extra code. I 'solved' this by
>>> adding a dependency to hadoop-common. We did something similar to write
>>> parquet data to S3.
>>>
>>> Now, when trying to run the application to read from S3, I get an
>>> exception with root cause:
>>> ```
>>> Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No
>>> FileSystem for scheme "s3"
>>> ```
>>> I guess there ar

Re: CVE-2021-44228 - Log4j2 vulnerability

2021-12-20 Thread Martijn Visser
Hi,

The status and Flink ticket for upgrading to Log4j 2.17.0 can be tracked at
https://issues.apache.org/jira/browse/FLINK-25375.

Best regards,

Martijn

On Sat, 18 Dec 2021 at 16:50, V N, Suchithra (Nokia - IN/Bangalore) <
suchithra@nokia.com> wrote:

> Hi,
>
>
>
> It seems there is a high-severity vulnerability in log4j 2.16.0
> (CVE-2021-45105).
>
> Refer : https://logging.apache.org/log4j/2.x/security.html
>
> Any update on this please?
>
>
>
> Regards,
>
> Suchithra
>
>
>
> *From:* Chesnay Schepler 
> *Sent:* Thursday, December 16, 2021 4:35 PM
> *To:* Parag Somani 
> *Cc:* Michael Guterl ; V N, Suchithra (Nokia -
> IN/Bangalore) ; Richard Deurwaarder <
> rich...@xeli.eu>; user 
> *Subject:* Re: CVE-2021-44228 - Log4j2 vulnerability
>
>
>
> We will announce the releases when the binaries are available.
>
>
>
> On 16/12/2021 05:37, Parag Somani wrote:
>
> Thank you Chesnay for expediting this fix...!
>
>
>
> Can you suggest, when can I get binaries for 1.14.2 flink version?
>
>
>
> On Thu, Dec 16, 2021 at 5:52 AM Chesnay Schepler 
> wrote:
>
> We will push docker images for all new releases, yes.
>
>
>
> On 16/12/2021 01:16, Michael Guterl wrote:
>
> Will you all be pushing Docker images for the 1.11.6 release?
>
>
>
> On Wed, Dec 15, 2021 at 3:26 AM Chesnay Schepler 
> wrote:
>
> The current ETA is 40h for an official announcement.
>
> We are validating the release today (concludes in 16h), publish it
> tonight, then wait for mirrors to be sync (about a day), then we announce
> it.
>
>
>
> On 15/12/2021 12:08, V N, Suchithra (Nokia - IN/Bangalore) wrote:
>
> Hello,
>
>
>
> Could you please tell when we can expect Flink 1.12.7 release? We are
> waiting for the CVE fix.
>
>
>
> Regards,
>
> Suchithra
>
>
>
>
>
> *From:* Chesnay Schepler  
> *Sent:* Wednesday, December 15, 2021 4:04 PM
> *To:* Richard Deurwaarder  
> *Cc:* user  
> *Subject:* Re: CVE-2021-44228 - Log4j2 vulnerability
>
>
>
> We will also update the docker images.
>
>
>
> On 15/12/2021 11:29, Richard Deurwaarder wrote:
>
> Thanks for picking this up quickly!
>
>
>
> I saw you've made a second minor upgrade to upgrade to log4j2 2.16 which
> is perfect.
>
>
>
> Just to clarify: Will you also push new docker images for these releases
> as well? In particular flink 1.11.6 (Sorry we must upgrade soon! :()
>
>
>
> On Tue, Dec 14, 2021 at 2:33 AM narasimha  wrote:
>
> Thanks Timo, that was helpful.
>
>
>
> On Mon, Dec 13, 2021 at 7:19 PM Prasanna kumar <
> prasannakumarram...@gmail.com> wrote:
>
> Chesnay Thank you for the clarification.
>
>
>
> On Mon, Dec 13, 2021 at 6:55 PM Chesnay Schepler 
> wrote:
>
> The flink-shaded-zookeeper jars do not contain log4j.
>
>
>
> On 13/12/2021 14:11, Prasanna kumar wrote:
>
> Does Zookeeper have this vulnerability dependency ? I see references to
> log4j in Shaded Zookeeper jar included as part of the flink distribution.
>
>
>
> On Mon, Dec 13, 2021 at 1:40 PM Timo Walther  wrote:
>
> While we are working to upgrade the affected dependencies of all
> components, we recommend users follow the advisory of the Apache Log4j
> Community. Also Ververica platform can be patched with a similar approach:
>
> To configure the JVMs used by Ververica Platform, you can pass custom
> Java options via the JAVA_TOOL_OPTIONS environment variable. Add the
> following to your platform values.yaml, or append to the existing value
> of JAVA_TOOL_OPTIONS if you are using it already there, then redeploy
> the platform with Helm:
> env:
>   - name: JAVA_TOOL_OPTIONS
>     value: -Dlog4j2.formatMsgNoLookups=true
>
>
> For any questions, please contact us via our support portal.
>
> Regards,
> Timo
>
> On 11.12.21 06:45, narasimha wrote:
> > Folks, what about the Ververica Platform? Is there any
> > mitigation around it?
> >
> > On Fri, Dec 10, 2021 at 3:32 PM Chesnay Schepler wrote:
> >
> > I would recommend modifying your log4j configurations to set
> > log4j2.formatMsgNoLookups to true.
> >
> > As far as I can tell this is equivalent to upgrading log4j, which
> > just disabled this lookup by default.
> >
> > On 10/12/2021 10:21, Richard Deurwaarder wrote:
> >> Hello,
> >>
> >> There has been a log4j2 vulnerability made public
> >> https://www.randori.com/blog/cve-2021-44228/
> >>  which is making
> >> some waves :)
> >> This post even explicitly mentions Apache Flink:
> >>
> >> https://securityonline.info/apache-log4j2-remote-code-execution-vulnerability-alert/
> >>
> >> And fortunately, I saw this was already on your radar:
> >> https://issues.apache.org/jira/browse/FLINK-25240
> >> 
> >>
> >> What would the advice be for flink users? Do you ex

Re: Re: Will Flink loss some old Keyed State when changing the parallelism

2021-12-20 Thread Seth Wiesman
No. The default max parallelism of 128 will be applied. If you try to
restore above that value, the restore will fail and you can simply restore
at a smaller value.

No data loss.
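
The reason rescaling is safe can be illustrated with a small model of key groups. The sketch below is a simplification under stated assumptions: it uses a plain `hashCode()` where Flink applies an additional hash, and the class and method names are made up for illustration. The point it shows is that a key's group depends only on the max parallelism, while the operator index that owns the group depends on the current parallelism.

```java
// Simplified model of key-group based state assignment (illustrative names,
// not Flink's classes; Flink additionally scrambles the hash code).
final class KeyGroupSketch {

    /** Key group of a key: depends only on the key and the max parallelism. */
    static int assignToKeyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Index of the parallel operator instance that owns a key group. */
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // the default mentioned above
        int keyGroup = assignToKeyGroup("user-42", maxParallelism);
        for (int parallelism : new int[] {2, 5, 3}) {
            int owner = operatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup);
            System.out.println("parallelism=" + parallelism
                    + " keyGroup=" + keyGroup + " owner=" + owner);
        }
    }
}
```

In this model, changing the parallelism from 2 to 5 or from 5 to 3 only moves whole key groups between instances; no key ever changes its group, so no state is orphaned.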

On Mon, Dec 20, 2021 at 2:28 AM 杨浩  wrote:

>
> Thanks for your reply. If we don't set the max parallelism, and we change
> the parallelism to a very big number, will the state be lost?
>
>
>
>
>
> At 2021-11-27 01:20:49, "Yun Tang"  wrote:
> >Hi Yang,
> >
> >Flink keeps the max key groups the same no matter how parallelism changes,
> >and uses this to avoid losing state data [1]
> >
> >[1] https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html
> >
> >
> >Best
> >Yun Tang
> >
> >On 2021/11/26 10:07:29 Nicolaus Weidner wrote:
> >> Hi,
> >>
> >> to rescale, you should take a savepoint, stop the job, then restart from
> >> the savepoint with your new desired parallelism. This way, no data will be
> >> lost.
> >>
> >> Best,
> >> Nico
> >>
> >> On Thu, Nov 25, 2021 at 10:53 AM 杨浩  wrote:
> >>
> >> > Will Flink lose some old Keyed State when changing the parallelism, like
> >> > 2 -> 5, or 5 -> 3?
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >>
>
>
>
>
>


flink-playground docker/mvn clean install Unknown host repo.maven.apache.org: Name or service not known

2021-12-20 Thread HG
Hello
I am trying to run the flink-playground examples.
The docker-compose build fails on the mvn clean install command.

I am behind a proxy.
To diagnose this I started a container based on the already created image
  docker run -it --name my-maven-project -v "$(pwd)":/usr/src/mymaven -w /usr/src/mymaven b92c6af9fde8 /bin/bash
Adding packages with apt works fine.
I added the proxy names with their IP addresses to /etc/hosts and added the
proxies to .m2/settings.xml.

After that it worked.

But why does it not work without manual intervention?

Regards Hans-Peter


Re: unaligned checkpoint for job with large start delay

2021-12-20 Thread Piotr Nowojski
Hi Mason,

Have you already observed those checkpoint timeouts (30 minutes) with the
alignment timeout set to 0ms? Or were you previously running with a 1s
alignment timeout?

If the latter, it might be because unaligned checkpoints are failing to
kick in in the first place. Setting the timeout to 0ms should solve the
problem.

If the former, have you checked why the checkpoints are timing out? What
part of the checkpointing process is taking a long time? For example can
you post a screenshot from the WebUI of checkpoint stats for each task? The
only explanation I could think of is this sleep time that you added. 25ms
per record is really a lot. I mean really a lot. 30 minutes / 25 ms/record
= 72 000 records. One of the unaligned checkpoints limitations is that
Flink can not snapshot a state of an operator in the middle of processing a
record. In your particular case, Flink will not be able to snapshot the
state of the session window operator in the middle of the windows being
fired. If your window operator is firing a lot of windows at the same time,
or a single window is producing 72k of records (which would be an
unusual but not unimaginable amount), this could block checkpointing of the
window operator for 30 minutes due to this 25ms sleep down the stream.
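
The arithmetic behind the 72 000 figure can be checked with a few lines of Java. This is just a sketch of the calculation; the class and method names are made up for illustration.

```java
import java.time.Duration;

// Worked arithmetic: how many records at a fixed per-record delay fit
// into the checkpoint timeout (illustrative helper, not a Flink API).
final class CheckpointBudget {

    /** Number of whole records that can be processed before the timeout expires. */
    static long recordsWithinTimeout(Duration timeout, Duration perRecordDelay) {
        return timeout.toMillis() / perRecordDelay.toMillis();
    }

    public static void main(String[] args) {
        long records = recordsWithinTimeout(Duration.ofMinutes(30), Duration.ofMillis(25));
        System.out.println(records); // 1 800 000 ms / 25 ms = 72000
    }
}
```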

Piotrek

On Fri, Dec 17, 2021 at 19:19, Mason Chen wrote:

> Hi Piotr,
>
> Thanks for the link to the JIRA ticket, we actually don’t see much state
> size overhead between checkpoints in aligned vs unaligned, so we will go
> with your recommendation of using unaligned checkpoints with 0s alignment
> timeout.
>
> For context, we are testing unaligned checkpoints with our application
> with these tasks: [kafka source, map, filter] -> keyby -> [session window]
> -> [various kafka sinks]. The first task has parallelism 40 and the rest of
> the tasks have parallelism 240. This is the FLIP 27 Kafka source.
>
> We added an artificial sleep (25 ms per invocation of the process function)
> to the session window task to simulate backpressure; however, we still see
> checkpoints failing because task acknowledgement doesn’t complete within our
> checkpoint timeout (30 minutes).
>
> I am able to correlate that the input buffers from *window* and output
> buffers from *source* being 100% usage corresponds to the checkpoint
> failures. When they are not full (input can drop to as low as 60% usage and
> output can drop to as low as 55% usage), the checkpoints succeed within
> less than 2 ms. In all cases, it is the session window task or source task
> failing to 100% acknowledge the barriers within timeout. I do see the
> *source* task acknowledgement taking long in some of the failures (e.g.
> 20 minutes, 30 minutes, 50 minutes, 1 hour, 2 hours) and source is idle and
> not busy at this time.
>
> All other input buffers are low usage (mostly 0). For output buffer, the
> usage is around 50% for window--everything else is near 0% all the time
> except the source mentioned before (makes sense since rest are just sinks).
>
> We are also running a parallel Flink job with the same configurations,
> except with unaligned checkpoints disabled. Here we observe the same
> behavior except now some of the checkpoints are failing due to the source
> task not acknowledging everything within timeout—however, most failures are
> still due to session window acknowledgement.
>
> All the data seems to point to an issue with the source. Now, I don’t know
> how to explain this behavior since unaligned checkpoints should overtake
> records in the buffers (once seen at the input buffer, forward immediately
> downstream to output buffer).
>
> Just to confirm, this is our checkpoint configuration:
> ```
> Option                             Value
> Checkpointing Mode                 Exactly Once
> Checkpoint Storage                 FileSystemCheckpointStorage
> State Backend                      EmbeddedRocksDBStateBackend
> Interval                           5m 0s
> Timeout                            30m 0s
> Minimum Pause Between Checkpoints  2m 0s
> Maximum Concurrent Checkpoints     1
> Unaligned Checkpoints              Enabled
> Persist Checkpoints Externally     Enabled (retain on cancellation)
> Tolerable Failed Checkpoints       10
> ```
>
> Are there other metrics I should look at? Why else would tasks fail
> acknowledgement in unaligned mode? Is it something about the implementation
> details of window function that I am not considering? My main hunch is
> something to do with the source.
>
> Best,
> Mason
>
> On Dec 16, 2021, at 12:25 AM, Piotr Nowojski  wrote:
>
> Hi Mason,
>
> In Flink 1.14 we have also changed the timeout behavior from checking
> against the alignment duration, to simply checking how old is the
> checkpoint barrier (so it would also account for the start delay) [1]. It
> was done in order to solve problems as you are describing. Unfortunately we
> can not backport this change to 1.13.x as it's a breaking change.
>
> Anyway, from our experience I would recommend going all in with the
> unaligned checkpoints, so setting the timeout back to the default value of
> 0ms. With timeouts you are gaining very little (a tiny

Re: Flink fails to load class from configured classpath using PipelineOptions

2021-12-20 Thread Yang Wang
Yes. You need to set "pipeline.classpaths" via flink-conf.yaml or the CLI
option (-C/--classpath).
I do not think setting it in your main class could work. Just like you
said, the user classloader will not be updated after the user main class is
executed.
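
Whichever way the classpath entries are supplied, the documentation for "pipeline.classpaths" quoted further down in this thread requires each entry to be a URL with a protocol (e.g. file://). A small pre-flight check can catch bare paths before submission. This is only a sketch; `ClasspathUrlCheck` and `hasProtocol` are hypothetical names, not Flink APIs.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Pre-flight check sketch (hypothetical helper): verifies that a classpath
// entry carries a protocol such as file://, as the option's docs require.
final class ClasspathUrlCheck {

    /** Returns true if the entry parses as a URI with an explicit scheme. */
    static boolean hasProtocol(String entry) {
        try {
            return new URI(entry).getScheme() != null;
        } catch (URISyntaxException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(hasProtocol("file:///path/to/udf.jar")); // scheme "file"
        System.out.println(hasProtocol("/path/to/udf.jar"));        // no scheme
    }
}
```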

Best,
Yang

On Sat, Dec 18, 2021 at 01:23, Pouria Pirzadeh wrote:

> I have tried 'PipelineOptions.CLASSPATHS'; It also fails with
> ClassNotFoundException with the exact same error stack trace as
> PipelineOptions.JARS.
>
> FYI The Same application jar works fine if submitted via Flink CLI using
> 'flink run' with the "-C" option to update classpath:
> /bin/flink run --detached -C file:///path/to/udf.jar 
>
> The problem seems to be that the classpath for the ClassLoader which
> codegen in table planner uses is not updated according to Configuration
> passed to the StreamExecutionEnvironment, and I am not sure how that can
> be done.
>
> Pouria
>
>
> On Thu, Dec 16, 2021 at 8:46 PM Yang Wang  wrote:
>
>> The config option "pipeline.jars" is used to specify the user jar, which
>> contains the main class.
>> I think what you need is "pipeline.classpaths".
>>
>> /**
>>  * A list of URLs that are added to the classpath of each user code classloader of the program.
>>  * Paths must specify a protocol (e.g. file://) and be accessible on all nodes
>>  */
>> public static final ConfigOption<List<String>> CLASSPATHS =
>>         key("pipeline.classpaths")
>>                 .stringType()
>>                 .asList()
>>                 .noDefaultValue()
>>                 .withDescription(
>>                         "A semicolon-separated list of the classpaths to package with the job jars to be sent to"
>>                                 + " the cluster. These have to be valid URLs.");
>>
>>
>> Best,
>> Yang
>>
>> On Fri, Dec 17, 2021 at 03:43, Pouria Pirzadeh wrote:
>>
>>> I am developing a Java application which uses UDFs on Flink 1.14.
>>> It uses PipelineOptions.JARS config to add jar files, containing UDF
>>> classes, dynamically to the user classpath in the main method. However, the
>>> application fails to load the UDF class from the configured jar files at job
>>> launch time and crashes with a ClassNotFoundException.
>>>
>>> Is PipelineOptions.JARS the correct option to add files to classpath on
>>> Job manager and all task managers?
>>>
>>> Sample code snippet:
>>>
>>> final Configuration configuration = new Configuration();
>>> configuration.set(PipelineOptions.JARS, Collections.singletonList("file:///path/to/udf.jar"));
>>> StreamExecutionEnvironment streamEnv =
>>>     StreamExecutionEnvironment.getExecutionEnvironment(configuration);
>>> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);
>>> ...
>>> Class udfClass = Class.forName("demo.MyUDF", ...);
>>> tableEnv.createTemporarySystemFunction("MyUDF", udfClass);
>>> ...
>>>
>>> Error stack trace:
>>> Exception in thread "main" java.lang.ClassNotFoundException: demo.MyUDF
>>>     at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:582)
>>>     at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
>>>     at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
>>>     at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1886)
>>>     at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1772)
>>>     at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
>>>     at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1594)
>>>     at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
>>>     at org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableObjectInternal(CodeGeneratorContext.scala:692)
>>>     at org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableFunction(CodeGeneratorContext.scala:714)
>>>     at org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.generateFunctionAwareCall(BridgingFunctionGenUtil.scala:130)
>>>     at org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.generateFunctionAwareCallWithDataType(BridgingFunctionGenUtil.scala:116)
>>>     at org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.generateFunctionAwareCall(BridgingFunctionGenUtil.scala:73)
>>>     at org.apache.flink.table.planner.codegen.calls.BridgingSqlFunctionCallGen.generate(BridgingSqlFunctionCallGen.scala:81)
>>>     at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.sca

Re: Re: Will Flink loss some old Keyed State when changing the parallelism

2021-12-20 Thread 杨浩



Thanks for your reply. If we don't set the max parallelism, and we change the
parallelism to a very big number, will the state be lost?











At 2021-11-27 01:20:49, "Yun Tang"  wrote:
>Hi Yang,
>
>Flink keeps the max key groups the same no matter how parallelism changes, and
>uses this to avoid losing state data [1]
>
>[1] https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html
>
>
>Best
>Yun Tang
>
>On 2021/11/26 10:07:29 Nicolaus Weidner wrote:
>> Hi,
>> 
>> to rescale, you should take a savepoint, stop the job, then restart from
>> the savepoint with your new desired parallelism. This way, no data will be
>> lost.
>> 
>> Best,
>> Nico
>> 
>> On Thu, Nov 25, 2021 at 10:53 AM 杨浩  wrote:
>> 
>> > Will Flink lose some old Keyed State when changing the parallelism, like
>> > 2 -> 5, or 5 -> 3?
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> 


Re: Read parquet data from S3 with Flink 1.12

2021-12-20 Thread Alexandre Montecucco
Hello Piotrek,
Thank you for the help.
Regarding the S3 issue I have followed the documentation for the plugins.
Many of our other apps are using S3 through the Hadoop Fs Flink plugin.
Also, in this case, just reading regular plain text file works, I only have
an issue when using Parquet.

I tried switching to Flink 1.14, however I am stumbling upon other
blockers.
To give more context, I am trying to build a Flink savepoint for cold start
data. So I am using the Flink State Processor API. But:
 -  Flink State Processor API is using the DataSet api which is now marked
as deprecated (Legacy)
 - the doc you shared regarding reading from Parquet uses the DataStream API
 - the Flink State Processor API doc [1] states there is interoperability
of DataSet and Table API (but the link is now broken; it was last
correct in Flink 1.12 [2])

Given that we can convert from DataStream to Table API, I was thinking I
could then convert from Table to DataSet API (though very cumbersome and
unsure if any performance / memory impact).
But for the Table to DataSet conversion, the doc is using a
BatchTableEnvironment class, which does not seem to exist in Flink 1.14
anymore.

Any recommendations or anything I might have missed?

Thank you.

Best,
Alex


[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/libs/state_processor_api/#state-processor-api


[2]
https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/common.html#integration-with-datastream-and-dataset-api
[3]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/formats/parquet/


On Fri, Dec 17, 2021 at 8:53 PM Piotr Nowojski  wrote:

> Hi,
>
> Reading in the DataStream API (that's what I'm assuming you are doing) from
> Parquet files is officially supported and documented only since 1.14 [1].
> Before that it was only supported for the Table API. As far as I can tell,
> the basic classes (`FileSource` and `ParquetColumnarRowInputFormat`) have
> already been in the code base since 1.12.x. I don't know how stable it was
> and how well it was working. I would suggest upgrading to Flink 1.14.1. As
> a last resort you can try using at the very least the latest version of the
> 1.12.x branch as documented by the 1.14 version, but I cannot guarantee that
> it will work.
>
> Regarding the S3 issue, have you followed the documentation? [2][3]
>
> Best,
> Piotrek
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/formats/parquet/
> [2]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins
> [3]
> https://nightlies.apache.org/flink/flink-docs-release-1.12/deployment/filesystems/s3.html
>
>
> On Fri, Dec 17, 2021 at 10:10, Alexandre Montecucco <
> alexandre.montecu...@grabtaxi.com> wrote:
>
>> Hello everyone,
>> I am struggling to read Parquet files from S3 with Flink Streaming
>> 1.12.2.
>> I had some difficulty simply reading from local parquet files. I finally
>> managed that part, though the solution feels dirty:
>> - I use the readFile function + ParquetInputFormat abstract class (that
>> is protected) (as I could not find a way to use the public
>> ParquetRowInputFormat).
>> - the open function, in ParquetInputFormat is
>> using org.apache.hadoop.conf.Configuration. I am not sure which import to
>> add. It seems the flink-parquet library is importing the dependency from
>> hadoop-common but the dep is marked as provided. The doc only shows usage
>> of flink-parquet from Flink SQL. So I am under the impression that this
>> might not work in the streaming case without extra code. I 'solved' this by
>> adding a dependency to hadoop-common. We did something similar to write
>> parquet data to S3.
>>
>> Now, when trying to run the application to read from S3, I get an
>> exception with root cause:
>> ```
>> Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No
>> FileSystem for scheme "s3"
>> ```
>> I guess there are some issues with hadoop-common not knowing about the
>> flink-s3-hadoop plugin setup. But I ran out of ideas on how to solve this.
>>
>>
>> I also noticed there were some changes with flink-parquet in Flink 1.14,
>> but I had some issues with simply reading data (but I did not investigate
>> so deeply for that version).
>>
>> Many thanks for any help.
>> --
>>
>> Alexandre Montecucco / Grab, Software D