flink session job retention time

2020-10-08 Thread Richard Moorhead
Is there a configuration that controls how long jobs are retained in a
flink session?
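
For reference, the amount of time a completed job stays visible in a session
cluster is governed by the jobstore options in conf/flink-conf.yaml; a minimal
sketch, with the values believed to be the documented defaults:

# How long a completed job is kept in the JobManager's job store, in seconds.
jobstore.expiration-time: 3600
# Upper bound on the memory used to cache completed jobs, in bytes.
jobstore.cache-size: 52428800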


Re: NoResourceAvailableException

2020-10-08 Thread Khachatryan Roman
I assume that before submitting a job you started a cluster with default
settings with ./bin/start-cluster.sh.

Did you submit any other jobs?
Can you share the logs from the log folder?

Regards,
Roman


On Wed, Oct 7, 2020 at 11:03 PM Alexander Semeshchenko 
wrote:

>
> 
>
> After installing (download & tar zxf) Apache Flink 1.11.1 and running ./bin/flink
> run examples/streaming/WordCount.jar, the submission fails after roughly 5
> minutes of trying with:
>
> Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Could not allocate the required slot within slot request timeout. Please
> make sure that the cluster has enough resources.
>   at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeWrapWithNoResourceAvailableException(DefaultScheduler.java:441)
>   ... 45 more
> Caused by: java.util.concurrent.CompletionException: java.util.concurrent.TimeoutException
>   at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>   at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>   at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607)
>   at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
>
> It's the Flink default configuration.
>
> Architecture:          x86_64
> CPU op-mode(s):        32-bit, 64-bit
> Byte Order:            Little Endian
> CPU(s):                8
> On-line CPU(s) list:   0-7
> Thread(s) per core:    1
> Core(s) per socket:    1
>
> free -g:
>               total   used   free   shared   buff/cache   available
> Mem:             62      1     23        3           37          57
> Swap:             7      0      7
>
> Is there any advice about what is happening?
>
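
For reference, the settings that the timeout above refers to live in
conf/flink-conf.yaml; a hedged sketch of the values worth checking on a default
standalone cluster (the numbers shown are the usual defaults, not a recommendation):

# Slots offered by each TaskManager; the default standalone setup starts a
# single TaskManager, so this bounds the total slots available to the job.
taskmanager.numberOfTaskSlots: 1

# Default parallelism applied when the job does not set one explicitly.
parallelism.default: 1

# How long the scheduler waits for a free slot before failing with
# NoResourceAvailableException (milliseconds; 300000 = 5 minutes).
slot.request.timeout: 300000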


Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2020-10-08 Thread Yun Gao
Hi, devs & users

Very sorry for the spoiled formatting; I am resending the discussion as follows.

As discussed in FLIP-131[1], Flink will make DataStream the unified API for 
processing bounded and unbounded data in both streaming and blocking modes. 
However, one long-standing problem for the streaming mode is that currently 
Flink does not support checkpoints after some tasks have finished, which causes 
some problems for bounded or mixed jobs:
1. Flink exactly-once sinks rely on checkpoints to ensure data won't be 
replayed before it is committed to external systems in streaming mode. If sources 
are bounded and checkpoints are disabled after some tasks have finished, the data 
sent after the last checkpoint can never be committed. This issue has already 
been reported several times on the user ML[2][3][4] and was brought up again 
while working on FLIP-143: Unified Sink API [5]. 
2. Jobs with both bounded and unbounded sources might have to 
replay a large amount of records after failover because no periodic checkpoints 
are taken after the bounded sources finish.

Therefore, we propose to also support checkpoints after some tasks have finished. 
You can find more details in FLIP-147[6]. 

Best,
Yun

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741
[2] 
https://lists.apache.org/thread.html/rea1ac2d82f646fcea1395b5738be495f144c5b0312a290a1d4a339c1%40%3Cuser.flink.apache.org%3E
[3] 
https://lists.apache.org/thread.html/rad4adeec838093b8b56ae9e2ea6a937a4b1882b53045a12acb7e61ea%40%3Cuser.flink.apache.org%3E
[4] 
https://lists.apache.org/thread.html/4cf28a9fa3732dfdd9e673da6233c5288ca80b20d58cee130bf1c141%40%3Cuser.flink.apache.org%3E
[5] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API
[6] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
 --Original Mail --
Sender:Yun Gao 
Send Date:Fri Oct 9 14:16:52 2020
Recipients:Flink Dev , User-Flink 
Subject:[DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
Hi, devs & users

As discussed in FLIP-131 [1], Flink will make DataStream the unified API for 
processing bounded and unbounded data in both streaming and blocking modes. 
However, one long-standing problem for the streaming mode is that currently 
Flink does not support checkpoints after some tasks have finished, which causes some 
problems for bounded or mixed jobs:
1. Flink exactly-once sinks rely on checkpoints to ensure data won't be replayed 
before it is committed to external systems in streaming mode. If sources are bounded 
and checkpoints are disabled after some tasks have finished, the data sent after 
the last checkpoint can never be committed. This issue has 
already been reported several times on the user ML[2][3][4] and was brought up 
again while working on FLIP-143: Unified Sink API [5]. 
2. Jobs with both bounded and unbounded sources might have to replay a large 
amount of records after failover because no periodic checkpoints are taken after 
the bounded sources finish.
Therefore, we propose to also support checkpoints after some tasks have finished. 
You can find more details in FLIP-147[6]. 
Best,
Yun

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741
[2] 
https://lists.apache.org/thread.html/rea1ac2d82f646fcea1395b5738be495f144c5b0312a290a1d4a339c1%40%3Cuser.flink.apache.org%3E
[3] 
https://lists.apache.org/thread.html/rad4adeec838093b8b56ae9e2ea6a937a4b1882b53045a12acb7e61ea%40%3Cuser.flink.apache.org%3E
[4] 
https://lists.apache.org/thread.html/4cf28a9fa3732dfdd9e673da6233c5288ca80b20d58cee130bf1c141%40%3Cuser.flink.apache.org%3E
[5] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API
[6] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished

[DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2020-10-08 Thread Yun Gao
Hi, devs & users

As discussed in FLIP-131 [1], Flink will make DataStream the unified API for 
processing bounded and unbounded data in both streaming and blocking modes. 
However, one long-standing problem for the streaming mode is that currently 
Flink does not support checkpoints after some tasks have finished, which causes some 
problems for bounded or mixed jobs:
1. Flink exactly-once sinks rely on checkpoints to ensure data won't be replayed 
before it is committed to external systems in streaming mode. If sources are bounded 
and checkpoints are disabled after some tasks have finished, the data sent after 
the last checkpoint can never be committed. This issue has 
already been reported several times on the user ML[2][3][4] and was brought up 
again while working on FLIP-143: Unified Sink API [5]. 
2. Jobs with both bounded and unbounded sources might have to replay a large 
amount of records after failover because no periodic checkpoints are taken after 
the bounded sources finish.
Therefore, we propose to also support checkpoints after some tasks have finished. 
You can find more details in FLIP-147[6]. 
Best,
Yun

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741
[2] 
https://lists.apache.org/thread.html/rea1ac2d82f646fcea1395b5738be495f144c5b0312a290a1d4a339c1%40%3Cuser.flink.apache.org%3E
[3] 
https://lists.apache.org/thread.html/rad4adeec838093b8b56ae9e2ea6a937a4b1882b53045a12acb7e61ea%40%3Cuser.flink.apache.org%3E
[4] 
https://lists.apache.org/thread.html/4cf28a9fa3732dfdd9e673da6233c5288ca80b20d58cee130bf1c141%40%3Cuser.flink.apache.org%3E
[5] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API
[6] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished

Re: Network issue leading to "No pooled slot available"

2020-10-08 Thread Khachatryan Roman
Thanks for checking this workaround!

I've created a jira issue [1] to check if AWS SDK version can be upgraded
in Flink distribution.

Regards,
Roman


On Fri, Oct 9, 2020 at 12:54 AM Dan Diephouse  wrote:

> Well, I just dropped in the latest Amazon 1.11.878 SDK and now it
> appears to respect interrupts in a test case I created. (the test fails
> with the SDK that is in use by Flink)
>
> I will try it in a full fledged Flink environment and report back.
>
> On Thu, Oct 8, 2020 at 3:41 PM Dan Diephouse  wrote:
>
>> Did some digging... it definitely appears that the Amazon SDK is
>> not picking up the interrupt.  I will try playing with the connection
>> timeout. Hadoop defaults it to 20 ms, which may be part of the problem.
>> Anyone have any other ideas?
>>
>> In theory this should be fixed by SDK v2 which uses NIO, but I don't
>> think I'm up for all the changes that would involve in the downstream
>> components.
>>
>> On Thu, Oct 8, 2020 at 8:36 AM Dan Diephouse  wrote:
>>
>>> Using the latest - 1.11.2.
>>>
>>> I would assume the interruption is being ignored in the Hadoop / S3
>>> layer. I was looking at the defaults and (if I understood correctly) the
>>> client will retry 20 times. Which would explain why it never gets
>>> cancelled...
>>>
>>> On Thu, Oct 8, 2020 at 1:27 AM Khachatryan Roman <
>>> khachatryan.ro...@gmail.com> wrote:
>>>
 Hi Dan Diephouse,

 From the logs you provided indeed it looks like 1 causes 2 => 3 => 4,
 where 2 is a bug.
 It's unclear though where the interruption is ignored (Flink/Hadoop
 FS/S3 client).

 What version of Flink are you using?

 Regards,
 Roman


 On Wed, Oct 7, 2020 at 11:16 PM Dan Diephouse  wrote:

> I am now using the S3 StreamingFileSink to send data to an S3 bucket.
> If/when the network connection has issues, it seems to put Flink into an
> irrecoverable state. Am I understanding this correctly? Any suggestions on
> how to troubleshoot / fix?
>
> Here is what I'm observing:
>
> *1. Network is dropped *
>
> *2. S3 connections do not exit gracefully*
>
> 2020-10-07 20:58:07.468  WARN 1 --- [aa565930b86fb).]
> o.apache.flink.runtime.taskmanager.Task  : Task 'Sink: Unnamed (1/12)' did
> not react to cancelling signal for 30 seconds, but is stuck in method:
>  java.base@14.0.2/sun.nio.ch.Net.poll(Native Method)
> java.base@14.0.2/sun.nio.ch.NioSocketImpl.park(NioSocketImpl.java:181)
> java.base@14.0.2
> /sun.nio.ch.NioSocketImpl.timedRead(NioSocketImpl.java:285)
> java.base@14.0.2
> /sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:309)
> java.base@14.0.2/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:350)
> java.base@14.0.2
> /sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:803)
> java.base@14.0.2
> /java.net.Socket$SocketInputStream.read(Socket.java:982)
> java.base@14.0.2
> /sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:469)
> java.base@14.0.2
> /sun.security.ssl.SSLSocketInputRecord.readHeader(SSLSocketInputRecord.java:463)
> java.base@14.0.2
> /sun.security.ssl.SSLSocketInputRecord.decode(SSLSocketInputRecord.java:160)
> java.base@14.0.2
> /sun.security.ssl.SSLTransport.decode(SSLTransport.java:110)
> java.base@14.0.2
> /sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1475)
> java.base@14.0.2
> /sun.security.ssl.SSLSocketImpl.readHandshakeRecord(SSLSocketImpl.java:1381)
> java.base@14.0.2
> /sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:441)
> java.base@14.0.2
> /sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:412)
>
> app//org.apache.http.conn.ssl.SSLConnectionSocketFactory.createLayeredSocket(SSLConnectionSocketFactory.java:436)
>
> app//org.apache.http.conn.ssl.SSLConnectionSocketFactory.connectSocket(SSLConnectionSocketFactory.java:384)
>
> app//com.amazonaws.http.conn.ssl.SdkTLSSocketFactory.connectSocket(SdkTLSSocketFactory.java:142)
>
> app//org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
>
> app//org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:374)
> jdk.internal.reflect.GeneratedMethodAccessor92.invoke(Unknown Source)
> java.base@14.0.2
> /jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> java.base@14.0.2/java.lang.reflect.Method.invoke(Method.java:564)
>
> app//com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76)
> app//com.amazonaws.http.conn.$Proxy89.connect(Unknown Source)
>
> app//org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393)
>
> app//org.apache.http.impl.execchain.MainClientExec.e

Any testing issues when using StreamTableEnvironment.createTemporaryView?

2020-10-08 Thread Dan Hill
*Summary*
I'm hitting an error when running a test that is related to using
createTemporaryView to convert a Protobuf input stream to Flink Table API.
I'm not sure how to debug the "SourceConversion$5.processElement(Unknown
Source)" line.  Is this generated code?  How can I debug it?

Any help would be appreciated.  Thanks! - Dan

*Details*
My current input is a protocol buffer stream.  I convert it to the Table
API spec using createTemporaryView.  The code is hacky.  I want to get some
tests implemented before cleaning it up.

KeyedStream batchLogStream =
    env.fromElements(BatchLog.class, new LogGenerator.BatchLogIterator().next())
        .keyBy((logRequest) -> logRequest.getUserId());

tableEnv.createTemporaryView(
    "input_user",
    batchLogStream.flatMap(new ToUsers()),
    $("userId"),
    $("timeEpochMillis"),
    $("userTime").rowtime());

This appears to work in my prototype (maybe serialization is broken).  In a
Flink test, I hit the following error.

org.apache.flink.runtime.taskmanager.Task: Flat Map -> Map ->
SourceConversion(table=[default.mydb.input_user], fields=[userId,
timeEpochMillis, userTime]) -> Calc(select=[userId, timeEpochMillis]) ->
StreamingFileWriter (2/7) (ae67114dd4175c6fd87063f73706c8ec) switched from
RUNNING to FAILED. java.lang.NullPointerException
at SourceConversion$5.processElement(Unknown Source)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
at
ai.promoted.metrics.logprocessor.common.functions.ToUsers.flatMap(ToUsers.java:18)
at
ai.promoted.metrics.logprocessor.common.functions.ToUsers.flatMap(ToUsers.java:11)
at
org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.base/java.lang.Thread.run(Thread.java:834)


I wasn't able to find this exact stacktrace when looking on Google.


Re: Native State in Python Stateful Functions?

2020-10-08 Thread Tzu-Li (Gordon) Tai
Hi,

Nice to hear that you are trying out StateFun!

It is by design that function state is attached to each HTTP invocation
request, as defined by StateFun's remote invocation request-reply protocol.
This decision was made with typical application cloud-native architectures
in mind - having function deployments be stateless and require no session
dependencies between the StateFun runtime and the functions services allows
the functions to scale out very easily.

There are some discussions on potentially adding a bi-directional protocol
in the future so that state can be lazily fetched on demand instead of
every invocation, but that is still in very early stages of discussion.

Could you briefly describe what the state access pattern in your
application looks like?
Maybe this can provide some insight for us in figuring out how a more
advanced / efficient protocol should be designed in future releases.

On Thu, Oct 8, 2020, 6:20 PM Clements, Danial C 
wrote:

> Hi,
>
>
>
> In doing some testing with Flink stateful functions in Python and I’ve
> gotten a small POC working.  One of our key requirements for our stream
> processors is that they be written in python due to the skillset of our
> team.  Given that the Python DataStreams api seems to be under development
> in Flink 1.12, we’ve implemented our business logic as a stateful function
> using the remote pattern.  In some testing, it seems the state object is
> getting serialized and sent along with each HTTP request
>
One clarification here:
StateFun does not serialize or deserialize state, everything is maintained
and provided to functions as byte arrays.
Serialization / deserialization happens in user code (i.e. the functions).

Cheers,
Gordon

> and given that we’re storing quite a bit of data in this state, this seems
> to contribute to the latency of the application in a linear fashion.  Is
> there any way around this?  Is there a way to store the state local to the
> python application?
>
>
>
> Thanks,
>
> Dan
>
>
>


Best way to test Table API and SQL

2020-10-08 Thread Rex Fenley
Hello,

I'd like to write a unit test for my Flink Job. It consists mostly of the
Table API and SQL using a StreamExecutionEnvironment with the blink
planner, from source to sink.
What's the best approach for testing Table API/SQL?

I read
https://flink.apache.org/news/2020/02/07/a-guide-for-unit-testing-in-apache-flink.html
but that seems to cover specialized functions on DataStreams rather than
entire Table API constructs. What I think I'd like is to have some stubbed
input sources and mocked-out sinks which I can use to test against my
Tables.

Does this seem reasonable?

I did find TestStreamEnvironment, which seems like it would be useful at
least for running the tests locally:
https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/streaming/util/TestStreamEnvironment.html

Any help appreciated. Thanks!
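
For what it's worth, a minimal sketch of that approach, assuming flink-test-utils
and the blink planner are on the test classpath (the class name, view name, and
query below are made up for illustration; the stubbed input comes from fromValues
and the "mocked sink" is simply TableResult.collect()):

import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.junit.ClassRule;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;

import static org.apache.flink.table.api.Expressions.row;
import static org.junit.Assert.assertEquals;

public class TableJobTest {

    // Shares one local cluster across the test class, as in the Flink testing guide.
    @ClassRule
    public static final MiniClusterWithClientResource FLINK =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberTaskManagers(1)
                            .setNumberSlotsPerTaskManager(2)
                            .build());

    @Test
    public void uppercasesWords() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env,
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Stubbed input instead of the production source.
        Table input = tEnv.fromValues(
                DataTypes.ROW(DataTypes.FIELD("word", DataTypes.STRING())),
                row("flink"), row("table"), row("flink"));
        tEnv.createTemporaryView("words", input);

        // Collect the result instead of writing to the production sink.
        List<Row> results = new ArrayList<>();
        try (CloseableIterator<Row> it =
                     tEnv.executeSql("SELECT UPPER(word) FROM words").collect()) {
            it.forEachRemaining(results::add);
        }
        assertEquals(3, results.size());
    }
}

For stateful queries (GROUP BY, joins) the collected rows form a changelog, so
assertions usually need to account for retractions rather than exact row counts.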

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend  |  Remind.com



Re: The file STDOUT does not exist on the TaskExecutor

2020-10-08 Thread sidhant gupta
Thanks Yang for providing another alternative solution.

On Fri, Oct 9, 2020, 7:49 AM Yang Wang  wrote:

> I second Till's suggestion. Currently, in container
> environments (Docker/K8s), we cannot output
> STDOUT/STDERR to the console and to log
> files (taskmanager.out/err) simultaneously. In consideration
> of the user experience, we are using "conf/log4j-console.properties" to
> only output the STDOUT/STDERR
> to console. Then users could use "docker logs " or "kubectl
> logs " to view
> the logs easily.
>
> Except for disabling the logging of TaskManagerStdoutFileHandler
> in log4j-console.properties, you
> could also customize the image entrypoint to redirect the STDOUT/STDERR to
> separate files(taskmanager.out/err).
>
>
> Best,
> Yang
>
> On Thu, Oct 8, 2020 at 3:30 PM, Till Rohrmann wrote:
>
>> The easiest way to suppress this error would be to disable the logging
>> for TaskManagerStdoutFileHandler in your log4j.properties file.
>>
>> Cheers,
>> Till
>>
>> On Wed, Oct 7, 2020 at 8:48 PM sidhant gupta  wrote:
>>
>>> Hi Till,
>>>
>>> I understand the errors which appears in my logs are not stopping me
>>> from running the job. I am running flink session cluster in ECS and also
>>> configured graylog to get the container logs. So getting the docker logs is
>>> also not an issue.
>>> But is there a way to suppress this error or any work around ?
>>>
>>> Thanks
>>> Sidhant Gupta
>>>
>>> On Wed, Oct 7, 2020, 9:15 PM Till Rohrmann  wrote:
>>>
 Hi Sidhant,

 when using Flink's Docker image, then the cluster won't create the out
 files. Instead the components will directly write to STDOUT which is
 captured by Kubernetes and can be viewed using `kubectl logs POD_NAME`. The
 error which appears in your logs is not a problem. It is simply the REST
 handler which tries to serve the out files.

 Cheers,
 Till

 On Wed, Oct 7, 2020 at 5:11 PM 大森林  wrote:

> what's your running mode?
> if your flink cluster is on yarn mode,then the output you need has no
> relation to $FLINK_HOME/logs/*.out
>
>
> -- Original Message --
> *From:* "sidhant gupta" ;
> *Sent:* Wednesday, October 7, 2020, 11:33 PM
> *To:* "大森林";"user";
> *Subject:* Re: The file STDOUT does not exist on the TaskExecutor
>
> Hi,
>
> I'm running flink cluster in ecs. There is a pipeline which creates
> the job manager and then the task manager using the docker image.
>
> Not sure if we would want to restart the cluster in production.
>
> Is there any way we can make sure the .out files will be created
> without restart ?
>
> I am able to see the logs in the logs tab but not the stdout logs in
> the web ui and getting the below mentioned error after running the job.
>
> Thanks
> Sidhant Gupta
>
>
> On Wed, Oct 7, 2020, 8:00 PM 大森林  wrote:
>
>> it's easy,
>> just restart your flink cluster(standalone mode)
>>
>> if you run flink in yarn mode,then the result will display on
>> $HADOOP/logs/*.out files
>>
>> -- Original Message --
>> *From:* "sidhant gupta" ;
>> *Sent:* Wednesday, October 7, 2020, 9:52 PM
>> *To:* "大森林";
>> *Cc:* "user";
>> *Subject:* Re: The file STDOUT does not exist on the TaskExecutor
>>
>> ++ user
>>
>> On Wed, Oct 7, 2020, 6:47 PM sidhant gupta 
>> wrote:
>>
>>> Hi
>>>
>>> I checked in the $FLINK_HOME/logs. The .out file was not there. Can
>>> you suggest what should be the action item ?
>>>
>>> Thanks
>>> Sidhant Gupta
>>>
>>>
>>> On Wed, Oct 7, 2020, 7:17 AM 大森林  wrote:
>>>

 check if the .out file is in $FLINK_HOME/logs  please.

 -- Original Message --
 *From:* "sidhant gupta" ;
 *Sent:* Wednesday, October 7, 2020, 1:52 AM
 *To:* "大森林";
 *Subject:* Re: The file STDOUT does not exist on the TaskExecutor

 Hi,

 I am just running the docker container as it is by adding just the
 conf/flink.yaml .
 I am not sure if the .out file got deleted. Do we need to expose
 some ports ?

 Thanks
 Sidhant Gupta



 On Tue, Oct 6, 2020, 8:51 PM 大森林  wrote:

>
> Hi,I guess you may deleted .out file in $FLINK_HOME/logs.
> you can just use your default log settings.
> -- Original Message --
> *From:* "sidhant gupta" ;
> *Sent:* Tuesday, October 6, 2020, 10:59 PM
> *To:* "user";
> *Subject:* The file STDOUT does not exist on the TaskExecutor
>
> Hi,
>
> I am running dockerized flink:1.11.0-scala_2.11 container in ecs.
> I am getting the following error after the job runs:
>
> ERROR org.apache.flink.runtime.rest.handler.taskmanager.
> TaskManagerStdoutFileHandler [] - U

Re: autoCommit for postgres jdbc streaming in Table/SQL API

2020-10-08 Thread Jark Wu
Hi Dylan,

Sorry for the late reply. We've just come back from a long holiday.

Thanks for reporting this problem. First, I think this is a bug: `autoCommit`
is false by default (JdbcRowDataInputFormat.Builder).
We can fix the default to true in the 1.11 series, and I think this can solve
your problem in the short term?
Besides, we should expose a connector option to set auto-commit; that can be
another issue to be implemented in master.
I'm glad to review the code.

What do you think?

Regarding the failed JMXReporterFactoryTest, I think this is a known
issue, see FLINK-19539 [1]

Best,
Jark

[1]: https://issues.apache.org/jira/browse/FLINK-19539

On Fri, 9 Oct 2020 at 01:29, Dylan Forciea  wrote:

> I’ve updated the unit tests and documentation, and I was running the azure
> test pipeline as described in the instructions. However, it appears that
> what seems to be an unrelated test for the JMX code failed. Is this a
> matter of me not setting things up correctly? I wanted to ensure everything
> looked good before I submitted the PR.
>
>
>
> [ERROR] Failures:
>
> [ERROR]   JMXReporterFactoryTest.testPortRangeArgument:46
>
> Expected: (a value equal to or greater than <9000> and a value less than
> or equal to <9010>)
>
>  but: a value less than or equal to <9010> <9040> was greater than
> <9010>
>
> [ERROR]   JMXReporterFactoryTest.testWithoutArgument:60
>
> [INFO]
>
> [ERROR] Tests run: 10, Failures: 2, Errors: 0, Skipped: 0
>
>
>
> Thanks,
>
> Dylan Forciea
>
>
>
> *From: *Till Rohrmann 
> *Date: *Thursday, October 8, 2020 at 2:29 AM
> *To: *Dylan Forciea 
> *Cc: *dev , Shengkai Fang , "
> user@flink.apache.org" , "j...@apache.org" <
> j...@apache.org>, Leonard Xu 
> *Subject: *Re: autoCommit for postgres jdbc streaming in Table/SQL API
>
>
>
> This sounds good. Maybe there are others in the community who can help
> with the review before the Jark and Leonard are back.
>
>
>
> Cheers,
>
> Till
>
>
>
> On Wed, Oct 7, 2020 at 7:33 PM Dylan Forciea  wrote:
>
> Actually…. It looks like what I did covers both cases. I’ll see about
> getting some unit tests and documentation updated.
>
>
>
> Dylan
>
>
>
> *From: *Dylan Forciea 
> *Date: *Wednesday, October 7, 2020 at 11:47 AM
> *To: *Till Rohrmann , dev 
> *Cc: *Shengkai Fang , "user@flink.apache.org" <
> user@flink.apache.org>, "j...@apache.org" , Leonard Xu <
> xbjt...@gmail.com>
> *Subject: *Re: autoCommit for postgres jdbc streaming in Table/SQL API
>
>
>
> Ok, I have created FLINK-19522 describing the issue. I have the code I
> made so far checked in at
> https://github.com/apache/flink/compare/master...dforciea:FLINK-19522 but
> this only fixes the SQL API. It sounds like there may be another change
> needed for the Table API… I’ll look into that and see if I can figure it
> out on my own while they’re out. I will also need to add some unit tests
> and update some documentation to get this ready for a PR.
>
>
>
> Thanks,
>
> Dylan
>
>
>
> *From: *Till Rohrmann 
> *Date: *Wednesday, October 7, 2020 at 10:55 AM
> *To: *dev 
> *Cc: *Shengkai Fang , "user@flink.apache.org" <
> user@flink.apache.org>, "j...@apache.org" , Leonard Xu <
> xbjt...@gmail.com>
> *Subject: *Re: autoCommit for postgres jdbc streaming in Table/SQL API
>
>
>
> Hi Dylan,
>
>
>
> thanks for reaching out to the Flink community and excuse our late
> response. I am not an expert for the Table API and its JDBC connector but
> what you describe sounds like a missing feature. Also given that
> FLINK-12198 enabled this feature for the JDBCInputFormat indicates that we
> might simply need to make it configurable from the JdbcTableSource. I am
> pulling in Jark and Leonard who worked on the JdbcTableSource and might
> help you to get this feature into Flink. Their response could take a week
> because they are currently on vacation if I am not mistaken.
>
>
>
> What you could already do is to open an issue linking FLINK-12198 and
> describing the problem and your solution proposal.
>
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-12198
>
>
>
> Cheers,
>
> Till
>
>
>
> On Wed, Oct 7, 2020 at 5:00 PM Dylan Forciea  wrote:
>
> I appreciate it! Let me know if you want me to submit a PR against the
> issue after it is created. It wasn’t a huge amount of code, so it’s
> probably not a big deal if you wanted to redo it.
>
> Thanks,
> Dylan
>
> From: Shengkai Fang 
> Date: Wednesday, October 7, 2020 at 9:06 AM
> To: Dylan Forciea 
> Subject: Re: autoCommit for postgres jdbc streaming in Table/SQL API
>
> Sorry for late response. +1 to support it. I will open a jira about it
> later.
>
> On Wed, Oct 7, 2020 at 9:53 PM, Dylan Forciea <dy...@oseberg.io> wrote:
>
>
>
>
>
>
>
>
>
>
>
>
>
> I hadn’t heard a response on this, so I’m going to expand this to the dev
> email list.
>
>
>
> If this is indeed an issue and not my misunderstanding, I have most of a
> patch already coded up. Please let me know, and I can create a JIRA issue
> and send out a PR.
>
>
>
> Regards,
>
> Dylan Forciea
>
> Oseberg

Re: The file STDOUT does not exist on the TaskExecutor

2020-10-08 Thread Yang Wang
I second Till's suggestion. Currently, in container environments (Docker/K8s),
we cannot output
STDOUT/STDERR to the console and to log files (taskmanager.out/err) simultaneously.
In consideration
of the user experience, we are using "conf/log4j-console.properties" to
only output the STDOUT/STDERR
to console. Then users could use "docker logs " or "kubectl
logs " to view
the logs easily.

Except for disabling the logging of TaskManagerStdoutFileHandler
in log4j-console.properties, you
could also customize the image entrypoint to redirect the STDOUT/STDERR to
separate files(taskmanager.out/err).
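
For reference, the log4j change can be as small as this in
conf/log4j-console.properties (a hedged sketch; the logger id
"suppress_stdout_handler" is arbitrary, and the class name is taken from the
error reported in this thread):

# Silence the REST handler that tries to serve the non-existent .out file.
logger.suppress_stdout_handler.name = org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerStdoutFileHandler
logger.suppress_stdout_handler.level = OFF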


Best,
Yang

On Thu, Oct 8, 2020 at 3:30 PM, Till Rohrmann wrote:

> The easiest way to suppress this error would be to disable the logging for
> TaskManagerStdoutFileHandler in your log4j.properties file.
>
> Cheers,
> Till
>
> On Wed, Oct 7, 2020 at 8:48 PM sidhant gupta  wrote:
>
>> Hi Till,
>>
>> I understand the errors which appears in my logs are not stopping me from
>> running the job. I am running flink session cluster in ECS and also
>> configured graylog to get the container logs. So getting the docker logs is
>> also not an issue.
>> But is there a way to suppress this error or any work around ?
>>
>> Thanks
>> Sidhant Gupta
>>
>> On Wed, Oct 7, 2020, 9:15 PM Till Rohrmann  wrote:
>>
>>> Hi Sidhant,
>>>
>>> when using Flink's Docker image, then the cluster won't create the out
>>> files. Instead the components will directly write to STDOUT which is
>>> captured by Kubernetes and can be viewed using `kubectl logs POD_NAME`. The
>>> error which appears in your logs is not a problem. It is simply the REST
>>> handler which tries to serve the out files.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, Oct 7, 2020 at 5:11 PM 大森林  wrote:
>>>
 what's your running mode?
 if your flink cluster is on yarn mode,then the output you need has no
 relation to $FLINK_HOME/logs/*.out


 -- Original Message --
 *From:* "sidhant gupta" ;
 *Sent:* Wednesday, October 7, 2020, 11:33 PM
 *To:* "大森林";"user";
 *Subject:* Re: The file STDOUT does not exist on the TaskExecutor

 Hi,

 I'm running flink cluster in ecs. There is a pipeline which creates the
 job manager and then the task manager using the docker image.

 Not sure if we would want to restart the cluster in production.

 Is there any way we can make sure the .out files will be created
 without restart ?

 I am able to see the logs in the logs tab but not the stdout logs in
 the web ui and getting the below mentioned error after running the job.

 Thanks
 Sidhant Gupta


 On Wed, Oct 7, 2020, 8:00 PM 大森林  wrote:

> it's easy,
> just restart your flink cluster(standalone mode)
>
> if you run flink in yarn mode,then the result will display on
> $HADOOP/logs/*.out files
>
> -- Original Message --
> *From:* "sidhant gupta" ;
> *Sent:* Wednesday, October 7, 2020, 9:52 PM
> *To:* "大森林";
> *Cc:* "user";
> *Subject:* Re: The file STDOUT does not exist on the TaskExecutor
>
> ++ user
>
> On Wed, Oct 7, 2020, 6:47 PM sidhant gupta 
> wrote:
>
>> Hi
>>
>> I checked in the $FLINK_HOME/logs. The .out file was not there. Can
>> you suggest what should be the action item ?
>>
>> Thanks
>> Sidhant Gupta
>>
>>
>> On Wed, Oct 7, 2020, 7:17 AM 大森林  wrote:
>>
>>>
>>> check if the .out file is in $FLINK_HOME/logs  please.
>>>
>>> -- Original Message --
>>> *From:* "sidhant gupta" ;
>>> *Sent:* Wednesday, October 7, 2020, 1:52 AM
>>> *To:* "大森林";
>>> *Subject:* Re: The file STDOUT does not exist on the TaskExecutor
>>>
>>> Hi,
>>>
>>> I am just running the docker container as it is by adding just the
>>> conf/flink.yaml .
>>> I am not sure if the .out file got deleted. Do we need to expose
>>> some ports ?
>>>
>>> Thanks
>>> Sidhant Gupta
>>>
>>>
>>>
>>> On Tue, Oct 6, 2020, 8:51 PM 大森林  wrote:
>>>

 Hi,I guess you may deleted .out file in $FLINK_HOME/logs.
 you can just use your default log settings.
 -- Original Message --
 *From:* "sidhant gupta" ;
 *Sent:* Tuesday, October 6, 2020, 10:59 PM
 *To:* "user";
 *Subject:* The file STDOUT does not exist on the TaskExecutor

 Hi,

 I am running dockerized flink:1.11.0-scala_2.11 container in ecs.
 I am getting the following error after the job runs:

 ERROR org.apache.flink.runtime.rest.handler.taskmanager.
 TaskManagerStdoutFileHandler [] - Unhandled exception.
 org.apache.flink.util.FlinkException: The file STDOUT does not
 exist on the TaskExecutor.
 at org.apache.flink.runtime.taskexecutor.TaskExecutor
 .lambda$requestFileUploadByFilePath$25(TaskExecutor.java:1742)
>>>

Re: Network issue leading to "No pooled slot available"

2020-10-08 Thread Dan Diephouse
Well, I just dropped in the latest Amazon 1.11.878 SDK and now it
appears to respect interrupts in a test case I created. (the test fails
with the SDK that is in use by Flink)

I will try it in a full fledged Flink environment and report back.

On Thu, Oct 8, 2020 at 3:41 PM Dan Diephouse  wrote:

> Did some digging... it definitely appears that the Amazon SDK is
> not picking up the interrupt.  I will try playing with the connection
> timeout. Hadoop defaults it to 20 ms, which may be part of the problem.
> Anyone have any other ideas?
>
> In theory this should be fixed by SDK v2 which uses NIO, but I don't think
> I'm up for all the changes that would involve in the downstream components.
>
> On Thu, Oct 8, 2020 at 8:36 AM Dan Diephouse  wrote:
>
>> Using the latest - 1.11.2.
>>
>> I would assume the interruption is being ignored in the Hadoop / S3
>> layer. I was looking at the defaults and (if I understood correctly) the
>> client will retry 20 times. Which would explain why it never gets
>> cancelled...
>>
>> On Thu, Oct 8, 2020 at 1:27 AM Khachatryan Roman <
>> khachatryan.ro...@gmail.com> wrote:
>>
>>> Hi Dan Diephouse,
>>>
>>> From the logs you provided indeed it looks like 1 causes 2 => 3 => 4,
>>> where 2 is a bug.
>>> It's unclear though where the interruption is ignored (Flink/Hadoop
>>> FS/S3 client).
>>>
>>> What version of Flink are you using?
>>>
>>> Regards,
>>> Roman
>>>
>>>
>>> On Wed, Oct 7, 2020 at 11:16 PM Dan Diephouse  wrote:
>>>
 I am now using the S3 StreamingFileSink to send data to an S3 bucket.
 If/when the network connection has issues, it seems to put Flink into an
 irrecoverable state. Am I understanding this correctly? Any suggestions on
 how to troubleshoot / fix?

 Here is what I'm observing:

 *1. Network is dropped *

 *2. S3 connections do not exit gracefully*

 2020-10-07 20:58:07.468  WARN 1 --- [aa565930b86fb).]
 o.apache.flink.runtime.taskmanager.Task  : Task 'Sink: Unnamed (1/12)' did
 not react to cancelling signal for 30 seconds, but is stuck in method:
  java.base@14.0.2/sun.nio.ch.Net.poll(Native Method)
 java.base@14.0.2/sun.nio.ch.NioSocketImpl.park(NioSocketImpl.java:181)
 java.base@14.0.2
 /sun.nio.ch.NioSocketImpl.timedRead(NioSocketImpl.java:285)
 java.base@14.0.2
 /sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:309)
 java.base@14.0.2/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:350)
 java.base@14.0.2
 /sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:803)
 java.base@14.0.2
 /java.net.Socket$SocketInputStream.read(Socket.java:982)
 java.base@14.0.2
 /sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:469)
 java.base@14.0.2
 /sun.security.ssl.SSLSocketInputRecord.readHeader(SSLSocketInputRecord.java:463)
 java.base@14.0.2
 /sun.security.ssl.SSLSocketInputRecord.decode(SSLSocketInputRecord.java:160)
 java.base@14.0.2
 /sun.security.ssl.SSLTransport.decode(SSLTransport.java:110)
 java.base@14.0.2
 /sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1475)
 java.base@14.0.2
 /sun.security.ssl.SSLSocketImpl.readHandshakeRecord(SSLSocketImpl.java:1381)
 java.base@14.0.2
 /sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:441)
 java.base@14.0.2
 /sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:412)

 app//org.apache.http.conn.ssl.SSLConnectionSocketFactory.createLayeredSocket(SSLConnectionSocketFactory.java:436)

 app//org.apache.http.conn.ssl.SSLConnectionSocketFactory.connectSocket(SSLConnectionSocketFactory.java:384)

 app//com.amazonaws.http.conn.ssl.SdkTLSSocketFactory.connectSocket(SdkTLSSocketFactory.java:142)

 app//org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)

 app//org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:374)
 jdk.internal.reflect.GeneratedMethodAccessor92.invoke(Unknown Source)
 java.base@14.0.2
 /jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 java.base@14.0.2/java.lang.reflect.Method.invoke(Method.java:564)

 app//com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76)
 app//com.amazonaws.http.conn.$Proxy89.connect(Unknown Source)

 app//org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393)

 app//org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236)

 app//org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)

 app//org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)

 app//org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83

Re: Network issue leading to "No pooled slot available"

2020-10-08 Thread Dan Diephouse
Did some digging... it definitely appears that the Amazon SDK is
not picking up the interrupt.  I will try playing with the connection
timeout. Hadoop defaults it to 20 ms, which may be part of the problem.
Anyone have any other ideas?

In theory this should be fixed by SDK v2 which uses NIO, but I don't think
I'm up for all the changes that would involve in the downstream components.
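
For reference, to the best of my understanding the flink-s3-fs-hadoop filesystem
forwards fs.s3a.* keys from flink-conf.yaml to the underlying Hadoop/AWS client,
so the timeout and retry knobs mentioned above can be tuned roughly like this
(values are illustrative only, not recommendations):

# Socket and connection-establishment timeouts for the s3a client (milliseconds).
fs.s3a.connection.timeout: 20000
fs.s3a.connection.establish.timeout: 5000

# Maximum number of retries the s3a client performs before giving up
# (this is the "retry 20 times" default referred to above).
fs.s3a.attempts.maximum: 5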

On Thu, Oct 8, 2020 at 8:36 AM Dan Diephouse  wrote:

> Using the latest - 1.11.2.
>
> I would assume the interruption is being ignored in the Hadoop / S3 layer.
> I was looking at the defaults and (if I understood correctly) the client
> will retry 20 times. Which would explain why it never gets cancelled...
>
> On Thu, Oct 8, 2020 at 1:27 AM Khachatryan Roman <
> khachatryan.ro...@gmail.com> wrote:
>
>> Hi Dan Diephouse,
>>
>> From the logs you provided indeed it looks like 1 causes 2 => 3 => 4,
>> where 2 is a bug.
>> It's unclear though where the interruption is ignored (Flink/Hadoop FS/S3
>> client).
>>
>> What version of Flink are you using?
>>
>> Regards,
>> Roman
>>
>>
>> On Wed, Oct 7, 2020 at 11:16 PM Dan Diephouse  wrote:
>>
>>> I am now using the S3 StreamingFileSink to send data to an S3 bucket.
>>> If/when the network connection has issues, it seems to put Flink into an
>>> irrecoverable state. Am I understanding this correctly? Any suggestions on
>>> how to troubleshoot / fix?
>>>
>>> Here is what I'm observing:
>>>
>>> *1. Network is dropped *
>>>
>>> *2. S3 connections do not exit gracefully*
>>>
>>> 2020-10-07 20:58:07.468  WARN 1 --- [aa565930b86fb).]
>>> o.apache.flink.runtime.taskmanager.Task  : Task 'Sink: Unnamed (1/12)' did
>>> not react to cancelling signal for 30 seconds, but is stuck in method:
>>>  java.base@14.0.2/sun.nio.ch.Net.poll(Native Method)
>>> java.base@14.0.2/sun.nio.ch.NioSocketImpl.park(NioSocketImpl.java:181)
>>> java.base@14.0.2
>>> /sun.nio.ch.NioSocketImpl.timedRead(NioSocketImpl.java:285)
>>> java.base@14.0.2
>>> /sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:309)
>>> java.base@14.0.2/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:350)
>>> java.base@14.0.2/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:803)
>>> java.base@14.0.2/java.net.Socket$SocketInputStream.read(Socket.java:982)
>>> java.base@14.0.2
>>> /sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:469)
>>> java.base@14.0.2
>>> /sun.security.ssl.SSLSocketInputRecord.readHeader(SSLSocketInputRecord.java:463)
>>> java.base@14.0.2
>>> /sun.security.ssl.SSLSocketInputRecord.decode(SSLSocketInputRecord.java:160)
>>> java.base@14.0.2
>>> /sun.security.ssl.SSLTransport.decode(SSLTransport.java:110)
>>> java.base@14.0.2
>>> /sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1475)
>>> java.base@14.0.2
>>> /sun.security.ssl.SSLSocketImpl.readHandshakeRecord(SSLSocketImpl.java:1381)
>>> java.base@14.0.2
>>> /sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:441)
>>> java.base@14.0.2
>>> /sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:412)
>>>
>>> app//org.apache.http.conn.ssl.SSLConnectionSocketFactory.createLayeredSocket(SSLConnectionSocketFactory.java:436)
>>>
>>> app//org.apache.http.conn.ssl.SSLConnectionSocketFactory.connectSocket(SSLConnectionSocketFactory.java:384)
>>>
>>> app//com.amazonaws.http.conn.ssl.SdkTLSSocketFactory.connectSocket(SdkTLSSocketFactory.java:142)
>>>
>>> app//org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
>>>
>>> app//org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:374)
>>> jdk.internal.reflect.GeneratedMethodAccessor92.invoke(Unknown Source)
>>> java.base@14.0.2
>>> /jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> java.base@14.0.2/java.lang.reflect.Method.invoke(Method.java:564)
>>>
>>> app//com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76)
>>> app//com.amazonaws.http.conn.$Proxy89.connect(Unknown Source)
>>>
>>> app//org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393)
>>>
>>> app//org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236)
>>>
>>> app//org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
>>>
>>> app//org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
>>>
>>> app//org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
>>>
>>> app//org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
>>>
>>> app//com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
>>>
>>> app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1323)
>>>
>>> app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1139)
>>>
>>>

Re: Issue with Flink - unrelated executeSql causes other executeSqls to fail.

2020-10-08 Thread Dan Hill
I figured out the issue.  The join caused part of the job's execution to be
delayed.  I added my own hacky wait condition into the test to make sure
the join job finishes first and it's fine.

What common test utilities exist for Flink?  I found
flink/flink-test-utils-parent.  I implemented a simple sleep loop to wait
for jobs to finish.  I'm guessing this can be done with one of the other
utilities.
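
For reference, a hedged sketch of such a wait loop against the ClusterClient
exposed by MiniClusterWithClientResource.getClusterClient() (the helper name and
timeout handling below are made up, not an existing Flink utility):

import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.runtime.client.JobStatusMessage;

import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.TimeoutException;

public final class JobWaitUtil {

    // Polls the cluster until every submitted job reaches a globally terminal
    // state (FINISHED, FAILED, or CANCELED), or the deadline expires.
    public static void waitForJobsToFinish(ClusterClient<?> client, Duration timeout) throws Exception {
        Deadline deadline = Deadline.fromNow(timeout);
        while (deadline.hasTimeLeft()) {
            Collection<JobStatusMessage> jobs = client.listJobs().get();
            boolean allDone = jobs.stream()
                    .allMatch(j -> j.getJobState().isGloballyTerminalState());
            if (!jobs.isEmpty() && allDone) {
                return;
            }
            Thread.sleep(100);
        }
        throw new TimeoutException("Jobs did not finish within " + timeout);
    }

    private JobWaitUtil() {}
}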

Are there any open source test examples?

How are watermarks usually sent with Table API in tests?

After I collect some answers, I'm fine updating the Flink testing page.
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#testing-flink-jobs

On Thu, Oct 8, 2020 at 8:52 AM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Can't comment on the SQL issues, but here's our exact setup for Bazel and
> Junit5 w/ the resource files approach:
> https://github.com/fintechstudios/vp-operator-demo-ff-virtual-2020/tree/master/tools/junit
>
> Best,
> Austin
>
> On Thu, Oct 8, 2020 at 2:41 AM Dan Hill  wrote:
>
>> I was able to get finer grained logs showing.  I switched from
>> -Dlog4j.configuration to -Dlog4j.configurationFile and it worked.  With my
>> larger test case, I was hitting a silent log4j error.  When I created a
>> small test case to just test logging, I received a log4j error.
>>
>> Here is a tar
>> 
>> with the info logs for:
>> - (test-nojoin.log) this one works as expected
>> - (test-join.log) this does not work as expected
>>
>> I don't see an obvious issue just by scanning the logs.  I'll take a
>> deeper look in 9 hours.
>>
>>
>>
>>
>> On Wed, Oct 7, 2020 at 8:28 PM Dan Hill  wrote:
>>
>>> Switching to junit4 did not help.
>>>
>>> If I make a request to the url returned from
>>> MiniClusterWithClientResource.flinkCluster.getClusterClient().getWebInterfaceURL(),
>>> I get
>>> {"errors":["Not found."]}.  I'm not sure if this is intentional.
>>>
>>>
>>>
>>>
>>> On Tue, Oct 6, 2020 at 4:16 PM Dan Hill  wrote:
>>>
 @Aljoscha - Thanks!  That setup lets me fix the hacky absolute path
 reference.  However, the actual log calls are not printing to the console.
 Only errors appear in my terminal window and the test logs.  Maybe console
 logger does not work for this junit setup.  I'll see if the file version
 works.

 On Tue, Oct 6, 2020 at 4:08 PM Austin Cawley-Edwards <
 austin.caw...@gmail.com> wrote:

> What Aljoscha suggested is what works for us!
>
> On Tue, Oct 6, 2020 at 6:58 PM Aljoscha Krettek 
> wrote:
>
>> Hi Dan,
>>
>> to make the log properties file work this should do it: assuming the
>> log4j.properties is in //src/main/resources. You will need a
>> BUILD.bazel
>> in that directory that has only the line
>> "exports_files(["log4j.properties"]). Then you can reference it in
>> your
>> test via "resources = ["//src/main/resources:log4j.properties"],". Of
>> course you also need to have the right log4j deps (or slf4j if you're
>> using that)
>>
>> Hope that helps!
>>
>> Aljoscha
>>
>> On 07.10.20 00:41, Dan Hill wrote:
>> > I'm trying to use Table API for my job.  I'll soon try to get a test
>> > working for my stream job.
>> > - I'll parameterize so I can have different sources and sink for
>> tests.
>> > How should I mock out a Kafka source?  For my test, I was planning
>> on
>> > changing the input to be from a temp file (instead of Kafka).
>> > - What's a good way of forcing a watermark using the Table API?
>> >
>> >
>> > On Tue, Oct 6, 2020 at 3:35 PM Dan Hill 
>> wrote:
>> >
>> >> Thanks!
>> >>
>> >> Great to know.  I copied this junit5-jupiter-starter-bazel
>> >> <
>> https://github.com/junit-team/junit5-samples/tree/main/junit5-jupiter-starter-bazel>
>> rule
>> >> into my repository (I don't think junit5 is supported directly with
>> >> java_test yet).  I tried a few ways of bundling `log4j.properties`
>> into the
>> >> jar and didn't get them to work.  My current iteration hacks the
>> >> log4j.properties file as an absolute path.  My failed attempts
>> would spit
>> >> an error saying log4j.properties file was not found.  This route
>> finds it
>> >> but the log properties are not used for the java logger.
>> >>
>> >> Are there a better set of rules to use for junit5?
>> >>
>> >> # build rule
>> >> java_junit5_test(
>> >>  name = "tests",
>> >>  srcs = glob(["*.java"]),
>> >>  test_package = "ai.promoted.logprocessor.batch",
>> >>  deps = [...],
>> >>  jvm_flags =
>> >>
>> ["-Dlog4j.configuration=file:///Users/danhill/code/src/ai/promoted/logprocessor/batch/log4j.properties"],
>> >> )
>> >>
>> >> # log4j.properties
>> >> status = error
>> >> name = Log4j2Pro

Re: autoCommit for postgres jdbc streaming in Table/SQL API

2020-10-08 Thread Dylan Forciea
I’ve updated the unit tests and documentation, and I was running the azure test 
pipeline as described in the instructions. However, it appears that what seems 
to be an unrelated test for the JMX code failed. Is this a matter of me not 
setting things up correctly? I wanted to ensure everything looked good before I 
submitted the PR.

[ERROR] Failures:
[ERROR]   JMXReporterFactoryTest.testPortRangeArgument:46
Expected: (a value equal to or greater than <9000> and a value less than or 
equal to <9010>)
 but: a value less than or equal to <9010> <9040> was greater than <9010>
[ERROR]   JMXReporterFactoryTest.testWithoutArgument:60
[INFO]
[ERROR] Tests run: 10, Failures: 2, Errors: 0, Skipped: 0

Thanks,
Dylan Forciea

From: Till Rohrmann 
Date: Thursday, October 8, 2020 at 2:29 AM
To: Dylan Forciea 
Cc: dev , Shengkai Fang , 
"user@flink.apache.org" , "j...@apache.org" 
, Leonard Xu 
Subject: Re: autoCommit for postgres jdbc streaming in Table/SQL API

This sounds good. Maybe there are others in the community who can help with the 
review before the Jark and Leonard are back.

Cheers,
Till

On Wed, Oct 7, 2020 at 7:33 PM Dylan Forciea <dy...@oseberg.io> wrote:
Actually…. It looks like what I did covers both cases. I’ll see about getting 
some unit tests and documentation updated.

Dylan

From: Dylan Forciea <dy...@oseberg.io>
Date: Wednesday, October 7, 2020 at 11:47 AM
To: Till Rohrmann <trohrm...@apache.org>, dev <d...@flink.apache.org>
Cc: Shengkai Fang <fskm...@gmail.com>, "user@flink.apache.org" <user@flink.apache.org>, 
"j...@apache.org" <j...@apache.org>, Leonard Xu <xbjt...@gmail.com>
Subject: Re: autoCommit for postgres jdbc streaming in Table/SQL API

Ok, I have created FLINK-19522 describing the issue. I have the code I made so 
far checked in at 
https://github.com/apache/flink/compare/master...dforciea:FLINK-19522 but this 
only fixes the SQL API. It sounds like there may be another change needed for 
the Table API… I’ll look into that and see if I can figure it out on my own 
while they’re out. I will also need to add some unit tests and update some 
documentation to get this ready for a PR.

Thanks,
Dylan

From: Till Rohrmann <trohrm...@apache.org>
Date: Wednesday, October 7, 2020 at 10:55 AM
To: dev <d...@flink.apache.org>
Cc: Shengkai Fang <fskm...@gmail.com>, "user@flink.apache.org" <user@flink.apache.org>, 
"j...@apache.org" <j...@apache.org>, Leonard Xu <xbjt...@gmail.com>
Subject: Re: autoCommit for postgres jdbc streaming in Table/SQL API

Hi Dylan,

thanks for reaching out to the Flink community and excuse our late response. I 
am not an expert for the Table API and its JDBC connector but what you describe 
sounds like a missing feature. Also given that FLINK-12198 enabled this feature 
for the JDBCInputFormat indicates that we might simply need to make it 
configurable from the JdbcTableSource. I am pulling in Jark and Leonard who 
worked on the JdbcTableSource and might help you to get this feature into 
Flink. Their response could take a week because they are currently on vacation 
if I am not mistaken.

What you could already do is to open an issue linking FLINK-12198 and 
describing the problem and your solution proposal.

[1] https://issues.apache.org/jira/browse/FLINK-12198

Cheers,
Till

On Wed, Oct 7, 2020 at 5:00 PM Dylan Forciea <dy...@oseberg.io> wrote:
I appreciate it! Let me know if you want me to submit a PR against the issue 
after it is created. It wasn’t a huge amount of code, so it’s probably not a 
big deal if you wanted to redo it.

Thanks,
Dylan

From: Shengkai Fang <fskm...@gmail.com>
Date: Wednesday, October 7, 2020 at 9:06 AM
To: Dylan Forciea <dy...@oseberg.io>
Subject: Re: autoCommit for postgres jdbc streaming in Table/SQL API

Sorry for late response. +1 to support it. I will open a jira about it later.

On Wed, Oct 7, 2020 at 9:53 PM, Dylan Forciea <dy...@oseberg.io> wrote:













I hadn’t heard a response on this, so I’m going to expand this to the dev email 
list.



If this is indeed an issue and not my misunderstanding, I have most of a patch 
already coded up. Please let me know, and I can create a JIRA issue and send 
out a PR.



Regards,

Dylan Forciea

Oseberg




From: Dylan Forciea <dy...@oseberg.io>


Date: Thursday, October 1, 2020 at 5:14 PM


To: "user@flink.apache.org" <user@flink.apache.org>


Subject: autoCommit for postgres jdbc streaming in Table/SQL API






Hi! I’ve just recently started evaluating Flink for our ETL needs, and I ran 

Re: How can I increase the parallelism on the Table API for Streaming Aggregation?

2020-10-08 Thread Khachatryan Roman
Hi Felipe,

Your source is not parallel, so it doesn't make sense to make the local group
aggregation operator parallel.
If the source implemented ParallelSourceFunction, subsequent operators
would be parallelized too.
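
For context, a source becomes parallel simply by implementing
ParallelSourceFunction (or extending RichParallelSourceFunction); a minimal
sketch with a made-up class name and a trivial element type:

import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Each parallel subtask runs its own copy of run(), so operators chained after
// the source (including the local aggregation) can also run with parallelism > 1.
public class ParallelCountingSource implements ParallelSourceFunction<Long> {

    private volatile boolean running = true;

    @Override
    public void run(SourceFunction.SourceContext<Long> ctx) throws Exception {
        long value = 0L;
        while (running) {
            // Emit under the checkpoint lock so checkpoints stay consistent.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(value++);
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

With such a source, env.addSource(new ParallelCountingSource()) picks up the
configured default parallelism instead of being fixed to 1.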

Regards,
Roman


On Thu, Oct 8, 2020 at 5:00 PM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> Hi community,
>
> I was implementing the stream aggregation using Table API [1] and
> trying out the local aggregation plan to optimize the query. Basically
> I had to configure it like this:
>
> Configuration configuration = tableEnv.getConfig().getConfiguration();
> // set low-level key-value options
> configuration.setInteger("table.exec.resource.default-parallelism", 4);
> // local-global aggregation depends on mini-batch is enabled
> configuration.setString("table.exec.mini-batch.enabled", "true");
> configuration.setString("table.exec.mini-batch.allow-latency", "1 s");
> configuration.setString("table.exec.mini-batch.size", "1000");
> // enable two-phase, i.e. local-global aggregation
> configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
>
> and when I saw the query plan on the dashboard I realized that the
> LocalGroupAggregate has parallelism 1 while the
> GlobalGroupAggregate has parallelism 4. Why isn't the
> LocalGroupAggregate also at parallelism 4, since I set it via the
> property "table.exec.resource.default-parallelism"? Here is my code
> [2].
>
> Thanks,
> Felipe
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html
> [2]
> https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/main/java/org/sense/flink/examples/table/TaxiRideCountTable.java
>
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com
>


Re: Issue with Flink - unrelated executeSql causes other executeSqls to fail.

2020-10-08 Thread Austin Cawley-Edwards
Can't comment on the SQL issues, but here's our exact setup for Bazel and
Junit5 w/ the resource files approach:
https://github.com/fintechstudios/vp-operator-demo-ff-virtual-2020/tree/master/tools/junit

Best,
Austin

On Thu, Oct 8, 2020 at 2:41 AM Dan Hill  wrote:

> I was able to get finer grained logs showing.  I switched from
> -Dlog4j.configuration to -Dlog4j.configurationFile and it worked.  With my
> larger test case, I was hitting a silent log4j error.  When I created a
> small test case to just test logging, I received a log4j error.
>
> Here is a tar
> 
> with the info logs for:
> - (test-nojoin.log) this one works as expected
> - (test-join.log) this does not work as expected
>
> I don't see an obvious issue just by scanning the logs.  I'll take a
> deeper look in 9 hours.
>
>
>
>
> On Wed, Oct 7, 2020 at 8:28 PM Dan Hill  wrote:
>
>> Switching to junit4 did not help.
>>
>> If I make a request to the url returned from
>> MiniClusterWithClientResource.flinkCluster.getClusterClient().getWebInterfaceURL(),
>> I get
>> {"errors":["Not found."]}.  I'm not sure if this is intentional.
>>
>>
>>
>>
>> On Tue, Oct 6, 2020 at 4:16 PM Dan Hill  wrote:
>>
>>> @Aljoscha - Thanks!  That setup lets me fix the hacky absolute path
>>> reference.  However, the actual log calls are not printing to the console.
>>> Only errors appear in my terminal window and the test logs.  Maybe console
>>> logger does not work for this junit setup.  I'll see if the file version
>>> works.
>>>
>>> On Tue, Oct 6, 2020 at 4:08 PM Austin Cawley-Edwards <
>>> austin.caw...@gmail.com> wrote:
>>>
 What Aljoscha suggested is what works for us!

 On Tue, Oct 6, 2020 at 6:58 PM Aljoscha Krettek 
 wrote:

> Hi Dan,
>
> to make the log properties file work this should do it: assuming the
> log4j.properties is in //src/main/resources. You will need a
> BUILD.bazel
> in that directory that has only the line
> "exports_files(["log4j.properties"]). Then you can reference it in
> your
> test via "resources = ["//src/main/resources:log4j.properties"],". Of
> course you also need to have the right log4j deps (or slf4j if you're
> using that)
>
> Hope that helps!
>
> Aljoscha
>
> On 07.10.20 00:41, Dan Hill wrote:
> > I'm trying to use Table API for my job.  I'll soon try to get a test
> > working for my stream job.
> > - I'll parameterize so I can have different sources and sink for
> tests.
> > How should I mock out a Kafka source?  For my test, I was planning on
> > changing the input to be from a temp file (instead of Kafka).
> > - What's a good way of forcing a watermark using the Table API?
> >
> >
> > On Tue, Oct 6, 2020 at 3:35 PM Dan Hill 
> wrote:
> >
> >> Thanks!
> >>
> >> Great to know.  I copied this junit5-jupiter-starter-bazel
> >> <
> https://github.com/junit-team/junit5-samples/tree/main/junit5-jupiter-starter-bazel>
> rule
> >> into my repository (I don't think junit5 is supported directly with
> >> java_test yet).  I tried a few ways of bundling `log4j.properties`
> into the
> >> jar and didn't get them to work.  My current iteration hacks the
> >> log4j.properties file as an absolute path.  My failed attempts
> would spit
> >> an error saying log4j.properties file was not found.  This route
> finds it
> >> but the log properties are not used for the java logger.
> >>
> >> Are there a better set of rules to use for junit5?
> >>
> >> # build rule
> >> java_junit5_test(
> >>  name = "tests",
> >>  srcs = glob(["*.java"]),
> >>  test_package = "ai.promoted.logprocessor.batch",
> >>  deps = [...],
> >>  jvm_flags =
> >>
> ["-Dlog4j.configuration=file:///Users/danhill/code/src/ai/promoted/logprocessor/batch/log4j.properties"],
> >> )
> >>
> >> # log4j.properties
> >> status = error
> >> name = Log4j2PropertiesConfig
> >> appenders = console
> >> appender.console.type = Console
> >> appender.console.name = LogToConsole
> >> appender.console.layout.type = PatternLayout
> >> appender.console.layout.pattern = %d [%t] %-5p %c - %m%n
> >> rootLogger.level = info
> >> rootLogger.appenderRefs = stdout
> >> rootLogger.appenderRef.stdout.ref = LogToConsole
> >>
> >> On Tue, Oct 6, 2020 at 3:34 PM Austin Cawley-Edwards <
> >> austin.caw...@gmail.com> wrote:
> >>
> >>> Oops, this is actually the JOIN issue thread [1]. Guess I should
> revise
> >>> my previous "haven't had issues" statement hah. Sorry for the spam!
> >>>
> >>> [1]:
> >>>
> apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Streaming-SQL-Job-Switches-to-FINISHED-before-all-records-processed-td38382.html
> >>>
> >>> On Tue, Oct 6, 20

Re: Streaming SQL Job Switches to FINISHED before all records processed

2020-10-08 Thread Austin Cawley-Edwards
Hey Timo,

Sorry for the delayed reply. I'm using the Blink planner and using
non-time-based joins. I've got an example repo here that shows my query/
setup [1]. It's got the manual timestamp assignment commented out for now,
but that does indeed solve the issue.
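
For readers hitting the same problem, a rough sketch of that manual assignment
after conversion, using the ingestion-time extractor and assuming a
StreamTableEnvironment named tableEnv (the query and names are placeholders,
not the repo's actual code):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;

// Convert the query result back to a DataStream and re-assign timestamps and
// watermarks, since the time attribute is lost after non-time-based joins.
Table result = tableEnv.sqlQuery("SELECT ...");  // placeholder query
DataStream<Tuple2<Boolean, Row>> stream =
    tableEnv.toRetractStream(result, Row.class)
        .assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());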

I'd really like to not worry about time at all in this job hah -- I started
just using processing time, but Till pointed out that processing time
timers won't be fired when input ends, which is the case for this streaming
job processing CSV files, so I should be using event time. With that
suggestion, I switched to ingestion time, where I then discovered the issue
converting from SQL to data stream.

IMO, as a user manually assigning timestamps on conversion makes sense if
you're using event time and already handling time attributes yourself, but
for ingestion time you really don't want to think about time at all, which
is why it might make sense to propagate the automatically assigned
timestamps in that case. Though not sure how difficult that would be. Let
me know what you think!


Best + thanks again,
Austin

[1]: https://github.com/austince/flink-1.10-sql-windowing-error

On Mon, Oct 5, 2020 at 4:24 AM Timo Walther  wrote:

> Btw which planner are you using?
>
> Regards,
> Timo
>
> On 05.10.20 10:23, Timo Walther wrote:
> > Hi Austin,
> >
> > could you share some details of your SQL query with us? The reason why
> > I'm asking is because I guess that the rowtime field is not inserted
> > into the `StreamRecord` of DataStream API. The rowtime field is only
> > inserted if there is a single field in the output of the query that is a
> > valid "time attribute".
> >
> > Esp. after non-time-based joins and aggregations, time attributes lose
> > their properties and become regular timestamps, because timestamps and
> > watermarks might have diverged.
> >
> > If you know what you're doing, you can also assign the timestamp
> > manually via `toRetractStream(...).assignTimestampsAndWatermarks(...)` and
> > reinsert the field into the stream record. But before you do that, I
> > think it is better to share more information about the query with us.
> >
> > I hope this helps.
> >
> > Regards,
> > Timo
> >
> >
> >
> > On 05.10.20 09:25, Till Rohrmann wrote:
> >> Hi Austin,
> >>
> >> thanks for offering to help. First I would suggest asking Timo whether
> >> this is an aspect which is still missing or whether we overlooked it.
> >> Based on that we can then take the next steps.
> >>
> >> Cheers,
> >> Till
> >>
> >> On Fri, Oct 2, 2020 at 7:05 PM Austin Cawley-Edwards
> >> mailto:austin.caw...@gmail.com>> wrote:
> >>
> >> Hey Till,
> >>
> >> Thanks for the notes. Yeah, the docs don't mention anything specific
> >> to this case, not sure if it's an uncommon one. Assigning timestamps
> >> on conversion does solve the issue. I'm happy to take a stab at
> >> implementing the feature if it is indeed missing and you all think
> >> it'd be worthwhile. I think it's definitely a confusing aspect of
> >> working w/ the Table & DataStream APIs together.
> >>
> >> Best,
> >> Austin
> >>
> >> On Fri, Oct 2, 2020 at 6:05 AM Till Rohrmann  >> > wrote:
> >>
> >> Hi Austin,
> >>
> >> yes, it should also work for ingestion time.
> >>
> >> I am not entirely sure whether event time is preserved when
> >> converting a Table into a retract stream. It should be possible
> >> and if it is not working, then I guess it is a missing feature.
> >> But I am sure that @Timo Walther
> >>  knows more about it. In doubt, you
> >> could assign a new watermark generator when having obtained the
> >> retract stream.
> >>
> >> Here is also a link to some information about event time and
> >> watermarks [1]. Unfortunately, it does not state anything about
> >> the direction Table => DataStream.
> >>
> >> [1]
> >>
> >>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/time_attributes.html
> >>
> >>
> >> Cheers,
> >> Till
> >>
> >> On Fri, Oct 2, 2020 at 12:10 AM Austin Cawley-Edwards
> >> mailto:austin.caw...@gmail.com>>
> wrote:
> >>
> >> Hey Till,
> >>
> >> Just a quick question on time characteristics -- this should
> >> work for IngestionTime as well, correct? Is there anything
> >> special I need to do to have the CsvTableSource/
> >> toRetractStream call to carry through the assigned
> >> timestamps, or do I have to re-assign timestamps during the
> >> conversion? I'm currently getting the `Record has
> >> Long.MIN_VALUE timestamp (= no timestamp marker)` error,
> >> though I'm seeing timestamps being assigned if I step
> >> through the AutomaticWatermarkContext.
> >>
> >> Thanks,
> >> Austin
> >>
>

Re: Network issue leading to "No pooled slot available"

2020-10-08 Thread Dan Diephouse
Using the latest - 1.11.2.

I would assume the interruption is being ignored in the Hadoop / S3 layer.
I was looking at the defaults and (if I understood correctly) the client
will retry 20 times, which would explain why it never gets cancelled...
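
(For reference, that retry count matches the default of the Hadoop S3A setting
fs.s3a.attempts.maximum, which is 20. Assuming the flink-s3-fs-hadoop plugin is
in use, Flink forwards s3.* keys from flink-conf.yaml to the S3A filesystem, so
lowering it could look like the untested sketch below; whether this actually
helps the cancellation is an assumption, not something verified in this thread.)

# flink-conf.yaml (sketch, not verified against this failure mode)
s3.attempts.maximum: 3        # forwarded to fs.s3a.attempts.maximum (default 20)
s3.connection.timeout: 10000  # forwarded to fs.s3a.connection.timeout (milliseconds)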

On Thu, Oct 8, 2020 at 1:27 AM Khachatryan Roman <
khachatryan.ro...@gmail.com> wrote:

> Hi Dan Diephouse,
>
> From the logs you provided indeed it looks like 1 causes 2 => 3 => 4,
> where 2 is a bug.
> It's unclear though where the interruption is ignored (Flink/Hadoop FS/S3
> client).
>
> What version of Flink are you using?
>
> Regards,
> Roman
>
>
> On Wed, Oct 7, 2020 at 11:16 PM Dan Diephouse  wrote:
>
>> I am now using the S3 StreamingFileSink to send data to an S3 bucket.
>> If/when the network connection has issues, it seems to put Flink into an
>> irrecoverable state. Am I understanding this correctly? Any suggestions on
>> how to troubleshoot / fix?
>>
>> Here is what I'm observing:
>>
>> *1. Network is dropped *
>>
>> *2. S3 connections do not exit gracefully*
>>
>> 2020-10-07 20:58:07.468  WARN 1 --- [aa565930b86fb).]
>> o.apache.flink.runtime.taskmanager.Task  : Task 'Sink: Unnamed (1/12)' did
>> not react to cancelling signal for 30 seconds, but is stuck in method:
>>  java.base@14.0.2/sun.nio.ch.Net.poll(Native Method)
>> java.base@14.0.2/sun.nio.ch.NioSocketImpl.park(NioSocketImpl.java:181)
>> java.base@14.0.2
>> /sun.nio.ch.NioSocketImpl.timedRead(NioSocketImpl.java:285)
>> java.base@14.0.2
>> /sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:309)
>> java.base@14.0.2/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:350)
>> java.base@14.0.2/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:803)
>> java.base@14.0.2/java.net.Socket$SocketInputStream.read(Socket.java:982)
>> java.base@14.0.2
>> /sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:469)
>> java.base@14.0.2
>> /sun.security.ssl.SSLSocketInputRecord.readHeader(SSLSocketInputRecord.java:463)
>> java.base@14.0.2
>> /sun.security.ssl.SSLSocketInputRecord.decode(SSLSocketInputRecord.java:160)
>> java.base@14.0.2
>> /sun.security.ssl.SSLTransport.decode(SSLTransport.java:110)
>> java.base@14.0.2
>> /sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1475)
>> java.base@14.0.2
>> /sun.security.ssl.SSLSocketImpl.readHandshakeRecord(SSLSocketImpl.java:1381)
>> java.base@14.0.2
>> /sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:441)
>> java.base@14.0.2
>> /sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:412)
>>
>> app//org.apache.http.conn.ssl.SSLConnectionSocketFactory.createLayeredSocket(SSLConnectionSocketFactory.java:436)
>>
>> app//org.apache.http.conn.ssl.SSLConnectionSocketFactory.connectSocket(SSLConnectionSocketFactory.java:384)
>>
>> app//com.amazonaws.http.conn.ssl.SdkTLSSocketFactory.connectSocket(SdkTLSSocketFactory.java:142)
>>
>> app//org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
>>
>> app//org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:374)
>> jdk.internal.reflect.GeneratedMethodAccessor92.invoke(Unknown Source)
>> java.base@14.0.2
>> /jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> java.base@14.0.2/java.lang.reflect.Method.invoke(Method.java:564)
>>
>> app//com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76)
>> app//com.amazonaws.http.conn.$Proxy89.connect(Unknown Source)
>>
>> app//org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393)
>>
>> app//org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236)
>>
>> app//org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
>>
>> app//org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
>>
>> app//org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
>>
>> app//org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
>>
>> app//com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
>>
>> app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1323)
>>
>> app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1139)
>>
>> app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:796)
>>
>> app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:764)
>>
>> app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:738)
>>
>> app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:698)
>>
>> app//com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:680)
>>
>> app//com.amazonaws.http.AmazonHttpClient.exe

How can I increase the parallelism on the Table API for Streaming Aggregation?

2020-10-08 Thread Felipe Gutierrez
Hi community,

I was implementing the stream aggregation using Table API [1] and
trying out the local aggregation plan to optimize the query. Basically
I had to configure it like this:

Configuration configuration = tableEnv.getConfig().getConfiguration();
// set low-level key-value options
configuration.setInteger("table.exec.resource.default-parallelism", 4);
// local-global aggregation depends on mini-batch is enabled
configuration.setString("table.exec.mini-batch.enabled", "true");
configuration.setString("table.exec.mini-batch.allow-latency", "1 s");
configuration.setString("table.exec.mini-batch.size", "1000");
// enable two-phase, i.e. local-global aggregation
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");

and when I saw the query plan on the dashboard I realized that the
LocalGroupAggregate has parallelism 1 while the
GlobalGroupAggregate has parallelism 4. Why isn't the
LocalGroupAggregate also running with parallelism 4, since I set it via the
property "table.exec.resource.default-parallelism"? Here is my code
[2].

Thanks,
Felipe

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html
[2] 
https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/main/java/org/sense/flink/examples/table/TaxiRideCountTable.java

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com


Re: 订阅 (Subscription)

2020-10-08 Thread tison
Please send email with any content to -subscr...@flink.apache.org
for subscription.

For example, mailto:user-zh-subscr...@flink.apache.org to subscribe
user...@flink.apache.org

Best,
tison.


葛春法-18667112979  wrote on Thursday, October 8, 2020 at 8:45 PM:

> I want to subscribe flink mail.


Re: state access causing segmentation fault

2020-10-08 Thread Dawid Wysakowicz
Hi,

It should be absolutely fine to use multiple state objects. I am not
aware of any limits to that. A minimal, reproducible example would
definitely be helpful. For those kinds of exceptions, I'd look into the
serializers you use. Other than that I cannot think of an obvious reason
for that kind of exception.
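
As a point of reference, a minimal sketch of an operator registering several
state objects (the names and types here are illustrative, not taken from the
reported job):

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class MultiStateFunction extends KeyedProcessFunction<String, String, String> {

    private transient ValueState<Long> leftQueueHead;
    private transient ValueState<Long> ringBufferPos;
    private transient MapState<Long, String> leftElements;

    @Override
    public void open(Configuration parameters) {
        // Registering several state objects in one operator is supported.
        leftQueueHead = getRuntimeContext().getState(
                new ValueStateDescriptor<>("leftQueueHead", Types.LONG));
        ringBufferPos = getRuntimeContext().getState(
                new ValueStateDescriptor<>("ringBufferPos", Types.LONG));
        leftElements = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("leftElements", Types.LONG, Types.STRING));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        // ... use the state handles here
    }
}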

Best,

Dawid

On 08/10/2020 12:12, Colletta, Edward wrote:
>
> Using Flink 1.9.2, Java, FsStateBackend.  Running Session cluster on
> EC2 instances.
>
>  
>
> I have a KeyedProcessFunction that is causing a segmentation fault,
> crashing the flink task manager.  This seems to be caused by using 3
> State variables in the operator.  The crash happens consistently after
> some load is processed.
>
> This is the second time I have encountered this.   The first time I
> had 3 ValueState variables, this time I had 2 ValueState variables and
> a MapState variable.  Both times the error was alleviated by removing
> one of the state variables.
>
> This time I replaced the 2 valueState variables with a Tuple2 of the
> types of the individual variables.   I can try to put together a
> minimal example, but I was wondering if anyone has encountered this
> problem.
>
>  
>
> Are there any documented limits of the number of state variables 1
> operator can use?
>
>  
>
> For background the reason I use multiple state variables is the
> operator is processing 2 types of inputs, Left and Right.  When Left
> is received it is put into a PriorityQueue. When the Right type is
> received I put that into a ring buffer.
>
> I replaced the PriorityQueue with a queue of Ids and MapState to hold
> the elements.  So I have Left stored in a queue ValueState variable
> and MapState variable, and Right is stored in the ring buffer
> ValueState variable.
>
>  
>
>  
>




Native State in Python Stateful Functions?

2020-10-08 Thread Clements, Danial C
Hi,

I've been doing some testing with Flink Stateful Functions in Python and have gotten a
small POC working.  One of our key requirements for our stream processors is
that they be written in Python due to the skillset of our team.  Given that the
Python DataStream API seems to be under development in Flink 1.12, we've
implemented our business logic as a stateful function using the remote pattern.
In some testing, it seems the state object is getting serialized and sent
along with each HTTP request, and given that we're storing quite a bit of data
in this state, this seems to contribute to the latency of the application in a
linear fashion.  Is there any way around this?  Is there a way to store the
state local to the Python application?

Thanks,
Dan



state access causing segmentation fault

2020-10-08 Thread Colletta, Edward
Using Flink 1.9.2, Java, FsStateBackend.  Running Session cluster on EC2 
instances.

I have a KeyedProcessFunction that is causing a segmentation fault, crashing 
the flink task manager.  This seems to be caused by using 3 State variables in
the operator.  The crash happens consistently after some load is processed.
This is the second time I have encountered this.   The first time I had 3 
ValueState variables, this time I had 2 ValueState variables and a MapState 
variable.  Both times the error was alleviated by removing one of the state 
variables.
This time I replaced the 2 valueState variables with a Tuple2 of the types of 
the individual variables.   I can try to put together a minimal example, but I 
was wondering if anyone has encountered this problem.

Are there any documented limits of the number of state variables 1 operator can 
use?

For background the reason I use multiple state variables is the operator is 
processing 2 types of inputs, Left and Right.  When Left is received it is put
into a PriorityQueue. When the Right type is received I put that into a ring
buffer.
I replaced the PriorityQueue with a queue of Ids and MapState to hold the 
elements.  So I have Left stored in a queue ValueState variable and MapState 
variable, and Right is stored in the ring buffer ValueState variable.




Re: flink configuration: best practice for checkpoint storage secrets

2020-10-08 Thread XU Qinghui
Hello Till

Thanks a lot for the reply. But it turns out that IAM is applicable only
when the job is running inside AWS, which is not my case (basically we are
just using the S3 API provided by other services).
Reading the Flink docs again, it seems they suggest using the
flink-conf.yaml file, though.
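
For reference, the flink-conf.yaml route looks roughly like this (the endpoint
value is a placeholder for an S3-compatible service outside AWS, and the
path-style entry is only needed by some implementations):

# flink-conf.yaml
s3.endpoint: https://my-s3-compatible-service.example.com
s3.path.style.access: true
s3.access-key: <your-access-key>
s3.secret-key: <your-secret-key>

Keeping the actual secret values out of the file (for example by templating it
at deploy time or injecting it from a secret store) is then a deployment
concern rather than a Flink one.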

Best regards,
Qinghui

Le mer. 7 oct. 2020 à 18:21, Till Rohrmann  a écrit :

> Hi Qinghui,
>
> the recommended way would be to use AWS identity and access management
> (IAM) [1] if possible.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/s3.html#configure-access-credentials
>
> Cheers,
> Till
>
> On Wed, Oct 7, 2020 at 12:31 PM XU Qinghui  wrote:
>
>> Hello, folks
>>
>> We are trying to use S3 for the checkpoint storage, and this
>> involves some secrets in the configuration. We tried two approaches to
>> configure those secrets:
>> - in the jvm application argument for jobmanager and taskmanager, such as
>> -Ds3.secret-key
>> - in the flink-conf.yaml file for jobmanager and taskmanager
>>
>> Is there a third way? What's the best practice?
>> Thanks a lot!
>>
>> Best regards,
>> Qinghui
>>
>


Re: Network issue leading to "No pooled slot available"

2020-10-08 Thread Khachatryan Roman
Hi Dan Diephouse,

From the logs you provided indeed it looks like 1 causes 2 => 3 => 4, where
2 is a bug.
It's unclear though where the interruption is ignored (Flink/Hadoop FS/S3
client).

What version of Flink are you using?

Regards,
Roman


On Wed, Oct 7, 2020 at 11:16 PM Dan Diephouse  wrote:

> I am now using the S3 StreamingFileSink to send data to an S3 bucket.
> If/when the network connection has issues, it seems to put Flink into an
> irrecoverable state. Am I understanding this correctly? Any suggestions on
> how to troubleshoot / fix?
>
> Here is what I'm observing:
>
> *1. Network is dropped *
>
> *2. S3 connections do not exit gracefully*
>
> 2020-10-07 20:58:07.468  WARN 1 --- [aa565930b86fb).]
> o.apache.flink.runtime.taskmanager.Task  : Task 'Sink: Unnamed (1/12)' did
> not react to cancelling signal for 30 seconds, but is stuck in method:
>  java.base@14.0.2/sun.nio.ch.Net.poll(Native Method)
> java.base@14.0.2/sun.nio.ch.NioSocketImpl.park(NioSocketImpl.java:181)
> java.base@14.0.2
> /sun.nio.ch.NioSocketImpl.timedRead(NioSocketImpl.java:285)
> java.base@14.0.2/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:309)
> java.base@14.0.2/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:350)
> java.base@14.0.2/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:803)
> java.base@14.0.2/java.net.Socket$SocketInputStream.read(Socket.java:982)
> java.base@14.0.2
> /sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:469)
> java.base@14.0.2
> /sun.security.ssl.SSLSocketInputRecord.readHeader(SSLSocketInputRecord.java:463)
> java.base@14.0.2
> /sun.security.ssl.SSLSocketInputRecord.decode(SSLSocketInputRecord.java:160)
> java.base@14.0.2
> /sun.security.ssl.SSLTransport.decode(SSLTransport.java:110)
> java.base@14.0.2
> /sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1475)
> java.base@14.0.2
> /sun.security.ssl.SSLSocketImpl.readHandshakeRecord(SSLSocketImpl.java:1381)
> java.base@14.0.2
> /sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:441)
> java.base@14.0.2
> /sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:412)
>
> app//org.apache.http.conn.ssl.SSLConnectionSocketFactory.createLayeredSocket(SSLConnectionSocketFactory.java:436)
>
> app//org.apache.http.conn.ssl.SSLConnectionSocketFactory.connectSocket(SSLConnectionSocketFactory.java:384)
>
> app//com.amazonaws.http.conn.ssl.SdkTLSSocketFactory.connectSocket(SdkTLSSocketFactory.java:142)
>
> app//org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
>
> app//org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:374)
> jdk.internal.reflect.GeneratedMethodAccessor92.invoke(Unknown Source)
> java.base@14.0.2
> /jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> java.base@14.0.2/java.lang.reflect.Method.invoke(Method.java:564)
>
> app//com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76)
> app//com.amazonaws.http.conn.$Proxy89.connect(Unknown Source)
>
> app//org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393)
>
> app//org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236)
>
> app//org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
>
> app//org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
>
> app//org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
>
> app//org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
>
> app//com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
>
> app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1323)
>
> app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1139)
>
> app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:796)
>
> app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:764)
>
> app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:738)
>
> app//com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:698)
>
> app//com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:680)
> app//com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:544)
> app//com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:524)
>
> app//com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5054)
>
> app//com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5000)
>
> app//com.amazonaws.services.s3.AmazonS3Client.initiateMultipartUpload(AmazonS3Client.java:3574)
>
> app//org.apache.hadoop.fs.s3a.S3AFileSystem.initiateMultipartUpload(S3A

Re: The file STDOUT does not exist on the TaskExecutor

2020-10-08 Thread Till Rohrmann
The easiest way to suppress this error would be to disable the logging for
TaskManagerStdoutFileHandler in your log4j.properties file.
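
A sketch of what that could look like in the Log4j 2 properties format shipped
with Flink 1.11 (the logger id "tm_stdout" is arbitrary):

logger.tm_stdout.name = org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerStdoutFileHandler
logger.tm_stdout.level = OFF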

Cheers,
Till

On Wed, Oct 7, 2020 at 8:48 PM sidhant gupta  wrote:

> Hi Till,
>
> I understand the errors which appears in my logs are not stopping me from
> running the job. I am running flink session cluster in ECS and also
> configured graylog to get the container logs. So getting the docker logs is
> also not an issue.
> But is there a way to suppress this error or any work around ?
>
> Thanks
> Sidhant Gupta
>
> On Wed, Oct 7, 2020, 9:15 PM Till Rohrmann  wrote:
>
>> Hi Sidhant,
>>
>> when using Flink's Docker image, the cluster won't create the out
>> files. Instead the components will directly write to STDOUT which is
>> captured by Kubernetes and can be viewed using `kubectl logs POD_NAME`. The
>> error which appears in your logs is not a problem. It is simply the REST
>> handler which tries to serve the out files.
>>
>> Cheers,
>> Till
>>
>> On Wed, Oct 7, 2020 at 5:11 PM 大森林  wrote:
>>
>>> what's your running mode?
>>> if your Flink cluster is in YARN mode, then the output you need has no
>>> relation to $FLINK_HOME/logs/*.out
>>>
>>>
>>> -- Original Message --
>>> *From:* "sidhant gupta" ;
>>> *Sent:* Wednesday, October 7, 2020, 11:33 PM
>>> *To:* "大森林";"user";
>>> *Subject:* Re: The file STDOUT does not exist on the TaskExecutor
>>>
>>> Hi,
>>>
>>> I'm running flink cluster in ecs. There is a pipeline which creates the
>>> job manager and then the task manager using the docker image.
>>>
>>> Not sure if we would want to restart the cluster in production.
>>>
>>> Is there any way we can make sure the .out files will be created without
>>> restart ?
>>>
>>> I am able to see the logs in the logs tab but not the stdout logs in the
>>> web ui and getting the below mentioned error after running the job.
>>>
>>> Thanks
>>> Sidhant Gupta
>>>
>>>
>>> On Wed, Oct 7, 2020, 8:00 PM 大森林  wrote:
>>>
 it's easy,
 just restart your flink cluster (standalone mode)

 if you run flink in yarn mode, then the result will be written to the
 $HADOOP/logs/*.out files

 -- Original Message --
 *From:* "sidhant gupta" ;
 *Sent:* Wednesday, October 7, 2020, 9:52 PM
 *To:* "大森林";
 *Cc:* "user";
 *Subject:* Re: The file STDOUT does not exist on the TaskExecutor

 ++ user

 On Wed, Oct 7, 2020, 6:47 PM sidhant gupta  wrote:

> Hi
>
> I checked in the $FLINK_HOME/logs. The .out file was not there. Can
> you suggest what should be the action item ?
>
> Thanks
> Sidhant Gupta
>
>
> On Wed, Oct 7, 2020, 7:17 AM 大森林  wrote:
>
>>
>> check if the .out file is in $FLINK_HOME/logs  please.
>>
>> -- Original Message --
>> *From:* "sidhant gupta" ;
>> *Sent:* Wednesday, October 7, 2020, 1:52 AM
>> *To:* "大森林";
>> *Subject:* Re: The file STDOUT does not exist on the TaskExecutor
>>
>> Hi,
>>
>> I am just running the docker container as it is by adding just the
>> conf/flink.yaml .
>> I am not sure if the .out file got deleted. Do we need to expose some
>> ports ?
>>
>> Thanks
>> Sidhant Gupta
>>
>>
>>
>> On Tue, Oct 6, 2020, 8:51 PM 大森林  wrote:
>>
>>>
>>> Hi, I guess you may have deleted the .out file in $FLINK_HOME/logs.
>>> You can just use your default log settings.
>>> -- Original Message --
>>> *From:* "sidhant gupta" ;
>>> *Sent:* Tuesday, October 6, 2020, 10:59 PM
>>> *To:* "user";
>>> *Subject:* The file STDOUT does not exist on the TaskExecutor
>>>
>>> Hi,
>>>
>>> I am running dockerized flink:1.11.0-scala_2.11 container in ecs. I
>>> am getting the following error after the job runs:
>>>
>>> ERROR org.apache.flink.runtime.rest.handler.taskmanager.
>>> TaskManagerStdoutFileHandler [] - Unhandled exception.
>>> org.apache.flink.util.FlinkException: The file STDOUT does not
>>> exist on the TaskExecutor.
>>> at org.apache.flink.runtime.taskexecutor.TaskExecutor
>>> .lambda$requestFileUploadByFilePath$25(TaskExecutor.java:1742)
>>> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>>> at java.util.concurrent.CompletableFuture$AsyncSupply.run(
>>> CompletableFuture.java:1604) ~[?:1.8.0_262]
>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(
>>> ThreadPoolExecutor.java:1149) ~[?:1.8.0_262]
>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
>>> ThreadPoolExecutor.java:624) ~[?:1.8.0_262]
>>> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_262]
>>>
>>> I guess "file" needs to be added in log4j.properties in the docker
>>> container e.g. log4j.rootLogger=INFO, file
>>> Are there any other properties which needs to be configured in any
>>> of the other property files or any jar needs to be added in the 
>>> */opt/flink
>>> *path ?
>>> Thank

Re: autoCommit for postgres jdbc streaming in Table/SQL API

2020-10-08 Thread Till Rohrmann
This sounds good. Maybe there are others in the community who can help with
the review before Jark and Leonard are back.

Cheers,
Till

On Wed, Oct 7, 2020 at 7:33 PM Dylan Forciea  wrote:

> Actually…. It looks like what I did covers both cases. I’ll see about
> getting some unit tests and documentation updated.
>
>
>
> Dylan
>
>
>
> *From: *Dylan Forciea 
> *Date: *Wednesday, October 7, 2020 at 11:47 AM
> *To: *Till Rohrmann , dev 
> *Cc: *Shengkai Fang , "user@flink.apache.org" <
> user@flink.apache.org>, "j...@apache.org" , Leonard Xu <
> xbjt...@gmail.com>
> *Subject: *Re: autoCommit for postgres jdbc streaming in Table/SQL API
>
>
>
> Ok, I have created FLINK-19522 describing the issue. I have the code I
> made so far checked in at
> https://github.com/apache/flink/compare/master...dforciea:FLINK-19522 but
> this only fixes the SQL API. It sounds like there may be another change
> needed for the Table API… I’ll look into that and see if I can figure it
> out on my own while they’re out. I will also need to add some unit tests
> and update some documentation to get this ready for a PR.
>
>
>
> Thanks,
>
> Dylan
>
>
>
> *From: *Till Rohrmann 
> *Date: *Wednesday, October 7, 2020 at 10:55 AM
> *To: *dev 
> *Cc: *Shengkai Fang , "user@flink.apache.org" <
> user@flink.apache.org>, "j...@apache.org" , Leonard Xu <
> xbjt...@gmail.com>
> *Subject: *Re: autoCommit for postgres jdbc streaming in Table/SQL API
>
>
>
> Hi Dylan,
>
>
>
> thanks for reaching out to the Flink community and excuse our late
> response. I am not an expert for the Table API and its JDBC connector but
> what you describe sounds like a missing feature. Also given that
> FLINK-12198 enabled this feature for the JDBCInputFormat indicates that we
> might simply need to make it configurable from the JdbcTableSource. I am
> pulling in Jark and Leonard who worked on the JdbcTableSource and might
> help you to get this feature into Flink. Their response could take a week
> because they are currently on vacation if I am not mistaken.
>
>
>
> What you could already do is to open an issue linking FLINK-12198 and
> describing the problem and your solution proposal.
>
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-12198
>
>
>
> Cheers,
>
> Till
>
>
>
> On Wed, Oct 7, 2020 at 5:00 PM Dylan Forciea  wrote:
>
> I appreciate it! Let me know if you want me to submit a PR against the
> issue after it is created. It wasn’t a huge amount of code, so it’s
> probably not a big deal if you wanted to redo it.
>
> Thanks,
> Dylan
>
> From: Shengkai Fang 
> Date: Wednesday, October 7, 2020 at 9:06 AM
> To: Dylan Forciea 
> Subject: Re: autoCommit for postgres jdbc streaming in Table/SQL API
>
> Sorry for late response. +1 to support it. I will open a jira about it
> later.
>
> Dylan Forciea mailto:dy...@oseberg.io>> wrote on Wednesday, October 7, 2020
> at 9:53 PM:
>
> I hadn’t heard a response on this, so I’m going to expand this to the dev
> email list.
>
>
>
> If this is indeed an issue and not my misunderstanding, I have most of a
> patch already coded up. Please let me know, and I can create a JIRA issue
> and send out a PR.
>
>
>
> Regards,
>
> Dylan Forciea
>
> Oseberg
>
>
>
>
> From: Dylan Forciea mailto:dy...@oseberg.io>>
>
>
> Date: Thursday, October 1, 2020 at 5:14 PM
>
>
> To: "user@flink.apache.org" <
> user@flink.apache.org>
>
>
> Subject: autoCommit for postgres jdbc streaming in Table/SQL API
>
>
>
>
>
>
> Hi! I’ve just recently started evaluating Flink for our ETL needs, and I
> ran across an issue with streaming postgres data via the Table/SQL API.
>
>
>
> I see that the API has the scan.fetch-size option, but not
> scan.auto-commit per
>
>
>
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html
> . I had attempted to load a large table in, but it completely slurped it
> into memory before starting the streaming. I modified the flink source code
> to add a scan.auto-commit
>
> option, and I was then able to immediately start streaming and cut my
> memory usage way down.
>
>
>
> I see in this thread that there was a similar issue resolved for
> JDBCInputFormat in this thread:
>
>
>
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-JDBC-Disable-auto-commit-mode-td27256.html
> , but I don’t see a way to utilize that in the Table/SQL API.
>
>
>
> Am I missing something on how to pull this off?
>
>
>
> Regards,
>
> Dylan Forciea
>
> Oseberg
>
>
>
>
>
>