Re: how to configure window of join operator in batch mode

2023-04-25 Thread Jiadong Lu

Hi, Shammon,
Thank you for your reply.

Yes, the window configured with `Time.days(1)` has no special meaning;
it is just used to group all data into the same global window.
I tried using `GlobalWindow` for this scenario, but `GlobalWindow` also
needs a `Trigger`, such as
`org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger`,
to trigger the window processing for all the data.
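
For illustration, here is a rough, untested sketch of that `GlobalWindows`
variant. It assumes that in BATCH mode the final MAX_WATERMARK emitted at the
end of the bounded input makes an event-time trigger fire the single global
window (the lambda key selectors with `Types.INT` hints are just one way to
write it):

```java
// Rough sketch, untested: GlobalWindows needs an explicit trigger; in BATCH
// mode a final MAX_WATERMARK is emitted when the bounded input ends, which
// should make EventTimeTrigger fire the single global window exactly once.
s1.coGroup(s2)
        .where(v -> v, Types.INT)
        .equalTo(v -> v, Types.INT)
        .window(GlobalWindows.create())
        .trigger(EventTimeTrigger.create())
        .apply(new CoGroupFunction<Integer, Integer, Tuple2<Integer, Integer>>() {
            @Override
            public void coGroup(Iterable<Integer> first, Iterable<Integer> second,
                    Collector<Tuple2<Integer, Integer>> out) {
                // Inner-join part only, to keep the sketch short.
                for (Integer l : first) {
                    for (Integer r : second) {
                        out.collect(new Tuple2<>(l, r));
                    }
                }
            }
        }).printToErr();
```

If that assumption about the final watermark holds, the trigger fires exactly
once per key at the end of the job.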


So I think a processing-time window with `Time.days(10)` may be a good
solution for this scenario. What do you think?


As for what you mentioned:
> use join directly
I have no idea how to use a join without a window. Would you mind writing a
demo of it?
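
(In case it moves the discussion along: my best guess at one possible reading
of "use join directly" is the Table API, where a batch equi-join needs no
window at all. A rough, untested sketch of that reading, assuming a Flink
version where `StreamTableEnvironment` supports BATCH mode, with `s1`/`s2`
and `env` as in my demo below:)

```java
// Rough sketch, untested -- one possible reading of "use join directly":
// switch to the Table API, where a batch equi-join needs no window.
// ($ is the static import org.apache.flink.table.api.Expressions.$)
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

Table t1 = tEnv.fromDataStream(s1).as("a");
Table t2 = tEnv.fromDataStream(s2).as("b");

// Plain relational equi-join on the key; the planner picks a batch join.
Table joined = t1.join(t2, $("a").isEqual($("b")));

tEnv.toDataStream(joined).printToErr();
```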


Your help is greatly appreciated in advance.

Best,
Jiadong Lu

On 2023/4/26 09:53, Shammon FY wrote:

Hi Jiadong,

I think it depends on the specific role the window plays for you here. If
this window has no specific business meaning and is only used for
performance optimization, maybe you can consider using a join directly.


Best,
Shammon FY

On Tue, Apr 25, 2023 at 5:42 PM Jiadong Lu wrote:


Hello, everyone,

I am confused about the window of the join/coGroup operator in batch mode.
Here is my demo code, and it works fine for me at present. I wonder whether
this approach of using a processing-time window in batch mode is
appropriate, and whether it has any problems. I want to use this solution
to solve my problem (joining two streams in batch mode).

```java
public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Integer> s1 = env.fromCollection(
            Stream.of(1, 2, 3, 4, 5, 6, 7).collect(Collectors.toList()));
    DataStream<Integer> s2 = env.fromCollection(
            Stream.of(6, 5, 4, 3, 2, 1).collect(Collectors.toList()));

    s1.coGroup(s2)
            .where(new KeySelector<Integer, Integer>() {
                @Override
                public Integer getKey(Integer value) throws Exception {
                    return value;
                }
            })
            .equalTo(new KeySelector<Integer, Integer>() {
                @Override
                public Integer getKey(Integer value) throws Exception {
                    return value;
                }
            })
            .window(TumblingProcessingTimeWindows.of(Time.days(1)))
            .apply(new CoGroupFunction<Integer, Integer, Tuple2<Integer, Integer>>() {
                @Override
                public void coGroup(Iterable<Integer> first, Iterable<Integer> second,
                        Collector<Tuple2<Integer, Integer>> out) throws Exception {
                    if (!second.iterator().hasNext()) {
                        // No match on the right side: emit (left, null).
                        for (Integer integer : first) {
                            out.collect(new Tuple2<>(integer, null));
                        }
                    } else {
                        // Otherwise emit the cross product of both sides.
                        for (Integer integer : first) {
                            for (Integer integer1 : second) {
                                out.collect(new Tuple2<>(integer, integer1));
                            }
                        }
                    }
                }
            }).printToErr();
    env.setParallelism(1);
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    env.execute();
}
```

Thanks in advance.

-- 
Jiadong Lu




Re: how to configure window of join operator in batch mode

2023-04-25 Thread Shammon FY
Hi Jiadong,

I think it depends on the specific role the window plays for you here. If
this window has no specific business meaning and is only used for
performance optimization, maybe you can consider using a join directly.

Best,
Shammon FY

On Tue, Apr 25, 2023 at 5:42 PM Jiadong Lu wrote:

> Hello, everyone,
>
> I am confused about the window of the join/coGroup operator in batch mode.
> Here is my demo code, and it works fine for me at present. I wonder whether
> this approach of using a processing-time window in batch mode is
> appropriate, and whether it has any problems. I want to use this solution
> to solve my problem (joining two streams in batch mode).
>
> ```java
> public static void main(String[] args) throws Exception {
>
>     StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>
>     DataStream<Integer> s1 = env.fromCollection(
>             Stream.of(1, 2, 3, 4, 5, 6, 7).collect(Collectors.toList()));
>     DataStream<Integer> s2 = env.fromCollection(
>             Stream.of(6, 5, 4, 3, 2, 1).collect(Collectors.toList()));
>
>     s1.coGroup(s2)
>             .where(new KeySelector<Integer, Integer>() {
>                 @Override
>                 public Integer getKey(Integer value) throws Exception {
>                     return value;
>                 }
>             })
>             .equalTo(new KeySelector<Integer, Integer>() {
>                 @Override
>                 public Integer getKey(Integer value) throws Exception {
>                     return value;
>                 }
>             })
>             .window(TumblingProcessingTimeWindows.of(Time.days(1)))
>             .apply(new CoGroupFunction<Integer, Integer, Tuple2<Integer, Integer>>() {
>                 @Override
>                 public void coGroup(Iterable<Integer> first, Iterable<Integer> second,
>                         Collector<Tuple2<Integer, Integer>> out) throws Exception {
>                     if (!second.iterator().hasNext()) {
>                         // No match on the right side: emit (left, null).
>                         for (Integer integer : first) {
>                             out.collect(new Tuple2<>(integer, null));
>                         }
>                     } else {
>                         // Otherwise emit the cross product of both sides.
>                         for (Integer integer : first) {
>                             for (Integer integer1 : second) {
>                                 out.collect(new Tuple2<>(integer, integer1));
>                             }
>                         }
>                     }
>                 }
>             }).printToErr();
>     env.setParallelism(1);
>     env.setRuntimeMode(RuntimeExecutionMode.BATCH);
>     env.execute();
> }
> ```
>
> Thanks in advance.
>
> --
> Jiadong Lu
>


Can I setup standby taskmanagers while using reactive mode?

2023-04-25 Thread Wei Hou via user
Hi Flink community,

We are trying to use Flink’s reactive mode with the Kubernetes HPA for
autoscaling. However, since reactive mode always uses all available
resources, it causes a problem when we need standby task managers for fast
failure recovery: the job will always use these extra standby task managers
as active task managers to process data.

I wonder if you have any suggestions on this; should we avoid using Flink
reactive mode together with standby task managers?

Best,
Wei




Re: FlinkKinesisConsumer cannot consume events from Kinesalite with EFO enabled

2023-04-25 Thread Danny Cranmer
Hello,

Kinesalite does not support EFO, so unfortunately you will need to hit the
real service for any end-to-end test.
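
If it helps, here is a rough, untested sketch of pointing the same consumer
at the real service (the consumer name is a placeholder; credentials are
resolved via the default provider chain, and the cert-check/CBOR
system-property workarounds are dropped):

```java
// Rough sketch (untested): same consumer, aimed at real Kinesis. No local
// endpoint override, no DISABLE_CERT_CHECKING / CBOR workarounds needed.
final Properties props = new Properties();
props.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
props.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");
// "AUTO" resolves credentials through the default AWS provider chain.
props.put(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");

// Enhanced Fan-Out against the real service.
props.put(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE,
        ConsumerConfigConstants.RecordPublisherType.EFO.name());
props.put(ConsumerConfigConstants.EFO_CONSUMER_NAME, "my-efo-test-consumer");

FlinkKinesisConsumer<String> source = new FlinkKinesisConsumer<>(
        "flinkkinesistest", new SimpleStringSchema(), props);
```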

Thanks,
Danny

On Tue, 25 Apr 2023, 20:10 Charles Tan wrote:

> Hi all,
>
> I’ve tried a simple Flink application which uses FlinkKinesisConsumer. I
> noticed that when trying to consume from Kinesalite using the
> FlinkKinesisConsumer with EFO enabled, I run into SSL handshake errors.
> This is despite disabling certificate validation. Has anybody successfully
> tested FlinkKinesisConsumer with EFO enabled in this way or have any
> insights into what may be causing this?
>
> Below I provide a code snippet to reproduce this error and the stack trace
> I'm seeing. Note that when I comment out the lines that set the properties
> enabling EFO, the code snippet works successfully. Any help would be much
> appreciated.
>
> Thanks,
> Charles
>
>
> Code snippet:
> public static void main(String[] args) throws Exception {
>     System.setProperty(
>             SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY, "true");
>     System.setProperty(
>             SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true");
>
>     StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setParallelism(1);
>
>     final Properties props = new Properties();
>     props.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");
>     props.put(ConsumerConfigConstants.AWS_ENDPOINT, "https://localhost:4567");
>     props.put(ConsumerConfigConstants.AWS_REGION, Region.US_EAST_1.toString());
>     props.put(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
>     props.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "FAKE_ACCESS_KEY_ID");
>     props.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "FAKE_SECRET_KEY");
>
>     // Enhanced Fan-Out properties
>     props.put(
>             ConsumerConfigConstants.RECORD_PUBLISHER_TYPE,
>             ConsumerConfigConstants.RecordPublisherType.EFO.name());
>     props.put(ConsumerConfigConstants.EFO_CONSUMER_NAME, "ABCDEFGHIJKLMNOP");
>
>     FlinkKinesisConsumer<String> source = new FlinkKinesisConsumer<>(
>             "flinkkinesistest", new SimpleStringSchema(), props);
>
>     DataStream<String> sourceStream = env.addSource(source);
>
>     System.out.println(String.format("kinesis props: \n%s", props));
>
>     sourceStream.print();
>     env.execute("tester");
> }
>
> Stack trace:
> org.apache.flink.kinesis.shaded.io.netty.handler.codec.DecoderException:
> javax.net.ssl.SSLHandshakeException: PKIX path building failed:
> sun.security.provider.certpath.SunCertPathBuilderException: unable to find
> valid certification path to requested target
> at
> org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:477)
> at
> org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
> at
> org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
> at
> org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
> at
> org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
> at
> org.apache.flink.kinesis.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
> at
> org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
> at
> org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
> at
> org.apache.flink.kinesis.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
> at
> org.apache.flink.kinesis.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
> at
> org.apache.flink.kinesis.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
> at
> org.apache.flink.kinesis.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
> at
> org.apache.flink.kinesis.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
> at
> org.apache.flink.kinesis.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
> at
> org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
> at
> org.apache.flink.kinesis.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> at java.base/java.lang.Thread.run(Thread.java:832)
> Caused by: javax.net.ssl.SSLHandshakeException: PKIX path building failed:
> sun.security.provider.certpath.SunCertPathBuilderException: unable to find
> valid certification path to requested target

FlinkKinesisConsumer cannot consume events from Kinesalite with EFO enabled

2023-04-25 Thread Charles Tan
Hi all,

I’ve tried a simple Flink application which uses FlinkKinesisConsumer. I
noticed that when trying to consume from Kinesalite using the
FlinkKinesisConsumer with EFO enabled, I run into SSL handshake errors.
This is despite disabling certificate validation. Has anybody successfully
tested FlinkKinesisConsumer with EFO enabled in this way or have any
insights into what may be causing this?

Below I provide a code snippet to reproduce this error and the stack trace
I'm seeing. Note that when I comment out the lines that set the properties
enabling EFO, the code snippet works successfully. Any help would be much
appreciated.

Thanks,
Charles


Code snippet:
public static void main(String[] args) throws Exception {
    System.setProperty(
            SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY, "true");
    System.setProperty(
            SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true");

    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    final Properties props = new Properties();
    props.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");
    props.put(ConsumerConfigConstants.AWS_ENDPOINT, "https://localhost:4567");
    props.put(ConsumerConfigConstants.AWS_REGION, Region.US_EAST_1.toString());
    props.put(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
    props.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "FAKE_ACCESS_KEY_ID");
    props.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "FAKE_SECRET_KEY");

    // Enhanced Fan-Out properties
    props.put(
            ConsumerConfigConstants.RECORD_PUBLISHER_TYPE,
            ConsumerConfigConstants.RecordPublisherType.EFO.name());
    props.put(ConsumerConfigConstants.EFO_CONSUMER_NAME, "ABCDEFGHIJKLMNOP");

    FlinkKinesisConsumer<String> source = new FlinkKinesisConsumer<>(
            "flinkkinesistest", new SimpleStringSchema(), props);

    DataStream<String> sourceStream = env.addSource(source);

    System.out.println(String.format("kinesis props: \n%s", props));

    sourceStream.print();
    env.execute("tester");
}

Stack trace:
org.apache.flink.kinesis.shaded.io.netty.handler.codec.DecoderException:
javax.net.ssl.SSLHandshakeException: PKIX path building failed:
sun.security.provider.certpath.SunCertPathBuilderException: unable to find
valid certification path to requested target
at
org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:477)
at
org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
at
org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at
org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at
org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at
org.apache.flink.kinesis.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at
org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at
org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at
org.apache.flink.kinesis.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at
org.apache.flink.kinesis.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at
org.apache.flink.kinesis.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
at
org.apache.flink.kinesis.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
at
org.apache.flink.kinesis.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
at
org.apache.flink.kinesis.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at
org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at
org.apache.flink.kinesis.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: javax.net.ssl.SSLHandshakeException: PKIX path building failed:
sun.security.provider.certpath.SunCertPathBuilderException: unable to find
valid certification path to requested target
at java.base/sun.security.ssl.Alert.createSSLException(Alert.java:131)
at
java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:369)
at
java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:312)
at
java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:307)
at
java.base/sun.security.ssl.CertificateMessage$T13C

FileSource for unbounded data

2023-04-25 Thread Lorenzo Nicora
Hi
I understand that the DataStream FileSource remembers all already-processed
files in its state, forever. This causes the state to grow unbounded,
making the FileSource impractical to use in a stateful application.
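
For reference, a minimal sketch of the setup I mean (names, the path, and
the discovery interval are illustrative; `TextLineInputFormat` assumes
Flink 1.15+). As far as I understand, it is the continuously monitoring
enumerator that keeps the ever-growing set of already-processed paths in
checkpointed state:

```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnboundedFileSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // monitorContinuously() makes the source unbounded: the enumerator
        // re-scans the directory forever and must remember every path it has
        // already handed out, so its state grows with each new file.
        FileSource<String> source = FileSource
                .forRecordStreamFormat(
                        new TextLineInputFormat(),
                        new Path("s3://my-bucket/input/"))
                .monitorContinuously(Duration.ofSeconds(30))
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source")
                .print();
        env.execute();
    }
}
```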

Is there any known workaround?

Thanks
Lorenzo


how to configure window of join operator in batch mode

2023-04-25 Thread Jiadong Lu

Hello, everyone,

I am confused about the window of the join/coGroup operator in batch mode.
Here is my demo code, and it works fine for me at present. I wonder whether
this approach of using a processing-time window in batch mode is
appropriate, and whether it has any problems. I want to use this solution
to solve my problem (joining two streams in batch mode).


```java
public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Integer> s1 = env.fromCollection(
            Stream.of(1, 2, 3, 4, 5, 6, 7).collect(Collectors.toList()));
    DataStream<Integer> s2 = env.fromCollection(
            Stream.of(6, 5, 4, 3, 2, 1).collect(Collectors.toList()));

    s1.coGroup(s2)
            .where(new KeySelector<Integer, Integer>() {
                @Override
                public Integer getKey(Integer value) throws Exception {
                    return value;
                }
            })
            .equalTo(new KeySelector<Integer, Integer>() {
                @Override
                public Integer getKey(Integer value) throws Exception {
                    return value;
                }
            })
            .window(TumblingProcessingTimeWindows.of(Time.days(1)))
            .apply(new CoGroupFunction<Integer, Integer, Tuple2<Integer, Integer>>() {
                @Override
                public void coGroup(Iterable<Integer> first, Iterable<Integer> second,
                        Collector<Tuple2<Integer, Integer>> out) throws Exception {
                    if (!second.iterator().hasNext()) {
                        // No match on the right side: emit (left, null).
                        for (Integer integer : first) {
                            out.collect(new Tuple2<>(integer, null));
                        }
                    } else {
                        // Otherwise emit the cross product of both sides.
                        for (Integer integer : first) {
                            for (Integer integer1 : second) {
                                out.collect(new Tuple2<>(integer, integer1));
                            }
                        }
                    }
                }
            }).printToErr();
    env.setParallelism(1);
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    env.execute();
}
```

Thanks in advance.

--
Jiadong Lu


File Source Limitations

2023-04-25 Thread Kirti Dhar Upadhyay K via user
Hi Community,

I am planning to use FileSource (with S3) in my application, and I have run
into the limitations below:
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/#current-limitations



  1.  Watermarking does not work very well for large backlogs of files. This is
because watermarks eagerly advance within a file, and the next file might
contain data later than the watermark.
Ques: Is there any ideal use case, setting, or configuration where this
problem does not come into the picture, or where it can be avoided? (One
possible mitigation is sketched after this list.)


  2.  For unbounded file sources, the enumerator currently remembers the paths
of all already-processed files, which is state that can, in some cases, grow
rather large.
Ques: As a workaround for this problem, what if I configure a state backend
(say RocksDB) with a TTL that automatically deletes the older data? Are there
any repercussions of this?
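
Regarding the first limitation, one possible mitigation is watermark
alignment (FLIP-182). A rough, untested sketch, assuming Flink 1.15+;
the group name, drift values, and the `fileSource`/`env` variables are
placeholders:

```java
// Rough sketch (untested): watermark alignment caps how far an eagerly
// advancing per-file watermark may drift ahead of the slowest source in
// its alignment group. Needs Flink 1.15+ (per-split alignment: 1.17+).
WatermarkStrategy<String> aligned = WatermarkStrategy
        .<String>forBoundedOutOfOrderness(Duration.ofMinutes(5))
        .withWatermarkAlignment("file-backlog", Duration.ofMinutes(1));

DataStream<String> lines = env.fromSource(fileSource, aligned, "file-source");
```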


Regards,
Kirti Dhar