Flink JDBC connector behavior

2022-02-08 Thread Qihua Yang
Hi,

We are using the Flink JDBC connector to read a whole database table row by
row. There are a few things I don't quite understand.
We configured BATCH_SIZE=100 and PARTITION_NUM=1000, and the table is pretty big.
What is Flink's internal behavior when reading data from the table?
Does Flink read BATCH_SIZE rows each time? Or (tableSize/PARTITION_NUM)
rows each time? Or does it read the whole table into memory each time?
Database metrics show the SQL latency is extremely high, almost 20 s.
Is there any way to optimize it?

val query = String.format("SELECT * FROM %s", tableName)

val options = JdbcOptions.builder()
.setDBUrl(url)
.setTableName(tableName)
.setDriverName(DRIVER_NAME)
.setUsername(userName)
.setPassword(password)
.build()
val readOptions = JdbcReadOptions.builder()
.setQuery(query)
.setPartitionColumnName(PARTITION_KEY)
.setPartitionLowerBound(esSinkConf.dbLowerBound)
.setPartitionUpperBound(esSinkConf.dbUpperBound)
.setNumPartitions(PARTITION_NUM)
.setFetchSize(BATCH_SIZE)
.build()
val lookupOptions = JdbcLookupOptions.builder()
.setCacheMaxSize(-1)
.setCacheExpireMs(CACHE_SIZE)
.setMaxRetryTimes(2)
.build()
val rawSource = JdbcTableSource.builder()
.setOptions(options)
.setReadOptions(readOptions)
.setLookupOptions(lookupOptions)
.setSchema(schema)
.build().getDataStream(env)
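
For reference, a rough illustration (my own sketch, not the connector's actual code) of how a
partitioned JDBC scan is typically split: the [lower, upper] range of PARTITION_KEY is divided
into PARTITION_NUM slices, each slice is read with a query along the lines of
SELECT * FROM table WHERE PARTITION_KEY BETWEEN start AND end, and the fetch size
(BATCH_SIZE here) only controls how many rows the JDBC driver pulls per round trip while
streaming a result set, not how much of the table is loaded at once.

fun partitionRanges(lower: Long, upper: Long, numPartitions: Int): List<LongRange> {
    // Ceiling division so the slices cover the whole [lower, upper] range.
    val span = (upper - lower + 1 + numPartitions - 1) / numPartitions
    return (0 until numPartitions).map { i ->
        val start = lower + i * span
        val end = minOf(start + span - 1, upper)
        start..end  // an empty range if numPartitions exceeds the size of the key range
    }
}

// Example: partitionRanges(0, 999, 10) -> [0..99, 100..199, ..., 900..999]

If PARTITION_KEY is not indexed, each of those range queries becomes a table scan, which is one
common reason for very high per-query latency.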


Re: JDBC read DB causeOutOfMemoryError: Java heap space

2022-01-19 Thread Qihua Yang
Should I change the query to something like the one below, adding a limit? Without a
limit, does that mean Flink will read the whole huge table into memory and then fetch
and return 20 records each time?

val query = String.format("SELECT * FROM %s limit 1000", tableName)


On Tue, Jan 18, 2022 at 11:56 PM Qihua Yang  wrote:

> Hi Caizhi,
>
> Thank you for your reply. The heap size is 512m. Fetching from the DB
> table is the only costly operation. After fetching from DB, I simply
> ingested a kafka topic. That should not be the bottleneck.
> Here is the jdbc configuration. Is that correct config?
>
> val query = String.format("SELECT * FROM %s", tableName)
>
> val options = JdbcOptions.builder()
> .setDBUrl(url)
> .setTableName(tableName)
> .setDriverName(DRIVER_NAME)
> .setUsername(userName)
> .setPassword(password)
> .build()
> val readOptions = JdbcReadOptions.builder()
> .setQuery(query)
> .setPartitionColumnName(PARTITION_KEY)
> .setPartitionLowerBound(dbLowerBound)
> .setPartitionUpperBound(dbUpperBound)
> .setNumPartitions(50)
> .setFetchSize(20)
> .build()
> val lookupOptions = JdbcLookupOptions.builder()
> .setCacheMaxSize(-1)
> .setCacheExpireMs(1000)
> .setMaxRetryTimes(2)
> .build()
> val rawSource = JdbcTableSource.builder()
> .setOptions(options)
> .setReadOptions(readOptions)
> .setLookupOptions(lookupOptions)
> .setSchema(schema)
> .build().getDataStream(env)
>
>
> On Tue, Jan 18, 2022 at 11:48 PM Caizhi Weng  wrote:
>
>> Hi!
>>
>> This is not the desired behavior. As you have set fetchSize to 20 there
>> will be only 20 records in each parallelism of the source. How large is
>> your heap size? Does your job have any other operations which consume a lot
>> of heap memory?
>>
>> Qihua Yang wrote on Wed, Jan 19, 2022 at 15:27:
>>
>>> Here is the errors
>>> Exception: java.lang.OutOfMemoryError thrown from the
>>> UncaughtExceptionHandler in thread "server-timer"
>>> Exception: java.lang.OutOfMemoryError thrown from the
>>> UncaughtExceptionHandler in thread "I/O dispatcher 16"
>>> Exception: java.lang.OutOfMemoryError thrown from the
>>> UncaughtExceptionHandler in thread "HTTP-Dispatcher"
>>> Exception: java.lang.OutOfMemoryError thrown from the
>>> UncaughtExceptionHandler in thread "I/O dispatcher 11"
>>> Exception: java.lang.OutOfMemoryError thrown from the
>>> UncaughtExceptionHandler in thread "I/O dispatcher 9"
>>>
>>> On Tue, Jan 18, 2022 at 11:25 PM Qihua Yang  wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a flink cluster(50 hosts, each host runs a task manager).
>>>> I am using Flink JDBC to consume data from a database. The db table is
>>>> pretty large, around 18734 rows. I configured the JDBC number of
>>>> partitions to 50. fetchSize is 20. Flink application has 50 task managers.
>>>> Anyone know why I got OutOfMemoryError? How should I config it?
>>>>
>>>> Thanks,
>>>> Qihua
>>>>
>>>>


Re: JDBC table reading

2022-01-18 Thread Qihua Yang
Hi Caizhi,

Got it! Thanks for your clarification.

On Tue, Jan 18, 2022 at 11:45 PM Caizhi Weng  wrote:

> Hi!
>
> "But flink application only has 10 task managers". I assume that you're
> talking about task slots instead of task managers.
>
> If there are more partitions than task slots, each task slot may be
> assigned more than one partition. By default this is a first come first
> served assignment. After a task slot finishes its current partition, it
> will ask from the job manager for the next partition unless all partitions
> have been distributed.
>
> Qihua Yang wrote on Wed, Jan 19, 2022 at 14:51:
>
>> Hi,
>>
>> I plan to use the database as flink source by using flink JDBC. I know
>> setNumPartitions can be used for parallelism in table reading.
>> If the num of task managers is less than numPartitions, what is the
>> behavior?
>> For example:
>> I config setNumPartitions(20), but flink application only has 10 task
>> managers. How flink assign tasks? Or that is totally wrong configuration?
>> num of tasks has to be larger than numPartitions?
>>
>> Thanks,
>> Qihua
>>
>


Re: JDBC read DB causeOutOfMemoryError: Java heap space

2022-01-18 Thread Qihua Yang
Hi Caizhi,

Thank you for your reply. The heap size is 512m. Fetching from the DB table
is the only costly operation. After fetching from the DB, I simply write the data to a
Kafka topic; that should not be the bottleneck.
Here is the JDBC configuration. Is that the correct config?

val query = String.format("SELECT * FROM %s", tableName)

val options = JdbcOptions.builder()
.setDBUrl(url)
.setTableName(tableName)
.setDriverName(DRIVER_NAME)
.setUsername(userName)
.setPassword(password)
.build()
val readOptions = JdbcReadOptions.builder()
.setQuery(query)
.setPartitionColumnName(PARTITION_KEY)
.setPartitionLowerBound(dbLowerBound)
.setPartitionUpperBound(dbUpperBound)
.setNumPartitions(50)
.setFetchSize(20)
.build()
val lookupOptions = JdbcLookupOptions.builder()
.setCacheMaxSize(-1)
.setCacheExpireMs(1000)
.setMaxRetryTimes(2)
.build()
val rawSource = JdbcTableSource.builder()
.setOptions(options)
.setReadOptions(readOptions)
.setLookupOptions(lookupOptions)
.setSchema(schema)
.build().getDataStream(env)


On Tue, Jan 18, 2022 at 11:48 PM Caizhi Weng  wrote:

> Hi!
>
> This is not the desired behavior. As you have set fetchSize to 20 there
> will be only 20 records in each parallelism of the source. How large is
> your heap size? Does your job have any other operations which consume a lot
> of heap memory?
>
> Qihua Yang wrote on Wed, Jan 19, 2022 at 15:27:
>
>> Here is the errors
>> Exception: java.lang.OutOfMemoryError thrown from the
>> UncaughtExceptionHandler in thread "server-timer"
>> Exception: java.lang.OutOfMemoryError thrown from the
>> UncaughtExceptionHandler in thread "I/O dispatcher 16"
>> Exception: java.lang.OutOfMemoryError thrown from the
>> UncaughtExceptionHandler in thread "HTTP-Dispatcher"
>> Exception: java.lang.OutOfMemoryError thrown from the
>> UncaughtExceptionHandler in thread "I/O dispatcher 11"
>> Exception: java.lang.OutOfMemoryError thrown from the
>> UncaughtExceptionHandler in thread "I/O dispatcher 9"
>>
>> On Tue, Jan 18, 2022 at 11:25 PM Qihua Yang  wrote:
>>
>>> Hi,
>>>
>>> I have a flink cluster(50 hosts, each host runs a task manager).
>>> I am using Flink JDBC to consume data from a database. The db table is
>>> pretty large, around 18734 rows. I configured the JDBC number of
>>> partitions to 50. fetchSize is 20. Flink application has 50 task managers.
>>> Anyone know why I got OutOfMemoryError? How should I config it?
>>>
>>> Thanks,
>>> Qihua
>>>
>>>


Re: JDBC read DB causeOutOfMemoryError: Java heap space

2022-01-18 Thread Qihua Yang
Here are the errors:
Exception: java.lang.OutOfMemoryError thrown from the
UncaughtExceptionHandler in thread "server-timer"
Exception: java.lang.OutOfMemoryError thrown from the
UncaughtExceptionHandler in thread "I/O dispatcher 16"
Exception: java.lang.OutOfMemoryError thrown from the
UncaughtExceptionHandler in thread "HTTP-Dispatcher"
Exception: java.lang.OutOfMemoryError thrown from the
UncaughtExceptionHandler in thread "I/O dispatcher 11"
Exception: java.lang.OutOfMemoryError thrown from the
UncaughtExceptionHandler in thread "I/O dispatcher 9"

On Tue, Jan 18, 2022 at 11:25 PM Qihua Yang  wrote:

> Hi,
>
> I have a flink cluster(50 hosts, each host runs a task manager).
> I am using Flink JDBC to consume data from a database. The db table is
> pretty large, around 18734 rows. I configured the JDBC number of
> partitions to 50. fetchSize is 20. Flink application has 50 task managers.
> Anyone know why I got OutOfMemoryError? How should I config it?
>
> Thanks,
> Qihua
>
>


JDBC read DB causeOutOfMemoryError: Java heap space

2022-01-18 Thread Qihua Yang
Hi,

I have a Flink cluster (50 hosts; each host runs a task manager).
I am using Flink JDBC to consume data from a database. The DB table is
pretty large, around 18734 rows. I configured the JDBC number of
partitions to 50 and fetchSize to 20. The Flink application has 50 task managers.
Does anyone know why I got an OutOfMemoryError? How should I configure it?

Thanks,
Qihua


JDBC table reading

2022-01-18 Thread Qihua Yang
Hi,

I plan to use the database as a Flink source by using Flink JDBC. I know
setNumPartitions can be used for parallelism in table reading.
If the number of task managers is less than numPartitions, what is the
behavior?
For example: I configure setNumPartitions(20), but the Flink application only has 10 task
managers. How does Flink assign the tasks? Or is that simply a wrong configuration,
and the number of tasks has to be larger than numPartitions?

Thanks,
Qihua


Re: Flink rest api to start a job

2022-01-08 Thread Qihua Yang
Hi Yun,

Thank you for your reply! testB.jar doesn't have the same entry class as
testA.jar.
So is this expected behavior? What is the reasoning behind it?

Thanks,
Qihua

On Fri, Jan 7, 2022 at 4:27 PM Yun Gao  wrote:

>
> Hi Qihua
>
> Sorry, may I double-check whether the entry class exists in both
> testA and testB?
>
> If testA.jar is included on startup, it is loaded in the parent
> classloader, which
> is the parent of the user classloader that loads testB. Thus,
> at least if the
> entry class exists only in testA, it should still be found.
>
> Best,
> Yun
>
> --
> Sender:Qihua Yang
> Date:2022/01/07 02:55:09
> Recipient:user
> Theme:Flink rest api to start a job
>
> Hi,
>
> I found a weird behavior. We launched a k8s cluster without job. But
> includes the jar A. I use Flink rest api to upload a dummy jar(actually it
> can be any jar). Flink will create a jar id. Then I use rest api to start
> the job with the jar A entry-class. But the jar id is the dummy jar id.
> Flink will start the job from jar A. Anyone know why?
> My understanding is flink rest api should start the job from the dummy
> jar, because jar id is dummy jar id that I uploaded.
> Here are steps what I did:
> 1. deploy a k8s pod contains working jar(testA.jar)
> 1. flink rest api upload jar, testB.jar, flink generate jar id,
> 2d6a9263-c9d3-4f23-9f59-fc3594aadf0c_job.jar
> 2. flink rest api to runJar with testB.jar id, but testA.jar entry-class.
> 3. flink start job from testA.jar
>
> Thanks,
> Qihua
>
>


Flink rest api to start a job

2022-01-06 Thread Qihua Yang
Hi,

I found a weird behavior. We launched a k8s cluster without a job, but it
includes jar A. I used the Flink REST API to upload a dummy jar (it can actually
be any jar), and Flink created a jar id. Then I used the REST API to start
the job with jar A's entry class, but with the dummy jar's id.
Flink started the job from jar A. Does anyone know why?
My understanding is that the Flink REST API should start the job from the dummy jar,
because the jar id is the dummy jar id that I uploaded.
Here are the steps of what I did:
1. deploy a k8s pod containing the working jar (testA.jar)
2. use the Flink REST API to upload a jar, testB.jar; Flink generates a jar id,
2d6a9263-c9d3-4f23-9f59-fc3594aadf0c_job.jar
3. use the Flink REST API to run the jar with testB.jar's id, but testA.jar's entry class
4. Flink starts the job from testA.jar

Thanks,
Qihua
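
For context, a minimal sketch of calling the jar-run endpoint from Kotlin (JDK 11+ HttpClient).
The host and entry class are placeholders, and the JSON field names follow my reading of the
REST API documentation, so double-check them against the docs for your Flink version.

import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse

fun main() {
    val base = "http://localhost:8081"                          // placeholder JobManager address
    val jarId = "2d6a9263-c9d3-4f23-9f59-fc3594aadf0c_job.jar"  // id returned by /jars/upload

    // The entry class is looked up on the classpath visible to the uploaded jar's classloader,
    // which is why a class living in a jar already on the JobManager classpath can still be found.
    val body = """{"entryClass":"com.example.TestAJob","parallelism":1}"""

    val request = HttpRequest.newBuilder()
        .uri(URI.create("$base/jars/$jarId/run"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(body))
        .build()

    val response = HttpClient.newHttpClient()
        .send(request, HttpResponse.BodyHandlers.ofString())
    println(response.statusCode())
    println(response.body())  // contains the new job id on success
}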


Re: submit a job issue

2021-11-22 Thread Qihua Yang
Sorry, maybe I didn't explain it clearly.
I want to use "/jars/:jarId/run" to submit a job using a jar that was not
uploaded via "/jars/upload".
Only jars uploaded through the REST API are visible under "/jars/" and can be
used to submit a job via "/jars/:jarId/run".
For example, the instance that runs the job manager has a jar (test.jar) that
was not uploaded via "/jars/upload", so the "/jars/" endpoint cannot show
that jar.
Is it possible to use "/jars/:jarId/run" to submit a job using that
jar?

Thanks,
Qihua

On Mon, Nov 22, 2021 at 6:09 PM Caizhi Weng  wrote:

> Hi!
>
> You can try /jars/:jarId/run, see [1]
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jars-jarid-run
>
> Qihua Yang wrote on Tue, Nov 23, 2021 at 9:27 AM:
>
>> Hi,
>>
>> We are running a flink cluster in session mode. After the cluster is
>> launched, no job is running. We want to use REST api to run a job at some
>> point.
>> The jobManager already contains a jar when we deploy. We don't want to
>> upload a new jar by using REST api. Is there any way that I use REST api to
>> run this jar?
>> Looks like REST api only can submit a job by running a jar that was
>> previously uploaded by using REST api.
>>
>> Thanks,
>> Qihua
>>
>


submit a job issue

2021-11-22 Thread Qihua Yang
Hi,

We are running a Flink cluster in session mode. After the cluster is
launched, no job is running. We want to use the REST API to run a job at some
point.
The jobManager already contains a jar when we deploy, and we don't want to
upload a new jar via the REST API. Is there any way to use the REST API to
run this jar?
It looks like the REST API can only submit a job by running a jar that was
previously uploaded through the REST API.

Thanks,
Qihua


Re: stream consume from kafka after DB scan is done

2021-11-05 Thread Qihua Yang
If HybridSource cannot support a JDBC source, is there any approach I can try
to read input from two sources sequentially? That is, after reading the data from the
database, start reading data from the Kafka topic?

Thanks,
Qihua
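
For what it's worth, a minimal HybridSource sketch in the shape the 1.14 API expects. It chains a
bounded FileSource with a KafkaSource, since both are FLIP-27 sources (the legacy JdbcTableSource
is not, which is why it cannot be used as the first source). The path, topic, and broker address
are placeholders.

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.base.source.hybrid.HybridSource
import org.apache.flink.connector.file.src.FileSource
import org.apache.flink.connector.file.src.reader.TextLineFormat
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

fun main() {
    val env = StreamExecutionEnvironment.getExecutionEnvironment()

    // Bounded first source, e.g. a file export of the database table.
    val fileSource = FileSource
        .forRecordStreamFormat(TextLineFormat(), Path("/data/db-export"))
        .build()

    // Unbounded second source, started once the bounded one finishes.
    val kafkaSource = KafkaSource.builder<String>()
        .setBootstrapServers("kafka:9092")
        .setTopics("events")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(SimpleStringSchema())
        .build()

    val hybrid = HybridSource.builder(fileSource)
        .addSource(kafkaSource)
        .build()

    env.fromSource(hybrid, WatermarkStrategy.noWatermarks(), "db-export-then-kafka").print()
    env.execute("hybrid-source-sketch")
}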

On Fri, Nov 5, 2021 at 10:44 PM Qihua Yang  wrote:

> Hi Austin,
>
> That is exactly what I want. Is it possible to use JdbcTableSource as the
> first source? Looks like only FileSource can be used as the first source?
> Below is the error.
>
> val jdbcSource = JdbcTableSource.builder()
> .setOptions(options)
> .setReadOptions(readOptions)
> .setLookupOptions(lookupOptions)
> .setSchema(schema)
> .build()
>
> val hybridSource = HybridSource.builder(jdbcSource)
> .addSource(kafkaSource)
> .build();
>
>
>
> [image: Screen Shot 2021-11-05 at 10.41.59 PM.png]
> [image: Screen Shot 2021-11-05 at 10.42.13 PM.png]
>
> On Fri, Nov 5, 2021 at 5:11 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Hey Qihua,
>>
>> If I understand correctly, you should be able to with the HybridSource,
>> released in 1.14 [1]
>>
>> Best,
>> Austin
>>
>> [1]:
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/hybridsource/
>>
>> On Fri, Nov 5, 2021 at 3:53 PM Qihua Yang  wrote:
>>
>>> Hi,
>>>
>>> Our stream has two sources. one is a Kafka topic, one is a database. Is
>>> it possible to consume from kafka topic only after DB scan is completed? We
>>> configured it in batch mode.
>>>
>>> Thanks,
>>> Qihua
>>>
>>


Re: stream consume from kafka after DB scan is done

2021-11-05 Thread Qihua Yang
Hi Austin,

That is exactly what I want. Is it possible to use JdbcTableSource as the
first source? It looks like only a FileSource can be used as the first source.
Below is the error.

val jdbcSource = JdbcTableSource.builder()
.setOptions(options)
.setReadOptions(readOptions)
.setLookupOptions(lookupOptions)
.setSchema(schema)
.build()

val hybridSource = HybridSource.builder(jdbcSource)
.addSource(kafkaSource)
.build();



[image: Screen Shot 2021-11-05 at 10.41.59 PM.png]
[image: Screen Shot 2021-11-05 at 10.42.13 PM.png]

On Fri, Nov 5, 2021 at 5:11 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hey Qihua,
>
> If I understand correctly, you should be able to with the HybridSource,
> released in 1.14 [1]
>
> Best,
> Austin
>
> [1]:
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/hybridsource/
>
> On Fri, Nov 5, 2021 at 3:53 PM Qihua Yang  wrote:
>
>> Hi,
>>
>> Our stream has two sources. one is a Kafka topic, one is a database. Is
>> it possible to consume from kafka topic only after DB scan is completed? We
>> configured it in batch mode.
>>
>> Thanks,
>> Qihua
>>
>


stream consume from kafka after DB scan is done

2021-11-05 Thread Qihua Yang
Hi,

Our stream has two sources: one is a Kafka topic, the other is a database. Is it
possible to consume from the Kafka topic only after the DB scan is completed? We
configured the job in batch mode.

Thanks,
Qihua


Re: Flink sink data to DB and then commit data to Kafka

2021-11-05 Thread Qihua Yang
Hi Ali,

Thank you so much! That is very helpful.

Thanks,
Qihua
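
For reference, a rough sketch of the Async I/O variant Ali describes below (the "don't care about
rejected events" case). The table name and pool size are made up, and a plain single-row JDBC
insert per element like this gives up the batching that Flink's JDBC sink would normally provide.

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.async.ResultFuture
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction
import java.sql.DriverManager
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

// Inserts each element into the DB off the operator thread and only emits it downstream
// (towards the Kafka sink) once the insert succeeded; failed inserts are dropped.
class AsyncDbInsert(private val jdbcUrl: String) : RichAsyncFunction<String, String>() {

    @Transient private var executor: ExecutorService? = null

    override fun open(parameters: Configuration) {
        executor = Executors.newFixedThreadPool(4)
    }

    override fun asyncInvoke(input: String, resultFuture: ResultFuture<String>) {
        val pool = executor!!  // initialized in open()
        CompletableFuture.runAsync({
            try {
                DriverManager.getConnection(jdbcUrl).use { conn ->
                    conn.prepareStatement("INSERT INTO events(payload) VALUES (?)").use { stmt ->
                        stmt.setString(1, input)
                        stmt.executeUpdate()
                    }
                }
                resultFuture.complete(listOf(input))  // accepted by the DB -> forward to Kafka
            } catch (e: Exception) {
                resultFuture.complete(emptyList())    // rejected by the DB -> drop
            }
        }, pool)
    }

    override fun close() {
        executor?.shutdown()
    }
}

It would be wired in with something like AsyncDataStream.unorderedWait(stream, AsyncDbInsert(url),
30, TimeUnit.SECONDS, 100), and the resulting stream sunk to Kafka.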

On Wed, Nov 3, 2021 at 2:46 PM Ali Bahadir Zeybek  wrote:

> Hello Qihua,
>
> This will require you to implement and maintain your own database insertion
> logic using any of the clients that your database and programming language
> supports. Bear in mind that you will be losing all the optimizations
> Flink's connector
> provides for you, and this will add to the amount of code
> you will have to maintain. On the other hand, it handles the case
> within one job.
>
> If you have more control on the things you can do with your database, and
> the
> latency to kafka is not a major issue since there will be more moving
> parts, then
> what @Francesco Guardiani  suggested is also a
> good approach. You will need
> to maintain more systems, i.e. Debezium, but less custom code.
>
> Therefore, it is mostly up to your requirements and available resources
> you have
> on how to proceed.
>
> Sincerely,
>
> Ali Bahadir Zeybek
>
>
>
>
>
> On Wed, Nov 3, 2021 at 10:13 PM Qihua Yang  wrote:
>
>> Many thanks guys!
>> Hi Ali, for approach 2, what is the better way to do the database inserts
>> for this case? Currently we simply use JDBC SQL connector to sink to
>> database.
>>
>> Thanks,
>> Qihua
>>
>> On Wed, Nov 3, 2021 at 8:13 AM Ali Bahadir Zeybek 
>> wrote:
>>
>>> Hello Qihua,
>>>
>>> If you do not care about the events that are not committed to DB,
>>> you can use Async I/O [1] and implement a logic that
>>>
>>>- does the database inserts
>>>- completes the original events that are only accepted by DB
>>>
>>> You can then sink this new datastream to kafka.
>>>
>>> If you are also interested in the events that are not committed to DB,
>>> you can use a Process Function [2] and implement a logic that
>>>
>>>- does the database inserts
>>>- collects the original events that are only accepted by DB
>>>- sends the ones that are not accepted by DB to a side output
>>>
>>> You can then sink this new datastream to kafka and maybe sideoutput to
>>> another topic.
>>>
>>> Sincerely,
>>>
>>> Ali Bahadir Zeybek
>>>
>>> [1]:
>>> https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/asyncio
>>> [2]:
>>> https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/process_function
>>>
>>> On Wed, Nov 3, 2021 at 3:16 PM Francesco Guardiani <
>>> france...@ververica.com> wrote:
>>>
>>>> An alternative is to use a CDC tool like Debezium to stream your table
>>>> changes, and then ingest that stream using Flink to push data later to
>>>> Kafka.
>>>>
>>>> On Wed, Nov 3, 2021 at 6:17 AM Guowei Ma  wrote:
>>>>
>>>>> Hi, Qihua
>>>>>
>>>>> AFAIK there is no way to do it. Maybe you need to implement a "new"
>>>>> sink to achieve this target.
>>>>>
>>>>> Best,
>>>>> Guowei
>>>>>
>>>>>
>>>>> On Wed, Nov 3, 2021 at 12:40 PM Qihua Yang  wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Our flink application has two sinks(DB and kafka topic). We want to
>>>>>> push same data to both sinks. Is it possible to push data to kafka topic
>>>>>> only after data is pushed to DB successfully? If the commit to DB fail, 
>>>>>> we
>>>>>> don't want those data is pushed to kafka.
>>>>>>
>>>>>> Thanks,
>>>>>> Qihua
>>>>>>
>>>>>


Re: Flink sink data to DB and then commit data to Kafka

2021-11-03 Thread Qihua Yang
Many thanks, guys!
Hi Ali, for approach 2, what is a better way to do the database inserts
in this case? Currently we simply use the JDBC SQL connector to sink to the
database.

Thanks,
Qihua
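
For reference, a rough sketch of the process-function variant Ali outlines below (his option 2);
the table name and side-output tag are made up, and the single-row JDBC insert is only an
illustration of the "does the database inserts" step.

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector
import org.apache.flink.util.OutputTag
import java.sql.Connection
import java.sql.DriverManager

// Events accepted by the DB go to the main output (and on to the Kafka sink);
// events the DB rejects go to this side output.
val rejectedTag = object : OutputTag<String>("db-rejected") {}

class DbThenForward(private val jdbcUrl: String) : ProcessFunction<String, String>() {

    @Transient private var connection: Connection? = null

    override fun open(parameters: Configuration) {
        connection = DriverManager.getConnection(jdbcUrl)
    }

    override fun processElement(value: String, ctx: Context, out: Collector<String>) {
        try {
            connection!!.prepareStatement("INSERT INTO events(payload) VALUES (?)").use { stmt ->
                stmt.setString(1, value)
                stmt.executeUpdate()
            }
            out.collect(value)              // committed to the DB -> forward to the Kafka sink
        } catch (e: Exception) {
            ctx.output(rejectedTag, value)  // not committed -> side output
        }
    }

    override fun close() {
        connection?.close()
    }
}

The main output would then go to the Kafka producer, and stream.getSideOutput(rejectedTag) to a
dead-letter topic if needed.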

On Wed, Nov 3, 2021 at 8:13 AM Ali Bahadir Zeybek  wrote:

> Hello Qihua,
>
> If you do not care about the events that are not committed to DB,
> you can use Async I/O [1] and implement a logic that
>
>- does the database inserts
>- completes the original events that are only accepted by DB
>
> You can then sink this new datastream to kafka.
>
> If you are also interested in the events that are not committed to DB,
> you can use a Process Function [2] and implement a logic that
>
>- does the database inserts
>- collects the original events that are only accepted by DB
>- sends the ones that are not accepted by DB to a side output
>
> You can then sink this new datastream to kafka and maybe sideoutput to
> another topic.
>
> Sincerely,
>
> Ali Bahadir Zeybek
>
> [1]:
> https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/asyncio
> [2]:
> https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/process_function
>
> On Wed, Nov 3, 2021 at 3:16 PM Francesco Guardiani <
> france...@ververica.com> wrote:
>
>> An alternative is to use a CDC tool like Debezium to stream your table
>> changes, and then ingest that stream using Flink to push data later to
>> Kafka.
>>
>> On Wed, Nov 3, 2021 at 6:17 AM Guowei Ma  wrote:
>>
>>> Hi, Qihua
>>>
>>> AFAIK there is no way to do it. Maybe you need to implement a "new" sink
>>> to achieve this target.
>>>
>>> Best,
>>> Guowei
>>>
>>>
>>> On Wed, Nov 3, 2021 at 12:40 PM Qihua Yang  wrote:
>>>
>>>> Hi,
>>>>
>>>> Our flink application has two sinks(DB and kafka topic). We want to
>>>> push same data to both sinks. Is it possible to push data to kafka topic
>>>> only after data is pushed to DB successfully? If the commit to DB fail, we
>>>> don't want those data is pushed to kafka.
>>>>
>>>> Thanks,
>>>> Qihua
>>>>
>>>


Flink sink data to DB and then commit data to Kafka

2021-11-02 Thread Qihua Yang
Hi,

Our Flink application has two sinks (a DB and a Kafka topic). We want to push
the same data to both sinks. Is it possible to push data to the Kafka topic only
after the data has been pushed to the DB successfully? If the commit to the DB fails,
we don't want that data pushed to Kafka.

Thanks,
Qihua


database as stream source issue

2021-10-27 Thread Qihua Yang
Hi,

I am trying to use a Postgres DB as the stream data source and push the data to a Kafka
topic. Here is how I configured the database source. It looks like it didn't read out
any data, but I didn't see any error in the Flink log.
I did a test and tried to insert bad data into the database; I then saw Flink throw the
error below, so it does look like Flink tries to read data from the database.
java.lang.ClassCastException: class java.lang.Long cannot be cast to class
java.lang.Integer (java.lang.Long and java.lang.Integer are in module
java.base of loader 'bootstrap')

I also saw in the job manager log that the sink switched from DEPLOYING to RUNNING, and
the source switched from RUNNING to FINISHED immediately.
Can anyone help me understand why?
Did I configure anything wrong, or did I miss anything?



2021-10-28 02:49:52.777 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Sink: test-sink (2/2) (7aa24e97a11bbd831941d636910fe84f) switched from DEPLOYING to RUNNING.
2021-10-28 02:49:53.245 [flink-akka.actor.default-dispatcher-15] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: JdbcTableSource(id, name, address) -> Map -> Map -> Filter (1/2) (558afdc9dd911684833d0e51943eef92) switched from RUNNING to FINISHED.

val options = JdbcOptions.builder()
// .setDBUrl("jdbc:derby:memory:mydb")
.setDBUrl("")
.setTableName("test_store")
.setDriverName("org.postgresql.Driver")
.setUsername("dbUser")
.setPassword("123")
.build()
val readOptions = JdbcReadOptions.builder()
.setPartitionColumnName("id")
.setPartitionLowerBound(-1)
.setPartitionUpperBound(DB_SIZE)
.setNumPartitions(PARTITIONS)
//.setFetchSize(0)
.build()
val lookupOptions = JdbcLookupOptions.builder()
.setCacheMaxSize(-1)
.setCacheExpireMs(CACHE_SIZE)
.setMaxRetryTimes(2)
.build()
val dataSource = JdbcTableSource.builder()
.setOptions(options)
.setReadOptions(readOptions)
.setLookupOptions(lookupOptions)
.setSchema(storeSchema)
.build().getDataStream(env)
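
One thing worth checking, given the Long-to-Integer ClassCastException above: that error usually
means the schema declares a column (typically the id/partition column) as INT while the Postgres
column is actually bigint/bigserial. A minimal sketch of a matching schema; the field names follow
the JdbcTableSource(id, name, address) line in the log, and the types are assumptions.

import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.api.TableSchema

val storeSchema = TableSchema.builder()
    .field("id", DataTypes.BIGINT())   // BIGINT rather than INT if the DB column is bigint/bigserial
    .field("name", DataTypes.STRING())
    .field("address", DataTypes.STRING())
    .build()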


Re: Flink handle both kafka source and db source

2021-10-27 Thread Qihua Yang
Thank you for your reply! I will check out HybridSource.
How do we know when the database table has been fully scanned? And after the scan is
completed, does Flink scan the table again or keep idling?

Thanks,
Qihua

On Tue, Oct 26, 2021 at 1:59 PM Rafi Aroch  wrote:

> Hi,
>
> Take a look at the new 1.14 feature called Hybrid Source:
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/hybridsource/
>
> Rafi
>
>
> On Tue, Oct 26, 2021 at 7:46 PM Qihua Yang  wrote:
>
>> Hi,
>>
>> My flink app has two data sources. One is from a Kafka topic, one is from
>> a database by using the JDBC connector. Flink scan the full database table.
>> Which mode should we use? batch mode or streaming mode?
>> How do we know the database table is fully scanned? Will Flink throw any
>> signal to show it is done?
>>
>>


Re: Flink JDBC connect with secret

2021-10-26 Thread Qihua Yang
Hi Jing,

Thank you for your suggestion. I will check whether SSL parameters in the URL work.

Thanks,
Qihua
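
For reference, a minimal sketch of what that could look like with the PostgreSQL driver. The host,
database, and certificate paths are placeholders, and the exact parameter names should be
double-checked against the pgJDBC documentation linked below.

import org.apache.flink.connector.jdbc.JdbcConnectionOptions

// SSL is configured entirely through the JDBC URL, so no extra Properties object is needed.
val sslUrl = "jdbc:postgresql://db-host:5432/mydb" +
    "?ssl=true&sslmode=verify-full" +
    "&sslrootcert=/etc/certs/root.crt" +
    "&sslcert=/etc/certs/client.crt" +
    "&sslkey=/etc/certs/client.pk8"    // pgJDBC expects the client key in PKCS#8 format

val jdbc: JdbcConnectionOptions = JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
    .withUrl(sslUrl)
    .withDriverName("org.postgresql.Driver")
    .build()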


On Sat, Oct 23, 2021 at 8:37 PM JING ZHANG  wrote:

> Hi Qihua,
> I checked the user documentation of several database vendors (Postgres, Oracle,
> solidDB, SQL Server)[1][2][3][4][5] and studied how to use a JDBC driver with
> SSL to connect to these databases.
> Most database vendors support two ways:
> 1. Option 1: use the connection URL
> 2. Option 2: define the settings in the Properties passed to `DriverManager.getConnection`
>
> The URL is exposed to users in the JDBC SQL connector currently, while the properties
> parameters are not exposed yet.
> Would you please first check whether defining the SSL parameters in the URL works?
> If not, we can look for another solution.
>
> [1] https://jdbc.postgresql.org/documentation/head/connect.html
> [2]
> https://www.oracle.com/technetwork/topics/wp-oracle-jdbc-thin-ssl-130128.pdf
> [3]
> https://support.unicomsi.com/manuals/soliddb/100/index.html#page/Administrator_Guide/6_Managing_network.07.13.html
> [4]
> https://docs.microsoft.com/en-us/sql/connect/jdbc/connecting-with-ssl-encryption?view=sql-server-ver15
> [5]
> https://www.ibm.com/docs/ar/informix-servers/14.10?topic=options-connecting-jdbc-applications-ssl
>
> Best,
> JING ZHANG
>
>
> Qihua Yang wrote on Sat, Oct 23, 2021 at 1:11 PM:
>
>> Hi,
>>
>> We plan to use JDBC SQL connector to read/write database. I saw JDBC
>> connector use username and password. Is it possible to use secret(*.crt) to
>> access database. I didn't find guideline how to use it. How to config jdbc
>> with secret?
>>
>> val jdbc: JdbcConnectionOptions = 
>> JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
>> .withUrl(url)
>> .withDriverName("org.postgresql.Driver")
>> .withUsername(userName)
>> .withPassword(password)
>> .build()
>>
>> Thanks,
>> Qihua
>>
>


Flink JDBC connect with secret

2021-10-22 Thread Qihua Yang
Hi,

We plan to use the JDBC SQL connector to read/write a database. I saw the JDBC
connector uses a username and password. Is it possible to use a secret (*.crt) to
access the database? I didn't find a guideline on how to use it. How do we configure
JDBC with a secret?

val jdbc: JdbcConnectionOptions =
JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl(url)
.withDriverName("org.postgresql.Driver")
.withUsername(userName)
.withPassword(password)
.build()

Thanks,
Qihua


Re: jdbc connector configuration

2021-10-12 Thread Qihua Yang
Hi,

If I configure batch mode, the application will stop after the job completes,
right? Then k8s will restart the pod and rerun the job. That is not what we
want.

Thanks,
Qihua
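
For reference, a minimal sketch of the batch-mode setup Caizhi refers to below. With a bounded
source such as a JDBC scan, the job then finishes on its own once the table has been read; how the
surrounding Kubernetes deployment reacts to the pod exiting is a separate concern (e.g. running it
as a Job rather than a Deployment).

import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.TableEnvironment

// Table API variant: create the environment in batch mode.
val settings = EnvironmentSettings.newInstance().inBatchMode().build()
val tableEnv = TableEnvironment.create(settings)

// DataStream variant of the same idea (shown as a comment to keep this sketch minimal):
// env.setRuntimeMode(RuntimeExecutionMode.BATCH)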

On Tue, Oct 12, 2021 at 7:27 PM Caizhi Weng  wrote:

> Hi!
>
> It seems that you want to run a batch job instead of a streaming job.
> Call EnvironmentSettings.newInstance().inBatchMode().build() to create your
> environment settings for a batch job.
>
> Qihua Yang wrote on Wed, Oct 13, 2021 at 5:50 AM:
>
>> Hi,
>>
>> Sorry for asking again. I plan to use JDBC connector to scan a database.
>> How do I know if it is done? Are there any metrics I can track? We want to
>> monitor the progress, stop flink application when it is done.
>>
>> Thanks,
>> Qihua
>>
>> On Fri, Oct 8, 2021 at 10:07 AM Qihua Yang  wrote:
>>
>>> It is pretty clear. Thanks Caizhi!
>>>
>>> On Thu, Oct 7, 2021 at 7:27 PM Caizhi Weng  wrote:
>>>
>>>> Hi!
>>>>
>>>> These configurations are not required to merely read from a database.
>>>> They are here to accelerate the reads by allowing sources to read data in
>>>> parallel.
>>>>
>>>> This optimization works by dividing the data into several
>>>> (scan.partition.num) partitions and each partition will be read by a task
>>>> slot (not a task manager, as a task manager may have multiple task slots).
>>>> You can set scan.partition.column to specify the partition key and also set
>>>> the lower and upper bounds for the range of data.
>>>>
>>>> Let's say your partition key is the column "k" which ranges from 0 to
>>>> 999. If you set the lower bound to 0, the upper bound to 999 and the number
>>>> of partitions to 10, then all data satisfying 0 <= k < 100 will be divided
>>>> into the first partition and read by the first task slot, all 100 <= k <
>>>> 200 will be divided into the second partition and read by the second task
>>>> slot and so on. So these configurations should have nothing to do with the
>>>> number of rows you have, but should be related to the range of your
>>>> partition key.
>>>>
>>>> Qihua Yang wrote on Thu, Oct 7, 2021 at 7:43 AM:
>>>>
>>>>> Hi,
>>>>>
>>>>> I am trying to read data from database with JDBC driver. From [1], I
>>>>> have to config below parameters. I am not quite sure if I understand it
>>>>> correctly. lower-bound is smallest value of the first partition,
>>>>> upper-bound is largest value of the last partition. For example, if the db
>>>>> table has 1000 rows. lower-bound is 0, upper-bound is 999. Is that 
>>>>> correct?
>>>>> If  setting scan.partition.num to 10, each partition read 100 row?
>>>>> if I set scan.partition.num to 10 and I have 10 task managers. Each
>>>>> task manager will pick a partition to read?
>>>>>
>>>>>- scan.partition.column: The column name used for partitioning the
>>>>>input.
>>>>>- scan.partition.num: The number of partitions.
>>>>>- scan.partition.lower-bound: The smallest value of the first
>>>>>partition.
>>>>>- scan.partition.upper-bound: The largest value of the last
>>>>>partition.
>>>>>
>>>>> [1]
>>>>> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/jdbc/
>>>>>
>>>>> Thanks,
>>>>> Qihua
>>>>>
>>>>


Re: jdbc connector configuration

2021-10-12 Thread Qihua Yang
Hi,

Sorry for asking again. I plan to use the JDBC connector to scan a database.
How do I know when it is done? Are there any metrics I can track? We want to
monitor the progress and stop the Flink application when it is done.

Thanks,
Qihua

On Fri, Oct 8, 2021 at 10:07 AM Qihua Yang  wrote:

> It is pretty clear. Thanks Caizhi!
>
> On Thu, Oct 7, 2021 at 7:27 PM Caizhi Weng  wrote:
>
>> Hi!
>>
>> These configurations are not required to merely read from a database.
>> They are here to accelerate the reads by allowing sources to read data in
>> parallel.
>>
>> This optimization works by dividing the data into several
>> (scan.partition.num) partitions and each partition will be read by a task
>> slot (not a task manager, as a task manager may have multiple task slots).
>> You can set scan.partition.column to specify the partition key and also set
>> the lower and upper bounds for the range of data.
>>
>> Let's say your partition key is the column "k" which ranges from 0 to
>> 999. If you set the lower bound to 0, the upper bound to 999 and the number
>> of partitions to 10, then all data satisfying 0 <= k < 100 will be divided
>> into the first partition and read by the first task slot, all 100 <= k <
>> 200 will be divided into the second partition and read by the second task
>> slot and so on. So these configurations should have nothing to do with the
>> number of rows you have, but should be related to the range of your
>> partition key.
>>
>> Qihua Yang wrote on Thu, Oct 7, 2021 at 7:43 AM:
>>
>>> Hi,
>>>
>>> I am trying to read data from database with JDBC driver. From [1], I
>>> have to config below parameters. I am not quite sure if I understand it
>>> correctly. lower-bound is smallest value of the first partition,
>>> upper-bound is largest value of the last partition. For example, if the db
>>> table has 1000 rows. lower-bound is 0, upper-bound is 999. Is that correct?
>>> If  setting scan.partition.num to 10, each partition read 100 row?
>>> if I set scan.partition.num to 10 and I have 10 task managers. Each task
>>> manager will pick a partition to read?
>>>
>>>- scan.partition.column: The column name used for partitioning the
>>>input.
>>>- scan.partition.num: The number of partitions.
>>>- scan.partition.lower-bound: The smallest value of the first
>>>partition.
>>>- scan.partition.upper-bound: The largest value of the last
>>>partition.
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/jdbc/
>>>
>>> Thanks,
>>> Qihua
>>>
>>


Re: jdbc connector configuration

2021-10-08 Thread Qihua Yang
It is pretty clear. Thanks Caizhi!

On Thu, Oct 7, 2021 at 7:27 PM Caizhi Weng  wrote:

> Hi!
>
> These configurations are not required to merely read from a database. They
> are here to accelerate the reads by allowing sources to read data in
> parallel.
>
> This optimization works by dividing the data into several
> (scan.partition.num) partitions and each partition will be read by a task
> slot (not a task manager, as a task manager may have multiple task slots).
> You can set scan.partition.column to specify the partition key and also set
> the lower and upper bounds for the range of data.
>
> Let's say your partition key is the column "k" which ranges from 0 to 999.
> If you set the lower bound to 0, the upper bound to 999 and the number of
> partitions to 10, then all data satisfying 0 <= k < 100 will be divided
> into the first partition and read by the first task slot, all 100 <= k <
> 200 will be divided into the second partition and read by the second task
> slot and so on. So these configurations should have nothing to do with the
> number of rows you have, but should be related to the range of your
> partition key.
>
> Qihua Yang wrote on Thu, Oct 7, 2021 at 7:43 AM:
>
>> Hi,
>>
>> I am trying to read data from database with JDBC driver. From [1], I have
>> to config below parameters. I am not quite sure if I understand it
>> correctly. lower-bound is smallest value of the first partition,
>> upper-bound is largest value of the last partition. For example, if the db
>> table has 1000 rows. lower-bound is 0, upper-bound is 999. Is that correct?
>> If  setting scan.partition.num to 10, each partition read 100 row?
>> if I set scan.partition.num to 10 and I have 10 task managers. Each task
>> manager will pick a partition to read?
>>
>>- scan.partition.column: The column name used for partitioning the
>>input.
>>- scan.partition.num: The number of partitions.
>>- scan.partition.lower-bound: The smallest value of the first
>>partition.
>>- scan.partition.upper-bound: The largest value of the last partition.
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/jdbc/
>>
>> Thanks,
>> Qihua
>>
>


jdbc connector configuration

2021-10-06 Thread Qihua Yang
Hi,

I am trying to read data from a database with the JDBC driver. From [1], I have
to configure the parameters below. I am not quite sure I understand them
correctly: lower-bound is the smallest value of the first partition, and
upper-bound is the largest value of the last partition. For example, if the DB
table has 1000 rows, lower-bound is 0 and upper-bound is 999. Is that correct?
If scan.partition.num is set to 10, does each partition read 100 rows?
If I set scan.partition.num to 10 and I have 10 task managers, does each task
manager pick one partition to read?

   - scan.partition.column: The column name used for partitioning the input.
   - scan.partition.num: The number of partitions.
   - scan.partition.lower-bound: The smallest value of the first partition.
   - scan.partition.upper-bound: The largest value of the last partition.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/jdbc/

Thanks,
Qihua
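
For reference, a minimal sketch of how those options fit together in a JDBC table definition. The
connection URL, table, and column names are made up, and the bounds follow the 1000-row example
above (a key column ranging from 0 to 999 split into 10 partitions).

import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.TableEnvironment

val tableEnv = TableEnvironment.create(
    EnvironmentSettings.newInstance().inBatchMode().build())

// Roughly: 0 <= k < 100 is read by the first partition, 100 <= k < 200 by the second, and so on.
tableEnv.executeSql(
    """
    CREATE TABLE source_table (
      k INT,
      payload STRING
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:postgresql://db-host:5432/mydb',
      'table-name' = 'source_table',
      'scan.partition.column' = 'k',
      'scan.partition.num' = '10',
      'scan.partition.lower-bound' = '0',
      'scan.partition.upper-bound' = '999'
    )
    """.trimIndent())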


Re: Start Flink cluster, k8s pod behavior

2021-09-30 Thread Qihua Yang
It looks like after the script flink-daemon.sh completes, it returns exit code 0, and
Kubernetes regards the container as done. Is that expected?

Thanks,
Qihua

On Thu, Sep 30, 2021 at 11:11 AM Qihua Yang  wrote:

> Thank you for your reply.
> From the log, exit code is 0, and reason is Completed.
> Looks like the cluster is fine. But why kubenetes restart the pod. As you
> said, from perspective of Kubernetes everything is done. Then how to
> prevent the restart?
> It didn't even give chance to upload and run a jar
>
> Ports: 8081/TCP, 6123/TCP, 6124/TCP, 6125/TCP
> Host Ports:0/TCP, 0/TCP, 0/TCP, 0/TCP
> Command:
>   /opt/flink/bin/entrypoint.sh
> Args:
>   /opt/flink/bin/run-job-manager.sh
> State:  Waiting
>   Reason:   CrashLoopBackOff
> Last State: Terminated
>   Reason:   Completed
>   Exit Code:0
>   Started:  Wed, 29 Sep 2021 20:12:30 -0700
>   Finished: Wed, 29 Sep 2021 20:12:45 -0700
> Ready:  False
> Restart Count:  131
>
> Thanks,
> Qihua
>
> On Thu, Sep 30, 2021 at 1:00 AM Chesnay Schepler 
> wrote:
>
>> Is the run-job-manager.sh script actually blocking?
>> Since you (apparently) use that as an entrypoint, if that script exits
>> after starting the JM then from the perspective of Kubernetes everything is
>> done.
>>
>> On 30/09/2021 08:59, Matthias Pohl wrote:
>>
>> Hi Qihua,
>> I guess, looking into kubectl describe and the JobManager logs would help
>> in understanding what's going on.
>>
>> Best,
>> Matthias
>>
>> On Wed, Sep 29, 2021 at 8:37 PM Qihua Yang  wrote:
>>
>>> Hi,
>>> I deployed flink in session mode. I didn't run any jobs. I saw below
>>> logs. That is normal, same as Flink menual shows.
>>>
>>> + /opt/flink/bin/run-job-manager.sh
>>> Starting HA cluster with 1 masters.
>>> Starting standalonesession daemon on host job-manager-776dcf6dd-xzs8g.
>>> Starting taskexecutor daemon on host job-manager-776dcf6dd-xzs8g.
>>>
>>>
>>> But when I check kubectl, it shows status is Completed. After a while,
>>> status changed to CrashLoopBackOff, and pod restart.
>>> NAME  READY
>>>   STATUS RESTARTS   AGE
>>> job-manager-776dcf6dd-xzs8g   0/1 Completed  5
>>>  5m27s
>>>
>>> NAME  READY
>>>   STATUS RESTARTS   AGE
>>> job-manager-776dcf6dd-xzs8g   0/1 CrashLoopBackOff   5
>>>  7m35s
>>>
>>> Anyone can help me understand why?
>>> Why do kubernetes regard this pod as completed and restart? Should I
>>> config something? either Flink side or Kubernetes side? From the Flink
>>> manual, after the cluster is started, I can upload a jar to run the
>>> application.
>>>
>>> Thanks,
>>> Qihua
>>>
>>
>>


Re: Start Flink cluster, k8s pod behavior

2021-09-30 Thread Qihua Yang
Thank you for your reply.
From the log, the exit code is 0 and the reason is Completed.
It looks like the cluster is fine, but why does Kubernetes restart the pod? As you
said, from the perspective of Kubernetes everything is done. Then how do we
prevent the restart?
It didn't even give us a chance to upload and run a jar.

Ports: 8081/TCP, 6123/TCP, 6124/TCP, 6125/TCP
Host Ports:0/TCP, 0/TCP, 0/TCP, 0/TCP
Command:
  /opt/flink/bin/entrypoint.sh
Args:
  /opt/flink/bin/run-job-manager.sh
State:  Waiting
  Reason:   CrashLoopBackOff
Last State: Terminated
  Reason:   Completed
  Exit Code:0
  Started:  Wed, 29 Sep 2021 20:12:30 -0700
  Finished: Wed, 29 Sep 2021 20:12:45 -0700
Ready:  False
Restart Count:  131

Thanks,
Qihua

On Thu, Sep 30, 2021 at 1:00 AM Chesnay Schepler  wrote:

> Is the run-job-manager.sh script actually blocking?
> Since you (apparently) use that as an entrypoint, if that script exits
> after starting the JM then from the perspective of Kubernetes everything is
> done.
>
> On 30/09/2021 08:59, Matthias Pohl wrote:
>
> Hi Qihua,
> I guess, looking into kubectl describe and the JobManager logs would help
> in understanding what's going on.
>
> Best,
> Matthias
>
> On Wed, Sep 29, 2021 at 8:37 PM Qihua Yang  wrote:
>
>> Hi,
>> I deployed flink in session mode. I didn't run any jobs. I saw below
>> logs. That is normal, same as Flink menual shows.
>>
>> + /opt/flink/bin/run-job-manager.sh
>> Starting HA cluster with 1 masters.
>> Starting standalonesession daemon on host job-manager-776dcf6dd-xzs8g.
>> Starting taskexecutor daemon on host job-manager-776dcf6dd-xzs8g.
>>
>>
>> But when I check kubectl, it shows status is Completed. After a while,
>> status changed to CrashLoopBackOff, and pod restart.
>> NAME  READY
>> STATUS RESTARTS   AGE
>> job-manager-776dcf6dd-xzs8g   0/1 Completed  5
>>  5m27s
>>
>> NAME  READY
>> STATUS RESTARTS   AGE
>> job-manager-776dcf6dd-xzs8g   0/1 CrashLoopBackOff   5
>>  7m35s
>>
>> Anyone can help me understand why?
>> Why do kubernetes regard this pod as completed and restart? Should I
>> config something? either Flink side or Kubernetes side? From the Flink
>> manual, after the cluster is started, I can upload a jar to run the
>> application.
>>
>> Thanks,
>> Qihua
>>
>
>


Re: Start Flink cluster, k8s pod behavior

2021-09-30 Thread Qihua Yang
I did check kubectl describe; it shows the info below. The reason is Completed.

Ports: 8081/TCP, 6123/TCP, 6124/TCP, 6125/TCP
Host Ports:0/TCP, 0/TCP, 0/TCP, 0/TCP
Command:
  /opt/flink/bin/entrypoint.sh
Args:
  /opt/flink/bin/run-job-manager.sh
State:  Waiting
  Reason:   CrashLoopBackOff
Last State: Terminated
  Reason:   Completed
  Exit Code:0
  Started:  Wed, 29 Sep 2021 20:12:30 -0700
  Finished: Wed, 29 Sep 2021 20:12:45 -0700
Ready:  False
Restart Count:  131


On Wed, Sep 29, 2021 at 11:59 PM Matthias Pohl 
wrote:

> Hi Qihua,
> I guess, looking into kubectl describe and the JobManager logs would help
> in understanding what's going on.
>
> Best,
> Matthias
>
> On Wed, Sep 29, 2021 at 8:37 PM Qihua Yang  wrote:
>
>> Hi,
>> I deployed flink in session mode. I didn't run any jobs. I saw below
>> logs. That is normal, same as Flink menual shows.
>>
>> + /opt/flink/bin/run-job-manager.sh
>> Starting HA cluster with 1 masters.
>> Starting standalonesession daemon on host job-manager-776dcf6dd-xzs8g.
>> Starting taskexecutor daemon on host job-manager-776dcf6dd-xzs8g.
>>
>> But when I check kubectl, it shows status is Completed. After a while,
>> status changed to CrashLoopBackOff, and pod restart.
>> NAME  READY
>> STATUS RESTARTS   AGE
>> job-manager-776dcf6dd-xzs8g   0/1 Completed  5
>>  5m27s
>>
>> NAME  READY
>> STATUS RESTARTS   AGE
>> job-manager-776dcf6dd-xzs8g   0/1 CrashLoopBackOff   5
>>  7m35s
>>
>> Anyone can help me understand why?
>> Why do kubernetes regard this pod as completed and restart? Should I
>> config something? either Flink side or Kubernetes side? From the Flink
>> manual, after the cluster is started, I can upload a jar to run the
>> application.
>>
>> Thanks,
>> Qihua
>>
>


Re: Flink run different jars

2021-09-29 Thread Qihua Yang
Thanks a lot Yangze. That is very helpful!

On Tue, Sep 28, 2021 at 11:11 PM Yangze Guo  wrote:

> You need to edit the conf/workers. Example of the config[1] and the
> process[2].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/overview/#starting-and-stopping-a-cluster
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/overview/#starting-a-standalone-cluster-session-mode
>
> Best,
> Yangze Guo
>
> On Wed, Sep 29, 2021 at 1:02 PM Qihua Yang  wrote:
> >
> > Hi Yangze,
> >
> > Thanks a lot for your reply. References are very helpful!
> > Another quick question. Reference 1 can start a standalone cluster
> (session Mode). That cluster has a jobManager. I can submit job to run. How
> about taskManger? Do I need to manually start multiple taskManagers?
> > Is there a complete example to show the process?
> >
> > Thanks,
> > Qihua
> >
> >
> > On Tue, Sep 28, 2021 at 7:02 PM Yangze Guo  wrote:
> >>
> >> Hi, Qihua
> >>
> >> IIUC, what you want might be a standalone cluster[1] or session
> cluster[2][3].
> >>
> >> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/overview/
> >> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#session-mode
> >> [3]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/yarn/#session-mode
> >>
> >> Best,
> >> Yangze Guo
> >>
> >> On Wed, Sep 29, 2021 at 5:57 AM Qihua Yang  wrote:
> >> >
> >> > Hi,
> >> >
> >> > Is that possible to run a flink app without a job? What I am trying
> to do is I build multiple jars. And switch jar to run different jobs.
> >> > I am not sure if flink supports this mode. I saw rest API can upload
> jar, cancel job and run a jar.
> >> > Right now I can upload a jar to flink. But when I cancel a job, flink
> will restart automatically. I checked log. It show below logs. Can anyone
> help me out?
> >> >
> >> > Caused by:
> org.apache.flink.client.deployment.application.UnsuccessfulExecutionException:
> Application Status: CANCELED
> >> > at
> org.apache.flink.client.deployment.application.UnsuccessfulExecutionException.fromJobResult(UnsuccessfulExecutionException.java:73)
> >> > ... 41 common frames omitted
> >> > Caused by: org.apache.flink.runtime.client.JobCancellationException:
> Job was cancelled.
> >> > at
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:149)
> >> > at
> org.apache.flink.client.deployment.application.UnsuccessfulExecutionException.fromJobResult(UnsuccessfulExecutionException.java:64)
> >> > ... 41 common frames omitted
> >> >
> >> > Thanks!
>


Start Flink cluster, k8s pod behavior

2021-09-29 Thread Qihua Yang
Hi,
I deployed Flink in session mode. I didn't run any jobs. I saw the logs below.
That is normal, the same as the Flink manual shows.

+ /opt/flink/bin/run-job-manager.sh
Starting HA cluster with 1 masters.
Starting standalonesession daemon on host job-manager-776dcf6dd-xzs8g.
Starting taskexecutor daemon on host job-manager-776dcf6dd-xzs8g.

But when I check kubectl, it shows the status is Completed. After a while, the
status changed to CrashLoopBackOff and the pod restarted.
NAME                          READY   STATUS             RESTARTS   AGE
job-manager-776dcf6dd-xzs8g   0/1     Completed          5          5m27s

NAME                          READY   STATUS             RESTARTS   AGE
job-manager-776dcf6dd-xzs8g   0/1     CrashLoopBackOff   5          7m35s

Can anyone help me understand why?
Why does Kubernetes regard this pod as completed and restart it? Should I configure
something, on either the Flink side or the Kubernetes side? From the Flink manual,
after the cluster is started, I can upload a jar to run the application.

Thanks,
Qihua


Re: Flink run different jars

2021-09-28 Thread Qihua Yang
Hi Yangze,

Thanks a lot for your reply. The references are very helpful!
Another quick question: reference 1 starts a standalone cluster (session
mode). That cluster has a JobManager, and I can submit a job to run. How about
TaskManagers? Do I need to manually start multiple TaskManagers?
Is there a complete example showing the process?

Thanks,
Qihua


On Tue, Sep 28, 2021 at 7:02 PM Yangze Guo  wrote:

> Hi, Qihua
>
> IIUC, what you want might be a standalone cluster[1] or session
> cluster[2][3].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/overview/
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#session-mode
> [3]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/yarn/#session-mode
>
> Best,
> Yangze Guo
>
> On Wed, Sep 29, 2021 at 5:57 AM Qihua Yang  wrote:
> >
> > Hi,
> >
> > Is that possible to run a flink app without a job? What I am trying to
> do is I build multiple jars. And switch jar to run different jobs.
> > I am not sure if flink supports this mode. I saw rest API can upload
> jar, cancel job and run a jar.
> > Right now I can upload a jar to flink. But when I cancel a job, flink
> will restart automatically. I checked log. It show below logs. Can anyone
> help me out?
> >
> > Caused by:
> org.apache.flink.client.deployment.application.UnsuccessfulExecutionException:
> Application Status: CANCELED
> > at
> org.apache.flink.client.deployment.application.UnsuccessfulExecutionException.fromJobResult(UnsuccessfulExecutionException.java:73)
> > ... 41 common frames omitted
> > Caused by: org.apache.flink.runtime.client.JobCancellationException: Job
> was cancelled.
> > at
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:149)
> > at
> org.apache.flink.client.deployment.application.UnsuccessfulExecutionException.fromJobResult(UnsuccessfulExecutionException.java:64)
> > ... 41 common frames omitted
> >
> > Thanks!
>


Flink run different jars

2021-09-28 Thread Qihua Yang
Hi,

Is it possible to run a Flink app without a job? What I am trying to do
is build multiple jars and switch jars to run different jobs.
I am not sure if Flink supports this mode. I saw the REST API can upload a jar,
cancel a job, and run a jar.
Right now I can upload a jar to Flink, but when I cancel a job, Flink
restarts it automatically. I checked the log; it shows the messages below. Can anyone
help me out?

Caused by:
org.apache.flink.client.deployment.application.UnsuccessfulExecutionException:
Application Status: CANCELED
at
org.apache.flink.client.deployment.application.UnsuccessfulExecutionException.fromJobResult(UnsuccessfulExecutionException.java:73)
... 41 common frames omitted
Caused by: org.apache.flink.runtime.client.JobCancellationException: Job
was cancelled.
at
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:149)
at
org.apache.flink.client.deployment.application.UnsuccessfulExecutionException.fromJobResult(UnsuccessfulExecutionException.java:64)
... 41 common frames omitted

Thanks!


Re: multiple jobs in same flink app

2021-06-24 Thread Qihua Yang
Hi,

We are using HA mode, so it looks like multiple jobs are not an option for us.
That makes sense! Thanks for your help, guys!

Thanks,
Qihua
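
For completeness, a minimal sketch of the single-execute alternative Arvid suggested earlier in
this thread, which sidesteps the HA limitation because only one job is submitted. The sources are
placeholder fromElements streams just to keep the sketch self-contained.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.sink.DiscardingSink

fun main() {
    val env = StreamExecutionEnvironment.getExecutionEnvironment()

    // Two independent topologies in one application...
    val stream1 = env.fromElements("a", "b", "c")
    stream1.addSink(DiscardingSink<String>())

    val stream2 = env.fromElements(1, 2, 3)
    stream2.addSink(DiscardingSink<Int>())

    // ...submitted together as a single job with one blocking execute().
    env.execute("Job 1+2")
}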


On Wed, Jun 23, 2021 at 7:28 PM Yang Wang  wrote:

> Robert is right. We could only support single-job submission in
> application mode when HA mode is enabled.
>
> This is a known limitation of current application mode implementation.
>
> Best,
> Yang
>
> Robert Metzger wrote on Thu, Jun 24, 2021 at 3:54 AM:
>
>> Thanks a lot for checking again. I just started Flink in Application mode
>> with a jar that contains two "executeAsync" submissions, and indeed two
>> jobs are running.
>>
>> I think the problem in your case is that you are using High Availability
>> (I guess, because there are log statements from the
>> ZooKeeperLeaderRetrievalService). As you can see from the documentation [1]:
>>
>> The Application Mode allows for multi-execute() applications but
>>> High-Availability is not supported in these cases. High-Availability in
>>> Application Mode is only supported for single-execute() applications.
>>
>>
>> See also: https://issues.apache.org/jira/browse/FLINK-19358
>>
>> Sorry again that I gave you invalid information in my first answer.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/overview/
>>
>>
>>
>>
>> On Wed, Jun 23, 2021 at 8:50 AM Qihua Yang  wrote:
>>
>>> Hi Robert,
>>>
>>> But I saw Flink doc shows application mode can run multiple jobs? Or I
>>> misunderstand it?
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/
>>>
>>>
>>>
>>> *Compared to the Per-Job mode, the Application Mode allows the submission 
>>> of applications consisting of multiple jobs. The order of job execution is 
>>> not affected by the deployment mode but by the call used to launch the job. 
>>> Using execute(), which is blocking, establishes an order and it will lead 
>>> to the execution of the "next" job being postponed until "this" job 
>>> finishes. Using executeAsync(), which is non-blocking, will lead to the 
>>> "next" job starting before "this" job finishes.*
>>>
>>>
>>> On Tue, Jun 22, 2021 at 11:13 PM Robert Metzger 
>>> wrote:
>>>
>>>> Hi Qihua,
>>>>
>>>> Application Mode is meant for executing one job at a time, not multiple
>>>> jobs on the same JobManager.
>>>> If you want to do that, you need to use session mode, which allows
>>>> managing multiple jobs on the same JobManager.
>>>>
>>>> On Tue, Jun 22, 2021 at 10:43 PM Qihua Yang  wrote:
>>>>
>>>>> Hi Arvid,
>>>>>
>>>>> Do you know if I can start multiple jobs for a single flink
>>>>> application?
>>>>>
>>>>> Thanks,
>>>>> Qihua
>>>>>
>>>>>
>>>>> On Thu, Jun 17, 2021 at 12:11 PM Qihua Yang  wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am using application mode.
>>>>>>
>>>>>> Thanks,
>>>>>> Qihua
>>>>>>
>>>>>> On Thu, Jun 17, 2021 at 12:09 PM Arvid Heise 
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Qihua,
>>>>>>>
>>>>>>> Which execution mode are you using?
>>>>>>>
>>>>>>> On Thu, Jun 17, 2021 at 6:48 PM Qihua Yang 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Thank you for your reply. What I want is flink app has multiple
>>>>>>>> jobs, each job manage a stream. Currently our flink app has only 1 job 
>>>>>>>> that
>>>>>>>> manage multiple streams.
>>>>>>>> I did try env.executeAsyc(), but it still doesn't work. From the
>>>>>>>> log, when the second executeAsync() was called, it shows " *Job
>>>>>>>>  was recovered successfully.*"
>>>>>>>> Looks like the second executeAsync() recover the first job. Not
>>>>>>>> start a second job.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Qihua
>

Re: multiple jobs in same flink app

2021-06-23 Thread Qihua Yang
Hi Robert,

But I saw the Flink docs say that application mode can run multiple jobs. Or did I
misunderstand it?
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/



Compared to the Per-Job mode, the Application Mode allows the
submission of applications consisting of multiple jobs. The order of
job execution is not affected by the deployment mode but by the call
used to launch the job. Using execute(), which is blocking,
establishes an order and it will lead to the execution of the "next"
job being postponed until "this" job finishes. Using executeAsync(),
which is non-blocking, will lead to the "next" job starting before
"this" job finishes.


On Tue, Jun 22, 2021 at 11:13 PM Robert Metzger  wrote:

> Hi Qihua,
>
> Application Mode is meant for executing one job at a time, not multiple
> jobs on the same JobManager.
> If you want to do that, you need to use session mode, which allows
> managing multiple jobs on the same JobManager.
>
> On Tue, Jun 22, 2021 at 10:43 PM Qihua Yang  wrote:
>
>> Hi Arvid,
>>
>> Do you know if I can start multiple jobs for a single flink application?
>>
>> Thanks,
>> Qihua
>>
>>
>> On Thu, Jun 17, 2021 at 12:11 PM Qihua Yang  wrote:
>>
>>> Hi,
>>>
>>> I am using application mode.
>>>
>>> Thanks,
>>> Qihua
>>>
>>> On Thu, Jun 17, 2021 at 12:09 PM Arvid Heise  wrote:
>>>
>>>> Hi Qihua,
>>>>
>>>> Which execution mode are you using?
>>>>
>>>> On Thu, Jun 17, 2021 at 6:48 PM Qihua Yang  wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Thank you for your reply. What I want is flink app has multiple jobs,
>>>>> each job manage a stream. Currently our flink app has only 1 job that
>>>>> manage multiple streams.
>>>>> I did try env.executeAsyc(), but it still doesn't work. From the log,
>>>>> when the second executeAsync() was called, it shows " *Job
>>>>>  was recovered successfully.*"
>>>>> Looks like the second executeAsync() recover the first job. Not start
>>>>> a second job.
>>>>>
>>>>> Thanks,
>>>>> Qihua
>>>>>
>>>>>
>>>>> On Thu, Jun 17, 2021 at 7:43 AM Arvid Heise  wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> env.execute("Job 1"); is a blocking call. You either have to use
>>>>>> executeAsync or use a separate thread to submit the second job. If Job 1
>>>>>> finishes then this would also work by having sequential execution.
>>>>>>
>>>>>> However, I think what you actually want to do is to use the same env
>>>>>> with 2 topologies and 1 single execute like this.
>>>>>>
>>>>>> StreamExecutionEnvironment env =
>>>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>> DataStream stream1 = env.addSource(new
>>>>>> SourceFunction());
>>>>>> stream1.addSink(new FlinkKafkaProducer());
>>>>>> DataStream stream2 = env.addSource(new
>>>>>> SourceFunction());
>>>>>> stream2.addSink(new DiscardingSink<>());
>>>>>> env.execute("Job 1+2");
>>>>>>
>>>>>> On Wed, Jun 16, 2021 at 6:04 PM Qihua Yang  wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>> Does anyone know how to run multiple jobs in same flink application?
>>>>>>> I did a simple test.  First job was started. I did see the log
>>>>>>> message, but I didn't see the second job was started, even I saw the log
>>>>>>> message.
>>>>>>>
>>>>>>> public void testJobs() throws Exception {
>>>>>>> StreamExecutionEnvironment env =
>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>> DataStream stream1 = env.addSource(new
>>>>>>> SourceFunction());
>>>>>>> stream1.addSink(new FlinkKafkaProducer());
>>>>>>> printf("### first job");
>>>>>>> env.execute("Job 1");
>>>>>>>
>>>>>>> env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>> DataStream stream2 = env.addSource(new
>>>>>>

Re: multiple jobs in same flink app

2021-06-22 Thread Qihua Yang
Hi Arvid,

Do you know if I can start multiple jobs from a single Flink application?

Thanks,
Qihua


On Thu, Jun 17, 2021 at 12:11 PM Qihua Yang  wrote:

> Hi,
>
> I am using application mode.
>
> Thanks,
> Qihua
>
> On Thu, Jun 17, 2021 at 12:09 PM Arvid Heise  wrote:
>
>> Hi Qihua,
>>
>> Which execution mode are you using?
>>
>> On Thu, Jun 17, 2021 at 6:48 PM Qihua Yang  wrote:
>>
>>> Hi,
>>>
>>> Thank you for your reply. What I want is flink app has multiple jobs,
>>> each job manage a stream. Currently our flink app has only 1 job that
>>> manage multiple streams.
>>> I did try env.executeAsyc(), but it still doesn't work. From the log,
>>> when the second executeAsync() was called, it shows " *Job
>>>  was recovered successfully.*"
>>> Looks like the second executeAsync() recover the first job. Not start a
>>> second job.
>>>
>>> Thanks,
>>> Qihua
>>>
>>>
>>> On Thu, Jun 17, 2021 at 7:43 AM Arvid Heise  wrote:
>>>
>>>> Hi,
>>>>
>>>> env.execute("Job 1"); is a blocking call. You either have to use
>>>> executeAsync or use a separate thread to submit the second job. If Job 1
>>>> finishes then this would also work by having sequential execution.
>>>>
>>>> However, I think what you actually want to do is to use the same env
>>>> with 2 topologies and 1 single execute like this.
>>>>
>>>> StreamExecutionEnvironment env =
>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>> DataStream stream1 = env.addSource(new
>>>> SourceFunction());
>>>> stream1.addSink(new FlinkKafkaProducer());
>>>> DataStream stream2 = env.addSource(new
>>>> SourceFunction());
>>>> stream2.addSink(new DiscardingSink<>());
>>>> env.execute("Job 1+2");
>>>>
>>>> On Wed, Jun 16, 2021 at 6:04 PM Qihua Yang  wrote:
>>>>
>>>>> Hi,
>>>>> Does anyone know how to run multiple jobs in same flink application?
>>>>> I did a simple test.  First job was started. I did see the log
>>>>> message, but I didn't see the second job was started, even I saw the log
>>>>> message.
>>>>>
>>>>> public void testJobs() throws Exception {
>>>>> StreamExecutionEnvironment env =
>>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>> DataStream stream1 = env.addSource(new
>>>>> SourceFunction());
>>>>> stream1.addSink(new FlinkKafkaProducer());
>>>>> printf("### first job");
>>>>> env.execute("Job 1");
>>>>>
>>>>> env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>> DataStream stream2 = env.addSource(new
>>>>> SourceFunction());
>>>>> stream2.addSink(new DiscardingSink<>());
>>>>> printf("### second job");
>>>>> env.execute("Job 2");
>>>>> }
>>>>>
>>>>> Here is the log:
>>>>> ### first job
>>>>> INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  - Job
>>>>>  is submitted.
>>>>> INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  -
>>>>> Submitting Job with JobId=.
>>>>> INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  -
>>>>> Received JobGraph submission  (job1).
>>>>> INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  -
>>>>> Submitting job  (job1).
>>>>>
>>>>> INFO  o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  -
>>>>> Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
>>>>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Starting
>>>>> execution of job job1 () under job master
>>>>> id b03cde9dc2aebdb39c46cda4c2a94c07.
>>>>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Starting
>>>>> scheduling with scheduling strategy
>>>>> [org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy]
>>>>> INFO  org.apache.flink.runtime.executiongraph.Execut

Re: multiple jobs in same flink app

2021-06-17 Thread Qihua Yang
Hi,

I am using application mode.

Thanks,
Qihua

On Thu, Jun 17, 2021 at 12:09 PM Arvid Heise  wrote:

> Hi Qihua,
>
> Which execution mode are you using?
>
> On Thu, Jun 17, 2021 at 6:48 PM Qihua Yang  wrote:
>
>> Hi,
>>
>> Thank you for your reply. What I want is flink app has multiple jobs,
>> each job manage a stream. Currently our flink app has only 1 job that
>> manage multiple streams.
>> I did try env.executeAsyc(), but it still doesn't work. From the log,
>> when the second executeAsync() was called, it shows " *Job
>>  was recovered successfully.*"
>> Looks like the second executeAsync() recover the first job. Not start a
>> second job.
>>
>> Thanks,
>> Qihua
>>
>>
>> On Thu, Jun 17, 2021 at 7:43 AM Arvid Heise  wrote:
>>
>>> Hi,
>>>
>>> env.execute("Job 1"); is a blocking call. You either have to use
>>> executeAsync or use a separate thread to submit the second job. If Job 1
>>> finishes then this would also work by having sequential execution.
>>>
>>> However, I think what you actually want to do is to use the same env
>>> with 2 topologies and 1 single execute like this.
>>>
>>> StreamExecutionEnvironment env =
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>> DataStream stream1 = env.addSource(new SourceFunction());
>>> stream1.addSink(new FlinkKafkaProducer());
>>> DataStream stream2 = env.addSource(new SourceFunction());
>>> stream2.addSink(new DiscardingSink<>());
>>> env.execute("Job 1+2");
>>>
>>> On Wed, Jun 16, 2021 at 6:04 PM Qihua Yang  wrote:
>>>
>>>> Hi,
>>>> Does anyone know how to run multiple jobs in same flink application?
>>>> I did a simple test.  First job was started. I did see the log message,
>>>> but I didn't see the second job was started, even I saw the log message.
>>>>
>>>> public void testJobs() throws Exception {
>>>> StreamExecutionEnvironment env =
>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>> DataStream stream1 = env.addSource(new
>>>> SourceFunction());
>>>> stream1.addSink(new FlinkKafkaProducer());
>>>> printf("### first job");
>>>> env.execute("Job 1");
>>>>
>>>> env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>> DataStream stream2 = env.addSource(new
>>>> SourceFunction());
>>>> stream2.addSink(new DiscardingSink<>());
>>>> printf("### second job");
>>>> env.execute("Job 2");
>>>> }
>>>>
>>>> Here is the log:
>>>> ### first job
>>>> INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  - Job
>>>>  is submitted.
>>>> INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  -
>>>> Submitting Job with JobId=.
>>>> INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  -
>>>> Received JobGraph submission  (job1).
>>>> INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  -
>>>> Submitting job  (job1).
>>>>
>>>> INFO  o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  -
>>>> Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
>>>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Starting
>>>> execution of job job1 () under job master
>>>> id b03cde9dc2aebdb39c46cda4c2a94c07.
>>>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Starting
>>>> scheduling with scheduling strategy
>>>> [org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy]
>>>> INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job
>>>> job1 () switched from state CREATED to
>>>> RUNNING.
>>>>
>>>> ### second job
>>>> WARN  com.doordash.flink.common.utils.LoggerUtil  - Class - IndexWriter
>>>> : ### second job
>>>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Resolved
>>>> ResourceManager address, beginning registration
>>>> INFO  o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  -
>>>> Starting ZooKeeperLe

Re: multiple jobs in same flink app

2021-06-17 Thread Qihua Yang
Hi,

Thank you for your reply. What I want is for the Flink app to have multiple
jobs, each job managing one stream. Currently our Flink app has only 1 job
that manages multiple streams.
I did try env.executeAsync(), but it still doesn't work. From the log, when
the second executeAsync() was called, it shows " *Job
 was recovered successfully.*"
It looks like the second executeAsync() recovered the first job instead of
starting a second job.

Thanks,
Qihua


On Thu, Jun 17, 2021 at 7:43 AM Arvid Heise  wrote:

> Hi,
>
> env.execute("Job 1"); is a blocking call. You either have to use
> executeAsync or use a separate thread to submit the second job. If Job 1
> finishes then this would also work by having sequential execution.
>
> However, I think what you actually want to do is to use the same env with
> 2 topologies and 1 single execute like this.
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> DataStream stream1 = env.addSource(new SourceFunction());
> stream1.addSink(new FlinkKafkaProducer());
> DataStream stream2 = env.addSource(new SourceFunction());
> stream2.addSink(new DiscardingSink<>());
> env.execute("Job 1+2");
>
> On Wed, Jun 16, 2021 at 6:04 PM Qihua Yang  wrote:
>
>> Hi,
>> Does anyone know how to run multiple jobs in same flink application?
>> I did a simple test.  First job was started. I did see the log message,
>> but I didn't see the second job was started, even I saw the log message.
>>
>> public void testJobs() throws Exception {
>> StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> DataStream stream1 = env.addSource(new SourceFunction());
>> stream1.addSink(new FlinkKafkaProducer());
>> printf("### first job");
>> env.execute("Job 1");
>>
>> env = StreamExecutionEnvironment.getExecutionEnvironment();
>> DataStream stream2 = env.addSource(new SourceFunction());
>> stream2.addSink(new DiscardingSink<>());
>> printf("### second job");
>> env.execute("Job 2");
>> }
>>
>> Here is the log:
>> ### first job
>> INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  - Job
>>  is submitted.
>> INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  -
>> Submitting Job with JobId=.
>> INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  -
>> Received JobGraph submission  (job1).
>> INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  -
>> Submitting job  (job1).
>>
>> INFO  o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  - Starting
>> ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Starting execution
>> of job job1 () under job master id
>> b03cde9dc2aebdb39c46cda4c2a94c07.
>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Starting scheduling
>> with scheduling strategy
>> [org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy]
>> INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job job1
>> () switched from state CREATED to RUNNING.
>>
>> ### second job
>> WARN  com.doordash.flink.common.utils.LoggerUtil  - Class - IndexWriter :
>> ### second job
>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Resolved
>> ResourceManager address, beginning registration
>> INFO  o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  - Starting
>> ZooKeeperLeaderRetrievalService
>> /leader//job_manager_lock.
>> INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  -
>> Registering job manager b03cde9dc2aebdb39c46cda4c2a94...@akka.tcp://
>> flink@10.4.139.110:6123/user/rpc/jobmanager_2 for job
>> .
>> INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  - Job
>>  was recovered successfully.
>> INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  -
>> Registered job manager b03cde9dc2aebdb39c46cda4c2a94...@akka.tcp://
>> flink@10.4.139.110:6123/user/rpc/jobmanager_2 for job
>> .
>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - JobManager
>> successfully registered at ResourceManager, leader id:
>> 956d4431ca90d45d92c027046cd0404e.
>> INFO  org.apache.flink.runtime.jobmaster.slo

multiple jobs in same flink app

2021-06-16 Thread Qihua Yang
Hi,
Does anyone know how to run multiple jobs in the same Flink application?
I did a simple test. The first job started and I did see its log message, but
the second job did not start, even though I saw its log message.

public void testJobs() throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream stream1 = env.addSource(new SourceFunction());
stream1.addSink(new FlinkKafkaProducer());
printf("### first job");
env.execute("Job 1");

env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream stream2 = env.addSource(new SourceFunction());
stream2.addSink(new DiscardingSink<>());
printf("### second job");
env.execute("Job 2");
}

Here is the log:
### first job
INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  - Job
 is submitted.
INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  -
Submitting Job with JobId=.
INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Received
JobGraph submission  (job1).
INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  -
Submitting job  (job1).

INFO  o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  - Starting
ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Starting execution of
job job1 () under job master id
b03cde9dc2aebdb39c46cda4c2a94c07.
INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Starting scheduling
with scheduling strategy
[org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy]
INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job job1
() switched from state CREATED to RUNNING.

### second job
WARN  com.doordash.flink.common.utils.LoggerUtil  - Class - IndexWriter :
### second job
INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Resolved
ResourceManager address, beginning registration
INFO  o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  - Starting
ZooKeeperLeaderRetrievalService
/leader//job_manager_lock.
INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  -
Registering job manager b03cde9dc2aebdb39c46cda4c2a94...@akka.tcp://
flink@10.4.139.110:6123/user/rpc/jobmanager_2 for job
.
INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  - Job
 was recovered successfully.
INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  -
Registered job manager b03cde9dc2aebdb39c46cda4c2a94...@akka.tcp://
flink@10.4.139.110:6123/user/rpc/jobmanager_2 for job
.
INFO  org.apache.flink.runtime.jobmaster.JobMaster  - JobManager
successfully registered at ResourceManager, leader id:
956d4431ca90d45d92c027046cd0404e.
INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl  -
Requesting new slot [SlotRequestId{8980dce0c0ef6a933d73051c58534489}] and
profile ResourceProfile{UNKNOWN} from resource manager.
INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  -
Request slot with profile ResourceProfile{UNKNOWN} for job
 with allocation id
21134414fc60d4ef3e940609cef960b6.
INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl  -
Requesting new slot [SlotRequestId{e37cc684758e49f3cce76c4687b1d1a3}] and
profile ResourceProfile{UNKNOWN} from resource manager.
INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  -
Request slot with profile ResourceProfile{UNKNOWN} for job
 with allocation id
650bd9100a35ef5086fd4614f5253b55.
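
As Arvid suggests in the replies above, attaching both topologies to one environment and calling a single execute() runs both pipelines as one job, which avoids the blocking execute() problem in the test above. A minimal sketch with illustrative sources and sinks (the original Kafka producer, source functions, and type parameters are not reproduced here and are assumptions):

// Sketch of the "same env, 2 topologies, 1 execute" pattern from the replies.
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;

public class OneJobTwoTopologies {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Topology 1: first source and sink.
        DataStream<String> stream1 = env.fromElements("a", "b", "c");
        stream1.addSink(new PrintSinkFunction<>());

        // Topology 2: second source and sink, added to the same env.
        DataStream<String> stream2 = env.fromElements("x", "y", "z");
        stream2.addSink(new DiscardingSink<>());

        // One blocking execute() runs both pipelines as a single job.
        env.execute("Job 1+2");
    }
}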


Re: Flink stream processing issue

2021-06-03 Thread Qihua Yang
Sorry for the confusion. Yes, I mean multiple parallelism. Thanks a lot for
your help.

Thanks,
Qihua

On Thu, Jun 3, 2021 at 12:03 AM JING ZHANG  wrote:

> Hi Qihua,
>
> I’m sorry I didn’t understand what you mean by ‘replica’. Would you please
> explain a little more?
> If you mean your job has multiple parallel subtasks, and you are asking
> whether the same data from the two inputs would be sent to the same
> downstream subtask of the `keyedCoProcessFunction`:
> yes, Flink can do this if you keyBy the same field for both inputs.
>
> Best regards,
> JING ZHANG
>
> Qihua Yang  wrote on Thu, Jun 3, 2021 at 12:25 PM:
>
>> Hi,
>>
>> I have a question. We have two data streams that may contain duplicate
>> data. We are using keyedCoProcessFunction to process stream data. I defined
>> the same keySelector for both streams. Our flink application has multiple
>> replicas. We want the same data to be processed by the same replica. Can
>> flink ensure that?
>>
>> Thanks,
>> Qihua
>>
>
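
A minimal sketch of the pattern JING ZHANG describes above: keying both inputs on the same field before connecting them guarantees that records with the same key, from either stream, are routed to the same parallel subtask of the co-process function. The Event type, key field, and outputs here are illustrative assumptions, not the original code:

// Sketch only: Event type and key field are assumed for illustration.
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

public class SameKeySameSubtaskSketch {

    public static class Event {
        public String id;       // the field both streams are keyed on
        public String payload;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4); // multiple parallel subtasks ("replicas")

        Event a = new Event(); a.id = "k1"; a.payload = "from A";
        Event b = new Event(); b.id = "k1"; b.payload = "from B";
        DataStream<Event> streamA = env.fromElements(a);
        DataStream<Event> streamB = env.fromElements(b);

        streamA
            // keyBy the same field on both inputs ...
            .keyBy(e -> e.id)
            .connect(streamB.keyBy(e -> e.id))
            // ... so the same key always lands on the same subtask here.
            .process(new KeyedCoProcessFunction<String, Event, Event, String>() {
                @Override
                public void processElement1(Event value, Context ctx, Collector<String> out) {
                    out.collect("A:" + value.id);
                }

                @Override
                public void processElement2(Event value, Context ctx, Collector<String> out) {
                    out.collect("B:" + value.id);
                }
            })
            .print();

        env.execute("keyed co-process sketch");
    }
}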


Flink stream processing issue

2021-06-02 Thread Qihua Yang
Hi,

I have a question. We have two data streams that may contain duplicate
data. We are using keyedCoProcessFunction to process stream data. I defined
the same keySelector for both streams. Our Flink application has multiple
replicas. We want the same data to be processed by the same replica. Can
Flink ensure that?

Thanks,
Qihua


Re: ES sink never receive error code

2021-05-24 Thread Qihua Yang
Got it! Thanks for the help.

On Thu, May 20, 2021 at 7:15 PM Yangze Guo  wrote:

> > So, ES BulkProcessor retried after bulk request was partially rejected.
> And eventually that request was sent successfully? That is why failure
> handler was not called?
>
> If the bulk request fails after the max number of retries
> (bulk.flush.backoff.retries), the failure handler will still be
> called.
>
>
> Best,
> Yangze Guo
>
> On Fri, May 21, 2021 at 5:53 AM Qihua Yang  wrote:
> >
> > Thank you for the reply!
> > Yes, we did config bulk.flush.backoff.enable.
> > So, ES BulkProcessor retried after bulk request was partially rejected.
> And eventually that request was sent successfully? That is why failure
> handler was not called?
> >
> > Thanks,
> > Qihua
> >
> > On Thu, May 20, 2021 at 2:22 PM Roman Khachatryan 
> wrote:
> >>
> >> Hi,
> >>
> >> Have you tried to change bulk.flush.backoff.enable?
> >> According to the docs [1], the underlying ES BulkProcessor will retry
> >> (by default), so the provided failure handler might not be called.
> >>
> >> [1]
> >>
> https://ci.apache.org/projects/flink/flink-docs-stable/docs/connectors/datastream/elasticsearch/#configuring-the-internal-bulk-processor
> >>
> >> Regards,
> >> Roman
> >>
> >> On Thu, May 20, 2021 at 10:08 PM Qihua Yang  wrote:
> >> >
> >> > Hello,
> >> > We are using flink-connector-elasticsearch6_2.11 to ingest stream
> data to ES by using bulk requests. From ES metrics, we observed some bulk
> thread pool rejections. Contacted AWS team, their explanation is part of
> bulk request was rejected. Response body should include status for each
> item. For bulk thread pool rejection, the error code is 429.
> >> > Our flink app override FailureHandler to process error cases.
> >> > I checked Flink code, it has AfterBulk() method to handle item
> errors. FailureHandler() never received any 429 error.
> >> > Is that flink issue? Or we need to config something to make it work?
> >> > Thanks,
> >> >
> >> > Qihua
>
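
A minimal sketch of a custom ActionRequestFailureHandler for the elasticsearch6 connector, following the behavior described above: with bulk flush backoff enabled, onFailure is only invoked once the configured retries are exhausted, at which point a 429 can be detected and the request re-queued. The host, index name, and element type are illustrative assumptions:

// Sketch only: host, index name, and element type are assumptions.
import java.util.Collections;
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.client.Requests;

public class Es6FailureHandlerSketch {

    public static ElasticsearchSink<String> buildSink() {
        // Indexes each record into an assumed "my-index"; purely illustrative.
        ElasticsearchSinkFunction<String> indexer = (element, ctx, requestIndexer) ->
                requestIndexer.add(Requests.indexRequest()
                        .index("my-index")
                        .type("_doc")
                        .source(Collections.singletonMap("data", element)));

        // Called only after the internal BulkProcessor has exhausted its
        // retries (when bulk flush backoff is enabled).
        ActionRequestFailureHandler failureHandler = new ActionRequestFailureHandler() {
            @Override
            public void onFailure(ActionRequest action, Throwable failure,
                                  int restStatusCode, RequestIndexer requestIndexer)
                    throws Throwable {
                if (restStatusCode == 429) {
                    // Bulk thread pool rejection: re-queue the request.
                    requestIndexer.add(action);
                } else {
                    // Anything else fails the job.
                    throw failure;
                }
            }
        };

        ElasticsearchSink.Builder<String> builder = new ElasticsearchSink.Builder<>(
                Collections.singletonList(new HttpHost("es-host", 9200, "https")),
                indexer);
        builder.setBulkFlushMaxActions(100);
        builder.setFailureHandler(failureHandler);
        return builder.build();
    }
}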


Re: ES sink never receive error code

2021-05-20 Thread Qihua Yang
Thank you for the reply!
Yes, we did configure bulk.flush.backoff.enable.
So the ES BulkProcessor retried after the bulk request was partially rejected,
and eventually that request was sent successfully? Is that why the failure
handler was not called?

Thanks,
Qihua

On Thu, May 20, 2021 at 2:22 PM Roman Khachatryan  wrote:

> Hi,
>
> Have you tried to change bulk.flush.backoff.enable?
> According to the docs [1], the underlying ES BulkProcessor will retry
> (by default), so the provided failure handler might not be called.
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-stable/docs/connectors/datastream/elasticsearch/#configuring-the-internal-bulk-processor
>
> Regards,
> Roman
>
> On Thu, May 20, 2021 at 10:08 PM Qihua Yang  wrote:
> >
> > Hello,
> > We are using flink-connector-elasticsearch6_2.11 to ingest stream data
> to ES by using bulk requests. From ES metrics, we observed some bulk thread
> pool rejections. Contacted AWS team, their explanation is part of bulk
> request was rejected. Response body should include status for each item.
> For bulk thread pool rejection, the error code is 429.
> > Our flink app override FailureHandler to process error cases.
> > I checked Flink code, it has AfterBulk() method to handle item errors.
> FailureHandler() never received any 429 error.
> > Is that flink issue? Or we need to config something to make it work?
> > Thanks,
> >
> > Qihua
>
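
For completeness, a sketch of the bulk-flush backoff knobs Roman points to above, set through the elasticsearch6 sink builder. The values and host are illustrative assumptions; these settings control how often the internal BulkProcessor retries before the failure handler is ever invoked:

// Sketch only: backoff values and host are assumptions, not recommendations.
import java.util.Collections;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost;

public class Es6BackoffConfigSketch {

    public static ElasticsearchSink.Builder<String> configureBackoff(
            ElasticsearchSinkFunction<String> sinkFunction) {

        ElasticsearchSink.Builder<String> builder = new ElasticsearchSink.Builder<>(
                Collections.singletonList(new HttpHost("es-host", 9200, "https")),
                sinkFunction);

        // Retry rejected/failed bulk items inside the connector before the
        // failure handler is invoked.
        builder.setBulkFlushBackoff(true);
        builder.setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL);
        builder.setBulkFlushBackoffRetries(3);  // max retries before giving up
        builder.setBulkFlushBackoffDelay(500);  // initial delay in ms
        return builder;
    }
}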


ES sink never receive error code

2021-05-20 Thread Qihua Yang
Hello,
We are using flink-connector-elasticsearch6_2.11 to ingest stream data to
ES using bulk requests. From the ES metrics, we observed some bulk thread
pool rejections. We contacted the AWS team; their explanation is that part of
the bulk request was rejected, and the response body should include a status
for each item. For a bulk thread pool rejection, the error code is 429.
Our Flink app overrides the FailureHandler to process error cases.
I checked the Flink code; it has an afterBulk() method to handle item errors.
The FailureHandler never received any 429 error.
Is that a Flink issue? Or do we need to configure something to make it work?
Thanks,

Qihua