Unsubscribe

2022-11-15 Thread jack zhang



Re: Reading Parquet file with array of structs causes error

2022-11-15 Thread liu ron
It will be addressed in FLINK-28867.

Best,
Ron

Benenson, Michael via user wrote on Wed, Nov 16, 2022 at 08:47:

> Thanks, Jing
>
>
>
> Do you know if this problem will be addressed in FLINK-28867
> or does it deserve a
> separate Jira?
>
>
>
>
>
> *From: *Jing Ge 
> *Date: *Tuesday, November 15, 2022 at 3:39 PM
> *To: *Benenson, Michael 
> *Cc: *user@flink.apache.org , Deshpande, Omkar <
> omkar_deshpa...@intuit.com>, Vora, Jainik 
> *Subject: *Re: Reading Parquet file with array of structs causes error
>
>
> Hi Michael,
>
>
>
> Currently, ParquetColumnarRowInputFormat does not support schemas with
> nested columns. If your parquet file stores Avro records, you might want to
> try e.g. Avro GenericRecord [1].
>
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/datastream/formats/parquet/#generic-record
>
>
>
> Best regards,
>
> Jing
>
>
>
>
>
> On Tue, Nov 15, 2022 at 8:52 PM Benenson, Michael via user <
> user@flink.apache.org> wrote:
>
> Hi, folks
>
>
>
> I’m using flink 1.16.0, and I would like to read Parquet file (attached),
> that has schema [1].
>
>
>
> I could read this file with Spark, but when I try to read it with Flink
> 1.16.0 (program attached) using schema [2]
>
> I got IndexOutOfBoundsException [3]
>
>
>
> My code, and parquet file are attached. Is it:
>
> · the problem, described in FLINK-28867
>  or
>
> · something new, that deserve a separate Jira, or
>
> · something wrong with my code?
>
>
>
> [1]: Parquet Schema
>
>
>
> root
>
> |-- amount: decimal(38,9) (nullable = true)
>
> |-- connectionAccountId: string (nullable = true)
>
> |-- sourceEntity: struct (nullable = true)
>
> |    |-- extendedProperties: array (nullable = true)
>
> |    |    |-- element: struct (containsNull = true)
>
> |    |    |    |-- key: string (nullable = true)
>
> |    |    |    |-- value: string (nullable = true)
>
> |    |-- sourceAccountId: string (nullable = true)
>
> |    |-- sourceEntityId: string (nullable = true)
>
> |    |-- sourceEntityType: string (nullable = true)
>
> |    |-- sourceSystem: string (nullable = true)
>
>
>
>
>
> [2]: Schema used in Flink:
>
>
>
> static RowType getSchema()
>
> {
>
> RowType elementType = RowType.of(
>
> new LogicalType[] {
>
> new VarCharType(VarCharType.MAX_LENGTH),
>
> new VarCharType(VarCharType.MAX_LENGTH)
>
> },
>
> new String[] {
>
> "key",
>
> "value"
>
> }
>
> );
>
>
>
> RowType element = RowType.of(
>
> new LogicalType[] { elementType },
>
> new String[] { "element" }
>
> );
>
>
>
> RowType sourceEntity = RowType.of(
>
> new LogicalType[] {
>
> new ArrayType(element),
>
> new VarCharType(),
>
> new VarCharType(),
>
> new VarCharType(),
>
> new VarCharType(),
>
> },
>
> new String[] {
>
> "extendedProperties",
>
> "sourceAccountId",
>
> "sourceEntityId",
>
> "sourceEntityType",
>
> "sourceSystem"
>
> }
>
> );
>
>
>
> return  RowType.of(
>
> new LogicalType[] {
>
> new DecimalType(),
>
> new VarCharType(),
>
> sourceEntity
>
> },
>
> new String[] {
>
> "amount",
>
> "connectionAccountId",
>
> "sourceEntity",
>
> });
>
> }
>
>
>
> [3]:  Execution Exception:
>
>
> 2022/11/15 11:39:58.657 ERROR o.a.f.c.b.s.r.f.SplitFetcherManager -
> Received uncaught exception.
>
> java.lang.RuntimeException: SplitFetcher thread 0 received unexpected
> exception while polling the records
>
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
>
> ...
>
> Caused by: java.lang.IndexOutOfBoundsException: Index 1 out of bounds for
> length 1
>
> at
> java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
>
> at
> java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)
>
> at
> java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:248)
>
> at java.base/java.util.Objects.checkIndex(Objects.java:372)
>
> at java.base/java.util.ArrayList.get(ArrayList.java:459)
>
> at org.apache.parquet.schema.GroupType.getType(GroupType.java:216)
>
> at
> org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:536)
>
> at
> org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:533)
>
>

Re: Reading Parquet file with array of structs causes error

2022-11-15 Thread Benenson, Michael via user
Thanks, Jing


Do you know if this problem will be addressed in
FLINK-28867 or does it deserve a
separate Jira?


From: Jing Ge 
Date: Tuesday, November 15, 2022 at 3:39 PM
To: Benenson, Michael 
Cc: user@flink.apache.org , Deshpande, Omkar 
, Vora, Jainik 
Subject: Re: Reading Parquet file with array of structs causes error

Hi Michael,

Currently, ParquetColumnarRowInputFormat does not support schemas with nested
columns. If your parquet file stores Avro records, you might want to try e.g.
Avro GenericRecord [1].

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/datastream/formats/parquet/#generic-record

Best regards,
Jing


On Tue, Nov 15, 2022 at 8:52 PM Benenson, Michael via user 
<user@flink.apache.org> wrote:
Hi, folks

I'm using Flink 1.16.0, and I would like to read a Parquet file (attached) that
has the schema [1].

I could read this file with Spark, but when I try to read it with Flink 1.16.0
(program attached) using schema [2],
I got an IndexOutOfBoundsException [3].

My code, and parquet file are attached. Is it:

• the problem described in
FLINK-28867, or

• something new that deserves a separate Jira, or

• something wrong with my code?

[1]: Parquet Schema

root
|-- amount: decimal(38,9) (nullable = true)
|-- connectionAccountId: string (nullable = true)
|-- sourceEntity: struct (nullable = true)
|    |-- extendedProperties: array (nullable = true)
|    |    |-- element: struct (containsNull = true)
|    |    |    |-- key: string (nullable = true)
|    |    |    |-- value: string (nullable = true)
|    |-- sourceAccountId: string (nullable = true)
|    |-- sourceEntityId: string (nullable = true)
|    |-- sourceEntityType: string (nullable = true)
|    |-- sourceSystem: string (nullable = true)


[2]: Schema used in Flink:

static RowType getSchema()
{
RowType elementType = RowType.of(
new LogicalType[] {
new VarCharType(VarCharType.MAX_LENGTH),
new VarCharType(VarCharType.MAX_LENGTH)
},
new String[] {
"key",
"value"
}
);

RowType element = RowType.of(
new LogicalType[] { elementType },
new String[] { "element" }
);

RowType sourceEntity = RowType.of(
new LogicalType[] {
new ArrayType(element),
new VarCharType(),
new VarCharType(),
new VarCharType(),
new VarCharType(),
},
new String[] {
"extendedProperties",
"sourceAccountId",
"sourceEntityId",
"sourceEntityType",
"sourceSystem"
}
);

return  RowType.of(
new LogicalType[] {
new DecimalType(),
new VarCharType(),
sourceEntity
},
new String[] {
"amount",
"connectionAccountId",
"sourceEntity",
});
}

[3]:  Execution Exception:

2022/11/15 11:39:58.657 ERROR o.a.f.c.b.s.r.f.SplitFetcherManager - Received 
uncaught exception.
java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception 
while polling the records
at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
...
Caused by: java.lang.IndexOutOfBoundsException: Index 1 out of bounds for 
length 1
at 
java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
at 
java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)
at 
java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:248)
at java.base/java.util.Objects.checkIndex(Objects.java:372)
at java.base/java.util.ArrayList.get(ArrayList.java:459)
at org.apache.parquet.schema.GroupType.getType(GroupType.java:216)
at 
org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:536)
at 
org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:533)
at 
org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:503)
at 
org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:533)
at 
org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createWritableVectors(ParquetVectorizedInputFormat.java:281)
at 
org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReaderBatch(ParquetVectorizedInputFormat.java:270)
at 

Re: Reading Parquet file with array of structs causes error

2022-11-15 Thread Jing Ge
Hi Michael,

Currently, ParquetColumnarRowInputFormat does not support schemas with
nested columns. If your parquet file stores Avro records, you might want to
try e.g. Avro GenericRecord [1].

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/datastream/formats/parquet/#generic-record
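
For reference, a minimal sketch of the GenericRecord approach from [1] (the file
path and the abbreviated Avro schema below are illustrative placeholders, not
taken from the attached file):

// Hedged sketch: read the Parquet file as Avro GenericRecords via the
// FileSource/AvroParquetReaders path from [1], which handles nested types.
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.AvroParquetReaders;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReadParquetAsAvroGenericRecord {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Avro schema matching the Parquet structure (abbreviated to one field here).
        Schema avroSchema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Record\",\"fields\":["
                        + "{\"name\":\"connectionAccountId\",\"type\":[\"null\",\"string\"],\"default\":null}]}");

        FileSource<GenericRecord> source = FileSource
                .forRecordStreamFormat(
                        AvroParquetReaders.forGenericRecord(avroSchema),
                        new Path("/path/to/part-00121.parquet"))
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "parquet-avro-source")
                .print();

        env.execute("Read Parquet via Avro GenericRecord");
    }
}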

Best regards,
Jing


On Tue, Nov 15, 2022 at 8:52 PM Benenson, Michael via user <
user@flink.apache.org> wrote:

> Hi, folks
>
>
>
> I’m using flink 1.16.0, and I would like to read Parquet file (attached),
> that has schema [1].
>
>
>
> I could read this file with Spark, but when I try to read it with Flink
> 1.16.0 (program attached) using schema [2]
>
> I got IndexOutOfBoundsException [3]
>
>
>
> My code, and parquet file are attached. Is it:
>
> · the problem, described in FLINK-28867
>  or
>
> · something new, that deserve a separate Jira, or
>
> · something wrong with my code?
>
>
>
> [1]: Parquet Schema
>
>
>
> root
>
> |-- amount: decimal(38,9) (nullable = true)
>
> |-- connectionAccountId: string (nullable = true)
>
> |-- sourceEntity: struct (nullable = true)
>
> |    |-- extendedProperties: array (nullable = true)
>
> |    |    |-- element: struct (containsNull = true)
>
> |    |    |    |-- key: string (nullable = true)
>
> |    |    |    |-- value: string (nullable = true)
>
> |    |-- sourceAccountId: string (nullable = true)
>
> |    |-- sourceEntityId: string (nullable = true)
>
> |    |-- sourceEntityType: string (nullable = true)
>
> |    |-- sourceSystem: string (nullable = true)
>
>
>
>
>
> [2]: Schema used in Flink:
>
>
>
> static RowType getSchema()
>
> {
>
> RowType elementType = RowType.of(
>
> new LogicalType[] {
>
> new VarCharType(VarCharType.MAX_LENGTH),
>
> new VarCharType(VarCharType.MAX_LENGTH)
>
> },
>
> new String[] {
>
> "key",
>
> "value"
>
> }
>
> );
>
>
>
> RowType element = RowType.of(
>
> new LogicalType[] { elementType },
>
> new String[] { "element" }
>
> );
>
>
>
> RowType sourceEntity = RowType.of(
>
> new LogicalType[] {
>
> new ArrayType(element),
>
> new VarCharType(),
>
> new VarCharType(),
>
> new VarCharType(),
>
> new VarCharType(),
>
> },
>
> new String[] {
>
> "extendedProperties",
>
> "sourceAccountId",
>
> "sourceEntityId",
>
> "sourceEntityType",
>
> "sourceSystem"
>
> }
>
> );
>
>
>
> return  RowType.of(
>
> new LogicalType[] {
>
> new DecimalType(),
>
> new VarCharType(),
>
> sourceEntity
>
> },
>
> new String[] {
>
> "amount",
>
> "connectionAccountId",
>
> "sourceEntity",
>
> });
>
> }
>
>
>
> [3]:  Execution Exception:
>
>
> 2022/11/15 11:39:58.657 ERROR o.a.f.c.b.s.r.f.SplitFetcherManager -
> Received uncaught exception.
>
> java.lang.RuntimeException: SplitFetcher thread 0 received unexpected
> exception while polling the records
>
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
>
> ...
>
> Caused by: java.lang.IndexOutOfBoundsException: Index 1 out of bounds for
> length 1
>
> at
> java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
>
> at
> java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)
>
> at
> java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:248)
>
> at java.base/java.util.Objects.checkIndex(Objects.java:372)
>
> at java.base/java.util.ArrayList.get(ArrayList.java:459)
>
> at org.apache.parquet.schema.GroupType.getType(GroupType.java:216)
>
> at
> org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:536)
>
> at
> org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:533)
>
> at
> org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:503)
>
> at
> org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:533)
>
> at
> org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createWritableVectors(ParquetVectorizedInputFormat.java:281)
>
> at
> org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReaderBatch(ParquetVectorizedInputFormat.java:270)
>
> at
> 

Reading Parquet file with array of structs causes error

2022-11-15 Thread Benenson, Michael via user
Hi, folks

I'm using Flink 1.16.0, and I would like to read a Parquet file (attached) that
has the schema [1].

I could read this file with Spark, but when I try to read it with Flink 1.16.0
(program attached) using schema [2]

I got an IndexOutOfBoundsException [3].

My code, and parquet file are attached. Is it:

· the problem described in
FLINK-28867, or

· something new that deserves a separate Jira, or

· something wrong with my code?


[1]: Parquet Schema

root
|-- amount: decimal(38,9) (nullable = true)
|-- connectionAccountId: string (nullable = true)
|-- sourceEntity: struct (nullable = true)
|    |-- extendedProperties: array (nullable = true)
|    |    |-- element: struct (containsNull = true)
|    |    |    |-- key: string (nullable = true)
|    |    |    |-- value: string (nullable = true)
|    |-- sourceAccountId: string (nullable = true)
|    |-- sourceEntityId: string (nullable = true)
|    |-- sourceEntityType: string (nullable = true)
|    |-- sourceSystem: string (nullable = true)


[2]: Schema used in Flink:

static RowType getSchema()
{
RowType elementType = RowType.of(
new LogicalType[] {
new VarCharType(VarCharType.MAX_LENGTH),
new VarCharType(VarCharType.MAX_LENGTH)
},
new String[] {
"key",
"value"
}
);

RowType element = RowType.of(
new LogicalType[] { elementType },
new String[] { "element" }
);

RowType sourceEntity = RowType.of(
new LogicalType[] {
new ArrayType(element),
new VarCharType(),
new VarCharType(),
new VarCharType(),
new VarCharType(),
},
new String[] {
"extendedProperties",
"sourceAccountId",
"sourceEntityId",
"sourceEntityType",
"sourceSystem"
}
);

return  RowType.of(
new LogicalType[] {
new DecimalType(),
new VarCharType(),
sourceEntity
},
new String[] {
"amount",
"connectionAccountId",
"sourceEntity",
});
}

[3]:  Execution Exception:

2022/11/15 11:39:58.657 ERROR o.a.f.c.b.s.r.f.SplitFetcherManager - Received 
uncaught exception.
java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception 
while polling the records
at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
...
Caused by: java.lang.IndexOutOfBoundsException: Index 1 out of bounds for 
length 1
at 
java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
at 
java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)
at 
java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:248)
at java.base/java.util.Objects.checkIndex(Objects.java:372)
at java.base/java.util.ArrayList.get(ArrayList.java:459)
at org.apache.parquet.schema.GroupType.getType(GroupType.java:216)
at 
org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:536)
at 
org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:533)
at 
org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:503)
at 
org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:533)
at 
org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createWritableVectors(ParquetVectorizedInputFormat.java:281)
at 
org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReaderBatch(ParquetVectorizedInputFormat.java:270)
at 
org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createPoolOfBatches(ParquetVectorizedInputFormat.java:260)
at 
org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:143)
at 
org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:77)
at 
org.apache.flink.connector.file.src.impl.FileSourceSplitReader.checkSplitOrStartNext(FileSourceSplitReader.java:112)
at 
org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:65)
at 
org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
... 6 common frames omitted

Thanks



part-00121.parquet
Description: part-00121.parquet



Re: Kafka transactions drastically limit usability of Flink savepoints

2022-11-15 Thread Piotr Nowojski
Hi Yordan,

I don't understand where the problem is, why do you think savepoints are
unusable? If you recover with `ignoreFailuresAfterTransactionTimeout`
enabled, the current Flink behaviour shouldn't cause any problems (except
for maybe some logged errors).
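
For reference, a minimal KafkaSink sketch showing where the transaction timeout
that bounds savepoint age is set (the broker address, topic, and id prefix are
placeholder assumptions, and ignoreFailuresAfterTransactionTimeout itself is an
option of the legacy FlinkKafkaProducer rather than of KafkaSink):

// Hedged sketch: exactly-once KafkaSink whose transactions are tied to checkpoints;
// savepoints older than transaction.timeout.ms run into the recovery issue above.
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

public class ExactlyOnceSinkSketch {
    static KafkaSink<String> buildSink() {
        return KafkaSink.<String>builder()
                .setBootstrapServers("broker:9092")                 // placeholder
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("output-topic")                   // placeholder
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("my-job")                 // required for EXACTLY_ONCE
                // Upper bound on how old a checkpoint/savepoint can be and still have its
                // transaction committed on recovery; limited by the broker's
                // transaction.max.timeout.ms.
                .setProperty("transaction.timeout.ms", "900000")
                .build();
    }
}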

Best,
Piotrek

On Tue, 15 Nov 2022 at 15:36, Yordan Pavlov wrote:

> Hi,
> we are using Kafka savepoints as a recovery tool and want to store
> multiple ones for the past months. However as we use Kafka
> transactions for our KafkaSink this puts expiration time on our
> savepoints. We can use a savepoint only as old as our Kafka
> transaction timeout. The problem is explained in this issue:
> https://issues.apache.org/jira/browse/FLINK-16419
> the relative comment being this one:
> "FlinkKafkaProducer or KafkaSink do not know during recovery if they
> have to recover and commit or if it has already happened. Due to that,
> they are always attempting to recover and commit transactions during
> startup."
> I'm surprised that more people are not hitting this problem as this
> makes Savepoints pretty much unusable as a recovery mechanism.
>


Re: Kafka transactions & flink checkpoints

2022-11-15 Thread Yaroslav Tkachenko
Hi Vishal,

Just wanted to comment on this bit:

> My job has very large amount of state (>100GB) and I have no option but
to use unaligned checkpoints.

I successfully ran Flink jobs with 10+ TB of state and no unaligned
checkpoints enabled. Usually, you consider enabling them when there is some
kind of skew in the topology, but IMO it's unrelated to the state size.

> Reducing the checkpoint interval is not really an option given the size
of the checkpoint

Do you use RocksDB state backend with incremental checkpointing?
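
For illustration, a minimal sketch of that setup (the storage path and the
checkpoint interval are placeholder assumptions, not the configuration of the
job discussed here):

// Hedged sketch: RocksDB state backend with incremental checkpoints, which is the
// usual way to keep checkpoints manageable for jobs with very large state.
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LargeStateSetupSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 'true' enables incremental checkpoints: only changed SST files are uploaded.
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));

        // Durable checkpoint storage (placeholder path).
        env.getCheckpointConfig().setCheckpointStorage("s3://my-bucket/checkpoints");

        env.enableCheckpointing(60_000); // checkpoint every minute

        // ... define sources, the stateful pipeline, and sinks here ...
        // env.execute("large-state-job");
    }
}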

On Tue, Nov 15, 2022 at 12:07 AM Vishal Surana  wrote:

> I wanted to achieve exactly once semantics in my job and wanted to make
> sure I understood the current behaviour correctly:
>
>1. Only one Kafka transaction at a time (no concurrent checkpoints)
>2. Only one transaction per checkpoint
>
>
> My job has very large amount of state (>100GB) and I have no option but to
> use unaligned checkpoints. With the above limitation, it seems to me that
> if checkpoint interval is 1 minute and checkpoint takes about 10 seconds to
> complete then only one Kafka transaction can happen in 70 seconds. All of
> the output records will not be visible until the transaction completes.
> This way a steady stream of inputs will result in an buffered output stream
> where data is only visible after a minute, thereby destroying any sort of
> real time streaming use cases. Reducing the checkpoint interval is not
> really an option given the size of the checkpoint. Only way out would be to
> allow for multiple transactions per checkpoint.
>
> Thanks,
> Vishal
>


Kafka transactions drastically limit usability of Flink savepoints

2022-11-15 Thread Yordan Pavlov
Hi,
we are using Flink savepoints as a recovery tool and want to store
multiple ones covering the past months. However, as we use Kafka
transactions for our KafkaSink, this puts an expiration time on our
savepoints: we can use a savepoint only as old as our Kafka
transaction timeout. The problem is explained in this issue:
https://issues.apache.org/jira/browse/FLINK-16419
the relevant comment being this one:
"FlinkKafkaProducer or KafkaSink do not know during recovery if they
have to recover and commit or if it has already happened. Due to that,
they are always attempting to recover and commit transactions during
startup."
I'm surprised that more people are not hitting this problem as this
makes Savepoints pretty much unusable as a recovery mechanism.


Re: Converting ResolvedSchema to JSON and Protobuf Schemas

2022-11-15 Thread Andrew Otto
> meaning that double and integer
I meant to write: "meaning that double and bigint ... "
:)

On Tue, Nov 15, 2022 at 8:54 AM Andrew Otto  wrote:

> > Also thanks for showing me your pattern with the SchemaConversions and
> stuff. Feels pretty clean and worked like a charm :)
> Glad to hear it, that is very cool!
>
> > converts number to double always. I wonder, did you make this up?
> Yes, we chose the mapping.  We chose to do number -> double and
> integer -> bigint because both of those are wider than their float/int
> counterparts, meaning that double and integer will work in more cases.  Of
> course, this is not an optimal usage of bits, but at least things won't
> break.
>
> > all kinds of fields like double, float, big decimal… they all get
> mapped to number by my converter
> It is possible to make some non-JSONSchema convention in the JSONSchema to
> map to more specific types.  This is done for example with format:
> date-time in our code, to map from a ISO-8601 string to a timestamp.  I
> just did a quick google to find some example of someone else already doing
> this and found this doc from IBM
>  saying
> they use JSONSchema's format to specify a float, like
>
>   type: number
>   format: float
>
> This seems like a pretty good idea to me, and we should probably do this
> at WMF too!  However, it would be a custom convention, and not in the
> JSONSchema spec itself, so when you convert back to a JSONSchema, you'd
> have to codify this convention to do so (and nothing outside of your code
> would really respect it).
>
>
>
>
>
>
> On Tue, Nov 15, 2022 at 4:23 AM Theodor Wübker 
> wrote:
>
>> Yes, you are right. Schemas are not so nice in Json. When implementing
>> and testing my converter from DataType to JsonSchema I noticed that your
>> converter from JsonSchema to DataType converts number to double always. I
>> wonder, did you make this up? Because the table that specifies the
>> mapping
>> 
>>  only
>> does it for DataType -> JsonSchema.
>>
>> Its generally unfortunate that json schema only offers so little
>> possibility to specify type information… now when I have a Flink DataType
>> with all kinds of fields like double, float, big decimal… they all get
>> mapped to number by my converter - in return when I use yours they are all
>> mapped to a Flink Datatype double again. So I lose a lot of precision.
>>
>> I guess for my application it would in general be better to use Avro or
>> Protobuf, since they retain a lot more type information when you convert
>> them back and forth…
>> Also thanks for showing me your pattern with the SchemaConversions and
>> stuff. Feels pretty clean and worked like a charm :)
>>
>> -Theo
>>
>>
>> On 10. Nov 2022, at 15:02, Andrew Otto  wrote:
>>
>> >  I find it interesting that the Mapping from DataType to AvroSchema
>> does exist in Flink (see AvroSchemaConverter), but for all the other
>> formats there is no such Mapping,
>> Yah, but I guess for JSON, there isn't a clear 'schema' to be had.  There
>> of course is JSONSchema, but it isn't a real java-y type system; it's just
>> more JSON for which there exist validators.
>>
>>
>>
>> On Thu, Nov 10, 2022 at 2:12 AM Theodor Wübker <
>> theo.wueb...@inside-m2m.de> wrote:
>>
>>> Great, I will have a closer look at what you sent. Your idea seems very
>>> good, it would be a very clean solution to be able to plug in different
>>> SchemaConversions that a (Row) DataType can be mapped to. I will probably
>>> try to implement it like this. I find it interesting that the Mapping from
>>> DataType to AvroSchema does exist in Flink (see AvroSchemaConverter), but
>>> for all the other formats there is no such Mapping. Maybe this would be
>>> something that would interest more people, so I when I am finished perhaps
>>> I can suggest putting the solution into the flink-json and flink-protobuf
>>> packages.
>>>
>>> -Theo
>>>
>>> On 9. Nov 2022, at 21:24, Andrew Otto  wrote:
>>>
>>> Interesting, yeah I think you'll have to implement code to recurse
>>> through the (Row) DataType and somehow auto generate the JSONSchema you
>>> want.
>>>
>>> We abstracted the conversions from JSONSchema to other type systems in
>>> this JsonSchemaConverter
>>> .
>>> There's nothing special going on here, I've seen versions of this schema
>>> conversion code over and over again in different frameworks. This one just
>>> allows us to plug in a SchemaConversions
>>> 
>>>  implementation
>>> to provide 

Re: Converting ResolvedSchema to JSON and Protobuf Schemas

2022-11-15 Thread Andrew Otto
> Also thanks for showing me your pattern with the SchemaConversions and
stuff. Feels pretty clean and worked like a charm :)
Glad to hear it, that is very cool!

> converts number to double always. I wonder, did you make this up?
Yes, we chose the mapping.  We chose to do number -> double and integer
-> bigint because both of those are wider than their float/int
counterparts, meaning that double and integer will work in more cases.  Of
course, this is not an optimal usage of bits, but at least things won't
break.
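
As a toy sketch of that widening rule (the class and method names here are
invented for illustration; the real converter referenced in this thread handles
far more than scalars):

// Hedged sketch of the widening described above: JSONSchema "number" maps to
// DOUBLE and "integer" maps to BIGINT so that any JSON value of that type fits.
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

public final class JsonSchemaScalarTypes {
    // 'jsonSchemaType' is the value of a JSONSchema "type" keyword for a scalar field.
    public static DataType toFlinkType(String jsonSchemaType) {
        switch (jsonSchemaType) {
            case "number":
                return DataTypes.DOUBLE();  // wider than FLOAT, so float values still fit
            case "integer":
                return DataTypes.BIGINT();  // wider than INT, so int values still fit
            case "string":
                return DataTypes.STRING();
            case "boolean":
                return DataTypes.BOOLEAN();
            default:
                throw new IllegalArgumentException("Unsupported scalar type: " + jsonSchemaType);
        }
    }
}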

> all kinds of fields like double, float, big decimal… they all get mapped
to number by my converter
It is possible to make some non-JSONSchema convention in the JSONSchema to
map to more specific types.  This is done for example with format:
date-time in our code, to map from a ISO-8601 string to a timestamp.  I
just did a quick google to find some example of someone else already doing
this and found this doc from IBM
 saying
they use JSONSchema's format to specify a float, like

  type: number
  format: float

This seems like a pretty good idea to me, and we should probably do this at
WMF too!  However, it would be a custom convention, and not in the
JSONSchema spec itself, so when you convert back to a JSONSchema, you'd
have to codify this convention to do so (and nothing outside of your code
would really respect it).






On Tue, Nov 15, 2022 at 4:23 AM Theodor Wübker 
wrote:

> Yes, you are right. Schemas are not so nice in Json. When implementing and
> testing my converter from DataType to JsonSchema I noticed that your
> converter from JsonSchema to DataType converts number to double always. I
> wonder, did you make this up? Because the table that specifies the mapping
> 
>  only
> does it for DataType -> JsonSchema.
>
> Its generally unfortunate that json schema only offers so little
> possibility to specify type information… now when I have a Flink DataType
> with all kinds of fields like double, float, big decimal… they all get
> mapped to number by my converter - in return when I use yours they are all
> mapped to a Flink Datatype double again. So I lose a lot of precision.
>
> I guess for my application it would in general be better to use Avro or
> Protobuf, since they retain a lot more type information when you convert
> them back and forth…
> Also thanks for showing me your pattern with the SchemaConversions and
> stuff. Feels pretty clean and worked like a charm :)
>
> -Theo
>
>
> On 10. Nov 2022, at 15:02, Andrew Otto  wrote:
>
> >  I find it interesting that the Mapping from DataType to AvroSchema
> does exist in Flink (see AvroSchemaConverter), but for all the other
> formats there is no such Mapping,
> Yah, but I guess for JSON, there isn't a clear 'schema' to be had.  There
> of course is JSONSchema, but it isn't a real java-y type system; it's just
> more JSON for which there exist validators.
>
>
>
> On Thu, Nov 10, 2022 at 2:12 AM Theodor Wübker 
> wrote:
>
>> Great, I will have a closer look at what you sent. Your idea seems very
>> good, it would be a very clean solution to be able to plug in different
>> SchemaConversions that a (Row) DataType can be mapped to. I will probably
>> try to implement it like this. I find it interesting that the Mapping from
>> DataType to AvroSchema does exist in Flink (see AvroSchemaConverter), but
>> for all the other formats there is no such Mapping. Maybe this would be
>> something that would interest more people, so I when I am finished perhaps
>> I can suggest putting the solution into the flink-json and flink-protobuf
>> packages.
>>
>> -Theo
>>
>> On 9. Nov 2022, at 21:24, Andrew Otto  wrote:
>>
>> Interesting, yeah I think you'll have to implement code to recurse
>> through the (Row) DataType and somehow auto generate the JSONSchema you
>> want.
>>
>> We abstracted the conversions from JSONSchema to other type systems in
>> this JsonSchemaConverter
>> .
>> There's nothing special going on here, I've seen versions of this schema
>> conversion code over and over again in different frameworks. This one just
>> allows us to plug in a SchemaConversions
>> 
>>  implementation
>> to provide the mappings to the output type system (like the Flink DataType
>> mappings
>> 
>>  I
>> linked to 

Any way to improve list state get performance

2022-11-15 Thread tao xiao
Hi team,

I have a Flink job that joins two streams, let's say streams A and B,
followed by a keyed process function. In the keyed process function the job
inserts elements from the B stream into a list state if the matching element
from the A stream hasn't arrived yet. I am wondering if there is any way to
skip the listState.get() call that checks whether the list state has elements
when an A element arrives, to reduce calls to the underlying state (RocksDB).

Here is the code snippet

keyFunction {

    process(in, ctx, collector) {
        if (in is A stream) {
            // any way to check if the list state is empty so that we don't need to call get()?
            for (b : listState.get()) {
                ...
            }
        }

        if (in is B stream) {
            listState.add(in)
        }
    }
}
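
One possible direction, sketched below purely as an illustration (the
KeyedCoProcessFunction, its String types, and the state names are invented):
keep a ValueState<Boolean> flag next to the ListState so the A-stream path only
iterates the list when something was actually buffered. Whether this saves much
depends on the state backend, since the flag lookup is itself a state read.

// Hedged sketch: a ValueState<Boolean> flag tracks whether anything is buffered,
// so the A-stream path skips iterating the RocksDB-backed list when it is empty.
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

public class BufferBUntilA extends KeyedCoProcessFunction<String, String, String, String> {

    private transient ListState<String> bufferedB;
    private transient ValueState<Boolean> hasBufferedB;

    @Override
    public void open(Configuration parameters) {
        bufferedB = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffered-b", String.class));
        hasBufferedB = getRuntimeContext().getState(
                new ValueStateDescriptor<>("has-buffered-b", Boolean.class));
    }

    @Override
    public void processElement1(String a, Context ctx, Collector<String> out) throws Exception {
        // Cheap flag lookup first; only read the list when something was buffered.
        if (Boolean.TRUE.equals(hasBufferedB.value())) {
            for (String b : bufferedB.get()) {
                out.collect(a + "|" + b);
            }
            bufferedB.clear();
            hasBufferedB.clear();
        }
    }

    @Override
    public void processElement2(String b, Context ctx, Collector<String> out) throws Exception {
        bufferedB.add(b);
        hasBufferedB.update(true);
    }
}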


-- 
Regards,
Tao


Re: Missing image tag in apache/flink repository ?

2022-11-15 Thread godfrey he
Thanks for reporting this, I will resolve it ASAP.

Best,
Godfrey

Alon Halimi via user wrote on Tue, Nov 15, 2022 at 16:46:
>
> Hello :)
>
>
>
> It seems the tag “apache/flink:1.16.0-scala_2.12” is missing – I get the 
> following error:
>
>
>
> failed to pull and unpack image "docker.io/apache/flink:1.16.0-scala_2.12"
>
>
>
>
>
> note that:
>
> (1) apache/flink:1.16-scala_2.12 (without the .0 patch version suffix) does exist
>
> (2) flink:1.16.0-scala_2.12 (without the apache prefix) does exist
>
>
>
> Thanks in advance
>
>
>
> Alon Halimi
>


[SUMMARY] Flink 1.17 Release Sync 11/15/2022

2022-11-15 Thread Leonard Xu
Hi devs and users,

I’d like to share some highlights from the 1.17 release sync on 11/15/2022.

- Release tracking page:
 - The community has collected some great features on the 1.17 page[1]
 - @committers Please continuously update the page in the coming week
 
- JIRA account applications:
  - Martijn updated the issue tracking flow [2][3]
  - Users without a JIRA account can follow these docs [2][3] to apply for an
account as well as join as Flink contributors

- Blockers:
- Blocker FLINK-29387 has been fixed
- The PR for blocker FLINK-29315 is open and waiting for review.
- Blocker FLINK-29818 has been reopened; Yang Wang is looking into this ticket

- Build stability: a growing number of test stability issues with “Exit code 137”
errors
- Matthias and Qingsheng investigated the memory issue caused by multiple
Azure agents on one machine using too many resources
- We’ve reduced the number of agents from 7 to 5; let’s keep an eye on this
issue.
- Leonard offered a workaround for the clickable-link issue in the Slack
#builds channel

The next release sync will be on November 29th, 2022.

Google Meet: https://meet.google.com/wcx-fjbt-hhz
Dial-in: https://tel.meet/wcx-fjbt-hhz?pin=1940846765126  

Best regards,
Martijn, Qingsheng, Matthias and Leonard 

[1] https://cwiki.apache.org/confluence/display/FLINK/1.17+Release
[2] https://flink.apache.org/community.html
[3] https://flink.apache.org/zh/community.html



Re: [blog article] Howto migrate a real-life batch pipeline from the DataSet API to the DataStream API

2022-11-15 Thread Etienne Chauchot

Hi,

Any feedback on the interest of the API benchmark article below?

Best

Etienne

Le 09/11/2022 à 12:18, Etienne Chauchot a écrit :


Hi,

And by the way, I was planning on writing another article to compare
the performance of the DataSet, DataStream and SQL APIs over TPCDS
query3. I thought that I could run the pipelines on an Amazon EMR 
cluster with different data sizes 1GB, 100GB, 1TB.


Would it be worth it? What do you think?

Best

Etienne

Le 09/11/2022 à 10:04, Etienne Chauchot a écrit :


Hi Yun Gao,

thanks for your email and your review!

My comments are inline

Le 08/11/2022 à 06:51, Yun Gao a écrit :

Hi Etienne,

Many thanks for the article! Flink is indeed continuing to increase the
ability of unified batch / stream processing with the same API, and it's
a great pleasure that more and more users are trying this functionality.

But I also have some questions regarding some details.

First IMO, as a whole for the long run Flink will have two unified 
APIs, namely Table / SQL
API and DataStream API. Users could express the computation logic 
with these two APIs

for both bounded and unbounded data processing.



Yes that is what I understood also throughout the discussions and 
jiras. And I also think IMHO that reducing the number of APIs to 2 
was the good move.




Underlying Flink provides two
execution modes:  the streaming mode works with both bounded and 
unbounded data,
and it executes in a way of incremental processing based on state; 
the batch mode works
only with bounded data, and it executes in a level-by-level way
similar to the traditional

batch processing frameworks. Users could switch the execution mode via
EnvironmentSettings.inBatchMode() or
StreamExecutionEnvironment.setRuntimeMode().


As recommended in the Flink docs (1) I have enabled the batch mode as I
thought it would be more efficient on my bounded pipeline, but as a
matter of fact the streaming mode seems to be more efficient for my
use case. I'll test with higher volumes to confirm.





Specifically for DataStream, as implemented in FLIP-140, currently all
the existing DataStream
operations support the batch execution mode in a unified way [1]:
data will be sorted for the
keyBy() edges according to the key, then the following operations
like reduce() could receive
all the data belonging to the same key consecutively, and they could
directly reduce the records
of the same key without maintaining the intermediate states. In this
way users could write the

same code for both streaming and batch processing.



Yes I have no doubt that my resulting Query3ViaFlinkRowDatastream 
pipeline will work with no modification if I plug an unbounded source 
to it.





# Regarding the migration of Join / Reduce

First I think Reduce is always supported and users could write 
dataStream.keyBy().reduce(xx)
directly, and if batch execution mode is set, the reduce will not
be executed in an incremental way;
instead it acts much like sort-based aggregation in the
traditional batch processing framework.


Regarding Join, although the issue of FLINK-22587 indeed exists: 
current join has to be bound
to a window and the GlobalWindow does not work properly, but with
a bit more effort
users do not need to re-write the whole join from scratch:
users could write a dedicated
window assigner that assigns all the records to the same window
instance and returns
EventTimeTrigger.create() as the default event-time trigger [2]. 
Then it works


source1.join(source2)
                .where(a -> a.f0)
                .equalTo(b -> b.f0)
                .window(new EndOfStreamWindows())
                .apply();

It does not require records to have event time attached, since the
trigger of the window relies only
on the time range of the window and the assignment does not
need event time either.
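
A minimal sketch of such an assigner, following the description above (this
class is hand-written for illustration and is not part of the DataStream API):

// Hedged sketch: a WindowAssigner that puts every record into one window spanning
// all of time, with EventTimeTrigger as the default trigger, so the window fires
// once the final watermark (Long.MAX_VALUE) arrives at end of input.
import java.util.Collection;
import java.util.Collections;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class EndOfStreamWindows extends WindowAssigner<Object, TimeWindow> {

    private static final TimeWindow ALL_OF_TIME = new TimeWindow(Long.MIN_VALUE, Long.MAX_VALUE);

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        return Collections.singletonList(ALL_OF_TIME);
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }
}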


The behavior of the join is also similar to sort-based join if batch 
mode is enabled.


Of course it is not ideal to ask users to apply this workaround, and
we'll try to fix this issue in 1.17.



Yes, this is a better workaround than the manual state-based join 
that I proposed. I tried it and it works perfectly with similar 
performance. Thanks.




# Regarding support of Sort / Limit

Currently these two operators are indeed not supported in the 
DataStream API directly. One initial
thought for these two operations is that users may convert the
DataStream to the Table API and use the

Table API for these two operators:

DataStream<Row> xx = ... // Keeps the customized logic in DataStream
Table tableXX = tableEnv.fromDataStream(xx);
tableXX.orderBy($("a").asc());



Yes I knew that workaround but I decided not to use it because I have 
a special SQL based implementation (for comparison reasons) so I did 
not want to mix SQL and DataStream APIs in the same pipeline.




What do you think about this option? We are also assessing if the
combination of DataStream
API / Table API is sufficient for all the 

Re: Converting ResolvedSchema to JSON and Protobuf Schemas

2022-11-15 Thread Theodor Wübker
Yes, you are right. Schemas are not so nice in Json. When implementing and 
testing my converter from DataType to JsonSchema I noticed that your converter 
from JsonSchema to DataType converts number to double always. I wonder, did you 
make this up? Because the table that specifies the mapping 

 only does it for DataType -> JsonSchema. 

It's generally unfortunate that JSON Schema only offers so little possibility to
specify type information… now when I have a Flink DataType with all kinds of 
fields like double, float, big decimal… they all get mapped to number by my 
converter - in return when I use yours they are all mapped to a Flink Datatype 
double again. So I lose a lot of precision.

I guess for my application it would in general be better to use Avro or 
Protobuf, since they retain a lot more type information when you convert them 
back and forth…
Also thanks for showing me your pattern with the SchemaConversions and stuff. 
Feels pretty clean and worked like a charm :)

-Theo


> On 10. Nov 2022, at 15:02, Andrew Otto  wrote:
> 
> >  I find it interesting that the Mapping from DataType to AvroSchema does 
> > exist in Flink (see AvroSchemaConverter), but for all the other formats 
> > there is no such Mapping, 
> Yah, but I guess for JSON, there isn't a clear 'schema' to be had.  There of 
> course is JSONSchema, but it isn't a real java-y type system; it's just more 
> JSON for which there exist validators.  
> 
> 
> 
> On Thu, Nov 10, 2022 at 2:12 AM Theodor Wübker wrote:
> Great, I will have a closer look at what you sent. Your idea seems very good, 
> it would be a very clean solution to be able to plug in different 
> SchemaConversions that a (Row) DataType can be mapped to. I will probably try 
> to implement it like this. I find it interesting that the Mapping from 
> DataType to AvroSchema does exist in Flink (see AvroSchemaConverter), but for 
> all the other formats there is no such Mapping. Maybe this would be something 
> that would interest more people, so I when I am finished perhaps I can 
> suggest putting the solution into the flink-json and flink-protobuf packages.
> 
> -Theo
> 
>> On 9. Nov 2022, at 21:24, Andrew Otto wrote:
>> 
>> Interesting, yeah I think you'll have to implement code to recurse through 
>> the (Row) DataType and somehow auto generate the JSONSchema you want.  
>> 
>> We abstracted the conversions from JSONSchema to other type systems in this 
>> JsonSchemaConverter 
>> .
>>   There's nothing special going on here, I've seen versions of this schema 
>> conversion code over and over again in different frameworks. This one just 
>> allows us to plug in a SchemaConversions 
>> 
>>  implementation to provide the mappings to the output type system (like the 
>> Flink DataType mappings 
>> 
>>  I linked to before), rather than hardcoding the output types.
>> 
>> If I were trying to do what you are doing (in our codebase)...I'd create a 
>> Flink DataTypeConverter that iterated through a (Row) DataType and a 
>> SchemaConversions implementation that mapped to the JsonNode that 
>> represented the JSONSchema.  (If not using Jackson...then you could use 
>> another Java JSON object than JsonNode).
>> You could also make a SchemaConversions (with whatever 
>> Protobuf class to use...I'm not familiar with Protobuf) and then use the 
>> same DataTypeConverter to convert to ProtobufSchema.   AND THEN...I'd wonder 
>> if the input schema recursion code itself could be abstracted too so that it 
>> would work for either JsonSchema OR DataType OR whatever but anyway that is 
>> probably too crazy and too much for what you are doing...but it would be 
>> cool! :p
>> 
>> 
>> 
>> 
>> 
>> On Wed, Nov 9, 2022 at 9:52 AM Theodor Wübker wrote:
>> I want to register the result-schema in a schema registry, as I am pushing 
>> the result-data to a Kafka topic. The result-schema is not known at 
>> compile-time, so I need to find a way to compute it at runtime from the 
>> resulting Flink Schema.
>> 
>> -Theo
>> 
>> (resent - again sorry, I forgot to add the others in the cc)
>> 
>>> On 9. Nov 2022, at 14:59, Andrew Otto wrote:
>>> 
>>> >  I want to convert the schema of a 

Missing image tag in apache/flink repository ?

2022-11-15 Thread Alon Halimi via user
Hello :)

It seems the tag "apache/flink:1.16.0-scala_2.12" is missing - I get the 
following error:

failed to pull and unpack image "docker.io/apache/flink:1.16.0-scala_2.12"


note that:
(1) apache/flink:1.16-scala_2.12 (without the .0 patch version suffix) does exist
(2) flink:1.16.0-scala_2.12 (without the apache prefix) does exist

Thanks in advance

Alon Halimi


Kafka transactions & flink checkpoints

2022-11-15 Thread Vishal Surana
I wanted to achieve exactly once semantics in my job and wanted to make sure I 
understood the current behaviour correctly:

1. Only one Kafka transaction at a time (no concurrent checkpoints)
2. Only one transaction per checkpoint


My job has a very large amount of state (>100GB) and I have no option but to use
unaligned checkpoints. With the above limitation, it seems to me that if the
checkpoint interval is 1 minute and a checkpoint takes about 10 seconds to
complete, then only one Kafka transaction can happen in 70 seconds. All of the
output records will not be visible until the transaction completes. This way a
steady stream of inputs will result in a buffered output stream where data is
only visible after a minute, thereby destroying any sort of real-time streaming
use case. Reducing the checkpoint interval is not really an option given the
size of the checkpoint. The only way out would be to allow for multiple
transactions per checkpoint.

Thanks,
Vishal