Re: Bug report for reading Hive table as streaming source.

2024-04-15 Thread Xiaolong Wang
Reported. JIRA link: https://issues.apache.org/jira/browse/FLINK-35118

On Mon, Apr 15, 2024 at 12:03 PM Xiaolong Wang 
wrote:

> Sure
>
> On Mon, Apr 1, 2024 at 9:28 AM yuxia  wrote:
>
>> Thanks for reporting. Could you please help create a jira about it?
>>
>> Best regards,
>> Yuxia
>>
>> - 原始邮件 -
>> 发件人: "Xiaolong Wang" 
>> 收件人: "dev" 
>> 发送时间: 星期四, 2024年 3 月 28日 下午 5:11:20
>> 主题: Re: Bug report for reading Hive table as streaming source.
>>
>> I think it worth mentioning in the documentation of Hive read that it
>> cannot read a table that has more than 32,767 partitions.
>>
>> On Thu, Mar 28, 2024 at 5:10 PM Xiaolong Wang <
>> xiaolong.w...@smartnews.com>
>> wrote:
>>
>> > Found out the reason:
>> >
>> > It turned out that in Flink, it uses hive’s IMetaStoreClient to fetch
>> > partitions using the following method:
>> >
>> >   List listPartitionNames(String db_name, String tbl_name,
>> > short max_parts) throws MetaException, TException;
>> >
>> > where the max_parts represents the max number of partitions it can fetch
>> > from the Hive metastore.
>> > So the max number of partitions it can fetch is Short.MAX_VALUE, which
>> is
>> > 32767 .
>> >
>> > But the table has a way more partition number than the max value, thus
>> the
>> > list partition operations cannot fetch all partitions, hence it cannot
>> > consume the recent partition.
>> >
>> > On Tue, Mar 26, 2024 at 5:00 PM Xiaolong Wang <
>> xiaolong.w...@smartnews.com>
>> > wrote:
>> >
>> >> Hi,
>> >>
>> >> I found a weird bug when reading a Hive table as a streaming source.
>> >>
>> >> In summary, if the first partition is not time related, then the Hive
>> >> table cannot be read as a streaming source.
>> >>
>> >> e.g.
>> >>
>> >> I've a Hive table in the definition of
>> >>
>> >> ```
>> >> CREATE TABLE article (
>> >> id BIGINT,
>> >> edition STRING,
>> >> dt STRING,
>> >> hh STRING
>> >> )
>> >> PARTITIONED BY (edition, dt, hh)
>> >> USING orc;
>> >> ```
>> >> Then I try to query it as a streaming source:
>> >>
>> >> ```
>> >> INSERT INTO kafka_sink
>> >> SELECT id
>> >> FROM article /*+ OPTIONS('streaming-source.enable' = 'true',
>> >> 'streaming-source.partition-order' = 'partition-name',
>> >> 'streaming-source.consume-start-offset' =
>> >> 'edition=en_US/dt=2024-03-26/hh=00') */
>> >> ```
>> >>
>> >> And I see no output in the `kafka_sink`.
>> >>
>> >> Then I defined an external table pointing to the same path but has no
>> >> `edition` partition,
>> >>
>> >> ```
>> >> CREATE TABLE en_article (
>> >> id BIGINT,
>> >> edition STRING,
>> >> dt STRING,
>> >> hh STRING
>> >> )
>> >> PARTITIONED BY (edition, dt, hh)
>> >> LOCATION 's3://xxx/article/edition=en_US'
>> >> USING orc;
>> >> ```
>> >>
>> >> And insert with the following statement:
>> >>
>> >> ```
>> >> INSERT INTO kafka_sink
>> >> SELECT id
>> >> FROM en_article /*+ OPTIONS('streaming-source.enable' = 'true',
>> >> 'streaming-source.partition-order' = 'partition-name',
>> >> 'streaming-source.consume-start-offset' = 'dt=2024-03-26/hh=00') */
>> >> ```
>> >>
>> >> The data is sinked.
>> >>
>> >>
>> >
>>
>


Re: Bug report for reading Hive table as streaming source.

2024-04-14 Thread Xiaolong Wang
Sure

On Mon, Apr 1, 2024 at 9:28 AM yuxia  wrote:

> Thanks for reporting. Could you please help create a jira about it?
>
> Best regards,
> Yuxia
>
> - 原始邮件 -
> 发件人: "Xiaolong Wang" 
> 收件人: "dev" 
> 发送时间: 星期四, 2024年 3 月 28日 下午 5:11:20
> 主题: Re: Bug report for reading Hive table as streaming source.
>
> I think it worth mentioning in the documentation of Hive read that it
> cannot read a table that has more than 32,767 partitions.
>
> On Thu, Mar 28, 2024 at 5:10 PM Xiaolong Wang  >
> wrote:
>
> > Found out the reason:
> >
> > It turned out that in Flink, it uses hive’s IMetaStoreClient to fetch
> > partitions using the following method:
> >
> >   List listPartitionNames(String db_name, String tbl_name,
> > short max_parts) throws MetaException, TException;
> >
> > where the max_parts represents the max number of partitions it can fetch
> > from the Hive metastore.
> > So the max number of partitions it can fetch is Short.MAX_VALUE, which is
> > 32767 .
> >
> > But the table has a way more partition number than the max value, thus
> the
> > list partition operations cannot fetch all partitions, hence it cannot
> > consume the recent partition.
> >
> > On Tue, Mar 26, 2024 at 5:00 PM Xiaolong Wang <
> xiaolong.w...@smartnews.com>
> > wrote:
> >
> >> Hi,
> >>
> >> I found a weird bug when reading a Hive table as a streaming source.
> >>
> >> In summary, if the first partition is not time related, then the Hive
> >> table cannot be read as a streaming source.
> >>
> >> e.g.
> >>
> >> I've a Hive table in the definition of
> >>
> >> ```
> >> CREATE TABLE article (
> >> id BIGINT,
> >> edition STRING,
> >> dt STRING,
> >> hh STRING
> >> )
> >> PARTITIONED BY (edition, dt, hh)
> >> USING orc;
> >> ```
> >> Then I try to query it as a streaming source:
> >>
> >> ```
> >> INSERT INTO kafka_sink
> >> SELECT id
> >> FROM article /*+ OPTIONS('streaming-source.enable' = 'true',
> >> 'streaming-source.partition-order' = 'partition-name',
> >> 'streaming-source.consume-start-offset' =
> >> 'edition=en_US/dt=2024-03-26/hh=00') */
> >> ```
> >>
> >> And I see no output in the `kafka_sink`.
> >>
> >> Then I defined an external table pointing to the same path but has no
> >> `edition` partition,
> >>
> >> ```
> >> CREATE TABLE en_article (
> >> id BIGINT,
> >> edition STRING,
> >> dt STRING,
> >> hh STRING
> >> )
> >> PARTITIONED BY (edition, dt, hh)
> >> LOCATION 's3://xxx/article/edition=en_US'
> >> USING orc;
> >> ```
> >>
> >> And insert with the following statement:
> >>
> >> ```
> >> INSERT INTO kafka_sink
> >> SELECT id
> >> FROM en_article /*+ OPTIONS('streaming-source.enable' = 'true',
> >> 'streaming-source.partition-order' = 'partition-name',
> >> 'streaming-source.consume-start-offset' = 'dt=2024-03-26/hh=00') */
> >> ```
> >>
> >> The data is sinked.
> >>
> >>
> >
>


Re: Bug report for reading Hive table as streaming source.

2024-03-31 Thread yuxia
Thanks for reporting. Could you please help create a jira about it?

Best regards,
Yuxia

- 原始邮件 -
发件人: "Xiaolong Wang" 
收件人: "dev" 
发送时间: 星期四, 2024年 3 月 28日 下午 5:11:20
主题: Re: Bug report for reading Hive table as streaming source.

I think it worth mentioning in the documentation of Hive read that it
cannot read a table that has more than 32,767 partitions.

On Thu, Mar 28, 2024 at 5:10 PM Xiaolong Wang 
wrote:

> Found out the reason:
>
> It turned out that in Flink, it uses hive’s IMetaStoreClient to fetch
> partitions using the following method:
>
>   List listPartitionNames(String db_name, String tbl_name,
> short max_parts) throws MetaException, TException;
>
> where the max_parts represents the max number of partitions it can fetch
> from the Hive metastore.
> So the max number of partitions it can fetch is Short.MAX_VALUE, which is
> 32767 .
>
> But the table has a way more partition number than the max value, thus the
> list partition operations cannot fetch all partitions, hence it cannot
> consume the recent partition.
>
> On Tue, Mar 26, 2024 at 5:00 PM Xiaolong Wang 
> wrote:
>
>> Hi,
>>
>> I found a weird bug when reading a Hive table as a streaming source.
>>
>> In summary, if the first partition is not time related, then the Hive
>> table cannot be read as a streaming source.
>>
>> e.g.
>>
>> I've a Hive table in the definition of
>>
>> ```
>> CREATE TABLE article (
>> id BIGINT,
>> edition STRING,
>> dt STRING,
>> hh STRING
>> )
>> PARTITIONED BY (edition, dt, hh)
>> USING orc;
>> ```
>> Then I try to query it as a streaming source:
>>
>> ```
>> INSERT INTO kafka_sink
>> SELECT id
>> FROM article /*+ OPTIONS('streaming-source.enable' = 'true',
>> 'streaming-source.partition-order' = 'partition-name',
>> 'streaming-source.consume-start-offset' =
>> 'edition=en_US/dt=2024-03-26/hh=00') */
>> ```
>>
>> And I see no output in the `kafka_sink`.
>>
>> Then I defined an external table pointing to the same path but has no
>> `edition` partition,
>>
>> ```
>> CREATE TABLE en_article (
>> id BIGINT,
>> edition STRING,
>> dt STRING,
>> hh STRING
>> )
>> PARTITIONED BY (edition, dt, hh)
>> LOCATION 's3://xxx/article/edition=en_US'
>> USING orc;
>> ```
>>
>> And insert with the following statement:
>>
>> ```
>> INSERT INTO kafka_sink
>> SELECT id
>> FROM en_article /*+ OPTIONS('streaming-source.enable' = 'true',
>> 'streaming-source.partition-order' = 'partition-name',
>> 'streaming-source.consume-start-offset' = 'dt=2024-03-26/hh=00') */
>> ```
>>
>> The data is sinked.
>>
>>
>


Re: Bug report for reading Hive table as streaming source.

2024-03-28 Thread Xiaolong Wang
I think it worth mentioning in the documentation of Hive read that it
cannot read a table that has more than 32,767 partitions.

On Thu, Mar 28, 2024 at 5:10 PM Xiaolong Wang 
wrote:

> Found out the reason:
>
> It turned out that in Flink, it uses hive’s IMetaStoreClient to fetch
> partitions using the following method:
>
>   List listPartitionNames(String db_name, String tbl_name,
> short max_parts) throws MetaException, TException;
>
> where the max_parts represents the max number of partitions it can fetch
> from the Hive metastore.
> So the max number of partitions it can fetch is Short.MAX_VALUE, which is
> 32767 .
>
> But the table has a way more partition number than the max value, thus the
> list partition operations cannot fetch all partitions, hence it cannot
> consume the recent partition.
>
> On Tue, Mar 26, 2024 at 5:00 PM Xiaolong Wang 
> wrote:
>
>> Hi,
>>
>> I found a weird bug when reading a Hive table as a streaming source.
>>
>> In summary, if the first partition is not time related, then the Hive
>> table cannot be read as a streaming source.
>>
>> e.g.
>>
>> I've a Hive table in the definition of
>>
>> ```
>> CREATE TABLE article (
>> id BIGINT,
>> edition STRING,
>> dt STRING,
>> hh STRING
>> )
>> PARTITIONED BY (edition, dt, hh)
>> USING orc;
>> ```
>> Then I try to query it as a streaming source:
>>
>> ```
>> INSERT INTO kafka_sink
>> SELECT id
>> FROM article /*+ OPTIONS('streaming-source.enable' = 'true',
>> 'streaming-source.partition-order' = 'partition-name',
>> 'streaming-source.consume-start-offset' =
>> 'edition=en_US/dt=2024-03-26/hh=00') */
>> ```
>>
>> And I see no output in the `kafka_sink`.
>>
>> Then I defined an external table pointing to the same path but has no
>> `edition` partition,
>>
>> ```
>> CREATE TABLE en_article (
>> id BIGINT,
>> edition STRING,
>> dt STRING,
>> hh STRING
>> )
>> PARTITIONED BY (edition, dt, hh)
>> LOCATION 's3://xxx/article/edition=en_US'
>> USING orc;
>> ```
>>
>> And insert with the following statement:
>>
>> ```
>> INSERT INTO kafka_sink
>> SELECT id
>> FROM en_article /*+ OPTIONS('streaming-source.enable' = 'true',
>> 'streaming-source.partition-order' = 'partition-name',
>> 'streaming-source.consume-start-offset' = 'dt=2024-03-26/hh=00') */
>> ```
>>
>> The data is sinked.
>>
>>
>


Re: Bug report for reading Hive table as streaming source.

2024-03-28 Thread Xiaolong Wang
Found out the reason:

It turned out that in Flink, it uses hive’s IMetaStoreClient to fetch
partitions using the following method:

  List listPartitionNames(String db_name, String tbl_name,
short max_parts) throws MetaException, TException;

where the max_parts represents the max number of partitions it can fetch
from the Hive metastore.
So the max number of partitions it can fetch is Short.MAX_VALUE, which is
32767 .

But the table has a way more partition number than the max value, thus the
list partition operations cannot fetch all partitions, hence it cannot
consume the recent partition.

On Tue, Mar 26, 2024 at 5:00 PM Xiaolong Wang 
wrote:

> Hi,
>
> I found a weird bug when reading a Hive table as a streaming source.
>
> In summary, if the first partition is not time related, then the Hive
> table cannot be read as a streaming source.
>
> e.g.
>
> I've a Hive table in the definition of
>
> ```
> CREATE TABLE article (
> id BIGINT,
> edition STRING,
> dt STRING,
> hh STRING
> )
> PARTITIONED BY (edition, dt, hh)
> USING orc;
> ```
> Then I try to query it as a streaming source:
>
> ```
> INSERT INTO kafka_sink
> SELECT id
> FROM article /*+ OPTIONS('streaming-source.enable' = 'true',
> 'streaming-source.partition-order' = 'partition-name',
> 'streaming-source.consume-start-offset' =
> 'edition=en_US/dt=2024-03-26/hh=00') */
> ```
>
> And I see no output in the `kafka_sink`.
>
> Then I defined an external table pointing to the same path but has no
> `edition` partition,
>
> ```
> CREATE TABLE en_article (
> id BIGINT,
> edition STRING,
> dt STRING,
> hh STRING
> )
> PARTITIONED BY (edition, dt, hh)
> LOCATION 's3://xxx/article/edition=en_US'
> USING orc;
> ```
>
> And insert with the following statement:
>
> ```
> INSERT INTO kafka_sink
> SELECT id
> FROM en_article /*+ OPTIONS('streaming-source.enable' = 'true',
> 'streaming-source.partition-order' = 'partition-name',
> 'streaming-source.consume-start-offset' = 'dt=2024-03-26/hh=00') */
> ```
>
> The data is sinked.
>
>