Additional options to S3 Filesystem: Interest?

2020-10-10 Thread Padarn Wilson
Hi Flink Users,

We need to expose some additional options for the s3 hadoop filesystem:
specifically, we want to set object tagging and lifecycle. This would be a
fairly easy change, and we initially thought to create a new Filesystem with
very minor changes to allow this.

But then I wondered: would others use this? Is it something worth raising as
a Flink issue and then contributing back upstream?

Any others who would like to be able to set object tags for the s3
filesystem?

Cheers,
Padarn
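
For anyone unfamiliar with the feature being discussed: tags are set per object
at write time, and S3 lifecycle rules can then match on those tags. A rough
illustration with the AWS SDK for Java (v1) of what the filesystem would need to
do internally, not the proposed Flink change (bucket, key, and tag values are
made up):

import java.io.File;
import java.util.Arrays;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.ObjectTagging;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.Tag;

public class TaggedPut {
    public static void main(String[] args) {
        AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
        // Tags are attached to the object at write time; a bucket lifecycle
        // rule (BucketLifecycleConfiguration) can then expire or transition
        // objects matching the tag.
        PutObjectRequest req =
                new PutObjectRequest("my-bucket", "checkpoints/part-0", new File("part-0"))
                        .withTagging(new ObjectTagging(
                                Arrays.asList(new Tag("retention", "30d"))));
        s3.putObject(req);
    }
}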


RE: state access causing segmentation fault

2020-10-10 Thread Colletta, Edward
Tried to attach tar file but it got blocked.   Resending with files attached 
individually.


OK, I have a minimal reproducible example.  Attaching a tar file of the job that
crashed.

The crash has nothing to do with the number of state variables.  It does,
however, seem to be caused by using a type for the state variable that is a
class nested in the KeyedProcessFunction.

I reduced it to a single state variable.  The type of the state variable was a
class (ExecQueue) defined inside the class implementing KeyedProcessFunction.
Moving the ExecQueue definition to its own file fixed the problem.



The attached example always crashes the TaskManager within 30 seconds to 5 minutes.



MyKeyedProcessFunction.java is attached, and also cut and pasted here:



package crash;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Note: the generic type parameters below were stripped by the mail archive.
// The ExecQueue state types are recovered from the surrounding code; the
// <String, Exec, Exec> parameters on the class are an assumption (the Exec
// class is in the separately attached files).
public class MyKeyedProcessFunction extends KeyedProcessFunction<String, Exec, Exec> {

    private static final Logger LOG = LoggerFactory.getLogger(MyKeyedProcessFunction.class);

    public TypeInformation<ExecQueue> leftTypeInfo;
    public transient ValueState<ExecQueue> leftState;

    public int initQueueSize;
    public long emitFrequencyMs;

    public MyKeyedProcessFunction() {
        initQueueSize = 10;
        emitFrequencyMs = 1;
    }

    @Override
    public void open(Configuration conf) {
        // ExecQueue is a non-static inner class (see below), so Flink falls
        // back to Kryo for this state type.
        leftTypeInfo = TypeInformation.of(new TypeHint<ExecQueue>(){});
        leftState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("left", leftTypeInfo, null));
    }

    @Override
    public void processElement(Exec leftIn, Context ctx, Collector<Exec> out) {
        try {
            ExecQueue eq = leftState.value();
            if (eq == null) {
                eq = new ExecQueue(10);
                ctx.timerService().registerProcessingTimeTimer(
                        ctx.timerService().currentProcessingTime() + emitFrequencyMs);
            }
            leftState.update(eq);
        } catch (Exception e) {
            LOG.error("Exception in processElement1. Key: " + ctx.getCurrentKey() + ". " + e + ". trace = ");
            for (java.lang.StackTraceElement s : e.getStackTrace())
                LOG.error(s.toString());
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Exec> out) {
        try {
            ExecQueue eq = leftState.value();
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + emitFrequencyMs);
        } catch (Exception e) {
            LOG.error("Exception in onTimer. Key: " + ctx.getCurrentKey() + ". " + e + ". trace = ");
            for (java.lang.StackTraceElement s : e.getStackTrace())
                LOG.error(s.toString());
        }
    }

    // The nested, non-static classes below are what triggers the crash:
    // instances hold an implicit reference to the enclosing operator.
    public class ExecQueue {
        public RingBufferExec queue;

        public ExecQueue() {}

        public ExecQueue(int initSize) {
            queue = new RingBufferExec(initSize);
        }

        public class RingBufferExec {
            public Integer size;
            public Integer count;

            public RingBufferExec() {}

            public RingBufferExec(int sizeIn) {
                size = sizeIn;
                count = 0;
            }
        }
    }
}
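
For reference, the fix described above just hoists ExecQueue out of the operator
into its own top-level file, so the state serializer no longer has to deal with
a class that carries a hidden reference to the enclosing MyKeyedProcessFunction.
A minimal sketch of the fixed version (whether RingBufferExec becomes a nested
static class or its own file is an assumption):

package crash;

// Top-level class: no implicit reference to the enclosing operator instance.
public class ExecQueue {
    public RingBufferExec queue;

    public ExecQueue() {}

    public ExecQueue(int initSize) {
        queue = new RingBufferExec(initSize);
    }

    // Static, so it also carries no reference to an enclosing instance.
    public static class RingBufferExec {
        public Integer size;
        public Integer count;

        public RingBufferExec() {}

        public RingBufferExec(int sizeIn) {
            size = sizeIn;
            count = 0;
        }
    }
}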


From: Dawid Wysakowicz <dwysakow...@apache.org>
Sent: Thursday, October 8, 2020 6:26 AM
To: Colletta, Edward <edward.colle...@fmr.com>; user@flink.apache.org
Subject: Re: state access causing segmentation fault


Hi,

It should be absolutely fine to use multiple state objects; I am not aware of 
any limits there. A minimal, reproducible example would definitely be 
helpful. For this kind of exception, I'd look into the serializers you use. 
Other than that, I cannot think of an obvious reason for it.

Best,

Dawid
On 08/10/2020 12:12, Colletta, Edward wrote:
Using Flink 1.9.2, Java, FsStateBackend.  Running Session cluster on EC2 
instances.

I have a KeyedProcessFunction that is causing a segmentation fault, crashing 
the Flink task manager.  This seems to be caused by using 3 state variables in 
the operator.  The crash happens consistently after some load is processed.
This is the second time I have encountered this.  The first time I had 3 
ValueState variables; this time I had 2 ValueState variables and a MapState 
variable.  Both times the error was alleviated by removing one of the state 
variables.
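
Following Dawid's pointer at the serializers: one way to surface types that
silently fall back to Kryo (such as a non-static inner class like the ExecQueue
above) is to disable generic types, so the job fails fast with an
UnsupportedOperationException instead of misbehaving under load. A minimal
sketch, assuming a 1.9-era StreamExecutionEnvironment:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FailFastOnKryoTypes {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Any type that would need the generic (Kryo) serializer now raises
        // an error at graph build / state initialization time.
        env.getConfig().disableGenericTypes();
        // ... define sources, the KeyedProcessFunction, and sinks as usual ...
        env.execute("kryo-fallback-check");
    }
}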

Re: how to simulate the scene "back pressure" in flink?Thanks~!

2020-10-10 Thread David Anderson
>
> Could I use your command with no docker?


Hypothetically, yes, but it's a somewhat impractical idea. The
ClickCountJob needs Flink and Kafka, and there is another Java application
(the clickevent-generator) that writes into Kafka the data that is being
processed.

On Sat, Oct 10, 2020 at 5:32 PM 大森林  wrote:

>
> Could I use your command with no docker?
>
> -- Original Message --
> *From:* "David Anderson" ;
> *Sent:* Saturday, October 10, 2020, 10:30 PM
> *To:* "大森林";
> *Cc:* "Arvid Heise";"user";
> *主题:* Re: how to simulate the scene "back pressure" in flink?Thanks~!
>
> The ClickCountJob used in the operations playground accepts an application
> parameter, like this:
>
> flink run -d /opt/ClickCountJob.jar --bootstrap.servers kafka:9092
> --checkpointing --event-time --backpressure
>
> To try this, you would modify the docker-compose.yaml file in [1]. If you
> want to see how it is implemented, see [2].
>
> You can not use this --backpressure option with any other application.
>
> [1]
> https://github.com/apache/flink-playgrounds/blob/release-1.11/operations-playground/docker-compose.yaml
> [2]
> https://github.com/apache/flink-playgrounds/blob/release-1.11/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/functions/BackpressureMap.java
>
>
> On Sat, Oct 10, 2020 at 7:26 AM 大森林  wrote:
>
>> Thanks for both of your help...
>> but...
>>
>> I can not understand both:
>>
>> 
>> Dear David Anderson:
>> Is the whole command like this?
>> flink run *--backpressure*  -c wordcount_increstate
>> datastream_api-1.0-SNAPSHOT.jar
>>
>>
>> 
>> Dear Arvid Heise:
>> For conf.setInteger(RestOptions.PORT, RestOptions.PORT.defaultValue());
>> will this setting work to sleep when the output stream is generating?
>>
>>
>> 
>> apologise for my poor basic knowledge of flink~
>> Thanks for both of your help~
>>
>>
>>
>> -- Original Message --
>> *From:* "David Anderson" ;
>> *Sent:* Friday, October 9, 2020, 8:23 PM
>> *To:* "Arvid Heise";
>> *Cc:* "大森林";"user";
>> *Subject:* Re: how to simulate the scene "back pressure" in flink?Thanks~!
>>
>> The Flink Operations Playground includes an optional backpressure
>> simulation you can experiment with. It is described at the end of [1].
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/try-flink/flink-operations-playground.html#variants
>>
>>
>> On Fri, Oct 9, 2020 at 10:02 AM Arvid Heise  wrote:
>>
>>> You can add
>>>
>>> conf.setInteger(RestOptions.PORT, RestOptions.PORT.defaultValue());
>>>
>>> at any place before creating the environment [1]. Default value is 8081,
>>> so you can access web ui through http://localhost:8081, but you can
>>> really choose any other free port.
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/61a997364b020b44bd26df76208e76106c6390b5/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java#L198
>>> On Fri, Oct 9, 2020 at 9:24 AM 大森林  wrote:
>>>
  Thanks for your replies,
 could you tell me where to set RestOptions.PORT in the configuration?
 what value should I set for RestOptions.PORT?

 Thanks.


 -- Original Message --
 *From:* "Arvid Heise" ;
 *Sent:* Friday, October 9, 2020, 3:00 PM
 *To:* "大森林";
 *Cc:* "David Anderson";"user";
 *主题:* Re: how to simulate the scene "back pressure" in flink?Thanks~!

 The easiest way to see backpressure is to add some sleep to your sink,
 check [1] for an example.
 If you execute that unit test with a RestOptions.PORT set in the
 configuration, you can even load the Web UI and watch the backpressure
 accumulate and finally go away at the end of the test.

 [1]
 https://github.com/apache/flink/blob/61a997364b020b44bd26df76208e76106c6390b5/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java#L536

 On Tue, Oct 6, 2020 at 4:13 PM 大森林  wrote:

>
> I want to learn the concept "back pressure".
> but I can not find the datastream generator example to generate a lot
> of data.
>
> besides,
> is there any example on how to simulate the scene "back pressure"  in
> WEB UI?
>
> Thanks for your help~
>


 --

 Arvid Heise | Senior Java Developer

 

 Follow us @VervericaData

 --

 Join Flink Forward  - The Apache Flink
 Conference

 Stream Processing | Event Driven | Real Time

 --

 Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
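
Putting Arvid's suggestion together into a runnable shape, a minimal sketch of a
local job with the web UI enabled and a deliberately slow stage
(generateSequence and createLocalEnvironmentWithWebUI are 1.11-era APIs; the
latter needs flink-runtime-web on the classpath):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;

public class BackpressureDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInteger(RestOptions.PORT, RestOptions.PORT.defaultValue()); // web UI on 8081
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

        env.generateSequence(0, Long.MAX_VALUE) // fast source
           .map(new MapFunction<Long, Long>() {
               @Override
               public Long map(Long value) throws Exception {
                   Thread.sleep(10); // artificially slow stage; backpressure builds upstream
                   return value;
               }
           })
           .addSink(new DiscardingSink<>());

        env.execute("backpressure-demo");
    }
}

While this runs, the source's backpressure status is visible in the web UI at
http://localhost:8081; removing the sleep makes it go away.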

Re: Issue with Flink - unrelated executeSql causes other executeSqls to fail.

2020-10-10 Thread Dan Hill
No, thanks!  I used JobClient's getJobStatus and slept if it was not
terminal.  I'll switch to this.
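
For reference, a minimal sketch of the approach Aljoscha describes, assuming
Flink 1.11's TableResult/JobClient API and placeholder table names:

import org.apache.flink.core.execution.JobClient;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

public class WaitForInsert {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
        // Table names and the INSERT are placeholders for the test's own DDL/DML.
        TableResult result = tEnv.executeSql("INSERT INTO sink_table SELECT * FROM source_table");
        JobClient client = result.getJobClient()
                .orElseThrow(() -> new IllegalStateException("statement did not submit a job"));
        // Block until the job reaches a terminal state; the future completes
        // with the JobExecutionResult (the 1.11 signature takes a classloader).
        client.getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get();
    }
}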


On Sat, Oct 10, 2020 at 12:50 AM Aljoscha Krettek 
wrote:

> Hi Dan,
>
> did you try using the JobClient you can get from the TableResult to wait
> for job completion? You can get a CompletableFuture for the JobResult
> which should help you.
>
> Best,
> Aljoscha
>
> On 08.10.20 23:55, Dan Hill wrote:
> > I figured out the issue.  The join caused part of the job's execution to
> be
> > delayed.  I added my own hacky wait condition into the test to make sure
> > the join job finishes first and it's fine.
> >
> > What common test utilities exist for Flink?  I found
> > flink/flink-test-utils-parent.  I implemented a simple sleep loop to wait
> > for jobs to finish.  I'm guessing this can be done with one of the other
> > utilities.
> >
> > Are there any open source test examples?
> >
> > How are watermarks usually sent with Table API in tests?
> >
> > After I collect some answers, I'm fine updating the Flink testing page.
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#testing-flink-jobs
> >
> > On Thu, Oct 8, 2020 at 8:52 AM Austin Cawley-Edwards <
> > austin.caw...@gmail.com> wrote:
> >
> >> Can't comment on the SQL issues, but here's our exact setup for Bazel
> and
> >> Junit5 w/ the resource files approach:
> >>
> https://github.com/fintechstudios/vp-operator-demo-ff-virtual-2020/tree/master/tools/junit
> >>
> >> Best,
> >> Austin
> >>
> >> On Thu, Oct 8, 2020 at 2:41 AM Dan Hill  wrote:
> >>
> >>> I was able to get finer grained logs showing.  I switched from
> >>> -Dlog4j.configuration to -Dlog4j.configurationFile and it worked.
> With my
> >>> larger test case, I was hitting a silent log4j error.  When I created a
> >>> small test case to just test logging, I received a log4j error.
> >>>
> >>> Here is a tar
> >>> <
> https://drive.google.com/file/d/1b6vJR_hfaRZwA28jKNlUBxDso7YiTIbk/view?usp=sharing
> >
> >>> with the info logs for:
> >>> - (test-nojoin.log) this one works as expected
> >>> - (test-join.log) this does not work as expected
> >>>
> >>> I don't see an obvious issue just by scanning the logs.  I'll take a
> >>> deeper look in 9 hours.
> >>>
> >>>
> >>>
> >>>
> >>> On Wed, Oct 7, 2020 at 8:28 PM Dan Hill  wrote:
> >>>
>  Switching to junit4 did not help.
> 
>  If I make a request to the url returned from
> 
> MiniClusterWithClientResource.flinkCluster.getClusterClient().getWebInterfaceURL(),
>  I get
>  {"errors":["Not found."]}.  I'm not sure if this is intentional.
> 
> 
> 
> 
>  On Tue, Oct 6, 2020 at 4:16 PM Dan Hill 
> wrote:
> 
> > @Aljoscha - Thanks!  That setup lets me fix the hacky absolute path
> > reference.  However, the actual log calls are not printing to the console.
> > Only errors appear in my terminal window and the test logs.  Maybe the
> > console logger does not work for this junit setup.  I'll see if the file
> > version works.
> >
> > On Tue, Oct 6, 2020 at 4:08 PM Austin Cawley-Edwards <
> > austin.caw...@gmail.com> wrote:
> >
> >> What Aljoscha suggested is what works for us!
> >>
> >> On Tue, Oct 6, 2020 at 6:58 PM Aljoscha Krettek <
> aljos...@apache.org>
> >> wrote:
> >>
> >>> Hi Dan,
> >>>
> >>> to make the log properties file work this should do it: assuming the
> >>> log4j.properties is in //src/main/resources. You will need a BUILD.bazel
> >>> in that directory that has only the line
> >>> "exports_files(["log4j.properties"])". Then you can reference it in your
> >>> test via "resources = ["//src/main/resources:log4j.properties"],". Of
> >>> course you also need to have the right log4j deps (or slf4j if you're
> >>> using that)
> >>>
> >>> Hope that helps!
> >>>
> >>> Aljoscha
> >>>
>  On 07.10.20 00:41, Dan Hill wrote:
>  I'm trying to use Table API for my job.  I'll soon try to get a test
>  working for my stream job.
>  - I'll parameterize so I can have different sources and sink for tests.
>  How should I mock out a Kafka source?  For my test, I was planning on
>  changing the input to be from a temp file (instead of Kafka).
>  - What's a good way of forcing a watermark using the Table API?
> 
> 
>  On Tue, Oct 6, 2020 at 3:35 PM Dan Hill  wrote:
> 
> > Thanks!
> >
> > Great to know.  I copied this junit5-jupiter-starter-bazel rule
> > <https://github.com/junit-team/junit5-samples/tree/main/junit5-jupiter-starter-bazel>
> > into my repository (I don't think junit5 is supported directly with
> > java_test yet).  I tried a few ways of bundling `log4j.properties` into the
> > jar and didn't get them to work.  My current iteration

[PyFlink] update udf functions on the fly

2020-10-10 Thread Sharipov, Rinat
Hi mates!

I'm at the beginning of building a recommendation pipeline on top of Flink.
I'm going to register a list of Python UDFs on job startup, where each UDF is
an ML model.

Over time, new model versions appear in the ML registry, and I would like to
update my UDFs on the fly without needing to restart the whole job.
Could you tell me whether this is possible? Maybe the community can give
advice on how such tasks can be solved using Flink and what other approaches
exist.

Thanks a lot for your help and advice!


Re: how to simulate the scene "back pressure" in flink?Thanks~!

2020-10-10 Thread 大森林
Could I use your command with no docker?



Re: how to simulate the scene "back pressure" in flink?Thanks~!

2020-10-10 Thread David Anderson
The ClickCountJob used in the operations playground accepts an application
parameter, like this:

flink run -d /opt/ClickCountJob.jar --bootstrap.servers kafka:9092
--checkpointing --event-time --backpressure

To try this, you would modify the docker-compose.yaml file in [1]. If you
want to see how it is implemented, see [2].

You can not use this --backpressure option with any other application.

[1]
https://github.com/apache/flink-playgrounds/blob/release-1.11/operations-playground/docker-compose.yaml
[2]
https://github.com/apache/flink-playgrounds/blob/release-1.11/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/functions/BackpressureMap.java


On Sat, Oct 10, 2020 at 7:26 AM 大森林  wrote:

> Thanks for both of your help...
> but...
>
> I can not understand both:
>
> 
> Dear David Anderson:
> Is the whole command like this?
> flink run *--backpressure*  -c wordcount_increstate
> datastream_api-1.0-SNAPSHOT.jar
>
>
> 
> Dear Arvid Heise:
> For conf.setInteger(RestOptions.PORT, RestOptions.PORT.defaultValue());
> will this setting work to sleep when the output stream is generating?
>
>
> 
> apologise for my poor basic knowledge of flink~
> Thanks for both of your help~
>
>
>
> -- Original Message --
> *From:* "David Anderson" ;
> *Sent:* Friday, October 9, 2020, 8:23 PM
> *To:* "Arvid Heise";
> *Cc:* "大森林";"user";
> *主题:* Re: how to simulate the scene "back pressure" in flink?Thanks~!
>
> The Flink Operations Playground includes an optional backpressure
> simulation you can experiment with. It is described at the end of [1].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/try-flink/flink-operations-playground.html#variants
>
>
> On Fri, Oct 9, 2020 at 10:02 AM Arvid Heise  wrote:
>
>> You can add
>>
>> conf.setInteger(RestOptions.PORT, RestOptions.PORT.defaultValue());
>>
>> at any place before creating the environment [1]. Default value is 8081,
>> so you can access web ui through http://localhost:8081, but you can
>> really choose any other free port.
>>
>> [1]
>> https://github.com/apache/flink/blob/61a997364b020b44bd26df76208e76106c6390b5/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java#L198
>> On Fri, Oct 9, 2020 at 9:24 AM 大森林  wrote:
>>
>>>  Thanks for your replies,
>>> could you tell me where to set RestOptions.PORT in the configuration?
>>> what value should I set for RestOptions.PORT?
>>>
>>> Thanks.
>>>
>>>
>>> -- Original Message --
>>> *From:* "Arvid Heise" ;
>>> *Sent:* Friday, October 9, 2020, 3:00 PM
>>> *To:* "大森林";
>>> *Cc:* "David Anderson";"user";
>>> *主题:* Re: how to simulate the scene "back pressure" in flink?Thanks~!
>>>
>>> The easiest way to see backpressure is to add some sleep to your sink,
>>> check [1] for an example.
>>> If you execute that unit test with a RestOptions.PORT set in the
>>> configuration, you can even load the Web UI and watch the backpressure
>>> accumulate and finally go away at the end of the test.
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/61a997364b020b44bd26df76208e76106c6390b5/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java#L536
>>>
>>> On Tue, Oct 6, 2020 at 4:13 PM 大森林  wrote:
>>>

 I want to learn the concept "back pressure".
 but I can not find the datastream generator example to generate a lot
 of data.

 besides,
 is there any example on how to simulate the scene "back pressure"  in
 WEB UI?

 Thanks for your help~

>>>
>>>
>>> --
>>>
>>> Arvid Heise | Senior Java Developer
>>>
>>> 
>>>
>>> Follow us @VervericaData
>>>
>>> --
>>>
>>> Join Flink Forward  - The Apache Flink
>>> Conference
>>>
>>> Stream Processing | Event Driven | Real Time
>>>
>>> --
>>>
>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>
>>> --
>>> Ververica GmbH
>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>>> (Toni) Cheng
>>>
>>
>>
>> --
>>
>> Arvid Heise | Senior Java Developer
>>
>> 
>>
>> Follow us @VervericaData
>>
>> --
>>
>> Join Flink Forward  - The Apache Flink
>> Conference
>>
>> Stream Processing | Event Driven | Real Time
>>
>> --
>>
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>
>> --
>> Ververica GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>> (Toni) Cheng
>>
>


Re: Issue with Flink - unrelated executeSql causes other executeSqls to fail.

2020-10-10 Thread Aljoscha Krettek

Hi Dan,

did you try using the JobClient you can get from the TableResult to wait 
for job completion? You can get a CompletableFuture for the JobResult 
which should help you.


Best,
Aljoscha

On 08.10.20 23:55, Dan Hill wrote:

I figured out the issue.  The join caused part of the job's execution to be
delayed.  I added my own hacky wait condition into the test to make sure
the join job finishes first and it's fine.

What common test utilities exist for Flink?  I found
flink/flink-test-utils-parent.  I implemented a simple sleep loop to wait
for jobs to finish.  I'm guessing this can be done with one of the other
utilities.

Are there any open source test examples?

How are watermarks usually sent with Table API in tests?

After I collect some answers, I'm fine updating the Flink testing page.
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#testing-flink-jobs

On Thu, Oct 8, 2020 at 8:52 AM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:


Can't comment on the SQL issues, but here's our exact setup for Bazel and
Junit5 w/ the resource files approach:
https://github.com/fintechstudios/vp-operator-demo-ff-virtual-2020/tree/master/tools/junit

Best,
Austin

On Thu, Oct 8, 2020 at 2:41 AM Dan Hill  wrote:


I was able to get finer grained logs showing.  I switched from
-Dlog4j.configuration to -Dlog4j.configurationFile and it worked.  With my
larger test case, I was hitting a silent log4j error.  When I created a
small test case to just test logging, I received a log4j error.

Here is a tar
<https://drive.google.com/file/d/1b6vJR_hfaRZwA28jKNlUBxDso7YiTIbk/view?usp=sharing>
with the info logs for:
- (test-nojoin.log) this one works as expected
- (test-join.log) this does not work as expected

I don't see an obvious issue just by scanning the logs.  I'll take a
deeper look in 9 hours.




On Wed, Oct 7, 2020 at 8:28 PM Dan Hill  wrote:


Switching to junit4 did not help.

If I make a request to the url returned from
MiniClusterWithClientResource.flinkCluster.getClusterClient().getWebInterfaceURL(),
I get
{"errors":["Not found."]}.  I'm not sure if this is intentional.




On Tue, Oct 6, 2020 at 4:16 PM Dan Hill  wrote:


@Aljoscha - Thanks!  That setup lets me fix the hacky absolute path
reference.  However, the actual log calls are not printing to the console.
Only errors appear in my terminal window and the test logs.  Maybe console
logger does not work for this junit setup.  I'll see if the file version
works.

On Tue, Oct 6, 2020 at 4:08 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:


What Aljoscha suggested is what works for us!

On Tue, Oct 6, 2020 at 6:58 PM Aljoscha Krettek 
wrote:


Hi Dan,

to make the log properties file work this should do it: assuming the
log4j.properties is in //src/main/resources. You will need a
BUILD.bazel
in that directory that has only the line
"exports_files(["log4j.properties"]). Then you can reference it in
your
test via "resources = ["//src/main/resources:log4j.properties"],". Of
course you also need to have the right log4j deps (or slf4j if you're
using that)

Hope that helps!

Aljoscha

On 07.10.20 00:41, Dan Hill wrote:

I'm trying to use Table API for my job.  I'll soon try to get a test
working for my stream job.
- I'll parameterize so I can have different sources and sink for

tests.

How should I mock out a Kafka source?  For my test, I was planning

on

changing the input to be from a temp file (instead of Kafka).
- What's a good way of forcing a watermark using the Table API?


On Tue, Oct 6, 2020 at 3:35 PM Dan Hill 

wrote:



Thanks!

Great to know.  I copied this junit5-jupiter-starter-bazel rule
<https://github.com/junit-team/junit5-samples/tree/main/junit5-jupiter-starter-bazel>
into my repository (I don't think junit5 is supported directly with
java_test yet).  I tried a few ways of bundling `log4j.properties` into the
jar and didn't get them to work.  My current iteration hacks the
log4j.properties file as an absolute path.  My failed attempts

would spit

an error saying log4j.properties file was not found.  This route

finds it

but the log properties are not used for the java logger.

Are there a better set of rules to use for junit5?

# build rule
java_junit5_test(
    name = "tests",
    srcs = glob(["*.java"]),
    test_package = "ai.promoted.logprocessor.batch",
    deps = [...],
    jvm_flags = ["-Dlog4j.configuration=file:///Users/danhill/code/src/ai/promoted/logprocessor/batch/log4j.properties"],
)

# log4j.properties
status = error
name = Log4j2PropertiesConfig
appenders = console
appender.console.type = Console
appender.console.name = LogToConsole
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d [%t] %-5p %c - %m%n
rootLogger.level = info
rootLogger.appenderRefs = stdout
rootLogger.appenderRef.stdout.ref = LogToConsole

On Tue, Oct 6, 2020 at 3:34 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:


Oops, this is actually the JOIN issu
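
Pulling the Bazel wiring from this thread into one place, a minimal sketch of
Aljoscha's resources-based setup (the java_junit5_test rule is the one Dan
copied from junit5-samples; whether it forwards the resources attribute to the
underlying java_test is an assumption):

# //src/main/resources/BUILD.bazel
# Makes the properties file visible to other packages.
exports_files(["log4j.properties"])

# Test package BUILD.bazel: bundle the file via resources instead of pointing
# -Dlog4j.configuration at an absolute path.
java_junit5_test(
    name = "tests",
    srcs = glob(["*.java"]),
    test_package = "ai.promoted.logprocessor.batch",
    resources = ["//src/main/resources:log4j.properties"],
    deps = [...],
)

Per Dan's finding above, if the flag route is used instead, it should be
-Dlog4j.configurationFile (log4j2) rather than -Dlog4j.configuration, since
the latter fails silently.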

Re: checkpoint fail

2020-10-10 Thread Yun Tang
Hi Song

Flink 1.4.2 is a bit too old, and I think this error is caused by FLINK-8876 
[1][2], which was fixed in Flink 1.5. Please consider upgrading your Flink 
version.

[1] https://issues.apache.org/jira/browse/FLINK-8876
[2] https://issues.apache.org/jira/browse/FLINK-8836


Best
Yun Tang

From: Song Wu 
Sent: Saturday, October 10, 2020 11:03
To: user 
Subject: checkpoint fail

Summary
I'm hitting an error when running a job; it has happened several times, and I 
don't know why.

Any help would be appreciated.  Thanks!

Details


flink version: 1.4.2-1700


java.lang.Exception: Could not complete snapshot 158 for operator 
asyncio_by_transform -> flatmap_by_action_list_flat -> 
order_source_kafka_sink-preprocessing (3/10).
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:370)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1285)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1223)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:707)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:622)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:575)
at 
org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:380)
at 
org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:283)
at 
org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:185)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:217)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.ConcurrentModificationException
at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429)
at java.util.HashMap$EntryIterator.next(HashMap.java:1463)
at java.util.HashMap$EntryIterator.next(HashMap.java:1461)
at 
com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:156)
at 
com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:198)
at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:239)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:107)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:48)
at 
org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:74)
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackend$PartitionableListState.<init>(DefaultOperatorStateBackend.java:453)
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackend$PartitionableListState.deepCopy(DefaultOperatorStateBackend.java:465)
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:220)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:355)