async io parallelism

2020-02-21 Thread Alexey Trenikhun
Hello,
Let's say my elements are simple key-value pairs coming from Kafka, where they 
were partitioned by "key". I process them with a KeyedProcessFunction (keyed by 
the same "key"), then enrich them using an ordered RichAsyncFunction, then feed 
them into another KeyedProcessFunction (keyed by the same "key"), and finally 
write to a Kafka topic, again partitioned by the same "key", something like 
this:

FlinkKafkaConsumer -> keyBy("key") -> Intake(KeyedProcessFunction) -> 
AsyncDataStream.orderedWait() -> 
keyBy("key")->Output(KeyedProcessFunction)->FlinkKafkaProducer

Will it preserve the order of events with the same "key"?

  *   Will the Output function receive elements with the same "key" in the same 
order as they were originally in Kafka?
  *   Will the FlinkKafkaProducer write elements with the same "key" in the same 
order as they were originally in Kafka?
  *   Does it depend on the parallelism of the async I/O? The documentation says 
"the stream order is preserved", but if there are multiple parallel instances of 
the async function, does that mean order relative to each single instance, or 
total stream order?

Thanks,
Alexey


StreamingFileSink Not Flushing All Data

2020-02-21 Thread Austin Cawley-Edwards
Hi there,

Using Flink 1.9.1, I'm trying to write .tgz files with the
StreamingFileSink#BulkWriter. It seems like flushing the output stream
doesn't flush all the data written. I've verified I can create valid files
using the same APIs and data on their own, so I'm thinking it must be something
I'm doing wrong with the bulk format. I'm writing to the local filesystem,
with the `file://` protocol.

For Tar/ Gzipping, I'm using the Apache Commons Compression library,
version 1.20.

Here's a runnable example of the issue:

import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

import java.io.FileOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

class Scratch {
  public static class Record implements Serializable {
    private static final long serialVersionUID = 1L;

    String id;

    public Record() {}

    public Record(String id) {
      this.id = id;
    }

    public String getId() {
      return id;
    }

    public void setId(String id) {
      this.id = id;
    }
  }

  public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

    // Reference: writing the same data directly with commons-compress produces a valid archive.
    TarArchiveOutputStream taos = new TarArchiveOutputStream(
        new GzipCompressorOutputStream(
            new FileOutputStream("/home/austin/Downloads/test.tgz")));
    TarArchiveEntry fileEntry = new TarArchiveEntry(String.format("%s.txt", "test"));
    String fullText = "hey\nyou\nwork";
    byte[] fullTextData = fullText.getBytes();
    fileEntry.setSize(fullTextData.length);
    taos.putArchiveEntry(fileEntry);
    taos.write(fullTextData, 0, fullTextData.length);
    taos.closeArchiveEntry();
    taos.flush();
    taos.close();

    StreamingFileSink<Record> textSink = StreamingFileSink
        .forBulkFormat(new Path("file:///home/austin/Downloads/text-output"),
            new BulkWriter.Factory<Record>() {
              @Override
              public BulkWriter<Record> create(FSDataOutputStream out)
                  throws IOException {
                final TarArchiveOutputStream compressedOutputStream =
                    new TarArchiveOutputStream(new GzipCompressorOutputStream(out));

                return new BulkWriter<Record>() {
                  @Override
                  public void addElement(Record record) throws IOException {
                    TarArchiveEntry fileEntry =
                        new TarArchiveEntry(String.format("%s.txt", record.id));
                    byte[] fullTextData =
                        "hey\nyou\nplease\nwork".getBytes(StandardCharsets.UTF_8);
                    fileEntry.setSize(fullTextData.length);
                    compressedOutputStream.putArchiveEntry(fileEntry);
                    compressedOutputStream.write(fullTextData, 0, fullTextData.length);
                    compressedOutputStream.closeArchiveEntry();
                  }

                  @Override
                  public void flush() throws IOException {
                    compressedOutputStream.flush();
                  }

                  @Override
                  public void finish() throws IOException {
                    this.flush();
                  }
                };
              }
            })
        .withBucketCheckInterval(1000)
        .build();

    env
        .fromElements(new Record("1"), new Record("2"))
        .addSink(textSink)
        .name("Streaming File Sink")
        .uid("streaming-file-sink");
    env.execute("streaming file sink test");
  }
}


From the stat/hex dumps, you can see that the first bits are there but are
then cut off:

~/Downloads » stat test.tgz
  File: test.tgz
  Size: 114   Blocks: 8  IO Block: 4096   regular file
Device: 801h/2049d Inode: 30041077Links: 1
Access: (0664/-rw-rw-r--)  Uid: ( 1000/  austin)   Gid: ( 1000/  austin)
Access: 2020-02-21 19:30:06.009028283 -0500
Modify: 2020-02-21 19:30:44.509424406 -0500
Change: 2020-02-21 19:30:44.509424406 -0500
 Birth: -

~/Downloads » tar -tvf test.tgz
-rw-r--r-- 0/0  12 2020-02-21 19:35 test.txt

~/Downloads » hd test.tgz
0000  1f 8b 08 00 00 00 00 00  00 ff ed cf 31 0e 80 20  |1.. |
0010  0c 85 61 66 4f c1 09 cc  2b 14 3c 8f 83 89 89 03  |..afO...+.<.|
0020  09 94 a8 b7 77 30 2e ae  8a 2e fd 96 37 f6 af 4c  |w0..7..L|
0030  45 7a d9 c4 34 04 02 22  b3 c5 e9 be 00 b1 25 1f  |Ez..4.."..%.|
0040  1d 63 f0 81 82 05 91 77  d1 58 b4 8c ba d4 22 63  |.c.w.X"c|
0050  36 78 7c eb fe dc 0b 69  5f 98 a7 bd db 53 ed d6  |6x|i_S..|
0060  94 97 bf 5b 94 52 4a 7d

Re: [Flink 1.10] How do I use LocalCollectionOutputFormat now that writeUsingOutputFormat is deprecated?

2020-02-21 Thread Robert Metzger
Hey,
you are right. I'm also seeing this exception now. It was hidden in other
log output.

The solution to all this confusion is simple: DataStreamUtils.collect() is
like an execute().

The stream graph is cleared on each execute(). That's why collect() and
then execute() lead to the "no operators defined" error.
However, if you have collect(), print(), execute(), then the print() is
filling the stream graph again, and you are executing two Flink jobs: the
collect job and the execute job.
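
In code, a minimal sketch of that behaviour (reusing the snippets from below;
StringSourceFunction stands in for any source):

DataStream<String> input = env.addSource(new StringSourceFunction());
List<String> result = new ArrayList<>(5);

// collect() internally runs the job and clears the stream graph -> job #1
DataStreamUtils.collect(input).forEachRemaining(result::add);

// Without this print(), env.execute() would find an empty stream graph and
// throw "No operators defined in streaming topology".
input.print();

// job #2: executes only the print() part of the pipeline
env.execute("Flink Streaming Java API Skeleton");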

I hope I got it right this time :)

Best,
Robert

On Fri, Feb 21, 2020 at 4:47 PM Niels Basjes  wrote:

> I tried this in Flink 1.10.0 :
>
> @Test
> public void experimentalTest() throws Exception {
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> DataStream input = env.fromElements("One", "Two");
> //DataStream input = env.addSource(new 
> StringSourceFunction());
> List result = new ArrayList<>(5);
> DataStreamUtils.collect(input).forEachRemaining(result::add);
> env.execute("Flink Streaming Java API Skeleton");
> }
>
>
> Results in
>
>
> java.lang.IllegalStateException: No operators defined in streaming topology. 
> Cannot execute.
>
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1792)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1783)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1768)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
>   at 
> nl.basjes.parse.useragent.flink.TestUserAgentAnalysisMapperInline.experimentalTest(TestUserAgentAnalysisMapperInline.java:177)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> ...
>
>
>
> On Fri, Feb 21, 2020 at 1:00 PM Robert Metzger 
> wrote:
>
>> Hey Niels,
>>
>> This minimal Flink job executes in Flink 1.10:
>>
>> public static void main(String[] args) throws Exception {
>>final StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>>DataStream input = env.addSource(new StringSourceFunction());
>>List result = new ArrayList<>(5);
>>DataStreamUtils.collect(input).forEachRemaining(result::add);
>>env.execute("Flink Streaming Java API Skeleton");
>> }
>>
>> Maybe the TestUserAgentAnalysisMapperInline class is doing some magic
>> that breaks with the StreamGraphGenerator?
>>
>> Best,
>> Robert
>>
>> On Tue, Feb 18, 2020 at 9:59 AM Niels Basjes  wrote:
>>
>>> Hi Gordon,
>>>
>>> Thanks. This works for me.
>>>
>>> I find it strange that when I do this it works (I made the differences
>>> bold)
>>>
>>> List result = new ArrayList<>(5);
>>>
>>> DataStreamUtils.collect(resultDataStream).forEachRemaining(result::add);
>>>
>>> *resultDataStream.print();*
>>>
>>> environment.execute();
>>>
>>>
>>> how ever this does not work
>>>
>>> List result = new ArrayList<>(5);
>>>
>>> DataStreamUtils.collect(resultDataStream).forEachRemaining(result::add);
>>>
>>> environment.execute();
>>>
>>>
>>> and this also does not work
>>>
>>> *resultDataStream.print();*
>>>
>>> List result = new ArrayList<>(5);
>>>
>>> DataStreamUtils.collect(resultDataStream).forEachRemaining(result::add);
>>>
>>> environment.execute();
>>>
>>>
>>> In both these cases it fails with
>>>
>>>
>>> java.lang.IllegalStateException: *No operators defined in streaming
>>> topology. Cannot execute.*
>>>
>>> at
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1792)
>>> at
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1783)
>>> at
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1768)
>>> at
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
>>> at
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1602)
>>> at
>>> nl.basjes.parse.useragent.flink.TestUserAgentAnalysisMapperInline.testInlineDefinitionDataStream(TestUserAgentAnalysisMapperInline.java:144)
>>>
>>>
>>>
>>> Did I do something wrong?
>>> Is this a bug in the DataStreamUtils ?
>>>
>>> Niels Basjes
>>>
>>>
>>>
>>> On Mon, Feb 17, 2020 at 8:56 AM Tzu-Li Tai  wrote:
>>>
 Hi,

 To collect the elements of a DataStream (usually only meant for testing
 purposes), you can take a look at `DataStreamUtils#collect(DataStream)`.

 Cheers,
 Gordon



 --
 Sent 

Re: JDBC source running continuously

2020-02-21 Thread Fanbin Bu
Jark,

Thank you for the reply.
By running continuously, I meant that the source operator does not finish after
all the data is read. Similar to ContinuousFileMonitoringFunction, I'm
thinking of a continuous database-monitoring function. The reason for
doing this is to enable savepoints for my pipeline (savepoints do not work
for finished operators).

The following code shows that the format will close once it reads all data:

while (isRunning && !format.reachedEnd()) {
    nextElement = format.nextRecord(nextElement);
    if (nextElement != null) {
        ctx.collect(nextElement);
    } else {
        break;
    }
}
format.close();
completedSplitsCounter.inc();

if (isRunning) {
    isRunning = splitIterator.hasNext();
}

Is there any way to keep the operator running but not reading any data and
also enable proper savepoint?
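
For illustration, a rough sketch of the kind of continuously polling source I
have in mind (the class, fields and query handling here are made up, not an
existing Flink API):

import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

// Rough sketch only: re-run the query on an interval instead of terminating,
// so the source operator never reaches the FINISHED state.
public class PollingJdbcSource extends RichSourceFunction<String> {

    private final String jdbcUrl;
    private final String query;
    private final long pollIntervalMs;
    private volatile boolean running = true;

    public PollingJdbcSource(String jdbcUrl, String query, long pollIntervalMs) {
        this.jdbcUrl = jdbcUrl;
        this.query = query;
        this.pollIntervalMs = pollIntervalMs;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            try (Connection conn = DriverManager.getConnection(jdbcUrl);
                 PreparedStatement stmt = conn.prepareStatement(query);
                 ResultSet rs = stmt.executeQuery()) {
                while (running && rs.next()) {
                    // hold the checkpoint lock while emitting records
                    synchronized (ctx.getCheckpointLock()) {
                        ctx.collect(rs.getString(1));
                    }
                }
            }
            Thread.sleep(pollIntervalMs);  // idle between polls, keeping the task alive
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}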

Thanks,
Fanbin



On Fri, Feb 21, 2020 at 12:32 AM Jark Wu  wrote:

> Hi Fanbin,
>
> .iterate() is not available on Table API, it's an API of DataStream.
> Currently, the JDBC source is a bounded source (a snapshot of table at the
> execution time), so the job will finish when it processes all the data.
>
> Regarding your requirement, "running continuously with a JDBC source", we
> should make it clear what you want the source to read after the full
> snapshot:
> 1) read a full snapshot again
> 2) read newly inserted rows
> 3) read newly inserted rows, updated rows, and deleted rows.
>
> For (1), you can create your own JDBC input format based on
> JDBCInputFormat, trying to re-execute the SQL query while reading the last
> row from the DB in nextRecord() (this is the answer in the Stack Overflow post [1]).
> For (2), in nextRecord(), you need to execute a SQL query with a
> filter to fetch rows which are greater than the last max ID or max created
> time.
> For (3), this is changelog support, which will be supported natively in
> Flink SQL in 1.11.
>
> Best,
> Jark
>
>
> On Fri, 21 Feb 2020 at 02:35, Fanbin Bu  wrote:
>
>>
>> https://stackoverflow.com/questions/48151881/how-to-run-apache-flink-streaming-job-continuously-on-flink-server
>>
>> On Thu, Feb 20, 2020 at 3:14 AM Chesnay Schepler 
>> wrote:
>>
>>> Can you show us where you found the suggestion to use iterate()?
>>>
>>> On 20/02/2020 02:08, Fanbin Bu wrote:
>>> > Hi,
>>> >
>>> > My app creates the source from JDBC inputformat and running some sql
>>> > and print out. But the source terminates itself after the query is
>>> > done. Is there anyway to keep the source running?
>>> > samle code:
>>> > val env = StreamExecutionEnvironment.getExecutionEnvironment
>>> > val settings = EnvironmentSettings.newInstance()
>>> >   .useBlinkPlanner()
>>> >   .inStreamingMode()
>>> >   .build()
>>> > val tEnv = StreamTableEnvironment.create(env, settings)
>>> > val inputFormat
>>> > = JDBCInputFormat.buildJDBCInputFormat.setQuery("select * from
>>> > table")... .finish()
>>> > val source = env.createInput(inputFormat)
>>> > tEnv.registerTableSource(source)
>>> > val queryResult = tEnv.sqlQuery("select * from awesomeSource")
>>> > queryResult.insertInto(mySink)
>>> >
>>> >
>>> > I searched around and its suggested to use .iterate(). can somebody
>>> > give more examples on how to use it in this case?
>>> >
>>> > Thanks,
>>> > Fanbin
>>>
>>>
>>>


Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer

2020-02-21 Thread Fabian Hueske
Congrats Jingsong!

Cheers, Fabian

Am Fr., 21. Feb. 2020 um 17:49 Uhr schrieb Rong Rong :

> Congratulations Jingsong!!
>
> Cheers,
> Rong
>
> On Fri, Feb 21, 2020 at 8:45 AM Bowen Li  wrote:
>
> > Congrats, Jingsong!
> >
> > On Fri, Feb 21, 2020 at 7:28 AM Till Rohrmann 
> > wrote:
> >
> >> Congratulations Jingsong!
> >>
> >> Cheers,
> >> Till
> >>
> >> On Fri, Feb 21, 2020 at 4:03 PM Yun Gao  wrote:
> >>
> >>>   Congratulations Jingsong!
> >>>
> >>>Best,
> >>>Yun
> >>>
> >>> --
> >>> From:Jingsong Li 
> >>> Send Time:2020 Feb. 21 (Fri.) 21:42
> >>> To:Hequn Cheng 
> >>> Cc:Yang Wang ; Zhijiang <
> >>> wangzhijiang...@aliyun.com>; Zhenghua Gao ; godfrey
> >>> he ; dev ; user <
> >>> user@flink.apache.org>
> >>> Subject:Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer
> >>>
> >>> Thanks everyone~
> >>>
> >>> It's my pleasure to be part of the community. I hope I can make a
> better
> >>> contribution in future.
> >>>
> >>> Best,
> >>> Jingsong Lee
> >>>
> >>> On Fri, Feb 21, 2020 at 2:48 PM Hequn Cheng  wrote:
> >>> Congratulations Jingsong! Well deserved.
> >>>
> >>> Best,
> >>> Hequn
> >>>
> >>> On Fri, Feb 21, 2020 at 2:42 PM Yang Wang 
> wrote:
> >>> Congratulations!Jingsong. Well deserved.
> >>>
> >>>
> >>> Best,
> >>> Yang
> >>>
> >>> Zhijiang  于2020年2月21日周五 下午1:18写道:
> >>> Congrats Jingsong! Welcome on board!
> >>>
> >>> Best,
> >>> Zhijiang
> >>>
> >>> --
> >>> From:Zhenghua Gao 
> >>> Send Time:2020 Feb. 21 (Fri.) 12:49
> >>> To:godfrey he 
> >>> Cc:dev ; user 
> >>> Subject:Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer
> >>>
> >>> Congrats Jingsong!
> >>>
> >>>
> >>> *Best Regards,*
> >>> *Zhenghua Gao*
> >>>
> >>>
> >>> On Fri, Feb 21, 2020 at 11:59 AM godfrey he 
> wrote:
> >>> Congrats Jingsong! Well deserved.
> >>>
> >>> Best,
> >>> godfrey
> >>>
> >>> Jeff Zhang  于2020年2月21日周五 上午11:49写道:
> >>> Congratulations!Jingsong. You deserve it
> >>>
> >>> wenlong.lwl  于2020年2月21日周五 上午11:43写道:
> >>> Congrats Jingsong!
> >>>
> >>> On Fri, 21 Feb 2020 at 11:41, Dian Fu  wrote:
> >>>
> >>> > Congrats Jingsong!
> >>> >
> >>> > > 在 2020年2月21日,上午11:39,Jark Wu  写道:
> >>> > >
> >>> > > Congratulations Jingsong! Well deserved.
> >>> > >
> >>> > > Best,
> >>> > > Jark
> >>> > >
> >>> > > On Fri, 21 Feb 2020 at 11:32, zoudan  wrote:
> >>> > >
> >>> > >> Congratulations! Jingsong
> >>> > >>
> >>> > >>
> >>> > >> Best,
> >>> > >> Dan Zou
> >>> > >>
> >>> >
> >>> >
> >>>
> >>>
> >>> --
> >>> Best Regards
> >>>
> >>> Jeff Zhang
> >>>
> >>>
> >>>
> >>> --
> >>> Best, Jingsong Lee
> >>>
> >>>
> >>>
>


Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer

2020-02-21 Thread Peter Huang
Congrats Jingsong!


On Fri, Feb 21, 2020 at 8:49 AM Rong Rong  wrote:

> Congratulations Jingsong!!
>
> Cheers,
> Rong
>
> On Fri, Feb 21, 2020 at 8:45 AM Bowen Li  wrote:
>
>> Congrats, Jingsong!
>>
>> On Fri, Feb 21, 2020 at 7:28 AM Till Rohrmann 
>> wrote:
>>
>>> Congratulations Jingsong!
>>>
>>> Cheers,
>>> Till
>>>
>>> On Fri, Feb 21, 2020 at 4:03 PM Yun Gao  wrote:
>>>
   Congratulations Jingsong!

Best,
Yun

 --
 From:Jingsong Li 
 Send Time:2020 Feb. 21 (Fri.) 21:42
 To:Hequn Cheng 
 Cc:Yang Wang ; Zhijiang <
 wangzhijiang...@aliyun.com>; Zhenghua Gao ; godfrey
 he ; dev ; user <
 user@flink.apache.org>
 Subject:Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer

 Thanks everyone~

 It's my pleasure to be part of the community. I hope I can make a
 better contribution in future.

 Best,
 Jingsong Lee

 On Fri, Feb 21, 2020 at 2:48 PM Hequn Cheng  wrote:
 Congratulations Jingsong! Well deserved.

 Best,
 Hequn

 On Fri, Feb 21, 2020 at 2:42 PM Yang Wang 
 wrote:
 Congratulations!Jingsong. Well deserved.


 Best,
 Yang

 Zhijiang  于2020年2月21日周五 下午1:18写道:
 Congrats Jingsong! Welcome on board!

 Best,
 Zhijiang

 --
 From:Zhenghua Gao 
 Send Time:2020 Feb. 21 (Fri.) 12:49
 To:godfrey he 
 Cc:dev ; user 
 Subject:Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer

 Congrats Jingsong!


 *Best Regards,*
 *Zhenghua Gao*


 On Fri, Feb 21, 2020 at 11:59 AM godfrey he 
 wrote:
 Congrats Jingsong! Well deserved.

 Best,
 godfrey

 Jeff Zhang  于2020年2月21日周五 上午11:49写道:
 Congratulations!Jingsong. You deserve it

 wenlong.lwl  于2020年2月21日周五 上午11:43写道:
 Congrats Jingsong!

 On Fri, 21 Feb 2020 at 11:41, Dian Fu  wrote:

 > Congrats Jingsong!
 >
 > > 在 2020年2月21日,上午11:39,Jark Wu  写道:
 > >
 > > Congratulations Jingsong! Well deserved.
 > >
 > > Best,
 > > Jark
 > >
 > > On Fri, 21 Feb 2020 at 11:32, zoudan  wrote:
 > >
 > >> Congratulations! Jingsong
 > >>
 > >>
 > >> Best,
 > >> Dan Zou
 > >>
 >
 >


 --
 Best Regards

 Jeff Zhang



 --
 Best, Jingsong Lee





Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer

2020-02-21 Thread Rong Rong
Congratulations Jingsong!!

Cheers,
Rong

On Fri, Feb 21, 2020 at 8:45 AM Bowen Li  wrote:

> Congrats, Jingsong!
>
> On Fri, Feb 21, 2020 at 7:28 AM Till Rohrmann 
> wrote:
>
>> Congratulations Jingsong!
>>
>> Cheers,
>> Till
>>
>> On Fri, Feb 21, 2020 at 4:03 PM Yun Gao  wrote:
>>
>>>   Congratulations Jingsong!
>>>
>>>Best,
>>>Yun
>>>
>>> --
>>> From:Jingsong Li 
>>> Send Time:2020 Feb. 21 (Fri.) 21:42
>>> To:Hequn Cheng 
>>> Cc:Yang Wang ; Zhijiang <
>>> wangzhijiang...@aliyun.com>; Zhenghua Gao ; godfrey
>>> he ; dev ; user <
>>> user@flink.apache.org>
>>> Subject:Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer
>>>
>>> Thanks everyone~
>>>
>>> It's my pleasure to be part of the community. I hope I can make a better
>>> contribution in future.
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Fri, Feb 21, 2020 at 2:48 PM Hequn Cheng  wrote:
>>> Congratulations Jingsong! Well deserved.
>>>
>>> Best,
>>> Hequn
>>>
>>> On Fri, Feb 21, 2020 at 2:42 PM Yang Wang  wrote:
>>> Congratulations!Jingsong. Well deserved.
>>>
>>>
>>> Best,
>>> Yang
>>>
>>> Zhijiang  于2020年2月21日周五 下午1:18写道:
>>> Congrats Jingsong! Welcome on board!
>>>
>>> Best,
>>> Zhijiang
>>>
>>> --
>>> From:Zhenghua Gao 
>>> Send Time:2020 Feb. 21 (Fri.) 12:49
>>> To:godfrey he 
>>> Cc:dev ; user 
>>> Subject:Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer
>>>
>>> Congrats Jingsong!
>>>
>>>
>>> *Best Regards,*
>>> *Zhenghua Gao*
>>>
>>>
>>> On Fri, Feb 21, 2020 at 11:59 AM godfrey he  wrote:
>>> Congrats Jingsong! Well deserved.
>>>
>>> Best,
>>> godfrey
>>>
>>> Jeff Zhang  于2020年2月21日周五 上午11:49写道:
>>> Congratulations!Jingsong. You deserve it
>>>
>>> wenlong.lwl  于2020年2月21日周五 上午11:43写道:
>>> Congrats Jingsong!
>>>
>>> On Fri, 21 Feb 2020 at 11:41, Dian Fu  wrote:
>>>
>>> > Congrats Jingsong!
>>> >
>>> > > 在 2020年2月21日,上午11:39,Jark Wu  写道:
>>> > >
>>> > > Congratulations Jingsong! Well deserved.
>>> > >
>>> > > Best,
>>> > > Jark
>>> > >
>>> > > On Fri, 21 Feb 2020 at 11:32, zoudan  wrote:
>>> > >
>>> > >> Congratulations! Jingsong
>>> > >>
>>> > >>
>>> > >> Best,
>>> > >> Dan Zou
>>> > >>
>>> >
>>> >
>>>
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>>>
>>>
>>>
>>> --
>>> Best, Jingsong Lee
>>>
>>>
>>>


Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer

2020-02-21 Thread Bowen Li
Congrats, Jingsong!

On Fri, Feb 21, 2020 at 7:28 AM Till Rohrmann  wrote:

> Congratulations Jingsong!
>
> Cheers,
> Till
>
> On Fri, Feb 21, 2020 at 4:03 PM Yun Gao  wrote:
>
>>   Congratulations Jingsong!
>>
>>Best,
>>Yun
>>
>> --
>> From:Jingsong Li 
>> Send Time:2020 Feb. 21 (Fri.) 21:42
>> To:Hequn Cheng 
>> Cc:Yang Wang ; Zhijiang <
>> wangzhijiang...@aliyun.com>; Zhenghua Gao ; godfrey he
>> ; dev ; user <
>> user@flink.apache.org>
>> Subject:Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer
>>
>> Thanks everyone~
>>
>> It's my pleasure to be part of the community. I hope I can make a better
>> contribution in future.
>>
>> Best,
>> Jingsong Lee
>>
>> On Fri, Feb 21, 2020 at 2:48 PM Hequn Cheng  wrote:
>> Congratulations Jingsong! Well deserved.
>>
>> Best,
>> Hequn
>>
>> On Fri, Feb 21, 2020 at 2:42 PM Yang Wang  wrote:
>> Congratulations!Jingsong. Well deserved.
>>
>>
>> Best,
>> Yang
>>
>> Zhijiang  于2020年2月21日周五 下午1:18写道:
>> Congrats Jingsong! Welcome on board!
>>
>> Best,
>> Zhijiang
>>
>> --
>> From:Zhenghua Gao 
>> Send Time:2020 Feb. 21 (Fri.) 12:49
>> To:godfrey he 
>> Cc:dev ; user 
>> Subject:Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer
>>
>> Congrats Jingsong!
>>
>>
>> *Best Regards,*
>> *Zhenghua Gao*
>>
>>
>> On Fri, Feb 21, 2020 at 11:59 AM godfrey he  wrote:
>> Congrats Jingsong! Well deserved.
>>
>> Best,
>> godfrey
>>
>> Jeff Zhang  于2020年2月21日周五 上午11:49写道:
>> Congratulations!Jingsong. You deserve it
>>
>> wenlong.lwl  于2020年2月21日周五 上午11:43写道:
>> Congrats Jingsong!
>>
>> On Fri, 21 Feb 2020 at 11:41, Dian Fu  wrote:
>>
>> > Congrats Jingsong!
>> >
>> > > 在 2020年2月21日,上午11:39,Jark Wu  写道:
>> > >
>> > > Congratulations Jingsong! Well deserved.
>> > >
>> > > Best,
>> > > Jark
>> > >
>> > > On Fri, 21 Feb 2020 at 11:32, zoudan  wrote:
>> > >
>> > >> Congratulations! Jingsong
>> > >>
>> > >>
>> > >> Best,
>> > >> Dan Zou
>> > >>
>> >
>> >
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>>
>>
>> --
>> Best, Jingsong Lee
>>
>>
>>


Re: FlinkCEP questions - architecture

2020-02-21 Thread Oytun Tez
Amazing content, thanks for asking and answering.

On Fri, Feb 21, 2020 at 5:04 AM Juergen Donnerstag <
juergen.donners...@gmail.com> wrote:

> thanks a lot
> Juergen
>
> On Mon, Feb 17, 2020 at 11:08 AM Kostas Kloudas 
> wrote:
>
>> Hi Juergen,
>>
>> I will reply to your questions inline. As a general comment I would
>> suggest to also have a look at [3] so that you have an idea of some of
>> the alternatives.
>> With that said, here come the answers :)
>>
>> 1) We receive files every day, which are exports from some database
>> tables, containing ONLY the changes from that day. Most tables have
>> modify-cols. Even though they are files, because they contain only
>> changes I believe the file records should be considered events in
>> Flink terminology. Is that assumption correct?
>>
>> -> Yes. I think your assumption is correct.
>>
>> 2) The records within the DB export files are NOT in chronological
>> order, and we cannot change the export. Our use case is a "complex event
>> processing" case (FlinkCEP) with rules like "KeyBy(someKey): if first
>> A, then B, then C within 30 days, then do something". Does that work
>> with FlinkCEP even though the events/records are not in chronological
>> order within the file? The files are 100MB to 20GB in size. Do I need to
>> sort the files first before CEP processing?
>>
>> -> Flink CEP also works in event time and the re-ordering can be done by
>> Flink
>>
>> 3) Occasionally some crazy people manually "correct" DB records
>> within the database and manually trigger a re-export of ALL of the
>> changes for that respective day (e.g. last week's Tuesday).
>> Consequently we receive a correction file: same filename but with "_1"
>> appended. All filenames include the date (of the original export).
>> What are the options to handle that case (besides telling the DB
>> admins not to, which we did already)? Regular checkpoints and
>> re-processing all files since then? What happens to the CEP state? Will
>> it be checkpointed as well?
>>
>> -> If you require re-processing, then I would say that your best
>> option is what you described. The other option would be to keep
>> everything in Flink state until you are sure that no more corrections
>> will come. In this case, you have to somehow issue the "correction" in
>> a way that the downstream system can understand what to correct and
>> how. Keep in mind that this may be an expensive operation because
>> everything has to be kept in state for longer.
>>
>> 4) Our CEP rules span up to 180 days (i.e. 6 months). Is that a problem?
>>
>> -> The only thing to consider is the size of your state. Time is not
>> necessarily an issue. If your state for these 180 days is a couple of
>> MBs, then you have no problem. If it increases fast, then you have to
>> provision your cluster accordingly.
>>
>> 5) We also have CEP rules that must fire if, after a start sequence has
>> matched, the remaining sequence did NOT complete within a configured window.
>> E.g. if A, then B, but C did not occur within 30 days since A. Is that
>> supported by FlinkCEP? I couldn't find a working example.
>>
>> -> You can have a look at [1] for the supported pattern combinations
>> and you can also look at [2] for some tests of different pattern
>> combinations.
>>
>> 6) We expect 30-40 CEP rules. How can we estimate the required storage
>> size for the temporary CEP state? Is there some sort of formula
>> considering the number of rules, the number of records per file or day, the
>> record size, the window, the number of records matched per sequence, the
>> number of keyBy grouping keys, ...
>>
>> -> In FlinkCEP, each pattern becomes a single operator. This means
>> that you will have 30-40 operators in your job graph, each with its
>> own state. This can become heavy, but once again it depends on your
>> workload. I cannot give an estimate because in CEP, in order to
>> guarantee correct ordering of events in an unordered stream, the
>> library sometimes also has to keep more records in state than will be
>> presented at the end.
>>
>> Have you considered going with a solution based on a ProcessFunction and
>> broadcast state? This will also allow you to have a more dynamic
>> set-up where patterns can be added at runtime, and it will allow you to
>> do any optimizations specific to your workload ;) For a discussion on
>> this, check [3]. In addition, it will allow you to "multiplex" many
>> patterns into a single operator, thus potentially minimizing the number
>> of copies of the state you keep.
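>>
>> To give a rough idea of the shape of that approach (a sketch only; Rule,
>> Event and Alert are placeholder types, the key is assumed to be a String,
>> and the actual matching logic is omitted):
>>
>> MapStateDescriptor<String, Rule> rulesDescriptor =
>>     new MapStateDescriptor<>("rules", String.class, Rule.class);
>>
>> BroadcastStream<Rule> ruleBroadcast = ruleStream.broadcast(rulesDescriptor);
>>
>> events
>>     .keyBy(Event::getKey)
>>     .connect(ruleBroadcast)
>>     .process(new KeyedBroadcastProcessFunction<String, Event, Rule, Alert>() {
>>         @Override
>>         public void processElement(Event event, ReadOnlyContext ctx,
>>                                    Collector<Alert> out) throws Exception {
>>             for (Map.Entry<String, Rule> entry :
>>                     ctx.getBroadcastState(rulesDescriptor).immutableEntries()) {
>>                 // evaluate 'event' against entry.getValue(); emit an Alert on a match
>>             }
>>         }
>>
>>         @Override
>>         public void processBroadcastElement(Rule rule, Context ctx,
>>                                             Collector<Alert> out) throws Exception {
>>             ctx.getBroadcastState(rulesDescriptor).put(rule.getId(), rule);
>>         }
>>     });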
>>
>> 7) I can imagine that for debugging reasons it'd be good if we were
>> able to query the temporary CEP state. What is the (CEP) schema used
>> to persist the CEP state, and how can we query it? And does such a query
>> work on the whole cluster or only per node (e.g. because of shuffles
>> and nodes being responsible only for a portion of the events)?
>>
>> -> Unfortunately, the state in CEP is not queryable, so I am not
>> sure if you can inspect it at runtime.
>>
>> 8) I understand state is stored per node. What happens if 

Re: [Flink 1.10] How do I use LocalCollectionOutputFormat now that writeUsingOutputFormat is deprecated?

2020-02-21 Thread Niels Basjes
I tried this in Flink 1.10.0 :

@Test
public void experimentalTest() throws Exception {
    final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<String> input = env.fromElements("One", "Two");
    //DataStream<String> input = env.addSource(new StringSourceFunction());
    List<String> result = new ArrayList<>(5);
    DataStreamUtils.collect(input).forEachRemaining(result::add);
    env.execute("Flink Streaming Java API Skeleton");
}


Results in


java.lang.IllegalStateException: No operators defined in streaming
topology. Cannot execute.

at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1792)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1783)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1768)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
at 
nl.basjes.parse.useragent.flink.TestUserAgentAnalysisMapperInline.experimentalTest(TestUserAgentAnalysisMapperInline.java:177)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

...



On Fri, Feb 21, 2020 at 1:00 PM Robert Metzger  wrote:

> Hey Niels,
>
> This minimal Flink job executes in Flink 1.10:
>
> public static void main(String[] args) throws Exception {
>final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>DataStream input = env.addSource(new StringSourceFunction());
>List result = new ArrayList<>(5);
>DataStreamUtils.collect(input).forEachRemaining(result::add);
>env.execute("Flink Streaming Java API Skeleton");
> }
>
> Maybe the TestUserAgentAnalysisMapperInline class is doing some magic
> that breaks with the StreamGraphGenerator?
>
> Best,
> Robert
>
> On Tue, Feb 18, 2020 at 9:59 AM Niels Basjes  wrote:
>
>> Hi Gordon,
>>
>> Thanks. This works for me.
>>
>> I find it strange that when I do this it works (I made the differences
>> bold)
>>
>> List result = new ArrayList<>(5);
>>
>> DataStreamUtils.collect(resultDataStream).forEachRemaining(result::add);
>>
>> *resultDataStream.print();*
>>
>> environment.execute();
>>
>>
>> how ever this does not work
>>
>> List result = new ArrayList<>(5);
>>
>> DataStreamUtils.collect(resultDataStream).forEachRemaining(result::add);
>>
>> environment.execute();
>>
>>
>> and this also does not work
>>
>> *resultDataStream.print();*
>>
>> List result = new ArrayList<>(5);
>>
>> DataStreamUtils.collect(resultDataStream).forEachRemaining(result::add);
>>
>> environment.execute();
>>
>>
>> In both these cases it fails with
>>
>>
>> java.lang.IllegalStateException: *No operators defined in streaming
>> topology. Cannot execute.*
>>
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1792)
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1783)
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1768)
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1602)
>> at
>> nl.basjes.parse.useragent.flink.TestUserAgentAnalysisMapperInline.testInlineDefinitionDataStream(TestUserAgentAnalysisMapperInline.java:144)
>>
>>
>>
>> Did I do something wrong?
>> Is this a bug in the DataStreamUtils ?
>>
>> Niels Basjes
>>
>>
>>
>> On Mon, Feb 17, 2020 at 8:56 AM Tzu-Li Tai  wrote:
>>
>>> Hi,
>>>
>>> To collect the elements of a DataStream (usually only meant for testing
>>> purposes), you can take a look at `DataStreamUtils#collect(DataStream)`.
>>>
>>> Cheers,
>>> Gordon
>>>
>>>
>>>
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
>>
>>
>> --
>> Best regards / Met vriendelijke groeten,
>>
>> Niels Basjes
>>
>

-- 
Best regards / Met vriendelijke groeten,

Niels Basjes


Re: [DISCUSS] Drop Savepoint Compatibility with Flink 1.2

2020-02-21 Thread Till Rohrmann
+1 for dropping savepoint compatibility with Flink 1.2.

Cheers,
Till

On Thu, Feb 20, 2020 at 6:55 PM Stephan Ewen  wrote:

> Thank you for the feedback.
>
> Here is the JIRA issue with some more explanation also about the
> background and implications:
> https://jira.apache.org/jira/browse/FLINK-16192
>
> Best,
> Stephan
>
>
> On Thu, Feb 20, 2020 at 2:26 PM vino yang  wrote:
>
>> +1 for dropping Savepoint compatibility with Flink 1.2
>>
>> Flink 1.2 is quite far away from the latest 1.10. Especially after the
>> release of Flink 1.9 and 1.10, the code and architecture have undergone
>> major changes.
>>
>> Currently, I am updating state migration tests for Flink 1.10. I can
>> still see some binary snapshot files of version 1.2. If we agree on this
>> topic, we may be able to alleviate some of the burdens(remove those binary
>> files) when the migration tests would be updated later.
>>
>> Best,
>> Vino
>>
>> Theo Diefenthal  于2020年2月20日周四
>> 下午9:04写道:
>>
>>> +1 for dropping compatibility.
>>>
>>> I personally think that it is very important for a project to keep a
>>> good pace of development, and that old legacy stuff must be dropped from time
>>> to time. As long as there is an upgrade routine (via going to another Flink
>>> release), that's fine.
>>>
>>> --
>>> *Von: *"Stephan Ewen" 
>>> *An: *"dev" , "user" 
>>> *Gesendet: *Donnerstag, 20. Februar 2020 11:11:43
>>> *Betreff: *[DISCUSS] Drop Savepoint Compatibility with Flink 1.2
>>>
>>> Hi all!
>>> For some cleanup and simplifications, it would be helpful to drop
>>> Savepoint compatibility with Flink version 1.2. That version was released
>>> almost three years ago.
>>>
>>> I would expect that no one uses that old version any more in a way that
>>> they actively want to upgrade directly to 1.11.
>>>
>>> Even if, there is still the way to first upgrade to another version
>>> (like 1.9) and then upgrade to 1.11 from there.
>>>
>>> Any concerns to drop that support?
>>>
>>> Best,
>>> Stephan
>>>
>>>
>>> --
>>> SCOOP Software GmbH - Gut Maarhausen - Eiler Straße 3 P - D-51107 Köln
>>> Theo Diefenthal
>>>
>>> T +49 221 801916-196 - F +49 221 801916-17 - M +49 160 90506575
>>> theo.diefent...@scoop-software.de - www.scoop-software.de
>>> Sitz der Gesellschaft: Köln, Handelsregister: Köln,
>>> Handelsregisternummer: HRB 36625
>>> Geschäftsführung: Dr. Oleg Balovnev, Frank Heinen,
>>> Martin Müller-Rohde, Dr. Wolfgang Reddig, Roland Scheel
>>>
>>


Re: Emit message at start and end of event time session window

2020-02-21 Thread Till Rohrmann
Hi Manas and Rafi,

you are right that when using merging windows, as event time session windows
are, Flink requires that any state the Trigger keeps is of type
MergingState. This constraint allows the state to be merged whenever
two windows get merged.

Rafi, you are right. With the current implementation it might happen that
you send a wrong started-window message. I think it depends on the
MIN_WINDOW_SIZE and the distribution of your timestamps and, hence, also
your watermark. If you want to be on the safe side, then I would recommend
using the ProcessFunction to implement the required logic. The
ProcessFunction [1] is Flink's low-level API and gives you access to state
and timers. In it, you would need to buffer the elements and sessionize
them yourself, though. However, it would give you access to the
watermark, which in turn would allow you to properly handle your described
edge case.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html
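
To sketch that approach (very rough and untested; GAP, MIN_WINDOW_SIZE and the
Event type are placeholders, and timestamps/watermarks are assumed to be
assigned upstream):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Very rough sketch: sessionize manually so that both "window started" and
// "window ended" messages can be emitted.
public class SessionMessages extends KeyedProcessFunction<String, Event, String> {

    private static final long GAP = 30_000L;              // placeholder inactivity gap
    private static final long MIN_WINDOW_SIZE = 5_000L;   // placeholder

    private transient ValueState<Long> windowStart;
    private transient ValueState<Long> lastSeen;
    private transient ValueState<Boolean> startedEmitted;

    @Override
    public void open(Configuration parameters) {
        windowStart = getRuntimeContext().getState(new ValueStateDescriptor<>("start", Long.class));
        lastSeen = getRuntimeContext().getState(new ValueStateDescriptor<>("lastSeen", Long.class));
        startedEmitted = getRuntimeContext().getState(new ValueStateDescriptor<>("started", Boolean.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<String> out) throws Exception {
        long ts = ctx.timestamp();
        if (windowStart.value() == null) {
            windowStart.update(ts);
        }
        lastSeen.update(ts);
        if (startedEmitted.value() == null && ts - windowStart.value() >= MIN_WINDOW_SIZE) {
            out.collect("Window started @ " + windowStart.value());
            startedEmitted.update(true);
        }
        // (re-)arm the inactivity timeout; outdated timers are ignored in onTimer
        ctx.timerService().registerEventTimeTimer(ts + GAP);
    }

    @Override
    public void onTimer(long timerTs, OnTimerContext ctx, Collector<String> out) throws Exception {
        // only the timer belonging to the last seen element marks the session end
        if (lastSeen.value() != null && timerTs == lastSeen.value() + GAP) {
            out.collect("Window ended @ " + lastSeen.value());
            windowStart.clear();
            lastSeen.clear();
            startedEmitted.clear();
        }
    }
}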

Cheers,
Till

On Thu, Feb 20, 2020 at 12:25 PM Rafi Aroch  wrote:

> I think one "edge" case which is not handled would be that the first event
> (by event-time) arrives late, then a wrong "started-window" would be
> reported.
>
> Rafi
>
>
> On Thu, Feb 20, 2020 at 12:36 PM Manas Kale  wrote:
>
>> Is the reason ValueState cannot be used that session windows are always
>> formed by merging proto-windows of single elements, and therefore a state store
>> is needed that can handle merging? ValueState does not provide this
>> functionality, but a ReducingState does?
>>
>> On Thu, Feb 20, 2020 at 4:01 PM Manas Kale  wrote:
>>
>>> Hi Till,
>>> Thanks for your answer! You also answered the next question that I was
>>> about to ask "Can we share state between a Trigger and a Window?" Currently
>>> the only (convoluted) way to share state between two operators is through
>>> the broadcast state pattern, right?
>>> Also, in your example, why can't we use a ValueStateDescriptor
>>> in the Trigger? I tried using it in my own example, but I am not able
>>> to call the mergePartitionedState() method on a ValueStateDescriptor.
>>>
>>> Regards,
>>> Manas
>>>
>>>
>>>
>>> On Tue, Feb 18, 2020 at 7:20 PM Till Rohrmann 
>>> wrote:
>>>
 Hi Manas,

 you can implement something like this with a bit of trigger magic. What
 you need to do is to define your own trigger implementation which keeps
 state to remember whether it has triggered the "started window" message or
 not. In the stateful window function you would need to do something
 similar. The first call could trigger the output of "window started" and
 any subsequent call will trigger the evaluation of the window. It would
 have been a bit easier if the trigger and the window process function could
 share its internal state. Unfortunately, this is not possible at the 
 moment.

 I've drafted a potential solution which you can find here [1].

 [1]
 https://gist.github.com/tillrohrmann/5251f6d62e256b60947eea7b553519ef

 Cheers,
 Till

 On Mon, Feb 17, 2020 at 8:09 AM Manas Kale 
 wrote:

> Hi,
> I want to achieve the following using event time session windows:
>
>1. When the window.getStart() and last event timestamp in the
>window is greater than MIN_WINDOW_SIZE milliseconds, I want to emit a
>message "Window started @ timestamp".
>2. When the session window ends, i.e. the watermark passes
>lasteventTimestamp + inactivityPeriod, I want to emit a message "Window
>ended @ timestamp".
>
>  It is guaranteed that all events are on time and no lateness is
> allowed. I am having difficulty implementing both 1 and 2 simultaneously.
> I am able to implement point 1 using a custom trigger, which checks
> if  (lastEventTimestamp - window.getStart()) > MIN_WINDOW_SIZE and 
> triggers
> a customProcessWindowFunction().
> However, with this architecture I can't detect the end of the window.
>
> Is my approach correct or is there a completely different method to
> achieve this?
>
> Thanks,
> Manas Kale
>
>
>
>


Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer

2020-02-21 Thread Yun Gao
Congratulations Jingsong!

   Best,
   Yun


--
From:Jingsong Li 
Send Time:2020 Feb. 21 (Fri.) 21:42
To:Hequn Cheng 
Cc:Yang Wang ; Zhijiang ; 
Zhenghua Gao ; godfrey he ; dev 
; user 
Subject:Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer

Thanks everyone~

It's my pleasure to be part of the community. I hope I can make a better 
contribution in future.

Best,
Jingsong Lee
On Fri, Feb 21, 2020 at 2:48 PM Hequn Cheng  wrote:

Congratulations Jingsong! Well deserved.

Best, 
Hequn 
On Fri, Feb 21, 2020 at 2:42 PM Yang Wang  wrote:
Congratulations!Jingsong. Well deserved.


Best,
Yang

Zhijiang  于2020年2月21日周五 下午1:18写道:
Congrats Jingsong! Welcome on board!

Best,
Zhijiang

--
From:Zhenghua Gao 
Send Time:2020 Feb. 21 (Fri.) 12:49
To:godfrey he 
Cc:dev ; user 
Subject:Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer

Congrats Jingsong!


Best Regards,
Zhenghua Gao

On Fri, Feb 21, 2020 at 11:59 AM godfrey he  wrote:
Congrats Jingsong! Well deserved.

Best,
godfrey
Jeff Zhang  于2020年2月21日周五 上午11:49写道:
Congratulations!Jingsong. You deserve it 

wenlong.lwl  于2020年2月21日周五 上午11:43写道:
Congrats Jingsong!

 On Fri, 21 Feb 2020 at 11:41, Dian Fu  wrote:

 > Congrats Jingsong!
 >
 > > 在 2020年2月21日,上午11:39,Jark Wu  写道:
 > >
 > > Congratulations Jingsong! Well deserved.
 > >
 > > Best,
 > > Jark
 > >
 > > On Fri, 21 Feb 2020 at 11:32, zoudan  wrote:
 > >
 > >> Congratulations! Jingsong
 > >>
 > >>
 > >> Best,
 > >> Dan Zou
 > >>
 >
 >


-- 
Best Regards

Jeff Zhang



-- 
Best, Jingsong Lee



Re: Flink's Either type information

2020-02-21 Thread Yun Gao
  Hi Jacopo, Robert, 

 Very sorry for missing the previous email and not responding in time. I 
think, exactly as Robert has pointed out with the example, that using an inline 
anonymous subclass of KeyedBroadcastProcessFunction should not cause the 
problem. As far as I know, the likely reason for the attached exception is that 
the parameter types of Either get erased due to the way the 
KeyedBroadcastProcessFunction object is created. For example, if you first 
implement a generic subclass of KeyedBroadcastProcessFunction like:

public class MyKeyedBroadcastProcessFunction<L, R> extends 
KeyedBroadcastProcessFunction<MyKey, MyInput, String, Either<L, R>> { ... }

 and create a function object directly when constructing the DataStream job:

stream.process(new MyKeyedBroadcastProcessFunction<MyLeftType, MyRightType>());

 Then MyLeftType and MyRightType will be erased and will cause the attached 
exception when Flink tries to infer the output type. 

 And I totally agree with Robert that attaching the corresponding code 
would help with debugging the problem.

  Yours,
Yun



--
From:Robert Metzger 
Send Time:2020 Feb. 21 (Fri.) 19:47
To:jacopo.gobbi 
Cc:yungao.gy ; user 
Subject:Re: Flink's Either type information

Hey Jacopo,
can you post an example to reproduce the issue? I've tried it, but it worked in 
this artificial example:

MapStateDescriptor<String, String> state = new MapStateDescriptor<>("test", 
    BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
DataStream<Either<Integer, String>> result = input
  .map((MapFunction<String, Tuple2<Integer, String>>) value -> Tuple2.of(0, 
      value)).returns(TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, 
      String.class))
  .keyBy(0).connect(input.broadcast(state))
  .process(new KeyedBroadcastProcessFunction<Tuple, Tuple2<Integer, String>, 
      String, Either<Integer, String>>() {
     @Override
     public void processElement(Tuple2<Integer, String> value, 
         ReadOnlyContext ctx, Collector<Either<Integer, String>> out) throws Exception {
        out.collect(Either.Left(111));
     }
     @Override
     public void processBroadcastElement(String value, Context ctx, 
         Collector<Either<Integer, String>> out) throws Exception { }
  });
result.print();
On Wed, Feb 19, 2020 at 6:07 PM  wrote:

Yes, I create it the way you mentioned.
 
From: Yun Gao [mailto:yungao...@aliyun.com] 
Sent: Dienstag, 18. Februar 2020 10:12
To: Gobbi, Jacopo-XT; user
Subject: [External] Re: Flink's Either type information
 
  Hi Jacopo,
 
  Could you also provide how the KeyedBroadcastProcessFunction is 
created when constructing datastream API ? For example, are you using something 
like 
 
  new KeyedBroadcastProcessFunction() { 
   // Function implementation
 }
 
 or something else?
 
 Best, 
  Yun
 
 
--
From:jacopo.gobbi 
Send Time:2020 Feb. 17 (Mon.) 18:31
To:user 
Subject:Flink's Either type information
 
Hi all,
 
How can an Either value be returned by a KeyedBroadcastProcessFunction?
We keep getting "InvalidTypesException: Type extraction is not possible on 
Either type as it does not contain information about the 'left' type." when 
doing: out.collect(Either.Right(myObject));
 
Thanks,
 
Jacopo Gobbi
 
 



Re: Running Flink Cluster on PaaS

2020-02-21 Thread Yang Wang
> I always wonder what do you guys mean by "Standalone Flink session" or
"Standalone Cluster" ...

"Standalone Flink session" usually means an empty Flink cluster is started
and
could accept multiple jobs submission from the Flink client or webui. Even
all
the jobs finished, the session cluster will still be there until you
manually stop it.

"Standalone Flink per-job" means a dedicated Flink cluster is started for
only
one the job. It has better isolation. Usually this mode is used in container
environment. And users build in their jars and dependencies in the image.
You could check how to build your custom image and run a per-job cluster
here[1].


[1].
https://github.com/apache/flink/blob/release-1.10/flink-container/kubernetes/README.md#deploy-flink-job-cluster


Best,
Yang

KristoffSC  于2020年2月21日周五 下午4:03写道:

> Thank you Yang Wang,
>
> Regarding [1] and a sentence from that doc.
> "This page describes deploying a standalone Flink session"
>
> I always wonder what do you guys mean by "Standalone Flink session" or
> "Standalone Cluster" that can be found here [2].
>
> I'm using a Docker with Job Cluster approach, I know that there is also a
> Session Cluster docker images. I understand the differences, but I'm not
> sure what you are referring to using those to terms from [1] and [2].
>
> Thanks,
> Krzysztof
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/kubernetes.html#cluster_setup.html
> [2]
>
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/jobmanager_high_availability.html
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Question: Determining Total Recovery Time

2020-02-21 Thread Arvid Heise
Hi Morgan,

sorry for the late reply. In general, that should work. You need to ensure
that the same task is processing the same record, though.

The local copy needs to be state, or else the last message would be lost upon
restart. Performance will take a hit, but whether that is significant depends on
the remaining pipeline.

Btw, at-least-once should be enough for that, since you are implicitly
deduplicating.

Best,

Arvid

On Tue, Feb 11, 2020 at 11:24 AM Morgan Geldenhuys <
morgan.geldenh...@tu-berlin.de> wrote:

> Thanks for the advice, i will look into it.
>
> Had a quick think about another simple solution but we would need a hook
> into the checkpoint process from the task/operator perspective, which I
> haven't looked into yet. It would work like this:
>
> - The sink operators (?) would keep a local copy of the last message
> processed (or digest?), the current timestamp, and a boolean value
> indicating whether or not the system is in recovery or not.
> - While not in recovery, update the local copy and timestamp with each new
> event processed.
> - When a failure is detected and the taskmanagers are notified to
> rollback, we use the hook into this process to switch the boolean value to
> true.
> - While true, it compares each new message with the last one processed
> before the recovery process was initiated.
> - When a match is found, the difference between the previous and current
> timestamp is calculated and outputted as a custom metric and the boolean is
> reset to false.
>
> From here, the mean total recovery time could be calculated across the
> operators. Not sure how it would impact on performance, but i doubt it
> would be significant. We would need to ensure exactly once so that the
> message would be guaranteed to be seen again. thoughts?
>
> On 11.02.20 08:57, Arvid Heise wrote:
>
> Hi Morgan,
>
> as Timo pointed out, there is no general solution, but in your setting,
> you could look at the consumer lag of the input topic after a crash. Lag
> would spike until all tasks restarted and reprocessing begins. Offsets are
> only committed on checkpoints though by default.
>
> Best,
>
> Arvid
>
> On Tue, Feb 4, 2020 at 12:32 PM Timo Walther  wrote:
>
>> Hi Morgan,
>>
>> as far as I know this is not possible mostly because measuring "till the
>> point when the system catches up to the last message" is very
>> pipeline/connector dependent. Some sources might need to read from the
>> very beginning, some just continue from the latest checkpointed offset.
>>
>> Measure things like that (e.g. for experiments) might require collecting
>> own metrics as part of your pipeline definition.
>>
>> Regards,
>> Timo
>>
>>
>> On 03.02.20 12:20, Morgan Geldenhuys wrote:
>> > Community,
>> >
>> > I am interested in determining the total time to recover for a Flink
>> > application after experiencing a partial failure. Let's assume a
>> > pipeline consisting of Kafka -> Flink -> Kafka with Exactly-Once
>> > guarantees enabled.
>> >
>> > Taking a look at the documentation
>> > (
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/metrics.html),
>>
>> > one of the metrics which can be gathered is /recoveryTime/. However, as
>> > far as I can tell this is only the time taken for the system to go from
>> > an inconsistent state back into a consistent state, i.e. restarting the
>> > job. Is there any way of measuring the amount of time taken from the
>> > point when the failure occurred till the point when the system catches
>> > up to the last message that was processed before the outage?
>> >
>> > Thank you very much in advance!
>> >
>> > Regards,
>> > Morgan.
>>
>>
>


Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer

2020-02-21 Thread Jingsong Li
Thanks everyone~

It's my pleasure to be part of the community. I hope I can make a better
contribution in future.

Best,
Jingsong Lee

On Fri, Feb 21, 2020 at 2:48 PM Hequn Cheng  wrote:

> Congratulations Jingsong! Well deserved.
>
> Best,
> Hequn
>
> On Fri, Feb 21, 2020 at 2:42 PM Yang Wang  wrote:
>
>> Congratulations!Jingsong. Well deserved.
>>
>>
>> Best,
>> Yang
>>
>> Zhijiang  于2020年2月21日周五 下午1:18写道:
>>
>>> Congrats Jingsong! Welcome on board!
>>>
>>> Best,
>>> Zhijiang
>>>
>>> --
>>> From:Zhenghua Gao 
>>> Send Time:2020 Feb. 21 (Fri.) 12:49
>>> To:godfrey he 
>>> Cc:dev ; user 
>>> Subject:Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer
>>>
>>> Congrats Jingsong!
>>>
>>>
>>> *Best Regards,*
>>> *Zhenghua Gao*
>>>
>>>
>>> On Fri, Feb 21, 2020 at 11:59 AM godfrey he  wrote:
>>> Congrats Jingsong! Well deserved.
>>>
>>> Best,
>>> godfrey
>>>
>>> Jeff Zhang  于2020年2月21日周五 上午11:49写道:
>>> Congratulations!Jingsong. You deserve it
>>>
>>> wenlong.lwl  于2020年2月21日周五 上午11:43写道:
>>> Congrats Jingsong!
>>>
>>> On Fri, 21 Feb 2020 at 11:41, Dian Fu  wrote:
>>>
>>> > Congrats Jingsong!
>>> >
>>> > > 在 2020年2月21日,上午11:39,Jark Wu  写道:
>>> > >
>>> > > Congratulations Jingsong! Well deserved.
>>> > >
>>> > > Best,
>>> > > Jark
>>> > >
>>> > > On Fri, 21 Feb 2020 at 11:32, zoudan  wrote:
>>> > >
>>> > >> Congratulations! Jingsong
>>> > >>
>>> > >>
>>> > >> Best,
>>> > >> Dan Zou
>>> > >>
>>> >
>>> >
>>>
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>>>
>>>
>>>

-- 
Best, Jingsong Lee


Re: FsStateBackend vs RocksDBStateBackend

2020-02-21 Thread Robert Metzger
I would try the FsStateBackend in this scenario, as you have enough memory
available.

On Thu, Jan 30, 2020 at 5:26 PM Ran Zhang  wrote:

> Hi Gordon,
>
> Thanks for your reply! Regarding state size - we are at 200-300gb but we
> have 120 parallelism which will make each task handle ~2 - 3 gb state.
> (when we submit the job we are setting tm memory to 15g.) In this scenario
> what will be the best fit for statebackend?
>
> Thanks,
> Ran
>
> On Wed, Jan 29, 2020 at 6:37 PM Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi Ran,
>>
>> On Thu, Jan 30, 2020 at 9:39 AM Ran Zhang  wrote:
>>
>>> Hi all,
>>>
>>> We have a Flink app that uses a KeyedProcessFunction, and in the
>>> function it requires a ValueState(of TreeSet) and the processElement method
>>> needs to access and update it. We tried to use RocksDB as our stateBackend
>>> but the performance is not good, and intuitively we think it was because of
>>> the serialization / deserialization on each processElement call.
>>>
>>
>> As you have already pointed out, serialization behaviour is a major
>> difference between the 2 state backends, and will directly impact
>> performance due to the extra runtime overhead in RocksDB.
>> If you plan to continue using the RocksDB state backend, make sure to use
>> MapState instead of ValueState where possible, since every access to the
>> ValueState in the RocksDB backend requires serializing / deserializing the
>> whole value.
>> For MapState, de-/serialization happens per K-V access. Whether or not
>> this makes sense would of course depend on your state access pattern.
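>>
>> For example (a sketch only; ElementType and the Long key below are
>> placeholders for whatever the TreeSet holds and how it is ordered):
>>
>> // One entry per element: RocksDB only (de)serializes the entries touched per access.
>> MapStateDescriptor<Long, ElementType> mapDesc =
>>     new MapStateDescriptor<>("elements", Long.class, ElementType.class);
>>
>> // The ValueState variant: every read or update (de)serializes the whole TreeSet.
>> ValueStateDescriptor<TreeSet<ElementType>> valueDesc =
>>     new ValueStateDescriptor<>("elements",
>>         TypeInformation.of(new TypeHint<TreeSet<ElementType>>() {}));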
>>
>>
>>> Then we tried to switch to use FsStateBackend (which keeps the in-flight
>>> data in the TaskManager’s memory according to doc), and it could resolve
>>> the performance issue. *So we want to understand better what are the
>>> tradeoffs in choosing between these 2 stateBackend.* Our checkpoint
>>> size is 200 - 300 GB in stable state. For now we know one benefits of
>>> RocksDB is it supports incremental checkpoint, but would love to know what
>>> else we are losing in choosing FsStateBackend.
>>>
>>
>> As of now, feature-wise both backends support asynchronous snapshotting,
>> state schema evolution, and access via the State Processor API.
>> In the end, the major factor for deciding between the two state backends
>> would be your expected state size.
>> That being said, it could be possible in the future that savepoint
>> formats for the backends are changed to be compatible, meaning that you
>> will be able to switch between different backends upon restore [1].
>>
>>
>>>
>>> Thanks a lot!
>>> Ran Zhang
>>>
>>
>> Cheers,
>> Gordon
>>
>>  [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-41%3A+Unify+Binary+format+for+Keyed+State
>>
>


Why is the close() method of a Flink Sink executed repeatedly while open() is not called? Aren't they supposed to appear in pairs?

2020-02-21 Thread butnet
Hi all:
Why is the close() method of my Flink Sink executed repeatedly, while open() is
not called again? Aren't they supposed to appear in pairs?
Below are the sink implementation and the logs. The sink mainly performs
asynchronous output to a database, and I log inside open() and close().
From the logs, open() is called only once, followed by a large number of close()
calls. What is the reason for this? Shouldn't they appear in pairs?
Environment: JDK 8, Flink 1.8.2
Flink is started as a standalone cluster (./start-cluster.sh).
Thanks, everyone.


public class MySqlSink<T> extends CounterRichSinkFunction<T> {
    private static final Logger log = LoggerFactory.getLogger(MySqlSink.class);
    private final MySqlUpsertor upsertor;
    private transient volatile BatchThread<T> batchThread;
    private transient volatile DataBaseUtil dataBaseUtil;
    private final String name;

    public MySqlSink(String name) {
        this(null, name);
    }

    public MySqlSink(MySqlUpsertor upsertor, String name) {
        this.upsertor = upsertor;
        this.name = name;
    }

    @Override
    public String getName() {
        return name;
    }

    @Override
    public void invoke(T value, Context context) throws Exception {
        try {
            this.counter.inc();
            batchThread.push(value);
        } catch (Throwable e) {
            this.counterError.inc();
            // "async output exception"
            log.info("异步输出异常: {}@{} {}", name, hashCode(), e.toString(), e);
        }
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // "async output ... open"
        log.info("异步输出 {}@{} open", name, hashCode());
        super.open(parameters);
        try {
            if (dataBaseUtil == null) {
                dataBaseUtil = DataBaseUtil.getInstance();
            }
            if (batchThread == null) {
                batchThread = new BatchThread<>(getRuntimeContext(), dataBaseUtil, upsertor,
                        name, () -> batchThread = null);
                batchThread.start();
            }
        } catch (Throwable e) {
            // "exception while creating the async output thread"
            log.info("创建异常输出线程异常: {}@{} {}", name, hashCode(), e.toString(), e);
        }
    }

    @Override
    public void close() throws Exception {
        // "async output ... close"
        log.info("异步输出 {}@{} close", name, hashCode());
        super.close();
    }
}


Logs:
2020-02-21 20:29:49 [   main] INFO [util.FlinkUtil] 
windowTime: 6, paramWaterInterval: 4
2020-02-21 20:29:50 [   main] WARN [job.alarm.LimitAlarmJob   ] 未配置 
activityNgKafka 参数
2020-02-21 20:29:50 [   main] WARN [job.alarm.LimitAlarmJob   ] 未配置 
remoteKafka 参数
2020-02-21 20:29:52 [: ng-host (3/5)] INFO [common.MySqlSink  ] 异步输出 
ng-host@97861042 open
2020-02-21 20:29:52 [k: ng-url (3/5)] INFO [common.MySqlSink  ] 异步输出 
ng-url@1582284274 open
2020-02-21 20:29:52 [: ng-host (1/5)] INFO [common.MySqlSink  ] 异步输出 
ng-host@1255075694 open
2020-02-21 20:29:52 [: ng-host (4/5)] INFO [common.MySqlSink  ] 异步输出 
ng-host@971447764 open
2020-02-21 20:29:52 [k: ng-url (2/5)] INFO [common.MySqlSink  ] 异步输出 
ng-url@793060686 open
2020-02-21 20:29:52 [k: ng-url (5/5)] INFO [common.MySqlSink  ] 异步输出 
ng-url@275482810 open
2020-02-21 20:29:52 [k: ng-url (4/5)] INFO [common.MySqlSink  ] 异步输出 
ng-url@543530928 open
2020-02-21 20:29:52 [: ng-host (2/5)] INFO [common.MySqlSink  ] 异步输出 
ng-host@989133776 open
2020-02-21 20:29:52 [k: ng-url (1/5)] INFO [common.MySqlSink  ] 异步输出 
ng-url@2092790834 open
2020-02-21 20:29:52 [: ng-host (5/5)] INFO [common.MySqlSink  ] 异步输出 
ng-host@268528771 open
2020-02-21 20:29:52 [ng-host] INFO [common.BatchThread] 
BatchThread ng-host-1 start
2020-02-21 20:29:52 [ng-host] INFO [common.BatchThread] 
BatchThread ng-host-4 start
2020-02-21 20:29:52 [ng-host] INFO [common.BatchThread] 
BatchThread ng-host-2 start
2020-02-21 20:29:52 [ ng-url] INFO [common.BatchThread] 
BatchThread ng-url-3 start
2020-02-21 20:29:52 [ ng-url] INFO [common.BatchThread] 
BatchThread ng-url-5 start
2020-02-21 20:29:52 [ ng-url] INFO [common.BatchThread] 
BatchThread ng-url-6 start
2020-02-21 20:29:52 [ ng-url] INFO [common.BatchThread] 
BatchThread ng-url-7 start
2020-02-21 20:29:52 [ng-host] INFO [common.BatchThread] 
BatchThread ng-host-8 start
2020-02-21 20:29:52 [ ng-url] INFO [common.BatchThread] 
BatchThread ng-url-9 start
2020-02-21 20:29:52 [ng-host] INFO [common.BatchThread] 
BatchThread ng-host-10 start
2020-02-21 20:30:41 [k: ng-url (2/5)] INFO [common.MySqlSink  ] 异步输出 
ng-url@793060686 close
2020-02-21 20:30:41 [k: ng-url (4/5)] INFO [common.MySqlSink  ] 异步输出 
ng-url@543530928 close
2020-02-21 20:30:41 [k: ng-url (4/5)] INFO [common.MySqlSink  ] 异步输出 
ng-url@543530928 close
2020-02-21 20:30:41 [: ng-host (1/5)] INFO [common.MySqlSink  ] 异步输出 
ng-host@1255075694 close
2020-02-21 20:30:41 [: ng-host (1/5)] INFO [common.MySqlSink  ] 异步输出 
ng-host@1255075694 close
2020-02-21 20:30:41 [: ng-host (1/5)] INFO [common.MySqlSink  ] 异步输出 
ng-host@1255075694 close
2020-02-21 20:30:41 [: ng-host (1/5)] INFO [common.MySqlSink  ] 异步输出 
ng-host@1255075694 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink  ] 异步输出 
ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink  ] 异步输出 
ng-url@2092790834 close
2020-02-21 20:30:41 [k: ng-url (1/5)] INFO [common.MySqlSink  ] 异步输出 

Re: [Flink 1.10] How do I use LocalCollectionOutputFormat now that writeUsingOutputFormat is deprecated?

2020-02-21 Thread Robert Metzger
Hey Niels,

This minimal Flink job executes in Flink 1.10:

public static void main(String[] args) throws Exception {
   final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
   DataStream<String> input = env.addSource(new StringSourceFunction());
   List<String> result = new ArrayList<>(5);
   DataStreamUtils.collect(input).forEachRemaining(result::add);
   env.execute("Flink Streaming Java API Skeleton");
}

Maybe the TestUserAgentAnalysisMapperInline class is doing some magic that
breaks with the StreamGraphGenerator?

Best,
Robert
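
For reference, here is a minimal sketch of the collect-only pattern. It is written under the assumption that DataStreamUtils.collect() in 1.9/1.10 registers its own sink and starts the job itself; if that holds, a second env.execute() only has work to do when another sink such as print() was added (which matches the variants described below).

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.List;

public class CollectWithoutExecute {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Stand-in pipeline; replace with the real job under test.
        DataStream<String> resultDataStream = env.fromElements("a", "b", "c");

        List<String> result = new ArrayList<>();
        // collect() attaches its own sink and runs the job, so no env.execute() follows here.
        DataStreamUtils.collect(resultDataStream).forEachRemaining(result::add);

        System.out.println(result);
    }
}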

On Tue, Feb 18, 2020 at 9:59 AM Niels Basjes  wrote:

> Hi Gordon,
>
> Thanks. This works for me.
>
> I find it strange that when I do this it works (I made the differences
> bold)
>
> List result = new ArrayList<>(5);
>
> DataStreamUtils.collect(resultDataStream).forEachRemaining(result::add);
>
> *resultDataStream.print();*
>
> environment.execute();
>
>
> how ever this does not work
>
> List result = new ArrayList<>(5);
>
> DataStreamUtils.collect(resultDataStream).forEachRemaining(result::add);
>
> environment.execute();
>
>
> and this also does not work
>
> *resultDataStream.print();*
>
> List result = new ArrayList<>(5);
>
> DataStreamUtils.collect(resultDataStream).forEachRemaining(result::add);
>
> environment.execute();
>
>
> In both these cases it fails with
>
>
> java.lang.IllegalStateException: *No operators defined in streaming
> topology. Cannot execute.*
>
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1792)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1783)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1768)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1602)
> at
> nl.basjes.parse.useragent.flink.TestUserAgentAnalysisMapperInline.testInlineDefinitionDataStream(TestUserAgentAnalysisMapperInline.java:144)
>
>
>
> Did I do something wrong?
> Is this a bug in the DataStreamUtils ?
>
> Niels Basjes
>
>
>
> On Mon, Feb 17, 2020 at 8:56 AM Tzu-Li Tai  wrote:
>
>> Hi,
>>
>> To collect the elements of a DataStream (usually only meant for testing
>> purposes), you can take a look at `DataStreamUtils#collect(DataStream)`.
>>
>> Cheers,
>> Gordon
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>


Re: Flink's Either type information

2020-02-21 Thread Robert Metzger
Hey Jacopo,
can you post an example to reproduce the issue? I've tried it, but it
worked in this artificial example:

MapStateDescriptor<String, String> state = new MapStateDescriptor<>("test",
        BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
DataStream<Either<Integer, String>> result = input
    .map((MapFunction<String, Tuple2<Integer, String>>) value -> Tuple2.of(0, value))
    .returns(TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, String.class))
    .keyBy(0)
    .connect(input.broadcast(state))
    .process(new KeyedBroadcastProcessFunction<Tuple, Tuple2<Integer, String>, String, Either<Integer, String>>() {
        @Override
        public void processElement(Tuple2<Integer, String> value, ReadOnlyContext ctx,
                Collector<Either<Integer, String>> out) throws Exception {
            out.collect(Either.Left(111));
        }

        @Override
        public void processBroadcastElement(String value, Context ctx,
                Collector<Either<Integer, String>> out) throws Exception { }
    });
result.print();
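
If the type extractor still cannot derive the Either type from the function itself, one workaround is to spell the type out explicitly via returns() and EitherTypeInfo. This is only a sketch: keyedStream, broadcastStream, myProcessFunction and MyObject are placeholders for the corresponding pieces of your job.

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.EitherTypeInfo;
import org.apache.flink.types.Either;

DataStream<Either<Integer, MyObject>> result = keyedStream
    .connect(broadcastStream)
    .process(myProcessFunction)
    // Hand the full Either type to the extractor so it does not have to
    // reverse-engineer the 'left'/'right' types from the function signature.
    .returns(new EitherTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, TypeInformation.of(MyObject.class)));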


On Wed, Feb 19, 2020 at 6:07 PM  wrote:

> Yes, I create it the way you mentioned.
>
>
>
> *From:* Yun Gao [mailto:yungao...@aliyun.com]
> *Sent:* Dienstag, 18. Februar 2020 10:12
> *To:* Gobbi, Jacopo-XT; user
> *Subject:* [External] Re: Flink's Either type information
>
>
>
>   Hi Jacopo,
>
>
>
>   Could you also provide how the KeyedBroadcastProcessFunction is
> created when constructing datastream API ? For example, are you using
> something like
>
>
>
>   new KeyedBroadcastProcessFunction<KEY, IN1, IN2, Either<LEFT, RIGHT>>() {
>
>// Function implementation
>
>  }
>
>
>
>  or something else?
>
>
>
>  Best,
>
>   Yun
>
>
>
>
>
> --
>
> From:jacopo.gobbi 
>
> Send Time:2020 Feb. 17 (Mon.) 18:31
>
> To:user 
>
> Subject:Flink's Either type information
>
>
>
> Hi all,
>
>
>
> How can an Either value be returned by a KeyedBroadcastProcessFunction?
>
> We keep getting "InvalidTypesException: Type extraction is not possible on
> Either type as it does not contain information about the 'left' type." when
> doing: out.collect(Either.Right(myObject));
>
>
>
> Thanks,
>
>
>
> Jacopo Gobbi
>
>
>
>
>


Re: Tests in FileUtilsTest while building Flink in local

2020-02-21 Thread Andrey Zagrebin
These tests also fail on my Mac. It may be some macOS setup related issue. I
created a JIRA ticket for that:
https://issues.apache.org/jira/browse/FLINK-16198 

> On 20 Feb 2020, at 12:03, Chesnay Schepler  wrote:
> 
> Is the stacktrace identical in both tests?
> 
> Did these fail on the command-line or in the IDE?
> 
> Can you check what directory the java.io.tmpdir points to?
> 
> On 19/02/2020 20:42, Arujit Pradhan wrote:
>> Hi all,
>> 
>> I was trying to build Flink in my local machine and these two unit tests are 
>> failing.
>> 
>> [ERROR] Errors:
>> [ERROR]   
>> FileUtilsTest.testCompressionOnRelativePath:261->verifyDirectoryCompression:440
>>  » NoSuchFile
>> [ERROR]   FileUtilsTest.testDeleteDirectoryConcurrently » FileSystem 
>> /var/folders/x9/tr2...
>> 
>> I am building on these versions
>> Java 1.8.0_221
>> maven 3.6.3
>> and OS is Mac Catalina(10.15).
>> 
>> Did anyone face this issue? Am I missing something?
>> 
>> The stack-trace is :
>> java.nio.file.NoSuchFileException: 
>> ../../../../../../../../var/folders/x9/tr2xclq51sx891lbntv7bwy4gn/T/junit3367096668518353289/compressDir/rootDir
>> 
>> at 
>> java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:92)
>> at 
>> java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
>> at 
>> java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:116)
>> at 
>> java.base/sun.nio.fs.UnixFileSystemProvider.createDirectory(UnixFileSystemProvider.java:389)
>> at java.base/java.nio.file.Files.createDirectory(Files.java:689)
>> at 
>> org.apache.flink.util.FileUtilsTest.verifyDirectoryCompression(FileUtilsTest.java:440)
>> at 
>> org.apache.flink.util.FileUtilsTest.testCompressionOnRelativePath(FileUtilsTest.java:261)
>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
>> Method)
>> at 
>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at 
>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>> at 
>> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>> at 
>> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>> at 
>> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>> at 
>> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>> at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>> at 
>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>> at 
>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>> at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>> at 
>> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>> at 
>> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
>> at 
>> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
>> at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
>> 
>> Thanks in advance. 
> 



Re: Flink Kafka connector consume from a single kafka partition

2020-02-21 Thread Robert Metzger
Hey Hemant,

Are you able to reconstruct the ordering of the events, for example based on
time or some sequence number?
If so, you could create as many Kafka partitions as you need (for proper
load distribution), disregarding any ordering at that point.
Then you keyBy your stream in Flink and order it within a window operator
(or with some custom logic in a process function).
Flink is able to handle quite large states using the RocksDB statebackend.

Best,
Robert
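
To make the per-key re-ordering idea above a bit more concrete, here is a minimal sketch of a KeyedProcessFunction that buffers events per key and re-emits them once the watermark has passed their timestamp. The Event type and its getTimestamp() accessor are placeholders, and this is not a complete or production-proof solution.

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

public class EventTimeOrderer extends KeyedProcessFunction<String, Event, Event> {

    // Events buffered per key, grouped by their event timestamp.
    private transient MapState<Long, List<Event>> buffer;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getMapState(new MapStateDescriptor<>(
                "buffer", Types.LONG, Types.LIST(Types.POJO(Event.class))));
    }

    @Override
    public void processElement(Event value, Context ctx, Collector<Event> out) throws Exception {
        long ts = value.getTimestamp();
        List<Event> sameTs = buffer.get(ts);
        if (sameTs == null) {
            sameTs = new ArrayList<>();
        }
        sameTs.add(value);
        buffer.put(ts, sameTs);
        // Emit once the watermark has passed this timestamp; per key, timers
        // fire in timestamp order, which restores event-time order downstream.
        ctx.timerService().registerEventTimeTimer(ts);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out) throws Exception {
        List<Event> ready = buffer.get(timestamp);
        if (ready != null) {
            for (Event e : ready) {
                out.collect(e);
            }
            buffer.remove(timestamp);
        }
    }
}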


On Wed, Feb 19, 2020 at 6:34 PM hemant singh  wrote:

> Hi Arvid,
>
> Thanks for your response. I think I did not word my question properly.
> I wanted to confirm that if the data is distributed to more than one
> partition then the ordering cannot be maintained (which is documented).
> According to your response I understand if I set the parallelism to number
> of partition then each consumer will consume from one partition and
> ordering can be maintained.
>
> However, I have a question here in case my parallelism is less than number
> of partitions still I believe if I create keyedstream ordering will be
> maintained at operator level for that key. Correct me if I am wrong.
>
> Second, one issue/challenge I see with this model: if one source pushes
> data at a very high frequency, then its partition is overloaded. Hence the
> task which processes this partition will be overloaded too; however, to
> maintain ordering I have no option but to keep that data in one partition.
>
> Thanks,
> Hemant
>
> On Wed, Feb 19, 2020 at 5:54 PM Arvid Heise  wrote:
>
>> Hi Hemant,
>>
>> Flink passes your configurations to the Kafka consumer, so you could
>> check if you can subscribe to only one partition there.
>>
>> However, I would discourage that approach. I don't see the benefit to
>> just subscribing to the topic entirely and have dedicated processing for
>> the different devices.
>>
>> If you are concerned about the order, you shouldn't. Since all events of
>> a specific device-id reside in the same source partition, events are
>> in-order in Kafka (responsibility of producer, but I'm assuming that
>> because of your mail) and thus they are also in order in non-keyed streams
>> in Flink. Any keyBy on device-id or composite key involving device-id,
>> would also retain the order.
>>
>> If you have exactly one partition per device-id, you could even go with
>> `DataStreamUtil#reinterpretAsKeyedStream` to avoid any shuffling.
>>
>> Let me know if I misunderstood your use case or if you have further
>> questions.
>>
>> Best,
>>
>> Arvid
>>
>> On Wed, Feb 19, 2020 at 8:39 AM hemant singh 
>> wrote:
>>
>>> Hello Flink Users,
>>>
>>> I have a use case where I am processing metrics from different type of
>>> sources(one source will have multiple devices) and for aggregations as well
>>> as build alerts order of messages is important. To maintain customer data
>>> segregation I plan to have single topic for each customer with each source
>>> stream data to one kafka partition.
>>> To maintain ordering I am planning to push data for a single source type
>>> to single partitions. Then I can create keyedstream so that each of the
>>> device-id I have a single stream which has ordered data for each device-id.
>>>
>>> However, with the flink-kafka consumer I don't see a way to read from a
>>> specific partition; the Flink consumer reads from multiple kafka
>>> partitions. So even if I try to create a keyedstream on source type (and
>>> then write to a partition for further processing, like a keyedstream on
>>> device-id), I think ordering will not be maintained per source type.
>>>
>>> Only other option I feel I am left with is have single partition for the
>>> topic so that flink can subscribe to the topic and this maintains the
>>> ordering, the challenge is too many topics(as I have this configuration for
>>> multiple customers) which is not advisable for a kafka cluster.
>>>
>>> Can anyone shed some light on how to handle this use case.
>>>
>>> Thanks,
>>> Hemant
>>>
>>


Re: AWS Client Builder with default credentials

2020-02-21 Thread Robert Metzger
There are multiple ways of passing configuration parameters to your user
defined code in Flink

a)  use getRuntimeContext().getUserCodeClassLoader().getResource() to load
a config file from your user code jar or the classpath.
b)  use getRuntimeContext().getExecutionConfig().getGlobalJobParameters() to
access a configuration object serialized from the main method.
you can pass a custom object to the job parameters, or use Flink's
"Configuration" object in your main method:

final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

Configuration config = new Configuration();
config.setString("foo", "bar");
env.getConfig().setGlobalJobParameters(config);

c) Load the flink-conf.yaml:

Configuration conf = GlobalConfiguration.loadConfiguration();

I'm not 100% sure if this approach works, as it is not intended to be used
in user code (I believe).


Let me know if this helps!

Best,
Robert
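
As a rough sketch of option (b) applied to the S3/MinIO case from the original mail: the parameter names (s3.endpoint, s3.access-key, ...) are made up for this example, the AWS classes are the ones from the original snippet (or their shaded equivalents), and the sink is shown as a plain RichSinkFunction rather than the actual TwoPhaseCommitSinkFunction.

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.util.Map;

public class S3CopyingSink extends RichSinkFunction<String> {

    private transient AmazonS3 s3Client;

    @Override
    public void open(Configuration parameters) {
        // Option (b): read what the main method stored via
        // env.getConfig().setGlobalJobParameters(someConfiguration).
        Map<String, String> conf =
                getRuntimeContext().getExecutionConfig().getGlobalJobParameters().toMap();

        s3Client = AmazonS3ClientBuilder.standard()
                .withPathStyleAccessEnabled(true)
                .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
                        conf.getOrDefault("s3.endpoint", "http://minio:9000"),
                        conf.getOrDefault("s3.region", "us-east-1")))
                .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(
                        conf.get("s3.access-key"), conf.get("s3.secret-key"))))
                .build();
    }

    @Override
    public void invoke(String value, Context context) {
        // use s3Client here (copy/delete objects, etc.)
    }
}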

On Thu, Feb 20, 2020 at 1:50 PM Chesnay Schepler  wrote:

> First things first, we do not intend for users to use anything in the S3
> filesystem modules except the filesystems itself,
> meaning that you're somewhat treading on unsupported ground here.
>
> Nevertheless, the S3 modules contain a large variety of AWS-provided
> CerentialsProvider implementations,
> that can derive credentials from environment variables, system properties,
> files on the classpath and many more.
>
> Ultimately though, you're kind of asking us how to use AWS APIs, for which
> I would direct you to the AWS documentation.
>
> On 20/02/2020 13:16, David Magalhães wrote:
>
> I'm using
> org.apache.flink.fs.s3base.shaded.com.amazonaws.client.builder.AwsClientBuilder
> to create a S3 client to copy objects and delete object inside
> a TwoPhaseCommitSinkFunction.
>
> Shouldn't there be another way to set up configurations without hard-coding
> them? Something like core-site.xml or flink-conf.yaml?
>
> Right now I need to have them hardcoded like this.
>
> AmazonS3ClientBuilder.standard
>   .withPathStyleAccessEnabled(true)
>   .withEndpointConfiguration(
> new EndpointConfiguration("http://minio:9000", "us-east-1")
>   )
>   .withCredentials(
> new AWSStaticCredentialsProvider(new BasicAWSCredentials("minio",
> "minio123"))
>   )
>   .build
>
> Thanks
>
>
>


Flink Weekly | Weekly Community Update - 2020/02/21

2020-02-21 Thread 蒋晓峰
Hi everyone, this is the sixth issue of Flink Weekly, compiled by Jiang Xiaofeng (Ziyi). It covers the Flink 1.10
release, recent community development progress, events such as Flink Forward San Francisco, and related blog posts.




Community Development Progress

==




Apache Flink 1.10.0 was officially released on February 11, 2020. The release includes the work of more than 200 contributors on over 1,200 issues. It brings significant improvements to the overall performance and stability of Flink jobs, an initial integration with native Kubernetes, and major enhancements to Python (PyFlink). It also marks the completion of the Blink integration, combining strengthened streaming SQL processing with mature batch processing capabilities.

More information:

[1]https://flink.apache.org/news/2020/02/11/release-1.10.0.html

[2]http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Apache-Flink-1-10-0-released-td37564.html




Apache Flink Python API (PyFlink) 1.9.2 was officially released on February 13, 2020. This is the first PyPI release of the Apache Flink Python API 1.9 series.

More information:

[3]https://pypi.org/project/apache-flink/1.9.2/#files

[4]http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Apache-Flink-Python-API-PyFlink-1-9-2-released-td37597.html




Apache Flink-shaded 10.0 was officially released on February 19, 2020. The release fixes flink-shaded-hadoop-2-uber bundling incorrect dependency versions, adds a zk+curator module, and no longer ships the original pom under META-INF/maven.

More information:

[5]https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12346746

[6]http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Apache-Flink-shaded-10-0-released-td37815.html




Aljoscha proposed a new documentation style guide to the Apache Flink community. Its most important points are to use direct language that addresses the reader rather than passive constructions, and to use "alert blocks" instead of simple inline alert tags.

More information:

[7]https://flink.apache.org/contributing/docs-style.html

[8]http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-New-Documentation-Style-Guide-td37673.html




The Apache Flink community announced the program for Flink Forward San Francisco 2020, which will take place March 23-25 at the Hyatt in San Francisco. The first day offers four training courses: Apache Flink Developer Training, Apache Flink Runtime & Operations Training, Apache Flink Tuning & Troubleshooting Training, and Apache Flink SQL Developer Training. The second and third days feature a series of talks from AWS, Bird, Cloudera, Lyft, Netflix, Splunk, Uber, Yelp, Alibaba, Ververica, and others.

More information:

[9]https://events.evolutionaryevents.com/flink-forward-sf-2020

[10]http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Flink-Forward-San-Francisco-2020-Program-is-Live-td37676.html





Jingsong Lee (李劲松) from Alibaba has been invited to become an Apache Flink committer. Jingsong was previously an Apache Beam committer. In the Apache Flink community he mainly works on Flink SQL and the Blink planner integration, implementing and fixing many Flink SQL features and bugs. He is active on the developer and user mailing lists, helping with design discussions, answering user questions, and verifying releases. Congratulations, Jingsong!

More information:

[11]http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Jingsong-Lee-becomes-a-Flink-committer-td37938.html




The Apache Flink community unanimously approved the proposal on scalar vectorized Python UDFs in PyFlink started by Dian Fu. With vectorized Python UDFs, a batch of rows is transferred between the JVM and the Python VM in a columnar format, converted into a collection of pandas.Series, and handed to the vectorized Python UDF, which can then be implemented with popular Python libraries such as Pandas and NumPy.

More information:

[12]https://cwiki.apache.org/confluence/display/FLINK/FLIP-97%3A+Support+Scalar+Vectorized+Python+UDF+in+PyFlink

[13]http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/RESULT-VOTE-FLIP-97-Support-scalar-vectorized-Python-UDF-in-PyFlink-td37745.html




Jark Wu (伍翀) started a discussion on FLIP-105, which is about supporting the interpretation and emission of changelogs in Flink SQL. Essentially this means being able to interpret a changelog (Debezium, compacted topics, etc.) as a dynamic table in update mode; the resulting continuously updating table can then be used directly in (temporal table) joins and aggregations. FLIP-105 proposes two approaches: introducing a new TableSource interface (higher priority), and introducing new SQL syntax to interpret and emit changelogs.

More information:

[14]https://docs.google.com/document/d/1onyIUUdWAHfr_Yd5nZOE7SOExBc6TiW5C4LiL5FrjtQ/edit#

[15]http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-105-Support-to-Interpret-and-Emit-Changelog-in-Flink-SQL-td37665.html




Hequn Cheng (程鹤群) started a discussion on improving the FLIP process. The improved process is: 1. Discuss on the mailing list, with the thread subject in the format [DISCUSS] [FLIP] {your FLIP title} and the design document strictly following the FLIP template; 2. Once consensus is reached in the discussion, create a FLIP wiki page and copy the Google doc into it; 3. After the proposal is finalized, hold a vote to accept it.

More information:

[16]https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals

[17]http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Improvements-on-FLIP-Process-td37785.html




Shuai Xu (徐帅) started a discussion on supporting notFollowedBy() with an interval as the last part of a pattern. CEP itself does not support notFollowedBy() at the end of a pattern. The proposal enables it by treating a pattern that ends in notFollowedBy() within a time interval T as valid: if the preceding part of the pattern has matched and the notFollowedBy() pattern does not appear during the interval, the pattern fires after time T from the start.

More information:

[18]https://docs.google.com/document/d/1swUSHcVxbkWm7EPdOfOQXWj-A4gGDA8Y8R1DOUjokds/edit#

[19]http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-notFollowedBy-with-interval-as-the-last-part-of-a-Pattern-td37513.html

Dawid started a discussion on dropping the Elasticsearch 2.x and 5.x connectors. The Elasticsearch 5.x connector does not work out of the box on some systems and needs a version bump, but the version cannot be changed in the 5.x connector alone because it shares a common class with 2.x. The suggestion is to drop at least the 2.x connector and update 5.x to a working Elasticsearch client module.

More information:

[20]http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Drop-connectors-for-Elasticsearch-2-x-and-5-x-td37471.html




Stephan started the discussion to kick off the 1.11 release cycle. Piotrek and Zhijiang will serve as release managers for 1.11. Following the original "3-month release cycle" assumption, the release is targeted for mid-May, which means a feature freeze no later than the end of April; the exact feature-freeze date still needs to be discussed by the community.

More information:

[21]http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Kicking-off-the-1-11-release-cycle-td37817.html




Kurt Young (杨克特) started a discussion on removing the registration of TableSource/TableSink from TableEnvironment and ConnectTableDescriptor. FLIP-64 explains why the TableSource and TableSink interfaces are deprecated there: they cannot express logical table fields such as computed columns and watermarks. Exposing registerTableSource in TableEnvironment also inverts the overall SQL contract: a TableSource acting as the reader of a table should rely on additional metadata kept by the framework, which ultimately comes from DDL or a ConnectDescriptor.

More information:

[22]https://cwiki.apache.org/confluence/display/FLINK/FLIP-64%3A+Support+for+Temporary+Objects+in+Table+module


Kerberos authentication exception when connecting Flink 1.10 to Hive

2020-02-21 Thread sunfulin
Hi,
I'm integrating Flink 1.10 with Hive. When connecting to the metastore, since the CDH cluster that Hive belongs to has Kerberos
authentication enabled, the following exception is thrown. How should I configure or resolve this?


999  [main] INFO  hive.metastore  - Trying to connect to metastore with URI 
thrift://namenode01.htsc.com:9083
1175 [main] ERROR org.apache.thrift.transport.TSaslTransport  - SASL 
negotiation failure
javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: 
No valid credentials provided (Mechanism level: Failed to find any Kerberos 
tgt)]
  at 
com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:211)
  at 
org.apache.thrift.transport.TSaslClientTransport.handleSaslStartMessage(TSaslClientTransport.java:94)
  at org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:271)
  at 
org.apache.thrift.transport.TSaslClientTransport.open(TSaslClientTransport.java:37)
  at 
org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport$1.run(TUGIAssumingTransport.java:52)
  at 
org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport$1.run(TUGIAssumingTransport.java:49)
  at java.security.AccessController.doPrivileged(Native Method)
  at javax.security.auth.Subject.doAs(Subject.java:422)
  at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
  at 
org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport.open(TUGIAssumingTransport.java:49)
  at 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.open(HiveMetaStoreClient.java:420)
  at 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:236)
  at 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:181)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
  at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
  at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
  at 
org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1521)
  at 
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:86)
  at 
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:132)
  at 
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:118)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at 
org.apache.flink.table.catalog.hive.client.HiveShimV200.getHiveMetastoreClient(HiveShimV200.java:43)
  at 
org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.createMetastoreClient(HiveMetastoreClientWrapper.java:240)
  at 
org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.<init>(HiveMetastoreClientWrapper.java:71)
  at 
org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory.create(HiveMetastoreClientFactory.java:35)
  at org.apache.flink.table.catalog.hive.HiveCatalog.open(HiveCatalog.java:188)
  at 
org.apache.flink.table.catalog.CatalogManager.registerCatalog(CatalogManager.java:102)
  at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.registerCatalog(TableEnvironmentImpl.java:235)
  at 
com.htsc.crm_realtime.fatjob.Jobs.hive.HiveMetaJob.doJob(HiveMetaJob.java:44)
  at com.htsc.crm_realtime.fatjob.Jobs.JobEntryBase.run(JobEntryBase.java:50)
  at 
com.htsc.crm_realtime.fatjob.Jobs.hive.HiveMetaJob.main(HiveMetaJob.java:23)
Caused by: GSSException: No valid credentials provided (Mechanism level: Failed 
to find any Kerberos tgt)
  at 
sun.security.jgss.krb5.Krb5InitCredential.getInstance(Krb5InitCredential.java:147)
  at 
sun.security.jgss.krb5.Krb5MechFactory.getCredentialElement(Krb5MechFactory.java:122)
  at 
sun.security.jgss.krb5.Krb5MechFactory.getMechanismContext(Krb5MechFactory.java:187)
  at 
sun.security.jgss.GSSManagerImpl.getMechanismContext(GSSManagerImpl.java:224)
  at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:212)
  at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:179)
  at 
com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:192)
  ... 34 more

Re: FlinkCEP questions - architecture

2020-02-21 Thread Juergen Donnerstag
thanks a lot
Juergen

On Mon, Feb 17, 2020 at 11:08 AM Kostas Kloudas  wrote:

> Hi Juergen,
>
> I will reply to your questions inline. As a general comment I would
> suggest to also have a look at [3] so that you have an idea of some of
> the alternatives.
> With that said, here come the answers :)
>
> 1) We receive files every day, which are exports from some database
> tables, containing ONLY changes from the day. Most tables have
> modify-cols. Even though they are files, because they contain
> changes only, I believe the file records should be considered events in
> Flink terminology. Is that assumption correct?
>
> -> Yes. I think your assumption is correct.
>
> 2) The records within the DB export files are NOT in chronological order,
> and we can not change the export. Our use case is a "complex event
> processing" case (FlinkCEP) with rules like "KeyBy(someKey) If first
> A, then B, then C within 30 days, then do something". Does that work
> with FlinkCEP despite the events/records are not in chrono order
> within the file? The files are 100MB to 20GB in size. Do I need to
> sort the files first before CEP processing?
>
> -> Flink CEP also works in event time and the re-ordering can be done by
> Flink
>
> 3) Occassionally some crazy people manually "correct" DB records
> within the database and manually trigger a re-export of ALL of the
> changes for that respective day (e.g. last weeks Tuesday).
> Consequently we receive a correction file. Same filename but "_1"
> appended. All filenames include the date (of the original export).
> What are the options to handle that case (besides telling the DB
> admins not to, which we did already). Regular checkpoints and
> re-process all files since then?  What happens to the CEP state? Will
> it be checkpointed as well?
>
> -> If you require re-processing, then I would say that your best
> option is what you described. The other option would be to keep
> everything in Flink state until you are sure that no more corrections
> will come. In this case, you have to somehow issue the "correction" in
> a way that the downstream system can understand what to correct and
> how. Keep in mind that this may be an expensive operation because
> everything has to be kept in state for longer.
>
> 4) Our CEP rules span upto 180 days (resp. 6 months). Is that a problem?
>
> -> The only thing to consider is the size of your state. Time is not
> necessarily an issue. If your state for these 180 days is a couple of
> MBs, then you have no problem. If it increases fast, then you have to
> provision your cluster accordingly.
>
> 5) We also have CEP rules that must fire if after a start sequence
> matched, the remaining sequence did NOT within a configured window.
> E.g. If A, then B, but C did not occur within 30 days since A. Is that
> supported by FlinkCEP? I couldn't find a working example.
>
> -> You can have a look at [1] for the supported pattern combinations
> and you can also look at [2] for some tests of different pattern
> combinations.
>
> 6) We expect 30-40 CEP rules. How can we estimate the required storage
> size for the temporary CEP state? Is there some sort of formular
> considering number of rules, number of records per file or day, record
> size, window, number of records matched per sequence, number of keyBy
> grouping keys, ...
>
> -> In FlinkCEP, each pattern becomes a single operator. This means
> that you will have 30-40 operators in your job graph, each with each
> own state. This can become heavy but once again it depends on your
> workload. I cannot give an estimate because in CEP, in order to
> guarantee correct ordering of events in an unordered stream, the
> library sometimes has to keep also in state more records than will be
> presented at the end.
>
> Have you considered going with a solution based on processfunction and
> broadcast state? This will also allow you to have a more dynamic
> set-up where patterns can be added at runtime and it will allow you to
> do any optimizations specific to your workload ;) For a discussion on
> this, check [3]. In addition, it will allow you to "multiplex" many
> patterns into a single operator thus potentially minimizing the amount
> of copies of the state you keep.
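
To make the broadcast-state alternative mentioned in point 6) a bit more concrete, here is a very small sketch. Rule, Alert and Event are hypothetical types, and the matching logic is only a placeholder for the per-key, per-rule progress state and timers that a real implementation of the long-running rules would need.

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

import java.util.Map;

public class DynamicRuleEvaluator
        extends KeyedBroadcastProcessFunction<String, Event, Rule, Alert> {

    // Rules are broadcast to all parallel instances and kept in broadcast state.
    private static final MapStateDescriptor<String, Rule> RULES =
            new MapStateDescriptor<>("rules", Types.STRING, Types.POJO(Rule.class));

    @Override
    public void processBroadcastElement(Rule rule, Context ctx, Collector<Alert> out) throws Exception {
        // New or updated patterns can arrive at runtime.
        ctx.getBroadcastState(RULES).put(rule.getId(), rule);
    }

    @Override
    public void processElement(Event event, ReadOnlyContext ctx, Collector<Alert> out) throws Exception {
        for (Map.Entry<String, Rule> e : ctx.getBroadcastState(RULES).immutableEntries()) {
            // Placeholder matching logic; in practice you would track per-key,
            // per-rule progress in keyed state and register timers for the
            // "C did not occur within 30 days of A" style rules.
            if (e.getValue().matches(event)) {
                out.collect(new Alert(e.getKey(), event));
            }
        }
    }
}
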
>
> 7) I can imagine that for debugging reasons it'd be good if we were
> able to query the temporary CEP state. What is the (CEP) schema used
> to persist the CEP state and how can we query it? And does such query
> work on the whole cluster or only per node (e.g. because of shuffle
> and nodes responsible only for a portion of the events).
>
> -> Unfortunately the state in CEP is not queryable, thus I am not
> sure if you can inspect it at runtime.
>
> 8) I understand state is stored per node. What happens if I want to
> add or remove a nodes. Will the state still be found, despite it being
> stored in another node? I read that I need to be equally careful when
> changing rules? Or is that a different issue?
>
> -> Rescaling a Flink job is not done 

[FLI] Looking for help with a checkpoint being declined

2020-02-21 Thread tao wang
Flink 1.9.1: while the job is running in production, the checkpoint-declined issue below occasionally occurs. Can anyone help
confirm the root cause? Is it a Kafka problem or a bug in the code?



> 2020-02-21 08:32:15,738 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding
> checkpoint 941 of job 0e16cf38a0bff313544e1f31d078f75b.
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
> complete snapshot 941 for operator PctrLogJoin -> (Sink: hdfsSink, Sink:
> kafkaSink) (8/36).   Failure reason: Checkpoint was declined.Checkpoint was
> declined.

at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:431)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1282)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1216)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:872)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:777)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:708)
> at
> org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
> at
> org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
> at
> org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
> at
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
> at
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalStateException: Pending record count must be
> zero at this point: 2
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:964)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:892)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:98)
> at
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:311)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:973)
> at
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
> at
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
> ... 17 more


Re: JDBC source running continuously

2020-02-21 Thread Jark Wu
Hi Fanbin,

.iterate() is not available on Table API, it's an API of DataStream.
Currently, the JDBC source is a bounded source (a snapshot of table at the
execution time), so the job will finish when it processes all the data.

Regarding your requirement, "running continuously with JDBC source", we
should make it clear what do you want the source to read after the full
snapshot:
1) read a full snapshot again
2) read new inserted rows
3) read new inserted rows and updated rows and deleted rows.

For (1), you can create your own jdbc input format based on
JDBCInputFormat, trying to re-execute the SQL query while reading the last
row from DB in nextRecord. (this is the answer in the stackoverflow [1]).
For (2), in the nextRecord(), you need to execute a SQL query with a filter
to fetch rows which are greater than the last max ID or max created time.
For (3), this is a changelog support, which will be supported natively in
1.11 in Flink SQL.

Best,
Jark
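
To illustrate idea (2) without touching JDBCInputFormat, here is a rough sketch of a standalone polling source that only fetches rows beyond the last seen id. The table, column names, JDBC URL and poll interval are made up for the example, and the resume point would need to go into checkpointed state for fault tolerance.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.types.Row;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class PollingJdbcSource extends RichSourceFunction<Row> {

    private volatile boolean running = true;
    private transient Connection connection;
    private long lastSeenId = -1L; // resume point; should be checkpointed in a real job

    @Override
    public void open(Configuration parameters) throws Exception {
        connection = DriverManager.getConnection("jdbc:...", "user", "password"); // placeholder URL
    }

    @Override
    public void run(SourceContext<Row> ctx) throws Exception {
        while (running) {
            try (PreparedStatement stmt = connection.prepareStatement(
                    "SELECT id, payload FROM my_table WHERE id > ? ORDER BY id")) {
                stmt.setLong(1, lastSeenId);
                try (ResultSet rs = stmt.executeQuery()) {
                    while (running && rs.next()) {
                        long id = rs.getLong("id");
                        ctx.collect(Row.of(id, rs.getString("payload")));
                        lastSeenId = id;
                    }
                }
            }
            Thread.sleep(5_000); // poll interval, arbitrary
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void close() throws Exception {
        if (connection != null) {
            connection.close();
        }
    }
}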


On Fri, 21 Feb 2020 at 02:35, Fanbin Bu  wrote:

>
> https://stackoverflow.com/questions/48151881/how-to-run-apache-flink-streaming-job-continuously-on-flink-server
>
> On Thu, Feb 20, 2020 at 3:14 AM Chesnay Schepler 
> wrote:
>
>> Can you show us where you found the suggestion to use iterate()?
>>
>> On 20/02/2020 02:08, Fanbin Bu wrote:
>> > Hi,
>> >
>> > My app creates the source from JDBC inputformat and running some sql
>> > and print out. But the source terminates itself after the query is
>> > done. Is there anyway to keep the source running?
>> > samle code:
>> > val env = StreamExecutionEnvironment.getExecutionEnvironment
>> > val settings = EnvironmentSettings.newInstance()
>> >   .useBlinkPlanner()
>> >   .inStreamingMode()
>> >   .build()
>> > val tEnv = StreamTableEnvironment.create(env, settings)
>> > val inputFormat
>> > = JDBCInputFormat.buildJDBCInputFormat.setQuery("select * from
>> > table")... .finish()
>> > val source = env.createInput(inputFormat)
>> > tEnv.registerTableSource(source)
>> > val queryResult = tEnv.sqlQuery("select * from awesomeSource")
>> > queryResult.insertInto(mySink)
>> >
>> >
>> > I searched around and its suggested to use .iterate(). can somebody
>> > give more examples on how to use it in this case?
>> >
>> > Thanks,
>> > Fanbin
>>
>>
>>


Re: Running Flink Cluster on PaaS

2020-02-21 Thread KristoffSC
Thank you Yang Wang,

Regarding [1] and a sentence from that doc.
"This page describes deploying a standalone Flink session"

I always wonder what you mean by "Standalone Flink session" or
"Standalone Cluster", terms that can be found here [2].

I'm using the Docker Job Cluster approach; I know that there are also
Session Cluster Docker images. I understand the differences, but I'm not
sure what you are referring to with those two terms in [1] and [2].

Thanks,
Krzysztof

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/kubernetes.html#cluster_setup.html
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/jobmanager_high_availability.html





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/