Re: [External] : Re: How to get flink to use POJO serializer when enum is present in POJO class

2022-05-12 Thread Fuyao Li
Hi Tejas,

Yes, you can write a TypeInfoFactory for an enum. But I am assuming Flink should be 
able to recognize enums by default…

Anyways, you can do something like this:

Types.ENUM(RuleType.class);

This will return a TypeInformation which can be used to construct a 
TypeInfoFactory.
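
For example, a minimal sketch (untested; it assumes the RuleType enum from your POJO is on the classpath) of such a factory, which you would register on the field with @TypeInfo(RuleTypeInfoFactory.class):

import java.lang.reflect.Type;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

// Sketch only: forces Flink to use its enum TypeInformation for the RuleType field.
public class RuleTypeInfoFactory extends TypeInfoFactory<RuleType> {

    @Override
    public TypeInformation<RuleType> createTypeInfo(
            Type t, Map<String, TypeInformation<?>> genericParameters) {
        return Types.ENUM(RuleType.class);
    }
}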

BTW, could you take a look at my question in the email “How to define 
TypeInformation for Flink recursive resolved POJO”?

Thanks,
Fuyao




From: Tejas B 
Date: Thursday, May 12, 2022 at 16:32
To: Weihua Hu 
Cc: user 
Subject: [External] : Re: How to get flink to use POJO serializer when enum is 
present in POJO class
Hi Weihua,
This is the error I am getting :
Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for CoBroadcastWithNonKeyedOperator_8c5504f305beefca0724b3e55af8ea26_(1/1) from any of the 1 provided restore options.
 at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
 at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:286)
 at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:174)
 ... 11 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
 at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
 at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createOperatorStateBackend(HashMapStateBackend.java:148)
 at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:277)
 at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
 at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
 ... 13 more
Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: 11
 at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
 at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
 at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)

From the error it looks like it's falling back to Kryo serializer instead of 
POJO serializer.

Thanks,
Tejas


On Thu, May 12, 2022 at 7:33 AM Weihua Hu (huweihua@gmail.com) wrote:
Hi, Tejas

This code works in my IDEA environment.
Could you provide more error info or logs?


Best,
Weihua


On May 10, 2022 at 1:22 PM, Tejas B (tejasub1...@gmail.com) wrote:

Hi,
I am trying to get Flink schema evolution to work for me using the POJO serializer. 
But I found out that if an enum is present in the POJO, then the POJO serializer 
is not used. An example of my POJO is as follows:

public class Rule {

    String id;

    int val;

    RuleType ruleType;

    //Newly added field
    //int val2 = 0;

    public Rule() {}

    public Rule(String id, int val, RuleType ruleType) {
        this.id = id;
        this.val = val;
        this.ruleType = ruleType;
        //this.val2 = val2;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public int getVal() {
        return val;
    }

    public void setVal(int val) {
        this.val = val;
    }

    public RuleType getRuleType() {
        return ruleType;
    }

    public void setRuleType(RuleType ruleType) {
        this.ruleType = ruleType;
    }

    //public int getVal2() {
    //    return val2;
    //}

    //public void setVal2(int val2) {
    //    this.val2 = val2;
    //}

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        Rule rule = (Rule) o;
        return val == rule.val && id.equals(rule.id) && ruleType == rule.ruleType;
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, val, ruleType);
    }

    @Override
    public String toString() {
        return "Rule{" +
                "name='" + id + '\'' +
                ", val=" + val +
                ", ruleType=" + ruleType +
                '}';
    }
}

RuleType is an enum class as follows :

public enum RuleType {
X,
Y,
Z

}

Now, for the Rule class, schema evolution (adding a new field called val2) 
works only if I write a custom TypeInfoFactory for this class.

Is there a way I can write a TypeInfoFactory for the enum class? Why does 
Flink not recognize the enum in a POJO class?



Converting BigDecimal to DecimalData with a given precision: why use RoundingMode.HALF_UP?

2022-05-12 Thread happygoing
Hi, all:


After creating a temporary view with tableEnv.createTemporaryView(String path, DataStream dataStream, 
Schema schema) and querying it with SQL (one field in the schema is a decimal, defined as 
DataTypes.DECIMAL(10,2)), values I send with more than 2 decimal places get rounded, e.g. 10.136 -> 10.14. 
While debugging I found that the rounding happens during data conversion.


   I would like to ask:
   1. Why is RoundingMode.HALF_UP used? What is the consideration behind it?
   2. Can it be changed to RoundingMode.ROUND_DOWN so that only the fixed number of decimal places is kept?


Appendix:
DecimalData conversion source code
/**
 * Creates an instance of {@link DecimalData} from a {@link BigDecimal} and the given
 * precision and scale.
 *
 * <p>The returned decimal value may be rounded to have the desired scale. The precision
 * will be checked. If the precision overflows, null will be returned.
 */
public static @Nullable DecimalData fromBigDecimal(BigDecimal bd, int precision, int scale) {
    bd = bd.setScale(scale, RoundingMode.HALF_UP);
    if (bd.precision() > precision) {
        return null;
    }

    long longVal = -1;
    if (precision <= MAX_COMPACT_PRECISION) {
        longVal = bd.movePointRight(scale).longValueExact();
    }
    return new DecimalData(precision, scale, longVal, bd);
}
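
For reference, a minimal standalone sketch (plain JDK BigDecimal, nothing Flink-specific) showing the difference between the two rounding modes in question; RoundingMode.DOWN is the truncating mode:

import java.math.BigDecimal;
import java.math.RoundingMode;

public class RoundingDemo {
    public static void main(String[] args) {
        BigDecimal value = new BigDecimal("10.136");
        // HALF_UP is what DecimalData.fromBigDecimal uses today: 10.136 -> 10.14
        System.out.println(value.setScale(2, RoundingMode.HALF_UP));
        // DOWN simply truncates the extra decimal places: 10.136 -> 10.13
        System.out.println(value.setScale(2, RoundingMode.DOWN));
    }
}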

oss checkpoint fail

2022-05-12 Thread json
When using OSS to store checkpoints, the following error appears after a few checkpoints, causing the checkpoints to fail.


Caused by: org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.ClientException: 
Connection error due to: Trying to access closed classloader. Please check if 
you store classloaders directly or indirectly in static fields. If the 
stacktrace suggests that the leak occurs in a third party library and cannot be 
fixed immediately, you can disable this check with the configuration 
'classloader.check-leaked-classloader'.
[ErrorCode]: Unknown
[RequestId]: Unknown
at 
org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.common.comm.ServiceClient.sendRequestImpl(ServiceClient.java:170)
 ~[?:?]
at 
org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.common.comm.ServiceClient.sendRequest(ServiceClient.java:70)
 ~[?:?]
at 
org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSOperation.send(OSSOperation.java:83)
 ~[?:?]
at 
org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSOperation.doOperation(OSSOperation.java:145)
 ~[?:?]
at 
org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSOperation.doOperation(OSSOperation.java:102)
 ~[?:?]
at 
org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSObjectOperation.writeObjectInternal(OSSObjectOperation.java:897)
 ~[?:?]
at 
org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.internal.OSSObjectOperation.putObject(OSSObjectOperation.java:129)
 ~[?:?]
at 
org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.OSSClient.putObject(OSSClient.java:471)
 ~[?:?]
at 
org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.OSSClient.putObject(OSSClient.java:455)
 ~[?:?]
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystemStore.uploadObject(AliyunOSSFileSystemStore.java:414)
 ~[?:?]
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSOutputStream.close(AliyunOSSOutputStream.java:87)
 ~[?:?]
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
 ~[?:?]
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101)
 ~[?:?]
at 
org.apache.flink.fs.osshadoop.common.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
 ~[?:?]
at 
org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.closeAndFinalizeCheckpoint(FsCheckpointMetadataOutputStream.java:131)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
at 
org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.closeAndFinalizeCheckpoint(FsCheckpointMetadataOutputStream.java:40)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
at 
org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:323)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1182)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
... 10 more
Caused by: java.lang.IllegalStateException: Trying to access closed 
classloader. Please check if you store classloaders directly or indirectly in 
static fields. If the stacktrace suggests that the leak occurs in a third party 
library and cannot be fixed immediately, you can disable this check with the 
configuration 'classloader.check-leaked-classloader'.
at 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.ensureInner(FlinkUserCodeClassLoaders.java:164)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
at 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.getResources(FlinkUserCodeClassLoaders.java:188)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
at org.apache.commons.logging.LogFactory$4.run(LogFactory.java:1307) ~[?:?]
at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_241]
at org.apache.commons.logging.LogFactory.getResources(LogFactory.java:1325) 
~[?:?]
at 
org.apache.commons.logging.LogFactory.getConfigurationFile(LogFactory.java:1403)
 ~[?:?]
at org.apache.commons.logging.LogFactory.getFactory(LogFactory.java:455) ~[?:?]
at org.apache.commons.logging.LogFactory.getLog(LogFactory.java:657) ~[?:?]
at 
org.apache.http.impl.conn.DefaultHttpResponseParser.<init>(DefaultHttpResponseParser.java:59)
 ~[?:?]
at 
org.apache.http.impl.conn.DefaultHttpResponseParserFactory.create(DefaultHttpResponseParserFactory.java:76)
 ~[?:?]
at 
org.apache.http.impl.DefaultBHttpClientConnection.<init>(DefaultBHttpClientConnection.java:99)
 ~[?:?]
at 
org.apache.http.impl.conn.DefaultManagedHttpClientConnection.<init>(DefaultManagedHttpClientConnection.java:74)
 ~[?:?]
at 
org.apache.http.impl.conn.LoggingManagedHttpClientConnection.<init>(LoggingManagedHttpClientConnection.java:66)
 ~[?:?]
at 
org.apache.http.impl.conn.ManagedHttpClientConnectionFactory.create(ManagedHttpClientConnectionFactory.java:127)
 ~[?:?]
at 
org.apache.http.impl.conn.ManagedHttpClientConnectionFactory.create(ManagedHttpClientConnectionFactory.java:57)
 ~[?:?]
at 

Re: How to get flink to use POJO serializer when enum is present in POJO class

2022-05-12 Thread Tejas B
Hi Weihua,
This is the error I am getting :
Caused by: org.apache.flink.util.FlinkException: Could not restore operator
state backend for
CoBroadcastWithNonKeyedOperator_8c5504f305beefca0724b3e55af8ea26_(1/1) from
any of the 1 provided restore options. at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:286)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:174)
... 11 more Caused by:
org.apache.flink.runtime.state.BackendBuildingException: Failed when trying
to restore operator state backend at
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
at
org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createOperatorStateBackend(HashMapStateBackend.java:148)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:277)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
... 13 more Caused by: com.esotericsoftware.kryo.KryoException: Unable to
find class: 11 at
com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
at
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)

From the error it looks like it's falling back to Kryo serializer instead
of POJO serializer.
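
A quick illustrative check of which TypeInformation Flink derives for Rule (sketch only; if the result is not a PojoTypeInfo, Flink falls back to Kryo via a generic type):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;

public class SerializerCheck {
    public static void main(String[] args) {
        // Rule is the POJO from the quoted thread below.
        TypeInformation<Rule> info = TypeInformation.of(Rule.class);
        System.out.println(info);                          // lists the analyzed fields
        System.out.println(info instanceof PojoTypeInfo);  // expect true for POJO serialization
    }
}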

Thanks,
Tejas


On Thu, May 12, 2022 at 7:33 AM Weihua Hu  wrote:

> Hi, Tejas
>
> This code works in my IDEA environment.
> Could you provide more error info or logs?
>
>
> Best,
> Weihua
>
> On May 10, 2022 at 1:22 PM, Tejas B wrote:
>
> Hi,
> I am trying to get flink schema evolution to work for me using POJO
> serializer. But I found out that if an enum is present in the POJO then the
> POJO serializer is not used. Example of my POJO is as follows :
>
> public class Rule {
> String id;
> int val;
> RuleType ruleType;
> //Newly added field
> //int val2 = 0;
> public Rule() {}
> public Rule(String id, int val, RuleType ruleType) {
> this.id = id;
> this.val = val;
> this.ruleType = ruleType;
> //this.val2 = val2;
> }
> public String getId() {
> return id;
> }
> public void setId(String id) {
> this.id = id;
> }
> public int getVal() {
> return val;
> }
> public void setVal(int val) {
> this.val = val;
> }
> public RuleType getRuleType() {
> return ruleType;
> }
> public void setRuleType(RuleType ruleType) {
> this.ruleType = ruleType;
> }
> //public int getVal2() {
> //return val2;
> //}
> //public void setVal2(int val2) {
> //this.val2 = val2;
> //}
> @Override
> public boolean equals(Object o) {
> if (this == o) return true;
> if (o == null || getClass() != o.getClass()) return false;
> Rule rule = (Rule) o;
> return val == rule.val && id.equals(rule.id) && ruleType == rule.ruleType;
> }
> @Override
> public int hashCode() {
> return Objects.hash(id, val, ruleType);
> }
> @Override
> public String toString() {
> return "Rule{" +
> "name='" + id + '\'' +
> ", val=" + val +
> ", ruleType=" + ruleType +
> '}';
> }
>
> }
>
> RuleType is an enum class as follows :
>
> public enum RuleType {
> X,
> Y,
> Z
>
> }
>
> Now for the Rule class the schema evolution (Adding a new field called
> val2), works only if I write a custom typeFactory for this class.
>
> Is there a way that I can write typeFactory for the enum class ? Why does
> the flink not recognize enum in a POJO class ?
>
>
>


Re: unsubscribe

2022-05-12 Thread Henry Cai
I did this, but I am still getting emails from the flink user group.

On Wed, May 11, 2022 at 6:30 PM yuxia  wrote:

> To unsubscribe, you can send email to user-unsubscr...@flink.apache.org
>  with any object.
>
> Best regards,
> Yuxia
>
> --
> *From: *"Henry Cai" 
> *To: *"User" 
> *Sent: *Thursday, May 12, 2022, 1:14:43 AM
> *Subject: *unsubscribe
>
> unsubscribe
>
>


Re: [External] : Re: How to define TypeInformation for Flink recursive resolved POJO

2022-05-12 Thread Fuyao Li
Updated the FieldDefinition class inline to avoid confusion. I am just listing 
a few fields in the class (not all of them). It all follows the suggested POJO 
approach.

From: Fuyao Li 
Date: Thursday, May 12, 2022 at 09:46
To: Weihua Hu 
Cc: User 
Subject: Re: [External] : Re: How to define TypeInformation for Flink recursive 
resolved POJO
Hi Weihua,

I am following all the standards mentioned here. The code structure is listed 
in the previous email.

@Data
class Metadata {
    @TypeInfo(StringFieldDefinitionMapTypeInfoFactory.class)
    Map<String, FieldDefinition> fields;

    @TypeInfo(StringSetTypeInfoFactory.class)
    private Set<String> validColumns = new HashSet<>();
}

@Data
class FieldDefinition {
    private Metadata parentMetadata; // causing recursive resolving when type info is added
}

public class StringFieldDefinitionMapTypeInfoFactory
        extends TypeInfoFactory<Map<String, FieldDefinition>> {
    @Override
    public TypeInformation<Map<String, FieldDefinition>> createTypeInfo(
            Type type, Map<String, TypeInformation<?>> map) {
        return new MapTypeInfo<>(
                TypeInformation.of(String.class), TypeInformation.of(FieldDefinition.class));
    }
}

public class StringSetTypeInfoFactory extends TypeInfoFactory<Set<String>> {
    @Override
    public TypeInformation<Set<String>> createTypeInfo(
            Type type, Map<String, TypeInformation<?>> map) {
        return TypeInformation.of(new TypeHint<Set<String>>() {});
    }
}



I am using Lombok's @Data to expose the getters and setters. For List<>, 
Map<>, and Set<>, due to type erasure, I have to provide the type 
information through a type factory and type hints.

I have two problems:

  1.  The main blocker here is the recursive logic mentioned below.
  2.  The Set type hint doesn’t work well.

How could I define the TypeInformation for a Set? It seems not to be supported as a Java type.

I provided something like this and then annotated this field in the POJO.

public class StringSetTypeInfoFactory extends TypeInfoFactory<Set<String>> {
    @Override
    public TypeInformation<Set<String>> createTypeInfo(
            Type type, Map<String, TypeInformation<?>> map) {
        return TypeInformation.of(new TypeHint<Set<String>>() {});
    }
}

But I still get the following errors.
Generic types have been disabled in the ExecutionConfig and type java.util.Set 
is treated as a generic type.


Thanks,
Fuyao


From: Weihua Hu 
Date: Thursday, May 12, 2022 at 07:24
To: Fuyao Li 
Cc: user 
Subject: [External] : Re: How to define TypeInformation for Flink recursive 
resolved POJO
Hi, Fuyao

How did you define these classes? There are some requirements for POJOs, as the Flink 
docs[1] say:

  *   The class must be public.
  *   It must have a public constructor without arguments (default constructor).
  *   All fields are either public or must be accessible through getter and 
setter functions. For a field called foo the getter and setter methods must be 
named getFoo() and setFoo().
  *   The type of a field must be supported by a registered serializer.

[1]https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#pojos



Best,
Weihua



On May 12, 2022 at 3:03 AM, Fuyao Li (fuyao...@oracle.com) wrote:

Hi Community,

I have a POJO with a nested, recursively resolved structure. How should I 
define the @TypeInfo annotation correctly to avoid a stack overflow exception 
when starting the application?

Basically,
Class Metadata
    Map<String, FieldDefinition> fields

Class FieldDefinition
    Metadata parentMetadata

The Metadata class gets resolved recursively, causing a stack overflow. I had 
to design it this way since the metadata structure model looks like this.

Is there any way to fix this issue? Or must I treat this as a generic type and 
not add the @TypeInfo annotation, so it can fall back to Kryo? If it falls back to 
Kryo and I remove the streamEnvironment.getConfig().disableGenericTypes(); 
statement, there is no problem during program startup.

Thanks,
Fuyao



Re: [External] : Re: How to define TypeInformation for Flink recursive resolved POJO

2022-05-12 Thread Fuyao Li
Hi Weihua,

I am following all the standards mentioned here. The code structure is listed 
in the previous email.

@Data
class Metadata {
    @TypeInfo(StringFieldDefinitionMapTypeInfoFactory.class)
    Map<String, FieldDefinition> fields;

    @TypeInfo(StringSetTypeInfoFactory.class)
    private Set<String> validColumns = new HashSet<>();
}

class FieldDefinition {
    Metadata parentMetadata; // causing recursive resolving when type info is added
}

public class StringFieldDefinitionMapTypeInfoFactory
        extends TypeInfoFactory<Map<String, FieldDefinition>> {
    @Override
    public TypeInformation<Map<String, FieldDefinition>> createTypeInfo(
            Type type, Map<String, TypeInformation<?>> map) {
        return new MapTypeInfo<>(
                TypeInformation.of(String.class), TypeInformation.of(FieldDefinition.class));
    }
}

public class StringSetTypeInfoFactory extends TypeInfoFactory<Set<String>> {
    @Override
    public TypeInformation<Set<String>> createTypeInfo(
            Type type, Map<String, TypeInformation<?>> map) {
        return TypeInformation.of(new TypeHint<Set<String>>() {});
    }
}



I am using Lombok's @Data to expose the getters and setters. For List<>, 
Map<>, and Set<>, due to type erasure, I have to provide the type 
information through a type factory and type hints.

I have two problems:

  1.  The main blocker here is the recursive logic mentioned below.
  2.  The Set type hint doesn’t work well.

How could I define the TypeInformation for a Set? It seems not to be supported as a Java type.

I provided something like this and then annotated this field in the POJO.

public class StringSetTypeInfoFactory extends TypeInfoFactory<Set<String>> {
    @Override
    public TypeInformation<Set<String>> createTypeInfo(
            Type type, Map<String, TypeInformation<?>> map) {
        return TypeInformation.of(new TypeHint<Set<String>>() {});
    }
}

But I still get the following errors.
Generic types have been disabled in the ExecutionConfig and type java.util.Set 
is treated as a generic type.
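
One idea I am considering (an untested sketch, not a confirmed fix): give the recursive back-reference field its own factory that returns a GenericTypeInfo, so the type extraction stops at that field and only it falls back to Kryo. Note this still requires generic types to be enabled, i.e. it will not work together with disableGenericTypes():

import java.lang.reflect.Type;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;

// Sketch: annotate FieldDefinition.parentMetadata with
// @TypeInfo(ParentMetadataTypeInfoFactory.class) to cut the recursive analysis.
public class ParentMetadataTypeInfoFactory extends TypeInfoFactory<Metadata> {

    @Override
    public TypeInformation<Metadata> createTypeInfo(
            Type t, Map<String, TypeInformation<?>> genericParameters) {
        // Kryo-backed generic type info for just this field; breaks the
        // Metadata -> FieldDefinition -> Metadata cycle during extraction.
        return new GenericTypeInfo<>(Metadata.class);
    }
}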


Thanks,
Fuyao


From: Weihua Hu 
Date: Thursday, May 12, 2022 at 07:24
To: Fuyao Li 
Cc: user 
Subject: [External] : Re: How to define TypeInformation for Flink recursive 
resolved POJO
Hi, Fuyao

How did you define these classes? There are some requirements for POJOs, as the Flink 
docs[1] say:

  *   The class must be public.
  *   It must have a public constructor without arguments (default constructor).
  *   All fields are either public or must be accessible through getter and 
setter functions. For a field called foo the getter and setter methods must be 
named getFoo() and setFoo().
  *   The type of a field must be supported by a registered serializer.

[1]https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#pojos



Best,
Weihua


On May 12, 2022 at 3:03 AM, Fuyao Li (fuyao...@oracle.com) wrote:

Hi Community,

I have a POJO with a nested, recursively resolved structure. How should I 
define the @TypeInfo annotation correctly to avoid a stack overflow exception 
when starting the application?

Basically,
Class Metadata
    Map<String, FieldDefinition> fields

Class FieldDefinition
    Metadata parentMetadata

The Metadata class gets resolved recursively, causing a stack overflow. I had 
to design it this way since the metadata structure model looks like this.

Is there any way to fix this issue? Or must I treat this as a generic type and 
not add the @TypeInfo annotation, so it can fall back to Kryo? If it falls back to 
Kryo and I remove the streamEnvironment.getConfig().disableGenericTypes(); 
statement, there is no problem during program startup.

Thanks,
Fuyao



Pulsar/Flink Error: PulsarAdminException$NotFoundException: Topic not exist

2022-05-12 Thread Jason Kania
 Hi,

I am attempting to upgrade from 1.12.7 to 1.15.0. One of the issues I am 
encountering is the following exception when attempting to submit a job from 
the command line:
switched from INITIALIZING to FAILED with failure cause:
org.apache.pulsar.client.admin.PulsarAdminException$NotFoundException: Topic not exist
        at org.apache.pulsar.client.admin.internal.BaseResource.getApiException(BaseResource.java:230)
        at org.apache.pulsar.client.admin.internal.TopicsImpl$7.failed(TopicsImpl.java:529)
        at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyInvocation$1.failed(JerseyInvocation.java:882)
        at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyInvocation$1.completed(JerseyInvocation.java:863)
        at org.apache.pulsar.shade.org.glassfish.jersey.client.ClientRuntime.processResponse(ClientRuntime.java:229)
        at org.apache.pulsar.shade.org.glassfish.jersey.client.ClientRuntime.access$200(ClientRuntime.java:62)
        at org.apache.pulsar.shade.org.glassfish.jersey.client.ClientRuntime$2.lambda$response$0(ClientRuntime.java:173)
        at org.apache.pulsar.shade.org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
        at org.apache.pulsar.shade.org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
Unfortunately, the exception does not specifically name the topic that does not 
exist. Additionally, I would expect the topic to be created automatically, as it 
has been in the past when using Pulsar.
Can someone confirm whether topics must now be created manually? If not, what 
parameter must be set to have the topic created automatically? I am likely 
missing it, but could not find it.
Thanks


  

Re: Community fork of Flink Scala API for Scala 2.12/2.13/3.x

2022-05-12 Thread Jeff Zhang
That's true, the Scala shell is removed from Flink. Fortunately, Apache
Zeppelin has its own Scala REPL for Flink. So if Flink can support Scala
2.13, I am wondering whether it is possible to integrate it into the Scala
shell so that users can run Flink Scala code in a notebook, like Spark.

On Thu, May 12, 2022 at 11:06 PM Roman Grebennikov  wrote:

> Hi,
>
> AFAIK scala REPL was removed completely in Flink 1.15 (
> https://issues.apache.org/jira/browse/FLINK-24360), so there is nothing
> to cross-build.
>
> Roman Grebennikov | g...@dfdx.me
>
>
> On Thu, May 12, 2022, at 14:55, Jeff Zhang wrote:
>
> Great work Roman, do you think it is possible to run in scala shell as
> well?
>
> On Thu, May 12, 2022 at 10:43 PM Roman Grebennikov  wrote:
>
>
> Hello,
>
> As far as I understand discussions in this mailist, now there is almost no
> people maintaining the official Scala API in Apache Flink. Due to some
> technical complexities it will be probably stuck for a very long time on
> Scala 2.12 (which is not EOL yet, but quite close to):
> * Traversable serializer relies a lot on CanBuildFrom (so it's read and
> compiled on restore), which is missing in Scala 2.13 and 3.x - migrating
> off from this approach maintaining a savepoint compatibility can be quite a
> complex task.
> * Scala API uses an implicitly generated TypeInformation, which is
> generated by a giant scary mkTypeInfo macro, which should be completely
> rewritten for Scala 3.x.
>
> But even in the current state, scala support in Flink has some issues with
> ADT (sealed traits, popular data modelling pattern) not being natively
> supported, so if you use them, you have to fall back to Kryo, which is not
> that fast: we've seed 3x-4x throughput drops in performance tests.
>
> In my current company we made a library (
> https://github.com/findify/flink-adt) which used Magnolia (
> https://github.com/softwaremill/magnolia) to do all the compile-time
> TypeInformation generation to make Scala ADT nice & fast in Flink. With a
> couple of community contributions it was now possible to cross-build it
> also for scala3.
>
> As Flink 1.15 core is scala free, we extracted the DataStream part of
> Flink Scala API into a separate project, glued it together with flink-adt
> and ClosureCleaner from Spark 3.2 (supporting Scala 2.13 and jvm17) and
> cross-compiled it for 2.12/2.13/3.x. You can check out the result on this
> github project: https://github.com/findify/flink-scala-api
>
> So technically speaking, now it's possible to migrate a scala flink job
> from 2.12 to 3.x with:
> * replace flink-streaming-scala dependency with flink-scala-api (optional,
> both libs can co-exist in classpath on 2.12)
> * replace all imports of org.apache.flink.streaming.api.scala._ with ones
> from the new library
> * rebuild the job for 3.x
>
> The main drawback is that there is no savepoint compatibility due to
> CanBuildFrom and different way of handling ADTs. But if you can afford
> re-bootstrapping the state - migration is quite straightforward.
>
> The README on github https://github.com/findify/flink-scala-api#readme
> has some more details on how and why this project was done in this way. And
> the project is a bit experimental, so if you're interested in scala3 on
> Flink, you're welcome to share your feedback and ideas.
>
> with best regards,
> Roman Grebennikov | g...@dfdx.me
>
>
>
> --
> Best Regards
>
> Jeff Zhang
>
>
>

-- 
Best Regards

Jeff Zhang


Re: Community fork of Flink Scala API for Scala 2.12/2.13/3.x

2022-05-12 Thread Roman Grebennikov
Hi,

AFAIK scala REPL was removed completely in Flink 1.15 
(https://issues.apache.org/jira/browse/FLINK-24360), so there is nothing to 
cross-build.

Roman Grebennikov | g...@dfdx.me


On Thu, May 12, 2022, at 14:55, Jeff Zhang wrote:
> Great work Roman, do you think it is possible to run in scala shell as well?
> 
> On Thu, May 12, 2022 at 10:43 PM Roman Grebennikov  wrote:
>> __
>> Hello,
>> 
>> As far as I understand discussions in this mailist, now there is almost no 
>> people maintaining the official Scala API in Apache Flink. Due to some 
>> technical complexities it will be probably stuck for a very long time on 
>> Scala 2.12 (which is not EOL yet, but quite close to):
>> * Traversable serializer relies a lot on CanBuildFrom (so it's read and 
>> compiled on restore), which is missing in Scala 2.13 and 3.x - migrating off 
>> from this approach maintaining a savepoint compatibility can be quite a 
>> complex task.
>> * Scala API uses an implicitly generated TypeInformation, which is generated 
>> by a giant scary mkTypeInfo macro, which should be completely rewritten for 
>> Scala 3.x.
>> 
>> But even in the current state, scala support in Flink has some issues with 
>> ADT (sealed traits, popular data modelling pattern) not being natively 
>> supported, so if you use them, you have to fall back to Kryo, which is not 
>> that fast: we've seed 3x-4x throughput drops in performance tests.
>> 
>> In my current company we made a library 
>> (https://github.com/findify/flink-adt) which used Magnolia 
>> (https://github.com/softwaremill/magnolia) to do all the compile-time 
>> TypeInformation generation to make Scala ADT nice & fast in Flink. With a 
>> couple of community contributions it was now possible to cross-build it also 
>> for scala3.
>> 
>> As Flink 1.15 core is scala free, we extracted the DataStream part of Flink 
>> Scala API into a separate project, glued it together with flink-adt and 
>> ClosureCleaner from Spark 3.2 (supporting Scala 2.13 and jvm17) and 
>> cross-compiled it for 2.12/2.13/3.x. You can check out the result on this 
>> github project: https://github.com/findify/flink-scala-api
>> 
>> So technically speaking, now it's possible to migrate a scala flink job from 
>> 2.12 to 3.x with:
>> * replace flink-streaming-scala dependency with flink-scala-api (optional, 
>> both libs can co-exist in classpath on 2.12)
>> * replace all imports of org.apache.flink.streaming.api.scala._ with ones 
>> from the new library
>> * rebuild the job for 3.x
>> 
>> The main drawback is that there is no savepoint compatibility due to 
>> CanBuildFrom and different way of handling ADTs. But if you can afford 
>> re-bootstrapping the state - migration is quite straightforward.
>> 
>> The README on github https://github.com/findify/flink-scala-api#readme has 
>> some more details on how and why this project was done in this way. And the 
>> project is a bit experimental, so if you're interested in scala3 on Flink, 
>> you're welcome to share your feedback and ideas. 
>> 
>> with best regards,
>> Roman Grebennikov | g...@dfdx.me
>> 
> 
> 
> -- 
> Best Regards
> 
> Jeff Zhang


Re: Community fork of Flink Scala API for Scala 2.12/2.13/3.x

2022-05-12 Thread Jeff Zhang
Great work Roman, do you think it is possible to run in scala shell as well?

On Thu, May 12, 2022 at 10:43 PM Roman Grebennikov  wrote:

> Hello,
>
> As far as I understand discussions in this mailist, now there is almost no
> people maintaining the official Scala API in Apache Flink. Due to some
> technical complexities it will be probably stuck for a very long time on
> Scala 2.12 (which is not EOL yet, but quite close to):
> * Traversable serializer relies a lot on CanBuildFrom (so it's read and
> compiled on restore), which is missing in Scala 2.13 and 3.x - migrating
> off from this approach maintaining a savepoint compatibility can be quite a
> complex task.
> * Scala API uses an implicitly generated TypeInformation, which is
> generated by a giant scary mkTypeInfo macro, which should be completely
> rewritten for Scala 3.x.
>
> But even in the current state, scala support in Flink has some issues with
> ADT (sealed traits, popular data modelling pattern) not being natively
> supported, so if you use them, you have to fall back to Kryo, which is not
> that fast: we've seed 3x-4x throughput drops in performance tests.
>
> In my current company we made a library (
> https://github.com/findify/flink-adt) which used Magnolia (
> https://github.com/softwaremill/magnolia) to do all the compile-time
> TypeInformation generation to make Scala ADT nice & fast in Flink. With a
> couple of community contributions it was now possible to cross-build it
> also for scala3.
>
> As Flink 1.15 core is scala free, we extracted the DataStream part of
> Flink Scala API into a separate project, glued it together with flink-adt
> and ClosureCleaner from Spark 3.2 (supporting Scala 2.13 and jvm17) and
> cross-compiled it for 2.12/2.13/3.x. You can check out the result on this
> github project: https://github.com/findify/flink-scala-api
>
> So technically speaking, now it's possible to migrate a scala flink job
> from 2.12 to 3.x with:
> * replace flink-streaming-scala dependency with flink-scala-api (optional,
> both libs can co-exist in classpath on 2.12)
> * replace all imports of org.apache.flink.streaming.api.scala._ with ones
> from the new library
> * rebuild the job for 3.x
>
> The main drawback is that there is no savepoint compatibility due to
> CanBuildFrom and different way of handling ADTs. But if you can afford
> re-bootstrapping the state - migration is quite straightforward.
>
> The README on github https://github.com/findify/flink-scala-api#readme
> has some more details on how and why this project was done in this way. And
> the project is a bit experimental, so if you're interested in scala3 on
> Flink, you're welcome to share your feedback and ideas.
>
> with best regards,
> Roman Grebennikov | g...@dfdx.me
>
>

-- 
Best Regards

Jeff Zhang


Community fork of Flink Scala API for Scala 2.12/2.13/3.x

2022-05-12 Thread Roman Grebennikov
Hello,

As far as I understand the discussions on this mailing list, there is now almost 
nobody maintaining the official Scala API in Apache Flink. Due to some 
technical complexities it will probably be stuck for a very long time on Scala 
2.12 (which is not EOL yet, but quite close to it):
* Traversable serializer relies a lot on CanBuildFrom (so it's read and 
compiled on restore), which is missing in Scala 2.13 and 3.x - migrating off 
from this approach maintaining a savepoint compatibility can be quite a complex 
task.
* Scala API uses an implicitly generated TypeInformation, which is generated by 
a giant scary mkTypeInfo macro, which should be completely rewritten for Scala 
3.x.

But even in the current state, Scala support in Flink has some issues, with ADTs 
(sealed traits, a popular data modelling pattern) not being natively supported; 
so if you use them, you have to fall back to Kryo, which is not that fast: 
we've seen 3x-4x throughput drops in performance tests.

In my current company we made a library (https://github.com/findify/flink-adt) 
which used Magnolia (https://github.com/softwaremill/magnolia) to do all the 
compile-time TypeInformation generation to make Scala ADT nice & fast in Flink. 
With a couple of community contributions it was now possible to cross-build it 
also for scala3.

As Flink 1.15 core is scala free, we extracted the DataStream part of Flink 
Scala API into a separate project, glued it together with flink-adt and 
ClosureCleaner from Spark 3.2 (supporting Scala 2.13 and jvm17) and 
cross-compiled it for 2.12/2.13/3.x. You can check out the result on this 
github project: https://github.com/findify/flink-scala-api

So technically speaking, now it's possible to migrate a scala flink job from 
2.12 to 3.x with:
* replace flink-streaming-scala dependency with flink-scala-api (optional, both 
libs can co-exist in classpath on 2.12)
* replace all imports of org.apache.flink.streaming.api.scala._ with ones from 
the new library
* rebuild the job for 3.x

The main drawback is that there is no savepoint compatibility due to 
CanBuildFrom and different way of handling ADTs. But if you can afford 
re-bootstrapping the state - migration is quite straightforward.

The README on github https://github.com/findify/flink-scala-api#readme has some 
more details on how and why this project was done in this way. And the project 
is a bit experimental, so if you're interested in scala3 on Flink, you're 
welcome to share your feedback and ideas. 

with best regards,
Roman Grebennikov | g...@dfdx.me


Re: How to get flink to use POJO serializer when enum is present in POJO class

2022-05-12 Thread Weihua Hu
Hi, Tejas

This code works in my IDEA environment.
Could you provide more error info or logs?


Best,
Weihua

> On May 10, 2022 at 1:22 PM, Tejas B wrote:
> 
> Hi,
> I am trying to get flink schema evolution to work for me using POJO 
> serializer. But I found out that if an enum is present in the POJO then the 
> POJO serializer is not used. Example of my POJO is as follows :
> public class Rule {
> String id;
> int val;
> RuleType ruleType;
> //Newly added field
> //int val2 = 0;
> 
> public Rule() {}
> 
> public Rule(String id, int val, RuleType ruleType) {
> this.id = id;
> this.val = val;
> this.ruleType = ruleType;
> //this.val2 = val2;
> }
> 
> public String getId() {
> return id;
> }
> 
> public void setId(String id) {
> this.id = id;
> }
> 
> public int getVal() {
> return val;
> }
> 
> public void setVal(int val) {
> this.val = val;
> }
> 
> public RuleType getRuleType() {
> return ruleType;
> }
> 
> public void setRuleType(RuleType ruleType) {
> this.ruleType = ruleType;
> }
> 
> //public int getVal2() {
> //return val2;
> //}
> 
> //public void setVal2(int val2) {
> //this.val2 = val2;
> //}
> 
> @Override
> public boolean equals(Object o) {
> if (this == o) return true;
> if (o == null || getClass() != o.getClass()) return false;
> Rule rule = (Rule) o;
> return val == rule.val && id.equals(rule.id ) && 
> ruleType == rule.ruleType;
> }
> 
> @Override
> public int hashCode() {
> return Objects.hash(id, val, ruleType);
> }
> 
> @Override
> public String toString() {
> return "Rule{" +
> "name='" + id + '\'' +
> ", val=" + val +
> ", ruleType=" + ruleType +
> '}';
> }
> }
> 
> RuleType is an enum class as follows :
> 
> public enum RuleType {
> X,
> Y,
> Z
> }
> Now for the Rule class the schema evolution (Adding a new field called val2), 
> works only if I write a custom typeFactory for this class.
> 
> Is there a way that I can write typeFactory for the enum class ? Why does the 
> flink not recognize enum in a POJO class ?
> 



Re: How to define TypeInformation for Flink recursive resolved POJO

2022-05-12 Thread Weihua Hu
Hi, Fuyao

How did you define these classes? There are some requirements for POJOs, as the Flink 
docs[1] say:
The class must be public.

It must have a public constructor without arguments (default constructor).

All fields are either public or must be accessible through getter and setter 
functions. For a field called foo the getter and setter methods must be named 
getFoo() and setFoo().

The type of a field must be supported by a registered serializer.


[1]https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#pojos



Best,
Weihua

> On May 12, 2022 at 3:03 AM, Fuyao Li wrote:
> 
> Hi Community,
>  
> I have a POJO that has nested recursively resolved structure. How should I 
> define the @TypeInfo annotation correctly to avoid stack overflow exception 
> when starting the application.
>  
> Basically,
> Class Metadata
> Map<String, FieldDefinition> fields
>  
> Class FieldDefinition
> Metadata parentMetadata
>  
> The Metadata class got resolved recursively and causing stack overflow. I had 
> to design this way since the metadata structure model looks like this.
>  
> Is there any ways to fix this issue? Or I must treat this as a Generic type 
> and don’t add @TypeInfo annotation so it can fallback to Kryo. If it fall 
> back to Kryo and remove the 
> streamEnvironment.getConfig().disableGenericTypes(); statement, there won’t 
> be any problem during program startup.
>  
> Thanks,
> Fuyao



Re: Question about job parameter settings for Flink SQL jobs under failure and cancellation (flink-1.14.2)

2022-05-12 Thread 徐战辉
I am going to read some more documentation and try again; I am withdrawing this question. Thanks.
done.

On May 12, 2022 at 15:36, 徐战辉 wrote:


Hi Yuxia,
I would like to ask how to set the Flink configuration and job parameters so that no data is lost when a job is cancelled and redeployed, or when a failed Flink job is rerun.

Currently I have a job with checkpointing enabled; after cancelling and restarting it, a small portion of the data is lost.




1. flink.conf


execution.checkpointing.interval: 1
execution.checkpointing.externalized-checkpoint-retention: 
RETAIN_ON_CANCELLATION
execution.checkpointing.checkpoints-after-tasks-finish.enabled: true

state.backend: filesystem
state.checkpoints.dir: hdfs://**:8020/flink/checkpoints
state.savepoints.dir: hdfs://:8020/flink/savepoints


2. source table
CREATE TABLE source_kafka_nginxlog (
 ts BIGINT,
 ..
 pt AS PROCTIME()
) WITH (
 'connector' = 'kafka',
 'topic' = 'nginxlog',
-- the Flink 1.15 patch (FLINK-24697) has been applied

 'scan.startup.mode' = 'group-offsets',
 'properties.auto.offset.reset' = 'latest', 

 'properties.bootstrap.servers' = '***:9092',
 'properties.group.id' = 'zep',
 'format'='json'
);


3. sink table



CREATE TABLE sink_kafka_nginxlog_statistic (
 ts BIGINT,
  ..
 clt_rq BIGINT not null
) WITH (
 'connector' = 'kafka',
 'topic' = 'nginxlog-statistic-flink',
 'sink.parallelism' = '20',
 'sink.delivery-guarantee' = 'exactly-once',
 'sink.transactional-id-prefix' = 'nginxlog-statistic-flink',
 'properties.transaction.timeout.ms' = '360',
 'scan.startup.mode' = 'group-offsets',
 'properties.auto.offset.reset' = 'latest', 
 'properties.bootstrap.servers' = '***:9092',
 'properties.group.id' = 'zep',
 'value.format' = 'csv'
)
Best Regards

Jerry Guo
wangyixuhongm...@163.com

---- Replied message ----
| From | yuxia |
| Date | May 12, 2022 15:16 |
| To | user-zh |
| Subject | Re: Fwd: Question about job parameter settings for Flink SQL jobs under failure and cancellation (flink-1.14.2) |

Hi, could you explain what exactly you would like to ask?

Best regards,
Yuxia

----- Original Mail -----
From: "徐战辉" 
To: "user-zh" 
Sent: Thursday, May 12, 2022, 10:53:00 AM
Subject: Fwd: Question about job parameter settings for Flink SQL jobs under failure and cancellation (flink-1.14.2)

Jerry Guo
wangyixuhongm...@163.com

---- Forwarded message ----
| From | 徐战辉 |
| Date | May 12, 2022 10:38 |
| To | user-zh@flink.apache.org |
| Subject | Question about job parameter settings for Flink SQL jobs under failure and cancellation (flink-1.14.2) |


Re: Batching in kinesis sink

2022-05-12 Thread Zain Haider Nemati
Thanks for your response ! Much appreciated

On Thu, May 12, 2022 at 5:51 PM Teoh, Hong  wrote:

> Hi Zain,
>
> For Flink 1.13, we use the KinesisProducerLibrary. If you are using
> aggregation, you can control the maximum size of aggregated records by
> configuring the AggregationMaxSize in the producer config when constructing
> the FlinkKinesisProducer. (See [1] for more docs)
>
> producerConfig.put("AggregationMaxSize", "1048576”);
>
>
> However, since the default value is actually <1MB here, I doubt this is
> the issue. A possibility I can think of is that a single record is larger
> than 1MB, so the aggregation limit doesn’t apply. If this is the case,
> the way forward would be to change the record size to be lower than 1MB.
>
> In general, I would recommend upgrading to Flink 1.15 and using the newer
> KinesisStreamsSink. That sink is more configurable (see below and see [2]
> for more docs), and will surface the problem explicitly if the issue is
> really that a single record is larger than 1MB.
>
> (Note that we use the PutRecords API, so individual records still need to
> be smaller than 1MB, but batches can be up to 5MB) See [3] for more info.
>
> .setMaxBatchSizeInBytes(5 * 1024 * 1024)  
>  .setMaxRecordSizeInBytes(1 * 1024 * 1024)
>
>
>
> Thanks,
> Hong
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kinesis/#kinesis-producer
> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kinesis/#kinesis-streams-sink
> [3]
> https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html
>
>
>
>
> On 2022/05/12 10:30:47 Zain Haider Nemati wrote:
> > Hi,
> > I am using a kinesis sink with flink 1.13.
> > The amount of data is in millions and it choke the 1MB cap for kinesis
> data
> > streams.
> > Is there any way to send data to kinesis sink in batches of less than
> 1MB?
> > or any other workaround
> >
>


Re: Incorrect checkpoint id used when job is recovering

2022-05-12 Thread tao xiao
Forgot to mention the Flink version is 1.13.2 and we use kubernetes native
mode

On Thu, May 12, 2022 at 9:18 PM tao xiao  wrote:

> Hi team,
>
> I met a weird issue when a job tries to recover from JM failure.  The
> success checkpoint before JM crashed is 41205
>
> ```
>
> {"log":"2022-05-10 14:55:40,663 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed 
> checkpoint 41205 for job  (9453840 bytes in 
> 1922 ms).\n","stream":"stdout","time":"2022-05-10T14:55:40.663286893Z"}
>
> ```
>
> However JM tries to recover the job with an old checkpoint 41051 which
> doesn't exist that leads to unrecoverable state
>
> ```
>
> "2022-05-10 14:59:38,949 INFO  
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - 
> Trying to retrieve checkpoint 41051.\n"
>
> ```
>
> Full log attached
>
> --
> Regards,
> Tao
>


-- 
Regards,
Tao


Incorrect checkpoint id used when job is recovering

2022-05-12 Thread tao xiao
Hi team,

I hit a weird issue when a job tries to recover from a JM failure. The last
successful checkpoint before the JM crashed is 41205

```

{"log":"2022-05-10 14:55:40,663 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
Completed checkpoint 41205 for job 
(9453840 bytes in 1922
ms).\n","stream":"stdout","time":"2022-05-10T14:55:40.663286893Z"}

```

However, the JM tries to recover the job with an old checkpoint, 41051, which
doesn't exist, and that leads to an unrecoverable state

```

"2022-05-10 14:59:38,949 INFO
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore []
- Trying to retrieve checkpoint 41051.\n"

```

Full log attached

-- 
Regards,
Tao


jm.log
Description: Binary data


RE: Batching in kinesis sink

2022-05-12 Thread Teoh, Hong
Hi Zain,

For Flink 1.13, we use the KinesisProducerLibrary. If you are using 
aggregation, you can control the maximum size of aggregated records by 
configuring the AggregationMaxSize in the producer config when constructing the 
FlinkKinesisProducer. (See [1] for more docs)


producerConfig.put("AggregationMaxSize", "1048576”);

However, since the default value is actually <1MB here, I doubt this is the 
issue. A possibility I can think of is that a single record is larger than 1MB, 
so the aggregation limit doesn’t apply. If this is the case, the way forward 
would be to change the record size to be lower than 1MB.

In general, I would recommend upgrading to Flink 1.15 and using the newer 
KinesisStreamsSink. That sink is more configurable (see below and see [2] for 
more docs), and will surface the problem explicitly if the issue is really that 
a single record is larger than 1MB.

(Note that we use the PutRecords API, so individual records still need to be 
smaller than 1MB, but batches can be up to 5MB) See [3] for more info.


.setMaxBatchSizeInBytes(5 * 1024 * 1024)
.setMaxRecordSizeInBytes(1 * 1024 * 1024)
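
Put together, a rough sketch of wiring these limits into the 1.15 KinesisStreamsSink (the stream name, region and serialization schema below are placeholders, not taken from your job):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kinesis.sink.KinesisStreamsSink;

public class KinesisSinkSketch {
    public static KinesisStreamsSink<String> buildSink() {
        Properties sinkProperties = new Properties();
        // "aws.region" is the region config key; credentials come from the default provider chain.
        sinkProperties.setProperty("aws.region", "eu-west-1");

        return KinesisStreamsSink.<String>builder()
                .setKinesisClientProperties(sinkProperties)
                .setSerializationSchema(new SimpleStringSchema())
                .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode()))
                .setStreamName("your-stream-name")           // placeholder
                .setMaxBatchSizeInBytes(5 * 1024 * 1024)     // PutRecords batches up to 5 MB
                .setMaxRecordSizeInBytes(1 * 1024 * 1024)    // single records still capped at 1 MB
                .build();
    }
}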



Thanks,
Hong


[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kinesis/#kinesis-producer
[2] 
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kinesis/#kinesis-streams-sink
[3] https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html




On 2022/05/12 10:30:47 Zain Haider Nemati wrote:
> Hi,
> I am using a kinesis sink with flink 1.13.
> The amount of data is in millions and it choke the 1MB cap for kinesis data
> streams.
> Is there any way to send data to kinesis sink in batches of less than 1MB?
> or any other workaround
>


Re: How to test Flink SQL UDF with open method?

2022-05-12 Thread zhouhaifengmath






I register my job parameters as Flink global parameters, and I need to get those parameters in the UDF's open method.
I know that in the DataStream API there are test harnesses for testing user-defined functions, as shown in the docs. So I wonder: is there a similar way in the Table/SQL API to set global parameters?

zhouhaifengmath
zhouhaifengm...@gmail.com

On 5/12/2022 19:29, Zhanghao Chen wrote:

Hi,

What kind of parameters do you want to get, Flink global job parameters, or some other parameters?

Best,
Zhanghao Chen

From: zhouhaifengmath 
Sent: Thursday, May 12, 2022 14:33
To: user@flink.apache.org 
Subject: How to test Flink SQL UDF with open method?

Hi,
I am trying to test a Flink SQL UDF which has an open method to get some parameters, with Flink 1.14, but I can't find an example of how to set those parameters in a test. Can someone give me an example for this? Thanks for your help~

Thanks && Regards

zhouhaifengmath
zhouhaifengm...@gmail.com
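
For illustration, a rough, unverified sketch of one way this could be wired up in the Table/SQL API: set the value as a global job parameter on the execution environment and read it with FunctionContext.getJobParameter() in open(). The function name PREFIX and the parameter name "greeting.prefix" are made up for the example:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

public class PrefixFunction extends ScalarFunction {

    private String prefix;

    @Override
    public void open(FunctionContext context) throws Exception {
        // Reads the global job parameter set on the execution environment below.
        prefix = context.getJobParameter("greeting.prefix", "default-");
    }

    public String eval(String s) {
        return prefix + s;
    }

    @Override
    public boolean isDeterministic() {
        // Avoid constant folding at planning time so open() runs at runtime.
        return false;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Configuration globalParams = new Configuration();
        globalParams.setString("greeting.prefix", "hello-");
        env.getConfig().setGlobalJobParameters(globalParams);

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        tEnv.createTemporarySystemFunction("PREFIX", PrefixFunction.class);
        tEnv.executeSql("SELECT PREFIX(name) FROM (VALUES ('flink')) AS T(name)").print();
    }
}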














Re: http stream as input data source

2022-05-12 Thread Alexander Preuß
Hi Harald,

I was previously investigating this topic as well. There are some community
efforts for HTTP sources, please have a look at the references below:

https://getindata.com/blog/data-enrichment-flink-sql-http-connector-flink-sql-part-one/
https://github.com/getindata/flink-http-connector
https://github.com/galgus/flink-connector-http

Best regards,
Alexander

On Thu, May 12, 2022 at 1:59 PM Xuyang  wrote:

> Hi, there have not been a http source currently, and you can build the
> custom data source manually just like Yuxia said.
>
> Yuxia has given you a quick way to build the custom connector by Table
> Api. But if you want to use DataStream api to do that, you can refer to
> here[1].
>
> You can also open an issue to start a discussion in Flink community
> here[2] to let community support this feature officially.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/sources/#use-the-source
> [2] https://issues.apache.org/jira/projects/FLINK/issues/
>
> At 2022-05-12 11:37:29, "Harald Busch"  wrote:
>
> Hi,
> is there a http data stream as data source ?
> I only see socketTextStream and other predefined stream sources.
> It seems that I have to use fromCollection, fromElements ... and prepare
> the collection for myself.
> Thanks
> Regards
>
>

-- 

Alexander Preuß | Engineer - Data Intensive Systems

alexanderpre...@ververica.com




Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH

Registered at Amtsgericht Charlottenburg: HRB 158244 B

Managing Directors: Karl Anton Wehner, Holger Temme, Yip Park Tung Jason,
Jinwei (Kevin) Zhang


Re:http stream as input data source

2022-05-12 Thread Xuyang
Hi, there is no HTTP source currently; you can build a custom data source 
manually, just like Yuxia said.

Yuxia has given you a quick way to build a custom connector with the Table API. But 
if you want to use the DataStream API to do that, you can refer to [1]. A minimal 
illustrative sketch follows after the links below.

You can also open an issue to start a discussion in the Flink community 
here[2] so that the community can officially support this feature.

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/sources/#use-the-source
[2] https://issues.apache.org/jira/projects/FLINK/issues/
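
For illustration only (not an official connector), a minimal sketch of such a custom DataStream source: it polls an HTTP endpoint and emits each response body as a String. The URL and poll interval are placeholders; it could be used with env.addSource(new HttpPollingSource(url, 5000L)).

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class HttpPollingSource extends RichSourceFunction<String> {

    private final String url;
    private final long pollIntervalMs;
    private volatile boolean running = true;

    public HttpPollingSource(String url, long pollIntervalMs) {
        this.url = url;
        this.pollIntervalMs = pollIntervalMs;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();
        while (running) {
            HttpResponse<String> response =
                    client.send(request, HttpResponse.BodyHandlers.ofString());
            // Emit under the checkpoint lock so records and checkpoints stay consistent.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(response.body());
            }
            Thread.sleep(pollIntervalMs);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}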



At 2022-05-12 11:37:29, "Harald Busch"  wrote:

Hi,
is there a http data stream as data source ?
I only see socketTextStream and other predefined stream sources.
It seems that I have to use fromCollection, fromElements ... and prepare the 
collection for myself.
Thanks
Regards

Re: How to test Flink SQL UDF with open method?

2022-05-12 Thread Zhanghao Chen
Hi,

What kind of parameters do you want to get, Flink global job parameters, or 
some other parameters?

Best,
Zhanghao Chen

From: zhouhaifengmath 
Sent: Thursday, May 12, 2022 14:33
To: user@flink.apache.org 
Subject: How to test Flink SQL UDF with open method?

Hi,
I am trying to test a Flink SQL UDF which has an open method to get some 
parameters, with Flink 1.14, but I can't find an example of how to set those parameters 
in a test. Can someone give me an example for this? Thanks for your help~

Thanks && Regards

zhouhaifengmath
zhouhaifengm...@gmail.com


Re: Converting from table to stream, following Avro schema

2022-05-12 Thread Dhavan Vaidya
Hey Dian,

Though my HTTP call's response is indeed JSON, I needed to serialize data
into Avro for Kafka sink.

Since you said it supports Row inside Row, I investigated deeper and found
out that, since the Row class *sorts by key names*, I had to strictly follow that
order in the output_type parameter of the `process()` function! For example, if my
ProcessFunction's output is defined as follows (that is, it yields a
structure of this type):

Row(
  relatedPlaylists=Row(
likes=1,
uploads="1",
  )
)

In `ds = process(MyProcessFunction(), output_type=output_type)`, the output
type must be:
Types.ROW_NAMED(["relatedPlaylists"], [Types.ROW_NAMED(["likes",
"uploads"], [Types.INT(), Types.STRING()])])

And I cannot swap the order of "likes" and "uploads". That is, the
following is wrong:
Types.ROW_NAMED(["relatedPlaylists"], [Types.ROW_NAMED(["uploads",
"likes"], [Types.STRING(), Types.INT()])])

After making these changes, things indeed work out well - I am able to
convert back to table API and sink into Kafka with Confluent Avro
serialization. Though I find this ordering very cumbersome :D

On Wed, 11 May 2022 at 11:35, Dian Fu  wrote:

> Hi Dhavan,
>
> The type of the `ds`  appearing in `t_env.from_data_stream(ds) should be
> known. Otherwise, it's impossible to infer the schema of the converted
> table, as a result, `raw` type will be used which makes the schema of the
> resulting table not expected. You could either declare the type of the `ds`
> in the DataStream API via `output_type` or declare the type via
> `t_env.from_data_stream(ds, schema)` when converting DataStream to table.
>
> >> 1. Must I create objects of Flink's datatypes like `pyflink.common.Row
> `?
> It doesn't necessarily have to be Row type. You could also use other
> types, e.g. Integer, String, etc.
>
> >> 2. What should my schema (for output_type) look like given the
> structure of the message: https://pastebin.com/kEFBJSBS
> Is the output data a JSON string? If this is the case, I guess you could
> just declare the output_type as Types.STRING().
>
> >> I tried https://pastebin.com/EPT0pB85 but I am getting (kind of weird)
> error:
> >> https://pastebin.com/KpkZEHtd - here, it seems to me that having `Row`
> inside `Row` is causing some issues.
> It supports `Row` inside `Row`. Could you share an example which could
> reproduce this issue?
>
> Regards,
> Dian
>
>
> On Tue, May 10, 2022 at 9:09 PM Dhavan Vaidya 
> wrote:
>
>> Hello,
>>
>> I am consuming Kafka messages with Table API connector. I cannot use
>> DataStream API because it does not support Confluent Avro.
>>
>> After consuming the messages, I am converting to DataStream API and using
>> ProcessFunction. The ProcessFunction makes async http calls and emits
>> results with a completely different structure. This works well. If I don't
>> do anything, and just do `ds.print()`,  things are printed as expected.
>>
>> The issues start happening when I convert from DataStream to Table API
>> again. To do this effectively and sink back to Kafka (with Confluent Avro
>> serialization), I am specifying `output_type` while calling `process` on
>> the stream. I have gathered (there might be other ways I am unaware of)
>> that if I don't specify the schema here, converting back to Table API (with
>> `t_env.from_data_stream(ds)`) makes it very difficult to serialize the
>> data.
>>
>> What I am not sure about are the following things:
>> 1. Must I create objects of Flink's datatypes like `pyflink.common.Row`?
>> 2. What should my schema (for output_type) look like given the structure
>> of the message: https://pastebin.com/kEFBJSBS
>>
>> I tried https://pastebin.com/EPT0pB85 but I am getting (kind of weird)
>> error:
>> https://pastebin.com/KpkZEHtd - here, it seems to me that having `Row`
>> inside `Row` is causing some issues.
>>
>> Thanks!
>>
>> --
>> Dhavan
>>
>


Re: At-least once sinks and their behaviour in a non-failure scenario

2022-05-12 Thread Alexander Preuß
Hi Piotr,

You are correct regarding the Savepoint, there should be no duplicates sent
to RabbitMQ.

Best regards,
Alexander

On Thu, May 12, 2022 at 11:28 AM Piotr Domagalski 
wrote:

> Hi,
>
> I'm planning to build a pipeline that is using Kafka source, some stateful
> transformation and a RabbitMQ sink. What I don't yet fully understand is
> how common should I expect the "at-least once" scenario (ie. seeing
> duplicates) on the sink side. The case when things start failing is clear
> to me, but what happens when I want to gracefully stop the Flink job?
>
> Am I right in thinking that when I gracefully stop a job with a final
> savepoint [1] then what happens is that Kafka source stops consuming, a
> checkpoint barrier is sent through the pipeline and this will flush the
> sink completely? So my understanding is that if nothing fails and that
> Kafka offset is committed, when the job is started again from that
> savepoint, it will not result in any duplicates being sent to RabbitMQ. Is
> that correct?
>
> Thanks!
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/cli/#stopping-a-job-gracefully-creating-a-final-savepoint
>
> --
> Piotr Domagalski
>


-- 

Alexander Preuß | Engineer - Data Intensive Systems

alexanderpre...@ververica.com




Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH

Registered at Amtsgericht Charlottenburg: HRB 158244 B

Managing Directors: Karl Anton Wehner, Holger Temme, Yip Park Tung Jason,
Jinwei (Kevin) Zhang


Batching in kinesis sink

2022-05-12 Thread Zain Haider Nemati
Hi,
I am using a Kinesis sink with Flink 1.13.
The amount of data is in the millions of records, and it exceeds the 1MB cap for Kinesis data
streams.
Is there any way to send data to the Kinesis sink in batches of less than 1MB,
or is there any other workaround?


Re: [Discuss] Creating an Apache Flink slack workspace

2022-05-12 Thread Robert Metzger
+1 on setting up our own Slack instance (PMC owned)
+1 for having a separate discussion about setting up a discussion forum (I
like the idea of using GH discussions)

Besides, we still need to investigate how
> http://apache-airflow.slack-archives.org works, I think
> a slack of our own can be easier to set up the archive.


This is the code used by airflow: https://github.com/ashb/slackarchive. I'm
happy to look into setting up the archive for the community.


On Thu, May 12, 2022 at 11:00 AM Jark Wu  wrote:

> Hi,
>
> I would +1 to create Apache Flink Slack for the lower barriers to entry as
> Jingsong mentioned.
> Besides, we still need to investigate how
> http://apache-airflow.slack-archives.org works, I think
> a slack of our own can be easier to set up the archive.
>
> Regarding Discourse vs Slack, I think they are not exclusive, but
> complementary.
> Someday in the future, we might be able to provide them both. But what we
> are seeking today
> is a tool that can provide real-time communication and ad-hoc questions
> and interactions.
> A forum is more similar to a mailing list. Forum is modern mailing list
> but can't solve the problems
> mentioned above. With slack-archives, the information and thoughtful
> discussion in Slack can also be searchable.
>
> I think we can open another thread to discuss creating a forum for Flink
> and keep this thread focused
> on Slack. IMO, we can investigate more kinds of forums, for example GitHub
> Discussion which is free, powerful
>  and fully-managed. Airflow[1] and Next.JS also use GitHub Discussion as
> their forum.
>
> Best,
> Jark
>
> [1]: https://github.com/apache/airflow/discussions
> [2]: https://github.com/vercel/next.js/discussions
>
>
> On Thu, 12 May 2022 at 15:24, Martijn Visser 
> wrote:
>
>> Hi,
>>
>> I would +1 setting up our own Slack. It will allow us to provide the best
>> experience for those in the community who want to use Slack.
>> More than happy to help with setting up community guidelines on how to use
>> Slack.
>>
>> Best regards,
>>
>> Martijn
>>
>> On Thu, 12 May 2022 at 05:22, Yun Tang  wrote:
>>
>> > Hi all,
>> >
>> > I think forum might be a good choice for search and maintain. However,
>> > unlike slack workspace, it seems no existing popular product could be
>> > leveraged easily.
>> >
>> > Thus, I am +1 to create an Apache Flink slack channel. If the ASF slack
>> > cannot be joined easily for most of users, I prefer to set up our own
>> slack
>> > workspace.
>> >
>> > Best
>> > Yun Tang
>> > --
>> > *From:* Jingsong Li 
>> > *Sent:* Thursday, May 12, 2022 10:49
>> > *To:* Xintong Song 
>> > *Cc:* dev ; user 
>> > *Subject:* Re: [Discuss] Creating an Apache Flink slack workspace
>> >
>> > Hi all,
>> >
>> > Regarding using ASF slack. I share the problems I saw in the Apache
>> Druid
>> > community. [1]
>> >
>> > > As you may have heard, it’s become increasingly difficult for new
>> users
>> > without an @apache.org email address to join the ASF #druid Slack
>> channel.
>> > ASF Infra disabled the option to publicly provide a link to the
>> workspace
>> > to anyone who wanted it, after encountering issues with spammers.
>> >
>> > > Per Infra’s guidance (https://infra.apache.org/slack.html), new
>> > community
>> > members should only be invited as single-channel guests. Unfortunately,
>> > single-channel guests are unable to extend invitations to new members,
>> > including their colleagues who are using Druid. Only someone with full
>> > member privileges is able to extend an invitation to new members. This
>> lack
>> > of consistency doesn’t make the community feel inclusive.
>> >
>> > > There is a workaround in place (
>> > https://github.com/apache/druid-website-src/pull/278) – users can send
>> an
>> > email to druid-u...@googlegroups.com to request an invite to the Slack
>> > channel from an existing member – but this still poses a barrier to
>> entry,
>> > and isn’t a viable permanent solution. It also creates potential privacy
>> > issues as not everyone is at liberty to announce they’re using Druid nor
>> > wishes to display their email address in a public forum.
>> >
>> > [1] https://lists.apache.org/thread/f36tvfwfo2ssf1x3jb4q0v2pftdyo5z5
>> >
>> > Best,
>> > Jingsong
>> >
>> > On Thu, May 12, 2022 at 10:22 AM Xintong Song 
>> > wrote:
>> >
>> > > To make some progress, maybe we decide on chat vs forum vs none and
>> then
>> > >> go into a deeper discussion on the implementation or is there
>> anything
>> > >> about Slack that would be complete blocker for the implementation?
>> > >>
>> > >
>> > > Sure, then I'd be +1 for chat. From my side, the initiative is more
>> about
>> > > making communication more efficient, rather than making information
>> > easier
>> > > to find.
>> > >
>> > > Thank you~
>> > >
>> > > Xintong Song
>> > >
>> > >
>> > >
>> > > On Wed, May 11, 2022 at 5:39 PM Konstantin Knauf 
>> > > wrote:
>> > >
>> > >> I don't think we can maintain two additional channels. 

Re: Incompatible data types while using firehose sink

2022-05-12 Thread yuxia
The Firehose sink implements Sink V2, which was introduced in Flink 1.15, but the method 
inputStream#sinkTo(xxx) only accepts Sink V1 in Flink 1.13. 

If you still want to use Firehose in Flink 1.13, I guess you may need to 
implement a SinkV2Adapter that translates Sink V2 into Sink V1 (similar to the 
SinkV1Adapter in Flink 1.15), or rewrite some code of the Firehose connector to 
migrate it to Sink V1. 

Best regards, 
Yuxia 


发件人: "Zain Haider Nemati"  
收件人: "Martijn Visser"  
抄送: "yu'an huang" , "User"  
发送时间: 星期四, 2022年 5 月 12日 下午 3:36:46 
主题: Re: Incompatible data types while using firehose sink 

Hi, Appreciate your response. 
My flink version is 1.13. 
Is there any other way to sink data to kinesis without having to update to 1.15 

On Thu, May 12, 2022 at 12:25 PM Martijn Visser  wrote: 



I'm guessing this must be Flink 1.15 since Firehose was added in that version 
:) 

On Thu, 12 May 2022 at 08:41, yu'an huang  wrote: 


Hi, 

Your code is working fine in my computer. What is the Flink version you are 
using. 






On 12 May 2022, at 3:39 AM, Zain Haider Nemati  wrote: 

Hi Folks, 
Getting this error when sinking data to a firehosesink, would really appreciate 
some help ! 

DataStream inputStream = env.addSource(new FlinkKafkaConsumer<>("xxx", 
new SimpleStringSchema(), properties)); 

Properties sinkProperties = new Properties(); 
sinkProperties.put(AWSConfigConstants.AWS_REGION, "xxx"); 
sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "xxx"); 
sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "xxx"); 
KinesisFirehoseSink kdfSink = KinesisFirehoseSink.builder() 
.setFirehoseClientProperties(sinkProperties) 
.setSerializationSchema(new SimpleStringSchema()) 
.setDeliveryStreamName("xxx") 
.setMaxBatchSize(350) 
.build(); 



inputStream.sinkTo(kdfSink); 

incompatible types: 
org.apache.flink.connector.firehose.sink.KinesisFirehoseSink 
cannot be converted to 
org.apache.flink.api.connector.sink.Sink 









At-least once sinks and their behaviour in a non-failure scenario

2022-05-12 Thread Piotr Domagalski
Hi,

I'm planning to build a pipeline that is using Kafka source, some stateful
transformation and a RabbitMQ sink. What I don't yet fully understand is
how common should I expect the "at-least once" scenario (ie. seeing
duplicates) on the sink side. The case when things start failing is clear
to me, but what happens when I want to gracefully stop the Flink job?

Am I right in thinking that when I gracefully stop a job with a final
savepoint [1] then what happens is that Kafka source stops consuming, a
checkpoint barrier is sent through the pipeline and this will flush the
sink completely? So my understanding is that if nothing fails and that
Kafka offset is committed, when the job is started again from that
savepoint, it will not result in any duplicates being sent to RabbitMQ. Is
that correct?

Thanks!

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/cli/#stopping-a-job-gracefully-creating-a-final-savepoint

-- 
Piotr Domagalski


Re: [State Processor API] unable to load the state of a trigger attached to a session window

2022-05-12 Thread Dongwon Kim
Hi Juntao,

Thanks a lot for taking a look at this.

After a little inspection, I found that elements (window state) are stored
> in namespace TimeWindow{start=1,end=11}, in your case, and trigger count
> (trigger state) is stored in namespace TimeWindow{start=1,end=15}, but
> WindowReaderOperator only tries to find keys and namespaces related to
> window state.


So is there any chance for me to get the trigger state using the correct
namespace? Or should this be considered a bug to be fixed?

Best,

Dongwon

On Thu, May 12, 2022 at 5:51 PM Juntao Hu  wrote:

> Sorry to make the previous mail private.
> My response reposted here:
> "
> After a little inspection, I found that elements (window state) are stored
> in namespace TimeWindow{start=1,end=11}, in your case, and trigger count
> (trigger state) is stored in namespace TimeWindow{start=1,end=15}, but
> WindowReaderOperator only tries to find keys and namespaces related to
> window state.
> "
>
> Juntao Hu  wrote on Thu, May 12, 2022 at 11:47:
>
>> After a little inspection, I found that elements (window state) are
>> stored in namespace TimeWindow{start=1,end=11}, in your case, and trigger
>> count (trigger state) is stored in namespace TimeWindow{start=1,end=15},
>> but WindowReaderOperator only tries to find keys and namespaces related to
>> window state.
>>
>> Dongwon Kim  wrote on Tue, Apr 19, 2022 at 15:29:
>>
>>> Hi,
>>>
>>> I'm using Flink-1.14.4 and failed to load in WindowReaderFunction the
>>> state of a stateful trigger attached to a session window.
>>> I found that the following data become available in WindowReaderFunction:
>>> - the state defined in the ProcessWindowFunction
>>> - the registered timers of the stateful trigger attached to the session
>>> window
>>> - all the elements of the window
>>> , but the state of the stateful trigger attached to the session window
>>> is not available when using State Processor API.
>>> In other words, the following code always returns null when used with
>>> session windows:
>>>
 ReducingState state =
 context.triggerState(triggerCountDesc);
 Long val = state.get();

>>> On the other hand, the above code snippet returns expected data when
>>> used with sliding and tumbling windows.
>>>
>>> To explain the problem, I made up an example in a similar spirit to
>>> o.a.f.state.api.SavepointWindowReaderITCase.
>>> Here you can find three test cases each with different types of
>>> event-time windows: Session, Sliding, and Tumbling.
>>> With sliding and tumbling windows, I can read the state of the trigger
>>> attached to the windows in WindowReaderFunction.
>>> However, with a session window, I cannot read the state of the trigger
>>> in WindowReaderFunction.
>>>
>>> Is it a bug, or did I miss something?
>>>
>>> Best,
>>>
>>> Dongwon
>>>
>>>


Re: [Discuss] Creating an Apache Flink slack workspace

2022-05-12 Thread Jark Wu
Hi,

I would +1 to create Apache Flink Slack for the lower barriers to entry as
Jingsong mentioned.
Besides, we still need to investigate how
http://apache-airflow.slack-archives.org works, I think
a slack of our own can be easier to set up the archive.

Regarding Discourse vs Slack, I think they are not exclusive, but
complementary.
Someday in the future, we might be able to provide them both. But what we
are seeking today
is a tool that can provide real-time communication and ad-hoc questions and
interactions.
A forum is more similar to a mailing list. Forum is modern mailing list but
can't solve the problems
mentioned above. With slack-archives, the information and thoughtful
discussion in Slack can also be searchable.

I think we can open another thread to discuss creating a forum for Flink
and keep this thread focused
on Slack. IMO, we can investigate more kinds of forums, for example GitHub
Discussion which is free, powerful
 and fully-managed. Airflow[1] and Next.JS also use GitHub Discussion as
their forum.

Best,
Jark

[1]: https://github.com/apache/airflow/discussions
[2]: https://github.com/vercel/next.js/discussions


On Thu, 12 May 2022 at 15:24, Martijn Visser  wrote:

> Hi,
>
> I would +1 setting up our own Slack. It will allow us to provide the best
> experience for those in the community who want to use Slack.
> More than happy to help with setting up community guidelines on how to use
> Slack.
>
> Best regards,
>
> Martijn
>
> On Thu, 12 May 2022 at 05:22, Yun Tang  wrote:
>
> > Hi all,
> >
> > I think forum might be a good choice for search and maintain. However,
> > unlike slack workspace, it seems no existing popular product could be
> > leveraged easily.
> >
> > Thus, I am +1 to create an Apache Flink slack channel. If the ASF slack
> > cannot be joined easily for most of users, I prefer to set up our own
> slack
> > workspace.
> >
> > Best
> > Yun Tang
> > --
> > *From:* Jingsong Li 
> > *Sent:* Thursday, May 12, 2022 10:49
> > *To:* Xintong Song 
> > *Cc:* dev ; user 
> > *Subject:* Re: [Discuss] Creating an Apache Flink slack workspace
> >
> > Hi all,
> >
> > Regarding using ASF slack. I share the problems I saw in the Apache Druid
> > community. [1]
> >
> > > As you may have heard, it’s become increasingly difficult for new users
> > without an @apache.org email address to join the ASF #druid Slack
> channel.
> > ASF Infra disabled the option to publicly provide a link to the workspace
> > to anyone who wanted it, after encountering issues with spammers.
> >
> > > Per Infra’s guidance (https://infra.apache.org/slack.html), new
> > community
> > members should only be invited as single-channel guests. Unfortunately,
> > single-channel guests are unable to extend invitations to new members,
> > including their colleagues who are using Druid. Only someone with full
> > member privileges is able to extend an invitation to new members. This
> lack
> > of consistency doesn’t make the community feel inclusive.
> >
> > > There is a workaround in place (
> > https://github.com/apache/druid-website-src/pull/278) – users can send
> an
> > email to druid-u...@googlegroups.com to request an invite to the Slack
> > channel from an existing member – but this still poses a barrier to
> entry,
> > and isn’t a viable permanent solution. It also creates potential privacy
> > issues as not everyone is at liberty to announce they’re using Druid nor
> > wishes to display their email address in a public forum.
> >
> > [1] https://lists.apache.org/thread/f36tvfwfo2ssf1x3jb4q0v2pftdyo5z5
> >
> > Best,
> > Jingsong
> >
> > On Thu, May 12, 2022 at 10:22 AM Xintong Song 
> > wrote:
> >
> > > To make some progress, maybe we decide on chat vs forum vs none and
> then
> > >> go into a deeper discussion on the implementation or is there anything
> > >> about Slack that would be complete blocker for the implementation?
> > >>
> > >
> > > Sure, then I'd be +1 for chat. From my side, the initiative is more
> about
> > > making communication more efficient, rather than making information
> > easier
> > > to find.
> > >
> > > Thank you~
> > >
> > > Xintong Song
> > >
> > >
> > >
> > > On Wed, May 11, 2022 at 5:39 PM Konstantin Knauf 
> > > wrote:
> > >
> > >> I don't think we can maintain two additional channels. Some people
> have
> > >> already concerns about covering one additional channel.
> > >>
> > >> I think, a forum provides a better user experience than a mailing
> list.
> > >> Information is structured better, you can edit messages, sign up and
> > search
> > >> is easier.
> > >>
> > >> To make some progress, maybe we decide on chat vs forum vs none and
> then
> > >> go into a deeper discussion on the implementation or is there anything
> > >> about Slack that would be complete blocker for the implementation?
> > >>
> > >>
> > >>
> > >> Am Mi., 11. Mai 2022 um 07:35 Uhr schrieb Xintong Song <
> > >> tonysong...@gmail.com>:
> > >>
> > >>> I agree with Robert on reworking the 

Data loss while writing to File System

2022-05-12 Thread Surendra Lalwani
Hi Team,

When I am writing to S3, there is a chance of data loss: suppose you have
in-progress/pending files and your job fails. In that case, the data that was
written to the in-progress files is lost, as even restoring from a savepoint
does not read that data again.

Thanks and Regards ,
Surendra Lalwani
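
For context, the FileSink (and the older StreamingFileSink) only finalizes in-progress/pending part
files when a checkpoint completes, so checkpointing must be enabled for output to be committed at
all. The following is only a minimal sketch, not a statement about the setup above; the bucket
path, checkpoint interval, and source are placeholder assumptions:

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FileSinkCheckpointingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Part files are only committed on completed checkpoints, so enable checkpointing.
        env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);

        FileSink<String> sink = FileSink
                .forRowFormat(new Path("s3://my-bucket/output"),            // placeholder bucket/path
                              new SimpleStringEncoder<String>("UTF-8"))
                .build();

        env.fromElements("a", "b", "c")                                     // placeholder source
           .sinkTo(sink);

        env.execute("file-sink-checkpointing-sketch");
    }
}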



Re: Incompatible data types while using firehose sink

2022-05-12 Thread Martijn Visser
Hi,

As far as I know, there is no Firehose sink in Flink 1.13, only a Kinesis
one [1]

Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82
https://github.com/MartijnVisser

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/overview/
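
For reference, on Flink 1.13 writing to a Kinesis Data Stream (not Firehose) can be done with the
legacy FlinkKinesisProducer from flink-connector-kinesis. The sketch below is only an illustration;
the region, credentials, stream name, and source are placeholders:

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

public class KinesisProducerSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties producerConfig = new Properties();
        producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");      // placeholder region
        producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "xxx");     // placeholder credentials
        producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "xxx");

        FlinkKinesisProducer<String> producer =
                new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
        producer.setDefaultStream("my-stream");                              // placeholder stream name
        producer.setDefaultPartition("0");

        DataStream<String> input = env.fromElements("a", "b", "c");          // placeholder source
        input.addSink(producer);        // uses the old SinkFunction API, which 1.13 supports

        env.execute("kinesis-producer-sketch");
    }
}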


On Thu, 12 May 2022 at 09:37, Zain Haider Nemati 
wrote:

> Hi, Appreciate your response.
> My flink version is 1.13.
> Is there any other way to sink data to kinesis without having to update to
> 1.15
>
> On Thu, May 12, 2022 at 12:25 PM Martijn Visser 
> wrote:
>
>> I'm guessing this must be Flink 1.15 since Firehose was added in that
>> version :)
>>
>> On Thu, 12 May 2022 at 08:41, yu'an huang  wrote:
>>
>>> Hi,
>>>
>>> Your code is working fine in my computer. What is the Flink version you
>>> are using.
>>>
>>>
>>>
>>>
>>> On 12 May 2022, at 3:39 AM, Zain Haider Nemati 
>>> wrote:
>>>
>>> Hi Folks,
>>> Getting this error when sinking data to a firehosesink, would really
>>> appreciate some help !
>>>
>>> DataStream inputStream = env.addSource(new
>>> FlinkKafkaConsumer<>("xxx", new SimpleStringSchema(), properties));
>>>
>>> Properties sinkProperties = new Properties();
>>> sinkProperties.put(AWSConfigConstants.AWS_REGION, "xxx");
>>> sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID,
>>> "xxx");
>>> sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY,
>>> "xxx");
>>> KinesisFirehoseSink kdfSink =
>>> KinesisFirehoseSink.builder()
>>> .setFirehoseClientProperties(sinkProperties)
>>> .setSerializationSchema(new SimpleStringSchema())
>>> .setDeliveryStreamName("xxx")
>>> .setMaxBatchSize(350)
>>> .build();
>>>
>>> inputStream.sinkTo(kdfSink);
>>>
>>> incompatible types:
>>> org.apache.flink.connector.firehose.sink.KinesisFirehoseSink
>>> cannot be converted to
>>> org.apache.flink.api.connector.sink.Sink
>>>
>>>
>>>


Re: [State Processor API] unable to load the state of a trigger attached to a session window

2022-05-12 Thread Juntao Hu
Sorry to make the previous mail private.
My response reposted here:
"
After a little inspection, I found that elements (window state) are stored
in namespace TimeWindow{start=1,end=11}, in your case, and trigger count
(trigger state) is stored in namespace TimeWindow{start=1,end=15}, but
WindowReaderOperator only tries to find keys and namespaces related to
window state.
"

Juntao Hu  wrote on Thu, May 12, 2022 at 11:47:

> After a little inspection, I found that elements (window state) are stored
> in namespace TimeWindow{start=1,end=11}, in your case, and trigger count
> (trigger state) is stored in namespace TimeWindow{start=1,end=15}, but
> WindowReaderOperator only tries to find keys and namespaces related to
> window state.
>
> Dongwon Kim  wrote on Tue, Apr 19, 2022 at 15:29:
>
>> Hi,
>>
>> I'm using Flink-1.14.4 and failed to load in WindowReaderFunction the
>> state of a stateful trigger attached to a session window.
>> I found that the following data become available in WindowReaderFunction:
>> - the state defined in the ProcessWindowFunction
>> - the registered timers of the stateful trigger attached to the session
>> window
>> - all the elements of the window
>> , but the state of the stateful trigger attached to the session window is
>> not available when using State Processor API.
>> In other words, the following code always returns null when used with
>> session windows:
>>
>>> ReducingState state =
>>> context.triggerState(triggerCountDesc);
>>> Long val = state.get();
>>>
>> On the other hand, the above code snippet returns expected data when used
>> with sliding and tumbling windows.
>>
>> To explain the problem, I made up an example in a similar spirit to
>> o.a.f.state.api.SavepointWindowReaderITCase.
>> Here you can find three test cases each with different types of
>> event-time windows: Session, Sliding, and Tumbling.
>> With sliding and tumbling windows, I can read the state of the trigger
>> attached to the windows in WindowReaderFunction.
>> However, with a session window, I cannot read the state of the trigger in
>> WindowReaderFunction.
>>
>> Is it a bug, or did I miss something?
>>
>> Best,
>>
>> Dongwon
>>
>>


Re: Fwd: Question on job parameter settings for Flink SQL jobs under failure and cancellation (flink-1.14.2)

2022-05-12 Thread 徐战辉


hi, Yuxia,
I would like to ask how to set the Flink configuration and job parameters so that no data is lost when a job is canceled and redeployed, or when a failed Flink job is rerun.

Currently I have a job with checkpointing enabled; after canceling and restarting it, I found that a small portion of the data is lost.




1. flink.conf


execution.checkpointing.interval: 1
execution.checkpointing.externalized-checkpoint-retention: 
RETAIN_ON_CANCELLATION
execution.checkpointing.checkpoints-after-tasks-finish.enabled: true

state.backend: filesystem
state.checkpoints.dir: hdfs://**:8020/flink/checkpoints
state.savepoints.dir: hdfs://:8020/flink/savepoints


2. source table
CREATE TABLE source_kafka_nginxlog (
 ts BIGINT,
 ..
 pt AS PROCTIME()
) WITH (
 'connector' = 'kafka',
 'topic' = 'nginxlog',
-- the Flink 1.15 patch (FLINK-24697) has been applied

 'scan.startup.mode' = 'group-offsets',
 'properties.auto.offset.reset' = 'latest', 

 'properties.bootstrap.servers' = '***:9092',
 'properties.group.id' = 'zep',
 'format'='json'
);


3. sink table



CREATE TABLE sink_kafka_nginxlog_statistic (
 ts BIGINT,
  ..
 clt_rq BIGINT not null
) WITH (
 'connector' = 'kafka',
 'topic' = 'nginxlog-statistic-flink',
 'sink.parallelism' = '20',
 'sink.delivery-guarantee' = 'exactly-once',
 'sink.transactional-id-prefix' = 'nginxlog-statistic-flink',
 'properties.transaction.timeout.ms' = '360',
 'scan.startup.mode' = 'group-offsets',
 'properties.auto.offset.reset' = 'latest', 
 'properties.bootstrap.servers' = '***:9092',
 'properties.group.id' = 'zep',
 'value.format' = 'csv'
)
Best Regards

Jerry Guo
wangyixuhongm...@163.com

---- Replied Message ----
From: yuxia
Date: 2022-05-12 15:16
To: user-zh
Subject: Re: Fwd: Question on job parameter settings for Flink SQL jobs under failure and cancellation (flink-1.14.2)

hi, could you explain what specifically you would like to ask about?

Best regards,
Yuxia

----- Original Message -----
From: "徐战辉" 
To: "user-zh" 
Sent: Thursday, May 12, 2022 10:53:00 AM
Subject: Fwd: Question on job parameter settings for Flink SQL jobs under failure and cancellation (flink-1.14.2)

Jerry Guo
wangyixuhongm...@163.com

---- Forwarded Message ----
From: 徐战辉
Date: 2022-05-12 10:38
To: user-zh@flink.apache.org
Subject: Question on job parameter settings for Flink SQL jobs under failure and cancellation (flink-1.14.2)


Re: Incompatible data types while using firehose sink

2022-05-12 Thread Zain Haider Nemati
Hi, I appreciate your response.
My Flink version is 1.13.
Is there any other way to sink data to Kinesis without having to upgrade to
1.15?

On Thu, May 12, 2022 at 12:25 PM Martijn Visser 
wrote:

> I'm guessing this must be Flink 1.15 since Firehose was added in that
> version :)
>
> On Thu, 12 May 2022 at 08:41, yu'an huang  wrote:
>
>> Hi,
>>
>> Your code is working fine in my computer. What is the Flink version you
>> are using.
>>
>>
>>
>>
>> On 12 May 2022, at 3:39 AM, Zain Haider Nemati 
>> wrote:
>>
>> Hi Folks,
>> Getting this error when sinking data to a firehosesink, would really
>> appreciate some help !
>>
>> DataStream inputStream = env.addSource(new
>> FlinkKafkaConsumer<>("xxx", new SimpleStringSchema(), properties));
>>
>> Properties sinkProperties = new Properties();
>> sinkProperties.put(AWSConfigConstants.AWS_REGION, "xxx");
>> sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID,
>> "xxx");
>> sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY,
>> "xxx");
>> KinesisFirehoseSink kdfSink =
>> KinesisFirehoseSink.builder()
>> .setFirehoseClientProperties(sinkProperties)
>> .setSerializationSchema(new SimpleStringSchema())
>> .setDeliveryStreamName("xxx")
>> .setMaxBatchSize(350)
>> .build();
>>
>> inputStream.sinkTo(kdfSink);
>>
>> incompatible types:
>> org.apache.flink.connector.firehose.sink.KinesisFirehoseSink
>> cannot be converted to
>> org.apache.flink.api.connector.sink.Sink
>>
>>
>>


Re: Incompatible data types while using firehose sink

2022-05-12 Thread Martijn Visser
I'm guessing this must be Flink 1.15 since Firehose was added in that
version :)

On Thu, 12 May 2022 at 08:41, yu'an huang  wrote:

> Hi,
>
> Your code is working fine in my computer. What is the Flink version you
> are using.
>
>
>
>
> On 12 May 2022, at 3:39 AM, Zain Haider Nemati 
> wrote:
>
> Hi Folks,
> Getting this error when sinking data to a firehosesink, would really
> appreciate some help !
>
> DataStream inputStream = env.addSource(new
> FlinkKafkaConsumer<>("xxx", new SimpleStringSchema(), properties));
>
> Properties sinkProperties = new Properties();
> sinkProperties.put(AWSConfigConstants.AWS_REGION, "xxx");
> sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID,
> "xxx");
> sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY,
> "xxx");
> KinesisFirehoseSink kdfSink =
> KinesisFirehoseSink.builder()
> .setFirehoseClientProperties(sinkProperties)
> .setSerializationSchema(new SimpleStringSchema())
> .setDeliveryStreamName("xxx")
> .setMaxBatchSize(350)
> .build();
>
> inputStream.sinkTo(kdfSink);
>
> incompatible types:
> org.apache.flink.connector.firehose.sink.KinesisFirehoseSink
> cannot be converted to
> org.apache.flink.api.connector.sink.Sink
>
>
>


Re: [Discuss] Creating an Apache Flink slack workspace

2022-05-12 Thread Martijn Visser
Hi,

I would +1 setting up our own Slack. It will allow us to provide the best
experience for those in the community who want to use Slack.
More than happy to help with setting up community guidelines on how to use
Slack.

Best regards,

Martijn

On Thu, 12 May 2022 at 05:22, Yun Tang  wrote:

> Hi all,
>
> I think forum might be a good choice for search and maintain. However,
> unlike slack workspace, it seems no existing popular product could be
> leveraged easily.
>
> Thus, I am +1 to create an Apache Flink slack channel. If the ASF slack
> cannot be joined easily for most of users, I prefer to set up our own slack
> workspace.
>
> Best
> Yun Tang
> --
> *From:* Jingsong Li 
> *Sent:* Thursday, May 12, 2022 10:49
> *To:* Xintong Song 
> *Cc:* dev ; user 
> *Subject:* Re: [Discuss] Creating an Apache Flink slack workspace
>
> Hi all,
>
> Regarding using ASF slack. I share the problems I saw in the Apache Druid
> community. [1]
>
> > As you may have heard, it’s become increasingly difficult for new users
> without an @apache.org email address to join the ASF #druid Slack channel.
> ASF Infra disabled the option to publicly provide a link to the workspace
> to anyone who wanted it, after encountering issues with spammers.
>
> > Per Infra’s guidance (https://infra.apache.org/slack.html), new
> community
> members should only be invited as single-channel guests. Unfortunately,
> single-channel guests are unable to extend invitations to new members,
> including their colleagues who are using Druid. Only someone with full
> member privileges is able to extend an invitation to new members. This lack
> of consistency doesn’t make the community feel inclusive.
>
> > There is a workaround in place (
> https://github.com/apache/druid-website-src/pull/278) – users can send an
> email to druid-u...@googlegroups.com to request an invite to the Slack
> channel from an existing member – but this still poses a barrier to entry,
> and isn’t a viable permanent solution. It also creates potential privacy
> issues as not everyone is at liberty to announce they’re using Druid nor
> wishes to display their email address in a public forum.
>
> [1] https://lists.apache.org/thread/f36tvfwfo2ssf1x3jb4q0v2pftdyo5z5
>
> Best,
> Jingsong
>
> On Thu, May 12, 2022 at 10:22 AM Xintong Song 
> wrote:
>
> > To make some progress, maybe we decide on chat vs forum vs none and then
> >> go into a deeper discussion on the implementation or is there anything
> >> about Slack that would be complete blocker for the implementation?
> >>
> >
> > Sure, then I'd be +1 for chat. From my side, the initiative is more about
> > making communication more efficient, rather than making information
> easier
> > to find.
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Wed, May 11, 2022 at 5:39 PM Konstantin Knauf 
> > wrote:
> >
> >> I don't think we can maintain two additional channels. Some people have
> >> already concerns about covering one additional channel.
> >>
> >> I think, a forum provides a better user experience than a mailing list.
> >> Information is structured better, you can edit messages, sign up and
> search
> >> is easier.
> >>
> >> To make some progress, maybe we decide on chat vs forum vs none and then
> >> go into a deeper discussion on the implementation or is there anything
> >> about Slack that would be complete blocker for the implementation?
> >>
> >>
> >>
> >> Am Mi., 11. Mai 2022 um 07:35 Uhr schrieb Xintong Song <
> >> tonysong...@gmail.com>:
> >>
> >>> I agree with Robert on reworking the "Community" and "Getting Help"
> >>> pages to emphasize how we position the mailing lists and Slack, and on
> >>> revisiting in 6-12 months.
> >>>
> >>> Concerning dedicated Apache Flink Slack vs. ASF Slack, I'm with
> >>> Konstantin. I'd expect it to be easier for having more channels and
> keeping
> >>> them organized, managing permissions for different roles, adding bots,
> etc.
> >>>
> >>> IMO, having Slack is about improving the communication efficiency when
> >>> you are already in a discussion, and we expect such improvement would
> >>> motivate users to interact more with each other. From that perspective,
> >>> forums are not much better than mailing lists.
> >>>
> >>> I'm also open to forums as well, but not as an alternative to Slack. I
> >>> definitely see how forums help in keeping information organized and
> easy to
> >>> find. However, I'm a bit concerned about the maintenance overhead. I'm
> not
> >>> very familiar with Discourse or Reddit. My impression is that they are
> not
> >>> as easy to set up and maintain as Slack.
> >>>
> >>> Thank you~
> >>>
> >>> Xintong Song
> >>>
> >>>
> >>> [1] https://asktug.com/
> >>>
> >>> On Tue, May 10, 2022 at 4:50 PM Konstantin Knauf 
> >>> wrote:
> >>>
>  Thanks for starting this discussion again. I am pretty much with Timo
>  here. Slack or Discourse as an alternative for the user community, and
>  mailing list for the contributing, 

Re: Fwd: Question on job parameter settings for Flink SQL jobs under failure and cancellation (flink-1.14.2)

2022-05-12 Thread yuxia
hi, could you explain what specifically you would like to ask about?

Best regards,
Yuxia

----- Original Message -----
From: "徐战辉" 
To: "user-zh" 
Sent: Thursday, May 12, 2022 10:53:00 AM
Subject: Fwd: Question on job parameter settings for Flink SQL jobs under failure and cancellation (flink-1.14.2)

Jerry Guo
wangyixuhongm...@163.com

---- Forwarded Message ----
From: 徐战辉
Date: 2022-05-12 10:38
To: user-zh@flink.apache.org
Subject: Question on job parameter settings for Flink SQL jobs under failure and cancellation (flink-1.14.2)


Re: http stream as input data source

2022-05-12 Thread yuxia
The quick answer is no. 
There is no HTTP data stream source available out of the box. 
You can implement one yourself. Here [1] is guidance on how to implement a 
user-defined source & sink. 

Btw, there is a JIRA for an HTTP sink [2], but it is marked as won't fix. 

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sourcessinks/
 
[2]: https://issues.apache.org/jira/browse/FLINK-8047. 
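
As a rough illustration of the DataStream route (distinct from the Table API guide in [1]), a
polling HTTP source can be sketched with the legacy SourceFunction interface. The endpoint, the
poll interval, and emitting the whole response as a single record are assumptions of the example:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.stream.Collectors;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;

public class HttpPollingSource extends RichSourceFunction<String> {
    private final String endpoint;        // e.g. "https://example.com/data" (placeholder)
    private final long pollIntervalMs;
    private volatile boolean running = true;

    public HttpPollingSource(String endpoint, long pollIntervalMs) {
        this.endpoint = endpoint;
        this.pollIntervalMs = pollIntervalMs;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            HttpURLConnection conn = (HttpURLConnection) new URL(endpoint).openConnection();
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
                String body = reader.lines().collect(Collectors.joining("\n"));
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(body);    // emits the whole HTTP response as one record
                }
            } finally {
                conn.disconnect();
            }
            Thread.sleep(pollIntervalMs);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

It could then be used like env.addSource(new HttpPollingSource("https://example.com/data", 5000L)).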

Best regards, 
Yuxia 


发件人: "Harald Busch"  
收件人: "User"  
发送时间: 星期四, 2022年 5 月 12日 上午 11:37:29 
主题: http stream as input data source 

Hi, 
is there an HTTP data stream as a data source? 
I only see socketTextStream and other predefined stream sources. 
It seems that I have to use fromCollection, fromElements ... and prepare the 
collection myself. 
Thanks 
Regards 


Re: Incompatible data types while using firehose sink

2022-05-12 Thread yu'an huang
Hi, 

Your code works fine on my computer. Which Flink version are you 
using?




> On 12 May 2022, at 3:39 AM, Zain Haider Nemati  wrote:
> 
> Hi Folks,
> Getting this error when sinking data to a firehosesink, would really 
> appreciate some help !
> 
> DataStream inputStream = env.addSource(new 
> FlinkKafkaConsumer<>("xxx", new SimpleStringSchema(), properties));
> 
> Properties sinkProperties = new Properties();
> sinkProperties.put(AWSConfigConstants.AWS_REGION, "xxx");
> sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "xxx");
> sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
> "xxx");
> KinesisFirehoseSink kdfSink = 
> KinesisFirehoseSink.builder()
> .setFirehoseClientProperties(sinkProperties)  
> .setSerializationSchema(new SimpleStringSchema()) 
> .setDeliveryStreamName("xxx")
> .setMaxBatchSize(350)
> .build();
> inputStream.sinkTo(kdfSink);
> 
> incompatible types: 
> org.apache.flink.connector.firehose.sink.KinesisFirehoseSink
>  cannot be converted to 
> org.apache.flink.api.connector.sink.Sink



How to test Flink SQL UDF with open method?

2022-05-12 Thread zhouhaifengmath






Hi,
I am trying to test a Flink SQL UDF that has an open method to read some parameters, using Flink 1.14, but I can't find an example of how to set those parameters in a test. Can someone give me an example of this? Thanks for your help~

Thanks && Regards

 


zhouhaifengmath zhouhaifengm...@gmail.com
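
One possible approach, purely as a hedged sketch rather than an official recipe: call open()
directly in a unit test with a mocked FunctionContext and stub getJobParameter(). The UDF, the
parameter name "udf.prefix", and the JUnit 5/Mockito setup below are illustrative assumptions:

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;
import org.junit.jupiter.api.Test;

class ParamUdfTest {

    // Hypothetical UDF whose open() reads a job parameter named "udf.prefix".
    public static class PrefixUdf extends ScalarFunction {
        private String prefix;

        @Override
        public void open(FunctionContext context) throws Exception {
            prefix = context.getJobParameter("udf.prefix", "default");
        }

        public String eval(String s) {
            return prefix + "-" + s;
        }
    }

    @Test
    void openPicksUpJobParameter() throws Exception {
        // Stub only the parameter lookup the UDF actually performs.
        FunctionContext ctx = mock(FunctionContext.class);
        when(ctx.getJobParameter("udf.prefix", "default")).thenReturn("configured");

        PrefixUdf udf = new PrefixUdf();
        udf.open(ctx);                     // exercise open() directly with the mocked context
        assertEquals("configured-abc", udf.eval("abc"));
    }
}

In a real job these parameter values usually come from the job configuration, so this only tests
the UDF's own open()/eval() logic in isolation.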