Re: Reading Parquet file with array of structs cause error

2022-11-15 Thread Benenson, Michael via user
Thanks, Jing


Do you know if this problem will be addressed in
FLINK-28867 <https://issues.apache.org/jira/browse/FLINK-28867>, or does it
deserve a separate Jira?


From: Jing Ge
Date: Tuesday, November 15, 2022 at 3:39 PM
To: Benenson, Michael
Cc: user@flink.apache.org, Deshpande, Omkar, Vora, Jainik
Subject: Re: Reading Parquet file with array of structs cause error

Hi Michael,

Currently, ParquetColumnarRowInputFormat does not support schemas with nested
columns. If your Parquet file stores Avro records, you might want to try e.g.
the Avro Generic record approach [1].

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/datastream/formats/parquet/#generic-record
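
A minimal sketch of that approach, assuming the class names shown in the 1.16
docs in [1]; the Avro schema JSON and the input path below are placeholders
for your own:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.AvroParquetReaders;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParquetAvroSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder schema: describe your real record layout here.
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Rec\",\"fields\":"
            + "[{\"name\":\"connectionAccountId\",\"type\":\"string\"}]}");

        // Read the Parquet file as Avro GenericRecords (placeholder path).
        FileSource<GenericRecord> source = FileSource
            .forRecordStreamFormat(
                AvroParquetReaders.forGenericRecord(schema),
                new Path("s3://bucket/part-00121.parquet"))
            .build();

        DataStream<GenericRecord> stream =
            env.fromSource(source, WatermarkStrategy.noWatermarks(), "parquet-avro-source");
        stream.print();
        env.execute();
    }
}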

Best regards,
Jing


Reading Parquet file with array of structs cause error

2022-11-15 Thread Benenson, Michael via user
Hi, folks

I’m using Flink 1.16.0, and I would like to read a Parquet file (attached) that
has schema [1].

I can read this file with Spark, but when I try to read it with Flink 1.16.0
(program attached) using schema [2], I get an IndexOutOfBoundsException [3].

My code and Parquet file are attached. Is it:

· the problem described in
FLINK-28867 <https://issues.apache.org/jira/browse/FLINK-28867>, or

· something new that deserves a separate Jira, or

· something wrong with my code?


[1]: Parquet Schema

root
 |-- amount: decimal(38,9) (nullable = true)
 |-- connectionAccountId: string (nullable = true)
 |-- sourceEntity: struct (nullable = true)
 |    |-- extendedProperties: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- key: string (nullable = true)
 |    |    |    |-- value: string (nullable = true)
 |    |-- sourceAccountId: string (nullable = true)
 |    |-- sourceEntityId: string (nullable = true)
 |    |-- sourceEntityType: string (nullable = true)
 |    |-- sourceSystem: string (nullable = true)
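
(The tree above is Spark's printSchema() output; presumably produced along
these lines, with a placeholder path for the attached file:)

import org.apache.spark.sql.SparkSession;

public class PrintParquetSchema {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("print-parquet-schema")
            .master("local[*]")
            .getOrCreate();
        // Placeholder path to the attached file.
        spark.read().parquet("part-00121.parquet").printSchema();
        spark.stop();
    }
}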


[2]: Schema used in Flink:

static RowType getSchema()
{
    // element: struct<key: string, value: string>
    RowType elementType = RowType.of(
        new LogicalType[] {
            new VarCharType(VarCharType.MAX_LENGTH),
            new VarCharType(VarCharType.MAX_LENGTH)
        },
        new String[] {
            "key",
            "value"
        }
    );

    // the repeated "element" group inside extendedProperties
    RowType element = RowType.of(
        new LogicalType[] { elementType },
        new String[] { "element" }
    );

    // sourceEntity: struct<extendedProperties: array<element>, four string fields>
    RowType sourceEntity = RowType.of(
        new LogicalType[] {
            new ArrayType(element),
            new VarCharType(),
            new VarCharType(),
            new VarCharType(),
            new VarCharType()
        },
        new String[] {
            "extendedProperties",
            "sourceAccountId",
            "sourceEntityId",
            "sourceEntityType",
            "sourceSystem"
        }
    );

    // top-level row: amount, connectionAccountId, sourceEntity
    return RowType.of(
        new LogicalType[] {
            new DecimalType(),
            new VarCharType(),
            sourceEntity
        },
        new String[] {
            "amount",
            "connectionAccountId",
            "sourceEntity"
        });
}
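
For context, a sketch of how such a RowType is typically handed to the
vectorized reader (constructor arguments as in the Flink 1.16 Parquet docs;
the path and batch size are placeholders, since the actual program is
attached rather than inlined):

import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.hadoop.conf.Configuration;

static FileSource<RowData> buildSource()
{
    RowType rowType = getSchema();
    ParquetColumnarRowInputFormat<FileSourceSplit> format =
        new ParquetColumnarRowInputFormat<>(
            new Configuration(),          // Hadoop configuration
            rowType,                      // produced row type, schema [2]
            InternalTypeInfo.of(rowType), // produced TypeInformation<RowData>
            500,                          // batch size (placeholder)
            false,                        // isUtcTimestamp
            true);                        // isCaseSensitive
    // Placeholder path; creating the reader over this source is where [3] is thrown.
    return FileSource
        .forBulkFileFormat(format, new Path("s3://bucket/part-00121.parquet"))
        .build();
}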

[3]: Execution Exception:

2022/11/15 11:39:58.657 ERROR o.a.f.c.b.s.r.f.SplitFetcherManager - Received uncaught exception.
java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
...
Caused by: java.lang.IndexOutOfBoundsException: Index 1 out of bounds for length 1
at java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)
at java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:248)
at java.base/java.util.Objects.checkIndex(Objects.java:372)
at java.base/java.util.ArrayList.get(ArrayList.java:459)
at org.apache.parquet.schema.GroupType.getType(GroupType.java:216)
at org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:536)
at org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:533)
at org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:503)
at org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:533)
at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createWritableVectors(ParquetVectorizedInputFormat.java:281)
at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReaderBatch(ParquetVectorizedInputFormat.java:270)
at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createPoolOfBatches(ParquetVectorizedInputFormat.java:260)
at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:143)
at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:77)
at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.checkSplitOrStartNext(FileSourceSplitReader.java:112)
at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:65)
at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
... 6 common frames omitted

Thanks



part-00121.parquet
Description: part-00121.parquet



Re: HybridSource permanently failed after restoring from checkpoint

2022-08-04 Thread Benenson, Michael via user
I have created Jira https://issues.apache.org/jira/browse/FLINK-28817

Please assign it to me; I have a fix that I’m testing in my environment.



HybridSource permanently failed after restoring from checkpoint

2022-08-02 Thread Benenson, Michael via user
Hi, folks

I’m running a Flink application that uses HybridSource, patched with the fixes
for FLINK-27479 and FLINK-27529.

This application uses HybridSource and the presto plugin to read from a few
thousand s3 directories, and then switches to reading from Kafka.

Reading from s3 can cause intermittent errors that are usually fixed by
retrying, but there is a problem when Flink tries to recover from this failure
and restart from a checkpoint:

java.lang.NullPointerException: Source for index=0 not available
 at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
 at org.apache.flink.connector.base.source.hybridspp.SwitchedSources.sourceOf(SwitchedSources.java:36)

Complete scenario:


  1.  CheckpointCoordinator - Completed checkpoint 14 for job 
  2.  HybridSource successfully completed processing a few SourceFactories that read from s3
  3.  The next SourceFactory tries to read the contents of an s3 dir, and it causes the error Unable to execute HTTP request: Read timed out
  4.  CheckpointCoordinator - Restoring job  from Checkpoint 14
  5.  HybridSourceSplitEnumerator - Restoring enumerator for sourceIndex=47
  6.  This restore fails because of a NullPointerException in HybridSourceSplitEnumerator.close
  7.  Again, CheckpointCoordinator tries Restoring job  from Checkpoint 14
  8.  It causes:

2022/08/02 22:26:52.469 ERROR o.a.f.r.s.c.SourceCoordinator - Uncaught exception in the SplitEnumerator for Source Source: hybrid-source while handling operator event SourceEventWrapper[SourceReaderFinishedEvent{sourceIndex=-1}] from subtask 10. Triggering job failover.
java.lang.NullPointerException: Source for index=0 not available
 at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
 at org.apache.flink.connector.base.source.hybridspp.SwitchedSources.sourceOf(SwitchedSources.java:36)
 at org.apache.flink.connector.base.source.hybridspp.HybridSourceSplitEnumerator.sendSwitchSourceEvent(HybridSourceSplitEnumerator.java:149)
 at org.apache.flink.connector.base.source.hybridspp.HybridSourceSplitEnumerator.handleSourceEvent(HybridSourceSplitEnumerator.java:223)
 at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$handleEventFromOperator$1(SourceCoordinator.java:182)
 at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$8(SourceCoordinator.java:344)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:750)

  9.  And this pattern continues forever: Flink tries restoring from the checkpoint, but it fails because of NullPointerException: Source for index=0 not available


Any idea what could be the cause of the problem? Could some experts in
HybridSource look at the issue?

I have attached an extract of the JobMgr log that contains the related
information; I can send the complete log, but its size is a few MB.

The problem is reproducible after a few hours’ run in my environment.

And I think we need a Jira for this issue; could someone please create it?



bf-29-JM-err-analysis.log
Description: bf-29-JM-err-analysis.log


Re: Hybrid Source stop processing files after processing 128 SourceFactories

2022-07-26 Thread Benenson, Michael via user
Hi, Mason

I think the problem is related to
https://github.com/apache/flink/blob/release-1.14.3-rc1/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java#L358

if (readerSourceIndex.get(e.getKey()) == (Integer) sourceIndex) {

Integer is a Java object, so the comparison should use equals(), not ==.
It is Java, not Scala.
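
(This would also explain why the job stalls at exactly 128 source factories:
autoboxing goes through Integer.valueOf, which caches the values -128..127,
so == happens to be true for small indexes and false from 128 on. A minimal
standalone illustration:)

public class IntegerEqualityDemo {
    public static void main(String[] args) {
        Integer a = 127, b = 127;        // autoboxed via Integer.valueOf -> cached object
        System.out.println(a == b);      // true: same cached instance
        Integer c = 128, d = 128;        // outside the -128..127 cache
        System.out.println(c == d);      // false: two distinct boxed objects
        System.out.println(c.equals(d)); // true: value comparison, as the fix should use
    }
}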


Could you please fix this issue in your fix for 
https://issues.apache.org/jira/browse/FLINK-27479?



And thanks for the original fix.

From: Mason Chen
Date: Tuesday, July 26, 2022 at 9:57 PM
To: Benenson, Michael
Cc: user@flink.apache.org, Deshpande, Omkar, Rosensweig, JD, Sana, Harish
Subject: Re: Hybrid Source stop processing files after processing 128 SourceFactories

Hi Michael,

I'm glad the CPU fix works for you!

Regarding the behavior, HybridSource should only consume from Kafka after it
finishes the bounded read of the files. At that time, files will not be read
anymore. In addition, there is no limit of 128 source factories (the upper
limit should be Integer.MAX_VALUE).

Can you give more details on how the HybridSource is configured? Are all
sources unbounded? When you say it stopped processing files, does that mean it
stops reading from Kafka too? How do you know the program is stalling? Is the
metric numRecordsInPerSecond from the source operator 0?

Best,
Mason




Hybrid Source stop processing files after processing 128 SourceFactories

2022-07-25 Thread Benenson, Michael via user
Hi, folks

I have tried the fix for FLINK-27479
for Hybrid Source from https://github.com/apache/flink/pull/20215 in Flink 1.14.3.

It works fine, but Flink stops processing files after processing 128
SourceFactories. I have run this program a few times, starting without a
savepoint, and each time the program hangs after processing 128
SourceFactories. The program does not crash or terminate, but it stops
processing files.

My program is like the Hybrid source example: reading multiple files, and then
reading from Kafka.

In my case the program reads a few hundred directories from s3 that contain
snappy files, so for each directory it creates a separate
HybridSource.SourceFactory, and the last one is the SourceFactory for reading
from Kafka (roughly the pattern sketched below).
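
A minimal sketch of this files-then-Kafka pattern, assuming the stock
HybridSource builder API from the 1.14 connectors; the directory path, topic,
broker address, and the text-line format standing in for the snappy reader
are all placeholders:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;

static HybridSource<String> filesThenKafka()
{
    // Bounded read of one directory; the real job chains one
    // SourceFactory per directory via repeated addSource(...) calls.
    FileSource<String> fileSource = FileSource
        .forRecordStreamFormat(new TextLineFormat(), new Path("s3://bucket/dir-0001/"))
        .build();

    // Unbounded Kafka source, switched to after the file sources finish.
    KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")
        .setTopics("my-topic")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

    // Bounded file source(s) first, unbounded Kafka source last.
    return HybridSource.builder(fileSource)
        .addSource(kafkaSource)
        .build();
}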

Any idea what could be wrong? Is it a known restriction that there should be
no more than 128 SourceFactories?
I have the program running now, so I can collect any additional info to
clarify the cause of the problem.

Here are the last few lines from the JobManager before the program stopped
processing files.

2022/07/26 01:02:35.248 INFO  o.a.f.c.f.s.i.StaticFileSplitEnumerator - No more splits available for subtask 0
2022/07/26 01:02:36.249 INFO  c.i.strmprocess.hybrid.ReadS3Hybrid1 - Reading input data from path s3://idl-kafka-connect-ued-raw-uw2-data-lake-e2e/data/topics/sbseg-qbo-clickstream/d_20220715-0800 for 2022-07-15T08:00:00Z
2022/07/26 01:02:36.618 INFO  o.a.f.c.b.s.h.HybridSourceSplitEnumerator - Starting enumerator for sourceIndex=128
2022/07/26 01:02:36.619 INFO  o.a.f.r.s.c.SourceCoordinator - Source Source: hybrid-source received split request from parallel task 1
2022/07/26 01:02:36.619 INFO  o.a.f.r.s.c.SourceCoordinator - Source Source: hybrid-source received split request from parallel task 2
2022/07/26 01:02:36.619 INFO  o.a.f.r.s.c.SourceCoordinator - Source Source: hybrid-source received split request from parallel task 1