Re: Table API: Custom Kafka formats for Confluent Protobuf and JSON

2022-11-29 Thread Theodor Wübker
Hey Yaroslav,

I suppose I will try it like this. The lookup would indeed be nice too, I will 
have a closer look at the corresponding source code. Thanks!

-Theo
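
For reference, a minimal sketch of the "just skip the header" approach discussed in this thread — it assumes a wrapper DeserializationSchema that drops the 5-byte Confluent header (magic byte + 4-byte schema id) and delegates to the existing Protobuf (or JSON) RowData deserialization schema. The class name and the wiring of the inner schema are made up for illustration:

```java
import java.io.IOException;
import java.util.Arrays;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.data.RowData;

/**
 * Sketch only: strips the Confluent wire-format header (1 magic byte + 4-byte
 * schema id) and hands the remaining payload to an existing deserialization
 * schema, e.g. Flink's Protobuf or JSON RowData schema. No Schema Registry
 * lookup, no validation beyond a length check.
 */
public class ConfluentHeaderSkippingDeserializationSchema implements DeserializationSchema<RowData> {

    private static final int HEADER_LENGTH = 5;

    private final DeserializationSchema<RowData> inner;

    public ConfluentHeaderSkippingDeserializationSchema(DeserializationSchema<RowData> inner) {
        this.inner = inner;
    }

    @Override
    public void open(InitializationContext context) throws Exception {
        inner.open(context); // make sure the wrapped schema gets initialized too
    }

    @Override
    public RowData deserialize(byte[] message) throws IOException {
        if (message == null || message.length < HEADER_LENGTH) {
            return null; // or throw, depending on the ignore-parse-errors setting
        }
        // Drop magic byte + schema id and delegate the actual payload.
        byte[] payload = Arrays.copyOfRange(message, HEADER_LENGTH, message.length);
        return inner.deserialize(payload);
    }

    @Override
    public boolean isEndOfStream(RowData nextElement) {
        return false;
    }

    @Override
    public TypeInformation<RowData> getProducedType() {
        return inner.getProducedType();
    }
}
```

The same wrapper idea should carry over to the JSON format, since only the framing differs.
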
> On 29. Nov 2022, at 17:41, Yaroslav Tkachenko  wrote:
> 
> Hey Theodor,
> 
> That's pretty much it, assuming your Protobuf schema is more or less fixed. 
> But for a production workload, you'd need to add a Schema Registry lookup. I 
> guess the implementation for that would be similar to what's in the Avro 
> format.
> 
> On Tue, Nov 29, 2022 at 2:26 AM Theodor Wübker  > wrote:
> Hey all,
> 
> so Confluent has Kafka serializers for Protobuf, Avro and JSON that create
> messages with a magic byte, followed by a 4-byte schema id, followed by the
> actual payload (see the Confluent docs). When I try to read such messages with
> the regular Protobuf, Avro and JSON formats in my Table API program, it of
> course does not work. For Avro, Flink also has a Confluent-Avro format that can
> deal with this. However, for Protobuf and JSON there is nothing like this yet.
> I saw a ticket in JIRA, but I cannot wait for it. Hence I wonder how much
> effort it would be to implement this myself - not in a production-ready way,
> but just in a way that keeps my program from breaking. Meaning I would be happy
> with a solution that just ignores the first 5 bytes and passes the rest on to
> the existing handlers of the Protobuf and JSON formats.
> 
> 
> Now let's take the existing Protobuf format as an example: I assume I have to
> implement the DeserializationFormatFactory, create a few decoding and encoding
> formats, just like the PbDecodingFormat for example, and then a new
> DeserializationSchema, where I would have a method like this:
> 
> @Override
> public RowData deserialize(byte[] message) throws IOException {
>     try {
>         return protoToRowConverter.convertProtoBinaryToRow(message);
>     } catch (Throwable t) {
>         if (formatConfig.isIgnoreParseErrors()) {
>             return null;
>         }
>         throw new IOException("Failed to deserialize PB object.", t);
>     }
> }
> But instead of converting the message immediately, I would slice the first 
> few Bytes off and go from there. Is this pretty much it, or is there more to 
> it?
> 
> -Theo
> 





Re: Flink Table Kinesis sink not failing when sink fails

2022-11-29 Thread Dan Hill
My text logs don't have a stack trace with this exception.  I'm doing this
inside Flink SQL with a standard Kinesis connector and JSON formatter.

On Tue, Nov 29, 2022 at 6:38 PM yuxia  wrote:

> At which line of code does the error message happen? Maybe it swallows the
> exception and then just logs the error message, in which case the Flink job
> won't fail since, from its point of view, no exception happened.
>
> Best regards,
> Yuxia
>
> --
> *From: *"Dan Hill" 
> *To: *"User" 
> *Sent: *Wednesday, November 30, 2022, 8:06:52 AM
> *Subject: *Flink Table Kinesis sink not failing when sink fails
>
> I set up a simple Flink SQL job (Flink v1.14.4) that writes to Kinesis.
> The job looks healthy but the records are not being written.  I did not
> give enough IAM permissions to write to Kinesis.  However, the Flink SQL
> job acts like it's healthy and checkpoints even though the Kinesis PutRecords
> call fails.  I'd expect this error to kill the Flink job.
> I looked through Flink Jira and the Flink user group but didn't see a
> similar issue.
>
> Is the silent failure a known issue?  If the Flink job doesn't fail, it'll
> be hard to detect production issues.
>
> ```
>
> 2022-11-29 23:30:27,587 ERROR 
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader
>  [] - [2022-11-29 23:30:27.578072] [0x1e3b][0x7f12ef8fc700] [error] 
> [AWS Log: ERROR](AWSClient)HTTP response code: 400
> Exception name: AccessDeniedException
> Error message: User: 
> arn:aws:sts::055315558257:assumed-role/dev-workers-us-east-1b-202203101433138915000a/i-09e4f747a4bdbb1f0
>  is not authorized to perform: kinesis:ListShards on resource: 
> arn:aws:kinesis:us-east-1:055315558257:stream/dan-dev-content-metrics because 
> no identity-based policy allows the kinesis:ListShards action
> 6 response headers:
> connection : close
> content-length : 379
> content-type : application/x-amz-json-1.1
> date : Tue, 29 Nov 2022 23:30:27 GMT
> x-amz-id-2 : 
> q8kuplUOMJILzVU97YA+TYSyy6aozeoST+yws26rOkyzEUUZT0zKBdcMWUAjV/8RrnMeed/+em7CbjpwzGYEANgkwCihZWdC
> x-amzn-requestid : e4a39674-66fa-4dcd-b8a3-0e273e5e628a
>
> ```
>
>


Re: Query about flink job manager dashboard

2022-11-29 Thread naga sudhakar
After disabling the cancel and submit flags, I am facing issues with the API
calls below.

1) /jars giving 404
2) /jars/upload
3) /jars/{jarid}/run

Are there any config changes needed to make these APIs work?
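
For context, the two flags in question are presumably the advanced web UI options sketched below; as far as I know, turning off web submission also unregisters the /jars REST endpoints, which would explain the 404s (worth double-checking against the docs linked further down in this thread):

```yaml
# flink-conf.yaml -- advanced options for the Flink web UI (sketch)
web.submit.enable: false   # disables jar upload/run from the UI and, as a side effect, the /jars REST endpoints
web.cancel.enable: false   # disables job cancellation from the UI
```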


On Mon, 28 Nov, 2022, 7:00 PM naga sudhakar,  wrote:

> Hi,
> We are able to disable the cancel and upload options in the UI.
> But this is causing issues with the endpoints below.
> The GET call to /jars to list all uploaded jars and the POST call to
> /jars/{jarid}/run are giving 404 after disabling the two flags.
> Does the process of uploading jars and running a jar with a specific id
> change after this change?
>
> Please suggest.
>
> Thanks & Regards,
> Nagasudhakar
>
> On Thu, 24 Nov, 2022, 2:07 PM Martijn Visser, 
> wrote:
>
>> Hi,
>>
>> 1) No, that's currently not possible.
>> 2) You could consider disabling the options that allow uploading new JARs
>> and/or cancelling jobs from the UI. See
>> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#advanced-options-for-flink-web-ui
>>
>> Best regards,
>>
>> Martijn
>>
>> On Thu, Nov 24, 2022 at 3:44 AM naga sudhakar 
>> wrote:
>>
>>> Hi Team,
 Greetings!!!
 I am a software developer using Apache Flink and deploying Flink jobs
 with it. I have two queries about the Flink job manager dashboard. Can
 you please help with the below?

 1) Is it possible to add a login mechanism for the Flink job manager
 dashboard and have a role-based mechanism for viewing running jobs,
 cancelling jobs and adding jobs?
 2) Is it possible to disable the dashboard display but still perform the
 same operations using the API?


 Thanks,
 Nagasudhakar.

>>>


ElasticsearchSink: setting the number of primary shards (number_of_shards) in ES

2022-11-29 Thread allanqinjy


hi,
When writing to ES with Flink streaming (version 1.12.5), I found that ElasticsearchSink.Builder has no place to set index configuration, for example if I want to set
number_of_shards. If anyone knows, please advise!


ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
    httpHosts,
    new ElasticsearchSinkFunction<String>() {
        public IndexRequest createIndexRequest(String element) {
            Map<String, String> json = new HashMap<>();
            json.put("data", element);
            return Requests.indexRequest()
                .index("my-index")
                .type("my-type")
                .source(json);
        }

        @Override
        public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
            indexer.add(createIndexRequest(element));
        }
    })
allanqinjy
allanqi...@163.com



Reply: Re: How do I generate a StreamGraph from the Flink execution plan JSON?

2022-11-29 Thread 仙路尽头谁为峰
SQL jobs probably don't support changing the parallelism of individual operators; changing the parallelism requires regenerating the JobGraph from scratch and resubmitting the job.
The JSON is mainly for pasting into the Plan Visualizer for development and debugging.
https://flink.apache.org/visualizer/
Sent from Mail for Windows

From: yidan zhao
Sent: November 30, 2022, 10:12
To: user-zh@flink.apache.org
Subject: Re: Re: How do I generate a StreamGraph from the Flink execution plan JSON?

OK, I don't know the SQL side in detail; I mostly use the Stream API. My understanding is: Stream
API to StreamGraph, then to JobGraph, and then the JobGraph is submitted directly to the cluster for execution via the REST API. Standalone scenario.
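
To illustrate that path with a small sketch (it assumes some DataStream pipeline has been defined on env; getStreamGraph()/getJobGraph() are internal APIs, so exact behavior may vary slightly between Flink versions):

```java
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;

public class GraphSketch {
    public static void main(String[] args) throws Exception {
        // Sketch: Stream API -> StreamGraph -> JobGraph. The JSON plan is only a
        // rendering of the graph for the Plan Visualizer; it is not what gets submitted.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3).print(); // placeholder pipeline so the graphs are non-empty

        String jsonPlan = env.getExecutionPlan();       // JSON for https://flink.apache.org/visualizer/
        StreamGraph streamGraph = env.getStreamGraph(); // graph built from the Stream API program
        JobGraph jobGraph = streamGraph.getJobGraph();  // what would actually be submitted (e.g. via REST)

        System.out.println(jsonPlan);
        System.out.println(jobGraph.getJobID());
    }
}
```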

casel.chen  wrote on Wed, Nov 30, 2022 at 00:16:
>
> If we want to support adjusting the resources and parallelism of each operator of a Flink SQL job, don't we have to convert the JSON back into a StreamGraph and then submit it?
>
>
> On 2022-11-29 10:07:40, "yidan zhao"  wrote:
> >There is no need to generate a StreamGraph from the execution plan JSON~
> >The StreamGraph is converted directly into a JobGraph before submission.
> >
> >casel.chen  wrote on Mon, Nov 28, 2022 at 08:53:
> >>
> >> In the source code I only found how to generate the execution plan JSON from a StreamGraph, but not how to parse that JSON back into a StreamGraph. Please advise.



Re: Flink Table Kinesis sink not failing when sink fails

2022-11-29 Thread yuxia
At which line of code does the error message happen? Maybe the sink swallows the exception
and only logs the error message, in which case the Flink job won't fail since, from its
point of view, no exception happened.

Best regards, 
Yuxia 


From: "Dan Hill"
To: "User"
Sent: Wednesday, November 30, 2022, 8:06:52 AM
Subject: Flink Table Kinesis sink not failing when sink fails

I set up a simple Flink SQL job (Flink v1.14.4) that writes to Kinesis. The
job looks healthy but the records are not being written. I did not give enough
IAM permissions to write to Kinesis. However, the Flink SQL job acts like it's
healthy and checkpoints even though the Kinesis PutRecords call fails. I'd
expect this error to kill the Flink job.
I looked through Flink Jira and the Flink user group but didn't see a similar
issue.

Is the silent failure a known issue? If the Flink job doesn't fail, it'll be
hard to detect production issues.

``` 
2022-11-29 23:30:27,587 ERROR org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader
 [] - [2022-11-29 23:30:27.578072] [0x1e3b][0x7f12ef8fc700] [error]
[AWS Log: ERROR](AWSClient)HTTP response code: 400
Exception name: AccessDeniedException
Error message: User: arn:aws:sts::055315558257:assumed-role/dev-workers-us-east-1b-202203101433138915000a/i-09e4f747a4bdbb1f0
 is not authorized to perform: kinesis:ListShards on resource:
arn:aws:kinesis:us-east-1:055315558257:stream/dan-dev-content-metrics
because no identity-based policy allows the kinesis:ListShards action
6 response headers:
connection : close
content-length : 379
content-type : application/x-amz-json-1.1
date : Tue, 29 Nov 2022 23:30:27 GMT
x-amz-id-2 : q8kuplUOMJILzVU97YA+TYSyy6aozeoST+yws26rOkyzEUUZT0zKBdcMWUAjV/8RrnMeed/+em7CbjpwzGYEANgkwCihZWdC
x-amzn-requestid : e4a39674-66fa-4dcd-b8a3-0e273e5e628a
``` 



Re: Re: How do I generate a StreamGraph from the Flink execution plan JSON?

2022-11-29 Thread yidan zhao
OK, I don't know the SQL side in detail; I mostly use the Stream API. My understanding is: Stream
API to StreamGraph, then to JobGraph, and then the JobGraph is submitted directly to the cluster for execution via the REST API. Standalone scenario.

casel.chen  wrote on Wed, Nov 30, 2022 at 00:16:
>
> If we want to support adjusting the resources and parallelism of each operator of a Flink SQL job, don't we have to convert the JSON back into a StreamGraph and then submit it?
>
>
> On 2022-11-29 10:07:40, "yidan zhao"  wrote:
> >There is no need to generate a StreamGraph from the execution plan JSON~
> >The StreamGraph is converted directly into a JobGraph before submission.
> >
> >casel.chen  wrote on Mon, Nov 28, 2022 at 08:53:
> >>
> >> In the source code I only found how to generate the execution plan JSON from a StreamGraph, but not how to parse that JSON back into a StreamGraph. Please advise.


Flink Table Kinesis sink not failing when sink fails

2022-11-29 Thread Dan Hill
I set up a simple Flink SQL job (Flink v1.14.4) that writes to Kinesis.
The job looks healthy but the records are not being written.  I did not
give enough IAM permissions to write to Kinesis.  However, the Flink SQL
job acts like it's healthy and checkpoints even though the Kinesis PutRecords
call fails.  I'd expect this error to kill the Flink job.

I looked through Flink Jira and the Flink user group but didn't see a
similar issue.

Is the silent failure a known issue?  If the Flink job doesn't fail, it'll
be hard to detect production issues.

```

2022-11-29 23:30:27,587 ERROR
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader
[] - [2022-11-29 23:30:27.578072] [0x1e3b][0x7f12ef8fc700]
[error] [AWS Log: ERROR](AWSClient)HTTP response code: 400
Exception name: AccessDeniedException
Error message: User:
arn:aws:sts::055315558257:assumed-role/dev-workers-us-east-1b-202203101433138915000a/i-09e4f747a4bdbb1f0
is not authorized to perform: kinesis:ListShards on resource:
arn:aws:kinesis:us-east-1:055315558257:stream/dan-dev-content-metrics
because no identity-based policy allows the kinesis:ListShards action
6 response headers:
connection : close
content-length : 379
content-type : application/x-amz-json-1.1
date : Tue, 29 Nov 2022 23:30:27 GMT
x-amz-id-2 : 
q8kuplUOMJILzVU97YA+TYSyy6aozeoST+yws26rOkyzEUUZT0zKBdcMWUAjV/8RrnMeed/+em7CbjpwzGYEANgkwCihZWdC
x-amzn-requestid : e4a39674-66fa-4dcd-b8a3-0e273e5e628a

```


Re: "An illegal reflective access operation has occurred" during KeyedStream process

2022-11-29 Thread Curtis Jensen
I changed the ".map(...)" and ".print()" terminal statement to :
.executeAndCollect()
.forEachRemaining(System.out::println);

The warnings were replaced with:
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by
com.twitter.chill.java.ArraysAsListSerializer
(file:/Users/cjensen/data/tools/apache-maven-3.3.9/m2/com/twitter/chill-java/0.7.6/chill-java-0.7.6.jar)
to field java.util.Arrays$ArrayList.a
WARNING: Please consider reporting this to the maintainers of
com.twitter.chill.java.ArraysAsListSerializer
WARNING: Use --illegal-access=warn to enable warnings of further
illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release

On Tue, Nov 29, 2022 at 3:25 PM Curtis Jensen  wrote:
>
> Hello,
>
> Using Flink version 1.15.0, I receive these warnings when trying a
> small example (code below):
> WARNING: An illegal reflective access operation has occurred
> WARNING: Illegal reflective access by
> org.apache.flink.api.java.ClosureCleaner
> (file:/Users/cjensen/data/tools/apache-maven-3.3.9/m2/org/apache/flink/flink-core/1.15.0/flink-core-1.15.0.jar)
> to field java.lang.String.value
> WARNING: Please consider reporting this to the maintainers of
> org.apache.flink.api.java.ClosureCleaner
> WARNING: Use --illegal-access=warn to enable warnings of further
> illegal reflective access operations
> WARNING: All illegal access operations will be denied in a future release
>
> I am undoubtedly doing something incorrectly, but felt that it may be
> useful to take the advice "Please consider reporting this to the
> maintainers of org.apache.flink.api.java.ClosureCleaner".
> Also, any corrections to my example would be appreciated.
>
> Thanks,
> Curtis
>
>
>
>
> AvgAmount.java
>
> import org.apache.flink.api.common.RuntimeExecutionMode;
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.datastream.KeyedStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
> import org.apache.flink.util.Collector;
>
> public class AvgAmount {
>
>   public static void main(String[] args) {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setRuntimeMode(RuntimeExecutionMode.BATCH);
>
> DataStream<ExampleData.PurchaseEvent> purchaseStream =
> env.fromElements(ExampleData.PURCHASE_EVENTS);
> KeyedStream<ExampleData.PurchaseEvent, String> keyedPurchaseStream = purchaseStream.keyBy(event ->
> event.account_id);
> keyedPurchaseStream.process(new PurchaseEventProcessor())
> .map(stats -> stats.toString())
> .print();
>   }
>
>   public static class PurchaseStats {
> public String accountId;
> public long amountSum;
> public long amountCount;
>
> public PurchaseStats(String accountId) {
>   this.accountId = accountId;
> }
>
> public void addAmount(long amount) {
>   amountSum += amount;
>   amountCount += 1;
> }
>
> @Override
> public String toString() {
>   return String.format("{\"account_id\":\"%s\",\"avg_amount\":%f}",
> accountId, (double)amountSum/(double)amountCount);
> }
>   }
>
>   public static class PurchaseEventProcessor extends
> KeyedProcessFunction<String, ExampleData.PurchaseEvent, PurchaseStats>
> {
> ValueState<PurchaseStats> seen;
>
> @Override
> public void open(Configuration parameters) {
>   seen = getRuntimeContext().getState(new
> ValueStateDescriptor<>("seen", PurchaseStats.class));
> }
>
> @Override
> public void processElement(ExampleData.PurchaseEvent
> purchaseEvent, KeyedProcessFunction<String, ExampleData.PurchaseEvent, PurchaseStats>.Context context, Collector<PurchaseStats> out) throws
> Exception {
>   PurchaseStats currentStats = seen.value();
>   if (currentStats == null) {
> currentStats = new PurchaseStats(purchaseEvent.account_id);
>   }
>
>   currentStats.addAmount(purchaseEvent.amount);
>
>   seen.update(currentStats);
>   out.collect(currentStats);
> }
>   }
> }
>
> ExampleData.java
>
> import org.apache.flink.types.Row;
> import org.apache.flink.types.RowKind;
>
> import java.time.Instant;
>
> public class ExampleData {
> public static final PurchaseEvent[] PURCHASE_EVENTS =
> new PurchaseEvent[] {
> new PurchaseEvent("1337Gamer", "192.168.0.1", 1000),
> new PurchaseEvent("1337", "127.0.0.1", 1000),
> new PurchaseEvent("1337", "127.0.0.2", 100),
> new PurchaseEvent("1337", "127.0.0.1", 9900)
> };
>
> public static class PurchaseEvent {
> public long timestamp;
> public String account_id;
> public String ip;
> public long amount;
>
> public PurchaseEvent() { }
>
> public 

"An illegal reflective access operation has occurred" during KeyedStream process

2022-11-29 Thread Curtis Jensen
Hello,

Using Flink version 1.15.0, I receive these warnings when trying a
small example (code below):
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by
org.apache.flink.api.java.ClosureCleaner
(file:/Users/cjensen/data/tools/apache-maven-3.3.9/m2/org/apache/flink/flink-core/1.15.0/flink-core-1.15.0.jar)
to field java.lang.String.value
WARNING: Please consider reporting this to the maintainers of
org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further
illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release

I am undoubtedly doing something incorrectly, but felt that it may be
useful to take the advice "Please consider reporting this to the
maintainers of org.apache.flink.api.java.ClosureCleaner".
Also, any corrections to my example would be appreciated.

Thanks,
Curtis




AvgAmount.java

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class AvgAmount {

  public static void main(String[] args) {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

DataStream<ExampleData.PurchaseEvent> purchaseStream =
env.fromElements(ExampleData.PURCHASE_EVENTS);
KeyedStream<ExampleData.PurchaseEvent, String> keyedPurchaseStream = purchaseStream.keyBy(event ->
event.account_id);
keyedPurchaseStream.process(new PurchaseEventProcessor())
.map(stats -> stats.toString())
.print();
  }

  public static class PurchaseStats {
public String accountId;
public long amountSum;
public long amountCount;

public PurchaseStats(String accountId) {
  this.accountId = accountId;
}

public void addAmount(long amount) {
  amountSum += amount;
  amountCount += 1;
}

@Override
public String toString() {
  return String.format("{\"account_id\":\"%s\",\"avg_amount\":%f}",
accountId, (double)amountSum/(double)amountCount);
}
  }

  public static class PurchaseEventProcessor extends
KeyedProcessFunction<String, ExampleData.PurchaseEvent, PurchaseStats>
{
ValueState<PurchaseStats> seen;

@Override
public void open(Configuration parameters) {
  seen = getRuntimeContext().getState(new
ValueStateDescriptor<>("seen", PurchaseStats.class));
}

@Override
public void processElement(ExampleData.PurchaseEvent
purchaseEvent, KeyedProcessFunction<String, ExampleData.PurchaseEvent, PurchaseStats>.Context context, Collector<PurchaseStats> out) throws
Exception {
  PurchaseStats currentStats = seen.value();
  if (currentStats == null) {
currentStats = new PurchaseStats(purchaseEvent.account_id);
  }

  currentStats.addAmount(purchaseEvent.amount);

  seen.update(currentStats);
  out.collect(currentStats);
}
  }
}

ExampleData.java

import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

import java.time.Instant;

public class ExampleData {
public static final PurchaseEvent[] PURCHASE_EVENTS =
new PurchaseEvent[] {
new PurchaseEvent("1337Gamer", "192.168.0.1", 1000),
new PurchaseEvent("1337", "127.0.0.1", 1000),
new PurchaseEvent("1337", "127.0.0.2", 100),
new PurchaseEvent("1337", "127.0.0.1", 9900)
};

public static class PurchaseEvent {
public long timestamp;
public String account_id;
public String ip;
public long amount;

public PurchaseEvent() { }

public PurchaseEvent(String accountId, String ip, long amount) {
this(Instant.now().getEpochSecond(), accountId, ip, amount);
}

public PurchaseEvent(long timestamp, String accountId, String
ip, long amount) {
this.timestamp = timestamp;
this.account_id = accountId;
this.ip = ip;
this.amount = amount;
}
}
}


Re: Flink 1.15.3 Docker image

2022-11-29 Thread Márton Balassi
Done, please let me know if you see anything unexpected.

On Tue, Nov 29, 2022 at 7:07 PM Márton Balassi 
wrote:

> Hi Ben,
>
> Thanks for reaching out. Since the image repo has been updated [1] I can
> pick this up. Will let you know when done.
>
> [1]
> https://github.com/apache/flink-docker/commit/a22c0f04972a1d8539d9213b52fc0728eac8c1fa
>
> On Tue, Nov 29, 2022 at 4:28 PM Roberts, Ben (Senior Developer) via user <
> user@flink.apache.org> wrote:
>
>> Hi,
>>
>>
>>
>> Is there anyone able to publish the apache/flink docker image for release
>> 1.15.3?
>>
>>
>>
>> I see that the release was announced on 2022-11-10, and the PR for the
>> 1.15.3 dockerfiles (linked:
>> https://github.com/apache/flink-docker/pull/140) has been approved and
>> merged on 2022-11-25, but it looks like the release is yet to be published
>> to Dockerhub. (https://hub.docker.com/r/apache/flink/tags was last
>> updated 8 days ago.)
>>
>>
>>
>> According to the README, there are a
>> limited number of people able to publish new Docker images; is anyone on this
>> mailing list able to help?
>>
>>
>>
>> Thanks in advance
>>
>>
>>
>> --
>>
>> *Ben Roberts*
>>
>>
>>
>


Re: Flink 1.15.3 Docker image

2022-11-29 Thread Márton Balassi
Hi Ben,

Thanks for reaching out. Since the image repo has been updated [1] I can
pick this up. Will let you know when done.

[1]
https://github.com/apache/flink-docker/commit/a22c0f04972a1d8539d9213b52fc0728eac8c1fa

On Tue, Nov 29, 2022 at 4:28 PM Roberts, Ben (Senior Developer) via user <
user@flink.apache.org> wrote:

> Hi,
>
>
>
> Is there anyone able to publish the apache/flink docker image for release
> 1.15.3?
>
>
>
> I see that the release was announced on 2022-11-10, and the PR for the
> 1.15.3 dockerfiles (linked:
> https://github.com/apache/flink-docker/pull/140) has been approved and
> merged on 2022-11-25, but it looks like the release is yet to be published
> to Dockerhub. (https://hub.docker.com/r/apache/flink/tags was last
> updated 8 days ago.)
>
>
>
> According to the README, there are a
> limited number of people able to publish new Docker images; is anyone on this
> mailing list able to help?
>
>
>
> Thanks in advance
>
>
>
> --
>
> *Ben Roberts*
>
>
>


Re: Table API: Custom Kafka formats for Confluent Protobuf and JSON

2022-11-29 Thread Yaroslav Tkachenko
Hey Theodor,

That's pretty much it, assuming your Protobuf schema is more or less fixed.
But for a production workload, you'd need to add a Schema Registry lookup.
I guess the implementation for that would be similar to what's in the Avro
format.
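
For what it's worth, a rough sketch of the piece such a lookup would hang off of — this only parses the Confluent framing (magic byte + big-endian 4-byte schema id); the actual registry call is left as a comment, since the Confluent client API to use depends on the client version:

```java
import java.io.IOException;
import java.nio.ByteBuffer;

/** Sketch: extract the schema id that a Schema Registry lookup would be keyed on. */
final class ConfluentFraming {

    private static final byte MAGIC_BYTE = 0x0;

    static int readSchemaId(byte[] message) throws IOException {
        ByteBuffer buffer = ByteBuffer.wrap(message);
        if (buffer.remaining() < 5 || buffer.get() != MAGIC_BYTE) {
            throw new IOException("Not a Confluent-framed record (unknown magic byte)");
        }
        // A production format would now resolve this id against the Schema Registry
        // (similar to what the Confluent-Avro format does) and validate or pick the
        // writer schema before deserializing the remaining bytes of the buffer.
        return buffer.getInt(); // 4-byte schema id, big-endian by ByteBuffer default
    }

    private ConfluentFraming() {}
}
```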

On Tue, Nov 29, 2022 at 2:26 AM Theodor Wübker 
wrote:

> Hey all,
>
> so Confluent has Kafka serializers for Protobuf, Avro and JSON that create
> messages with a magic byte, followed by a 4-byte schema id, followed by the
> actual payload (see the Confluent docs).
> When I try to read such messages with the regular Protobuf, Avro and JSON
> formats in my Table API program, it of course does not work. For Avro,
> Flink also has a Confluent-Avro format that can deal with this. However, for
> Protobuf and JSON there is nothing like this yet. I saw a ticket in JIRA,
> but I cannot wait for it. Hence I wonder how much effort it would be to
> implement this myself - not in a production-ready way, but just in a way
> that keeps my program from breaking. Meaning I would be happy with a solution
> that just ignores the first 5 bytes and passes the rest on to the existing
> handlers of the Protobuf and JSON formats.
>
>
> Now let's take the existing Protobuf format as an example: I assume I have to
> implement the DeserializationFormatFactory, create a few decoding and encoding
> formats, just like the PbDecodingFormat for example, and then a new
> DeserializationSchema, where I would have a method like this:
>
> @Override
> public RowData deserialize(byte[] message) throws IOException {
>     try {
>         return protoToRowConverter.convertProtoBinaryToRow(message);
>     } catch (Throwable t) {
>         if (formatConfig.isIgnoreParseErrors()) {
>             return null;
>         }
>         throw new IOException("Failed to deserialize PB object.", t);
>     }
> }
>
> But instead of converting the message immediately, I would slice the first
> few Bytes off and go from there. Is this pretty much it, or is there more
> to it?
>
> -Theo
>
>


Re: Re: How do I generate a StreamGraph from the Flink execution plan JSON?

2022-11-29 Thread casel.chen
If we want to support adjusting the resources and parallelism of each operator of a Flink SQL job, don't we have to convert the JSON back into a StreamGraph and then submit it?

















On 2022-11-29 10:07:40, "yidan zhao"  wrote:
>There is no need to generate a StreamGraph from the execution plan JSON~
>The StreamGraph is converted directly into a JobGraph before submission.
>
>casel.chen  wrote on Mon, Nov 28, 2022 at 08:53:
>>
>> In the source code I only found how to generate the execution plan JSON from a StreamGraph, but not how to parse that JSON back into a StreamGraph. Please advise.


Re: FlinkCDC can read MySQL change data but cannot insert it into the new MySQL table

2022-11-29 Thread Leonard Xu


> On Nov 4, 2022, at 2:34 PM, 左岩 <13520871...@163.com> wrote:
> 
> tenv.executeSql("xxx");
> env.execute();


Using them together like this is not correct; you can take a look at the Javadoc of these two methods.

Best,
Leonard
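
To spell out the likely issue in plain terms — a sketch under the assumption that the job only runs SQL statements (the table names and DDL are placeholders, the original code isn't shown): TableEnvironment.executeSql() on an INSERT already submits and runs that job by itself, while StreamExecutionEnvironment.execute() only submits DataStream topologies, so calling it with no DataStream operators defined has nothing to submit.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SqlOnlyJob {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // executeSql(...) on an INSERT statement submits and runs that job by itself.
        // (DDL for the hypothetical cdc_source / mysql_sink tables omitted.)
        tenv.executeSql("INSERT INTO mysql_sink SELECT * FROM cdc_source");

        // No env.execute() here: with no DataStream operators defined there is
        // no DataStream topology for it to submit.
    }
}
```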

Flink 1.15.3 Docker image

2022-11-29 Thread Roberts, Ben (Senior Developer) via user
Hi,

Is there anyone able to publish the apache/flink docker image for release 
1.15.3?

I see that the release was announced on 2022-11-10, and the PR for the 1.15.3 
dockerfiles (linked: https://github.com/apache/flink-docker/pull/140) has been 
approved and merged on 2022-11-25, but it looks like the release is yet to be 
published to Dockerhub. (https://hub.docker.com/r/apache/flink/tags was last 
updated 8 days ago.)

According to the
README, there are a
limited number of people able to publish new Docker images; is anyone on this
mailing list able to help?

Thanks in advance

--
Ben Roberts

Information in this email including any attachments may be privileged, 
confidential and is intended exclusively for the addressee. The views expressed 
may not be official policy, but the personal views of the originator. If you 
have received it in error, please notify the sender by return e-mail and delete 
it from your system. You should not reproduce, distribute, store, retransmit, 
use or disclose its contents to anyone. Please note we reserve the right to 
monitor all e-mail communication through our internal and external networks. 
SKY and the SKY marks are trademarks of Sky Limited and Sky International AG 
and are used under licence.

Sky UK Limited (Registration No. 2906991), Sky-In-Home Service Limited 
(Registration No. 2067075), Sky Subscribers Services Limited (Registration No. 
2340150) and Sky CP Limited (Registration No. 9513259) are direct or indirect 
subsidiaries of Sky Limited (Registration No. 2247735). All of the companies 
mentioned in this paragraph are incorporated in England and Wales and share the 
same registered office at Grant Way, Isleworth, Middlesex TW7 5QD


Re: Flink SQL aggregating the latest data from a CDC source

2022-11-29 Thread Leonard Xu


> On Nov 29, 2022, at 8:32 AM, casel.chen  wrote:
> 
> The business requirement is to aggregate the transaction amount of a MySQL orders table in real time, per day and per supplier. Rows in the orders table get updated and deleted. How can this be implemented with
> Flink SQL? Take the latest record per key with a windowed dedup and then aggregate? If a delete record arrives, will the corresponding price be subtracted? I tried the Flink SQL below but am not sure whether it is correct.


Yes, it will. You can read articles on how Flink SQL works internally; a Baidu/Google search turns up plenty.

Best,
Leonard


> 
> 
> select 
>  s.biddate, 
>  s.supplier, 
>  sum(s.price) 
> from 
>  (
>select 
>  * 
>from 
>  (
>select 
>  biddate, 
>  supplier, 
>  price, 
>  ROW_NUMBER() OVER (
>PARTITION BY biddate, 
>supplier 
>ORDER BY 
>  bidtime DESC
>  ) as rownum 
>from 
>  (
>select 
>  bidtime, 
>  date_format(bidtime, '-MM-dd-HH') as biddate, 
>  supplier, 
>  price 
>from 
>  orders
>  )
>  ) as t 
>where 
>  t.rownum = 1
>  ) as s 
> group by 
>  s.biddate, 
>  s.supplier
> ;
> 



[SUMMARY] Flink 1.17 Release Sync 11/29/2022

2022-11-29 Thread Qingsheng Ren
Hi devs and users,

I'd like to share some highlights from the release sync on 11/29/2022.

1. @Contributors please update your progress on the release 1.17 wiki page
[1] before the sync meeting so that everyone could track it.

2. We have new CI stability tickets and owners should have been pinged on
JIRA or offline. Please take a look at your inbox.

3. Externalizing connectors is progressing well. We could expect most
connectors being externalized in the 1.17 release cycle.

4. It’ll be great to have some input for monitoring the performance test.
Currently there’s a slack channel reporting benchmark results periodically
as discussed in the dev ML [2], and we’re working on formalizing the work
on regression testing.

There are 7 weeks before the feature freeze on 1/17/2023. The next release
meeting will be on December 13th, 2022. Feel free to join us if you
are interested!

Google Meet: https://meet.google.com/wcx-fjbt-hhz
Dial-in: https://tel.meet/wcx-fjbt-hhz?pin=1940846765126

Best regards,
Leonard, Martijn, Matthias and Qingsheng

[1] https://cwiki.apache.org/confluence/display/FLINK/1.17+Release
[2] https://lists.apache.org/thread/zok62sx4m50c79htfp18ymq5vmtgbgxj


Table API: Custom Kafka formats for Confluent Protobuf and JSON

2022-11-29 Thread Theodor Wübker
Hey all,

so Confluent has Kafka serializers for Protobuf, Avro and JSON that create
messages with a magic byte, followed by a 4-byte schema id, followed by the
actual payload (see the Confluent docs). When I try to read such messages with
the regular Protobuf, Avro and JSON formats in my Table API program, it of
course does not work. For Avro, Flink also has a Confluent-Avro format that can
deal with this. However, for Protobuf and JSON there is nothing like this yet.
I saw a ticket in JIRA, but I cannot wait for it. Hence I wonder how much effort
it would be to implement this myself - not in a production-ready way, but just
in a way that keeps my program from breaking. Meaning I would be happy with a
solution that just ignores the first 5 bytes and passes the rest on to the
existing handlers of the Protobuf and JSON formats.


Now let's take the existing Protobuf format as an example: I assume I have to
implement the DeserializationFormatFactory, create a few decoding and encoding
formats, just like the PbDecodingFormat for example, and then a new
DeserializationSchema, where I would have a method like this:

@Override
public RowData deserialize(byte[] message) throws IOException {
    try {
        return protoToRowConverter.convertProtoBinaryToRow(message);
    } catch (Throwable t) {
        if (formatConfig.isIgnoreParseErrors()) {
            return null;
        }
        throw new IOException("Failed to deserialize PB object.", t);
    }
}
But instead of converting the message immediately, I would slice the first few 
Bytes off and go from there. Is this pretty much it, or is there more to it?

-Theo
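
For illustration, a rough skeleton of the factory side described above — the "confluent-protobuf" identifier and all class names are made up, the construction of the delegate Protobuf schema is only hinted at, and it reuses a header-skipping DeserializationSchema wrapper like the one sketched earlier in this digest:

```java
import java.util.Collections;
import java.util.Set;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.types.DataType;

/** Skeleton sketch of a header-skipping format factory; not production code. */
public class ConfluentProtobufFormatFactory implements DeserializationFormatFactory {

    public static final String IDENTIFIER = "confluent-protobuf"; // hypothetical format name

    @Override
    public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
            DynamicTableFactory.Context context, ReadableConfig formatOptions) {
        return new DecodingFormat<DeserializationSchema<RowData>>() {
            @Override
            public DeserializationSchema<RowData> createRuntimeDecoder(
                    DynamicTableSource.Context sourceContext, DataType producedDataType) {
                // Build the regular Protobuf schema for producedDataType (omitted here),
                // then wrap it so the 5-byte Confluent header is stripped first.
                DeserializationSchema<RowData> protobufSchema = buildProtobufSchema(producedDataType);
                return new ConfluentHeaderSkippingDeserializationSchema(protobufSchema);
            }

            @Override
            public ChangelogMode getChangelogMode() {
                return ChangelogMode.insertOnly();
            }
        };
    }

    private DeserializationSchema<RowData> buildProtobufSchema(DataType producedDataType) {
        // Placeholder: mirror what the existing Protobuf format factory does.
        throw new UnsupportedOperationException("left out of the sketch");
    }

    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.emptySet();
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Collections.emptySet();
    }
}
```

The factory would also need to be registered in META-INF/services/org.apache.flink.table.factories.Factory so the Table API can discover it.
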





Re: Savepoint restore mode for the Kubernetes operator

2022-11-29 Thread Alexis Sarda-Espinosa
Just to be clear, I don't think the operator must have special logic to
find out whether a savepoint was used as the base for an incremental
checkpoint; however, the operator logic might want to completely disable
savepoint cleanup for a deployment if the user enabled CLAIM mode for it.
At least that sounds like the safer option to me.

Regards,
Alexis.
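
For reference, a sketch of where the setting under discussion lives — assuming the restore mode is passed through the deployment's flinkConfiguration; the key name and default below are taken from the Flink 1.15+ docs and are worth double-checking for your version:

```yaml
# FlinkDeployment sketch (flink-kubernetes-operator); unrelated fields omitted
spec:
  flinkConfiguration:
    execution.savepoint-restore-mode: NO_CLAIM   # the default; CLAIM hands ownership of the savepoint to Flink
  job:
    initialSavepointPath: s3://my-bucket/savepoints/savepoint-123456   # hypothetical path
```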

On Tue, 29 Nov 2022, 10:31 Gyula Fóra,  wrote:

> The operator might call dispose on an old savepoint that’s true, but I am
> not sure if the dispose api call would actually corrupt it.
>
> Gyula
>
> On Tue, 29 Nov 2022 at 09:28, Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hi Hangxiang,
>>
>> but, if I understand correctly, setting restore mode to CLAIM means that
>> the job might create a new incremental checkpoint based on the savepoint,
>> right? And if the operator then decides to clean up the savepoint, the
>> checkpoint would be corrupted, no?
>>
>> Regards,
>> Alexis.
>>
>> Am Mo., 28. Nov. 2022 um 05:17 Uhr schrieb Hangxiang Yu <
>> master...@gmail.com>:
>>
>>> Hi, Alexis.
>>> IIUC, There is no conflict between savepoint history and restore mode.
>>> Restore mode cares about whether/how we manage the savepoint of old job.
>>> Savepoint management in operator only cares about savepoint history of
>>> new job.
>>> In other words, savepoint cleanup should not clean the savepoint from
>>> the old job which should only be controlled by restore mode.
>>> So I think you could also set restore mode according to your needs.
>>>
>>>
>>> On Wed, Nov 16, 2022 at 10:41 PM Alexis Sarda-Espinosa <
>>> sarda.espin...@gmail.com> wrote:
>>>
 Hello,

 Is there a recommended configuration for the restore mode of jobs
 managed by the operator?

 Since the documentation states that the operator keeps a savepoint
 history to perform cleanup, I imagine restore mode should always be
 NO_CLAIM, but I just want to confirm.

 Regards,
 Alexis.

>>>
>>>
>>> --
>>> Best,
>>> Hangxiang.
>>>
>>


Re: Savepoint restore mode for the Kubernetes operator

2022-11-29 Thread Gyula Fóra
The operator might call dispose on an old savepoint, that's true, but I am
not sure whether the dispose API call would actually corrupt it.

Gyula

On Tue, 29 Nov 2022 at 09:28, Alexis Sarda-Espinosa <
sarda.espin...@gmail.com> wrote:

> Hi Hangxiang,
>
> but, if I understand correctly, setting restore mode to CLAIM means that
> the job might create a new incremental checkpoint based on the savepoint,
> right? And if the operator then decides to clean up the savepoint, the
> checkpoint would be corrupted, no?
>
> Regards,
> Alexis.
>
> Am Mo., 28. Nov. 2022 um 05:17 Uhr schrieb Hangxiang Yu <
> master...@gmail.com>:
>
>> Hi, Alexis.
>> IIUC, There is no conflict between savepoint history and restore mode.
>> Restore mode cares about whether/how we manage the savepoint of old job.
>> Savepoint management in operator only cares about savepoint history of
>> new job.
>> In other words, savepoint cleanup should not clean the savepoint from the
>> old job which should only be controlled by restore mode.
>> So I think you could also set restore mode according to your needs.
>>
>>
>> On Wed, Nov 16, 2022 at 10:41 PM Alexis Sarda-Espinosa <
>> sarda.espin...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> Is there a recommended configuration for the restore mode of jobs
>>> managed by the operator?
>>>
>>> Since the documentation states that the operator keeps a savepoint
>>> history to perform cleanup, I imagine restore mode should always be
>>> NO_CLAIM, but I just want to confirm.
>>>
>>> Regards,
>>> Alexis.
>>>
>>
>>
>> --
>> Best,
>> Hangxiang.
>>
>


Re: Savepoint restore mode for the Kubernetes operator

2022-11-29 Thread Alexis Sarda-Espinosa
Hi Hangxiang,

but, if I understand correctly, setting restore mode to CLAIM means that
the job might create a new incremental checkpoint based on the savepoint,
right? And if the operator then decides to clean up the savepoint, the
checkpoint would be corrupted, no?

Regards,
Alexis.

Am Mo., 28. Nov. 2022 um 05:17 Uhr schrieb Hangxiang Yu :

> Hi, Alexis.
> IIUC, there is no conflict between the savepoint history and the restore mode.
> The restore mode is about whether/how we manage the savepoint of the old job.
> Savepoint management in the operator only cares about the savepoint history of
> the new job.
> In other words, savepoint cleanup should not clean up the savepoint from the
> old job, which should only be controlled by the restore mode.
> So I think you could also set the restore mode according to your needs.
>
>
> On Wed, Nov 16, 2022 at 10:41 PM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hello,
>>
>> Is there a recommended configuration for the restore mode of jobs managed
>> by the operator?
>>
>> Since the documentation states that the operator keeps a savepoint
>> history to perform cleanup, I imagine restore mode should always be
>> NO_CLAIM, but I just want to confirm.
>>
>> Regards,
>> Alexis.
>>
>
>
> --
> Best,
> Hangxiang.
>