Re: Where can I find the source code of Flink SQL's built-in functions?

2022-02-21 Thread (sender name garbled)
See org.apache.flink.table.runtime.functions.SqlFunctionUtils in the
flink-table-runtime-blink module for example implementations.





Where can I find the source code of Flink SQL's built-in functions?

2022-02-21 Thread 王宇航
Hi:
  I often use Flink SQL's built-in functions. Since the official functions are more robust than the UDFs I write myself, I'd like to learn how they are implemented. Where in the code base can I find them?

Re: Files grew larger after compacting data with Hive overwrite?

2022-02-21 Thread 周瑞
Could the data have been duplicated? If the table is in ORC format, you can try running
alter table table_name partition (pt_dt='2021-02-20') concatenate to merge the small files.


--Original--
From: "RS"; 
Date: Tuesday, February 22, 2022, 9:36 AM
To: "user-zh"; 
Subject: Files grew larger after compacting data with Hive overwrite?


Hi,
A Flink job writes to Hive with a fairly short checkpoint interval, which produced many small files, one directory per day.
I then ran Flink SQL to compact the earlier data; after it finished, the storage usage grew. What could be the reason?
Before compaction there were many small part files; after the overwrite there are fewer files, but the storage grew from 274 MB to 2.9 GB.


The Hive table table1 is partitioned by `date`:
insert overwrite aw_topic_compact select * from `table1` where 
`date`='2022-02-21';


Before compaction:
514.0 M  1.5 G    /user/hive/warehouse/ods.db/table1/date=2022-02-20
274.0 M  822.1 M  /user/hive/warehouse/ods.db/table1/date=2022-02-21
48.1 M   144.2 M  /user/hive/warehouse/ods.db/table1/date=2022-02-22



After compaction:
514.0 M  1.5 G    /user/hive/warehouse/ods.db/table1/date=2022-02-20
2.9 G    8.7 G    /user/hive/warehouse/ods.db/table1/date=2022-02-21
47.6 M   142.9 M  /user/hive/warehouse/ods.db/table1/date=2022-02-22

Re: [Table API] [JDBC CDC] Caching Configuration from MySql instance needed in multiple Flink Jobs

2022-02-21 Thread Leonard Xu
Hello, Dan

> On Feb 21, 2022, at 9:11 PM, Dan Serb wrote:
> 1.Have a processor that uses Flink JDBC CDC Connector over the table that 
> stores the information I need. (This is implemented currently - working)

You mean you’ve implemented a Flink JDBC Connector? Maybe the Flink CDC 
Connectors[1] would help you.


> 2.Find a way to store that Stream Source inside a table inside Flink. (I 
> tried with the approach to create a MySql JDBC Catalog – but apparently, I 
> can only create Postgres Catalog programmatically) – This is the question – 
> What api do I need to use to facilitate saving inside Flink in a SQL Table, 
> the data retrieved by the CDC Source?
>  3.The solution from point 2. Needs to be done in a way that I can query that 
> table, for each record I receive in a different Job that has a Kafka Source 
> as the entrypoint.


The Flink JDBC Catalog only provides a Postgres implementation. You would need to
implement your own catalog, e.g. a MySQL catalog that provides a CDC TableSource;
you can encapsulate a mysql-cdc source[2] in your catalog implementation.

> I’m just worried that I might need to reuse this data sets from the sql 
> database in future jobs, so this is why I’d like to have something decoupled 
> and available for the entire cluster.

If you want to reuse the data set and avoid capturing the database table
multiple times, you can send the CDC data to a message queue like Kafka/Pulsar
and then consume the changelogs from the message queue in different Flink jobs.
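
If you go that route, a rough Java sketch of the consuming side is below; the
topic name, broker address, schema, and formats are placeholders, and the
producing job would simply run an INSERT INTO this table from the CDC source:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ChangelogConsumerSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Each job that needs the data declares the same upsert-kafka table and
        // reads the changelog instead of capturing the MySQL table again.
        tEnv.executeSql(
                "CREATE TABLE config_changelog ("
                        + "  id INT,"
                        + "  cfg_key STRING,"
                        + "  cfg_value STRING,"
                        + "  PRIMARY KEY (id) NOT ENFORCED"
                        + ") WITH ("
                        + "  'connector' = 'upsert-kafka',"
                        + "  'topic' = 'config-changelog',"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',"
                        + "  'key.format' = 'json',"
                        + "  'value.format' = 'json'"
                        + ")");

        // Query (or join against) the materialized changelog.
        tEnv.executeSql("SELECT * FROM config_changelog").print();
    }
}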

Hope the above information helps.

Best,
Leonard
[1]https://ververica.github.io/flink-cdc-connectors/master/content/about.html
[2] 
https://github.com/ververica/flink-cdc-connectors/blob/master/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSource.java




Files grew larger after compacting data with Hive overwrite?

2022-02-21 Thread RS
Hi,
A Flink job writes to Hive with a fairly short checkpoint interval, which produced many small files, one directory per day.
I then ran Flink SQL to compact the earlier data; after it finished, the storage usage grew. What could be the reason?
Before compaction there were many small part files; after the overwrite there are fewer files, but the storage grew from 274 MB to 2.9 GB.


The Hive table table1 is partitioned by `date`:
insert overwrite aw_topic_compact select * from `table1` where 
`date`='2022-02-21';


Before compaction:
514.0 M  1.5 G    /user/hive/warehouse/ods.db/table1/date=2022-02-20
274.0 M  822.1 M  /user/hive/warehouse/ods.db/table1/date=2022-02-21
48.1 M   144.2 M  /user/hive/warehouse/ods.db/table1/date=2022-02-22



After compaction:
514.0 M  1.5 G    /user/hive/warehouse/ods.db/table1/date=2022-02-20
2.9 G    8.7 G    /user/hive/warehouse/ods.db/table1/date=2022-02-21
47.6 M   142.9 M  /user/hive/warehouse/ods.db/table1/date=2022-02-22



Re: Cannot upgrade helm chart

2022-02-21 Thread Austin Cawley-Edwards
Hey Marco,

There’s unfortunately no perfect fit here, at least that I know of. A
Deployment will make it possible to upgrade the image, but does not support
container exits (e.g. if the Flink job completes, even successfully, K8s will
still restart the container). If you are only running long lived streaming
jobs, this may be acceptable for you, but makes it difficult to stop the
job with a final savepoint (since it will exit after completion).

What you could look into with the Job approach is building an upgrade
procedure that takes the final savepoint, allows the Job to exit, then
deploys a new helm release with the upgraded image and savepoint path. It
is more expensive, but may be more flexible.


Hope that helps,
Austin

On Mon, Feb 21, 2022 at 2:40 PM marco andreas 
wrote:

> Hello flink community,
>
> I am deploying a Flink application cluster using a Helm chart. The
> problem is that the jobmanager component type is a "Job", and with Helm I
> can't upgrade the chart to change the application image version, because
> Helm is unable to upgrade the Docker image of a "Job". I am wondering
> whether I can change the jobmanager kind to a "Deployment" and whether
> there are any drawbacks in doing so.
>
> Thanks.
>


Re: Pulsar connector 2.9.1 failing job submission in standalone.

2022-02-21 Thread Ananth Gundabattula
Thanks a lot Yufei and Wong.

I was able to get a version working by combining both the aspects mentioned in 
each of your responses.


  1.  Trying the sample code that Wong mentioned below resulted in no
response from the JobManager. I had to use the non-SQL connector jar in my
python script to get around this exception.
  2.  I still had to copy the flink-sql-connector-pulsar jar into the lib folder of
FLINK_HOME and had to add the flink-connector-pulsar jar on the client
side. In my previous tests, I was not doing both at the same time. Specifying
the flink-connector-pulsar jar on the client side overcomes the
serialization issue that Yufei hypothesised as the root cause.
  3.  Not adding the flink-sql-connector-pulsar jar to FLINK_HOME/lib resulted
in an exception, because the variable flinkSchema (in the Pulsar connector Java
code base), which is a static field, does not seem to be initialized if it is
only added as an env jar on the client side.
  4.  If I add just the flink-connector-pulsar (non-SQL) jar to
FLINK_HOME/lib, there are exceptions about missing Pulsar classes (not
under the Flink code base). I guess using the flink-sql-connector-pulsar jar
gives me the advantage of having the Pulsar classes (Flink and non-Flink)
resolvable via a single “uber” jar.


A point to note: when the subscription type is set to “Shared”, the
client seems to “hang” when running in standalone mode against a topic that has a
single partition. Perhaps this is because there is only one partition in my
topic. I will test with more partitions to confirm whether this is indeed a bug.



Regards,
Ananth

From: Luning Wong 
Date: Monday, 21 February 2022 at 10:45 pm
To: user@flink.apache.org , Ananth Gundabattula 

Subject: Re: Pulsar connector 2.9.1 failing job submission in standalone.


Luning Wong <gfen...@gmail.com> wrote on Mon, Feb 21, 2022 at 19:38:
import logging
import sys

from pyflink.common import SimpleStringSchema, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import PulsarSource, PulsarDeserializationSchema, SubscriptionType
from pyflink.common.typeinfo import Types


def foo():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    env.add_jars('file:///Users/a/src/me/flink/flink-connectors/flink-sql-connector-pulsar/target/flink-sql-connector-pulsar-1.15-SNAPSHOT.jar')
    deserialization_schema = PulsarDeserializationSchema.flink_schema(SimpleStringSchema())
    # deserialization_schema = PulsarDeserializationSchema.flink_type_info(Types.STRING(), None)

    ps = PulsarSource.builder()\
        .set_deserialization_schema(deserialization_schema)\
        .set_service_url('pulsar://localhost:6650')\
        .set_admin_url('http://localhost:8080')\
        .set_topics('ada')\
        .set_subscription_name('axcsdas')\
        .set_subscription_type(SubscriptionType.Exclusive)\
        .build()

    kafka_source = env.from_source(
        source=ps,
        watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
        source_name="pulsar_source"
    )
    kafka_source.print()
    env.execute('pulsar_source')


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    foo()

The above is my test script. It submits a job to a standalone cluster
successfully.

Best,
Wong
Yufei Zhang <affei...@gmail.com> wrote on Mon, Feb 21, 2022 at 18:33:
Hi Ananth,


From the steps you described, the steps involved using 
`flink-sql-connector-pulsar-1.15-SNAPSHOT.jar`, however to my knowledge pulsar 
connector has not supported Table API yet, so would you mind considering using 
the  `flink-connector-pulsar-1.14.jar` (without sql, though the classes should 
be the same. 1.14 is also the stable version) . Since it failed to submit, I'm 
wildly guessing it's because some class not found issue prevented the 
serialization before submitting.

Also, you mentioned "Get a “transactions not enabled” error in spite of 
enabling transactions in 2.8.0 broker" this is interesting. To use 
transactions, not only do we need to enable transactions in the broker, but 
also in the pulsar source connector as well. Please refer to 
PulsarOptions.PULSAR_ENABLE_TRANSACTION for more details. (generally, a call 
PulsarSourceBuilder#setConfig(PulsarOptions.PULSAR_ENABLE_TRANSACTION, true) 
would suffice)


Thank you for your report and I think since you have these detailed steps to 
reproduce, I'd recommend submitting a JIRA ticket and we'll try to reproduce 
the issue you just described in the coming days to find the exact cause. Thank 
you so much for your precise steps to reproduce.

Cheers,
Yufei.

On Mon, Feb 21, 2022 at 5:47 PM Ananth Gundabattula
<agundabatt...@darwinium.com> wrote:
Thanks Guowei.

A small correction in the telnet result command below. I had a typo in the 
telnet command earlier (did not separate the port from host name ). Issuing the 
proper telnet command resolved the jobmanagers host properly.

Regards,
Ananth

From: Guowei Ma 

Cannot upgrade helm chart

2022-02-21 Thread marco andreas
Hello flink community,

I am deploying a Flink application cluster using a Helm chart. The problem
is that the jobmanager component type is a "Job", and with Helm I can't
upgrade the chart to change the application image version, because Helm is
unable to upgrade the Docker image of a "Job". I am wondering whether I can
change the jobmanager kind to a "Deployment" and whether there are any
drawbacks in doing so.

Thanks.


Re: Apache Flink - Continuously streaming data using jdbc connector

2022-02-21 Thread M Singh
 Thanks Guowei and Francis for your references.
On Monday, February 21, 2022, 01:05:58 AM EST, Guowei Ma 
 wrote:  
 
 Hi,
You can try flink's cdc connector [1] to see if it meets your needs.

[1] https://github.com/ververica/flink-cdc-connectors

Best,
Guowei

On Mon, Feb 21, 2022 at 6:23 AM M Singh  wrote:

Hi Folks:
I am trying to monitor a JDBC source and continuously stream data in an
application using the JDBC connector. However, the application stops after
reading the data in the table.
I've checked the docs 
(https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/jdbc/)
 and it looks like there is a streaming sink but the sources are scan and 
lookup only.  I've also checked the connector settings but could not find any 
flag for continuous monitoring.
Can you please let me know if there is a setting in the connector, or advise how
to make the JDBC connector source stream data continuously?
Thanks for your help.
Mans
  

[Table API] [JDBC CDC] Caching Configuration from MySql instance needed in multiple Flink Jobs

2022-02-21 Thread Dan Serb
Hello all,

I kind of need the community’s help with some ideas, as I’m quite new to
Flink and I feel like I need a little bit of guidance with regard to an
implementation I’m working on.

What I need to do is store a MySQL table in Flink and expose that data to
other jobs, as I need to query it to enrich some records received on a Kafka
source.

The initial solution I’m working on now is:


  1.  Have a processor that uses the Flink JDBC CDC Connector over the table that
stores the information I need. (This is implemented currently - working)
  2.  Find a way to store that stream source inside a table inside Flink. (I
tried the approach of creating a MySQL JDBC catalog, but apparently I can only
create a Postgres catalog programmatically.) This is the question: what API do
I need to use to save the data retrieved by the CDC source into a SQL table
inside Flink?
  3.  The solution from point 2 needs to be done in a way that I can query
that table for each record I receive in a different job that has a Kafka
source as the entrypoint.

I was thinking about having the CDC Source inside the job that has the Kafka 
source, and I’m going to test if this is feasible as we speak, but the idea is 
that I need to get some information from the MySql database, each time I 
process one record from the Kafka source – will this be a good option if I’m 
able to persist the data into a temporary view inside the processor? I’m just 
worried that I might need to reuse these data sets from the SQL database in 
future jobs, so this is why I’d like to have something decoupled and available 
for the entire cluster.

As I said, I’m new to Flink, and it has proven quite difficult for me to
understand exactly what the best solution would be in my situation. This is
why I’m asking users who might have more experience with this and who might
have had the same issues sometime in the past.

Thank you in advance, guys!

Regards,
Dan Serb



Re: Pulsar connector 2.9.1 failing job submission in standalone.

2022-02-21 Thread Luning Wong
Luning Wong wrote on Mon, Feb 21, 2022 at 19:38:

> import logging
> import sys
>
> from pyflink.common import SimpleStringSchema, WatermarkStrategy
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.connectors import PulsarSource,
> PulsarDeserializationSchema, SubscriptionType
> from pyflink.common.typeinfo import Types
>
>
> def foo():
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_parallelism(1)
> env.add_jars(
> 'file:///Users/a/src/me/flink/flink-connectors/flink-sql-connector-pulsar/target/flink-sql-connector-pulsar-1.15-SNAPSHOT.jar'
> )
> deserialization_schema = PulsarDeserializationSchema.flink_schema(
> SimpleStringSchema())
> # deserialization_schema =
> PulsarDeserializationSchema.flink_type_info(Types.STRING(), None)
>
> ps = PulsarSource.builder()\
> .set_deserialization_schema(deserialization_schema)\
> .set_service_url('pulsar://localhost:6650')\
> .set_admin_url('http://localhost:8080')\
> .set_topics('ada')\
> .set_subscription_name('axcsdas')\
> .set_subscription_type(SubscriptionType.Exclusive)\
> .build()
>
> kafka_source = env.from_source(
> source=ps,
> watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
> source_name="pulsar_source"
> )
> kafka_source.print()
> env.execute('pulsar_source')
>
>
> if __name__ == '__main__':
> logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="
> %(message)s")
> foo()
>
> The above is my test script. it is successful to submit a job to a
> standalone cluster.
>
> Best,
> Wong
>
Yufei Zhang wrote on Mon, Feb 21, 2022 at 18:33:
>
>> Hi Ananth,
>>
>>
>> From the steps you described, the steps involved using
>> `flink-sql-connector-pulsar-1.15-SNAPSHOT.jar`, however to my knowledge
>> pulsar connector has not supported Table API yet, so would you mind
>> considering using the  `flink-connector-pulsar-1.14.jar` (without sql,
>> though the classes should be the same. 1.14 is also the stable version) .
>> Since it failed to submit, I'm wildly guessing it's because some class not
>> found issue prevented the serialization before submitting.
>>
>> Also, you mentioned "Get a “transactions not enabled” error in spite of
>> enabling transactions in 2.8.0 broker" this is interesting. To use
>> transactions, not only do we need to enable transactions in the broker, but
>> also in the pulsar source connector as well. Please refer to
>> PulsarOptions.PULSAR_ENABLE_TRANSACTION for more details. (generally, a
>> call PulsarSourceBuilder#setConfig(PulsarOptions.PULSAR_ENABLE_TRANSACTION,
>> true) would suffice)
>>
>>
>> Thank you for your report and I think since you have these detailed steps
>> to reproduce, I'd recommend submitting a JIRA ticket and we'll try to
>> reproduce the issue you just described in the coming days to find the exact
>> cause. Thank you so much for your precise steps to reproduce.
>>
>> Cheers,
>> Yufei.
>>
>> On Mon, Feb 21, 2022 at 5:47 PM Ananth Gundabattula <
>> agundabatt...@darwinium.com> wrote:
>>
>>> Thanks Guowei.
>>>
>>>
>>>
>>> A small correction in the telnet result command below. I had a typo in
>>> the telnet command earlier (did not separate the port from host name ).
>>> Issuing the proper telnet command resolved the jobmanagers host properly.
>>>
>>>
>>>
>>> Regards,
>>>
>>> Ananth
>>>
>>>
>>>
>>> *From: *Guowei Ma 
>>> *Date: *Monday, 21 February 2022 at 8:42 pm
>>> *To: *Ananth Gundabattula 
>>> *Cc: *user@flink.apache.org , affei...@gmail.com
>>> 
>>> *Subject: *Re: Pulsar connector 2.9.1 failing job submission in
>>> standalone.
>>>
>>> Thanks Ananth for your clarification.But I am not an expert on Pulsar.
>>>
>>> I would cc the author of the connector to have a look. Would Yufei like
>>> to give some insight?
>>>
>>>
>>> Best,
>>>
>>> Guowei
>>>
>>>
>>>
>>>
>>>
>>> On Mon, Feb 21, 2022 at 2:10 PM Ananth Gundabattula <
>>> agundabatt...@darwinium.com> wrote:
>>>
>>> Thanks for the response Guowei.
>>>
>>>
>>>
>>>- Tried a telnet to the jobmanager host:port and I get “*127.0.0.1:8086
>>>: nodename nor servname provided, or not known*”
>>>which suggests that the network access is fine ?
>>>- I resubmitted the word count example and it ran fine to completion.
>>>
>>>
>>>
>>> For the pulsar script, I have also tried localhost, and the local LAN
>>> Ips as jobmanager host configuration in conf/flink.yaml and all of them end
>>> with the same result. I have also tried this with Pulsar 2.8.0 and it did
>>> have issues with “shared” subscription type (Get a “transactions not
>>> enabled” error in spite of enabling transactions in 2.8.0 broker).  When I
>>> change the subscription type to “Exclusive” it exhibits the same behavior
>>> as the Pulsar 2.9.1 version. i.e. The job manager submission fails. (in
>>> both 2.8.0 pulsar and 2.9.1 pulsar)
>>>
>>>
>>>
>>> Regards,
>>>
>>> Ananth
>>>
>>>
>>>
>>> *From: *Guowei Ma 
>>> *Date: *Monday, 21 February 2022 at 4:57 pm
>>> *To: *Ananth Gundabattula 
>>> *Cc: *user@flink.apache.org 

CSV join in batch mode

2022-02-21 Thread Killian GUIHEUX
Hello all,

I have to perform a join between two large CSV data sets that do not fit in RAM. I
process these two files in batch mode. I also need a side output to catch CSV
processing errors.
So my question is: what is the best way to do this kind of join operation? I think
I should use ValueState (with a suitable state backend), but would it work if my
state grows larger than my RAM?

Regards.

Killian

This message contains confidential information and is intended only for the 
individual(s) addressed in the message. If you are not the named addressee, you 
should not disseminate, distribute, or copy this e-mail. If you are not the 
intended recipient, you are notified that disclosing, distributing, or copying 
this e-mail is strictly prohibited.


Re: Pulsar connector 2.9.1 failing job submission in standalone.

2022-02-21 Thread Yufei Zhang
Hi Ananth,


From the steps you described, you used
`flink-sql-connector-pulsar-1.15-SNAPSHOT.jar`. However, to my knowledge the
Pulsar connector does not support the Table API yet, so would you mind
considering using the `flink-connector-pulsar-1.14.jar` (without SQL, though
the classes should be the same; 1.14 is also the stable version)?
Since it failed to submit, I'm wildly guessing that a class-not-found issue
prevented the serialization before submitting.

Also, you mentioned "Get a “transactions not enabled” error in spite of
enabling transactions in 2.8.0 broker" - this is interesting. To use
transactions, we need to enable them not only in the broker but also in the
Pulsar source connector. Please refer to
PulsarOptions.PULSAR_ENABLE_TRANSACTION for more details. (Generally, a call to
PulsarSourceBuilder#setConfig(PulsarOptions.PULSAR_ENABLE_TRANSACTION, true)
would suffice.)
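
For illustration, a minimal Java sketch of a source builder with transactions
enabled; the service/admin URLs, topic, and subscription names are placeholders,
and the package layout assumes the 1.14/1.15 connector:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
import org.apache.flink.connector.pulsar.source.PulsarSource;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
import org.apache.pulsar.client.api.SubscriptionType;

public class PulsarTxnSourceSketch {
    public static PulsarSource<String> build() {
        // Shared/Key_Shared subscriptions rely on transactions, so they must be
        // enabled on the broker *and* on the source. URLs and names are placeholders.
        return PulsarSource.builder()
                .setServiceUrl("pulsar://localhost:6650")
                .setAdminUrl("http://localhost:8080")
                .setStartCursor(StartCursor.earliest())
                .setTopics("ada")
                .setDeserializationSchema(
                        PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema()))
                .setSubscriptionName("flink-sub")
                .setSubscriptionType(SubscriptionType.Shared)
                .setConfig(PulsarOptions.PULSAR_ENABLE_TRANSACTION, true)
                .build();
    }
}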


Thank you for your report and I think since you have these detailed steps
to reproduce, I'd recommend submitting a JIRA ticket and we'll try to
reproduce the issue you just described in the coming days to find the exact
cause. Thank you so much for your precise steps to reproduce.

Cheers,
Yufei.

On Mon, Feb 21, 2022 at 5:47 PM Ananth Gundabattula <
agundabatt...@darwinium.com> wrote:

> Thanks Guowei.
>
>
>
> A small correction in the telnet result command below. I had a typo in the
> telnet command earlier (did not separate the port from host name ). Issuing
> the proper telnet command resolved the jobmanagers host properly.
>
>
>
> Regards,
>
> Ananth
>
>
>
> *From: *Guowei Ma 
> *Date: *Monday, 21 February 2022 at 8:42 pm
> *To: *Ananth Gundabattula 
> *Cc: *user@flink.apache.org , affei...@gmail.com <
> affei...@gmail.com>
> *Subject: *Re: Pulsar connector 2.9.1 failing job submission in
> standalone.
>
> Thanks Ananth for your clarification.But I am not an expert on Pulsar.
>
> I would cc the author of the connector to have a look. Would Yufei like to
> give some insight?
>
>
> Best,
>
> Guowei
>
>
>
>
>
> On Mon, Feb 21, 2022 at 2:10 PM Ananth Gundabattula <
> agundabatt...@darwinium.com> wrote:
>
> Thanks for the response Guowei.
>
>
>
>- Tried a telnet to the jobmanager host:port and I get “*127.0.0.1:8086
>: nodename nor servname provided, or not known*”
>which suggests that the network access is fine ?
>- I resubmitted the word count example and it ran fine to completion.
>
>
>
> For the pulsar script, I have also tried localhost, and the local LAN Ips
> as jobmanager host configuration in conf/flink.yaml and all of them end
> with the same result. I have also tried this with Pulsar 2.8.0 and it did
> have issues with “shared” subscription type (Get a “transactions not
> enabled” error in spite of enabling transactions in 2.8.0 broker).  When I
> change the subscription type to “Exclusive” it exhibits the same behavior
> as the Pulsar 2.9.1 version. i.e. The job manager submission fails. (in
> both 2.8.0 pulsar and 2.9.1 pulsar)
>
>
>
> Regards,
>
> Ananth
>
>
>
> *From: *Guowei Ma 
> *Date: *Monday, 21 February 2022 at 4:57 pm
> *To: *Ananth Gundabattula 
> *Cc: *user@flink.apache.org 
> *Subject: *Re: Pulsar connector 2.9.1 failing job submission in
> standalone.
>
> Hi, Ansanth
>
>
>
> I don't see any error logs on the server side, so it's hard to tell what
> the specific problem is. From the current log, there are two things to try
> first:
>
>
> 1. From the client's log, it is a 5-minute timeout, so you can telnet
> 127.0.0.1:8086 to see if there is a problem with the local network
> 2. From the log on the server side, there is no job submission at all. You
> can try to submit the wordcount example again when submitting the pulsar
> example fails. So as to rule out whether the session cluster is inherently
> problematic.
>
>
> Best,
>
> Guowei
>
>
>
>
>
> On Mon, Feb 21, 2022 at 9:48 AM Ananth Gundabattula <
> agundabatt...@darwinium.com> wrote:
>
> Hello All,
>
>
>
> I have a Pyflink script that needs to read from Pulsar and process the
> data.
>
>
>
> I have done the following to implement a prototype.
>
>
>
>1. Since I need Pyflink way to connect to Pulsar , I checked out the
>code from master branch as advised in a different thread. (PyFlink Pulsar
>connector seems to be slated for 1.15 release)
>2. I built the Flink source.
>3. I am using the following location as FLINK_HOME under the source:
>flink-dist/target/flink-1.15-SNAPSHOT-bin/flink-1.15-SNAPSHOT
>4. The python pyflink wheels have been appropriately installed in the
>right python conda environment.
>5. I copied the flink-sql-connector-pulsar-1.15-SNAPSHOT.jar into the
>$FLINK_HOME/lib folder.
>6. I started the standalone cluster by running bin/start-cluster.sh
>7. I submit my test script by using bin/flink run –python …
>8. If am launching the the word_count example in flink documentation,
>everything runs fine and 

Re: Pulsar connector 2.9.1 failing job submission in standalone.

2022-02-21 Thread Ananth Gundabattula
Thanks Guowei.

A small correction to the telnet result below: I had a typo in the
telnet command earlier (did not separate the port from the host name). Issuing the
proper telnet command resolved the jobmanager's host properly.

Regards,
Ananth

From: Guowei Ma 
Date: Monday, 21 February 2022 at 8:42 pm
To: Ananth Gundabattula 
Cc: user@flink.apache.org , affei...@gmail.com 

Subject: Re: Pulsar connector 2.9.1 failing job submission in standalone.
Thanks Ananth for your clarification. But I am not an expert on Pulsar.
I would cc the author of the connector to have a look. Would Yufei like to give 
some insight?

Best,
Guowei


On Mon, Feb 21, 2022 at 2:10 PM Ananth Gundabattula
<agundabatt...@darwinium.com> wrote:
Thanks for the response Guowei.


  *   Tried a telnet to the jobmanager host:port and I get 
“127.0.0.1:8086: nodename nor servname provided, or not 
known” which suggests that the network access is fine ?
  *   I resubmitted the word count example and it ran fine to completion.

For the pulsar script, I have also tried localhost, and the local LAN Ips as 
jobmanager host configuration in conf/flink.yaml and all of them end with the 
same result. I have also tried this with Pulsar 2.8.0 and it did have issues 
with “shared” subscription type (Get a “transactions not enabled” error in 
spite of enabling transactions in 2.8.0 broker).  When I change the 
subscription type to “Exclusive” it exhibits the same behavior as the Pulsar 
2.9.1 version. i.e. The job manager submission fails. (in both 2.8.0 pulsar and 
2.9.1 pulsar)

Regards,
Ananth

From: Guowei Ma <guowei@gmail.com>
Date: Monday, 21 February 2022 at 4:57 pm
To: Ananth Gundabattula <agundabatt...@darwinium.com>
Cc: user@flink.apache.org <user@flink.apache.org>
Subject: Re: Pulsar connector 2.9.1 failing job submission in standalone.
Hi, Ansanth

I don't see any error logs on the server side, so it's hard to tell what the 
specific problem is. From the current log, there are two things to try first:

1. From the client's log, it is a 5-minute timeout, so you can telnet 
127.0.0.1:8086 to see if there is a problem with the 
local network
2. From the log on the server side, there is no job submission at all. You can 
try to submit the wordcount example again when submitting the pulsar example 
fails. So as to rule out whether the session cluster is inherently problematic.

Best,
Guowei


On Mon, Feb 21, 2022 at 9:48 AM Ananth Gundabattula
<agundabatt...@darwinium.com> wrote:
Hello All,

I have a Pyflink script that needs to read from Pulsar and process the data.

I have done the following to implement a prototype.


  1.  Since I need a PyFlink way to connect to Pulsar, I checked out the code 
from master branch as advised in a different thread. (PyFlink Pulsar connector 
seems to be slated for 1.15 release)
  2.  I built the Flink source.
  3.  I am using the following location as FLINK_HOME under the source: 
flink-dist/target/flink-1.15-SNAPSHOT-bin/flink-1.15-SNAPSHOT
  4.  The python pyflink wheels have been appropriately installed in the right 
python conda environment.
  5.  I copied the flink-sql-connector-pulsar-1.15-SNAPSHOT.jar into the 
$FLINK_HOME/lib folder.
  6.  I started the standalone cluster by running bin/start-cluster.sh
  7.  I submit my test script by using bin/flink run --python …
  8.  If I am launching the word_count example from the Flink documentation, 
everything runs fine and it completes successfully.
  9.  However, if the script involves the Pulsar connector, the logs show that 
the Flink client codebase is not able to submit the job to the JobManager.
  10. It ultimately dies with a Channel Idle exception. (See this in DEBUG mode 
of the logs). I am attaching the logs for reference.

I am trying this on OS X. Please note that the classic word_count script works
fine without any issues, and I see the job submission failures on the client
only when the Pulsar source connector is in the script. I have also added the
logs for the standalone session job manager. I am also attaching the script for
reference.

Could you please advise what I can do to resolve the issue? (I will raise a
JIRA issue if someone thinks it is a bug.)

Regards,
Ananth



Unsubscribe

2022-02-21 Thread Allen


Re: Pulsar connector 2.9.1 failing job submission in standalone.

2022-02-21 Thread Guowei Ma
Thanks Ananth for your clarification. But I am not an expert on Pulsar.
I would cc the author of the connector to have a look. Would Yufei like to
give some insight?

Best,
Guowei


On Mon, Feb 21, 2022 at 2:10 PM Ananth Gundabattula <
agundabatt...@darwinium.com> wrote:

> Thanks for the response Guowei.
>
>
>
>- Tried a telnet to the jobmanager host:port and I get “*127.0.0.1:8086
>: nodename nor servname provided, or not known*”
>which suggests that the network access is fine ?
>- I resubmitted the word count example and it ran fine to completion.
>
>
>
> For the pulsar script, I have also tried localhost, and the local LAN Ips
> as jobmanager host configuration in conf/flink.yaml and all of them end
> with the same result. I have also tried this with Pulsar 2.8.0 and it did
> have issues with “shared” subscription type (Get a “transactions not
> enabled” error in spite of enabling transactions in 2.8.0 broker).  When I
> change the subscription type to “Exclusive” it exhibits the same behavior
> as the Pulsar 2.9.1 version. i.e. The job manager submission fails. (in
> both 2.8.0 pulsar and 2.9.1 pulsar)
>
>
>
> Regards,
>
> Ananth
>
>
>
> *From: *Guowei Ma 
> *Date: *Monday, 21 February 2022 at 4:57 pm
> *To: *Ananth Gundabattula 
> *Cc: *user@flink.apache.org 
> *Subject: *Re: Pulsar connector 2.9.1 failing job submission in
> standalone.
>
> Hi, Ansanth
>
>
>
> I don't see any error logs on the server side, so it's hard to tell what
> the specific problem is. From the current log, there are two things to try
> first:
>
>
> 1. From the client's log, it is a 5-minute timeout, so you can telnet
> 127.0.0.1:8086 to see if there is a problem with the local network
> 2. From the log on the server side, there is no job submission at all. You
> can try to submit the wordcount example again when submitting the pulsar
> example fails. So as to rule out whether the session cluster is inherently
> problematic.
>
>
> Best,
>
> Guowei
>
>
>
>
>
> On Mon, Feb 21, 2022 at 9:48 AM Ananth Gundabattula <
> agundabatt...@darwinium.com> wrote:
>
> Hello All,
>
>
>
> I have a Pyflink script that needs to read from Pulsar and process the
> data.
>
>
>
> I have done the following to implement a prototype.
>
>
>
>1. Since I need Pyflink way to connect to Pulsar , I checked out the
>code from master branch as advised in a different thread. (PyFlink Pulsar
>connector seems to be slated for 1.15 release)
>2. I built the Flink source.
>3. I am using the following location as FLINK_HOME under the source:
>flink-dist/target/flink-1.15-SNAPSHOT-bin/flink-1.15-SNAPSHOT
>4. The python pyflink wheels have been appropriately installed in the
>right python conda environment.
>5. I copied the flink-sql-connector-pulsar-1.15-SNAPSHOT.jar into the
>$FLINK_HOME/lib folder.
>6. I started the standalone cluster by running bin/start-cluster.sh
>7. I submit my test script by using bin/flink run –python …
>8. If am launching the the word_count example in flink documentation,
>everything runs fine and it completes successfully.
>9. However, if the script involves the Pulsar connector, the logs show
>that the Flink client codebase is not able to submit the job to the
>Jobamanger.
>10. It ultimately dies with a Channel Idle exception. (See this in
>DEBUG mode of the logs). I am attaching the logs for reference.
>
>
>
> I am trying this on OSx. Please note that the classic word_count script
> works fine without any issues and I see the job submission failures on the
> client only when the pulsar source connector is in the script. I have also
> added the logs for the standalone session job manager.I am also attaching
> the script for reference.
>
>
>
> Could you please advise what can I do to resolve the issue. (Will raise an
> JIRA-Issue if someone thinks it is a bug).
>
>
>
> Regards,
>
> Ananth
>
>
>
>


Unsubscribe

2022-02-21 Thread Blake


Unsubscribe

2022-02-21 Thread 王翔
Unsubscribe

Re: DataStream API: Parquet File Format with Scala Case Classes

2022-02-21 Thread Fabian Paul
Hi Ryan,

Thanks for bringing up this topic. Currently, your analysis is
correct, and reading parquet files outside the Table API is rather
difficult. The community started an effort in Flink 1.15 to
restructure some of the formats to make them better applicable to the
DataStream and Table API. You can have a look at the CSV format
implementation[1]. Obviously, implementing the Parquet format is more
complicated since it is more performance-sensitive.
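
For illustration, a minimal Java sketch of how the restructured CSV format plugs
into a DataStream FileSource in 1.15; the POJO and path are placeholders:

import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.csv.CsvReaderFormat;

public class CsvFileSourceSketch {

    // A simple POJO describing one CSV row; field names are placeholders
    // (column-to-field mapping may additionally need Jackson annotations).
    public static class Record {
        public long id;
        public String name;
    }

    public static FileSource<Record> build() {
        // CsvReaderFormat derives the CSV schema from the POJO and yields a
        // StreamFormat that the generic FileSource can consume directly.
        CsvReaderFormat<Record> format = CsvReaderFormat.forPojo(Record.class);
        return FileSource
                .forRecordStreamFormat(format, new Path("file:///tmp/records.csv"))
                .build();
    }
}

A Parquet format exposed the same way (a StreamFormat/BulkFormat producing a
user type) would similarly slot into a FileSource of that type, which is what
the HybridSource setup needs.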

If you are willing to work on it, that would be great. We can also
assist with the design and offer guidance during the implementation.

One question I'd still like to ask is about your exact envisioned
setup. My understanding so far is you have Parquet files with backfill
data and want to read all files and then continue the reading from
Kafka. Is that correct?

Best
Fabian

[1] 
https://github.com/apache/flink/blob/0f19c2472c54aac97e4067f5398731ab90036d1a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvReaderFormat.java#L71


On Fri, Feb 18, 2022 at 11:22 PM Ryan van Huuksloot
 wrote:
>
> Hello,
>
>
> Context:
>
> We are working on integrating Hybrid Sources with different Sources and 
> Sinks. I have been working on a Parquet source that allows users to load the 
> FileSource[T] so that the source can be used within Hybrid Sources where the 
> HybridSource is of Type[T].
>
> The environment is Scala 2.12 and we are using the DataStream API. The 
> generic type “T” used in the email would be a Scala case class.
>
>
> Problem:
>
> Based on the documentation, it is recommended that you use the 
> ParquetColumnarRowInputFormat as an entrypoint to set up the Source. Given 
> that ParquetColumnarRowInputFormat hard codes RowData, your other sources 
> would then need to be of Type[RowData] to be used in HybridSource - from my 
> experience - and you can’t convert FileSource[RowData] -> FileSource[T].
>
> An alternative I looked into was extending ParquetVectorizedInputFormat but 
> found that the type restrictions were difficult to reconcile.
>
>
> Potential Solution:
>
> Create a better AbstractParquetBulkFormat, similar to the 
> AbstractAvroBulkFormat added in 1.15. We would be available to contribute but 
> want to understand if this is a direction Flink is willing to go before 
> putting in the work!
>
>
> Questions:
>
>
> Based on the current implementation of Parquet within Flink, is it correct to 
> say that the only entry-point for parquet is ParquetColumnarRowInputFormat?
>
> Is there any way to convert a FileSource[RowData] -> FileSource[T]?
>
> Would the potential solution listed above be an implementation that Flink 
> would be interested in integrating?
>
> If not, do you have an example of Parquet being used in a HybridSource along 
> with a Kafka Source?
>
>
> Thanks!
> Ryan van Huuksloot
> Data Developer | Data Platform Engineering | Streaming Capabilities