Re: how to convert DataStream to Table

2021-04-12 Thread Svend
Hi,

Here's an example that works for me:


"""
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.*$*;

import java.util.List;

public class Stream2Table {

public static void main(String[] args) {

var streamingEnv = 
StreamExecutionEnvironment.*getExecutionEnvironment*();
var tableEnv = StreamTableEnvironment.*create*(streamingEnv);

var userRows = streamingEnv.fromCollection(
List.*of*(
Row.*of*("user1", "al...@mail.org 
", "Alice"),
Row.*of*("user2", "b...@mail.org 
", "Bob")
),
new RowTypeInfo(Types.*STRING*, Types.*STRING*, 
Types.*STRING*));

var table = tableEnv
.fromDataStream(userRows,
*$*("user_id"), *$*("handle"), *$*("name"));

table.execute().print();
}

}
"""

You can also dig here, you'll probably find better examples
https://github.com/apache/flink/tree/master/flink-examples/flink-examples-table
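
If you prefer to register the stream under a name and query it with SQL, a createTemporaryView variant of the same idea should give the same explicit schema (a sketch reusing the userRows stream and field names from the example above; not verified against the original setup):

"""
// Same userRows stream as above, registered as a named view instead.
tableEnv.createTemporaryView("users", userRows,
        $("user_id"), $("handle"), $("name"));

tableEnv.sqlQuery("SELECT user_id, name FROM users").execute().print();
"""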

Cheers,

Svend


On Sun, 11 Apr 2021, at 3:35 PM, vtygoss wrote:
> 
> Hi All,
> 
> there is a scenario where I need to process OGG Log data in kafka using Flink 
> Sql. I can convert the OGG Log Stream to DataStream and each event 
> has RowKind, but i have trouble converting DataStream to a Table.
> 
> For test, i tried StreamTableEnvironment#fromDataStream and 
> createTemporaryView API, both TableSchema is 
> 
> ```
> root
>  |-- f0: LEGACY('RAW', 'ANY')
> ```
> 
> i want to get the schema :
> 
> ```
> root
>  |— column1: Type,
>  |— column2: Type,
> ….
> ```
> 
> how to convert DataStream with RowKind to Table?
> 
> Thank you very much for your reply
> 

Question: the difference between multiple executeAsync() calls and execute() in Yarn Per-Job mode

2021-04-12 Thread 键盘击打者
In Yarn Per-Job mode, if a Flink application contains two execute() calls, the second job cannot be started. But if they are replaced with executeAsync() it works, and I have not yet figured out why...
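
For context, execute() blocks the submitting thread until the job finishes, while executeAsync() returns a JobClient right after submission, so the second submission is actually reached; a minimal sketch (topology and job names are illustrative):

"""
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TwoJobs {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // First pipeline: executeAsync() returns right after submission
        // instead of blocking until the job finishes.
        env.fromElements(1, 2, 3).print();
        JobClient firstJob = env.executeAsync("job-1");

        // Second pipeline, defined on the same environment after the first
        // submission. With a blocking execute("job-1") above, this code would
        // only run once job-1 had finished.
        env.fromElements("a", "b", "c").print();
        JobClient secondJob = env.executeAsync("job-2");

        System.out.println(firstJob.getJobID() + " / " + secondJob.getJobID());
    }
}
"""

Whether both jobs can actually run side by side still depends on the deployment mode and the available slots.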





--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Connection-refused issue with multiple TaskManagers for flink on yarn

2021-04-12 Thread haihua
Hi, did you end up solving this problem? Could you share any ideas or approaches?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Unsubscribe

2021-04-12 Thread Leonard Xu
On Mon, Apr 12, 2021 at 3:06 PM yangxiaofei  wrote:

> Unsubscribe
>
>
Hi
Do you mean unsubscribing from the mailing list? Unsubscribing and subscribing are not done by writing to the list itself; all Apache mailing lists are managed in a similar way.

To unsubscribe from the user-zh@flink.apache.org mailing list, just send an email with any content to user-zh-unsubscr...@flink.apache.org

For managing mailing list subscriptions, see [1]

Best,
Leonard Xu
[1]
https://flink.apache.org/community.html#how-to-subscribe-to-a-mailing-list


Conflict in the document - About native Kubernetes per job mode

2021-04-12 Thread Fuyao Li
Hello Community, Yang,

I noticed a conflict in the document for per-job mode support for Kubernetes.
The doc here [1] mentions that
"in a Flink Job Cluster, the available cluster manager (like YARN or Kubernetes)
is used to spin up a cluster for each submitted job and this cluster is
available to that job only."
This implies per-job mode is supported on Kubernetes.

However, in the docs [2] and [3], it clearly points out per-job mode is not 
supported in Kubernetes.

These statements conflict and are somewhat misleading. If needed, I can create
an MR to remove the Kubernetes part of the statement in [1]. It is a small fix.

I also noticed another thing in the video [4] at 25:08. Yang, you are executing
a command with an -e kubernetes-per-job flag. I tried it and found such a command is
not supported in the Flink distribution at all. I noticed the version used in the
demo is 1.11-snapshot. Did you modify the source code and
build an internal version of Flink?


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/flink-architecture.html#flink-job-cluster
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html#per-job-cluster-mode
[3] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html#per-job-cluster-mode
[4] https://www.youtube.com/watch?v=pdFPr_VOWTU=833s

Best,
Fuyao


Re: Manage Kafka Offsets for Fault Tolerance Without Checkpointing

2021-04-12 Thread Rahul Patwari
Hi Roman, Arvid,

So, to achieve "at least once" guarantee, currently, automatic restart of
Flink should be disabled?
Is there any workaround to get "at least once" semantics with Flink
Automatic restarts in this case?

Regards,
Rahul

On Mon, Apr 12, 2021 at 9:57 PM Roman Khachatryan  wrote:

> Hi,
>
> Thanks for the clarification.
>
> > Other than managing offsets externally, Are there any other ways to
> guarantee "at least once" processing without enabling checkpointing?
>
> That's currently not possible, at least with the default connector.
>
> Regards,
> Roman
>
> On Mon, Apr 12, 2021 at 3:14 PM Rahul Patwari
>  wrote:
> >
> > Hi Roman,
> >
> > Thanks for the reply.
> > This is what I meant by Internal restarts - Automatic restore of Flink
> Job from a failure. For example, pipeline restarts when Fixed delay or
> Failure Rate restart strategies are configured.
> >
> > Quoting documentation in this link - Configuring Kafka Consumer start
> position configuration
> >
> >> Note that these start position configuration methods do not affect the
> start position when the job is automatically restored from a failure
> >
> >
> >
> > It seems that there will be data loss even when offsets are managed
> externally when there are pipeline restarts due to a failure, say, an
> exception. On the other hand, when the pipeline is stopped and
> resubmitted(say, an upgrade), there won't be any data loss as offsets are
> retrieved from an external store and configured while starting Kafka
> Consumer.
> >
> > We do not want to enable checkpointing as the pipeline is stateless. We
> have Deduplication logic in the pipeline and the processing is idempotent.
> >
> > Other than managing offsets externally, Are there any other ways to
> guarantee "at least once" processing without enabling checkpointing?
> >
> > Thanks,
> > Rahul
> >
> > On Mon, Apr 12, 2021 at 6:00 PM Roman Khachatryan 
> wrote:
> >>
> >> Hi,
> >>
> >> Could you please explain what you mean by internal restarts?
> >>
> >> If you commit offsets or timestamps from sink after emitting records
> >> to the external system then there should be no data loss.
> >> Otherwise (if you commit offsets earlier), you have to persist
> >> in-flight records to avoid data loss (i.e. enable checkpointing).
> >>
> >> Regards,
> >> Roman
> >>
> >> On Mon, Apr 12, 2021 at 12:11 PM Rahul Patwari
> >>  wrote:
> >> >
> >> > Hello,
> >> >
> >> > Context:
> >> >
> >> > We have a stateless Flink Pipeline which reads from Kafka topics.
> >> > The pipeline has a Windowing operator(Used only for introducing a
> delay in processing records) and AsyncI/O operators (used for
> Lookup/Enrichment).
> >> >
> >> > "At least Once" Processing semantics is needed for the pipeline to
> avoid data loss.
> >> >
> >> > Checkpointing is disabled and we are dependent on the auto offset
> commit of Kafka consumer for fault tolerance currently.
> >> >
> >> > As auto offset commit indicates that "the record is successfully
> read", instead of "the record is successfully processed", there will be
> data loss if there is a restart when the offset is committed to Kafka but
> not successfully processed by the Flink Pipeline, as the record is NOT
> replayed again when the pipeline is restarted.
> >> >
> >> > Checkpointing can solve this problem. But, since the pipeline is
> stateless, we do not want to use checkpointing, which will persist all the
> records in Windowing Operator and in-flight Async I/O calls.
> >> >
> >> > Question:
> >> >
> >> > We are looking for other ways to guarantee "at least once" processing
> without checkpointing. One such way is to manage Kafka Offsets Externally.
> >> >
> >> > We can maintain offsets of each partition of each topic in
> Cassandra(or maintain timestamp, where all records with timestamps less
> than this timestamp are successfully processed) and configure Kafka
> consumer Start Position - setStartFromTimestamp() or
> setStartFromSpecificOffsets()
> >> >
> >> > This will be helpful if the pipeline is manually restarted (say,
> JobManager pod is restarted). But, how to avoid data loss in case of
> internal restarts?
> >> >
> >> > Has anyone used this approach?
> >> > What are other ways to guarantee "at least once" processing without
> checkpointing for a stateless Flink pipeline?
> >> >
> >> > Thanks,
> >> > Rahul
>


Re: Reducing Task Manager Count Greatly Increases Savepoint Restore

2021-04-12 Thread Kevin Lam
That's really helpful, thanks Till!

On Thu, Apr 8, 2021 at 6:32 AM Till Rohrmann  wrote:

> Hi Kevin,
>
> when decreasing the TaskManager count I assume that you also decrease the
> parallelism of the Flink job. There are three aspects which can then cause
> a slower recovery.
>
> 1) Each Task gets a larger key range assigned. Therefore, each TaskManager
> has to download more data in order to restart the Task. Moreover, there are
> fewer nodes downloading larger portions of the data (less parallelization).
> 2) If you rescaled the parallelism, then it can happen that a Task gets a
> key range assigned which requires downloading of multiple key range parts
> from the previous run/savepoint. The new key range might not need all the
> data from the savepoint parts and hence you download some data which is not
> really used in the end.
> 3) When rescaling the job, then Flink has to rebuild the RocksDB instance
> which is an expensive and slow operation. What happens is that Flink
> creates for every savepoint part which it needs for its key range a RocksDB
> instance and then extracts the part which is only relevant for its key
> range into a new RocksDB instance. This causes a lot of read and write
> amplification.
>
> Cheers,
> Till
>
> On Wed, Apr 7, 2021 at 4:07 PM Kevin Lam  wrote:
>
>> Hi all,
>>
>> We are trying to benchmark savepoint size vs. restore time.
>>
>> One thing we've observed is that when we reduce the number of task
>> managers, the time to restore from a savepoint increases drastically:
>>
>> 1/ Restoring from 9.7tb savepoint onto 156 task managers takes 28 minutes
>> 2/ Restoring from the same savepoint onto 30 task managers takes over 3
>> hours
>>
>> *Is this expected? How does the restore process work? Is this just a
>> matter of having lower restore parallelism for 30 task managers vs 156 task
>> managers? *
>>
>> Some details
>>
>> - Running on kubernetes
>> - Used Rocksdb with a local ssd for state backend
>> - Savepoint is hosted on GCS
>> - The smaller task manager case is important to us because we expect to
>> deploy our application with a high number of task managers, and downscale
>> once a backfill is completed
>>
>> Differences between 1/ and 2/:
>>
>> 2/ has decreased task manager count 156 -> 30
>> 2/ has decreased operator parallelism by a factor of ~10
>> 2/ uses a striped SSD (3 ssds mounted as a single logical volume) to hold
>> rocksdb files
>>
>> Thanks in advance for your help!
>>
>


Re: How to know if task-local recovery kicked in for some nodes?

2021-04-12 Thread Sonam Mandal
Hi Till,

Got it, that definitely makes sense, was just looking for some ballpark number 
to start with. Appreciate your help!

Thanks,
Sonam

From: Till Rohrmann 
Sent: Monday, April 12, 2021 1:00 AM
To: Sonam Mandal 
Cc: dhanesh arole ; Tzu-Li (Gordon) Tai 
; user@flink.apache.org 
Subject: Re: How to know if task-local recovery kicked in for some nodes?

Hi Sonam,

The state size probably depends a bit on your infrastructure. Assuming you have 
1 GBps network connection and local SSDs, then I guess you should see a 
difference if your local state size is  > 1 GB.

Cheers,
Till

On Wed, Apr 7, 2021 at 1:46 PM Sonam Mandal <soman...@linkedin.com> wrote:
Hi Till and Dhanesh,

Thanks for the insights into both on how to check that this kicks in and on the 
expected behavior. My understanding too was that if multiple TMs are used for 
the job, any TMs that don’t go down can take advantage of local recovery.

Do you have any insights on a good minimum state size we should experiment with 
to check recovery time differences between the two modes?

Thanks,
Sonam

From: dhanesh arole <davcdhane...@gmail.com>
Sent: Wednesday, April 7, 2021 3:43:11 AM
To: Till Rohrmann <trohrm...@apache.org>
Cc: Sonam Mandal <soman...@linkedin.com>; Tzu-Li (Gordon) Tai <tzuli...@apache.org>;
user@flink.apache.org <user@flink.apache.org>
Subject: Re: How to know if task-local recovery kicked in for some nodes?

Hi Till,

You are right. To give you more context about our setup, we are running 1 task 
slot per task manager and total number of task manager replicas equal to job 
parallelism. The issue actually exacerbates during rolling deployment of task 
managers as each TM goes offline and comes back online again after some time. 
So during bouncing of every TM pod somehow task allocation changes and finally 
job stabilises once all TMs are restarted.  Maybe a proper blue green setup 
would allow us to make the best use of local recovery during restart of TMs. 
But during intermittent failures of one of the TMs local recovery works as 
expected on the other healthy TM instances ( I.e it does not download from 
remote ).

On Wed, 7 Apr 2021 at 10:35 Till Rohrmann <trohrm...@apache.org> wrote:
Hi Dhanesh,

if some of the previously used TMs are still available, then Flink should try 
to redeploy tasks onto them also in case of a global failover. Only those tasks 
which have been executed on the lost TaskManager will need new slots and have 
to download the state from the remote storage.

Cheers,
Till

On Tue, Apr 6, 2021 at 5:35 PM dhanesh arole <davcdhane...@gmail.com> wrote:
Hi Sonam,

We have a similar setup. What I have observed is, when the task manager pod 
gets killed and restarts again ( i.e. the entire task manager process restarts 
) then local recovery doesn't happen. Task manager restore process actually 
downloads the latest completed checkpoint from the remote state handle even 
when the older localState data is available. This happens because with every 
run allocation-ids for tasks running on task manager change as task manager 
restart causes global job failure and restart.

Local recovery - i.e task restore process using locally stored checkpoint data 
kicks in when the task manager process is alive but due to some other reason ( 
like timeout from sink or external dependency ) one of the tasks fails and the 
flink job gets restarted by the job manager.

Please CMIIW


-
Dhanesh Arole

On Tue, Apr 6, 2021 at 11:35 AM Till Rohrmann <trohrm...@apache.org> wrote:
Hi Sonam,

The easiest way to see whether local state has been used for recovery is the 
recovery time. Apart from that you can also look for "Found registered local 
state for checkpoint {} in subtask ({} - {} - {}" in the logs which is logged 
on debug. This indicates that the local state is available. However, it does 
not say whether it is actually used. E.g. when doing a rescaling operation we 
change the assignment of key group ranges which prevents local state from being 
used. However in case of a recovery the above-mentioned log message should 
indicate that we use local state recovery.

Cheers,
Till

On Tue, Apr 6, 2021 at 11:31 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
Hi Sonam,

Pulling in Till (cc'ed), I believe he would likely be able to help you here.

Cheers,
Gordon

On Fri, Apr 2, 2021 at 8:18 AM Sonam Mandal <soman...@linkedin.com> wrote:
Hello,

We are experimenting with task local recovery and I wanted to know whether 
there is a way to validate that some tasks of the job recovered from the local 
state rather than the remote state.

We've currently set this up to have 2 Task Managers with 2 slots each, and we 
run a job with parallelism 4. To simulate failure, we kill one of the Task 
Manager pods (we run on Kubernetes). I want to see if the local 

Re: Manage Kafka Offsets for Fault Tolerance Without Checkpointing

2021-04-12 Thread Roman Khachatryan
Hi,

Thanks for the clarification.

> Other than managing offsets externally, Are there any other ways to guarantee 
> "at least once" processing without enabling checkpointing?

That's currently not possible, at least with the default connector.

Regards,
Roman

On Mon, Apr 12, 2021 at 3:14 PM Rahul Patwari
 wrote:
>
> Hi Roman,
>
> Thanks for the reply.
> This is what I meant by Internal restarts - Automatic restore of Flink Job 
> from a failure. For example, pipeline restarts when Fixed delay or Failure 
> Rate restart strategies are configured.
>
> Quoting documentation in this link - Configuring Kafka Consumer start 
> position configuration
>
>> Note that these start position configuration methods do not affect the start 
>> position when the job is automatically restored from a failure
>
>
>
> It seems that there will be data loss even when offsets are managed 
> externally when there are pipeline restarts due to a failure, say, an 
> exception. On the other hand, when the pipeline is stopped and 
> resubmitted(say, an upgrade), there won't be any data loss as offsets are 
> retrieved from an external store and configured while starting Kafka Consumer.
>
> We do not want to enable checkpointing as the pipeline is stateless. We have 
> Deduplication logic in the pipeline and the processing is idempotent.
>
> Other than managing offsets externally, Are there any other ways to guarantee 
> "at least once" processing without enabling checkpointing?
>
> Thanks,
> Rahul
>
> On Mon, Apr 12, 2021 at 6:00 PM Roman Khachatryan  wrote:
>>
>> Hi,
>>
>> Could you please explain what you mean by internal restarts?
>>
>> If you commit offsets or timestamps from sink after emitting records
>> to the external system then there should be no data loss.
>> Otherwise (if you commit offsets earlier), you have to persist
>> in-flight records to avoid data loss (i.e. enable checkpointing).
>>
>> Regards,
>> Roman
>>
>> On Mon, Apr 12, 2021 at 12:11 PM Rahul Patwari
>>  wrote:
>> >
>> > Hello,
>> >
>> > Context:
>> >
>> > We have a stateless Flink Pipeline which reads from Kafka topics.
>> > The pipeline has a Windowing operator(Used only for introducing a delay in 
>> > processing records) and AsyncI/O operators (used for Lookup/Enrichment).
>> >
>> > "At least Once" Processing semantics is needed for the pipeline to avoid 
>> > data loss.
>> >
>> > Checkpointing is disabled and we are dependent on the auto offset commit 
>> > of Kafka consumer for fault tolerance currently.
>> >
>> > As auto offset commit indicates that "the record is successfully read", 
>> > instead of "the record is successfully processed", there will be data loss 
>> > if there is a restart when the offset is committed to Kafka but not 
>> > successfully processed by the Flink Pipeline, as the record is NOT 
>> > replayed again when the pipeline is restarted.
>> >
>> > Checkpointing can solve this problem. But, since the pipeline is 
>> > stateless, we do not want to use checkpointing, which will persist all the 
>> > records in Windowing Operator and in-flight Async I/O calls.
>> >
>> > Question:
>> >
>> > We are looking for other ways to guarantee "at least once" processing 
>> > without checkpointing. One such way is to manage Kafka Offsets Externally.
>> >
>> > We can maintain offsets of each partition of each topic in Cassandra(or 
>> > maintain timestamp, where all records with timestamps less than this 
>> > timestamp are successfully processed) and configure Kafka consumer Start 
>> > Position - setStartFromTimestamp() or setStartFromSpecificOffsets()
>> >
>> > This will be helpful if the pipeline is manually restarted (say, 
>> > JobManager pod is restarted). But, how to avoid data loss in case of 
>> > internal restarts?
>> >
>> > Has anyone used this approach?
>> > What are other ways to guarantee "at least once" processing without 
>> > checkpointing for a stateless Flink pipeline?
>> >
>> > Thanks,
>> > Rahul


Re: Modifying flink-conf.yaml

2021-04-12 Thread JasonLee
hi

If it is session mode you need to restart the cluster; if it is per-job mode you can just submit the job directly.



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Query regarding flink metric types

2021-04-12 Thread Roman Khachatryan
Hi Suchithra,

You are right, those metrics can only grow, at least until failover.

isBackPressured is reported as a boolean on subtask level. These samples
are then aggregated and a ratio of (times-back-pressured /
number-of-samples) is reported to the JobManager.

Regards,
Roman
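
For reference, a minimal sketch of how a counter and a gauge are typically registered in a user function (the names and the exposed value are illustrative):

"""
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;

public class CountingMapper extends RichMapFunction<String, String> {

    private transient Counter processedCounter; // monotonically increasing
    private transient volatile long lastLength;  // arbitrary value exposed via a gauge

    @Override
    public void open(Configuration parameters) {
        processedCounter = getRuntimeContext()
                .getMetricGroup()
                .counter("processedRecords");
        getRuntimeContext()
                .getMetricGroup()
                .gauge("lastRecordLength", (Gauge<Long>) () -> lastLength);
    }

    @Override
    public String map(String value) {
        processedCounter.inc();
        lastLength = value.length();
        return value;
    }
}
"""

A counter only ever goes up (which is why the checkpoint counts feel counter-like), while a gauge simply reports whatever the current value happens to be when it is sampled.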


On Fri, Apr 9, 2021 at 12:44 PM V N, Suchithra (Nokia - IN/Bangalore) <
suchithra@nokia.com> wrote:

> Hi Community,
>
>
>
> Need some information regarding metrics type mentioned in flink
> documentation.
>
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/metrics.html
>
>
>
> For the checkpoint metrics, below metrics are defined as of type gauge. As
> per my understanding gauge type is used to represent a value which can
> increase/decrease whereas counter is used to represent a value which will
> keep increasing. Below metrics will be keep increasing during the job run.
> Hence counter can be appropriate metric type for these. Please share your
> input on this.
>
>
>
> numberOfCompletedCheckpoints | The number of successfully completed checkpoints. | Gauge
>
> numberOfFailedCheckpoints | The number of failed checkpoints. | Gauge
>
> totalNumberOfCheckpoints | The number of total checkpoints (in progress, completed, failed). | Gauge
>
>
>
> Also, the name of the "isBackPressured" metric indicates that it returns a
> boolean value Yes/No. The Flink documentation says backpressure is measured as
> below,
>
>- *OK*: 0 <= Ratio <= 0.10
>- *LOW*: 0.10 < Ratio <= 0.5
>- *HIGH*: 0.5 < Ratio <= 1
>
> What exactly this metric reports ?
>
>
>
> isBackPressured | Whether the task is back-pressured. | Gauge
>
>
>
> Thanks,
>
> Suchithra
>


Python Integration with Ververica Platform

2021-04-12 Thread Robert Cullen
I've been using the Community Edition v2.4.  Just wondering if there is a
python integration coming in future versions.

thanks

-- 
Robert Cullen
240-475-4490


Re: Flink 1.11.4?

2021-04-12 Thread Roman Khachatryan
Hi Maciek,

There are no specific plans for 1.11.4 yet as far as I know.
The official policy is to support the current and previous minor
release [1]. So 1.12 and 1.13 will be officially supported once 1.13
is released.
However, it's likely that 1.11.4 will still be released.

[1]
https://flink.apache.org/downloads.html#update-policy-for-old-releases

Regards,
Roman


On Mon, Apr 12, 2021 at 10:35 AM Maciek Próchniak  wrote:
>
> Hello,
>
> I'd like to ask if there are any plans to release 1.11.4 - I understand
> it will be last bugfix release for 1.11.x branch, as 1.13.0 is "just
> round the corner"?
>
> There are a few fixes we'd like to use - e.g.
> https://issues.apache.org/jira/browse/FLINK-9844,
> https://issues.apache.org/jira/browse/FLINK-21164
>
>
> thanks,
>
> maciek
>


Re: Manage Kafka Offsets for Fault Tolerance Without Checkpointing

2021-04-12 Thread Rahul Patwari
Hi Roman,

Thanks for the reply.
This is what I meant by Internal restarts - Automatic restore of Flink Job
from a failure. For example, pipeline restarts when Fixed delay

or Failure Rate

restart strategies are configured.

Quoting documentation in this link - Configuring Kafka Consumer start
position configuration



Note that these start position configuration methods do not affect the
> start position when the job is automatically restored from a failure



It seems that there will be data loss even when offsets are managed
externally when there are pipeline restarts due to a failure, say, an
exception. On the other hand, when the pipeline is stopped and
resubmitted(say, an upgrade), there won't be any data loss as offsets are
retrieved from an external store and configured while starting Kafka
Consumer.

We do not want to enable checkpointing as the pipeline is stateless. We
have Deduplication logic in the pipeline and the processing is idempotent.

Other than managing offsets externally, Are there any other ways to
guarantee "at least once" processing without enabling checkpointing?

Thanks,
Rahul

On Mon, Apr 12, 2021 at 6:00 PM Roman Khachatryan  wrote:

> Hi,
>
> Could you please explain what you mean by internal restarts?
>
> If you commit offsets or timestamps from sink after emitting records
> to the external system then there should be no data loss.
> Otherwise (if you commit offsets earlier), you have to persist
> in-flight records to avoid data loss (i.e. enable checkpointing).
>
> Regards,
> Roman
>
> On Mon, Apr 12, 2021 at 12:11 PM Rahul Patwari
>  wrote:
> >
> > Hello,
> >
> > Context:
> >
> > We have a stateless Flink Pipeline which reads from Kafka topics.
> > The pipeline has a Windowing operator(Used only for introducing a delay
> in processing records) and AsyncI/O operators (used for Lookup/Enrichment).
> >
> > "At least Once" Processing semantics is needed for the pipeline to avoid
> data loss.
> >
> > Checkpointing is disabled and we are dependent on the auto offset commit
> of Kafka consumer for fault tolerance currently.
> >
> > As auto offset commit indicates that "the record is successfully read",
> instead of "the record is successfully processed", there will be data loss
> if there is a restart when the offset is committed to Kafka but not
> successfully processed by the Flink Pipeline, as the record is NOT replayed
> again when the pipeline is restarted.
> >
> > Checkpointing can solve this problem. But, since the pipeline is
> stateless, we do not want to use checkpointing, which will persist all the
> records in Windowing Operator and in-flight Async I/O calls.
> >
> > Question:
> >
> > We are looking for other ways to guarantee "at least once" processing
> without checkpointing. One such way is to manage Kafka Offsets Externally.
> >
> > We can maintain offsets of each partition of each topic in Cassandra(or
> maintain timestamp, where all records with timestamps less than this
> timestamp are successfully processed) and configure Kafka consumer Start
> Position - setStartFromTimestamp() or setStartFromSpecificOffsets()
> >
> > This will be helpful if the pipeline is manually restarted (say,
> JobManager pod is restarted). But, how to avoid data loss in case of
> internal restarts?
> >
> > Has anyone used this approach?
> > What are other ways to guarantee "at least once" processing without
> checkpointing for a stateless Flink pipeline?
> >
> > Thanks,
> > Rahul
>


Re: Flink Metric isBackPressured not available

2021-04-12 Thread Roman Khachatryan
Hi,

The metric is registered upon task deployment and reported periodically.

Which Flink version are you using? The metric was added in 1.10.
Are you checking it in the UI?

Regards,
Roman

On Fri, Apr 9, 2021 at 8:50 PM Claude M  wrote:
>
> Hello,
>
> The documentation here 
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/metrics.html 
> states there is a isBackPressured metric available yet I don't see it.  Any 
> ideas why?
>
>
> Thanks


Re: Manage Kafka Offsets for Fault Tolerance Without Checkpointing

2021-04-12 Thread Roman Khachatryan
Hi,

Could you please explain what you mean by internal restarts?

If you commit offsets or timestamps from sink after emitting records
to the external system then there should be no data loss.
Otherwise (if you commit offsets earlier), you have to persist
in-flight records to avoid data loss (i.e. enable checkpointing).

Regards,
Roman

On Mon, Apr 12, 2021 at 12:11 PM Rahul Patwari
 wrote:
>
> Hello,
>
> Context:
>
> We have a stateless Flink Pipeline which reads from Kafka topics.
> The pipeline has a Windowing operator(Used only for introducing a delay in 
> processing records) and AsyncI/O operators (used for Lookup/Enrichment).
>
> "At least Once" Processing semantics is needed for the pipeline to avoid data 
> loss.
>
> Checkpointing is disabled and we are dependent on the auto offset commit of 
> Kafka consumer for fault tolerance currently.
>
> As auto offset commit indicates that "the record is successfully read", 
> instead of "the record is successfully processed", there will be data loss if 
> there is a restart when the offset is committed to Kafka but not successfully 
> processed by the Flink Pipeline, as the record is NOT replayed again when the 
> pipeline is restarted.
>
> Checkpointing can solve this problem. But, since the pipeline is stateless, 
> we do not want to use checkpointing, which will persist all the records in 
> Windowing Operator and in-flight Async I/O calls.
>
> Question:
>
> We are looking for other ways to guarantee "at least once" processing without 
> checkpointing. One such way is to manage Kafka Offsets Externally.
>
> We can maintain offsets of each partition of each topic in Cassandra(or 
> maintain timestamp, where all records with timestamps less than this 
> timestamp are successfully processed) and configure Kafka consumer Start 
> Position - setStartFromTimestamp() or setStartFromSpecificOffsets()
>
> This will be helpful if the pipeline is manually restarted (say, JobManager 
> pod is restarted). But, how to avoid data loss in case of internal restarts?
>
> Has anyone used this approach?
> What are other ways to guarantee "at least once" processing without 
> checkpointing for a stateless Flink pipeline?
>
> Thanks,
> Rahul


Re: Modifying flink-conf.yaml

2021-04-12 Thread Long


Re: Re: Modifying flink-conf.yaml

2021-04-12 Thread 明启 孙
Many thanks

smq

From: 熊云昆
Sent: 12 April 2021, 18:48
To: smq
Cc: user-zh
Subject: Re: Modifying flink-conf.yaml

Restarting the job is enough; you do not need to restart the cluster.


熊云昆
Email: xiongyun...@163.com

On 12 Apr 2021, at 14:34, smq wrote:
Hi all, after modifying this configuration file, what do I need to do for the change to take effect? For example, do I have to restart the cluster, or is it applied automatically when a job is started?



Re: Modifying flink-conf.yaml

2021-04-12 Thread 熊云昆
Restarting the job is enough; you do not need to restart the cluster.


熊云昆
Email: xiongyun...@163.com

On 12 Apr 2021, at 14:34, smq wrote:
Hi all, after modifying this configuration file, what do I need to do for the change to take effect? For example, do I have to restart the cluster, or is it applied automatically when a job is started?

Manage Kafka Offsets for Fault Tolerance Without Checkpointing

2021-04-12 Thread Rahul Patwari
Hello,

*Context*:

We have a stateless Flink Pipeline which reads from Kafka topics.
The pipeline has a Windowing operator(Used only for introducing a delay in
processing records) and AsyncI/O operators (used for Lookup/Enrichment).

"At least Once" Processing semantics is needed for the pipeline to avoid
data loss.

Checkpointing is disabled and we are dependent on the auto offset commit of
Kafka consumer for fault tolerance currently.

As auto offset commit indicates that "the record is successfully read",
instead of "the record is successfully processed", there will be data loss
if there is a restart when the offset is committed to Kafka but not
successfully processed by the Flink Pipeline, as the record is NOT replayed
again when the pipeline is restarted.

Checkpointing can solve this problem. But, since the pipeline is stateless,
we do not want to use checkpointing, which will persist all the records in
Windowing Operator and in-flight Async I/O calls.

*Question*:

We are looking for other ways to guarantee "at least once" processing
without checkpointing. One such way is to manage Kafka Offsets Externally.

We can maintain offsets of each partition of each topic in Cassandra(or
maintain timestamp, where all records with timestamps less than this
timestamp are successfully processed) and configure Kafka consumer Start
Position

- setStartFromTimestamp() or setStartFromSpecificOffsets()
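
A minimal sketch of what wiring externally stored offsets into the consumer could look like (topic, partitions, offsets and properties below are illustrative):

"""
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

public class ExternalOffsetsConsumer {

    public static FlinkKafkaConsumer<String> buildConsumer() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "my-group");

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);

        // Offsets previously persisted in the external store (e.g. Cassandra).
        Map<KafkaTopicPartition, Long> startOffsets = new HashMap<>();
        startOffsets.put(new KafkaTopicPartition("my-topic", 0), 23L);
        startOffsets.put(new KafkaTopicPartition("my-topic", 1), 31L);

        // Start reading from the externally managed offsets,
        // or alternatively consumer.setStartFromTimestamp(someEpochMillis).
        consumer.setStartFromSpecificOffsets(startOffsets);
        return consumer;
    }
}
"""

Note that these start positions only apply on a fresh start or a manual resubmission, not when the job automatically restores from a failure.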

This will be helpful if the pipeline is manually restarted (say, JobManager
pod is restarted). *But, how to avoid data loss in case of internal
restarts?*

Has anyone used this approach?
What are other ways to guarantee "at least once" processing without
checkpointing for a stateless Flink pipeline?

Thanks,
Rahul


Re: Grouped tumbling window is not triggered because the watermark is not generated or computed

2021-04-12 Thread HunterXHunter
In 1.12 event time is the default; you do not need to set it.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: how to convert DataStream to Table

2021-04-12 Thread Roman Khachatryan
Hi,

I'm pulling in Timo and Jark as they know Table API better.

Regards,
Roman

On Sun, Apr 11, 2021 at 3:36 PM vtygoss  wrote:
>
> Hi All,
>
>
> there is a scenario where I need to process OGG Log data in kafka using Flink 
> Sql. I can convert the OGG Log Stream to DataStream and each event 
> has RowKind, but i have trouble converting DataStream to a Table.
>
> For test, i tried StreamTableEnvironment#fromDataStream and 
> createTemporaryView API, both TableSchema is
>
> ```
>
> root
>
>  |-- f0: LEGACY('RAW', 'ANY')
>
> ```
>
>
> i want to get the schema :
>
>
> ```
>
> root
>
>  |— column1: Type,
>
>  |— column2: Type,
>
> ….
>
> ```
>
>
>
> how to convert DataStream with RowKind to Table?
>
>
>
> Thank you very much for your reply
>
>


Flink 1.11.4?

2021-04-12 Thread Maciek Próchniak

Hello,

I'd like to ask if there are any plans to release 1.11.4 - I understand 
it will be last bugfix release for 1.11.x branch, as 1.13.0 is "just 
round the corner"?


There are a few fixes we'd like to use - e.g. 
https://issues.apache.org/jira/browse/FLINK-9844, 
https://issues.apache.org/jira/browse/FLINK-21164



thanks,

maciek



Re: Re: Does it support rate-limiting in flink 1.12?

2021-04-12 Thread Roman Khachatryan
In Flink, you can only limit memory usage, e.g. via
taskmanager.memory.process.size [1]
(throttling could be implemented using the DataStream API, but you
mentioned you are using SQL).
Quotas on other resources can be set in the underlying resource manager.

But I'd suggest investigating the failure and understand what's
causing it. Probably, high resource usage is not the root cause.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/config.html#memory-configuration

Regards,
Roman
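
For the DataStream route, one common throttling pattern is a rate-limited pass-through operator, e.g. with Guava's RateLimiter (a sketch; it assumes the Guava dependency is on the classpath, and the permits-per-second value is illustrative):

"""
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

import com.google.common.util.concurrent.RateLimiter;

/** Passes records through unchanged, at most `recordsPerSecond` per subtask. */
public class ThrottlingMapper<T> extends RichMapFunction<T, T> {

    private final double recordsPerSecond;
    private transient RateLimiter rateLimiter;

    public ThrottlingMapper(double recordsPerSecond) {
        this.recordsPerSecond = recordsPerSecond;
    }

    @Override
    public void open(Configuration parameters) {
        // RateLimiter is not serializable, so it is created per subtask in open().
        rateLimiter = RateLimiter.create(recordsPerSecond);
    }

    @Override
    public T map(T value) {
        rateLimiter.acquire(); // blocks until a permit is available
        return value;
    }
}
"""

Applied right after the source, e.g. stream.map(new ThrottlingMapper<>(1000)), this caps per-subtask throughput; it does not help for a pure SQL job, as noted above.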

On Mon, Apr 12, 2021 at 10:17 AM 张颖  wrote:
>
> Hi,
> That is not quite what I meant.
> I meant that I run stream jobs and batch jobs in the same cluster, but
> the batch job preempts almost all of the resources in the cluster (possibly pushing
> the machine load average to 150, CPU to 100%, or disk IO to 100%), which leads
> my stream job into a series of problems (such as TM lost and connection
> timeouts). So I want to limit the speed at which the batch job processes data.
>
>
>
>
>
>
>
> At 2021-04-12 15:49:31, "Roman Khachatryan"  wrote:
> >Hi,
> >
> >I'm not sure that I fully understand your question.
> >Is the intention to prioritize some jobs over the others in the same
> >Flink cluster? Currently, it is not possible (FLIP-156 and further
> >work aim to address this [1]). At the moment, you can either
> >- deploy the jobs in separate clusters (per-job mode [2]) and rely on
> >the underlying resource manager for resource isolation
> >- or allocate less task slots to a lower priority job by configuring:
> >parallelism, operator chaining and slot sharing groups
> >
> >[1] 
> >https://cwiki.apache.org/confluence/display/FLINK/FLIP-156%3A+Runtime+Interfaces+for+Fine-Grained+Resource+Requirements
> >[2] 
> >https://ci.apache.org/projects/flink/flink-docs-stable/deployment/#per-job-mode
> >
> >Regards,
> >Roman
> >
> >
> >
> >On Mon, Apr 12, 2021 at 9:07 AM 张颖  wrote:
> >>
> >> When I run a sql job with blink planner in my cluster,the task is almost 
> >> preemption the whole resources in the cluster,  and this is a bad effect 
> >> to the stream task.As it is not necessary on speed,so is there any way to 
> >> control the rate in my batch task?
> >>
> >>
> >>
> >> this is the machine performance in running some operator:
> >> https://issues.apache.org/jira/browse/FLINK-22204
> >>
> >>
> >>
> >>
>
>
>
>


Re: How to know if task-local recovery kicked in for some nodes?

2021-04-12 Thread Till Rohrmann
Hi Sonam,

The state size probably depends a bit on your infrastructure. Assuming you
have 1 GBps network connection and local SSDs, then I guess you should see
a difference if your local state size is  > 1 GB.

Cheers,
Till
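
As a side note, task-local recovery has to be switched on explicitly before comparing recovery times; on a real cluster the usual route is state.backend.local-recovery: true in flink-conf.yaml, and the sketch below only shows the programmatic equivalent:

"""
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;

public class LocalRecoveryConfig {

    public static void main(String[] args) {
        // Programmatic equivalent of putting
        //     state.backend.local-recovery: true
        // into flink-conf.yaml.
        Configuration conf = new Configuration();
        conf.setBoolean(CheckpointingOptions.LOCAL_RECOVERY, true);

        // Prints {state.backend.local-recovery=true}
        System.out.println(conf.toMap());
    }
}
"""

Local recovery also requires checkpointing to be enabled, since it keeps a secondary copy of the checkpointed state on local disk.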

On Wed, Apr 7, 2021 at 1:46 PM Sonam Mandal  wrote:

> Hi Till and Dhanesh,
>
> Thanks for the insights into both on how to check that this kicks in and
> on the expected behavior. My understanding too was that if multiple TMs are
> used for the job, any TMs that don’t go down can take advantage of local
> recovery.
>
> Do you have any insights on a good minimum state size we should experiment
> with to check recovery time differences between the two modes?
>
> Thanks,
> Sonam
> --
> *From:* dhanesh arole 
> *Sent:* Wednesday, April 7, 2021 3:43:11 AM
> *To:* Till Rohrmann 
> *Cc:* Sonam Mandal ; Tzu-Li (Gordon) Tai <
> tzuli...@apache.org>; user@flink.apache.org 
> *Subject:* Re: How to know if task-local recovery kicked in for some
> nodes?
>
> Hi Till,
>
> You are right. To give you more context about our setup, we are running 1
> task slot per task manager and total number of task manager replicas equal
> to job parallelism. The issue actually exacerbates during rolling
> deployment of task managers as each TM goes offline and comes back online
> again after some time. So during bouncing of every TM pod somehow task
> allocation changes and finally job stabilises once all TMs are restarted.
> Maybe a proper blue green setup would allow us to make the best use of
> local recovery during restart of TMs. But during intermittent failures of
> one of the TMs local recovery works as expected on the other healthy TM
> instances ( I.e it does not download from remote ).
>
> On Wed, 7 Apr 2021 at 10:35 Till Rohrmann  wrote:
>
> Hi Dhanesh,
>
> if some of the previously used TMs are still available, then Flink should
> try to redeploy tasks onto them also in case of a global failover. Only
> those tasks which have been executed on the lost TaskManager will need new
> slots and have to download the state from the remote storage.
>
> Cheers,
> Till
>
> On Tue, Apr 6, 2021 at 5:35 PM dhanesh arole 
> wrote:
>
> Hi Sonam,
>
> We have a similar setup. What I have observed is, when the task manager
> pod gets killed and restarts again ( i.e. the entire task manager process
> restarts ) then local recovery doesn't happen. Task manager restore process
> actually downloads the latest completed checkpoint from the remote state
> handle even when the older localState data is available. This happens
> because with every run allocation-ids for tasks running on task manager
> change as task manager restart causes global job failure and restart.
>
> Local recovery - i.e task restore process using locally stored checkpoint
> data kicks in when the task manager process is alive but due to some other
> reason ( like timeout from sink or external dependency ) one of the tasks
> fails and the flink job gets restarted by the job manager.
>
> Please CMIIW
>
>
> -
> Dhanesh Arole
>
> On Tue, Apr 6, 2021 at 11:35 AM Till Rohrmann 
> wrote:
>
> Hi Sonam,
>
> The easiest way to see whether local state has been used for recovery is
> the recovery time. Apart from that you can also look for "Found registered
> local state for checkpoint {} in subtask ({} - {} - {}" in the logs which
> is logged on debug. This indicates that the local state is available.
> However, it does not say whether it is actually used. E.g. when doing a
> rescaling operation we change the assignment of key group ranges which
> prevents local state from being used. However in case of a recovery the
> above-mentioned log message should indicate that we use local state
> recovery.
>
> Cheers,
> Till
>
> On Tue, Apr 6, 2021 at 11:31 AM Tzu-Li (Gordon) Tai 
> wrote:
>
> Hi Sonam,
>
> Pulling in Till (cc'ed), I believe he would likely be able to help you
> here.
>
> Cheers,
> Gordon
>
> On Fri, Apr 2, 2021 at 8:18 AM Sonam Mandal  wrote:
>
> Hello,
>
> We are experimenting with task local recovery and I wanted to know whether
> there is a way to validate that some tasks of the job recovered from the
> local state rather than the remote state.
>
> We've currently set this up to have 2 Task Managers with 2 slots each, and
> we run a job with parallelism 4. To simulate failure, we kill one of the
> Task Manager pods (we run on Kubernetes). I want to see if the local state
> of the other Task Manager was used or not. I do understand that the state
> for the killed Task Manager will need to be fetched from the checkpoint.
>
> Also, do you have any suggestions on how to test such failure scenarios in
> a better way?
>
> Thanks,
> Sonam
>
> --
> - Dhanesh ( sent from my mobile device. Pardon me for any typos )
>


Re: Does it support rate-limiting in flink 1.12?

2021-04-12 Thread Roman Khachatryan
Hi,

I'm not sure that I fully understand your question.
Is the intention to prioritize some jobs over the others in the same
Flink cluster? Currently, it is not possible (FLIP-156 and further
work aim to address this [1]). At the moment, you can either
- deploy the jobs in separate clusters (per-job mode [2]) and rely on
the underlying resource manager for resource isolation
- or allocate less task slots to a lower priority job by configuring:
parallelism, operator chaining and slot sharing groups

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-156%3A+Runtime+Interfaces+for+Fine-Grained+Resource+Requirements
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/#per-job-mode

Regards,
Roman



On Mon, Apr 12, 2021 at 9:07 AM 张颖  wrote:
>
> When I run a sql job with blink planner in my cluster,the task is almost 
> preemption the whole resources in the cluster,  and this is a bad effect to 
> the stream task.As it is not necessary on speed,so is there any way to 
> control the rate in my batch task?
>
>
>
> this is the machine performance in running some operator:
> https://issues.apache.org/jira/browse/FLINK-22204
>
>
>
>


Re: Unsubscribe

2021-04-12 Thread 541122...@qq.com
Unsubscribe



541122...@qq.com

From: 程鑫
Sent: 2021-04-08 22:21
To: user-zh
Subject: Unsubscribe
Unsubscribe


Unsubscribe

2021-04-12 Thread 541122...@qq.com
Unsubscribe



541122...@qq.com


Re: Re: Questions about flink sql join memory usage and data latency

2021-04-12 Thread 董建
Thanks to sllence for the patient answers. A few follow-up questions:
1. If a persistent state backend is configured, in what format is the state stored? Is there a separate state per stream (the raw data), or is the joined result stored as state?
2. CDC uses a regular join by default, and the full data set is kept in memory, so businesses with large data volumes put a heavy burden on the cluster. In a real production environment, if there are many jobs of this kind, won't the cluster resources be exhausted quickly? Can this be considered a problem of CDC?
3. In my production setting I have ETL join requirements that involve writing to multiple tables, but I cannot determine the latency and out-of-orderness within the joined streams, so is there really no better choice than a regular join?


Thanks!


















On 2021-04-12 15:07:05, "sllence" wrote:

1. A regular join (i.e. an ordinary join) has no window restriction and joins against all historical data.
2. State lives in the state backend; memory is the default one, and you can choose a state backend such as filesystem or RocksDB for large and persistent state. A regular join stores the full state for joining by default; you can set a TTL to expire unused state, but already-expired state can then no longer be matched, which may lead to incorrect results.
3. CDC currently does not yet support event-time or watermark definitions, i.e. window joins are not yet supported; see issue https://issues.apache.org/jira/browse/FLINK-20281
4. Each kind of join has its own syntax, so which join is used depends on how the SQL is written, but CDC currently only supports regular joins.
5. If a persistent state backend is used, you can point the job at its path to restore state when it is restarted.
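
As a small illustration of the TTL mentioned in point 2, idle state retention can be configured on the Table environment roughly like this (the retention values are illustrative and have to suit the business logic, since expired rows can no longer be joined):

"""
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class JoinStateTtl {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // State not read or updated for 12 hours becomes eligible for cleanup
        // and is removed at the latest after 24 hours.
        tEnv.getConfig().setIdleStateRetentionTime(Time.hours(12), Time.hours(24));
    }
}
"""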


On 12 Apr 2021 at 14:38, 董建 <62...@163.com> wrote:
I recently watched 云邪's videos on flink cdc sql and worked through the example
https://github.com/ververica/flink-cdc-connectors/wiki/%E4%B8%AD%E6%96%87%E6%95%99%E7%A8%8B
I can see how convenient and powerful flink sql is for real-time stream computing, but I also have a few questions, as follows:
flink connector cdc consumes the binlog of the order, logistics and product tables directly
1. When joining the three streams in flink, which join of the underlying flink api does this correspond to, and is it constrained by window size and time?
2. If it is a full join, is all of this data kept in memory? If the business tables are very large or the daily increment is large, is memory a bottleneck with this approach?
3. For a join with window semantics, if stream 1 joins stream 2 and stream 2 is delayed, can the join result be incorrect (the delayed records of stream 2 being dropped)?
4. Can the kind of join used by a flink sql join be specified?
5. If the job fails and is restarted, is there state for the joined data?
Thanks!

Does it support rate-limiting in flink 1.12?

2021-04-12 Thread 张颖
When I run a sql job with the blink planner in my cluster, the job preempts almost
all of the resources in the cluster, and this has a bad effect on the stream tasks.
As speed is not critical for it, is there any way to control the processing
rate of my batch task?

 

this is the machine performance in running some operator:
https://issues.apache.org/jira/browse/FLINK-22204



Unsubscribe

2021-04-12 Thread yangxiaofei
Unsubscribe



Re: Grouped tumbling window is not triggered because the watermark is not generated or computed

2021-04-12 Thread jie mei
In addition, in an event-time scenario, if a Stream A has messages while another Stream B that is UNION ALLed with it has none, then Stream B's watermark stays at Long.MIN_VALUE. When watermarks are aligned, the watermark after the UNION ALL is the minimum over all channels, i.e.
Long.MIN_VALUE, so the grouped tumbling window never gets computed.

jie mei wrote on Mon, Apr 12, 2021, at 11:24 AM:

> The problem has been solved: my StreamEnv was explicitly set to event time as the default. Removing that setting fixed it; it was what caused the watermark not to be generated.
>
> jie mei wrote on Mon, Apr 12, 2021, at 1:49 AM:
>
>> Hi all, I have a Flink program that uses event time for grouped window computation, but the window computation is never triggered. I debugged into WindowOperator
>> and found that the current watermark in WindowOperator's TriggerContext is always a negative number, and the processElement method in StreamTaskNetworkInput
>> never receives a watermark message (recordOrMark.isWatermark() == false).
>>
>> My own suspicion was that event time was not set up correctly? But comparing against the documentation it should be fine. Below is my DDL
>>
>> create table input_table (
>> `dim` varchar,
>>  `server_time` bigint,
>>  `event_time` AS TO_TIMESTAMP(FROM_UNIXTIME(server_time / 1000,
>> 'yyyy-MM-dd HH:mm:ss')),
>>  WATERMARK FOR `event_time` AS `event_time`
>> )
>> select TUMBLE_START(`event_time`, INTERVAL '1' SECOND) AS `log_time`,
>> `dim`,
>> count(1),
>> FROM input_table
>>  GROUP BY TUMBLE(`event_time`, INTERVAL '1' SECOND),`dim`
>>
>>
>>
>> *Best Regards*
>> *Jeremy Mei*
>>
>
>
> --
>
> *Best Regards*
> *Jeremy Mei*
>


-- 

*Best Regards*
*Jeremy Mei*


Re: Does it support gpu coding in flink?

2021-04-12 Thread Yangze Guo
Hi, there is currently an MNIST inference demo [1] that uses the GPU, but it does not use TensorFlow.
The flink-ai-extended project has an example of training MNIST with TensorFlow [2], but I am not sure whether it can run directly with the GPU build of TF. I will involve Becket and Wei to confirm.

[1] https://github.com/KarmaGYZ/flink-mnist
[2] 
https://github.com/alibaba/flink-ai-extended/tree/master/deep-learning-on-flink/flink-ml-examples/src/main/java/com/alibaba/flink/ml/examples/tensorflow/mnist

Best,
Yangze Guo
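
For reference, the external resource framework described in the linked document exposes GPU information to user functions roughly as sketched below (it assumes the cluster was started with the GPU resource configured, e.g. external-resources: gpu and external-resource.gpu.amount: 1, and that the built-in GPU driver exposes the device index under the "index" property):

"""
import java.util.Set;

import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
import org.apache.flink.api.common.functions.RichMapFunction;

public class GpuAwareMapper extends RichMapFunction<Float, Float> {

    @Override
    public Float map(Float value) {
        // "gpu" must match the resource name configured via `external-resources`.
        Set<ExternalResourceInfo> gpuInfos =
                getRuntimeContext().getExternalResourceInfos("gpu");

        for (ExternalResourceInfo info : gpuInfos) {
            info.getProperty("index")
                    .ifPresent(index -> System.out.println("Using GPU device " + index));
        }
        // The actual inference would be handed off here to a GPU-backed library.
        return value;
    }
}
"""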


On Mon, Apr 12, 2021 at 2:35 PM 张颖  wrote:
>
> Hi, I am running a TF inference task on my cluster, but I find it takes too
> long to get a response, because it is a BERT model and I run it on a CPU
> machine. My company has a GPU k8s cluster, and I read the document
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/advanced/external_resources/
>
> Could you give me a demo? Including TF inference on GPU and training on GPU?
>
> I use Alink in some of my tasks; is there a demo for Alink on GPU?
>
>
> this is part of the answer:
> https://issues.apache.org/jira/browse/FLINK-22205


Does it support gpu coding in flink?

2021-04-12 Thread 张颖
Hi, I am running a TF inference task on my cluster, but I find it takes too long
to get a response, because it is a BERT model and I run it on a CPU machine. My
company has a GPU k8s cluster, and I read the document
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/advanced/external_resources/

Could you give me a demo? Including TF inference on GPU and training on GPU?

I use Alink in some of my tasks; is there a demo for Alink on GPU?


this is part of the answer:
https://issues.apache.org/jira/browse/FLINK-22205

Does it support rate-limiting in flink 1.12?

2021-04-12 Thread 张颖
When I run a sql job with the blink planner in my cluster, the job preempts almost
all of the resources in the cluster, and this has a bad effect on the stream tasks.
As speed is not critical for it, is there any way to control the processing
rate of my batch task?

 

this is the machine performance in running some operator:

Questions about flink sql join memory usage and data latency

2021-04-12 Thread 董建
I recently watched 云邪's videos on flink cdc sql and worked through the example
https://github.com/ververica/flink-cdc-connectors/wiki/%E4%B8%AD%E6%96%87%E6%95%99%E7%A8%8B
I can see how convenient and powerful flink sql is for real-time stream computing, but I also have a few questions, as follows:
flink connector cdc consumes the binlog of the order, logistics and product tables directly
1. When joining the three streams in flink, which join of the underlying flink api does this correspond to, and is it constrained by window size and time?
2. If it is a full join, is all of this data kept in memory? If the business tables are very large or the daily increment is large, is memory a bottleneck with this approach?
3. For a join with window semantics, if stream 1 joins stream 2 and stream 2 is delayed, can the join result be incorrect (the delayed records of stream 2 being dropped)?
4. Can the kind of join used by a flink sql join be specified?
5. If the job fails and is restarted, is there state for the joined data?
Thanks!

Re: Not able to run with two task manager in native flink in kubernetes with high availability mode

2021-04-12 Thread Yang Wang
Hi Priyanka Manickam,

If you are using the native Kubernetes integration, the TaskManagers will
be started/stopped dynamically on demand.
This also means you cannot control the number of running TaskManager pods.
Refer to [1] for more information.

In your case, numberOfTaskSlots has been configured to 25 and the default
parallelism is 8, so a second TaskManager
is unnecessary. If you want to run your Flink application with two
TaskManagers, please set "taskmanager.numberOfTaskSlots"
to 4 and keep the parallelism at 8. Then you will find two TaskManagers
started and no spare slots.

Please note that "kubernetes.taskmanager.node-selector" is used to start
TaskManager pods on specific nodes. It
is not used to configure the number of TaskManagers. Refer to K8s
documentation for more information[2].

[1]. https://flink.apache.org/2021/02/10/native-k8s-with-ha.html
[2].
https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/

Best,
Yang


Priyanka Manickam  于2021年4月10日周六 下午3:47写道:

>
> -- Forwarded message -
> From: Priyanka Manickam 
> Date: Fri, 9 Apr 2021, 23:03
> Subject: Fwd: Not able to run with two task manager in native flink in
> kubernetes with high availability mode
> To: 
>
>
>
> -- Forwarded message -
> From: Priyanka Manickam 
> Date: Fri, 9 Apr 2021, 18:56
> Subject: Not able to run with two task manager in native flink in
> kubernetes with high availability mode
> To: 
>
>
> Hi all,
>
> I am using a Flink deployment on Azure Kubernetes. The mode I have chosen
> is the native high-availability deployment on Kubernetes.
> But when I used the configuration below, I see only one task manager
> running, but I need to run two task managers. I searched the flink website
> for a configuration for this, but did not find any solution. I also used
> kubernetes.taskmanager.node-selector=1, but then I could not even see the pod.
>
> Could someone help with the commands to run two task managers in native
> Flink on Kubernetes with high-availability mode?
>
> Please find the Attached configuration used and kindly guide with the
> solution for it.
>
>
> Thanks,
> Priyanka Manickam
>


Modifying flink-conf.yaml

2021-04-12 Thread smq
Hi all, after modifying this configuration file, what do I need to do for the change to take effect? For example, do I have to restart the cluster, or is it applied automatically when a job is started?

Snappy compression error when joining a Hive dimension table in Flink

2021-04-12 Thread kandy.wang
 java.lang.UnsatisfiedLinkError: 
org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z

at org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy(Native Method)

at 
org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:63)

at 
org.apache.hadoop.io.compress.SnappyCodec.getDecompressorType(SnappyCodec.java:195)

at org.apache.hadoop.io.compress.CodecPool.getDecompressor(CodecPool.java:181)

at org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:2037)

at org.apache.hadoop.io.SequenceFile$Reader.initialize(SequenceFile.java:1923)

at org.apache.hadoop.io.SequenceFile$Reader.(SequenceFile.java:1872)

at org.apache.hadoop.io.SequenceFile$Reader.(SequenceFile.java:1886)

at 
org.apache.hadoop.mapred.SequenceFileRecordReader.(SequenceFileRecordReader.java:49)

at 
org.apache.hadoop.mapred.SequenceFileInputFormat.getRecordReader(SequenceFileInputFormat.java:64)

at 
org.apache.flink.connectors.hive.read.HiveMapredSplitReader.(HiveMapredSplitReader.java:113)

at 
org.apache.flink.connectors.hive.read.HiveTableInputFormat.open(HiveTableInputFormat.java:162)

at 
org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader.open(HiveInputFormatPartitionReader.java:86)

at 
org.apache.flink.table.filesystem.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:128)

at 
org.apache.flink.table.filesystem.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:103)

at LookupFunction$74.flatMap(Unknown Source)

at 
org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:82)

at 
org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:36)

at 
org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)

at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)

at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)

at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)

at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)

at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)

at StreamExecCalc$71.processElement(Unknown Source)
How can this snappy problem be solved?

"flink 1.12.2升级之后恢复任务出现异常"

2021-04-12 Thread 张锴
The following error appears when submitting from the client; I have not seen it before and am not sure how to configure around it.

Exception in thread "Thread-5" java.lang.IllegalStateException: Trying to
access closed classloader. Please check if you store classloaders directly
or indirectly in static fields. If the stacktrace suggests that the leak
occurs in a third party library and cannot be fixed immediately, you can
disable this check with the configuration
'classloader.check-leaked-classloader'.
at
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.ensureInner(FlinkUserCodeClassLoaders.java:164)
at
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.getResource(FlinkUserCodeClassLoaders.java:183)
at org.apache.hadoop.conf.Configuration.getResource(Configuration.java:2738)
at
org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2952)
at
org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2926)
at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2806)
at org.apache.hadoop.conf.Configuration.get(Configuration.java:1200)
at
org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1788)
at
org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
at
org.apache.hadoop.util.ShutdownHookManager.shutdownExecutor(ShutdownHookManager.java:145)
at
org.apache.hadoop.util.ShutdownHookManager.access$300(ShutdownHookManager.java:65)
at
org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:102)

The Hadoop version used is 3.x. Has anyone run into this error, and how can it be resolved?


Re: Re: Flink CPU and memory resource allocation

2021-04-12 Thread Xintong Song
The log in your screenshot also clearly shows how large each memory component is; the heap is only one part, and all parts together add up to the 1728m you configured.

Adjusting the configuration can indeed let the TM use more memory; whether that improves performance depends on whether memory is the bottleneck of your job. If the bottleneck is CPU, IO,
or even the upstream data source, blindly increasing memory will not help much.

Thank you~

Xintong Song



On Mon, Apr 12, 2021 at 10:32 AM penguin.  wrote:

> Thanks! Each machine serves as one TM, and the default taskmanager.memory.process.size
> in the flink configuration file is 1728m, while the log shows a heap of 512.
> If I increase this taskmanager.memory.process.size parameter
> a bit, say to 4GB, will that improve job execution performance?
> The defaults are as follows:
> INFO [] - The derived from fraction jvm overhead memory (172.800mb (
> 181193935 bytes)) is less than its min value 192.000mb (201326592 bytes),
> min value will be used instead
> INFO [] - Final TaskExecutor Memory configuration:
> INFO [] - Total Process Memory: 1.688gb (1811939328 bytes)
> INFO [] - Total Flink Memory: 1.250gb (1342177280 bytes)
> INFO [] - Total JVM Heap Memory: 512.000mb (536870902 bytes)
> INFO [] - Framework: 128.000mb (134217728 bytes)
> INFO [] - Task: 384.000mb (402653174 bytes)
> INFO [] - Total Off-heap Memory: 768.000mb (805306378 bytes)
> INFO [] - Managed: 512.000mb (536870920 bytes)
> INFO [] - Total JVM Direct Memory: 256.000mb (268435458 bytes)
> INFO [] - Framework: 128.000mb (134217728 bytes)
> INFO [] - Task: 0 bytes
> INFO [] - Network: 128.000mb (134217730 bytes)
> INFO [] - JVM Metaspace: 256.000mb (268435456 bytes)
> INFO [] - JVM Overhead: 192.000mb (201326592 bytes)
>
> After changing it to 4GB:
>
>
>
>
>
> Penguin.
>
>
>
>
>
>
>
> On 2021-04-12 10:04:32, "Xintong Song" wrote:
> >>
> >> Now, say a node has 16 CPU cores and 16g of memory, with 4 slots;
> >
> >
> >The "node" you refer to here should be the physical or virtual machine that the Flink TM runs on.
> >
> >You are mixing up several concepts here:
> >
> >- Node, TM and slot are concepts on three different levels. 16c16g applies to the node, the 4 slots apply to the TM. One node can run multiple TMs.
> >
> >- The TM memory includes not only heap memory but also other memory types, so 512M does not represent the TM's total memory size.
> >
> >- Whether the TM's CPU and memory get overused largely depends on your runtime environment. From Flink's own side, the Heap, Direct and Metaspace memory types are never overused, but part of the Native memory can be, and CPU can be overused as well. However, K8s/Yarn environments usually provide external resource limits, for example forbidding overuse or only allowing a certain ratio of overuse; this depends on the concrete environment configuration.
> >
> >
> >You can take a look at the official documentation pages on the memory model and its configuration [1].
> >
> >
> >Thank you~
> >
> >Xintong Song
> >
> >
> >[1]
> >https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/memory/mem_setup.html
> >
> >On Sun, Apr 11, 2021 at 9:16 PM penguin.  wrote:
> >
> >> I learned that Flink's memory is isolated but its CPU cannot be isolated;
> >> now, say a node has 16 CPU cores and 16g of memory, with 4 slots;
> >> through debugging and the logs I found that each slot gets 1 CPU, so 4 slots occupy 4 CPU cores, and the heap memory is 512M.
> >> In that case, are the other 12 CPU cores and all that memory unused and therefore wasted?
> >>
> >>
> >> Looking forward to a reply, many thanks!
>
>
>
>
>