Re: Configure vvp 2.3 with file blob storage

2020-11-03 Thread Fabian Hueske
Hi Laurent,

Thanks for trying out Ververica platform!
However, please note that this is the mailing list of the Apache Flink
project.
Please post further questions using the "Community Edition Feedback" button
on this page: https://ververica.zendesk.com/hc/en-us
We are working on setting up a better channel to support our users.

This being said, the right syntax is

vvp:
  blobStorage:
baseUri: file://tmp/


However, setting up UBS on the local file system won't work because it
needs to be accessible from different components.
Please have a look at the VVP playground:

https://github.com/ververica/ververica-platform-playground/

You can also check the Ververica Platform documentation:

https://docs.ververica.com/platform_operations/blob_storage.html#configuration

Best, Fabian



Am Mo., 2. Nov. 2020 um 21:21 Uhr schrieb Laurent Exsteens <
laurent.exste...@euranova.eu>:

> Hello,
>
> I need to try out the SQL editor in Ververica platform 2.3.
>
> It requires to configure the persistence stack as local, which I did.
> It also requires a blob storage. To keep things simple, I'd like to
> configure a blob storage as local file system, or k8s volume. Configuring
> it as:
>
>   blobStorage: {}
> ## The base URI for universal blob storage.
> ##
> ## If unset, universal blob storage is disabled. If credentials are
> required, add them to the
> ## "Blob storage credentials" section below.
> baseUri: "file://tmp/"
>
> gives me the following error:
> Error: failed to parse values.yaml: error converting YAML to JSON: yaml:
> line 86: did not find expected key
>
> I'm very new to kubernetes, I'm just trying to get the platform running as
> fast as possible since we only have 2 weeks to try the SQL editor.
> Therefore I'd be very happy with the exact configuration to put there to
> have it running on a k8s volume.
>
> Thanks a lot for your help.
>
> Best Regards,
>
> Laurent.
> --
> *Laurent Exsteens*
> Data Engineer
> (M) +32 (0) 486 20 48 36
>
> *EURA NOVA*
>
> Rue Emile Francqui, 4
>
> 1435 Mont-Saint-Guibert
>
> (T) +32 10 75 02 00
>
> *euranova.eu *
>
> *research.euranova.eu* 
>
> ♻ Be green, keep it on the screen


Re: coordination of sinks

2020-08-17 Thread Fabian Hueske
Hi Marco,

You cannot really synchronize data that is being emitted via different
streams (without bringing them together in an operator).

I see two options:

1) emit the event to create the partition and the data to be written into
the partition to the same stream. Flink guarantees that records do not
overtake records in the same partition. However, you need to ensure that
all records remain in the same partition, for example by partitioning on
the same ke.
2) emit the records to two different streams but have a CoProcessFunction
that processes the create partition and data events. The processing
function would just buffer the data events (in state) until it observes the
create partition event for which it creates the partitions (in a
synchronous fashion). Once the partition is created, it forwards all
buffered data and the remaining data.

Hope this helps,
Fabian

Am Sa., 15. Aug. 2020 um 07:45 Uhr schrieb Marco Villalobos <
mvillalo...@kineteque.com>:

> Given a source that goes into a tumbling window with a process function
> that yields two side outputs, in addition to the main data stream, is it
> possible to coordinate the order of completion
> of sink 1, sink 2, and sink 3 as data leaves the tumbling window?
>
> source -> tumbling window -> process function -> side output tag 1 ->
> sink 1   \-> side output tag 2
> -> sink 2
>  \-> main stream -> sink 3
>
>
> sink 1 will create partitions in PostgreSQL for me.
> sink 2 will insert data into the partitioned table
> sink 3 can happen in any order
> but all of them need to finish before the next window fires.
>
> Any advice will help.
>


Re: [SQL DDL] How to extract timestamps from Kafka message's metadata

2020-08-11 Thread Fabian Hueske
Hi Dongwon,

Maybe you can add your use case to the FLIP-107 discussion thread [1] and
thereby support the proposal (after checking that it would solve your
problem).

It's always helpful to learn about the requirements of users when designing
new features.
It also helps to prioritize which features to push for.

Thank you,
Fabian

[1]
https://lists.apache.org/thread.html/r351dea720788c4f1ca8c5133c34f0f11077a0df54ef4ac8c3e28a6dc%40%3Cdev.flink.apache.org%3E

Am Di., 11. Aug. 2020 um 09:42 Uhr schrieb Dongwon Kim <
eastcirc...@gmail.com>:

> Hi Dawid,
>
> I'll try your suggestion [2] and wait for [1] to be supported in next
> versions.
>
> Thanks,
>
> p.s. It's not easy to insert the timestamp into the body because it will
> affect other applications. In this regard, I hope [1] is going to be
> available soon.
>
> Dongwon
>
>
> On Tue, Aug 11, 2020 at 4:31 PM Dawid Wysakowicz 
> wrote:
>
>> I'm afraid it is not supported yet. The discussion[1] to support it
>> started in the past, but unfortunately it has not concluded yet.
>>
>> One approach I can think of, how you can work this limitation around is
>> to provide your own Format[2]. Unfortunately it is not the most
>> straightforward solution.
>>
>> Best,
>>
>> Dawid
>>
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records
>>
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html#encoding--decoding-formats
>> On 11/08/2020 09:20, Dongwon Kim wrote:
>>
>> Hi,
>>
>> I'm working on a Kafka topic where timestamps are not shown in the
>> message body, instead in the message's metadata.
>>
>> I want to declare a table from the topic with DDL but
>> "rowtime_column_name" in the below definition seems to accept only existing
>> columns.
>>
>>> WATERMARK FOR rowtime_column_name AS watermark_strategy_expression.
>>
>>
>> Can I define watermarks in this situation where timestamps are shown only
>> in the metadata?
>>
>> Thanks,
>>
>> Dongwon
>>
>>


Re: Simple MDC logs don't show up

2020-07-21 Thread Fabian Hueske
Hi,

When running your code in the IDE, everything runs in the same local JVM.
When you run the job on Kubernetes, the situation is very different.
Your code runs in multiple JVM processes distributed in a cluster.

Flink provides a metrics collection system that you should use to collect
metrics from the various processes.
Please have a look at the metrics documentation [1].

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html


Am Mo., 20. Juli 2020 um 15:28 Uhr schrieb Manish G <
manish.c.ghildi...@gmail.com>:

> Hi All,
>
> I have some very simple MDC logs in my flink job:
>
> MDC.put("methodName", new Object() 
> {}.getClass().getEnclosingMethod().getName());
> MDC.put("className", this.getClass().getSimpleName());
>
> When I run flink job locally, I can see them in the application logs.
>
> But when I run the same job on kubernetes clutter, these don't show up.
>
> Any input here?
>
> With regards
>
>


Re: Flink rest api cancel job

2020-07-21 Thread Fabian Hueske
Hi White,

Can you describe your problem in more detail?

* What is your Flink version?
* How do you deploy the job (application / session cluster), (Kubernetes,
Docker, YARN, ...)
* What kind of job are you running (DataStream, Table/SQL, DataSet)?

Best, Fabian

Am Mo., 20. Juli 2020 um 08:42 Uhr schrieb snack white <
amazingu...@gmail.com>:

> Hi,
>   When I using rest api to cancel my job , the rest 9 TM has been
> canceled quickly , but the other one TM is always cancelling status ,
> someone can show me how can I solve the question .
> Thanks,
> White


Re: Pravega connector cannot recover from the checkpoint due to "Failure to finalize checkpoint"

2020-07-21 Thread Fabian Hueske
Hi Brian,

AFAIK, Arvid and Piotr (both in CC) have been working on the threading
model of the checkpoint coordinator.
Maybe they can help with this question.

Best, Fabian

Am Mo., 20. Juli 2020 um 03:36 Uhr schrieb :

> Anyone can help us on this issue?
>
>
>
> Best Regards,
>
> Brian
>
>
>
> *From:* Zhou, Brian
> *Sent:* Wednesday, July 15, 2020 18:26
> *To:* 'user@flink.apache.org'
> *Subject:* Pravega connector cannot recover from the checkpoint due to
> "Failure to finalize checkpoint"
>
>
>
> Hi community,
>
> To give some background, https://github.com/pravega/flink-connectors is a
> Pravega connector for Flink. The ReaderCheckpointHook[1] class uses the
> Flink `MasterTriggerRestoreHook` interface to trigger the Pravega
> checkpoint during Flink checkpoints to make sure the data recovery. We
> experienced the failures in the latest Flink 1.11 upgrade with the
> checkpoint recovery, there are some timeout issues for the continuous
> checkpoint failure on some of the test cases.
> Error stacktrace:
>
> 2020-07-09 15:39:39,999 30945 [jobmanager-future-thread-5] WARN
> o.a.f.runtime.jobmaster.JobMaster - Error while processing checkpoint
> acknowledgement message
>
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
> finalize the pending checkpoint 3. Failure reason: Failure to finalize
> checkpoint.
>
>  at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1033)
>
>  at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:948)
>
>  at
> org.apache.flink.runtime.scheduler.SchedulerBase.lambda$acknowledgeCheckpoint$4(SchedulerBase.java:802)
>
>  at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>
>  at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>
>  at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>
>  at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
>  at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>
>  at java.lang.Thread.run(Thread.java:748)
>
> Caused by: org.apache.flink.util.SerializedThrowable: Pending checkpoint
> has not been fully acknowledged yet
>
>  at
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>
>  at
> org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:298)
>
>  at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1021)
>
>  ... 9 common frames omitted
>
> After some investigation, the main problem is found. It is about the
> checkpoint recovery. When Flink CheckpointCoordinator wants to finalize a
> checkpoint, it needs to check everything is acknowledged, but for some
> reason, the master state still has our ReaderCheckpointHook remaining
> unack-ed, hence leading the checkpoint failure in the complete stage.
> In the PendingCheckpoint::snapshotMasterState, there is an async call to
> acknowledge the master state for each hook. But it returned before the
> acknowledgement.
> I think it might be related to the latest changes of the thread model of
> the checkpoint coordinator. Can someone help to verify?
>
>
>
> *Reproduce procedure:*
> Checkout this branch
> https://github.com/crazyzhou/flink-connectors/tree/brian-1.11-test and
> run below test case:
> FlinkPravegaReaderSavepointITCase::testPravegaWithSavepoint
>
>
>
> [1]
> https://github.com/pravega/flink-connectors/blob/e15cfe5b7917431ce8672cf9f232cb4603d8143a/src/main/java/io/pravega/connectors/flink/ReaderCheckpointHook.java
>
>
>
> Best Regards,
>
> Brian
>
>
>


Re: Custom metrics output

2020-07-21 Thread Fabian Hueske
Hi Joris,

I don't think that the approach of "add methods in operator class code that
can be called from the main Flink program" will work.

The most efficient approach would be implementing a ProcessFunction that
counts in 1-min time buckets (using event-time semantics) and updates the
metrics.
If you need the metric values to be exact, you can keep the intermediate
counts as operator state.
I would not use a KeyedProcessFunction because you didn't mention a key and
to save the overhead of the shuffle.

You can integrate the ProcessFunctions in different ways in your job.

1) just embed it into the regular flow. The ProcessFunction would just
count and forward every record it receives.
2) fork off a stream of records that just just hold the timestamp to a side
output and apply the ProcessFunction on the forked-off stream.

I think the first approach is simpler and more efficient. The
ProcessFunction would be an identity function to your actual data, just
counting and reporting metrics.

Best, Fabian

Am Mo., 20. Juli 2020 um 01:30 Uhr schrieb Joris Geer <
joris.van.der.g...@oracle.com>:

> Hi,
>
> We want to collect metrics for stream processing, typically counts
> aggregated over 1-minute buckets. However, we want these 1-minute
> boundaries determined by timestamps within the data records. Flink metrics
> do not handle this so we want to roll our own. How to proceed ? Some of our
> team members believe we can add methods in operator class code that can be
> called from the main Flink program, whist I am not sure this is supposed to
> be possible. Others consider using a side output stream with a record per
> input record and use Flink operators to do the aggregation. That may double
> the amount of records processed.
>
> Can we extend the Flink metrics to provide such aggregation ?
>
> Regards,
>
> Joris
>
>


Re: How to ensure that job is restored from savepoint when using Flink SQL

2020-07-08 Thread Fabian Hueske
Hi Jie,

The auto-ID generation is not done by the SQL translation component but on
a lower level, i.e., it's independent of Flink's SQL translation.
The ID generation only depends on the topology / graph structure of the
program's operators.
The ID of an operator depends on the IDs of its predecessors (and not on
its own processing logic or operator name).

So, as long as the operator graph structure of a program remains the same,
it will be compatible with an earlier savepoint.
However, preserving the operator graph structure is out of the user's
control.
The operator graph is automatically generated by the SQL optimizer and
slight changes of a query can result in a different graph while other
changes do not affect the structure.

In your example, the graph structure should remain the same because there
is already a Filter operator (due to "where id == '001'") in the first
query and the second query just extends the filter predicate ("id == '001'
and age >= '28'").
If there was no WHERE clause in the first query, the plan might have been
changed.
In order to reason about which query changes are savepoint compatible, you
need in-depth knowledge about the optimizer's translation process.

I would not rely on being able to start a query from a savepoint of a
(slightly) modified query.
First because it is very fragile given the query translation process and
second because it results in incorrect results.

Given your example query, I would start it from scratch and add a predicate
to continue after the latest result of the previous query:

select id, name, sum(salary) from user_info where id == '001' and age >=
'28' and rowtime >= 'xxx' group by TUMBLE(rowtime, INTERVAL '1' DAY), id,
name;

If the last result of the first query was for '2020-07-07' I would set xxx
to '2020-07-08-00:00:00.000'.
Of course this only works for queries with hard temporary boundaries, but
it gives correct results.

Best, Fabian

Am Mi., 8. Juli 2020 um 04:50 Uhr schrieb shadowell :

>
> Hi Fabian,
>
> Thanks for your information!
> Actually, I am not clear about the mechanism of auto-generated IDs in
> Flink SQL and the mechanism of how does the operator state mapping back
> from savepoint.
> I hope to get some detail information by giving an example bellow.
>
> I have two sql as samples:
> old sql : select id, name, sum(salary) from user_info where id == '001'
> group by TUMBLE(rowtime, INTERVAL '1' DAY), id, name;
> new sql:   select id, name, sum(salary) from user_info where id == '001'
> and age >= '28' group by TUMBLE(rowtime, INTERVAL '1' DAY), id, name;
> I just add some age limitation in new SQL. Now, I want to switch the job
> from old one to the new one by trigger a savepoint. Flink will generate
> operator IDs for operators in new SQL.
> In this case, just from a technical point of view,  the operator IDs in
> the savepoint of the old SQL job can match the operator IDs in the new SQL
> job?
> My understanding is that Flink will reorder the operators and generate new
> IDs for operators. The new IDs may not match the old IDs.
> This will cause some states failed to be mapped back from the old job
> savepoint, which naturally leads to inaccurate calculation results.
> I wonder if my understanding is correct.
>
> Thanks~
> Jie
>
> shadowell
> shadow...@126.com
>
> <https://maas.mail.163.com/dashi-web-extend/html/proSignature.html?ftlId=1=shadowell=shadowell%40126.com=https%3A%2F%2Fmail-online.nosdn.127.net%2Fqiyelogo%2FdefaultAvatar.png=%5B%22shadowell%40126.com%22%5D>
> 签名由 网易邮箱大师 <https://mail.163.com/dashi/dlpro.html?from=mail81> 定制
> On 7/7/2020 17:23,Fabian Hueske 
> wrote:
>
> Hi Jie Feng,
>
> As you said, Flink translates SQL queries into streaming programs with
> auto-generated operator IDs.
> In order to start a SQL query from a savepoint, the operator IDs in the
> savepoint must match the IDs in the newly translated program.
> Right now this can only be guaranteed if you translate the same query with
> the same Flink version (optimizer changes might change the structure of the
> resulting plan even if the query is the same).
> This is of course a significant limitation, that the community is aware of
> and planning to improve in the future.
>
> I'd also like to add that it can be very difficult to assess whether it is
> meaningful to start a query from a savepoint that was generated with a
> different query.
> A savepoint holds intermediate data that is needed to compute the result
> of a query.
> If you update a query it is very well possible that the result computed by
> Flink won't be equal to the actual result of the new query.
>
> Best, Fabian
>
> Am Mo., 6. Juli 2020 um 10:50 Uhr schrieb shadowell :
>
>>
>> Hello, everyone,
>> I have some unclear points when 

Re: How to ensure that job is restored from savepoint when using Flink SQL

2020-07-07 Thread Fabian Hueske
Hi Jie Feng,

As you said, Flink translates SQL queries into streaming programs with
auto-generated operator IDs.
In order to start a SQL query from a savepoint, the operator IDs in the
savepoint must match the IDs in the newly translated program.
Right now this can only be guaranteed if you translate the same query with
the same Flink version (optimizer changes might change the structure of the
resulting plan even if the query is the same).
This is of course a significant limitation, that the community is aware of
and planning to improve in the future.

I'd also like to add that it can be very difficult to assess whether it is
meaningful to start a query from a savepoint that was generated with a
different query.
A savepoint holds intermediate data that is needed to compute the result of
a query.
If you update a query it is very well possible that the result computed by
Flink won't be equal to the actual result of the new query.

Best, Fabian

Am Mo., 6. Juli 2020 um 10:50 Uhr schrieb shadowell :

>
> Hello, everyone,
> I have some unclear points when using Flink SQL. I hope to get an
> answer or tell me where I can find the answer.
> When using the DataStream API, in order to ensure that the job can
> recover the state from savepoint after adjustment, it is necessary to
> specify the uid for the operator. However, when using Flink SQL, the uid of
> the operator is automatically generated. If the SQL logic changes (operator
> order changes), when the task is restored from savepoint, will it cause
> some of the operator states to be unable to be mapped back, resulting in
> state loss?
>
> Thanks~
> Jie Feng
> shadowell
> shadow...@126.com
>
> 
> 签名由 网易邮箱大师  定制
>


Re: [ANNOUNCE] Yu Li is now part of the Flink PMC

2020-06-17 Thread Fabian Hueske
Congrats Yu!

Cheers, Fabian

Am Mi., 17. Juni 2020 um 10:20 Uhr schrieb Till Rohrmann <
trohrm...@apache.org>:

> Congratulations Yu!
>
> Cheers,
> Till
>
> On Wed, Jun 17, 2020 at 7:53 AM Jingsong Li 
> wrote:
>
> > Congratulations Yu, well deserved!
> >
> > Best,
> > Jingsong
> >
> > On Wed, Jun 17, 2020 at 1:42 PM Yuan Mei  wrote:
> >
> >> Congrats, Yu!
> >>
> >> GXGX & well deserved!!
> >>
> >> Best Regards,
> >>
> >> Yuan
> >>
> >> On Wed, Jun 17, 2020 at 9:15 AM jincheng sun 
> >> wrote:
> >>
> >>> Hi all,
> >>>
> >>> On behalf of the Flink PMC, I'm happy to announce that Yu Li is now
> >>> part of the Apache Flink Project Management Committee (PMC).
> >>>
> >>> Yu Li has been very active on Flink's Statebackend component, working
> on
> >>> various improvements, for example the RocksDB memory management for
> 1.10.
> >>> and keeps checking and voting for our releases, and also has
> successfully
> >>> produced two releases(1.10.0&1.10.1) as RM.
> >>>
> >>> Congratulations & Welcome Yu Li!
> >>>
> >>> Best,
> >>> Jincheng (on behalf of the Flink PMC)
> >>>
> >>
> >
> > --
> > Best, Jingsong Lee
> >
>


Re: Flink 1.8.3 Kubernetes POD OOM

2020-05-22 Thread Fabian Hueske
Hi Josson,

I don't have much experience setting memory bounds in Kubernetes myself,
but my colleague Andrey (in CC) reworked Flink's memory configuration for
the last release to ease the configuration in container envs.
He might be able to help.

Best, Fabian

Am Do., 21. Mai 2020 um 18:43 Uhr schrieb Josson Paul :

> Cluster type: Standalone cluster
> Job Type: Streaming
> JVM memory: 26.2 GB
> POD memory: 33 GB
> CPU: 10 Cores
> GC: G1GC
> Flink Version: 1.8.3
> State back end: File based
> NETWORK_BUFFERS_MEMORY_FRACTION : 0.02f of the Heap
> We are not accessing Direct memory from application. Only Flink uses
> direct memory
>
> We notice that in Flink 1.8.3 over a period of 30 minutes the POD is
> killed with OOM. JVM Heap is with in limit.
> We read from Kafka and have windows in the application. Our Sink is either
> Kafka or Elastic Search
> *The same application/job was working perfectly in Flink 1.4.1 with the
> same input rate and output rate*
> No back pressure
> *I have attached few Grafana charts as PDF*
> Any idea why the off heap memory / outside JVM memory is going up and
> eventually reaching the limit.
>
>  Java Heap (reserved=26845184KB, committed=26845184KB)
> (mmap: reserved=26845184KB, committed=26845184KB)
>
> - Class (reserved=1241866KB, committed=219686KB)
> (classes #36599)
> (malloc=4874KB #74568)
> (mmap: reserved=1236992KB, committed=214812KB)
>
> - Thread (reserved=394394KB, committed=394394KB)
> (thread #383)
> (stack: reserved=392696KB, committed=392696KB)
> (malloc=1250KB #1920)
> (arena=448KB #764)
>
> - Code (reserved=272178KB, committed=137954KB)
> (malloc=22578KB #33442)
> (mmap: reserved=249600KB, committed=115376KB)
>
> - GC (reserved=1365088KB, committed=1365088KB)
> (malloc=336112KB #1130298)
> (mmap: reserved=1028976KB, committed=1028976KB)
>
>
>
> --
> Thanks
> Josson
>


Re: Adaptive Watermarks Generator

2020-05-22 Thread Fabian Hueske
Hi,

The code of the implementation is linked in the paper:
https://github.com/DataSystemsGroupUT/Adaptive-Watermarks
Since this is a prototype for a research paper, I'm doubtful that the
project is maintained.
I also didn't find an open-source license attached to the code.
Hence adding the project to flink-packages.org wouldn't help a lot.

One option would be to get permission to fork the project (by adding a
permissive open source license) and finding somebody who forks and
maintains the code.
Alternatively, it might be possible to create a completely new
implementation based on the paper (I guess).
Then it could be added to flink-packages.org.

Best, Fabian

Am Fr., 22. Mai 2020 um 09:01 Uhr schrieb 杨东晓 :

> Thanks Robert! Could you point out the repository name ? I didn't find it
> from  flink-packages.org  Packages list.
>
> Robert Metzger  于2020年5月19日周二 下午5:02写道:
>
>> Hi,
>> I haven't seen this paper before. I'm not aware of any efforts in the
>> community to integrate this watermark generator.
>> I'm also not sure if we would integrate it directory into the main Flink
>> repository. What I could see is this generator being maintained in a public
>> GitHub repository, linked from flink-packages.org
>>
>> Best,
>> Robert
>>
>> On Sat, May 9, 2020 at 8:47 AM 杨东晓  wrote:
>>
>>> Hi , I noticed there is a paper describe about adaptive watermark
>>> generator on top of Apache Flink v1.6.2 :
>>> https://www.dfki.de/fileadmin/user_upload/import/10260_awad-adaptive-watermarks.pdf
>>>
>>> This looks like a more precise generator with much less data drop . Does
>>> anybody know more detail about this or does Flink community have any plan
>>> about this?
>>>
>>> Thanks!
>>>
>>


Re: Broadcast stream causing GC overhead limit exceeded

2020-05-07 Thread Fabian Hueske
Hi Eleanore,

Thanks for sharing your findings with us. :-)

Cheers, Fabian

Am Do., 7. Mai 2020 um 04:56 Uhr schrieb Eleanore Jin <
eleanore@gmail.com>:

> Hi Fabian,
>
> I just got confirmation from Apache Beam community, Beam will buffer the
> data until there is data from broadcast stream.
>
> Thanks!
> Eleanore
>
> On Tue, May 5, 2020 at 12:31 AM Fabian Hueske  wrote:
>
>> Hi Eleanore,
>>
>> The "GC overhead limit exceeded" error shows that the JVM spends way too
>> much time garbage collecting and only recovers little memory with every run.
>> Since, the program doesn't make any progress in such a situation it is
>> terminated with the GC Overhead Error. This typically happens when lots of
>> temporary objects are created.
>> The root cause could be Flink, Beam, or your own code.
>> It's important to understand that this error is not directly related to a
>> shortage of memory (although more memory can help to mitigate the issue a
>> bit) but rather indicates an implementation issue.
>>
>> Coming back to your question, Flink's Broadcast stream does *not* block
>> or collect events from the non-broadcasted side if the broadcast side
>> doesn't serve events.
>> However, the user-implemented operators (Beam or your code in this case)
>> often puts non-broadcasted events into state to wait for input from the
>> other side.
>> Since the error is not about lack of memory, the buffering in Flink state
>> might not be the problem here.
>>
>> Best, Fabian
>>
>>
>>
>>
>>
>> Am So., 3. Mai 2020 um 03:39 Uhr schrieb Eleanore Jin <
>> eleanore@gmail.com>:
>>
>>> Hi All,
>>>
>>> I am using apache Beam with Flink (1.8.2). In my job, I am using Beam
>>> sideinput (which translates into Flink NonKeyedBroadcastStream) to do
>>> filter of the data from main stream.
>>>
>>> I have experienced OOM: GC overhead limit exceeded continuously.
>>>
>>> After did some experiments, I observed following behaviour:
>>> 1. run job without side input(broadcast stream): no OOM issue
>>> 2. run job with side input (kafka topic with 1 partition) with data
>>> available from this side input: no OOM issue
>>> 3. run job with side input (kafka topic with 1 partition) without any
>>> data from the side input: *OOM issue*
>>> 4. From the heap dump, the message (of type ObjectNode) cannot be GC'd
>>> looks like due to the references hold by Broadcast stream
>>> [image: image.png]
>>>
>>> My question is: what is the behaviour from Broadcast stream if there is
>>> no data available? Does it cache the data from main stream and wait until
>>> data becoming available from Broadcast stream to process?
>>>
>>> Thanks a lot!
>>> Eleanore
>>>
>>


Re: multiple joins in one job

2020-05-06 Thread Fabian Hueske
You can in fact forward both time attributes because Flink makes sure that
the watermark is automatically adjusted to the "slower" of both input
streams.

You can run the following queries in the SQL CLI client (here taken an
example from a Flink SQL training [1]

Flink SQL> CREATE VIEW ridesWithFare AS
> SELECT
>   *
> FROM
>   Rides r,
>   Fares f
> WHERE
>   r.rideId = f.rideId AND
>   NOT r.isStart AND
>   f.payTime BETWEEN r.rideTime - INTERVAL '5' MINUTE AND r.rideTime;
[INFO] View has been created.

Flink SQL> DESCRIBE ridesWithFare;
root
 |-- rideId: BIGINT
 |-- taxiId: BIGINT
 |-- isStart: BOOLEAN
 |-- lon: FLOAT
 |-- lat: FLOAT
 |-- rideTime: TIMESTAMP(3) *ROWTIME*
 |-- psgCnt: INT
 |-- rideId0: BIGINT
 |-- payTime: TIMESTAMP(3) *ROWTIME*
 |-- payMethod: STRING
 |-- tip: FLOAT
 |-- toll: FLOAT
 |-- fare: FLOAT

As you see, both rideTime and payTime are of type TIMESTAMP(3) *ROWTIME*.
Hence, both can be used as time attributes later one. However, typically
you'll just select one of them, e.g., when defining a grouping window.

Cheers,
Fabian

[1]
https://github.com/ververica/sql-training/wiki/Joining-Dynamic-Tables#average-tip-per-hour-of-day

Am Mi., 6. Mai 2020 um 03:52 Uhr schrieb Benchao Li :

> Yes. The watermark will be propagated correctly, which is the min of two
> inputs.
>
> lec ssmi  于2020年5月6日周三 上午9:46写道:
>
>> Even if the time attribute field is retained, will the  related watermark
>> be retained?
>> If not, and there is no sql syntax to declare watermark again, it is
>> equivalent to not being able to do multiple joins in one job.
>>
>> Benchao Li  于2020年5月5日周二 下午9:23写道:
>>
>>> You cannot select more than one time attribute, the planner will give
>>> you an Exception if you did that.
>>>
>>>
>>> lec ssmi  于2020年5月5日周二 下午8:34写道:
>>>
>>>> As  you said, if   I  select  all  the  time  attribute  fields   from
>>>> both  ,  which  will be  the  final  one?
>>>>
>>>> Benchao Li  于 2020年5月5日周二 17:26写道:
>>>>
>>>>> Hi lec,
>>>>>
>>>>> You don't need to specify time attribute again like `TUMBLE_ROWTIME`,
>>>>> you just select  the time attribute field
>>>>> from one of the input, then it will be time attribute automatically.
>>>>>
>>>>> lec ssmi  于2020年5月5日周二 下午4:42写道:
>>>>>
>>>>>> But  I  have  not  found  there  is  any  syntax to  specify   time
>>>>>>  attribute  field  and  watermark  again  with  pure  sql.
>>>>>>
>>>>>> Fabian Hueske  于 2020年5月5日周二 15:47写道:
>>>>>>
>>>>>>> Sure, you can write a SQL query with multiple interval joins that
>>>>>>> preserve event-time attributes and watermarks.
>>>>>>> There's no need to feed data back to Kafka just to inject it again
>>>>>>> to assign new watermarks.
>>>>>>>
>>>>>>> Am Di., 5. Mai 2020 um 01:45 Uhr schrieb lec ssmi <
>>>>>>> shicheng31...@gmail.com>:
>>>>>>>
>>>>>>>> I mean using pure sql statement to make it . Can it be possible?
>>>>>>>>
>>>>>>>> Fabian Hueske  于2020年5月4日周一 下午4:04写道:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> If the interval join emits the time attributes of both its inputs,
>>>>>>>>> you can use either of them as a time attribute in a following operator
>>>>>>>>> because the join ensures that the watermark will be aligned with both 
>>>>>>>>> of
>>>>>>>>> them.
>>>>>>>>>
>>>>>>>>> Best, Fabian
>>>>>>>>>
>>>>>>>>> Am Mo., 4. Mai 2020 um 00:48 Uhr schrieb lec ssmi <
>>>>>>>>> shicheng31...@gmail.com>:
>>>>>>>>>
>>>>>>>>>> Thanks for your replay.
>>>>>>>>>> But as I known, if   the time attribute  will be retained and
>>>>>>>>>> the time attribute field  of both streams is selected in the result 
>>>>>>>>>> after
>>>>>>>>>> joining, who is the final time attribute variable?
>>>>>>>>>>
>>>>>>>>>> Benchao Li  于2020年4月30日周四 下午8:25写道:
>>>>>>>>>>
>>>&

Re: table.show() in Flink

2020-05-05 Thread Fabian Hueske
There's also the Table API approach if you want to avoid typing a "full"
SQL query:

Table t = tEnv.from("myTable");

Cheers,
Fabian

Am Di., 5. Mai 2020 um 16:34 Uhr schrieb Őrhidi Mátyás <
matyas.orh...@gmail.com>:

> Thanks guys for the prompt answers!
>
> On Tue, May 5, 2020 at 2:49 PM Kurt Young  wrote:
>
>> A more straightforward way after FLIP-84 would be:
>> TableResult result = tEnv.executeSql("select xxx ...");
>> result.print();
>>
>> And if you are using 1.10 now, you can
>> use TableUtils#collectToList(table) to collect the
>> result to a list, and then print rows by yourself.
>>
>> Best,
>> Kurt
>>
>>
>> On Tue, May 5, 2020 at 8:44 PM Jark Wu  wrote:
>>
>>> Hi Matyas,
>>>
>>> AFAIK, currently, this is the recommended way to print result of table.
>>> In FLIP-84 [1] , which is targeted to 1.11, we will introduce some new
>>> APIs to do the fluent printing like this.
>>>
>>> Table table2 = tEnv.sqlQuery("select yy ...");
>>> TableResult result2 = table2.execute();
>>> result2.print();
>>>
>>> cc @Godfrey, please correct if I misunderstand the above API.
>>>
>>> Best,
>>> Jark
>>>
>>> [1]:
>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=134745878
>>>
>>> On Tue, 5 May 2020 at 20:19, Őrhidi Mátyás 
>>> wrote:
>>>
 Dear Flink Community,

 I'm missing Spark's table.show() method in Flink. I'm using the
 following alternative at the moment:

 Table results = tableEnv.sqlQuery("SELECT * FROM my_table");
 tableEnv.toAppendStream(results, Row.class).print();

 Is it the recommended way to print the content of a table?


 Thanks,

 Matyas






Re: multiple joins in one job

2020-05-05 Thread Fabian Hueske
Sure, you can write a SQL query with multiple interval joins that preserve
event-time attributes and watermarks.
There's no need to feed data back to Kafka just to inject it again to
assign new watermarks.

Am Di., 5. Mai 2020 um 01:45 Uhr schrieb lec ssmi :

> I mean using pure sql statement to make it . Can it be possible?
>
> Fabian Hueske  于2020年5月4日周一 下午4:04写道:
>
>> Hi,
>>
>> If the interval join emits the time attributes of both its inputs, you
>> can use either of them as a time attribute in a following operator because
>> the join ensures that the watermark will be aligned with both of them.
>>
>> Best, Fabian
>>
>> Am Mo., 4. Mai 2020 um 00:48 Uhr schrieb lec ssmi <
>> shicheng31...@gmail.com>:
>>
>>> Thanks for your replay.
>>> But as I known, if   the time attribute  will be retained and  the time
>>> attribute field  of both streams is selected in the result after joining,
>>> who is the final time attribute variable?
>>>
>>> Benchao Li  于2020年4月30日周四 下午8:25写道:
>>>
>>>> Hi lec,
>>>>
>>>> AFAIK, time attribute will be preserved after time interval join.
>>>> Could you share your DDL and SQL queries with us?
>>>>
>>>> lec ssmi  于2020年4月30日周四 下午5:48写道:
>>>>
>>>>> Hi:
>>>>>I need to join multiple stream tables  using  time interval join.
>>>>> The problem is that the time attribute will disappear  after the jon , and
>>>>> pure  sql cannot declare the time attribute field again . So, to make is
>>>>> success,  I need to insert  the last result of join to kafka ,and consume
>>>>> it and join it with another stream table  in another flink job . This 
>>>>> seems
>>>>> troublesome.
>>>>> Any good idea?
>>>>>
>>>>>
>>>>>
>>>>
>>>> --
>>>>
>>>> Benchao Li
>>>> School of Electronics Engineering and Computer Science, Peking University
>>>> Tel:+86-15650713730
>>>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>>>
>>>>


Re: Broadcast stream causing GC overhead limit exceeded

2020-05-05 Thread Fabian Hueske
Hi Eleanore,

The "GC overhead limit exceeded" error shows that the JVM spends way too
much time garbage collecting and only recovers little memory with every run.
Since, the program doesn't make any progress in such a situation it is
terminated with the GC Overhead Error. This typically happens when lots of
temporary objects are created.
The root cause could be Flink, Beam, or your own code.
It's important to understand that this error is not directly related to a
shortage of memory (although more memory can help to mitigate the issue a
bit) but rather indicates an implementation issue.

Coming back to your question, Flink's Broadcast stream does *not* block or
collect events from the non-broadcasted side if the broadcast side doesn't
serve events.
However, the user-implemented operators (Beam or your code in this case)
often puts non-broadcasted events into state to wait for input from the
other side.
Since the error is not about lack of memory, the buffering in Flink state
might not be the problem here.

Best, Fabian





Am So., 3. Mai 2020 um 03:39 Uhr schrieb Eleanore Jin <
eleanore@gmail.com>:

> Hi All,
>
> I am using apache Beam with Flink (1.8.2). In my job, I am using Beam
> sideinput (which translates into Flink NonKeyedBroadcastStream) to do
> filter of the data from main stream.
>
> I have experienced OOM: GC overhead limit exceeded continuously.
>
> After did some experiments, I observed following behaviour:
> 1. run job without side input(broadcast stream): no OOM issue
> 2. run job with side input (kafka topic with 1 partition) with data
> available from this side input: no OOM issue
> 3. run job with side input (kafka topic with 1 partition) without any
> data from the side input: *OOM issue*
> 4. From the heap dump, the message (of type ObjectNode) cannot be GC'd
> looks like due to the references hold by Broadcast stream
> [image: image.png]
>
> My question is: what is the behaviour from Broadcast stream if there is no
> data available? Does it cache the data from main stream and wait until data
> becoming available from Broadcast stream to process?
>
> Thanks a lot!
> Eleanore
>


Re: multiple joins in one job

2020-05-04 Thread Fabian Hueske
Hi,

If the interval join emits the time attributes of both its inputs, you can
use either of them as a time attribute in a following operator because the
join ensures that the watermark will be aligned with both of them.

Best, Fabian

Am Mo., 4. Mai 2020 um 00:48 Uhr schrieb lec ssmi :

> Thanks for your replay.
> But as I known, if   the time attribute  will be retained and  the time
> attribute field  of both streams is selected in the result after joining,
> who is the final time attribute variable?
>
> Benchao Li  于2020年4月30日周四 下午8:25写道:
>
>> Hi lec,
>>
>> AFAIK, time attribute will be preserved after time interval join.
>> Could you share your DDL and SQL queries with us?
>>
>> lec ssmi  于2020年4月30日周四 下午5:48写道:
>>
>>> Hi:
>>>I need to join multiple stream tables  using  time interval join.
>>> The problem is that the time attribute will disappear  after the jon , and
>>> pure  sql cannot declare the time attribute field again . So, to make is
>>> success,  I need to insert  the last result of join to kafka ,and consume
>>> it and join it with another stream table  in another flink job . This seems
>>> troublesome.
>>> Any good idea?
>>>
>>>
>>>
>>
>> --
>>
>> Benchao Li
>> School of Electronics Engineering and Computer Science, Peking University
>> Tel:+86-15650713730
>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>
>>


Re: FlinkKafakaProducer with Confluent SchemaRegistry and KafkaSerializationSchema

2020-04-21 Thread Fabian Hueske
Thanks for sharing your solution Anil!

Cheers, Fabian

Am Di., 21. Apr. 2020 um 09:35 Uhr schrieb Anil K :

> Thanks Fabian,
>
> I ended up using something like below.
>
> public class GenericSerializer implements 
> KafkaSerializationSchema {
>
>   private final SerializationSchema valueSerializer;
>   private final String topic;
>
>   public GenericSerializer(String topic, Schema schemaValue, String 
> schemaRegistryUrl) {
> this.valueSerializer =
> ConfluentRegistryAvroSerializationSchema.forGeneric(topic, 
> schemaValue, schemaRegistryUrl);
> this.topic = topic;
>   }
>
>   @Override
>   public ProducerRecord serialize(GenericRecord element, Long 
> timestamp) {
> byte[] value = valueSerializer.serialize(element);
> return new ProducerRecord<>(topic, value);
>   }
> }
>
> Then used a new object of GenericSerializer in the FlinkKafkaProducer
>
> FlinkKafkaProducer producer =
> new FlinkKafkaProducer<>(topic, new GenericSerializer(topic, schema, 
> schemaRegistryUrl), kafkaConfig, Semantic.AT_LEAST_ONCE);
>
> Thanks , Anil.
>
>
> On Tue, Apr 21, 2020 at 3:34 AM Fabian Hueske  wrote:
>
>> Hi Anil,
>>
>> Here's a pointer to Flink's end-2-end test that's checking the
>> integration with schema registry [1].
>> It was recently updated so I hope it works the same way in Flink 1.9.
>>
>> Best,
>> Fabian
>>
>> [1]
>> https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java
>>
>> Am Sa., 18. Apr. 2020 um 19:17 Uhr schrieb Anil K <
>> sendto.ani...@gmail.com>:
>>
>>> Hi,
>>>
>>> What is the best way to use Confluent SchemaRegistry with
>>> FlinkKafkaProducer?
>>>
>>> What I have right now is as follows.
>>>
>>> SerializationSchema serializationSchema =
>>> ConfluentRegistryAvroSerializationSchema.forGeneric(topic, schema, 
>>> schemaRegistryUrl);
>>>
>>> FlinkKafkaProducer kafkaProducer =
>>> new FlinkKafkaProducer<>(topic, serializationSchema, kafkaConfig);
>>> outputStream.addSink(producer);
>>>
>>> FlinkKafkaProducer with is SerializationSchema now depricated.
>>>
>>> I am using flink 1.9.
>>>
>>> How to use FlinkKafkaProducer with KafkaSerializationSchema with 
>>> ConfluentSchemaRegsitry?
>>>
>>> Is there some reference/documentation i could use?
>>>
>>> Thanks , Anil.
>>>
>>>


Re: Problem getting watermark right with event time

2020-04-20 Thread Fabian Hueske
Hi Sudan,

I noticed a few issues with your code:

1) Please check the computation of timestamps. Your code

public long extractAscendingTimestamp(Eventi.Event element) {
  return element.getEventTime().getSeconds() * 1000;
}

only seems to look at the seconds of a timestamp. Typically, you would just
return the whole timestamp encoded as a long that represents the
milliseconds since epoch (1970-01-01 00:00:00.000).
Why do you multiple with 1000?

2) An AscendingTimestampExtractor assumes that records arrive with strictly
ascending timestamps.
If the timestamps in your data are slightly out of order, you probably want
another watermark assigner for example a
BoundedOutOfOrdernessTimestampExtractor [1].

3) You probably don't want to key on event time:

keyBy(Eventi.Event::getEventTime)

Usually, you choose a partitioning key here. If you cannot partition your
data and all records should be grouped in the single stream of windows you
should use DataStream.windowAll().
Note however, that this means that your code cannot run in parallel. See
[2] for details.

 Best, Fabian

[1]
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/BoundedOutOfOrdernessTimestampExtractor.java
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#keyed-vs-non-keyed-windows

Am So., 19. Apr. 2020 um 21:37 Uhr schrieb Sudan S :

> Hi,
>
> I am having a problem getting watermark right. The setup is
> - I have a Flink Job which reads from a Kafka topic, uses Protobuf
> Deserialization, uses Sliding Window of (120seconds, 30 seconds), sums up
> the value and finally returns the result.
>
> The code is pasted below.
>
> The problem here is, I'm not able to reach the sink. I am able to reach
> the assignTimestamp when the timestamp arrives, but past that, neither
> process function nor the sink function is getting invoked in spite of
> pumping events regularly. I'm not able to figure out how to debug this
> issue.
> Plz help.
>
> public class StreamingJob {
>
> public static void main(String[] args) throws Exception {
>
> Properties kafkaConsumerProps = new Properties();
> kafkaConsumerProps.setProperty("bootstrap.servers",
> "{bootstrap_servers}");
> kafkaConsumerProps.setProperty("group.id", "{group_id}");
>
>
> final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new
> Configuration());
> env.enableCheckpointing(100);
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> env.setMaxParallelism(5);
> env.setParallelism(5);
>
> SingleOutputStreamOperator texStream = env
> .addSource(new FlinkKafkaConsumer011<>("auth", new
> EventiSchema(), kafkaConsumerProps)).setParallelism(5).setMaxParallelism(5);
> SlidingEventTimeWindows window =
> SlidingEventTimeWindows.of(Time.seconds(120), Time.seconds(30));
> texStream.assignTimestampsAndWatermarks(new
> AscendingTimestampExtractor() {
> @Override
> public long extractAscendingTimestamp(Eventi.Event element) {
> return element.getEventTime().getSeconds() * 1000;
> }
> }).keyBy(Eventi.Event::getEventTime).window(window).process(new
> ProcessWindowFunction() {
> @Override
> public void process(Timestamp timestamp, Context context,
> Iterable elements, Collector out) throws Exception {
> int sum = 0;
> for (Eventi.Event element : elements) {
> sum++;
> }
> out.collect(sum);
> }
> }).print()
>
> env.execute();
> }
> }
>
> --
> *"The information contained in this e-mail and any accompanying documents
> may contain information that is confidential or otherwise protected from
> disclosure. If you are not the intended recipient of this message, or if
> this message has been addressed to you in error, please immediately alert
> the sender by replying to this e-mail and then delete this message,
> including any attachments. Any dissemination, distribution or other use of
> the contents of this message by anyone other than the intended recipient is
> strictly prohibited. All messages sent to and from this e-mail address may
> be monitored as permitted by applicable law and regulations to ensure
> compliance with our internal policies and to protect our business."*
> --
>


Re: FlinkKafakaProducer with Confluent SchemaRegistry and KafkaSerializationSchema

2020-04-20 Thread Fabian Hueske
Hi Anil,

Here's a pointer to Flink's end-2-end test that's checking the integration
with schema registry [1].
It was recently updated so I hope it works the same way in Flink 1.9.

Best,
Fabian

[1]
https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java

Am Sa., 18. Apr. 2020 um 19:17 Uhr schrieb Anil K :

> Hi,
>
> What is the best way to use Confluent SchemaRegistry with
> FlinkKafkaProducer?
>
> What I have right now is as follows.
>
> SerializationSchema serializationSchema =
> ConfluentRegistryAvroSerializationSchema.forGeneric(topic, schema, 
> schemaRegistryUrl);
>
> FlinkKafkaProducer kafkaProducer =
> new FlinkKafkaProducer<>(topic, serializationSchema, kafkaConfig);
> outputStream.addSink(producer);
>
> FlinkKafkaProducer with is SerializationSchema now depricated.
>
> I am using flink 1.9.
>
> How to use FlinkKafkaProducer with KafkaSerializationSchema with 
> ConfluentSchemaRegsitry?
>
> Is there some reference/documentation i could use?
>
> Thanks , Anil.
>
>


Re: how to hold a stream until another stream is drained?

2020-04-06 Thread Fabian Hueske
Hi,

With Flink streaming operators

However, these parts are currently being reworked to enable a better
integration of batch and streaming use cases (or hybrid use cases such as
yours).
A while back, we wrote a blog post about these plans [1]:

> *"Unified Stream Operators:* Blink extends the Flink streaming runtime
operator model to support selectively reading from different inputs, while
keeping the push model for very low latency. This control over the inputs
helps to now support algorithms like hybrid hash-joins on the same operator
and threading model as continuous symmetric joins through RocksDB. These
operators also form the basis for future features like “Side Inputs”

."

I'm not familiar with the internal details here, but I found the
InputSelectable [2] interface that looks like it would do what you are
looking for.
Note that this interface is not used on the higher-level DataStream API
level, but rather on the lower StreamOperator level.

Best, Fabian

[1]
https://flink.apache.org/news/2019/02/13/unified-batch-streaming-blink.html
[2]
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InputSelectable.java




W

Am Mo., 6. Apr. 2020 um 12:49 Uhr schrieb 刘宇宝 :

> I’m using JDBCInputFormat to read snapshot of a MySQL table  and
> FlinkKafkaConsumer to read binlog which is written to Kafka by Debezium.
>
>
>
> DataStream binlogStream = env.addSource(new FlinkKafkaConsumer(…));
>
> DataStream snapshotStream =
> env.createInput(JDBCInputFormat.buildJDBCInputFormat()….);
>
>
>
> // map() is to convert two streams into same type:  (action,  fields…),
> where action is “insert”, “update”, “delete”.  The action for
> “snapshotStream” is always “insert”.
>
> DataStream tableStream = binlogStream.map(…).union(snapshotStream.map(…));
>
>
>
> tableStream.print();
>
> env.execute(“example”);
>
>
>
>1. To make sure “tableStream” doesn’t miss any row,  the
>“binlogStream” must connect to  Kafka first so that binlog starts before
>the table snapshot,  I can roughly achieve this by
>“myKafkaConsumer.setStartFromTimestamp(System.currentTimeMillis() –
>600*1000)”.
>2. To make sure changes from “binlogStream” always overwrite upon
>“snapshotStream”,   I need a way to hold “binlogStream”  until
>“snapshotStream” is drained,  so that changes from “binlogStream” are all
>behind changes from “snapshotStream”.  How can I achieve this ?
>
>
>
> I’m considering a wrapper SourceFunction to combine FlinkKafkaConsumer and
> JDBCInputFormat,  but they are different on parallelism  and
> checkpointing,  I’m not sure how to get the wrapper right and even whether
> it’s right direction.
>
>
>
> Any suggestion will be very appreciated!
>
>
>


Re: Storing Operator state in RocksDb during runtime - plans

2020-04-06 Thread Fabian Hueske
Hi Kristoff,

I'm not aware of any concrete plans for such a feature.

Best,
Fabian

Am So., 5. Apr. 2020 um 22:33 Uhr schrieb KristoffSC <
krzysiek.chmielew...@gmail.com>:

> Hi,
> according to [1] operator state and broadcast state (which is a "special"
> type of operator state) are not stored in RocksDb during runtime when
> RocksDb is choosed as state backend.
>
> Are there any plans to change this?
>
> I'm working around a case where I will have a quite large control stream.
>
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Flink job getting killed

2020-04-06 Thread Fabian Hueske
Hi Giriraj,

This looks like the deserialization of a String failed.
Can you isolate the problem to a pair of sending and receiving tasks?

Best, Fabian

Am So., 5. Apr. 2020 um 20:18 Uhr schrieb Giriraj Chauhan <
graj.chau...@gmail.com>:

> Hi,
>
> We are submitting a flink(1.9.1) job for data processing. It runs fine and
> processes data for sometime i.e. ~30 mins and later it throws following
> exception and job gets killed.
>  2020-04-02 14:15:43,371 INFO  org.apache.flink.runtime.taskmanager.Task
>   - Sink: Unnamed (2/4) (45d01514f0fb99602883ca43e997e8f3)
> switched from RUNNING to FAILED.
> java.io.EOFException
> at
> org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:321)
> at
> org.apache.flink.types.StringValue.readString(StringValue.java:769)
> at
> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:75)
> at
> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:33)
> at
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:205)
> at
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
> at
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> at
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:141)
> at
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:91)
> at
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.base/java.lang.Thread.run(Unknown Source)
>
>
> Once the above exception occur, we do see following runtime exception
>
> java.lang.RuntimeException: Buffer pool is destroyed.
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> at
> com.dell.emc.mars.topology.bl.DiffGenerator.handleCreate(DiffGenerator.java:519)
> at
> com.dell.emc.mars.topology.bl.DiffGenerator.populateHTable(DiffGenerator.java:294)
> at
> com.dell.emc.mars.topology.bl.DiffGenerator.compare(DiffGenerator.java:58)
> at
> com.dell.emc.mars.topology.bl.CompareWithDatabase.flatMap(CompareWithDatabase.java:146)
> at
> com.dell.emc.mars.topology.bl.CompareWithDatabase.flatMap(CompareWithDatabase.java:22)
> at
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> at
> com.dell.emc.mars.topology.bl.CompareWithModel.flatMap(CompareWithModel.java:110)
> at
> com.dell.emc.mars.topology.bl.CompareWithModel.flatMap(CompareWithModel.java:24)
> at
> 

Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer

2020-02-21 Thread Fabian Hueske
Congrats Jingsong!

Cheers, Fabian

Am Fr., 21. Feb. 2020 um 17:49 Uhr schrieb Rong Rong :

> Congratulations Jingsong!!
>
> Cheers,
> Rong
>
> On Fri, Feb 21, 2020 at 8:45 AM Bowen Li  wrote:
>
> > Congrats, Jingsong!
> >
> > On Fri, Feb 21, 2020 at 7:28 AM Till Rohrmann 
> > wrote:
> >
> >> Congratulations Jingsong!
> >>
> >> Cheers,
> >> Till
> >>
> >> On Fri, Feb 21, 2020 at 4:03 PM Yun Gao  wrote:
> >>
> >>>   Congratulations Jingsong!
> >>>
> >>>Best,
> >>>Yun
> >>>
> >>> --
> >>> From:Jingsong Li 
> >>> Send Time:2020 Feb. 21 (Fri.) 21:42
> >>> To:Hequn Cheng 
> >>> Cc:Yang Wang ; Zhijiang <
> >>> wangzhijiang...@aliyun.com>; Zhenghua Gao ; godfrey
> >>> he ; dev ; user <
> >>> user@flink.apache.org>
> >>> Subject:Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer
> >>>
> >>> Thanks everyone~
> >>>
> >>> It's my pleasure to be part of the community. I hope I can make a
> better
> >>> contribution in future.
> >>>
> >>> Best,
> >>> Jingsong Lee
> >>>
> >>> On Fri, Feb 21, 2020 at 2:48 PM Hequn Cheng  wrote:
> >>> Congratulations Jingsong! Well deserved.
> >>>
> >>> Best,
> >>> Hequn
> >>>
> >>> On Fri, Feb 21, 2020 at 2:42 PM Yang Wang 
> wrote:
> >>> Congratulations!Jingsong. Well deserved.
> >>>
> >>>
> >>> Best,
> >>> Yang
> >>>
> >>> Zhijiang  于2020年2月21日周五 下午1:18写道:
> >>> Congrats Jingsong! Welcome on board!
> >>>
> >>> Best,
> >>> Zhijiang
> >>>
> >>> --
> >>> From:Zhenghua Gao 
> >>> Send Time:2020 Feb. 21 (Fri.) 12:49
> >>> To:godfrey he 
> >>> Cc:dev ; user 
> >>> Subject:Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer
> >>>
> >>> Congrats Jingsong!
> >>>
> >>>
> >>> *Best Regards,*
> >>> *Zhenghua Gao*
> >>>
> >>>
> >>> On Fri, Feb 21, 2020 at 11:59 AM godfrey he 
> wrote:
> >>> Congrats Jingsong! Well deserved.
> >>>
> >>> Best,
> >>> godfrey
> >>>
> >>> Jeff Zhang  于2020年2月21日周五 上午11:49写道:
> >>> Congratulations!Jingsong. You deserve it
> >>>
> >>> wenlong.lwl  于2020年2月21日周五 上午11:43写道:
> >>> Congrats Jingsong!
> >>>
> >>> On Fri, 21 Feb 2020 at 11:41, Dian Fu  wrote:
> >>>
> >>> > Congrats Jingsong!
> >>> >
> >>> > > 在 2020年2月21日,上午11:39,Jark Wu  写道:
> >>> > >
> >>> > > Congratulations Jingsong! Well deserved.
> >>> > >
> >>> > > Best,
> >>> > > Jark
> >>> > >
> >>> > > On Fri, 21 Feb 2020 at 11:32, zoudan  wrote:
> >>> > >
> >>> > >> Congratulations! Jingsong
> >>> > >>
> >>> > >>
> >>> > >> Best,
> >>> > >> Dan Zou
> >>> > >>
> >>> >
> >>> >
> >>>
> >>>
> >>> --
> >>> Best Regards
> >>>
> >>> Jeff Zhang
> >>>
> >>>
> >>>
> >>> --
> >>> Best, Jingsong Lee
> >>>
> >>>
> >>>
>


Re: [ANNOUNCE] Flink Forward San Francisco 2020 Program is Live

2020-02-14 Thread Fabian Hueske
Hi everyone,

Sorry for writing another email but I forgot to mention the community
discounts.
When you register for the conference [1], you can use one of the following
discount codes:

* As a member of the Flink community we offer a 50% discount on your
conference pass if you register with the code: FFSF20-MailingList
* If you are an Apache committer (for any project), we offer a *free*
conference pass if you register with your Apache email address and the
discount code: FFSF20-ApacheCommitter

Have a nice weekend,
Fabian

[1] https://events.evolutionaryevents.com/flink-forward-sf-2020

Am Fr., 14. Feb. 2020 um 17:48 Uhr schrieb Fabian Hueske :

> Hi everyone,
>
> We announced the program of Flink Forward San Francisco 2020.
> The conference takes place at the Hyatt Regency in San Francisco from
> March 23rd to 25th.
>
> On the first day we offer four training sessions [1]:
> * Apache Flink Developer Training
> * Apache Flink Runtime & Operations Training
> * Apache Flink Tuning & Troubleshooting Training
> * Apache Flink SQL Developer Training
>
> On day two and three we have a great lineup of talks [2], including
> speakers from AWS, Bird, Cloudera, Lyft, Netflix, Splunk, Uber, Yelp,
> Alibaba, Ververica and others.
>
> Flink Forward is an excellent opportunity to learn about Flink use cases,
> recent Flink features, and best practices of running Flink applications in
> production.
> It's also a great place to connect and mingle with the Flink community.
>
> You can register for the event at
> -> https://events.evolutionaryevents.com/flink-forward-sf-2020
>
> Hope to see you there,
> Fabian
>
> [1] https://www.flink-forward.org/sf-2020/training-program
> [2] https://www.flink-forward.org/sf-2020/conference-program
>


[ANNOUNCE] Flink Forward San Francisco 2020 Program is Live

2020-02-14 Thread Fabian Hueske
Hi everyone,

We announced the program of Flink Forward San Francisco 2020.
The conference takes place at the Hyatt Regency in San Francisco from March
23rd to 25th.

On the first day we offer four training sessions [1]:
* Apache Flink Developer Training
* Apache Flink Runtime & Operations Training
* Apache Flink Tuning & Troubleshooting Training
* Apache Flink SQL Developer Training

On day two and three we have a great lineup of talks [2], including
speakers from AWS, Bird, Cloudera, Lyft, Netflix, Splunk, Uber, Yelp,
Alibaba, Ververica and others.

Flink Forward is an excellent opportunity to learn about Flink use cases,
recent Flink features, and best practices of running Flink applications in
production.
It's also a great place to connect and mingle with the Flink community.

You can register for the event at
-> https://events.evolutionaryevents.com/flink-forward-sf-2020

Hope to see you there,
Fabian

[1] https://www.flink-forward.org/sf-2020/training-program
[2] https://www.flink-forward.org/sf-2020/conference-program


Re: [ANNOUNCE] Apache Flink 1.10.0 released

2020-02-12 Thread Fabian Hueske
Congrats team and a big thank you to the release managers!

Am Mi., 12. Feb. 2020 um 16:33 Uhr schrieb Timo Walther :

> Congratualations everyone! Great stuff :-)
>
> Regards,
> Timo
>
>
> On 12.02.20 16:05, Leonard Xu wrote:
> > Great news!
> > Thanks everyone involved !
> > Thanks Gary and Yu for being the release manager !
> >
> > Best,
> > Leonard Xu
> >
> >> 在 2020年2月12日,23:02,Stephan Ewen  写道:
> >>
> >> Congrats to us all.
> >>
> >> A big piece of work, nicely done.
> >>
> >> Let's hope that this helps our users make their existing use cases
> easier and also opens up new use cases.
> >>
> >> On Wed, Feb 12, 2020 at 3:31 PM 张光辉  beggingh...@gmail.com>> wrote:
> >> Greet work.
> >>
> >> Congxian Qiu mailto:qcx978132...@gmail.com>>
> 于2020年2月12日周三 下午10:11写道:
> >> Great work.
> >> Thanks everyone involved.
> >> Thanks Gary and Yu for being the release manager
> >>
> >>
> >> Best,
> >> Congxian
> >>
> >>
> >> Jark Wu mailto:imj...@gmail.com>> 于2020年2月12日周三
> 下午9:46写道:
> >> Congratulations to everyone involved!
> >> Great thanks to Yu & Gary for being the release manager!
> >>
> >> Best,
> >> Jark
> >>
> >> On Wed, 12 Feb 2020 at 21:42, Zhu Zhu  reed...@gmail.com>> wrote:
> >> Cheers!
> >> Thanks Gary and Yu for the great job as release managers.
> >> And thanks to everyone whose contribution makes the release possible!
> >>
> >> Thanks,
> >> Zhu Zhu
> >>
> >> Wyatt Chun mailto:wyattc...@gmail.com>>
> 于2020年2月12日周三 下午9:36写道:
> >> Sounds great. Congrats & Thanks!
> >>
> >> On Wed, Feb 12, 2020 at 9:31 PM Yu Li  car...@gmail.com>> wrote:
> >> The Apache Flink community is very happy to announce the release of
> Apache Flink 1.10.0, which is the latest major release.
> >>
> >> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
> >>
> >> The release is available for download at:
> >> https://flink.apache.org/downloads.html <
> https://flink.apache.org/downloads.html>
> >>
> >> Please check out the release blog post for an overview of the
> improvements for this new major release:
> >> https://flink.apache.org/news/2020/02/11/release-1.10.0.html <
> https://flink.apache.org/news/2020/02/11/release-1.10.0.html>
> >>
> >> The full release notes are available in Jira:
> >>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12345845
> <
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12345845
> >
> >>
> >> We would like to thank all contributors of the Apache Flink community
> who made this release possible!
> >>
> >> Cheers,
> >> Gary & Yu
> >
> >
>
>


Re: Flink solution for having shared variable between task managers

2020-02-03 Thread Fabian Hueske
Hi,

I think you are looking for BroadcastState [1].

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html

Am Fr., 17. Jan. 2020 um 14:50 Uhr schrieb Soheil Pourbafrani <
soheil.i...@gmail.com>:

> Hi,
>
> According to the processing logic, I need to have a HashMap variable that
> should be shared between the taskmanagers. The scenario is the HashMap data
> will be continuously updated according to the incoming stream of data.
>
> What I observed is declaring the HashMap variable as a class attribute, it
> will be shared among a single taskmanagers slots, but in case I have
> multiple taskmanager, each will have a separate HashMap instance.
>
> What is the standard way to achieve this? Does Flink provide any utility
> for that?
>


[ANNOUNCE] Community Discounts for Flink Forward SF 2020 Registrations

2020-01-30 Thread Fabian Hueske
Hi everyone,

The registration for Flink Forward SF 2020 is open now!

Flink Forward San Francisco 2020 will take place from March 23rd to 25th.
The conference will start with one day of training and continue with two
days of keynotes and talks.
We would like to invite you to join the Apache Flink community to connect
with other Flink enthusiasts and learn about Flink use cases, operational
best practices, and technical deep dives.

Please register at
--> https://events.evolutionaryevents.com/flink-forward-sf-2020
if you'd like to attend Flink Forward SF.

IMPORTANT:
* As a member of the Flink community we offer you a 50% discount on your
conference pass if you register with the code: FFSF20-MailingList
* If you are an Apache committer (for any project), we offer a *free*
conference pass if you register with your Apache email address and the
discount code: FFSF20-ApacheCommitter

Best,
Fabian


Re: PostgreSQL JDBC connection drops after inserting some records

2020-01-28 Thread Fabian Hueske
Hi,

The exception is thrown by Postgres.
I'd start investigating there what the problem is.

Maybe you need to tweak your Postgres configuration, but it might also be
that the Flink connector needs to be differently configured.
If the necessary config option is missing, it would be good to add.

However, at this point it's not clear why Postgres fails.
I'd recommend to check the Postgres exception and figure out why it is
failing.

Cheers,
Fabian


Am Di., 28. Jan. 2020 um 09:02 Uhr schrieb Arvid Heise :

> Hi Soheil,
>
> what is your actual question? Did the application eventually finish or
> does it keep restarting?
>
> In general, communication with external systems may fail from time to
> time. Only if it persists, we would explore it. If it is very rare, a
> restart should already help.
>
> Best,
>
> Arvid
>
> On Thu, Jan 23, 2020 at 5:35 PM Soheil Pourbafrani 
> wrote:
>
>> Hi,
>> I have a peace of Flink Streaming code that reads data from files and
>> inserts them into the PostgreSQL table. After inserting 6 to 11 million
>> records, I got the following errors:
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> *Caused by: java.lang.RuntimeException: Execution of JDBC statement
>> failed. at
>> org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.flush(JDBCOutputFormat.java:219)
>> at
>> org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.writeRecord(JDBCOutputFormat.java:210)
>> at
>> org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.writeRecord(JDBCOutputFormat.java:41)
>> at
>> org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.invoke(OutputFormatSinkFunction.java:86)
>> at
>> org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
>> at
>> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>> ... 15 moreCaused by: java.sql.BatchUpdateException: Batch entry 0 INSERT
>> INTO csv_data(asset, tag, t, q, v, backfill, createdAt, createdBy) VALUES
>> ('SST', 'XC_XC', '2015-04-11 21:36:23+03', 12.0, '1.00', 'FALSE',
>> '2020-01-23 19:22:14.469+03', 'system') ON CONFLICT DO NOTHING was aborted:
>> An I/O error occurred while sending to the backend.  Call getNextException
>> to see other errors in the batch. at
>> org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:148)
>> at
>> org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:515)
>> at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:853) at
>> org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1546)
>> at
>> org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.flush(JDBCOutputFormat.java:216)
>> ... 21 moreCaused by: org.postgresql.util.PSQLException: An I/O error
>> occurred while sending to the backend. at
>> org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:516)
>> ... 24 moreCaused by: java.io.EOFException at
>> org.postgresql.core.PGStream.receiveChar(PGStream.java:337) at
>> org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2000)*
>> at
>> org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:510)
>> ... 24 more
>>
>> However as I enabled the Restart Strategy, the app will automatically be
>> restarted and reconnect to the database.
>> My code simply reads data from files and after transforming them into the
>> table schema, insert the rows into the table.
>>
>> It would be great if anyone can help me with this
>> Thanks
>>
>


Re: How to declare the Row object schema

2020-01-17 Thread Fabian Hueske
Hi,

Which version are you using?
I can't find the error message in the current code base.

When writing data to a JDBC database, all Flink types must be correctly
matched to a JDBC type.
The problem is probably that Flink cannot match the 8th field of your Row
to a JDBC type.
What's the type of the 8th field?

Best, Fabian

Am Do., 16. Jan. 2020 um 20:51 Uhr schrieb Soheil Pourbafrani <
soheil.i...@gmail.com>:

> Hi,
>
> Inserting a DataSet of the type Row using the Flink *JDBCOutputFormat *I
> continuously go the warning:
> [DataSink (org.apache.flink.api.java.io.jdbc.JDBCOutputFormat@18be83e4)
> (1/4)] WARN org.apache.flink.api.java.io.jdbc.JDBCOutputFormat - Unknown
> column type for column 8. Best effort approach to set its value: system.
>
> I tried to declare the Row schema by implementing the
> *ResultTypeQueryable* class alongside with the FlatMap function I create
> the final Row type for insertion but made no changes!
>
> What is this warning for?
>


Re: Filter with large key set

2020-01-17 Thread Fabian Hueske
Hi Eleanore,

A dynamic filter like the one you need, is essentially a join operation.
There is two ways to do this:

* partitioning the key set and the message on the attribute. This would be
done with a KeyedCoProcessFunction.
* broadcasting the key set and just locally forwarding the messages. This
would be done with a KeyedBroadcastProcessFunction.

The challenge in your application is that the key set entries have
different types which is something that Flink does not very well support.
There is two ways to go about this:

1) route all data through the same operators that can handle all types. You
can model this with an n-ary Either type. Flink only has a binary Either
type, so you would need to implement the TypeInformation, serializer, and
comparator yourself. The Either classes should give you good guidance for
that.
2) have different operators and flows for each basic data type. This will
fan out your job, but should be the easier approach.

Best, Fabian



Am Do., 16. Jan. 2020 um 07:48 Uhr schrieb Jin Yi :

> Hi there,
>
> I have the following usecase:
> a key set say [A,B,C,] with around 10M entries, the type of the
> entries can be one of the types in BasicTypeInfo, e.g. String, Long,
> Integer etc...
>
> and each message looks like below:
> message: {
>header: A
>body: {}
> }
>
> I would like to use Flink to filter each message' header field, to see if
> the value present in the key set.
>
> *The key set needs to be dynamic, meaning at any time, we can perform
> add/read/delete operations on the key set. *
>
> Any suggestions are very welcome!
>
> Thanks a lot!
> Eleanore
>


Re: Filter with large key set

2020-01-17 Thread Fabian Hueske
Hi Eleanore,

A dynamic filter like the one you need, is essentially a join operation.
There is two ways to do this:

* partitioning the key set and the message on the attribute. This would be
done with a KeyedCoProcessFunction.
* broadcasting the key set and just locally forwarding the messages. This
would be done with a KeyedBroadcastProcessFunction.

The challenge in your application is that the key set entries have
different types which is something that Flink does not very well support.
There is two ways to go about this:

1) route all data through the same operators that can handle all types. You
can model this with an n-ary Either type. Flink only has a binary Either
type, so you would need to implement the TypeInformation, serializer, and
comparator yourself. The Either classes should give you good guidance for
that.
2) have different operators and flows for each basic data type. This will
fan out your job, but should be the easier approach.

Best, Fabian



Am Do., 16. Jan. 2020 um 07:48 Uhr schrieb Jin Yi :

> Hi there,
>
> I have the following usecase:
> a key set say [A,B,C,] with around 10M entries, the type of the
> entries can be one of the types in BasicTypeInfo, e.g. String, Long,
> Integer etc...
>
> and each message looks like below:
> message: {
>header: A
>body: {}
> }
>
> I would like to use Flink to filter each message' header field, to see if
> the value present in the key set.
>
> *The key set needs to be dynamic, meaning at any time, we can perform
> add/read/delete operations on the key set. *
>
> Any suggestions are very welcome!
>
> Thanks a lot!
> Eleanore
>


Re: Why would indefinitely growing state an issue for Flink while doing stream to stream joins?

2020-01-17 Thread Fabian Hueske
Hi,

Large state is mainly an issue for Flink's fault tolerance mechanism which
is based on periodic checkpoints, which means that the state is copied to a
remote storage system in regular intervals.
In case of a failure, the state copy needs to be loaded which takes more
time with growing state size.
There are a few features of Flink that reduce the cost of large state, like
incremental checkpoints and local recovery.
However, in general is large state more difficult to handle than small
state.

If your application needs to persists state forever to run a join with
correct semantics, than this can be fine.
However, you should roughly assess how fast your state will be growing and
prepare your application to be able to scale to more machines (configure
max-parallelism) when the limits of your current setup are reached.

Best, Fabian


Am Do., 16. Jan. 2020 um 16:07 Uhr schrieb kant kodali :

> Hi All,
>
> The doc
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/joins.html#regular-joins
>  says
> the following.
>
> "However, this operation has an important implication: it requires to
> keep both sides of the join input in Flink’s state forever. Thus, the
> resource usage will grow indefinitely as well, if one or both input tables
> are continuously growing"
>
> I wonder why this would be an issue especially when the state is stored in
> RocksDB which in turn is backed by disk?
>
> I have a use case where I might need to do stream-stream join or some
> emulation of that across say 6 or more tables and I don't know for sure how
> long I need to keep the state because a row today can join with a row a
> year or two years from now. will that be an issue? do I need to think about
> designing a solution in another way without using stream-stream join?
>
> Thanks!
>
>
>
>
>
>
>
>


[ANNOUNCE] Flink Forward San Francisco 2020 Call for Presentation extended!

2020-01-13 Thread Fabian Hueske
Hi everyone,

We know some of you only came back from holidays last week.
To give you more time to submit a talk, we decided to extend the Call for
Presentations for Flink Forward San Francisco 2020 until Sunday January
19th.

The conference takes place on March 23-25 with two days of talks and one
day of training.
If you are working on an interesting Flink project or use case that you'd
like to share with many enthusiastic Flink users and committer, you should
submit a talk proposal.

We are looking for talks on the following topics:
* Use Cases
* Operations
* Technology Deep Dive
* Ecosystem
* Community

You can find more detailed track descriptions and the form to submit a
proposal on the Flink Forward website at

--> https://www.flink-forward.org/sf-2020/call-for-presentations

As usual, accepted speakers get a free conference pass (incl. training
sessions).

Best regards,
Fabian
(PC Chair for Flink Forward SF 2020)


[ANNOUNCE] Flink Forward SF Call for Presentation closing soon!

2020-01-06 Thread Fabian Hueske
Hi all,

First of all, Happy New Year to everyone!

Many of you probably didn't spent the holidays thinking a lot about Flink.
Now, however, is the right time to focus again and decide which talk(s) to
submit for Flink Forward San Francisco because the Call for Presentations
is closing this Sunday, January 12th.

Flink Forward SF will talk place on March 23-25, 2020.
For the first time, the conference will feature two days of talks (one more
than before), so we are offering twice as many speaking slots as last year.

If you'd like to share your Flink experience with an international audience
of enthusiastic Flink users and committers, you should definitely submit
talk proposal.

We are looking for talks on the following topics:
* Use Cases
* Operations
* Technology Deep Dive
* Ecosystem
* Community

You can find more detailed track descriptions and the form to submit a
proposal at

--> https://www.flink-forward.org/sf-2020/call-for-presentations

Best regards,
Fabian
(PC Chair for Flink Forward SF 2020)


Re: [ANNOUNCE] Zhu Zhu becomes a Flink committer

2019-12-13 Thread Fabian Hueske
Congrats Zhu Zhu and welcome on board!

Best, Fabian

Am Fr., 13. Dez. 2019 um 17:51 Uhr schrieb Till Rohrmann <
trohrm...@apache.org>:

> Hi everyone,
>
> I'm very happy to announce that Zhu Zhu accepted the offer of the Flink PMC
> to become a committer of the Flink project.
>
> Zhu Zhu has been an active community member for more than a year now. Zhu
> Zhu played an essential role in the scheduler refactoring, helped
> implementing fine grained recovery, drives FLIP-53 and fixed various bugs
> in the scheduler and runtime. Zhu Zhu also helped the community by
> reporting issues, answering user mails and being active on the dev mailing
> list.
>
> Congratulations Zhu Zhu!
>
> Best, Till
> (on behalf of the Flink PMC)
>


Re: Joining multiple temporal tables

2019-12-06 Thread Fabian Hueske
Thank you!
Please let me know if the workaround works for you.

Best, Fabian

Am Fr., 6. Dez. 2019 um 16:11 Uhr schrieb Chris Miller :

> Hi Fabian,
>
> Thanks for confirming the issue and suggesting a workaround - I'll give
> that a try. I've created a JIRA issue as you suggested,
> https://issues.apache.org/jira/browse/FLINK-15112
>
> Many thanks,
> Chris
>
>
> -- Original Message --
> From: "Fabian Hueske" 
> To: "Chris Miller" 
> Cc: "user@flink.apache.org" 
> Sent: 06/12/2019 14:52:16
> Subject: Re: Joining multiple temporal tables
>
> Hi Chris,
>
> Your query looks OK to me.
> Moreover, you should get a SQLParseException (or something similar) if it
> wouldn't be valid SQL.
>
> Hence, I assume you are running in a bug in one of the optimizer rules.
> I tried to reproduce the problem on the SQL training environment and
> couldn't write a query that joins two temporal tables.
> What worked though was to first create a view of a query that joins the
> stream with one temporal table and then join the view with the second one.
> Maybe that workaround also works for you?
>
> It would be great if you could open a Jira issue for this bug including
> your program to reproduce the bug.
>
> Thank you,
> Fabian
>
> Am Do., 5. Dez. 2019 um 16:47 Uhr schrieb Chris Miller  >:
>
>> I want to decorate/enrich a stream by joining it with "lookups" to the
>> most recent data available in other streams. For example, suppose I have a
>> stream of product orders. For each order, I want to add price and FX rate
>> information based on the order's product ID and order currency.
>>
>> Is it possible to join a table with two other temporal tables to achieve
>> this? I'm trying but getting a NullPointerException deep inside Flink's
>> Calcite code. I've attached some sample code that demonstrates the problem.
>> Is my SQL incorrect/invalid (in which case Flink ideally should detect the
>> problem and provide a better error message), or is the SQL correct and this
>> a bug/limitation in Flink? If it's the latter, how do I achieve a similar
>> result?
>>
>> The SQL I'm trying to run:
>>
>> SELECT o.id AS orderId, o.productId, o.currency, o.quantity, f.rate, p.price
>>   FROM Orders AS o,
>>   LATERAL TABLE (FxRateLookup(o.rowtime)) AS f,
>>   LATERAL TABLE (PriceLookup(o.rowtime)) AS p
>>   WHERE o.currency = f.currency
>>   AND o.productId = p.productId
>>
>> The exception I get:
>>
>> Exception in thread "main" java.lang.NullPointerException
>> at
>> org.apache.flink.table.calcite.FlinkRelBuilder$.of(FlinkRelBuilder.scala:129)
>> at
>> org.apache.flink.table.plan.rules.logical.LogicalCorrelateToTemporalTableJoinRule.onMatch(LogicalCorrelateToTemporalTableJoinRule.scala:91)
>> at
>> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
>> at
>> org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560)
>> at
>> org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419)
>> at
>> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:284)
>> at
>> org.apache.calcite.plan.hep.HepInstruction$RuleCollection.execute(HepInstruction.java:74)
>> at
>> org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215)
>> at
>> org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202)
>> at
>> org.apache.flink.table.plan.Optimizer.runHepPlanner(Optimizer.scala:228)
>> at
>> org.apache.flink.table.plan.Optimizer.runHepPlannerSimultaneously(Optimizer.scala:212)
>> at
>> org.apache.flink.table.plan.Optimizer.optimizeExpandPlan(Optimizer.scala:138)
>> at
>> org.apache.flink.table.plan.StreamOptimizer.optimize(StreamOptimizer.scala:61)
>> at
>> org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:410)
>> at
>> org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:187)
>> at
>> org.apache.flink.table.planner.StreamPlanner.$anonfun$translate$1(StreamPlanner.scala:127)
>> at
>> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>> at scala.collection.Iterator.foreach(Iterator.scala:937)
>> at scala.collection.Iterator.foreach$(Iterator.scala:937)
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>> at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>> at scala.collection.IterableLi

Re: Joining multiple temporal tables

2019-12-06 Thread Fabian Hueske
Hi Chris,

Your query looks OK to me.
Moreover, you should get a SQLParseException (or something similar) if it
wouldn't be valid SQL.

Hence, I assume you are running in a bug in one of the optimizer rules.
I tried to reproduce the problem on the SQL training environment and
couldn't write a query that joins two temporal tables.
What worked though was to first create a view of a query that joins the
stream with one temporal table and then join the view with the second one.
Maybe that workaround also works for you?

It would be great if you could open a Jira issue for this bug including
your program to reproduce the bug.

Thank you,
Fabian

Am Do., 5. Dez. 2019 um 16:47 Uhr schrieb Chris Miller :

> I want to decorate/enrich a stream by joining it with "lookups" to the
> most recent data available in other streams. For example, suppose I have a
> stream of product orders. For each order, I want to add price and FX rate
> information based on the order's product ID and order currency.
>
> Is it possible to join a table with two other temporal tables to achieve
> this? I'm trying but getting a NullPointerException deep inside Flink's
> Calcite code. I've attached some sample code that demonstrates the problem.
> Is my SQL incorrect/invalid (in which case Flink ideally should detect the
> problem and provide a better error message), or is the SQL correct and this
> a bug/limitation in Flink? If it's the latter, how do I achieve a similar
> result?
>
> The SQL I'm trying to run:
>
> SELECT o.id AS orderId, o.productId, o.currency, o.quantity, f.rate, p.price
>   FROM Orders AS o,
>   LATERAL TABLE (FxRateLookup(o.rowtime)) AS f,
>   LATERAL TABLE (PriceLookup(o.rowtime)) AS p
>   WHERE o.currency = f.currency
>   AND o.productId = p.productId
>
> The exception I get:
>
> Exception in thread "main" java.lang.NullPointerException
> at
> org.apache.flink.table.calcite.FlinkRelBuilder$.of(FlinkRelBuilder.scala:129)
> at
> org.apache.flink.table.plan.rules.logical.LogicalCorrelateToTemporalTableJoinRule.onMatch(LogicalCorrelateToTemporalTableJoinRule.scala:91)
> at
> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
> at
> org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560)
> at
> org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419)
> at
> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:284)
> at
> org.apache.calcite.plan.hep.HepInstruction$RuleCollection.execute(HepInstruction.java:74)
> at
> org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215)
> at
> org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202)
> at
> org.apache.flink.table.plan.Optimizer.runHepPlanner(Optimizer.scala:228)
> at
> org.apache.flink.table.plan.Optimizer.runHepPlannerSimultaneously(Optimizer.scala:212)
> at
> org.apache.flink.table.plan.Optimizer.optimizeExpandPlan(Optimizer.scala:138)
> at
> org.apache.flink.table.plan.StreamOptimizer.optimize(StreamOptimizer.scala:61)
> at
> org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:410)
> at
> org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:187)
> at
> org.apache.flink.table.planner.StreamPlanner.$anonfun$translate$1(StreamPlanner.scala:127)
> at
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
> at scala.collection.Iterator.foreach(Iterator.scala:937)
> at scala.collection.Iterator.foreach$(Iterator.scala:937)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
> at scala.collection.IterableLike.foreach(IterableLike.scala:70)
> at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableLike.map(TraversableLike.scala:233)
> at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at
> org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:127)
> at
> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:319)
> at
> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:227)
> at
> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:218)
> at test.PojoTest.run(PojoTest.java:96)
> at test.PojoTest.main(PojoTest.java:23)
>


Re: Row arity of from does not match serializers.

2019-12-06 Thread Fabian Hueske
Hi,

The inline lambda MapFunction produces a Row with 12 String fields (12
calls to String.join()).
You use RowTypeInfo rowTypeDNS to declare the return type of the lambda
MapFunction. However, rowTypeDNS is defined with much more String fields.

The exception tells you that the number of fields returned by the function
is not equal to the number of fields that were declared by rowTypeDNS.

Hope this helps,
Fabian

Am Do., 5. Dez. 2019 um 20:35 Uhr schrieb srikanth flink <
flink.d...@gmail.com>:

> My Flink job does reading from Kafka stream and does some processing.
>
> Code snippet:
>
>> DataStream flatternedDnsStream = filteredNonNullStream.rebalance()
>> .map(node -> {
>> JsonNode source = node.path("source");
>> JsonNode destination = node.path("destination");
>> JsonNode dns = node.path("dns");
>> JsonNode event = node.path("event");
>> JsonNode client = node.path("client");
>> JsonNode organization = node.path("organization");
>> JsonNode timestamp_received = node.path("timestamp_received");
>> JsonNode transaction = node.path("transaction");
>> JsonNode timestamp = node.path("@timestamp");
>> JsonNode message = node.path("message");
>> JsonNode network = node.path("network");
>> JsonNode ecs = node.path("ecs");
>>
>> return Row.of(String.join(",", getParsed(ecs)), String.join(",",
>> getParsed(source)),
>> String.join(",", getParsed(destination)), String.join(",",
>> getParsed(event)),
>> String.join(",", getParsed(organization)), String.join(",",
>> getParsed(timestamp_received)),
>> String.join(",", getParsed(client)), String.join(",",
>> getParsed(transaction)),
>> String.join(",", getParsed(timestamp)), String.join(",",
>> getParsed(message)),
>> String.join(",", getParsed(dns)), String.join(",", getParsed(network)));
>> }).returns(rowTypeDNS);
>>
>> public static final RowTypeInfo rowTypeDNS = new
>> RowTypeInfo(Types.STRING(), Types.STRING(), Types.STRING(),
>> Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),
>> Types.STRING(), Types.STRING(),
>> Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),
>> Types.STRING(), Types.STRING(),
>> Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),
>> Types.STRING(), Types.STRING(),
>> Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),
>> Types.STRING(), Types.STRING(),
>> Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),
>> Types.STRING(), Types.STRING(),
>> Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),
>> Types.STRING(), Types.STRING(),
>> Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),
>> Types.STRING(), Types.STRING(),
>> Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),
>> Types.STRING(), Types.STRING(),
>> Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),
>> Types.STRING(), Types.STRING(),
>> Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),
>> Types.STRING(), Types.STRING(),
>> Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),
>> Types.STRING(), Types.STRING(),
>> Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING());
>>
>
>> private static List getParsed(JsonNode node) {
>>
>> List list = new ArrayList<>();
>> Iterator> it = node.fields();
>> iterateAndExtract(it, list);
>> return list;
>> }
>>
>> private static void iterateAndExtract(Iterator>
>> it, List list) {
>>
>> while (it.hasNext()) {
>> Entry e = it.next();
>> if (!e.getValue().isContainerNode()) {
>> list.add(e.getValue().asText());
>> continue;
>> }
>>
>> iterateAndExtract(e.getValue().fields(), list);
>> }
>> }
>>
>
> failing with the following error:
> java.lang.RuntimeException: Row arity of from does not match serializers.
> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(
> RowSerializer.java:86)
> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(
> RowSerializer.java:44)
> at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)
> at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
> at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
> at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:707)
> at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:660)
> at org.apache.flink.streaming.api.operators.
> AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:
> 727)
> at org.apache.flink.streaming.api.operators.
> AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:
> 705)
> at org.apache.flink.streaming.api.operators.StreamMap.processElement(
> StreamMap.java:41)
> at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
> 

Flink Forward North America 2020 - Call for Presentations open until January 12th, 2020

2019-11-20 Thread Fabian Hueske
Hi all,

Flink Forward North America returns to San Francisco on March 23-25, 2020.
For the first time in North America, the conference will feature two days
of talks and one day of training.

We are happy to announce that the Call for Presentations is open!

If you'd like to give a talk and share your Flink experience with an
international audience of highly skilled and enthusiastic Flink users and
committers, we invite you to submit a talk proposal.

We are looking for talks on the following topics:
* Use Cases
* Operations
* Technology Deep Dive
* Ecosystem
* Community

You can find more detailed track descriptions and the form to submit a
proposal on the Flink Forward website at

--> https://www.flink-forward.org/sf-2020/call-for-presentations

The deadline for submissions is January 12th, 2020.

Best regards,
Fabian
(PC Chair for Flink Forward NA 2020)


Re: Issue with writeAsText() to S3 bucket

2019-11-06 Thread Fabian Hueske
Hi Micheal,

I'm not super familiar with S3 but from my understanding, files might not
be visible to other services (such as a directory browser) immediately
after they've been created.
Did you wait for some time after you cancelled the job before checking for
the files?

Best, Fabian

Am Mo., 28. Okt. 2019 um 08:03 Uhr schrieb Nguyen, Michael <
michael.nguye...@t-mobile.com>:

> Hi Fabian,
>
>
>
> Thank you for the response. So I am currently using .writeAsText() to
> print out 9 different datastreams in one Flink job as I am printing my
> original datastream with various filters applied to it. I usually see
> around 6-7 of my datastreams successfully list the JSON file in my S3
> bucket upon cancelling my Flink job.
>
>
>
> Even in my situation, would this still be an issue with S3’s file listing
> command?
>
>
>
> Thanks,
>
> Michael
>
>
>
> *From: *Fabian Hueske 
> *Date: *Friday, October 25, 2019 at 6:04 AM
> *To: *Michael Nguyen 
> *Cc: *"user@flink.apache.org" 
> *Subject: *Re: Issue with writeAsText() to S3 bucket
>
>
>
> *[External]*
>
>
>
> Hi Michael,
>
>
>
> One reason might be that S3's file listing command is only eventually
> consistent.
>
> It might take some time until the file appears and is listed.
>
>
>
> Best, Fabian
>
>
>
> Am Mi., 23. Okt. 2019 um 22:41 Uhr schrieb Nguyen, Michael <
> michael.nguye...@t-mobile.com>:
>
> Hello all,
>
>
>
> I am running into issues at the moment trying to print my DataStreams to
> an S3 bucket using writeAsText(“s3://bucket/result.json”) in my Flink job.
> I used print() on the same DataStream and I see the output I am looking for
> in standard output. I first confirm that my datastream has data by looking
> at the standard output, then I cancel my Flink job. After cancelling the
> job, result.json only gets created in my S3 bucket some of the time. It
> does not always gets created, but I confirmed that I see my data in
> standard output.
>
>
>
> I understand writeAsText() should be used for debugging purposes only
> according to Flink’s documentation, but I’m just curious as to why I can’t
> get writeAsText() to always work every time I cancel my job.
>
>
>
> Thank you for your help,
>
> Michael
>
>


Re: Are Dynamic tables backed by rocksdb?

2019-10-31 Thread Fabian Hueske
Hi,

Dynamic tables might not be persisted at all but only when it is necessary
for the computation of a query.
For example a simple "SELECT * FROM t WHERE a = 1" query on an append only
table t does not require to persist t.

However, there are a bunch of operations that require to store some parts
of a dynamic table but not necessarily the full table.
All operators that need to store data put it into regular Flink state. This
means that all state is stored in the configured state backend.
You cannot directly query the data from there, but could use the state
processor API. However, this also assumes that you know the internal data
representation that the operators use.

Best, Fabian


Am Do., 31. Okt. 2019 um 09:51 Uhr schrieb kant kodali :

> Hi All,
>
> Are Dynamic tables backed by Rocksdb or in memory? if they are backed by
> RocksDB can I use SQL to query the state?
>
> Thanks!
>


Re: Guarantee of event-time order in FlinkKafkaConsumer

2019-10-25 Thread Fabian Hueske
Hi Wojciech,

I posted an answer on StackOverflow.

Best, Fabian

Am Do., 24. Okt. 2019 um 13:03 Uhr schrieb Wojciech Indyk <
wojciechin...@gmail.com>:

> Hi!
> I use Flink 1.8.0 with Kafka 2.2.1. I need to guarantee of correct order
> of events by event timestamp. I generate periodic watermarks every 1s. I
> use FlinkKafkaConsumer with AscendingTimestampExtractor.
> The code (and the same question) is here:
> https://stackoverflow.com/questions/58539379/guarantee-of-event-time-order-in-flinkkafkaconsumer
>
> I realized, that for unordered events, that came in the same ms or a few
> ms later, the order is not corrected by Flink. What I found in the docs:
> "the watermark triggers computation of all windows where the maximum
> timestamp (which is end-timestamp - 1) is smaller than the new watermark",
> so I added a step of timeWindowAll with size of 100ms and inside that
> window I sort messages by the event timestamp. It works, but I find this
> solution ugly and it looks like a workaround. I am also concerned about
> per-partition watermarks of KafkaSource.
>
> Ideally I would like to put the guarantee of order in the KafkaSource and
> keep it for each kafka partition, like per-partition watermarks. Is it
> possible to do so? What is the current best solution for guarantee the
> event-time order of events in Flink?
>
> --
> Kind regards/ Pozdrawiam,
> Wojciech Indyk
>


Re: Flink 1.5+ performance in a Java standalone environment

2019-10-25 Thread Fabian Hueske
Hi Jakub,

I had a look at the changes of Flink 1.5 [1] and didn't find anything
obvious.
Something that might cause a different behavior is the new deployment and
process model (FLIP-6).

In Flink 1.5, there is a switch to disable it and use the previous
deployment mechanism.
You could try to disable the new new model [2] and see if this cause the
performance issue.

Note that the legacy mode was removed in one of the later versions.

Best, Fabian

[1] https://flink.apache.org/news/2018/05/25/release-1.5.0.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.5/release-notes/flink-1.5.html#update-configuration-for-reworked-job-deployment

Am Do., 24. Okt. 2019 um 19:37 Uhr schrieb Jakub Danilewicz <
jdanilew...@alto-analytics.com>:

> Hi,
>
> I have recently tried to upgrade Flink from 1.2.0 to the newest version
> and noticed that starting from the version 1.5 the performance is much
> worse when processing fixed graphs in a standalone JVM environment (Java
> 8).
>
> This affects all the use-cases when a Gelly graph (pre-built from a fixed
> collection of nodes/edges) gets processed by any of our custom algorithms
> (VertexCentric, ScatterGather or GSA), especially when using parallel
> processing for a local ExecutionEnvironment. The processing times
> (compared to the versions <= 1.4.2) double/triple, while CPU and memory
> consumption increase significantly.
>
> Are there any fine-tuning steps/tricks for the job processing engine
> behind Flink 1.5+ that would improve the performance in the scenarios
> described above?
>
> Best,
>
> Jakub
>


Re: Using STSAssumeRoleSessionCredentialsProvider for cross account access

2019-10-25 Thread Fabian Hueske
Hi Vinay,

Maybe Gordon (in CC) has an idea about this issue.

Best, Fabian

Am Do., 24. Okt. 2019 um 14:50 Uhr schrieb Vinay Patil <
vinay18.pa...@gmail.com>:

> Hi,
>
> Can someone pls help here , facing issues in Prod . I see the following
> ticket in unresolved state.
>
> https://issues.apache.org/jira/browse/FLINK-8417
>
>
> Regards,
> Vinay Patil
>
>
> On Thu, Oct 24, 2019 at 11:01 AM Vinay Patil 
> wrote:
>
>> Hi,
>>
>> I am trying to access dynamo streams from a different aws account but
>> getting resource not found exception while trying to access the dynamo
>> streams from Task Manager. I have provided the following configurations :
>>
>> *dynamodbStreamsConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ROLE_CREDENTIALS_PROVIDER,AWSConfigConstants.CredentialProvider.ASSUME_ROLE.name
>> ());*
>>
>>
>> *dynamodbStreamsConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ROLE_ARN,dynamoDbConnect.getRoleArn());*
>>
>>
>> *dynamodbStreamsConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ROLE_SESSION_NAME,dynamoDbConnect.getRoleSessionName());*
>>
>> In the main class I am able to get the arn of dynamoDb table
>> using STSAssumeRoleSessionCredentialsProvider, so the assume role is
>> working fine . Getting error only while accessing from TM.
>>
>> I assume that the credentials are not required to be passed :
>> https://github.com/apache/flink/blob/abbd6b02d743486f3c0c1336139dd6b3edd20840/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java#L164
>>
>>
>> Regards,
>> Vinay Patil
>>
>


Re: Can a Flink query outputs nested json?

2019-10-25 Thread Fabian Hueske
Hi,

I did not understand what you are trying to achieve.
Which field of the input table do you want to write to the output table?

Flink SQL> insert into nestedSink select nested from nestedJsonStream;
[INFO] Submitting SQL update statement to the cluster...
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Field types of query result
and registered TableSink [nestedSink] do not match.
Query result schema: [nested: Row]
TableSink schema:[nested: Row]

This does not work, because the nested schema of the query result and sink
are not identical.
The table sink has only one nested String field called inner. The query
result looks like this: ROW<`inner` STRING, `nested1` ROW<`inner1` STRING>>

You need to make sure that the schema of query result and sink table are
exactly the same. Otherwise, Flink will throw those ValidationExceptions.

Best, Fabian

Am Do., 24. Okt. 2019 um 12:24 Uhr schrieb srikanth flink <
flink.d...@gmail.com>:

> I'm working on Flink SQL client. Input data is json format and contains
> nested json.
>
> I'm trying to query the nested json from the table and expecting the
> output to be nested json instead of string.
>
> I've build the environment file to define a table schema as:
>
>> format:
>>
>   type: json
>>   fail-on-missing-field: false
>>   json-schema: >
>> {
>>   type: 'object',
>>   properties: {
>> 'lon': {
>>   type: 'string'
>> },
>> 'rideTime': {
>>   type: 'string'
>> },
>> 'nested': {
>>   type: 'object',
>>   properties: {
>> 'inner': {
>>   type: 'string'
>> },
>> 'nested1': {
>>   type: 'object',
>>   properties: {
>> 'inner1': {
>>   type: 'string'
>> }
>>   }
>> }
>>   }
>> },
>> 'name': {
>>   type: 'string'
>> }
>>   }
>> }
>>   derive-schema: false
>> schema:
>>   - name: 'lon'
>> type: VARCHAR
>>   - name: 'rideTime'
>> type: VARCHAR
>>   - name: 'nested'
>> type: ROW<`inner` STRING, `nested1` ROW<`inner1` STRING>>
>>   - name: 'name'
>> type: VARCHAR
>
>
> Sink table schema:
>
>> format:
>>   type: json
>>   fail-on-missing-field: false
>>   derive-schema: true
>> schema:
>>   - name: 'nested'
>> type: ROW<`inner` STRING>
>>
>
> Queries Been trying the following queries
> Flink SQL> insert into nestedSink select nested.`inner` as `nested.inner`
> from nestedJsonStream;
> [INFO] Submitting SQL update statement to the cluster...
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.ValidationException: Field types of query
> result and registered TableSink [nestedSink] do not match.
> Query result schema: [nested.inner: String]
> TableSink schema:[nested: Row]
>
> Flink SQL> insert into nestedSink select nested from nestedJsonStream;
> [INFO] Submitting SQL update statement to the cluster...
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.ValidationException: Field types of query
> result and registered TableSink [nestedSink] do not match.
> Query result schema: [nested: Row]
> TableSink schema:[nested: Row]
>
> Flink SQL> insert into nestedSink select nested.`inner` as nested.`inner`
> from nestedJsonStream;
> [INFO] Submitting SQL update statement to the cluster...
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.sql.parser.impl.ParseException: Encountered "." at line
> 1, column 55.
> Was expecting one of:
> 
> "EXCEPT" ...
> "FETCH" ...
> "FROM" ...
> "INTERSECT" ...
> "LIMIT" ...
> "OFFSET" ...
> "ORDER" ...
> "MINUS" ...
> "UNION" ...
> "," ...
>
> Help me understand the problem with my schema/query?
> Also would like to add new columns and nested colums.
>
> Thanks
> Srikanth
>
>
>
>


Re: JDBCInputFormat does not support json type

2019-10-25 Thread Fabian Hueske
Hi Fanbin,

One approach would be to ingest the field as a VARCHAR / String and
implement a Scalar UDF to convert it into a nested tuple.
The UDF could use the code of the flink-json module.

AFAIK, there is some work on the way to add built-in JSON functions.

Best, Fabian

Am Do., 24. Okt. 2019 um 10:03 Uhr schrieb Fanbin Bu :

> Looks like SnowflakeColumnMetadata treats VARIANT as VARCHAR
>
> case VARIANT:
>   colType = Types.VARCHAR;
>   extColTypeName = "VARIANT";
>   break;
>
> and SnowflakeResultSet just return the string of the field
>
> switch(type)
> {
>   case Types.VARCHAR:
>   case Types.CHAR:
> return getString(columnIndex);
>
> What would be the best way to handle this on Flink side?
>
>
>
> On Thu, Oct 24, 2019 at 12:36 AM Fanbin Bu  wrote:
>
>> Hi there,
>>
>> Flink Version: 1.8.1
>> JDBC driver: net.snowflake.client.jdbc.SnowflakeDriver
>>
>> Here is the code snippet:
>>
>> val rowTypeInfo = new RowTypeInfo(
>>   Array[TypeInformation[_]](
>> new RowTypeInfo(
>>   Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO,
>> BasicTypeInfo.STRING_TYPE_INFO),
>>   Array[String]("a", "b")
>> )
>>   ),
>>   Array[String]("fieldName")
>> )
>> val inputFormat = buildInputFormat(query, rowTypeInfo)
>> env.createInput(inputFormat)
>>
>> my snowflake table data looks like this (fieldName has type VARIANT)
>>
>> fieldName
>> --
>> {
>> "a": "1",
>> "b": "2"
>> }
>>
>> I got err msg:
>> Caused by: java.lang.ClassCastException: java.lang.String cannot be cast
>> to org.apache.flink.types.Row
>>
>>
>> Looks like the record I got from snowflake is a string. The error
>> prevents me from doing something like
>> sqlQuery("select fieldName.a from table")
>>
>> Any help is appreciated!
>>
>> Thanks,
>> Fanbin
>>
>


Re: Flink 1.9 measuring time taken by each operator in DataStream API

2019-10-25 Thread Fabian Hueske
Hi Komal,

Measuring latency is always a challenge. The problem here is that your
functions are chained, meaning that the result of a function is directly
passed on to the next function and only when the last function emits the
result, the first function is called with a new record.
This makes measuring the processing time of a single function difficult,
because measuring the processing time of a single record is often not
possible because the time for processing a single record is usually too
small to be measured.

Regarding the second point:
Flink does not synchronize the collection of metrics because this would add
too much overhead. In the given screenshot, some records might be still on
the wire or waiting in buffers to be processed.

Best, Fabian

Am Do., 24. Okt. 2019 um 09:16 Uhr schrieb Komal Mariam <
komal.mar...@gmail.com>:

> Hello,
>
> I have a few questions regarding flink’s dashboard and monitoring tools.
>
> I have a fixed number of records that I process through the datastreaming
> API on my standalone cluster and want to know how long it takes to process
> them. My questions are:
>
> 1)How can I see the time taken in milliseconds individually for each
> operator (filter, filter, map and keyed aggregate) to process these records
> during run time? I essentially want to know which operator causes the most
> latency in the pipeline.
>
> 2)While viewing the records and metrics on the dashboard there is a
> discrepancy between the number of records sent and received between two
> operators in my job graph. Why exactly are the number of records received
> by my second operator less than those sent by customsource->map and where
> are they stored? Image attached below for reference.
> [image: image.png]
> Best regards,
> Komal
>
>
>


Re: Issue with writeAsText() to S3 bucket

2019-10-25 Thread Fabian Hueske
Hi Michael,

One reason might be that S3's file listing command is only eventually
consistent.
It might take some time until the file appears and is listed.

Best, Fabian

Am Mi., 23. Okt. 2019 um 22:41 Uhr schrieb Nguyen, Michael <
michael.nguye...@t-mobile.com>:

> Hello all,
>
>
>
> I am running into issues at the moment trying to print my DataStreams to
> an S3 bucket using writeAsText(“s3://bucket/result.json”) in my Flink job.
> I used print() on the same DataStream and I see the output I am looking for
> in standard output. I first confirm that my datastream has data by looking
> at the standard output, then I cancel my Flink job. After cancelling the
> job, result.json only gets created in my S3 bucket some of the time. It
> does not always gets created, but I confirmed that I see my data in
> standard output.
>
>
>
> I understand writeAsText() should be used for debugging purposes only
> according to Flink’s documentation, but I’m just curious as to why I can’t
> get writeAsText() to always work every time I cancel my job.
>
>
>
> Thank you for your help,
>
> Michael
>


Re: [Problem] Unable to do join on TumblingEventTimeWindows using SQL

2019-10-25 Thread Fabian Hueske
Hi,

the exception says: "Rowtime attributes must not be in the input rows of a
regular join. As a workaround you can cast the time attributes of input
tables to TIMESTAMP before.".

The problem is that your query first joins the two tables without a
temporal condition and then wants to do a windowed grouping.
Joins without temporal condition are not able to preserve the rowtime
attribute.
You should try to change the join into a time-windowed join [1] [2] by
adding a BETWEEN predicate on the rowtime attributes of both tables.

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/joins.html#time-windowed-joins
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#joins

Am Mi., 23. Okt. 2019 um 09:18 Uhr schrieb Manoj Kumar :

>
> *Hi All,*
>
> *[Explanation]*
>
> Two tables say lineitem and orders:
>
> Table
> orderstbl=bsTableEnv.fromDataStream(orders,"a,b,c,d,e,f,g,h,i,orders.rowtime");
> Table
> lineitemtbl=bsTableEnv.fromDataStream(lineitem,"a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,lineitem.rowtime");
>
> bsTableEnv.registerTable("Orders",orderstbl);
> bsTableEnv.registerTable("Lineitem",lineitemtbl)
>
> *#Rgular tumble window works*
>  Table sqlResult = bsTableEnv.sqlQuery("Select count(Orders.a) FROM Orders
> GROUP BY TUMBLE(orders, INTERVAL '5' SECOND)");
>  Table sqlResult = bsTableEnv.sqlQuery("Select count(Lineitem.a) FROM
> Lineitem GROUP BY TUMBLE(lineitem, INTERVAL '5' SECOND)");
>
> *#Datastream TumblingEventTimeWindows joins also works fine*
>
> lineitem.join(orders).where(...).equalTo(...).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(...)
>
> *But when I try to join them over same window it gives me error, it might
> possible I am writing wrong SQL :(*
>
> Table sqlResult  = bsTableEnv.sqlQuery("SELECTcount(Lineitem.a) FROM "
> + "Orders,Lineitem where Lineitem.a=Orders.a "
> + "GROUP BY TUMBLE(orders, INTERVAL '5' SECOND)");
>
> Exception in thread "main" org.apache.flink.table.api.TableException:
> Cannot generate a valid execution plan for the given query:
>
> FlinkLogicalSink(name=[sink], fields=[b])
> +- FlinkLogicalWindowAggregate(group=[{}], EXPR$0=[COUNT($1)],
> window=[TumblingGroupWindow], properties=[])
>+- FlinkLogicalCalc(select=[orders, a0])
>   +- FlinkLogicalJoin(condition=[=($2, $0)], joinType=[inner])
>  :- FlinkLogicalCalc(select=[a, orders])
>  :  +-
> FlinkLogicalDataStreamTableScan(table=[[Unregistered_DataStream_3]])
>  +- FlinkLogicalCalc(select=[a])
> +-
> FlinkLogicalDataStreamTableScan(table=[[Unregistered_DataStream_6]])
>
> Rowtime attributes must not be in the input rows of a regular join. As a
> workaround you can cast the time attributes of input tables to TIMESTAMP
> before.
> Please check the documentation for the set of currently supported SQL
> features.
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
> at
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:166)
> at
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:88)
> at
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:78)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:212)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:147)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:327)
> at
> org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:411)
> at bzflink.StreamingTable.main(StreamingTable.java:65)
> Caused by: 

Re: Increasing number of task slots in the task manager

2019-10-02 Thread Fabian Hueske
Hi Vishwas,

First of all, 8 GB for 60 cores is not a lot.
You might not be able to utilize all cores when running Flink.

However, the memory usage depends on several things.
Assuming your are using Flink for stream processing, the type of the state
backend is important. If you use the FSStateBackend, all state is stored on
the JVM heap which is certainly not want you want.
However, also the RocksDBStateBackend which stores data on the hard disk
has memory requirements. IIRC a few hundred MBs per keyed state and task.
So 60 tasks with keyed state might not possible due to the memory
requirements of RocksDB.

In addition to the state backend, you need to configure a sufficient number
of network buffers.
These also depend on the number of slots that the TaskManager process
offers.

Besides the state backend and network buffers, there are some fixed memory
costs per TaskManager process. A slot itself (without any user code
running) does not take much memory.
Note that Flink does not enforce any memory budgets for slots. Every slot
can take as much JVM memory as it wants. However, if there's not enough
memory, the whole TaskManager crashes with an OutOfMemoryError.

This is not a definitive answer to your question, but hopefully helps you
to configure the cluster for your use case.

Best,
Fabian

Am Di., 1. Okt. 2019 um 20:59 Uhr schrieb Vishwas Siravara <
vsirav...@gmail.com>:

> Hi guys,
> I get java heap space error when I have 1 GB of TM memory and 4 slots(we
> have 4 cores in our lower environment) per TM , each slot has 1/4GB of
> managed memory.
> From the flink doc
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html#task-slots-and-resources
>  I
> see that slots are allocated memory statically . When I change the TM
> memory to 8GB , my job works fine without any heap issues with 4 slots. So
> here each slot gets around 2GB of heap . In another environment we have 60
> cores. Does it make sense for me to have 60 slots in the task manager for
> 8GB of TM heap ? I assume that I will get heap space error since each slot
> will have 8/60 GB of memory . Is my assumption correct ?
>
> Thanks,
> Vishwas
>


Re: Fencing token exceptions from Job Manager High Availability mode

2019-10-02 Thread Fabian Hueske
Hi Bruce,

I haven't seen such an exception yet, but maybe Till (in CC) can help.

Best,
Fabian

Am Di., 1. Okt. 2019 um 05:51 Uhr schrieb Hanson, Bruce <
bruce.han...@here.com>:

> Hi all,
>
>
>
> We are running some of our Flink jobs with Job Manager High Availability.
> Occasionally we get a cluster that comes up improperly and doesn’t respond.
> Attempts to submit the job seem to hang and when we hit the /overview REST
> endpoint in the Job Manager we get a 500 error and a fencing token
> exception like this:
>
>
>
> *2019-09-21 05:04:07.785 [flink-akka.actor.default-dispatcher-4428]
> level=ERROR o.a.f.runtime.rest.handler.cluster.ClusterOverviewHandler  -
> Implementation error: Unhandled exception.*
>
> *org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing
> token not set: Ignoring message LocalFencedMessage(null,
> LocalRpcInvocation(requestResourceOverview(Time))) sent to
> akka.tcp://fl...@job-ef80a156-3350-4e85-8761-b0e42edc346f-jm-0.job-ef80a156-3350-4e85-8761-b0e42edc346f-jm-svc.olp-here-test-j-ef80a156-3350-4e85-8761-b0e42edc346f.svc.cluster.local:6126/user/resourcemanager
> because the fencing token is null.*
>
> *at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:59)*
>
> *at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)*
>
> *at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)*
>
> *at
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)*
>
> *at akka.actor.Actor$class.aroundReceive(Actor.scala:502)*
>
> *at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)*
>
> *at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)*
>
> *at akka.actor.ActorCell.invoke(ActorCell.scala:495)*
>
> *at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)*
>
> *at akka.dispatch.Mailbox.run(Mailbox.scala:224)*
>
> *at akka.dispatch.Mailbox.exec(Mailbox.scala:234)*
>
> *at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)*
>
> *at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)*
>
> *at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)*
>
> *at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)*
>
>
>
>
>
> We are running Flink 1.7.1 in Kubernetes and run each job in its own
> namespace with a three-node Zookeeper cluster and two Job Managers, plus
> one or more Task Managers. I have been able to replicate the issue, but
> don’t find any difference in the logs between a failing cluster and a good
> one.
>
>
>
> Does anyone here have any ideas as to what’s happening, or what I should
> be looking for?
>
>
>
> -Bruce
>
>
>
>
>
>
>
>
>
> [image: cid:image001.png@01D2B473.0F7F85E0]
>
>
>
> *Bruce Hanson*
>
> *Principal Engineer*
>
> *M: +1 425 681 0422*
>
>
>
> HERE Technologies
>
> 701 Pike Street, Suite 2000
>
> Seattle, WA 98101 USA
>
> *47° 36' 41" N 122° 19' 57" W*
>
>
>
> [image: cid:image002.png@01D2B473.0F7F85E0] [image:
> cid:image003.png@01D2B473.0F7F85E0]    [image:
> cid:image004.png@01D2B473.0F7F85E0] [image:
> cid:image005.png@01D2B473.0F7F85E0]
> [image:
> cid:image006.png@01D2B473.0F7F85E0] 
>
>
>
>
>


Re: Increasing trend for state size of keyed stream using ProcessWindowFunction with ProcessingTimeSessionWindows

2019-10-02 Thread Fabian Hueske
Hi Oliwer,

I think you are right. There seems to be something going wrong.
Just to clarify, you are sure that the growing state size is caused by the
window operator?

>From your description I assume that the state size does not depend (solely)
on the number of distinct keys.
Otherwise, the state size would stop growing at some point.
This would be a hint that every window leaves some state behind.

AFAIK, processing time session windows are not very common. There might be
a bug in the implementation.

Could you create a Jira with a description of the problem?
It would be great, if you could provide a reproducible example with a data
generator source.

Thank you,
Fabian

Am Di., 1. Okt. 2019 um 11:18 Uhr schrieb Oliwer Kostera <
o.kost...@adbglobal.com>:

> Hi,
>
> I'm no sure what you mean by windowState.clear(). As far as I understand
> you correctly it's a windowState from ProcessWindowFunction Context which
> is KeyedStateStore. KeyedStateStore is managing registered keyed states
> that I don't have, so without a descriptor I can't access any clear()
> method. There is no state that I manage explicitly as you can see here:
> https://github.com/loliver1234/flink-process-window-function/blob/master/src/main/java/personal/kostera/functions/CustomProcessWindowFunction.java
>
> With best regards
>
> Oliwer
> On 01.10.2019 07:48, Congxian Qiu wrote:
>
> Hi Oliwer,
>
> From the description, Seems the state didn't be cleared, maybe you could
> check how many {{windowState.clear()}} was triggered in
> {{WindowOperator#processElement}}, and try to figure it out why the state
> did not be cleared.
>
> Best,
> Congxian
>
>
> Oliwer Kostera  于2019年9月27日周五 下午4:14写道:
>
>> Hi all,
>>
>>
>> I'm using *ProcessWindowFunction* in a keyed stream with the following
>> definition:
>>
>> final SingleOutputStreamOperator processWindowFunctionStream =
>>  
>> keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.milliseconds(100)))
>> .process(new CustomProcessWindowFunction())
>> .uid(PROCESS_WINDOW_FUNCTION_OPERATOR_ID)
>> .name("Process window function");
>>
>> My checkpointing configuration is set to use RocksDB state backend with
>> incremental checkpointing and EXACTLY_ONCE mode.
>>
>> In a runtime I noticed that even though data ingestion is static - same
>> keys and frequency of messages the size of the process window operator
>> keeps increasing. I tried to reproduce it with minimal similar setup here:
>>  https://github.com/loliver1234/flink-process-window-function and was
>> successful to do so.
>>
>> Testing conditions:
>>
>>- RabbitMQ source with Exactly-once guarantee and 65k prefetch count
>>- RabbitMQ sink to collect messages
>>- Simple ProcessWindowFunction that only pass messages through
>>- Stream time characteristic set to TimeCharacteristic.ProcessingTime
>>
>> Testing scenario:
>>
>>- Start flink job and check initial state size - State Size: 127 KB
>>- Start sending messages, 1000 same unique keys every 1s (they are
>>not falling into defined time window gap set to 100ms, each message should
>>create new window)
>>- State of the process window operator keeps increasing - after 1mln
>>messages state ended up to be around 2mb
>>- Stop sending messages and wait till rabbit queue is fully consumed
>>and few checkpoints go by
>>- Was expected to see state size to decrease to base value but it
>>stayed at 2mb
>>- Continue to send messages with the same keys and state kept
>>increasing trend.
>>
>> What I checked:
>>
>>- Registration and deregistration of timers set for time windows -
>>each registration matched its deregistration
>>- Checked that in fact there are no window merges
>>- Tried custom Trigger disabling window merges and setting
>>onProcessingTime trigger to TriggerResult.FIRE_AND_PURGE - same state
>>behavior
>>
>> Tested with:
>>
>>- Local Flink Mini Cluster running from IDE
>>- Flink ha standalone cluster  run in docker
>>
>> On staging environment, we noticed that state for that operator keeps
>> increasing indefinitely, after some months reaching even 1,5gb for 100k
>> unique keys
>>
>> With best regards
>>
>> Oliwer
>> adbglobal.com 
>>
>> *This message (including any attachments) may contain confidential,
>> proprietary, privileged and/or private information. The information is
>> intended for the use of the individual or entity designated above. If you
>> are not the intended recipient of this message, please notify the sender
>> immediately, and delete the message and any attachments. Any disclosure,
>> reproduction, distribution or other use of this message or any attachments
>> by an individual or entity other than the intended recipient is STRICTLY
>> PROHIBITED.*
>>
>> *Please note that ADB protects your privacy. Any personal information we
>> collect from you is used in accordance with our Privacy Policy

Re: Broadcast state

2019-10-02 Thread Fabian Hueske
Hi,

State is always associated with a single task in Flink.
The state of a task cannot be accessed by other tasks of the same operator
or tasks of other operators.
This is true for every type of state, including broadcast state.

Best, Fabian


Am Di., 1. Okt. 2019 um 08:22 Uhr schrieb Navneeth Krishnan <
reachnavnee...@gmail.com>:

> Hi,
>
> I can use redis but I’m still having hard time figuring out how I can
> eliminate duplicate data. Today without broadcast state in 1.4 I’m using
> cache to lazy load the data. I thought the broadcast state will be similar
> to that of kafka streams where I have read access to the state across the
> pipeline. That will indeed solve a lot of problems. Is there some way I can
> do the same with flink?
>
> Thanks!
>
> On Mon, Sep 30, 2019 at 10:36 PM Congxian Qiu 
> wrote:
>
>> Hi,
>>
>> Could you use some cache system such as HBase or Reids to storage this
>> data, and query from the cache if needed?
>>
>> Best,
>> Congxian
>>
>>
>> Navneeth Krishnan  于2019年10月1日周二 上午10:15写道:
>>
>>> Thanks Oytun. The problem with doing that is the same data will be have
>>> to be stored multiple times wasting memory. In my case there will around
>>> million entries which needs to be used by at least two operators for now.
>>>
>>> Thanks
>>>
>>> On Mon, Sep 30, 2019 at 5:42 PM Oytun Tez  wrote:
>>>
 This is how we currently use broadcast state. Our states are re-usable
 (code-wise), every operator that wants to consume basically keeps the same
 descriptor state locally by processBroadcastElement'ing into a local state.

 I am open to suggestions. I see this as a hard drawback of dataflow
 programming or Flink framework?



 ---
 Oytun Tez

 *M O T A W O R D*
 The World's Fastest Human Translation Platform.
 oy...@motaword.com — www.motaword.com


 On Mon, Sep 30, 2019 at 8:40 PM Oytun Tez  wrote:

> You can re-use the broadcasted state (along with its descriptor) that
> comes into your KeyedBroadcastProcessFunction, in another operator
> downstream. that's basically duplicating the broadcasted state whichever
> operator you want to use, every time.
>
>
>
> ---
> Oytun Tez
>
> *M O T A W O R D*
> The World's Fastest Human Translation Platform.
> oy...@motaword.com — www.motaword.com
>
>
> On Mon, Sep 30, 2019 at 8:29 PM Navneeth Krishnan <
> reachnavnee...@gmail.com> wrote:
>
>> Hi All,
>>
>> Is it possible to access a broadcast state across the pipeline? For
>> example, say I have a KeyedBroadcastProcessFunction which adds the 
>> incoming
>> data to state and I have downstream operator where I need the same state 
>> as
>> well, would I be able to just read the broadcast state with a readonly
>> view. I know this is possible in kafka streams.
>>
>> Thanks
>>
>


Re: Flink- Heap Space running out

2019-09-26 Thread Fabian Hueske
Hi,

I don' think that the memory configuration is the issue.
The problem is the join query. The join does not have any temporal
boundaries.
Therefore, both tables are completely stored in memory and never released.

You can configure a memory eviction strategy via idle state retention [1]
but you should make sure that this is really what you want.
Alternatively, try a time-windowed join or a join with a temporal table
function.

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/query_configuration.html#idle-state-retention-time

Am Do., 26. Sept. 2019 um 17:08 Uhr schrieb miki haiat :

> You can configure the task manager memory in the config.yaml file.
> What is the current configuration?
>
> On Thu, Sep 26, 2019, 17:14 Nishant Gupta 
> wrote:
>
>>  am running a query to join a stream and a table as below. It is running
>> out of heap space. Even though it has enough heap space in flink cluster
>> (60GB * 3)
>>
>> Is there an eviction strategy needed for this query ?
>>
>> *SELECT sourceKafka.* FROM sourceKafka INNER JOIN DefaulterTable ON
>> sourceKafka.CC=DefaulterTable.CC;  *
>>
>> Thanks
>>
>> Nishant
>>
>


Re: Flink job manager doesn't remove stale checkmarks

2019-09-25 Thread Fabian Hueske
Hi,

You enabled incremental checkpoints.
This means that parts of older checkpoints that did not change since the
last checkpoint are not removed because they are still referenced by the
incremental checkpoints.
Flink will automatically remove them once they are not needed anymore.

Are you sure that the size of your application's state is not growing too
large?

Best, Fabian

Am Di., 24. Sept. 2019 um 10:47 Uhr schrieb Clay Teeter <
clay.tee...@maalka.com>:

> Oh geez,  checkmarks  = checkpoints... sorry.
>
> What i mean by stale "checkpoints" are checkpoints that should be reaped
> by: "state.checkpoints.num-retained: 3".
>
> What is happening is that directories:
>   - state.checkpoints.dir: file:///opt/ha/49/checkpoints
>   - high-availability.storageDir: file:///opt/ha/49/ha
> are growing with every checkpoint and i'm running out of disk space.
>
> On Tue, Sep 24, 2019 at 4:55 AM Biao Liu  wrote:
>
>> Hi Clay,
>>
>> Sorry I don't get your point. I'm not sure what the "stale checkmarks"
>> exactly means. The HA storage and checkpoint directory left after shutting
>> down cluster?
>>
>> Thanks,
>> Biao /'bɪ.aʊ/
>>
>>
>>
>> On Tue, 24 Sep 2019 at 03:12, Clay Teeter  wrote:
>>
>>> I'm trying to get my standalone cluster to remove stale checkmarks.
>>>
>>> The cluster is composed of a single job and task manager backed by
>>> rocksdb with high availability.
>>>
>>> The configuration on both the job and task manager are:
>>>
>>> state.backend: rocksdb
>>> state.checkpoints.dir: file:///opt/ha/49/checkpoints
>>> state.backend.incremental: true
>>> state.checkpoints.num-retained: 3
>>> jobmanager.heap.size: 1024m
>>> taskmanager.heap.size: 2048m
>>> taskmanager.numberOfTaskSlots: 24
>>> parallelism.default: 1
>>> high-availability.jobmanager.port: 6123
>>> high-availability.zookeeper.path.root: _49
>>> high-availability: zookeeper
>>> high-availability.storageDir: file:///opt/ha/49/ha
>>> high-availability.zookeeper.quorum: **t:2181
>>>
>>> Both machines have access to /opt/ha/49 and /opt/ha/49/checkpoints via
>>> NFS and are owned by the flink user.  Also, there are no errors that i can
>>> find.
>>>
>>> Does anyone have any ideas that i could try?
>>>
>>>


Re: Joins Usage Clarification

2019-09-25 Thread Fabian Hueske
Hi Nishant,

To answer your questions:
1) yes, the SQL time-windowed join and the DataStream API Interval Join are
the same (with different implementations though)
2) DataStream Session-window joins are not directly supported in SQL. You
can play some tricks to make it work, but it wouldn't be elegant and to be
honest, the semantics of the session-window join are not really meaningful,
IMO.

Fabian

Am Mi., 25. Sept. 2019 um 11:26 Uhr schrieb Nishant Gupta <
nishantgupta1...@gmail.com>:

> Hi Team,
>
> There are 3 types of window join (Tumbling, Session, and Sliding) and 1
> interval Join as mentioned in (For Table API)
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/joining.html
> 
>
> Plus, there is 1 Time window Join as mentioned in (For SQL)
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#joins
>
>
> Need some clarifications/answers to below doubts
> 1. Can some one help me understand how Time window join is different from
> Interval Join. Looks like same to me
> 2. How do I implement session window join in Flink SQL? With an example
> would be appreciated.
>
> Thanks
> Nishant
>
>
>
>
>
>
>
>
>


Re: Question about reading ORC file in Flink

2019-09-25 Thread Fabian Hueske
Thank you very much for coming back and reporting the good news! :-)
If you think that there is something that we can do to improve Flink's ORC
input format, for example log a warning, please open a Jira.

Thank you,
Fabian

Am Mi., 25. Sept. 2019 um 05:14 Uhr schrieb 163 :

> Hi Fabian,
>
> After debugging in local mode, I found that Flink orc connector is no
> problem, but some fields in our schema is in capital form,so these fields
> can not be matched.
> But the program directly read orc file using includeColumns method, which
> will use equalsIgnoreCase to match the column, so it can read the fields.
>
> Thanks for your Help!
>
> Qi Shu
>
>
> 在 2019年9月24日,下午4:36,Fabian Hueske  写道:
>
> Hi QiShu,
>
> It might be that Flink's OrcInputFormat has a bug.
> Can you open a Jira issue to report the problem?
> In order to be able to fix this, we need as much information as possible.
> It would be great if you could create a minimal example of an ORC file and
> a program that reproduces the issue.
> If that's not possible, we need the schema of an Orc file that cannot be
> correctly read.
>
> Thanks,
> Fabian
>
> Am Mo., 23. Sept. 2019 um 11:40 Uhr schrieb ShuQi :
>
>> Hi Guys,
>>
>> The Flink version is 1.9.0. I use OrcTableSource to read ORC file in HDFS
>> and the job is executed successfully, no any exception or error. But some
>> fields(such as tagIndustry) are always null, actually these fields are not
>> null. I can read these fields by direct reading it. Below is my code:
>>
>> //main
>>  final ParameterTool params = ParameterTool.fromArgs(args);
>>
>> final ExecutionEnvironment env =
>> ExecutionEnvironment.getExecutionEnvironment();
>>
>> env.getConfig().setGlobalJobParameters(params);
>>
>> Configuration config = new Configuration();
>>
>>
>> OrcTableSource orcTableSource = OrcTableSource
>> .builder()
>> .path(params.get("input"))
>> .forOrcSchema(TypeDescription.fromString(TYPE_INFO))
>> .withConfiguration(config)
>> .build();
>>
>> DataSet dataSet = orcTableSource.getDataSet(env);
>>
>> DataSet> counts = dataSet.flatMap(new
>> Tokenizer()).groupBy(0).sum(1);
>>
>> //read field
>> public void flatMap(Row row, Collector> out) {
>>
>> String content = ((String) row.getField(6));
>> String tagIndustry = ((String) row.getField(35));
>>
>> LOGGER.info("arity: " + row.getArity());
>> LOGGER.info("content: " + content);
>> LOGGER.info("tagIndustry: " + tagIndustry);
>> LOGGER.info("===");
>>
>> if (Strings.isNullOrEmpty(content) ||
>> Strings.isNullOrEmpty(tagIndustry) || !tagIndustry.contains("FD001")) {
>> return;
>> }
>> // normalize and split the line
>> String[] tokens = content.toLowerCase().split("\\W+");
>>
>> // emit the pairs
>> for (String token : tokens) {
>> if (token.length() > 0) {
>> out.collect(new Tuple2<>(token, 1));
>> }
>> }
>> }
>>
>> Thanks for your help!
>>
>> QiShu
>>
>>
>>
>>
>>
>
>


Re: How do I create a temporal function using Flink Clinet SQL?

2019-09-24 Thread Fabian Hueske
Hi,

It's not possible to create a temporal table function from SQL, but you can
define it in the config.yaml of the SQL client as described in the
documentation [1].

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sqlClient.html#temporal-tables

Am Di., 24. Sept. 2019 um 16:49 Uhr schrieb srikanth flink <
flink.d...@gmail.com>:

> Hi,
>
> I'm running time based joins, dynamic table over temporal function.
> Is there a way I could create temporal table using flink SQL. And I'm
> using v1.9.
>
> Thanks
> Srikanth
>


Re: Approach to match join streams to create unique streams.

2019-09-24 Thread Fabian Hueske
Hi,

AFAIK, Flink SQL Temporal table function joins are only supported as inner
equality joins.
An extension to left outer joins would be great, but is not on the
immediate roadmap AFAIK.

If you need the inverse, I'd recommend to implement the logic in a
DataStream program with a KeyedCoProcessFunction.

Best, Fabian

Am Mo., 23. Sept. 2019 um 13:04 Uhr schrieb srikanth flink <
flink.d...@gmail.com>:

>  Hi there,
>
> I've two streams source Kafka. Stream1 is a continuous data and stream2 is
> a periodic update. Stream2 contains only one column.
>
> *Use case*: Every entry from stream1 should verify if the stream2 has any
> match.
> The matched and unmatched records should be separated into new unique
> streams. For example: column1, column10 from stream1 match/unmatch check on
> stream2 column to put to a new stream safeStream and unSafeStream
> respectively.
>
> *Implemented solution*: stream2 as temporal function to join over stream1
> which is a dynamic table.
>
>- Ran a time based query where stream1.column1 = stream2.column and
>stream1.column10 = stream2.column ; Working
>
>
>- Ran a time based query where stream1.column1 <> stream1.column and
>tream1.column10 <> stream1.column ; Not working.
>
> Would like to ask if there's a possibility that I could load the stream as
> a list so I could do a *contains*? OR any other approach?
>
> Help appreciated.
>
> Thanks
> Srikanth
>
>


Re: Question about reading ORC file in Flink

2019-09-24 Thread Fabian Hueske
Hi QiShu,

It might be that Flink's OrcInputFormat has a bug.
Can you open a Jira issue to report the problem?
In order to be able to fix this, we need as much information as possible.
It would be great if you could create a minimal example of an ORC file and
a program that reproduces the issue.
If that's not possible, we need the schema of an Orc file that cannot be
correctly read.

Thanks,
Fabian

Am Mo., 23. Sept. 2019 um 11:40 Uhr schrieb ShuQi :

> Hi Guys,
>
> The Flink version is 1.9.0. I use OrcTableSource to read ORC file in HDFS
> and the job is executed successfully, no any exception or error. But some
> fields(such as tagIndustry) are always null, actually these fields are not
> null. I can read these fields by direct reading it. Below is my code:
>
> //main
>  final ParameterTool params = ParameterTool.fromArgs(args);
>
> final ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
>
> env.getConfig().setGlobalJobParameters(params);
>
> Configuration config = new Configuration();
>
>
> OrcTableSource orcTableSource = OrcTableSource
> .builder()
> .path(params.get("input"))
> .forOrcSchema(TypeDescription.fromString(TYPE_INFO))
> .withConfiguration(config)
> .build();
>
> DataSet dataSet = orcTableSource.getDataSet(env);
>
> DataSet> counts = dataSet.flatMap(new
> Tokenizer()).groupBy(0).sum(1);
>
> //read field
> public void flatMap(Row row, Collector> out) {
>
> String content = ((String) row.getField(6));
> String tagIndustry = ((String) row.getField(35));
>
> LOGGER.info("arity: " + row.getArity());
> LOGGER.info("content: " + content);
> LOGGER.info("tagIndustry: " + tagIndustry);
> LOGGER.info("===");
>
> if (Strings.isNullOrEmpty(content) ||
> Strings.isNullOrEmpty(tagIndustry) || !tagIndustry.contains("FD001")) {
> return;
> }
> // normalize and split the line
> String[] tokens = content.toLowerCase().split("\\W+");
>
> // emit the pairs
> for (String token : tokens) {
> if (token.length() > 0) {
> out.collect(new Tuple2<>(token, 1));
> }
> }
> }
>
> Thanks for your help!
>
> QiShu
>
>
>
>
>


Re: How to use thin JAR instead of fat JAR when submitting Flink job?

2019-09-24 Thread Fabian Hueske
Hi,

To expand on Dian's answer.
You should not add Flink's core libraries (APIs, core, runtime, etc.) to
your fat JAR. However, connector dependencies (like Kafka, Cassandra, etc.)
should be added.

If all your jobs require the same dependencies, you can also add JAR files
to the ./lib folder of your job and taskmanager.
All JARs in the ./lib folder will be loaded into the system class loader.
Note, that this means that they are shared across all jobs running on a
taskmanager which reduces the isolation of jobs.

Best, Fabian

Am Mo., 23. Sept. 2019 um 11:37 Uhr schrieb Dian Fu :

> Hi Qi Kang,
>
> You don't need and also should not package the dependencies of Flink to
> the job jar. Only application specific dependencies are enough.
>
> Regards,
> Dian
>
> > 在 2019年9月23日,下午5:17,Qi Kang  写道:
> >
> > Hi,
> >
> > According to the documentation of Flink, it seems that fat JAR is
> recommended when submitting a Flink job. However, the Flink dependencies
> (as well as other dependencies like Hadoop) are too big in size, thus
> producing a fat JAR which exceeds 100MB. Is there some way to separate the
> 'common' dependencies and app-specific dependencies and submit a thinner
> JAR? Many thanks.
> >
> >
> >
>
>


Re: Time Window Flink SQL join

2019-09-20 Thread Fabian Hueske
Hi,

This looks OK on the first sight.
Is it doing what you expect?

Fabian

Am Fr., 20. Sept. 2019 um 16:29 Uhr schrieb Nishant Gupta <
nishantgupta1...@gmail.com>:

> Hi Fabian,
>
> Thanks for the information.
> I have been reading about it and doing the same as a part of flink job
> written in Java
>
> I am using proctime for both the tables. Can you please verify once the
> implementation of temporal tables
>
>
> here is the snippet.
> 
> public class StreamingJob {
>
> public static void main(String[] args) throws Exception {
>
> ParameterTool params = ParameterTool.fromArgs(args);
> final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>
> Properties kafkaConsumerProperties = new Properties();
>
> kafkaConsumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"X.X.X.X:9092");
> kafkaConsumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
> "cg54");
> kafkaConsumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
> "latest");
>
>
> kafkaConsumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
>
> kafkaConsumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
>
> DataStream badipStream = env.addSource(new
> FlinkKafkaConsumer<>("badips", new SimpleStringSchema(),
> kafkaConsumerProperties));
>
> DataStream badipStreamM =
> badipStream
> .map(new MapFunction() {
>private static final long serialVersionUID = -686775202L;
>@Override
>public String map(String value) throws Exception {
> try {
> String[] v = value.split("\\t");
> if(v.length > 1) {
> return v[0].toString();
> } else
> return "0.0.0.0";
> } catch (Exception e) {
> System.err.println(e);
> return "0.0.0.0";
> }
>}
>});
>
> Table  badipTable = tableEnv.fromDataStream(badipStreamM, *"bad_ip,
> r_proctime.proctime");*
>
> tableEnv.registerTable("BadIP", badipTable);
> TemporalTableFunction badIPTT =
> badipTable.createTemporalTableFunction("r_proctime", "bad_ip");
> tableEnv.registerFunction("BadIPTT", badIPTT);
>
>
>
> DataStream inKafkaStream = env
> .addSource(new FlinkKafkaConsumer<>("tests", new
> JSONKeyValueDeserializationSchema(false), kafkaConsumerProperties));
> DataStream> inKafkaStreamM =
> inKafkaStream
> .rebalance()
> .filter(value -> value != null)
> .map(new MapFunction>() {
>private static final long serialVersionUID = -6867120202L;
>@Override
>public Tuple2 map(ObjectNode node) throws Exception
> {
> try {
> ObjectNode nodeValue = (ObjectNode) node.get("value");
> return new Tuple2<>(nodeValue.get("source.ip").asText(),
> nodeValue.get("destination.ip").asText());
> } catch (Exception e) {
> System.err.println(e);
> System.out.println(node);
> return null;
> }
>}
>});
>
> Table  kafkaSource = tableEnv.fromDataStream(inKafkaStreamM, *"sourceIp,
> destinationIp, k_proctime.proctime"*);
> tableEnv.registerTable("KafkaSource", kafkaSource);
> * Table resultKafkaMalicious = tableEnv.sqlQuery( "SELECT K.sourceIp,
> K.destinationIp FROM KafkaSource AS K, LATERAL TABLE
> (BadIPTT(K.k_proctime)) AS B WHERE K.sourceIp=B.bad_ip");*
>
> TupleTypeInfo> tupleType = new TupleTypeInfo<>(
>  Types.STRING(),
>  Types.STRING());
>
> DataStream> outStreamMalicious =
> tableEnv.toAppendStream(resultKafkaMalicious, tupleType);
>
> Properties kafkaProducerProperties = new Properties();
>
> kafkaProducerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"X.X.X.X:9092");
> kafkaProducerProperties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
> kafkaProducerProperties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");
>
> ObjectMapper mapper = new ObjectMapper();
> DataStream sinkStreamMaliciousData = outStreamMalicious
> .map(new MapFunction,String>() {
> private static final long serialVersionUID = -6347120202L;
> @Override
> public String map(Tuple2 tuple) throws Exception {
> try {
> ObjectNode node = mapper.createObjectNode();
> node.put("source.ip", tuple.f0);
> node.put("destination.ip", tuple.f1

Re: Best way to compute the difference between 2 datasets

2019-09-20 Thread Fabian Hueske
Btw. there is a set difference or minus operator in the Table API [1] that
might be helpful.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/tableApi.html#set-operations

Am Fr., 20. Sept. 2019 um 15:30 Uhr schrieb Fabian Hueske :

> Hi Juan,
>
> Both, the local execution environment and the remote execution environment
> run the same code to execute the program.
> The implementation of the sortPartition operator was designed to scale to
> data sizes that exceed the memory.
> Internally, it serializes all records into byte arrays and sorts the
> serialized data. This is of course more expensive than keeping all objects
> on the heap and sorting them there.
> Hence, a certain performance difference is to be expected. However,
> something that should not happen is that the program fails.
>
> What's the magnitude of the performance difference?
> Can you post a stack trace of the error?
>
> Thanks,
> Fabian
>
> Am Mo., 16. Sept. 2019 um 13:51 Uhr schrieb Juan Rodríguez Hortalá <
> juan.rodriguez.hort...@gmail.com>:
>
>> Hi Ken,
>>
>> Thanks for the suggestion, that idea should also work for implementing a
>> data set difference operation, which is what concerns me here. However, I
>> was also curious about why there is so much performance difference between
>> using sortPartition and sorting in memory by partition, for datasets as
>> small as 20 elements and running in local mode. For that data set sizes I
>> would expect no relevant performance difference, but with sortPartition the
>> program crashes, so I must be doing something wrong here.
>>
>> Thanks in any case for the idea.
>>
>> Greetings,
>>
>> Juan
>>
>> On Mon, Jul 22, 2019 at 8:49 AM Ken Krugler 
>> wrote:
>>
>>> Hi Juan,
>>>
>>> If you want to deduplicate, then you could group by the record, and use
>>> a (very simple) reduce function to only emit a record if the group contains
>>> one element.
>>>
>>> There will be performance issues, though - Flink will have to generate
>>> all groups first, which typically means spilling to disk if the data set
>>> has any significant size.
>>>
>>> — Ken
>>>
>>> PS - I assume that you’ve implemented a valid hashCode()/equals() for
>>> the record.
>>>
>>>
>>> On Jul 22, 2019, at 8:29 AM, Juan Rodríguez Hortalá <
>>> juan.rodriguez.hort...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> I've been trying to write a function to compute the difference between 2
>>> datasets. With that I mean computing a dataset that has all the elements of
>>> a dataset that are not present in another dataset. I first tried using
>>> coCogroup, but it was very slow in a local execution environment, and often
>>> was crashing with OOM. Then I tried with leftOuterJoin and got similar
>>> results. I then tried the following:
>>>
>>> private[this] def minussWithSortPartition(other: DataSet[T]): DataSet[T] = {
>>>   val selfMarked: DataSet[(T, Boolean)] = self.map((_, true))
>>>   val otherMarked: DataSet[(T, Boolean)] = other.map((_, false))
>>>
>>>   val all = selfMarked.union(otherMarked)
>>> .partitionByHash(0) // so occurrences of the same value in both 
>>> datasets go to the same partition
>>> .sortPartition[(T, Boolean)](identity, Order.ASCENDING)
>>>   all.mapPartition[T] { (partitionIter: Iterator[(T, Boolean)], collector: 
>>> Collector[T]) =>
>>> var latestOtherOpt: Option[T] = None
>>> partitionIter.foreach {
>>>   case (otherElem, false) => latestOtherOpt = Some(otherElem)
>>>   case (selfElem, true) =>
>>> if (latestOtherOpt != Some(selfElem)) collector.collect(selfElem)
>>> }
>>>   }
>>> }
>>>
>>>
>>> This is basically the idea of removing duplicates in a collection by
>>> first sorting it, and then traversing it from beginning to end, removing
>>> the elements that are consecutive to an element we just saw. That is
>>> extended here to mark whether an element is coming from `self` or from
>>> `other`, keeping only elements from `self` that are not following another
>>> occurrence of the same element in `other`. That code is also really slow on
>>> a local execution environment, and crashes a lot. But when I replace
>>> `sortPartition` by sorting each partition in memory inside a mapPartition,
>>> it works ok with the local execution environment.
>>>
>>&

Re: changing flink/kafka configs for stateful flink streaming applications

2019-09-20 Thread Fabian Hueske
Hi,

It depends.

There are many things that can be changed. A savepoint in Flink contains
only the state of the application and not the configuration of the system.
So an application can be migrated to another cluster that runs with a
different configuration.
There are some exceptions like the configuration of the default state
backend (in case it is not configured in the application itself) and the
checkpoint techniques.

If it is about the configuration of the application itself (and not the
system), you can do a lot of things in Flink.
You can even implement the application in a way that it reconfigures itself
while it is running.

Since the last release (Flink 1.9), Flink features the Savepoint Processor
API which allows to create or modify savepoints with a batch program.
This can be used to adjust or bootstrap savepoints.

Best, Fabian


Am Mi., 18. Sept. 2019 um 18:56 Uhr schrieb Abrar Sheikh <
abrar200...@gmail.com>:

> Hey all,
>
> One of the known things with Spark Stateful Streaming application is that
> we cannot alter Spark Configurations or Kafka Configurations after the
> first run of the stateful streaming application, this has been explained
> well in
> https://www.linkedin.com/pulse/upgrading-running-spark-streaming-application-code-changes-prakash/
>
> Is this also something Stateful Flink Application share in common with
> Spark?
>
> Thanks,
>
> --
> Abrar Sheikh
>


Re: Window metadata removal

2019-09-20 Thread Fabian Hueske
Hi,

Oh, now I understand your problem.
I dont' think that Flink is able to remove the metadata early. The
implementation is designed for the general case which needs to support the
case where the window data is not purged.
Something that might work is to not configure the window operator with
allowed lateness (hence dropping all late records).
Instead you duplicate the stream before the window operator and have
another operator (based on a ProcessFunction) that drops all "in-time" data
and only forwards data that is at most 7 days old.

Alternatively, you can of course also scale out the program to more
machines to add more memory.

Best,
Fabian

Am Mi., 18. Sept. 2019 um 08:39 Uhr schrieb gil bl :

> Hi Fabian,
> Thank you for your reply.
>
> I'm not sure my question was clear enough so I'll try to explain our
> scenario:
>
>1. We are working in “event time” mode.
>2. We want to handle ‘late data’ up to last X days (for example last 7
>days)
>3. For each incoming event:
>   1. The event is being aggregated using window function.
>   2. When the window if “fired”, the accumulated data is forwarded to
>   “sink” function and all data is being purged from the window.
>4. If late data is arriving to the same windows, the same logic (as in
>section 3) is being applied. When a window is fired the data is accumulated
>from scratch, sent to a “sink” and purged from the window.
>5. we are not using the default trigger.
>
> We expect the flow above to result in fragmented data, i.e. several
> outputs with the same  which aggregate different sets of
> events.
>
> We encounter the following problem:
> Since we have a huge number of different , the metadata
> (WindowOperator, InternalTimer) is being kept in memory until the end of
> ‘allowed lateness’ period. This causes our job to run out of memory.
> Here is a calculation of the required memory consumption only for the
> window metadata -
> Metadata size for each  is at least 64 bytes.
> If we have 200,000,000  per day and the allowed lateness is
> set to 7 days:
> 200,000,000 * 64 * 7 = ~83GB
>
> *For the scenario above the window metadata is useless*.
> Is there a possibility to *keep using window API*, *set allowed lateness*
> and *not keep the window metadata* until the end of allowed lateness
> period?
> (maybe as a new feature ?)
>
>
> 05.09.2019, 13:04, "Fabian Hueske" :
>
> Hi,
>
> A window needs to keep the data as long as it expects new data.
> This is clearly the case before the end time of the window was reached. If
> my window ends at 12:30, I want to wait (at least) until 12:30 before I
> remove any data, right?
>
> In case you expect some data to be late, you can configure
> allowedLateness.
> Let's say, we configure allowedLateness of 10 minutes. In that case, Flink
> would keep the metadata of the window that closes at 12:30 until 12:40.
> The data is kept to be able to update the result of the window until
> allowedLateness has passed.
> If we for example receive a late record at 12:38, we can still update the
> result of the window because we kept all required data.
>
> If you don't need allowedLateness, don't configure it (the default is 0).
>
> Best, Fabian
>
> Am Mo., 2. Sept. 2019 um 16:46 Uhr schrieb gil bl :
>
> Hi,
>
> I'm interested in why metadata like WindowOperator and InternalTimer are
> being kept for windowSize + allowedLateness period per each pane.
>
>- What is the purpose of keeping this data if no new events are
>expected to enter the pane?
>- Is there any way this metadata can be released earlier?
>
>


Re: Add Bucket File System Table Sink

2019-09-20 Thread Fabian Hueske
Hi Jun,

Thank you very much for your contribution.

I think a Bucketing File System Table Sink would be a great addition.

Our code contribution guidelines [1] recommend to discuss the design with
the community before opening a PR.
First of all, this ensures that the design is aligned with Flink's codebase
and the future features.
Moreover, it helps to find a committer who can help to shepherd the PR.

Something that is always a good idea is to split a contribution in multiple
smaller PRs (if possible).
This allows for faster review and progress.

Best, Fabian

[1] https://flink.apache.org/contributing/contribute-code.html

Am Di., 17. Sept. 2019 um 04:39 Uhr schrieb Jun Zhang <825875...@qq.com>:

> Hello everyone:
> I am a user and fan of flink. I also want to join the flink community. I
> contributed my first PR a few days ago. Can anyone help me to review my
> code? If there is something wrong, hope I would be grateful if you can give
> some advice.
>
> This PR is mainly in the process of development, I use sql to read data
> from kafka and then write to hdfs, I found that there is no suitable
> tablesink, I found the document and found that File System Connector is
> only experimental (
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#file-system-connector),
> so I wrote a Bucket File System Table Sink that supports writing stream
> data. Hdfs, file file system, data format supports json, csv, parquet,
> avro. Subsequently add other format support, such as protobuf, thrift, etc.
>
> In addition, I also added documentation, python api, units test,
> end-end-test, sql-client, DDL, and compiled on travis.
>
> the issue is https://issues.apache.org/jira/browse/FLINK-12584
> thank you very much
>
>
>


Re: Best way to compute the difference between 2 datasets

2019-09-20 Thread Fabian Hueske
Hi Juan,

Both, the local execution environment and the remote execution environment
run the same code to execute the program.
The implementation of the sortPartition operator was designed to scale to
data sizes that exceed the memory.
Internally, it serializes all records into byte arrays and sorts the
serialized data. This is of course more expensive than keeping all objects
on the heap and sorting them there.
Hence, a certain performance difference is to be expected. However,
something that should not happen is that the program fails.

What's the magnitude of the performance difference?
Can you post a stack trace of the error?

Thanks,
Fabian

Am Mo., 16. Sept. 2019 um 13:51 Uhr schrieb Juan Rodríguez Hortalá <
juan.rodriguez.hort...@gmail.com>:

> Hi Ken,
>
> Thanks for the suggestion, that idea should also work for implementing a
> data set difference operation, which is what concerns me here. However, I
> was also curious about why there is so much performance difference between
> using sortPartition and sorting in memory by partition, for datasets as
> small as 20 elements and running in local mode. For that data set sizes I
> would expect no relevant performance difference, but with sortPartition the
> program crashes, so I must be doing something wrong here.
>
> Thanks in any case for the idea.
>
> Greetings,
>
> Juan
>
> On Mon, Jul 22, 2019 at 8:49 AM Ken Krugler 
> wrote:
>
>> Hi Juan,
>>
>> If you want to deduplicate, then you could group by the record, and use a
>> (very simple) reduce function to only emit a record if the group contains
>> one element.
>>
>> There will be performance issues, though - Flink will have to generate
>> all groups first, which typically means spilling to disk if the data set
>> has any significant size.
>>
>> — Ken
>>
>> PS - I assume that you’ve implemented a valid hashCode()/equals() for the
>> record.
>>
>>
>> On Jul 22, 2019, at 8:29 AM, Juan Rodríguez Hortalá <
>> juan.rodriguez.hort...@gmail.com> wrote:
>>
>> Hi,
>>
>> I've been trying to write a function to compute the difference between 2
>> datasets. With that I mean computing a dataset that has all the elements of
>> a dataset that are not present in another dataset. I first tried using
>> coCogroup, but it was very slow in a local execution environment, and often
>> was crashing with OOM. Then I tried with leftOuterJoin and got similar
>> results. I then tried the following:
>>
>> private[this] def minussWithSortPartition(other: DataSet[T]): DataSet[T] = {
>>   val selfMarked: DataSet[(T, Boolean)] = self.map((_, true))
>>   val otherMarked: DataSet[(T, Boolean)] = other.map((_, false))
>>
>>   val all = selfMarked.union(otherMarked)
>> .partitionByHash(0) // so occurrences of the same value in both datasets 
>> go to the same partition
>> .sortPartition[(T, Boolean)](identity, Order.ASCENDING)
>>   all.mapPartition[T] { (partitionIter: Iterator[(T, Boolean)], collector: 
>> Collector[T]) =>
>> var latestOtherOpt: Option[T] = None
>> partitionIter.foreach {
>>   case (otherElem, false) => latestOtherOpt = Some(otherElem)
>>   case (selfElem, true) =>
>> if (latestOtherOpt != Some(selfElem)) collector.collect(selfElem)
>> }
>>   }
>> }
>>
>>
>> This is basically the idea of removing duplicates in a collection by
>> first sorting it, and then traversing it from beginning to end, removing
>> the elements that are consecutive to an element we just saw. That is
>> extended here to mark whether an element is coming from `self` or from
>> `other`, keeping only elements from `self` that are not following another
>> occurrence of the same element in `other`. That code is also really slow on
>> a local execution environment, and crashes a lot. But when I replace
>> `sortPartition` by sorting each partition in memory inside a mapPartition,
>> it works ok with the local execution environment.
>>
>> private[this] def minusWithInMemoryPartition(other: DataSet[T]): DataSet[T] 
>> = {
>>   val selfMarked: DataSet[(T, Boolean)] = self.map((_, true))
>>   val otherMarked: DataSet[(T, Boolean)] = other.map((_, false))
>>   val all = selfMarked.union(otherMarked)
>> .partitionByHash(0) // so occurrences of the same value in both datasets 
>> go to the same partition
>>   all.mapPartition[T] { (partitionIter: Iterator[(T, Boolean)], collector: 
>> Collector[T]) =>
>> val sortedPartition = {
>>   val partition = partitionIter.toArray
>>   util.Sorting.quickSort(partition)
>>   partition
>> }
>> var latestOtherOpt: Option[T] = None
>> sortedPartition.foreach {
>>   case (otherElem, false) => latestOtherOpt = Some(otherElem)
>>   case (selfElem, true) =>
>> if (latestOtherOpt != Some(selfElem)) collector.collect(selfElem)
>> }
>>   }
>> }
>>
>>
>> I'm surprised by such a big difference. This is my code
>> 

Re: Batch mode with Flink 1.8 unstable?

2019-09-19 Thread Fabian Hueske
Hi Ken,

Changing the parallelism can affect the generation of input splits.
I had a look at BinaryInputFormat, and it adds a bunch of empty input
splits if the number of generated splits is less than the minimum number of
splits (which is equal to the parallelism).

See -->
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java#L133

Maybe these empty splits cause the failure.
IIRC, this was done because there was at some point (like several years
ago...) the requirement that each source task would receive a split.
I don't think this is still true. I'd try to remove these lines and see
what happens.

If that doesn't help, I'd try to add a bunch of log statements in the
InputFormat to identify the point where it fails.

Hope this helps,
Fabian


Am Do., 19. Sept. 2019 um 09:25 Uhr schrieb Till Rohrmann <
trohrm...@apache.org>:

> Good to hear that some of your problems have been solved Ken. For the
> UTFDataFormatException it is hard to tell. Usually it says that the input
> has been produced using `writeUTF`. Cloud you maybe provide an example
> program which reproduces the problem? Moreover, it would be helpful to see
> how the input is generated and what AdText exactly does.
>
> Cheers,
> Till
>
> On Wed, Sep 18, 2019 at 9:17 PM Ken Krugler 
> wrote:
>
>> Hi Till,
>>
>> I tried out 1.9.0 with my workflow, and I no longer am running into the
>> errors I described below, which is great!
>>
>> Just to recap, this is batch, per-job mode on YARN/EMR.
>>
>> Though I did run into a new issue, related to my previous problem when
>> reading files written via SerializedOutputFormat.
>>
>> I would always get errors that look like:
>>
>> 2019-09-16 20:58:21,396 ERROR com.company.MyWorkflow  - Exception reading
>> from split #100 of file 's3://path-to-file/19' from 0 (state
>> 28683/156308, block size 67108864)
>> 2019-09-16 20:58:21,397 ERROR
>> org.apache.flink.runtime.operators.BatchTask  - Error in
>> task code:  CHAIN DataSource (at
>> makePreparedDataSet(com.company.MyWorkflow.java:67)
>> (com.company.MyWorkflow$AdTextInputFormat)) -> Map (Key Extractor) (4/12)
>> java.io.UTFDataFormatException: malformed input around byte 51
>> at java.io.DataInputStream.readUTF(DataInputStream.java:656)
>> at java.io.DataInputStream.readUTF(DataInputStream.java:564)
>> at com.company.AdText.read(AdText.java:170)
>> at
>> org.apache.flink.api.common.io.SerializedInputFormat.deserialize(SerializedInputFormat.java:39)
>> at
>> org.apache.flink.api.common.io.SerializedInputFormat.deserialize(SerializedInputFormat.java:32)
>> at
>> org.apache.flink.api.common.io.BinaryInputFormat.nextRecord(BinaryInputFormat.java:305)
>> at com.company.MyWorkflow$AdTextInputFormat.nextRecord(MyWorkflow.java:90)
>> at com.company.MyWorkflow$AdTextInputFormat.nextRecord(MyWorkflow.java:71)
>> at
>> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:195)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>> at java.lang.Thread.run(Thread.java:748)
>>
>> Which would imply (again) an issue with the read block size not being the
>> same as what was used to write it.
>>
>> But I’d run this same data through a different workflow, without any
>> issues.
>>
>> When I reduced the read parallelism of the failing workflow to match the
>> succeeding workflow (was 12, dropped it to 4), the errors went away.
>>
>> So…don’t know what’s the root issue, but I have a workaround for now.
>>
>> Though it’s a reproducible problem, which I’d like to use to help solve
>> the problem.
>>
>> Any suggestions for how to debug further?
>>
>> Thanks,
>>
>> — Ken
>>
>>
>>
>> On Jul 1, 2019, at 2:57 AM, Till Rohrmann  wrote:
>>
>> Hi Ken,
>>
>> in order to further debug your problems it would be helpful if you could
>> share the log files on DEBUG level with us.
>>
>> For problem (2), I suspect that it has been caused by Flink releasing TMs
>> too early. This should be fixed with FLINK-10941 which is part of Flink
>> 1.8.1. The 1.8.1 release should be released very soonish. It would be great
>> if you could try your program with this version or even the 1.8.1 RC to see
>> whether the problem still occurs. But it could also be caused by using fine
>> grained recovery. So it might be worth a try to disable this feature if you
>> turned it on.
>>
>> Thanks a lot!
>>
>> Cheers,
>> Till
>>
>> On Thu, Jun 27, 2019 at 8:30 AM Biao Liu  wrote:
>>
>>> Hi Ken again,
>>>
>>> In regard to TimeoutException, I just realized that there is no
>>> akka.remote.OversizedPayloadException in your log file. There might be some
>>> other reason caused this.
>>> 1. Have you ever tried increasing the configuration "akka.ask.timeout"?
>>> 2. Have you ever checked the garbage collection of JM/TM? Maybe you need
>>> to enable printing GC log first.
>>>
>>>
>>> Biao Liu  于2019年6月27日周四 上午11:38写道:
>>>
 Hi Ken,

Re: Time Window Flink SQL join

2019-09-18 Thread Fabian Hueske
Hi Nishant,

You should model the query as a join with a time-versioned table [1].
The bad-ips table would be the time-time versioned table [2].
Since it is a time-versioned table, it could even be updated with new IPs.

This type of join will only keep the time-versioned table (the bad-ips in
state) and not the other (high-volume) table.

Best,
Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/joins.html#join-with-a-temporal-table
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/temporal_tables.html

Am Mi., 18. Sept. 2019 um 14:34 Uhr schrieb Nishant Gupta <
nishantgupta1...@gmail.com>:

> Hi Fabian,
>
> Thanks for your reply
> I have a continuous stream of kafka coming and static table of badips. I
> wanted to segregate records having bad ip.
>
> So therefore i was joining it. But with that 60 gb memory getting run out
>
> So i used below query.
> Can u please guide me in this regard
>
> On Wed, 18 Sep 2019 at 5:53 PM, Fabian Hueske  wrote:
>
>> Hi,
>>
>> The query that you wrote is not a time-windowed join.
>>
>> INSERT INTO sourceKafkaMalicious
>> SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON
>> sourceKafka.`source.ip`=badips.ip
>> WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP - INTERVAL
>> '15' MINUTE AND CURRENT_TIMESTAMP;
>>
>> The problem is the use of CURRENT_TIMESTAMP instead of a processing time
>> (or event time) attribute of badips.
>>
>> What exactly are you trying to achieve with the query?
>>
>> Best, Fabian
>>
>> Am Mi., 18. Sept. 2019 um 14:02 Uhr schrieb Nishant Gupta <
>> nishantgupta1...@gmail.com>:
>>
>>> Hi Team,
>>>
>>> I am running a query for Time Window Join as below
>>>
>>> INSERT INTO sourceKafkaMalicious
>>> SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON
>>> sourceKafka.`source.ip`=badips.ip
>>> WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP -
>>> INTERVAL '15' MINUTE AND CURRENT_TIMESTAMP;
>>>
>>> Time windowed join, Flink SQL should automatically clear older records, Some
>>> how the query does not clear the heapspace and fails with error after
>>> sometime.
>>>
>>> Can you please let me know what could go wrong, or is it a issue
>>>
>>> Environment File chunks
>>>
>>> --
>>> tables:
>>>   - name: sourceKafka
>>> type: source-table
>>> update-mode: append
>>> connector:
>>>   type: kafka
>>>   version: "universal"
>>>   topic: test-data-flatten
>>>   properties:
>>> - key: zookeeper.connect
>>>   value: x.x.x.x:2181
>>> - key: bootstrap.servers
>>>   value: x.x.x.x:9092
>>> - key: group.id
>>>   value: testgroup
>>> format:
>>>   type: json
>>>   fail-on-missing-field: false
>>>   json-schema: >
>>> {
>>>   type: 'object',
>>>   properties: {
>>> 'source.ip': {
>>>type: 'string'
>>> },
>>> 'source.port': {
>>>type: 'string'
>>> }
>>>   }
>>> }
>>>   derive-schema: false
>>> schema:
>>>   - name: ' source.ip '
>>> type: VARCHAR
>>>   - name: 'source.port'
>>> type: VARCHAR
>>>
>>>   - name: sourceKafkaMalicious
>>> type: sink-table
>>> update-mode: append
>>> connector:
>>>   type: kafka
>>>   version: "universal"
>>>   topic: test-data-mal
>>>   properties:
>>> - key: zookeeper.connect
>>>   value: x.x.x.x:2181
>>> - key: bootstrap.servers
>>>   value: x.x.x.x:9092
>>> - key: group.id
>>>   value: testgroupmal
>>> format:
>>>   type: json
>>>   fail-on-missing-field: false
>>>   json-schema: >
>>> {
>>>   type: 'object',
>>>   properties: {
>>> 'source.ip': {
>>>type: 'string'
>>> },
>>

Re: Time Window Flink SQL join

2019-09-18 Thread Fabian Hueske
Hi,

The query that you wrote is not a time-windowed join.

INSERT INTO sourceKafkaMalicious
SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON
sourceKafka.`source.ip`=badips.ip
WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP - INTERVAL
'15' MINUTE AND CURRENT_TIMESTAMP;

The problem is the use of CURRENT_TIMESTAMP instead of a processing time
(or event time) attribute of badips.

What exactly are you trying to achieve with the query?

Best, Fabian

Am Mi., 18. Sept. 2019 um 14:02 Uhr schrieb Nishant Gupta <
nishantgupta1...@gmail.com>:

> Hi Team,
>
> I am running a query for Time Window Join as below
>
> INSERT INTO sourceKafkaMalicious
> SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON
> sourceKafka.`source.ip`=badips.ip
> WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP - INTERVAL
> '15' MINUTE AND CURRENT_TIMESTAMP;
>
> Time windowed join, Flink SQL should automatically clear older records, Some
> how the query does not clear the heapspace and fails with error after
> sometime.
>
> Can you please let me know what could go wrong, or is it a issue
>
> Environment File chunks
>
> --
> tables:
>   - name: sourceKafka
> type: source-table
> update-mode: append
> connector:
>   type: kafka
>   version: "universal"
>   topic: test-data-flatten
>   properties:
> - key: zookeeper.connect
>   value: x.x.x.x:2181
> - key: bootstrap.servers
>   value: x.x.x.x:9092
> - key: group.id
>   value: testgroup
> format:
>   type: json
>   fail-on-missing-field: false
>   json-schema: >
> {
>   type: 'object',
>   properties: {
> 'source.ip': {
>type: 'string'
> },
> 'source.port': {
>type: 'string'
> }
>   }
> }
>   derive-schema: false
> schema:
>   - name: ' source.ip '
> type: VARCHAR
>   - name: 'source.port'
> type: VARCHAR
>
>   - name: sourceKafkaMalicious
> type: sink-table
> update-mode: append
> connector:
>   type: kafka
>   version: "universal"
>   topic: test-data-mal
>   properties:
> - key: zookeeper.connect
>   value: x.x.x.x:2181
> - key: bootstrap.servers
>   value: x.x.x.x:9092
> - key: group.id
>   value: testgroupmal
> format:
>   type: json
>   fail-on-missing-field: false
>   json-schema: >
> {
>   type: 'object',
>   properties: {
> 'source.ip': {
>type: 'string'
> },
> 'source.port': {
>type: 'string'
> }
>   }
> }
>   derive-schema: false
> schema:
>   - name: ' source.ip '
> type: VARCHAR
>   - name: 'source.port'
> type: VARCHAR
>
>   - name: badips
> type: source-table
> #update-mode: append
> connector:
>   type: filesystem
>   path: "/home/cyanadmin/ipsum/levels/badips.csv"
> format:
>   type: csv
>   fields:
> - name: ip
>   type: VARCHAR
>   comment-prefix: "#"
> schema:
>   - name: ip
> type: VARCHAR
>
> execution:
>   planner: blink
>   type: streaming
>   time-characteristic: event-time
>   periodic-watermarks-interval: 200
>   result-mode: table
>   max-table-result-rows: 100
>   parallelism: 3
>   max-parallelism: 128
>   min-idle-state-retention: 0
>   max-idle-state-retention: 0
>   restart-strategy:
> type: fallback
>
> configuration:
>   table.optimizer.join-reorder-enabled: true
>   table.exec.spill-compression.enabled: true
>   table.exec.spill-compression.block-size: 128kb
>  Properties that describe the cluster to which table programs are
> submitted to.
>
> deployment:
>   response-timeout: 5000
>
>
> --
>


Re: Is Idle state retention time in SQL client possible?

2019-09-17 Thread Fabian Hueske
Hi,

This can be set via the environment file.
Please have a look at the documentation [1] (see "execution:
min-idle-state-retention: " and "execution: max-idle-retention: " keys).

Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sqlClient.html#environment-files

Am Di., 17. Sept. 2019 um 11:01 Uhr schrieb srikanth flink <
flink.d...@gmail.com>:

> Hi there,
>
> I've come across this link
> <
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/query_configuration.html
> >
> for Idle state retention time. Would like to know if I could configure this
> for SQL client and how?
>
> Thanks
> Srikanth
>


Re: Compound Keys Using Temporal Tables

2019-09-16 Thread Fabian Hueske
Hi,

No, this is not possible at the moment. You can only pass a single
expression as primary key.
A work around might be to put the two fields in a nested field (haven't
tried if this works) or combine them in a single attribute, for example by
casting them to VARCHAR and concating them.

Best, Fabian

Am Fr., 13. Sept. 2019 um 17:49 Uhr schrieb Yuval Itzchakov <
yuva...@gmail.com>:

> Hi,
>
> Given table X with an event time, A, B and C columns, is there a way to
> pass a compound key, i.e. A and B as the primaryKey argument of
> Table.createTemporalFunction? My attempts so far yield a runtime exception
> where the String doesn't match a given regex.
>


Re: Uncertain result when using group by in stream sql

2019-09-13 Thread Fabian Hueske
Hi,

A GROUP BY query on a streaming table requires that the result is
continuously updated.
Updates are propagated as a retraction stream (see
tEnv.toRetractStream(table, Row.class).print(); in your code).

A retraction stream encodes the type of the update as a boolean flag, the
"true" and "false" values in your result. "true" means the record was added
to the result, "false" means the record is removed from the result.
If you follow the output, it is the same in both cases: (bj, 9).

The different "result paths" result from the parallel (multi-threaded)
processing of the query.
If you set the parallelism to 1 ( env.setParallelism(1);) the "result path"
should be the same every time.

Best, Fabian



Am Fr., 13. Sept. 2019 um 10:02 Uhr schrieb 刘建刚 :

>   I use flink stream sql to write a demo about "group by".  The
> records are [(bj, 1), (bj, 3), (bj, 5)]. I group by the first element and
> sum the second element.
>   Every time I run the program, the result is different. It seems that
> the records are out of order. Even sometimes record is lost. I am confused
> about that.
>   The code is as below:
>
> public class Test {
>public static void main(String[] args) throws Exception {
>   StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>   StreamTableEnvironment tEnv = 
> StreamTableEnvironment.getTableEnvironment(env);
>
>   DataStream> dataStream = env.fromElements(
> Tuple2.of("bj", 1L),
> Tuple2.of("bj", 3L),
> Tuple2.of("bj", 5L));
>   tEnv.registerDataStream("person", dataStream);
>
>   String sql = "select f0, sum(f1) from person group by f0";
>   Table table = tEnv.sqlQuery(sql);
>   tEnv.toRetractStream(table, Row.class).print();
>
>   env.execute();
>}
> }
>
>   The results may be as below:
> 1> (true,bj,1)
> 1> (false,bj,1)
> 1> (true,bj,4)
> 1> (false,bj,4)
> 1> (true,bj,9)
>
> 1> (true,bj,5)
> 1> (false,bj,5)
> 1> (true,bj,8)
> 1> (false,bj,8)
> 1> (true,bj,9)
>


Re: How to handle avro BYTES type in flink

2019-09-13 Thread Fabian Hueske
Thanks for reporting back Catlyn!

Am Do., 12. Sept. 2019 um 19:40 Uhr schrieb Catlyn Kong :

> Turns out there was some other deserialization problem unrelated to this.
>
> On Mon, Sep 9, 2019 at 11:15 AM Catlyn Kong  wrote:
>
>> Hi fellow streamers,
>>
>> I'm trying to support avro BYTES type in my flink application. Since
>> ByteBuffer isn't a supported type, I'm converting the field to an
>> Array[Byte]:
>>
>> case Type.BYTES =>
>>   (avroObj: AnyRef) => {
>>  if (avroObj == null) {
>>null
>>  } else {
>>val byteBuffer = avroObj.asInstanceOf[ByteBuffer]
>>val bytes = new Array[Byte](byteBuffer.remaining())
>>byteBuffer.get(bytes)
>>bytes
>>}
>>  }
>>
>> And in the table, I'm creating PrimitiveArrayTypeInfo[Byte] for this
>> field.
>> I'm getting ArrayIndexOutOfBoundsException:
>>
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: 40
>> at
>> org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:416)
>> at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
>> at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
>> at
>> org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
>> at
>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
>> at
>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
>> at
>> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:240)
>> at
>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:230)
>> at
>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:174)
>> at
>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
>> at
>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
>>
>> Does anyone have experience with deserializing BYTES type from avro and
>> make it compatible with the table api? Wondering if it's cause I didn't use
>> the correct type or maybe I need to verify if there's enough data left in
>> the source?
>>
>> Any input is appreciated.
>>
>> Thanks!
>> Catlyn
>>
>>


Re: [ANNOUNCE] Zili Chen becomes a Flink committer

2019-09-11 Thread Fabian Hueske
Congrats Zili Chen :-)

Cheers, Fabian

Am Mi., 11. Sept. 2019 um 12:48 Uhr schrieb Biao Liu :

> Congrats Zili!
>
> Thanks,
> Biao /'bɪ.aʊ/
>
>
>
> On Wed, 11 Sep 2019 at 18:43, Oytun Tez  wrote:
>
>> Congratulations!
>>
>> ---
>> Oytun Tez
>>
>> *M O T A W O R D*
>> The World's Fastest Human Translation Platform.
>> oy...@motaword.com — www.motaword.com
>>
>>
>> On Wed, Sep 11, 2019 at 6:36 AM bupt_ljy  wrote:
>>
>>> Congratulations!
>>>
>>>
>>> Best,
>>>
>>> Jiayi Liao
>>>
>>>  Original Message
>>> *Sender:* Till Rohrmann
>>> *Recipient:* dev; user
>>> *Date:* Wednesday, Sep 11, 2019 17:22
>>> *Subject:* [ANNOUNCE] Zili Chen becomes a Flink committer
>>>
>>> Hi everyone,
>>>
>>> I'm very happy to announce that Zili Chen (some of you might also know
>>> him as Tison Kun) accepted the offer of the Flink PMC to become a committer
>>> of the Flink project.
>>>
>>> Zili Chen has been an active community member for almost 16 months now.
>>> He helped pushing the Flip-6 effort over the finish line, ported a lot of
>>> legacy code tests, removed a good part of the legacy code, contributed
>>> numerous fixes, is involved in the Flink's client API refactoring, drives
>>> the refactoring of Flink's HighAvailabilityServices and much more. Zili
>>> Chen also helped the community by PR reviews, reporting Flink issues,
>>> answering user mails and being very active on the dev mailing list.
>>>
>>> Congratulations Zili Chen!
>>>
>>> Best, Till
>>> (on behalf of the Flink PMC)
>>>
>>


Re: Checkpointing is not performing well

2019-09-11 Thread Fabian Hueske
Hi,

There is no upper limit for state size in Flink. There are applications
with 10+ TB state.
However, it is natural that checkpointing time increases with state size as
more data needs to be serialized (in case of FSStateBackend) and written to
stable storage.
(The same is btw true for recovery when the state needs to be loaded back.)

There are a few tricks to reduce checkpointing time like using incremental
checkpoints which you tried already.
You can also scale out the application to use more machines and therefore
bandwidth + CPU (for serialization) during checkpoints.

Fabian

Am Mi., 11. Sept. 2019 um 09:38 Uhr schrieb Ravi Bhushan Ratnakar <
ravibhushanratna...@gmail.com>:

> What is the upper limit of checkpoint size of Flink System?
>
> Regards,
> Ravi
>
> On Wed 11 Sep, 2019, 06:48 Vijay Bhaskar, 
> wrote:
>
>> You crossed  the upper limits of the check point system of Flink a way
>> high. Try to distribute events equally over time by adding some sort of
>> controlled back pressure after receiving data from kinesis streams.
>> Otherwise the spike coming during 5 seconds time would always create
>> problems. Tomorrow it may double so best solution in your case is to
>> deliver at configurable constant rate after receiving messages from kinesis
>> streams. Otherwise i am sure its always the problem whatever the kind of
>> streaming engine you use. Tune your configuration to get the optimal rate
>> so that flink checkpoint state is healthier.
>>
>> Regards
>> Bhaskar
>>
>> On Tue, Sep 10, 2019 at 11:16 PM Ravi Bhushan Ratnakar <
>> ravibhushanratna...@gmail.com> wrote:
>>
>>> @Rohan - I am streaming data to kafka sink after applying business
>>> logic. For checkpoint, I am using s3 as a distributed file system. For
>>> local recovery, I am using Optimized iops ebs volume.
>>>
>>> @Vijay - I forget to mention that incoming data volume is ~ 10 to 21GB
>>> per minute compressed(lz4) avro message. Generally 90% correlated events
>>> come within 5 seconds and 10% of the correlated events get extended to 65
>>> minute. Due to this business requirement, the state size keep growing till
>>> 65 minutes, after that the state size becomes more or less stable. As the
>>> state size is growing and is around 350gb at peak load, checkpoint is not
>>> able to complete within 1 minutes. I want to check as quick as possible
>>> like every 5 second.
>>>
>>> Thanks,
>>> Ravi
>>>
>>>
>>> On Tue 10 Sep, 2019, 11:37 Vijay Bhaskar, 
>>> wrote:
>>>
 For me task count seems to be huge in number with the mentioned
 resource count. To rule out the possibility of issue with state backend can
 you start writing sink data as  , i.e., data ignore sink. And
 try whether you could run it for longer duration without any issue. You can
 start decreasing the task manager count until you find descent count of it
 without having any side effects. Use that value as task manager count and
 then start adding your state backend. First you can try with Rocks DB. With
 reduced task manager count you might get good results.

 Regards
 Bhaskar

 On Sun, Sep 8, 2019 at 10:15 AM Rohan Thimmappa <
 rohan.thimma...@gmail.com> wrote:

> Ravi, have you looked at the io operation(iops) rate of the disk? You
> can monitoring the iops performance and tune it accordingly with your work
> load. This helped us in our project when we hit the wall tuning prototype
> much all the parameters.
>
> Rohan
>
>
> --
> *From:* Ravi Bhushan Ratnakar 
> *Sent:* Saturday, September 7, 2019 5:38 PM
> *To:* Rafi Aroch
> *Cc:* user
> *Subject:* Re: Checkpointing is not performing well
>
> Hi Rafi,
>
> Thank you for your quick response.
>
> I have tested with rocksdb state backend. Rocksdb required
> significantly more taskmanager to perform as compare to filesystem state
> backend. The problem here is that checkpoint process is not fast enough to
> complete.
>
> Our requirement is to do checkout as soon as possible like in 5
> seconds to flush the output to output sink. As the incoming data rate is
> high, it is not able to complete quickly. If I increase the checkpoint
> duration, the state size grows much faster and hence takes much longer 
> time
> to complete checkpointing. I also tried to use AT LEAST ONCE mode, but 
> does
> not improve much. Adding more taskmanager to increase parallelism also 
> does
> not improve the checkpointing performance.
>
> Is it possible to achieve checkpointing as short as 5 seconds with
> such high input volume?
>
> Regards,
> Ravi
>
> On Sat 7 Sep, 2019, 22:25 Rafi Aroch,  wrote:
>
>> Hi Ravi,
>>
>> Consider moving to RocksDB state backend, where you can enable
>> incremental checkpointing. This will make you checkpoints size stay 
>> pretty
>> much 

Re:

2019-09-11 Thread Fabian Hueske
Hi,

This is clearly a Scala version issue.
You need to make sure that all Flink dependencies have the same version and
are compiled for Scala 2.11.
The "_2.11" postfix in the dependency name indicates that it is a Scala
2.11 dependency ("_2.12 indicates Scala 2.12 compatibility).

Best, Fabian

Am Mi., 11. Sept. 2019 um 05:53 Uhr schrieb Ben Yan <
yan.xiao.bin.m...@gmail.com>:

> The following is the environment I use:
> 1. flink.version: 1.9.0
> 2. java version "1.8.0_212"
> 3. scala version: 2.11.12
>
> When I wrote the following code in the scala programming language, I found
> the following error:
>
> // set up the batch execution environment
> val bbSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
> val bbTableEnv = TableEnvironment.create(bbSettings)
>
> error: Static methods in interface require -target:jvm-1.8
> [ERROR] val bbTableEnv = TableEnvironment.create(bbSettings)
>
> But when I use the java programming language or the version of scala in 2.12, 
> there is no problem.
>
> If I use the version of scala2.11, is there any way to solve this problem? 
> thanks
>
>
> Best,
>
> Ben
>
>


Re: Filter events based on future events

2019-09-11 Thread Fabian Hueske
Hi Theo,

I would implement this with a KeyedProcessFunction.
These are the important points to consider:

1) partition the output of the Kafka source by Kafka partition (or the
attribute that determines the partition). This will ensure that the data
stay in order (per partition).
2) The KeyedProcessFunction needs state to buffer the data of one minute.
It depends on the amount of data that you expect to buffer which state is
the most efficient. If you expect that one minute can be easily hold in
memory, I'd use a FS state backend which keeps all state on the JVM heap.
You could use a ValueState with an appropriate data structure (Queue,
PrioQueue, ...). The data structure would be held as regular Java object on
the heap and hence provide efficient access. If you expect the one minute
to be too much data to be held in memory, you need to go for the RocksDB
state backend. Since this causes de/serialization with every read and write
access, it's more difficult to identify an efficient state primitive /
access pattern. I won't go into the details here, assuming that the
buffered data fits into memory and you can go for the FS state backend. If
that's not the case, let me know and I can share some tips on the RocksDB
state backend approach. The KeyedProcessFunction would add records to the
buffer state when processElement() is called and emit all buffered records
that have a timestamp of less than the timestamp of the currently added
record - 1 minute.

Note, since the timestamps are monotonically increasing, we do not need
watermarks and event-time but can rely on the timestamps of the records.
Hence, the application won't block if one partition stalls providing the
same benefits that per-key watermarks would offer (if they were supported
by Flink).

Best, Fabian

Am Di., 10. Sept. 2019 um 23:06 Uhr schrieb
theo.diefent...@scoop-software.de :

> Hi there,
>
> I have the following use case:
>
> I get transaction logs from multiple servers. Each server puts its logs
> into its own Kafka partition so that within each partition the elements are
> monothonically ordered by time.
>
> Within the stream of transactions, we have some special events. Let's call
> them A. (roughly 1-10% in distribution have this type).
>
> An A event can have an Anti-A event later on in time. That is an event
> which has all the same attributes (like username, faculty,..) but differs
> in one boolean attribute indicating that it is an anti event. Kind of a
> retraction.
>
> Now I want to emit almost all events downstream (including neither A nor
> Anti-A, let's call them simpy B), preserving the monothonical order of
> events. There is just one special case in which I want to filter out an
> element: If the stream has an A event followed by an Anti-A event within
> one minute time, only the Anti-A event shall go downstream, not A itself.
> But if there is no Anti-A event, A shall be emitted and shall still be
> within timestamp order of events.
>
> I'm wrangling my head around it a lot and don't come up with a proper
> (performant) solution. It seems to be obvious that in the end, I need to
> buffer all records over 1 minute so that order can be preserved. But I have
> no idea how to implement this in Flink efficiently.
>
> My thoughts thus far:
>
> 1. I could give CEP a try. But in that CEP I would need to write something
> like match all B events in any case. And match A also but only if there is
> no anti A => doesn`t that produce a lot of state? And are all B events
> considered in the breadth first rule match approach, I. E. Tons of
> unnecessary comparisons against A? Any pseudo code on how I could do this
> with CEP?
>
> 2. If I key data by partition and all other attributes except for the
> retract boolean so that A and anti A always fall into the same keyed stream
> but no other event in that stream, I probably get much better comparison
> capabilities. But how much overhead do I produce with it? Will Flink
> reshuffle the data even if the first key stays the same? And can I
> backpartiton to my "global" per partition order? Note that some events have
> the exact event time timestamp but I still want to have them in their
> original order later on.
>
> 3. Could I work with session windows somehow? Putting A and Anti A in the
> same session and in window emit I would just not collect the A event if
> there is an Anti A? Would it be more or less overhead compared to CEP?
>
> 4. Do you have any other idea on how to approach this? Sadly, I have no
> way to manipulate the input stream, so that part of the pipeline is fixed.
>
> Best regards
> Theo
>


Re: Flink SQL: How to tag a column as 'rowtime' from a Avro based DataStream?

2019-09-10 Thread Fabian Hueske
Hi,

that would be regular SQL cast syntax:

SELECT a, b, c, CAST(eventTime AS TIMESTAMP) FROM ...


Am Di., 10. Sept. 2019 um 18:07 Uhr schrieb Niels Basjes :

> Hi.
>
> Can you give me an example of the actual syntax of such a cast?
>
> On Tue, 10 Sep 2019, 16:30 Fabian Hueske,  wrote:
>
>> Hi Niels,
>>
>> I think (not 100% sure) you could also cast the event time attribute to
>> TIMESTAMP before you emit the table.
>> This should remove the event time property (and thereby the
>> TimeIndicatorTypeInfo) and you wouldn't know to fiddle with the output
>> types.
>>
>> Best, Fabian
>>
>> Am Mi., 21. Aug. 2019 um 10:51 Uhr schrieb Niels Basjes > >:
>>
>>> Hi,
>>>
>>> It has taken me quite a bit of time to figure this out.
>>> This is the solution I have now (works on my machine).
>>>
>>> Please tell me where I can improve this.
>>>
>>> Turns out that the schema you provide for registerDataStream only needs
>>> the 'top level' fields of the Avro datastructure.
>>> With only the top fields there you can still access nested fields with
>>> something like "topfield.x.y.z" in the SQL statement.
>>>
>>> What I found is that the easiest way to make this all work is to ensure
>>> the rowtime field in the structure is at the top level (which makes sense
>>> in general) and generate the fields string where I only need to know the
>>> name of the "rowtime" field.
>>>
>>> So I have
>>>
>>> DataStream inputStream = ...
>>>
>>>
>>> then I register the stream with
>>>
>>>
>>> TypeInformation typeInformation = 
>>> TypeInformation.of(Measurement.class);
>>> String [] fieldNames = TableEnvironment.getFieldNames(typeInformation);
>>>
>>> List rootSchema = new ArrayList<>();
>>> for (String fieldName: fieldNames) {
>>> if (rowtimeFieldName.equals(fieldName)) {
>>> rootSchema.add(fieldName + ".rowtime");
>>> } else {
>>> rootSchema.add(fieldName);
>>> }
>>> }
>>>
>>> tableEnv.registerDataStream("MeasurementStream", inputStream, 
>>> String.join(",", rootSchema));
>>>
>>>
>>> Now after the actual SQL has been executed I have a
>>>
>>> Table resultTable = ...
>>>
>>> Now simply feeding this into a DataStream with something like this fails
>>> badly.
>>>
>>> TypeInformation tupleType = new 
>>> RowTypeInfo(resultTable.getSchema().getFieldTypes());
>>> DataStream  resultSet = tableEnv.toAppendStream(resultTable, 
>>> tupleType);
>>>
>>> will result in
>>>
>>> org.apache.flink.table.api.TableException: The time indicator type is 
>>> an internal type only.
>>>at 
>>> org.apache.flink.table.api.TableEnvironment.org$apache$flink$table$api$TableEnvironment$$validateFieldType$1(TableEnvironment.scala:1172)
>>>
>>> Turns out that the schema of the output contains a field that was
>>> created by TUMBLE_START which is of type TimeIndicatorTypeInfo
>>>
>>> So I have to do it this way (NASTY!):
>>>
>>> final TypeInformation[] fieldTypes = 
>>> resultTable.getSchema().getFieldTypes();
>>> int index;
>>> for(index = 0 ; index < fieldTypes.length ; index++) {
>>> if (fieldTypes[index] instanceof TimeIndicatorTypeInfo) {
>>>fieldTypes[index] = SQL_TIMESTAMP;
>>> }
>>> }
>>> TypeInformation tupleType = new RowTypeInfo(fieldTypes);
>>> DataStream  resultSet = tableEnv.toAppendStream(resultTable, 
>>> tupleType);
>>>
>>> Which gives me the desired DataStream.
>>>
>>>
>>> Niels Basjes
>>>
>>>
>>>
>>>
>>>
>>> On Wed, Aug 14, 2019 at 5:13 PM Timo Walther  wrote:
>>>
>>>> Hi Niels,
>>>>
>>>> if you are coming from DataStream API, all you need to do is to write a
>>>> timestamp extractor.
>>>>
>>>> When you call:
>>>>
>>>> tableEnv.registerDataStream("TestStream", letterStream,
>>>> "EventTime.rowtime, letter, counter");
>>>>
>>>> The ".rowtime" means that the framework will extract the rowtime 

Re: Join with slow changing dimensions/ streams

2019-09-10 Thread Fabian Hueske
Hi Hanan,

BroadcastState and CoMap (or CoProcessFunction) have both advantages and
disadvantages.

Broadcast state is better if the broadcasted side is small (only low data
rate).
Its records are replicated to each instance but the other (larger) stream
does not need to be partitioned and stays on the partitions.

The CoMapFunction approach is better if both side are similar in size.
Their records are not replicated but repartitioned and sent over the
network.

This is the common trade-off between broadcast-forward and
repartition-repartition joins that query optimizer of distributed database
systems have to deal with.

Best,
Fabian

Am Do., 5. Sept. 2019 um 13:37 Uhr schrieb Hanan Yehudai <
hanan.yehu...@radcom.com>:

> Thanks Fabian.
>
>
> is there any advantage using broadcast state  VS using just CoMap function
> on 2 connected streams ?
>
>
>
> *From:* Fabian Hueske 
> *Sent:* Thursday, September 5, 2019 12:59 PM
> *To:* Hanan Yehudai 
> *Cc:* flink-u...@apache.org
> *Subject:* Re: Join with slow changing dimensions/ streams
>
>
>
> Hi,
>
>
>
> Flink does not have good support for mixing bounded and unbounded streams
> in its DataStream API yet.
>
> If the dimension table is static (and small enough), I'd use a
> RichMapFunction and load the table in the open() method into the heap.
>
> In this case, you'd probably need to restart the job (can be done with a
> savepoint and restart) to load a new table. You can also use a
> ProcessFunction and register a timer to periodically load a new table.
>
>
>
> If the dimension table is (slowly) changing, you might want to think about
> the broadcast state.
>
> With this setup you can propagate updates by sending them to the
> broadcasted channel.
>
>
>
> I would not use the join operator because it would also buffer the actual
> stream in state.
>
>
>
> Best, Fabian
>
>
>
> Am Mo., 2. Sept. 2019 um 15:38 Uhr schrieb Hanan Yehudai <
> hanan.yehu...@radcom.com>:
>
> I have a very common use case -enriching the stream with  some
> dimension tables.
>
> e.g   the events stream has a SERVER_ID ,  and another files have the
> LOCATION  associated with e SERVER_ID. ( a dimension table  csv file)
>
> in SQL I would  simply join.
> but hen using Flink  stream API ,  as far as I see,  there are several
> option and I wondered which would be optimal.
>
>
>
> 1. Use the JOIN operator,,  from the documentation (
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/joining.html
> <https://eur02.safelinks.protection.outlook.com/?url=https%3A%2F%2Fci.apache.org%2Fprojects%2Fflink%2Fflink-docs-release-1.9%2Fdev%2Fstream%2Foperators%2Fjoining.html=01%7C01%7Chanan.yehudai%40radcom.com%7C04b58a965a6e46a467b008d731e7b852%7C0eb9e2d98763412e97093f539e9e25bc%7C1=q%2BlDzMlF2WQHTuo%2BjtmzL8zRMrU2uAZ4KweLLK2rfqc%3D=0>
> )
> this is always has some time aspect  to the join .  unless I use an
> interval join with very large upper bound and associate the dimension
> stream record with  an old timestamp.
>
>
>
> 2. just write a mapper function the gets the NAME from the dimesion
> records – that are preloaded on the mapFunction  loading method.
>
>
>
> 3. use a broadcast state – this way I can also listen to the changes on
> the dimension  tables  and do the actual join in the processElement
> ducntion.
>
>
>
> What soul be the most efficient way to do this from mem and Cpu
> consumption perspective ?
>
>
>
> Or is there another , better way ?
>
>


Re: Flink SQL: How to tag a column as 'rowtime' from a Avro based DataStream?

2019-09-10 Thread Fabian Hueske
Hi Niels,

I think (not 100% sure) you could also cast the event time attribute to
TIMESTAMP before you emit the table.
This should remove the event time property (and thereby the
TimeIndicatorTypeInfo) and you wouldn't know to fiddle with the output
types.

Best, Fabian

Am Mi., 21. Aug. 2019 um 10:51 Uhr schrieb Niels Basjes :

> Hi,
>
> It has taken me quite a bit of time to figure this out.
> This is the solution I have now (works on my machine).
>
> Please tell me where I can improve this.
>
> Turns out that the schema you provide for registerDataStream only needs
> the 'top level' fields of the Avro datastructure.
> With only the top fields there you can still access nested fields with
> something like "topfield.x.y.z" in the SQL statement.
>
> What I found is that the easiest way to make this all work is to ensure
> the rowtime field in the structure is at the top level (which makes sense
> in general) and generate the fields string where I only need to know the
> name of the "rowtime" field.
>
> So I have
>
> DataStream inputStream = ...
>
>
> then I register the stream with
>
>
> TypeInformation typeInformation = 
> TypeInformation.of(Measurement.class);
> String [] fieldNames = TableEnvironment.getFieldNames(typeInformation);
>
> List rootSchema = new ArrayList<>();
> for (String fieldName: fieldNames) {
> if (rowtimeFieldName.equals(fieldName)) {
> rootSchema.add(fieldName + ".rowtime");
> } else {
> rootSchema.add(fieldName);
> }
> }
>
> tableEnv.registerDataStream("MeasurementStream", inputStream, 
> String.join(",", rootSchema));
>
>
> Now after the actual SQL has been executed I have a
>
> Table resultTable = ...
>
> Now simply feeding this into a DataStream with something like this fails
> badly.
>
> TypeInformation tupleType = new 
> RowTypeInfo(resultTable.getSchema().getFieldTypes());
> DataStream  resultSet = tableEnv.toAppendStream(resultTable, 
> tupleType);
>
> will result in
>
> org.apache.flink.table.api.TableException: The time indicator type is an 
> internal type only.
>at 
> org.apache.flink.table.api.TableEnvironment.org$apache$flink$table$api$TableEnvironment$$validateFieldType$1(TableEnvironment.scala:1172)
>
> Turns out that the schema of the output contains a field that was created
> by TUMBLE_START which is of type TimeIndicatorTypeInfo
>
> So I have to do it this way (NASTY!):
>
> final TypeInformation[] fieldTypes = 
> resultTable.getSchema().getFieldTypes();
> int index;
> for(index = 0 ; index < fieldTypes.length ; index++) {
> if (fieldTypes[index] instanceof TimeIndicatorTypeInfo) {
>fieldTypes[index] = SQL_TIMESTAMP;
> }
> }
> TypeInformation tupleType = new RowTypeInfo(fieldTypes);
> DataStream  resultSet = tableEnv.toAppendStream(resultTable, 
> tupleType);
>
> Which gives me the desired DataStream.
>
>
> Niels Basjes
>
>
>
>
>
> On Wed, Aug 14, 2019 at 5:13 PM Timo Walther  wrote:
>
>> Hi Niels,
>>
>> if you are coming from DataStream API, all you need to do is to write a
>> timestamp extractor.
>>
>> When you call:
>>
>> tableEnv.registerDataStream("TestStream", letterStream,
>> "EventTime.rowtime, letter, counter");
>>
>> The ".rowtime" means that the framework will extract the rowtime from the
>> stream record timestamp. You don't need to name all fields again but could
>> simply construct a string from letterStream.getTypeInfo().getFieldNames().
>> I hope we can improve this further in the future as part of FLIP-37.
>>
>> Regards,
>> Timo
>>
>> Am 14.08.19 um 17:00 schrieb Niels Basjes:
>>
>> Hi,
>>
>> Experimenting with the StreamTableEnvironment I build something like this:
>>
>> DataStream> letterStream = ...
>> tableEnv.registerDataStream("TestStream", letterStream,
>> "EventTime.rowtime, letter, counter");
>>
>>
>> Because the "EventTime" was tagged with ".rowtime" it is now being used
>> as the rowtime and has the DATETIME so I can do this
>>
>> TUMBLE_START(eventTime, INTERVAL '1' MINUTE)
>>
>>
>> So far so good.
>>
>> Working towards a more realistic scenario I have a source that produces a
>> stream of records that have been defined using Apache Avro.
>>
>> So I have a Measurement.avdl that (among other things) contains something
>> like this:
>>
>> record Measurement {
>>/** The time (epoch in milliseconds since 1970-01-01 UTC) when the
>> event occurred */
>> longtimestamp;
>> string  letter;
>> longpageviews;
>> }
>>
>>
>> Now because the registerDataStream call can also derive the schema from
>> the provided data I can do this:
>>
>> DataStream inputStream = ...
>> tableEnv.registerDataStream("DataStream", inputStream);
>>
>>
>> This is very nice because any real schema is big (few hundred columns)
>> and changes over time.
>>
>> Now In the SQL the timestamp is a BIGINT and not a DATETIME and as a
>> 

Re: Implementing CheckpointableInputFormat

2019-09-06 Thread Fabian Hueske
Hi,

CheckpointableInputFormat is only relevant if you plan to use the
InputFormat in a MonitoringFileSource, i.e., in a streaming application.
If you plan to use it in a DataSet (batch) program, InputFormat is fine.

Btw. the latest release Flink 1.9.0 has major improvements for the recovery
of batch jobs.

Best, Fabian

Am Do., 5. Sept. 2019 um 19:01 Uhr schrieb Lu Niu :

> Hi, Team
>
> I am implementing a custom InputFormat. Shall I
> implement CheckpointableInputFormat interface? If I don't, does that mean
> the whole job has to restart given only one task fails? I ask because I
> found all InputFormat implements CheckpointableInputFormat, which makes me
> confused. Thank you!
>
> Best
> Lu
>


Re: TABLE API + DataStream outsourcing schema or Pojo?

2019-09-06 Thread Fabian Hueske
Hi Steve,

The memory catalog does not persist metadata and needs to be repopulated
everytime.
However, you can implement a catalog that persists the metadata to a file
or a database.

There is an effort to implement a Catalog interface of Hive's metastore.
A preview is available in the latest release (1.9.0)

Best, Fabian

Am Do., 5. Sept. 2019 um 14:52 Uhr schrieb Steve Robert <
contact.steverob...@gmail.com>:

> Hi Fabian ,
>
> thank you for your answer it is indeed the solution that I am currently
> testing
> i use TypeInformation convert =
> JsonRowSchemaConverter.convert(JSON_SCHEMA); provided by the
> flink-json  and provide the TypeFormation to the operatorStream
> its look like to work :) with this solution my schema can be outside my
> package
>
> one additional question about .  GenericMemoryCatalog
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/catalogs.html
>  .
> catalog can be use accross multiple job running on the same cluster ? or
> the catalog are  scoped on the job session only ?
>
> DataStream dataStreamJson = dataStream.map(new MapFunction JsonNode>() {
> @Override
> public JsonNode map(String s) throws Exception {
> ObjectMapper objectMapper = new ObjectMapper();
> JsonNode node = objectMapper.readTree(s);
> return node;
> }
> });
> DataStream dataStreamRow = dataStreamJson.map(new MapFunction Row>() {
> @Override
> public Row map(JsonNode jsonNode) throws Exception {
> int pos = 0;
> Row row = new Row(jsonNode.size());
> Iterator iterator = jsonNode.fieldNames();
> while (iterator.hasNext()) {
> String key = iterator.next();
> row.setField(pos, jsonNode.get(key).asText());
> pos++;
> }
> return row;
> }
> }).returns(convert);
>
> Table tableA = tEnv.fromDataStream(dataStreamRow);
>
>
> Le jeu. 5 sept. 2019 à 13:23, Fabian Hueske  a écrit :
>
>> Hi Steve,
>>
>> Maybe you could implement a custom TableSource that queries the data from
>> the rest API and converts the JSON directly into a Row data type.
>> This would also avoid going through the DataStream API just for ingesting
>> the data.
>>
>> Best, Fabian
>>
>> Am Mi., 4. Sept. 2019 um 15:57 Uhr schrieb Steve Robert <
>> contact.steverob...@gmail.com>:
>>
>>> Hi guys ,
>>>
>>> It's been a while since I'm studying TABLE APIs for integration into my
>>> system.
>>> when i take a look on this documentation
>>> :
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#connectors
>>>
>>>
>>> I understand that it is possible to apply a JSON FORMAT on the connector
>>> and apply a JSON-SCHEMA without any hardcoded  java pojo
>>> .jsonSchema(
>>>   "{" +
>>>   "  type: 'object'," +
>>>   "  properties: {" +
>>>   "lon: {" +
>>>   "  type: 'number'" +
>>>   "}," +
>>>   "rideTime: {" +
>>>   "  type: 'string'," +
>>>   "  format: 'date-time'" +
>>>   "}" +
>>>   "  }" +
>>>   "}"
>>> )
>>>
>>>
>>> but my problematic is the following my data comes from REST-API , so
>>> I have to process the data and transmit it via a DataStream
>>> the problem is that between the conversation of a dataStream and a
>>> table must pass through a Java Pojo. Datastream  input
>>>  Table table=tEnv.fromDataStream(input);
>>> I tried a trick while making a conversation from my JSON to AVRO
>>> using a GenericRecord but it does not seem possible .
>>>
>>> my user case and being able to add REST-API processing  in runtime
>>> and be able to outsource and dynamically load my Pojo / Schema without
>>> harcode an Java-Pojo object
>>>
>>>
>>> Do you have an approach to suggest me ?
>>>
>>>
>>> Thank a lot
>>>
>>


[ANNOUNCE] Kostas Kloudas joins the Flink PMC

2019-09-06 Thread Fabian Hueske
Hi everyone,

I'm very happy to announce that Kostas Kloudas is joining the Flink PMC.
Kostas is contributing to Flink for many years and puts lots of effort in
helping our users and growing the Flink community.

Please join me in congratulating Kostas!

Cheers,
Fabian


Re: Exception when trying to change StreamingFileSink S3 bucket

2019-09-05 Thread Fabian Hueske
Hi,

Kostas (in CC) might be able to help.

Best, Fabian

Am Mi., 4. Sept. 2019 um 22:59 Uhr schrieb sidhartha saurav <
sidsau...@gmail.com>:

> Hi,
>
> Can someone suggest a workaround so that we do not get this issue while
> changing the S3 bucket ?
>
> On Thu, Aug 22, 2019 at 4:24 PM sidhartha saurav 
> wrote:
>
>> Hi,
>>
>> We are trying to change our StreamingFileSink S3 bucket, say from s3://
>> *eu1/output_old* to s3://*eu2/output_new*. When we do so we get an
>> exception and the taskmanger goes into a restart loop.
>>
>> We suspect that it tries to restore state and gets the bucketid from
>> saved state [* final BucketID bucketId =
>> recoveredState.getBucketId()*]. Flink then tries to read output_old from
>> eu2 and gets an AccessDeniedError. Rightly so as it has permission for
>> s3://eu2/output_new and not s3://eu2/output_old. We are not sure why is
>> Flink trying to access the old bucket and how to avoid this exception.
>>
>> Logs:
>>
>> > "S3Committer.java","line":"87","message":"Failed to commit after
>> recovery output_old/2019-08-22/18/part-3-40134 with MPU ID
>> 7adJKrKCqFJnFhI2agC8BiMnLdHUoaGrIfnhJ00ezgGINvguYJtGmjsp4P64.qkAiC0khB6me7ZuU.qWzC8jTcUvULym1lScNNfkgcoRP2tq4BDIb4.HyMSgAmkmbtj7.
>> Checking if file was committed before...",
>>
>> > "Task.java","line":"910","message":"... switched from RUNNING to
>> FAILED."
>>
>> > java.nio.file.AccessDeniedException: output_old/2019-08-22/18/part-3-40134:
>> getObjectMetadata on output_old/2019-08-22/18/part-3-40134:
>> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
>> Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden;
>> Request ID: 79F1AEE53131FB66; S3 Extended Request ID:
>> 8mXPS2r03aaQIp4b+cFSehTMVRleC6k5gAC6/KWmmwMxr3Gxr/jy4gX06ZHH/+P7SRT9uNxtA1U=),
>> S3 Extended Request ID:
>> 8mXPS2r03aaQIp4b+cFSehTMVRleC6k5gAC6/KWmmwMxr3Gxr/jy4gX06ZHH/+P7SRT9uNxtA1U=:403
>> Forbidden
>>
>> flink-taskmanager at
>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:218)
>> flink-taskmanager at
>> org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.getObjectMetadata(HadoopS3AccessHelper.java:126)
>> flink-taskmanager at
>> org.apache.flink.fs.s3.common.writer.S3Committer.commitAfterRecovery(S3Committer.java:92)
>> flink-taskmanager at
>> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.commitRecoveredPendingFiles(Bucket.java:160)
>> flink-taskmanager at
>> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.(Bucket.java:128)
>> flink-taskmanager at
>> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:396)
>> flink-taskmanager at
>> org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:64)
>> flink-taskmanager at
>> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:177)
>> flink-taskmanager at
>> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:165)
>> flink-taskmanager at
>> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:149)
>> flink-taskmanager at
>> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:334)
>> flink-taskmanager at
>> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>> flink-taskmanager at
>> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.
>> *restoreFunctionState*(StreamingFunctionUtils.java:160)
>> flink-taskmanager at
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>> flink-taskmanager at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
>> flink-taskmanager at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>> flink-taskmanager at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>> flink-taskmanager at
>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>> flink-taskmanager at java.lang.Thread.run(Thread.java:748)
>>
>>
>> We are using Flink 1.8 and externalized checkpoint. The S3 bucket for
>> externalized checkpoint have not been modified.
>>
>> Thanks
>> Sidhartha
>>
>


Re: TABLE API + DataStream outsourcing schema or Pojo?

2019-09-05 Thread Fabian Hueske
Hi Steve,

Maybe you could implement a custom TableSource that queries the data from
the rest API and converts the JSON directly into a Row data type.
This would also avoid going through the DataStream API just for ingesting
the data.

Best, Fabian

Am Mi., 4. Sept. 2019 um 15:57 Uhr schrieb Steve Robert <
contact.steverob...@gmail.com>:

> Hi guys ,
>
> It's been a while since I'm studying TABLE APIs for integration into my
> system.
> when i take a look on this documentation
> :
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#connectors
>
>
> I understand that it is possible to apply a JSON FORMAT on the connector
> and apply a JSON-SCHEMA without any hardcoded  java pojo
> .jsonSchema(
>   "{" +
>   "  type: 'object'," +
>   "  properties: {" +
>   "lon: {" +
>   "  type: 'number'" +
>   "}," +
>   "rideTime: {" +
>   "  type: 'string'," +
>   "  format: 'date-time'" +
>   "}" +
>   "  }" +
>   "}"
> )
>
>
> but my problematic is the following my data comes from REST-API , so I
> have to process the data and transmit it via a DataStream
> the problem is that between the conversation of a dataStream and a
> table must pass through a Java Pojo. Datastream  input
>  Table table=tEnv.fromDataStream(input);
> I tried a trick while making a conversation from my JSON to AVRO using
> a GenericRecord but it does not seem possible .
>
> my user case and being able to add REST-API processing  in runtime and
> be able to outsource and dynamically load my Pojo / Schema without harcode
> an Java-Pojo object
>
>
> Do you have an approach to suggest me ?
>
>
> Thank a lot
>


Re: error in my job

2019-09-05 Thread Fabian Hueske
Hi,

Are you getting this error repeatedly or was this a single time?

If it's just a single time error, it's probably caused by a task manager
process that died for some reason (as suggested by the error message).
You should have a look at the TM logs whether you can finds something that
would explain why the process died.

Best, Fabian

Am Di., 3. Sept. 2019 um 12:58 Uhr schrieb yuvraj singh <
19yuvrajsing...@gmail.com>:

> Hi all ,
>
> i am facing a problem in my flink job , i am getting given exception
>
>
> 2019-09-03 12:02:04,278 ERROR
> org.apache.flink.runtime.io.network.netty.PartitionRequestQueue  -
> Encountered error while consuming partitions
>
> java.io.IOException: Connection reset by peer
>
> at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
>
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
>
> at sun.nio.ch.IOUtil.read(IOUtil.java:192)
>
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
>
> at
> org.apache.flink.shaded.netty4.io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
>
> at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1108)
>
> at
> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:345)
>
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
>
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
>
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
>
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
>
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
>
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
>
> at java.lang.Thread.run(Thread.java:745)
>
> 2019-09-03 12:02:04,324 INFO  org.apache.flink.runtime.taskmanager.Task
>   - Window(TumblingEventTimeWindows(6),
> EventTimeTrigger, CityAverage, PassThroughWindowFunction) -> Map -> Flat
> Map -> Sink: Unnamed (50/60) (9c621519b14b3b40c73f5037311b2206) switched
> from RUNNING to FAILED.
>
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
> Connection unexpectedly closed by remote task manager '/10.14.58.77:35531'.
> This might indicate that the remote task manager was lost.
>
> at
> org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:136)
>
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
>
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
>
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
>
> at
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:377)
>
> at
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:342)
>
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
>
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
>
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
>
> at
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1429)
>
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
>
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
>
> at
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:947)
>
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:822)
>
> at
> 

Re: understanding task manager logs

2019-09-05 Thread Fabian Hueske
Hi Vishwas,

This is a log statement from Kafka [1].
Not sure how when AppInfoParser is created (the log message is written by
the constructor).

For Kafka versions > 1.0, I'd recommend the universal connector [2].

Not sure how well it works if producers and consumers have different
versions.
Maybe Gordon (in CC) has some experience with that.

Best, Fabian

[1]
https://github.com/axbaretto/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java#L117
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html#kafka-100-connector

Am Di., 3. Sept. 2019 um 04:04 Uhr schrieb Vishwas Siravara <
vsirav...@gmail.com>:

> Hi guys,
> I am using flink 1.7.2 and my application consumes from a kafka topic and
> publish to another kafka topic which is in its own kafka environment
> running a different kafka version,. I am using FlinkKafkaConsumer010 from
> this dependency
> *"org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion. *
>
> In the task manager log I see these lines:
>
> 2019-09-02 02:57:59,840 INFO  
> org.apache.kafka.common.security.authenticator.AbstractLogin  - Successfully 
> logged in.
> 2019-09-02 02:57:59,841 INFO  
> org.apache.kafka.common.security.kerberos.KerberosLogin   - 
> [Principal=kafka/sl73rspapd035.visa@corpdev.visa.com]: TGT refresh thread 
> started.
> 2019-09-02 02:57:59,842 INFO  
> org.apache.kafka.common.security.kerberos.KerberosLogin   - 
> [Principal=kafka/sl73rspapd035.visa@corpdev.visa.com]: TGT valid starting 
> at: Mon Sep 02 02:57:59 GMT 2019
> 2019-09-02 02:57:59,843 INFO  
> org.apache.kafka.common.security.kerberos.KerberosLogin   - 
> [Principal=kafka/sl73rspapd035.visa@corpdev.visa.com]: TGT expires: Mon 
> Sep 02 12:57:59 GMT 2019
> 2019-09-02 02:57:59,843 INFO  
> org.apache.kafka.common.security.kerberos.KerberosLogin   - 
> [Principal=kafka/sl73rspapd035.visa@corpdev.visa.com]: TGT refresh 
> sleeping until: Mon Sep 02 11:14:13 GMT 2019
> 2019-09-02 02:57:59,919 WARN  
> org.apache.kafka.clients.consumer.ConsumerConfig  - The 
> configuration 'zookeeper.connect' was supplied but isn't a known 
> config.*2019-09-02 02:57:59,919 INFO  
> org.apache.kafka.common.utils.AppInfoParser   - Kafka version 
> : 0.10.2.0
> *2019-09-02 02:57:59,919 INFO  org.apache.kafka.common.utils.AppInfoParser
>- Kafka commitId : 576d93a8dc0cf421
>
> Here if you see the Kafka version is 0.10.2.0. Is this the version the broker 
> is running or is this coming from flink ? I have forced the kafka-client 
> version
>
> to be 2.2.0
>
> "org.apache.kafka" % "kafka-clients" % "2.2.0" force()
>
> I also don't see 0.10.2.0 in the dependency tree of my build.
>
> Also will flink-connector-kafka-0.10 work for kafka versions > 1.0 ? What 
> should I do if the consumer broker and producer broker are on different 
> versions of kafka ?
>
>
> Thanks,
>
> Vishwas
>
>
> Thanks,
>
> Vishwas
>
>
>
>


Re: Window metadata removal

2019-09-05 Thread Fabian Hueske
Hi,

A window needs to keep the data as long as it expects new data.
This is clearly the case before the end time of the window was reached. If
my window ends at 12:30, I want to wait (at least) until 12:30 before I
remove any data, right?

In case you expect some data to be late, you can configure allowedLateness.
Let's say, we configure allowedLateness of 10 minutes. In that case, Flink
would keep the metadata of the window that closes at 12:30 until 12:40.
The data is kept to be able to update the result of the window until
allowedLateness has passed.
If we for example receive a late record at 12:38, we can still update the
result of the window because we kept all required data.

If you don't need allowedLateness, don't configure it (the default is 0).

Best, Fabian

Am Mo., 2. Sept. 2019 um 16:46 Uhr schrieb gil bl :

> Hi,
>
> I'm interested in why metadata like WindowOperator and InternalTimer are
> being kept for windowSize + allowedLateness period per each pane.
>
>- What is the purpose of keeping this data if no new events are
>expected to enter the pane?
>- Is there any way this metadata can be released earlier?
>
>


Re: Join with slow changing dimensions/ streams

2019-09-05 Thread Fabian Hueske
Hi,

Flink does not have good support for mixing bounded and unbounded streams
in its DataStream API yet.
If the dimension table is static (and small enough), I'd use a
RichMapFunction and load the table in the open() method into the heap.
In this case, you'd probably need to restart the job (can be done with a
savepoint and restart) to load a new table. You can also use a
ProcessFunction and register a timer to periodically load a new table.

If the dimension table is (slowly) changing, you might want to think about
the broadcast state.
With this setup you can propagate updates by sending them to the
broadcasted channel.

I would not use the join operator because it would also buffer the actual
stream in state.

Best, Fabian

Am Mo., 2. Sept. 2019 um 15:38 Uhr schrieb Hanan Yehudai <
hanan.yehu...@radcom.com>:

> I have a very common use case -enriching the stream with  some
> dimension tables.
>
> e.g   the events stream has a SERVER_ID ,  and another files have the
> LOCATION  associated with e SERVER_ID. ( a dimension table  csv file)
>
> in SQL I would  simply join.
> but hen using Flink  stream API ,  as far as I see,  there are several
> option and I wondered which would be optimal.
>
>
>
> 1. Use the JOIN operator,,  from the documentation (
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/joining.html
> )
> this is always has some time aspect  to the join .  unless I use an
> interval join with very large upper bound and associate the dimension
> stream record with  an old timestamp.
>
>
>
> 2. just write a mapper function the gets the NAME from the dimesion
> records – that are preloaded on the mapFunction  loading method.
>
>
>
> 3. use a broadcast state – this way I can also listen to the changes on
> the dimension  tables  and do the actual join in the processElement
> ducntion.
>
>
>
> What soul be the most efficient way to do this from mem and Cpu
> consumption perspective ?
>
>
>
> Or is there another , better way ?
>


[ANNOUNCE] Flink Forward training registration closes on September 30th

2019-09-05 Thread Fabian Hueske
Hi all,

The registration for the Flink Forward Europe training sessions closes in
four weeks.
The training takes place in Berlin at October 7th and is followed by two
days of talks by speakers from companies like Airbus, Goldman Sachs,
Netflix, Pinterest, and Workday [1].

The following four training sessions are available [2]:
* Developer Training (Java/Scala)
* Operations Training
* SQL Developer Training
* Tuning & Trouble Shooting Training

If you'd like to participate in one of the sessions, you should register
soon at
> *https://europe-2019.flink-forward.org/register
*

*ATTENTION*
Members of the Flink community get a 50% discount on training and
conference tickets if they register with the code *FFEU19-MailingList*
Apache committers (regardless of which project they contribute to) get a
free ticket if they register with their Apache email address and use the
code *FFEU19-ApacheCommitter*

Best,
Fabian

[1] https://europe-2019.flink-forward.org/conference-program
[2] https://europe-2019.flink-forward.org/training-program


Re: tumbling event time window , parallel

2019-09-02 Thread Fabian Hueske
I meant to not use Flink's built-in windows at all but implement your logic
in a KeyedProcessFunction.

So basically:
myDataStream.keyBy(...).process(new MyKeyedProcessFunction)
instead of:
myDataStream.keyBy(...).window(...).process(new MyWindowProcessFunction)

Am Mo., 2. Sept. 2019 um 15:29 Uhr schrieb Hanan Yehudai <
hanan.yehu...@radcom.com>:

> Im not sure what you mean by use process function and not window process
> function ,  as the window operator takes in a windowprocess function..
>
>
>
> *From:* Fabian Hueske 
> *Sent:* Monday, August 26, 2019 1:33 PM
> *To:* Hanan Yehudai 
> *Cc:* user@flink.apache.org
> *Subject:* Re: tumbling event time window , parallel
>
>
>
> I would use a regular ProcessFunction, not a WindowProcessFunction.
>
>
>
> The final WM depends on how the records were partitioned at the watermark
> assigner (and the assigner itself).
>
> AFAIK, the distribution of files to source reader tasks is not
> deterministic. Hence, the final WM changes from run to run.
>
>
>
> Fabian
>
>
>
> Am Mo., 26. Aug. 2019 um 12:16 Uhr schrieb Hanan Yehudai <
> hanan.yehu...@radcom.com>:
>
> You said “ You can use a custom ProcessFunction and compare the timestamp
> of each record with the current watermark.”.
>
>
>
> Does the  window  process function has all the events – even the ones that
> are dropped due to lateness?
> from what I’m understand the “ iterable”  argument I contains the record
> that were inserted into the window  and NOT the ones dropped.   Isn’t that
> correct ?
>
>
>
>
>
> Also,
>
> when looking on Flink’s monitoring page  - for  the  watermarks  I see
> different vales  even after all my files were processed.  Which is
> something I would not expect
> I would expect that eventually   the WM will be the highest EVENT_TIME on
> my set of files..
>
>
>
>
>
> thanks
>
>
>
> *From:* Fabian Hueske 
> *Sent:* Monday, August 26, 2019 12:38 PM
> *To:* Hanan Yehudai 
> *Cc:* user@flink.apache.org
> *Subject:* Re: tumbling event time window , parallel
>
>
>
> Hi,
>
>
>
> The paths of the files to read are distributed across all reader / source
> tasks and each task reads the files in order of their modification
> timestamp.
>
> The watermark generator is not aware of any files and just looks at the
> stream of records produced by the source tasks.
>
> You need to chose the WM generator strategy such that you minimize the
> number of late records.
>
>
>
> I'd recommend to first investigate how many late records you are dealing
> with.
>
> You can use a custom ProcessFunction and compare the timestamp of each
> record with the current watermark.
>
>
>
> AllowedLateness is also not a magical cure. It will just emit updates
> downstream, i.e., you need to remove the results that were updated by a
> more complete result.
>
>
>
> Best, Fabian
>
>
>
> Am Mo., 26. Aug. 2019 um 10:21 Uhr schrieb Hanan Yehudai <
> hanan.yehu...@radcom.com>:
>
> The data  source is generated by an application that monitors some sort of
> sessions.
>
> With the EVENT_TIME column being the session end time .
>
>
>
> It is possible that the files will have out of order data , because of the
> async nature of the application writing  files.
>
>  While the EVENT_TIME is monotonically  increasing in general .  some
> lateness is possible. However ,I used *allowlateness*  on my stream
> and still got the inconsistencies
>
>
>
> Although the real life use case is generically reading files form a
> folder.  The testing  env has an already set of files in advanced -  these
>  should be read and produce the result.
>
>
>
> You mentioned the “right” order of the files.  Is it sorted by update time
> ?  when running in parallel, is it possible that 2 files will be read in
> parallel. And in case that the latter one is smaller.  The latest timestamp
> will  be handled first ?
>
>
>
>
>
> BTW I tried to use a ContinuousEventTimeTrigger  to make sure the window
> is calculated ?  and got the processing to trigger multiple times  so I’m
> not sure exactly how this type of trigger works..
>
>
>
> Thanks
>
>
>
>
>
>
>
>
>
> *From:* Fabian Hueske 
> *Sent:* Monday, August 26, 2019 11:06 AM
> *To:* Hanan Yehudai 
> *Cc:* user@flink.apache.org
> *Subject:* Re: tumbling event time window , parallel
>
>
>
> Hi,
>
>
>
> Can you share a few more details about the data source?
>
> Are you continuously ingesting files from a folder?
>
>
>
> You are correct, that the parallelism should not affect the re

Re: End of Window Marker

2019-09-02 Thread Fabian Hueske
Hi Padarn,

Regarding your throughput concerns: A sink task may write to multiple
partitions, but each partition may only be written by a single task.

@Eduardo: Thanks for sharing your approach! Not sure if I understood it
correctly, but I think that the approach does not guarantee that all
results of a window are emitted before the end-of-window marker is written.
Since the sink operator and the single-task-operator are separate
operators, the output records might get stuck (or be bufffered) in one of
the sink tasks and the single-task would still emit an end-of-window marker
record because it doesn't know about the sink task.

Best,
Fabian

Am Do., 29. Aug. 2019 um 18:42 Uhr schrieb Eduardo Winpenny Tejedor <
eduardo.winpe...@gmail.com>:

> Hi,
>
> I'll chip in with an approach I'm trying at the moment that seems to work,
> and I say seems because I'm only running this on a personal project.
>
> Personally, I don't have anything against end-of-message markers per
> partition, Padarn you seem to not prefer this option as it overloads the
> meaning of the output payload. My approach is equally valid when producing
> watermarks/end-of-message markers on a side output though.
>
> The main problem of both approaches is knowing when the window has
> finished across all partitions without having to wait for the start of the
> next window.
>
> I've taken the approach of sending all output messages of the window to 1.
> the sink but also 2. a single task operator. The single task operator
> registers an event time based timer at the time of the end of the window.
> You have the confidence of the task's timer triggering only once at the
> right time because all the post-window watermarks go through to the same
> task. At that point I make the task send an end-of-message marker to every
> partition. I don't need to send the count because Kafka messages are
> ordered. AND IF you prefer to not overload the semantic of your original
> Kafka topic you can post the message to a separate location of your choice.
>
> While this does mean that the end of marker message only gets sent through
> once the window has finished across all substreams (as opposed to per
> stream), it does mean you don't need to wait for the next window to start
> AND the watermark gap between substreams should never grow that much anyway.
>
> This approach should be particularly useful when the number of partitions
> or keying mechanism is different between the input and output topics.
>
> Hopefully that doesn't sound like a terrible idea.
>
> eduardo
>
>
>
>
> On Wed, 28 Aug 2019, 02:54 Padarn Wilson,  wrote:
>
>> Hi again Fabian,
>>
>> Thanks for pointing this out to me. In my case there is no need for keyed
>> writing - but I do wonder if having each kafka task write only to a single
>> partition would significantly affect performance.
>>
>> Actually now that I think about it, the approach to just wait for the
>> first records of the next window is also subject to the problem you mention
>> above: a producer lagging behind the rest could end up with a partition
>> containing element out of ‘window order’.
>>
>> I was also thinking this problem is very similar to that of checkpoint
>> barriers. I intended to dig into the details of the exactly once Kafka sink
>> for some inspiration.
>>
>> Padarn
>>
>> On Tue, 27 Aug 2019 at 11:01 PM, Fabian Hueske  wrote:
>>
>>> Hi Padarn,
>>>
>>> Yes, this is quite tricky.
>>> The "problem" with watermarks is that you need to consider how you write
>>> to Kafka.
>>> If your Kafka sink writes to keyed Kafka stream (each Kafka partition is
>>> written by multiple producers), you need to broadcast the watermarks to
>>> each partition, i.e., each partition would receive watermarks from each
>>> parallel sink task. So in order to reason about the current watermark of a
>>> partition, you need to observe them and take the minimum WM across all
>>> current sink task WMs.
>>> Things become much easier, if each partition is only written by a single
>>> task but this also means that data is not key-partitioned in Kafka.
>>> In that case, the sink task only needs to write a WM message to each of
>>> its assigned partitions.
>>>
>>> Hope this helps,
>>> Fabian
>>>
>>>
>>> Am Sa., 17. Aug. 2019 um 05:48 Uhr schrieb Padarn Wilson <
>>> pad...@gmail.com>:
>>>
>>>> Hi Fabian, thanks for your input
>>>>
>>>> Exactly. Actually my first instinct was to see if it was possible to
>>>> publish the watermarks somehow 

Re: checkpoint failure in forever loop suddenly even state size less than 1 mb

2019-09-02 Thread Fabian Hueske
Hi Sushant,

It's hard to tell what's going on.
Maybe the thread pool of the async io operator is too small for the
ingested data rate?
This could cause the backpressure on the source and eventually also the
failing checkpoints.

Which Flink version are you using?

Best, Fabian


Am Do., 29. Aug. 2019 um 12:07 Uhr schrieb Sushant Sawant <
sushantsawant7...@gmail.com>:

> Hi Fabian,
> Sorry for one to one mail.
> Could you help me out with this m stuck with this issue over a week now.
>
> Thanks & Regards,
> Sushant Sawant
>
>
>
> On Tue, 27 Aug 2019, 15:23 Sushant Sawant, 
> wrote:
>
>> Hi, firstly thanks for replying.
>>
>> Here it is.. configuration related to checkpoint.
>>
>> CheckpointingMode checkpointMode =
>> CheckpointingMode.valueOf(‘AT_LEAST_ONCE’);
>>
>> Long checkpointInterval =
>> Long.valueOf(parameterMap.get(Checkpoint.CHECKPOINT_INTERVAL.getKey()));
>>
>> StateBackend sb=new FsStateBackend(file:);
>>
>> env.setStateBackend(sb);
>>
>> env.enableCheckpointing(30, checkpointMode);
>>
>> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
>>
>> env.getCheckpointConfig().setCheckpointTimeout(18);
>>
>> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>> Thanks & Regards,
>> Sushant Sawant
>>
>> On Tue, 27 Aug 2019, 14:09 pengcheng...@bonc.com.cn, <
>> pengcheng...@bonc.com.cn> wrote:
>>
>>> Hi,What's your checkpoint config?
>>>
>>> --
>>> pengcheng...@bonc.com.cn
>>>
>>>
>>> *From:* Sushant Sawant 
>>> *Date:* 2019-08-27 15:31
>>> *To:* user 
>>> *Subject:* Re: checkpoint failure suddenly even state size less than 1
>>> mb
>>> Hi team,
>>> Anyone for help/suggestion, now we have stopped all input in kafka,
>>> there is no processing, no sink but checkpointing is failing.
>>> Is it like once checkpoint fails it keeps failing forever until job
>>> restart.
>>>
>>> Help appreciated.
>>>
>>> Thanks & Regards,
>>> Sushant Sawant
>>>
>>> On 23 Aug 2019 12:56 p.m., "Sushant Sawant" 
>>> wrote:
>>>
>>> Hi all,
>>> m facing two issues which I believe are co-related though.
>>> 1. Kafka source shows high back pressure.
>>> 2. Sudden checkpoint failure for entire day until restart.
>>>
>>> My job does following thing,
>>> a. Read from Kafka
>>> b. Asyncio to external system
>>> c. Dumping in Cassandra, Elasticsearch
>>>
>>> Checkpointing is using file system.
>>> This flink job is proven under high load,
>>> around 5000/sec throughput.
>>> But recently we scaled down parallelism since, there wasn't any load in
>>> production and these issues started.
>>>
>>> Please find the status shown by flink dashboard.
>>> The github folder contains image where there was high back pressure and
>>> checkpoint failure
>>>
>>> https://github.com/sushantbprise/flink-dashboard/tree/master/failed-checkpointing
>>> and  after restart, "everything is fine" images in this folder,
>>>
>>> https://github.com/sushantbprise/flink-dashboard/tree/master/working-checkpointing
>>>
>>> --
>>> Could anyone point me towards direction what would have went wrong/
>>> trouble shooting??
>>>
>>>
>>> Thanks & Regards,
>>> Sushant Sawant
>>>
>>>
>>>
>>>


Re: [ANNOUNCE] Apache Flink 1.9.0 released

2019-08-27 Thread Fabian Hueske
Hi everyone,

The Docker images for Flink 1.9.0 with Scala 2.12 are available now :-)

Cheers, Fabian

Oytun Tez  schrieb am Di., 27. Aug. 2019, 21:18:

> Thank you, Fabian! We are migrating soon once 2.12 is available.
>
> Cheers,
>
> ---
> Oytun Tez
>
> *M O T A W O R D*
> The World's Fastest Human Translation Platform.
> oy...@motaword.com — www.motaword.com
>
>
> On Tue, Aug 27, 2019 at 8:11 AM Fabian Hueske  wrote:
>
>> Hi all,
>>
>> Flink 1.9 Docker images are available at Docker Hub [1] now.
>> Due to some configuration issue, there are only Scala 2.11 issues at the
>> moment but this was fixed [2].
>> Flink 1.9 Scala 2.12 images should be available soon.
>>
>> Cheers,
>> Fabian
>>
>> [1] https://hub.docker.com/_/flink
>> [2]
>> https://github.com/docker-flink/docker-flink/commit/01e41867c9270cd4dd44970cdbe53ff665e0c9e3
>>
>> Am Mo., 26. Aug. 2019 um 20:03 Uhr schrieb Oytun Tez > >:
>>
>>> Thanks Till and Zili!
>>>
>>> I see that docker-flink repo now has 1.9 set up, we are only waiting for
>>> it to be pushed to Docker Hub. We should be fine once that is done.
>>>
>>> Thanks again!
>>>
>>>
>>>
>>>
>>> ---
>>> Oytun Tez
>>>
>>> *M O T A W O R D*
>>> The World's Fastest Human Translation Platform.
>>> oy...@motaword.com — www.motaword.com
>>>
>>>
>>> On Mon, Aug 26, 2019 at 4:04 AM Zili Chen  wrote:
>>>
>>>> Hi Oytun,
>>>>
>>>> I think it intents to publish flink-queryable-state-client-java
>>>> without scala suffix since it is scala-free. An artifact without
>>>> scala suffix has been published [2].
>>>>
>>>> See also [1].
>>>>
>>>> Best,
>>>> tison.
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-12602
>>>> [2]
>>>> https://mvnrepository.com/artifact/org.apache.flink/flink-queryable-state-client-java/1.9.0
>>>>
>>>>
>>>>
>>>> Till Rohrmann  于2019年8月26日周一 下午3:50写道:
>>>>
>>>>> The missing support for the Scala shell with Scala 2.12 was documented
>>>>> in the 1.7 release notes [1].
>>>>>
>>>>> @Oytun, the docker image should be updated in a bit. Sorry for the
>>>>> inconveniences. Thanks for the pointer that
>>>>> flink-queryable-state-client-java_2.11 hasn't been published. We'll upload
>>>>> this in a bit.
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/release-notes/flink-1.7.html#scala-shell-does-not-work-with-scala-212
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Sat, Aug 24, 2019 at 12:14 PM chaojianok 
>>>>> wrote:
>>>>>
>>>>>> Congratulations and thanks!
>>>>>> At 2019-08-22 20:03:26, "Tzu-Li (Gordon) Tai" 
>>>>>> wrote:
>>>>>> >The Apache Flink community is very happy to announce the release of
>>>>>> Apache
>>>>>> >Flink 1.9.0, which is the latest major release.
>>>>>> >
>>>>>> >Apache Flink® is an open-source stream processing framework for
>>>>>> >distributed, high-performing, always-available, and accurate data
>>>>>> streaming
>>>>>> >applications.
>>>>>> >
>>>>>> >The release is available for download at:
>>>>>> >https://flink.apache.org/downloads.html
>>>>>> >
>>>>>> >Please check out the release blog post for an overview of the
>>>>>> improvements
>>>>>> >for this new major release:
>>>>>> >https://flink.apache.org/news/2019/08/22/release-1.9.0.html
>>>>>> >
>>>>>> >The full release notes are available in Jira:
>>>>>> >
>>>>>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12344601
>>>>>> >
>>>>>> >We would like to thank all contributors of the Apache Flink
>>>>>> community who
>>>>>> >made this release possible!
>>>>>> >
>>>>>> >Cheers,
>>>>>> >Gordon
>>>>>>
>>>>>


Re: I'm not able to make a stream-stream Time windows JOIN in Flink SQL

2019-08-27 Thread Fabian Hueske
Hi Theo,

Re your first approach:
TUMBLE_START is treated as a special function.It can only be used in the
SELECT clause if there is a TUMBLE function call in the GROUP BY cause.
If you use FLOOR(s1.ts TO DAY) == FLOOR(s2.ts TO DAY) it should work.
You can also drop one of the BETWEEN predicates because the condition is
symmetric.

So basically, you need to add the FLOOR ... predicate to your second
approach and that should do the trick.
In principle, your query runs a Flink-supported time-window join and you
add a more restrictive predicate to it.

Best,
Fabian

Am Fr., 23. Aug. 2019 um 14:23 Uhr schrieb Theo Diefenthal <
theo.diefent...@scoop-software.de>:

> Hi Fabian, Hi Zhenghua
>
> Thank you for your suggestions and telling me that I was on the right
> track. And good to know how to find out whether something yields to
> time-bounded or regular join.
>
> @Fabian: Regarding your suggested first option: Isn't that exactly what my
> first try was? With this TUMBLE_START... That sadly didn't work due to "
> Rowtime attributes must not be in the input rows of a regular join ". But
> I'll give option 2 a try by just adding another attribute.
>
> And some addition: Regarding my second try: I wrote that the reduced query
> didn't produce any data, but that was indeed my mistake. I fiddled around
> too much with my data so that I manipulated the original data in a way that
> the query couldn't output a result any more when testing all of those
> combinations. Now the second attempt works but isn't really what I wanted
> to query (as the "same day"-predicate is still missing).
>
> Best regards
> Theo
>
> --
> *Von: *"Fabian Hueske" 
> *An: *"Zhenghua Gao" 
> *CC: *"Theo Diefenthal" , "user" <
> user@flink.apache.org>
> *Gesendet: *Freitag, 16. August 2019 10:05:45
> *Betreff: *Re: I'm not able to make a stream-stream Time windows JOIN in
> Flink SQL
>
> Hi Theo,
>
> The main problem is that the semantics of your join (Join all events that
> happened on the same day) are not well-supported by Flink yet.
>
> In terms of true streaming joins, Flink supports the time-windowed join
> (with the BETWEEN predicate) and the time-versioned table join (which does
> not apply here).
> The first does not really fit because it puts the windows "around the
> event", i.e., if you have an event at 12:35 and a window of 10 mins earlier
> and 15 mins later, it will join with events between 12:25 and 12:50.
> An other limitation of Flink is that you cannot modify event-time
> attributes (well you can, but they lose their event-time property and
> become regular TIMESTAMP attributes).
> This limitation exists, because we must ensure that the attributes are
> still aligned with watermarks after they were modified (or adjusting the
> watermarks accordingly).
> Since analyzing expressions that modify timestamps to figure out whether
> they preserve watermark alignment is very difficult, we opted to always
> remove event-time property when an event-time attribute is modified.
>
> I see two options for your use case:
>
> 1) use the join that you described before with the -24 and +24 hour window
> and apply more fine-grained predicates to filter out the join results that
> you don't need.
> 2) add an additional time attribute to your input that is a rounded down
> version of the timestamp (rounded to 24h), declare the rounded timestamp as
> your event-time attribute, and join with an equality predicate on the
> rounded timestamp.
>
> Best, Fabian
>
> Am Di., 13. Aug. 2019 um 13:41 Uhr schrieb Zhenghua Gao  >:
>
>> I wrote a demo example for time windowed join which you can pick up [1]
>> [1] https://gist.github.com/docete/8e78ff8b5d0df69f60dda547780101f1
>>
>> *Best Regards,*
>> *Zhenghua Gao*
>>
>>
>> On Tue, Aug 13, 2019 at 4:13 PM Zhenghua Gao  wrote:
>>
>>> You can check the plan after optimize to verify it's a regular join or
>>> time-bounded join(Should have a WindowJoin). The most direct way is
>>> breakpoint at optimizing phase [1][2].
>>> And you can use your TestData and create an ITCase for debugging [3]
>>>
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala#L148
>>> [2]
>>> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/StreamOptimizer.scala#L68
>>> [3]
>>> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/test/scala/org

Re: Are there any news on custom trigger support for SQL/Table API?

2019-08-27 Thread Fabian Hueske
Hi Theo,

The work on custom triggers has been put on hold due to some major
refactorings (splitting the modules, porting Scala code to Java, new type
system, new catalog interfaces, integration of the Blink planner).
It's also not on the near-time roadmap AFAIK.
To be honest, I'm not sure how much support for custom trigger there will
be.

Best,
Fabian

Am Fr., 23. Aug. 2019 um 14:39 Uhr schrieb Theo Diefenthal <
theo.diefent...@scoop-software.de>:

> Hi there,
>
> I currently evaluate to let our experienced system users write Flink-SQL
> queries directly. Currently, all queries our users need are implemented
> programmatically.
>
> There is one major problem preventing us from just giving SQL to our users
> directly. Almost all queries of our users are "threshold-based". They are
> something like "Notify me directly if there were (>=)10 machine restarts
> today". So the time window is one day, but if there are 10 outages at 10:00
> already, we must trigger the window.
>
> In our programmed pipelines, that's rather easy to accomplish. We just
> build Count-Triggers. In Flink-SQL, this seems to not be possible at all.
> See [1].
>
> That mailing list entry references a word draft document of
> implementations talking about a SQL syntax enrichments (With EMIT clause)
> but as a pre-step, it seems to be planned that Flink could provide trigger
> customization via QueryConfig:
>
> "Flink’s Table API / SQL features a QueryConfig object to configure the
> execution of a streaming query. So far (Flink 1.3), it only allows to
> specify a state retention period but we plan to extend it with trigger
> policies. This way we do not have to extend SQL itself to specify
> triggering policies. However, an EMIT statement could of course override
> the configuration of a QueryConfig. So from our point of view it would be
> an optional alternative."
>
> That mailing list article is >1 year old and that word doc references to
> Flink 1.3. I checked out QueryConfig in my Flink 1.8.0 and it sadly seems
> to still not support this. So my question is: Is there any timeline /
> roadmap / JIRA issue to track this and is this a still planned feature (in
> near/mid/long-term?) Just being able to change the trigger via QueryConfig
> would already be really great for us, and having this EMIT clause would of
> course be awesome.
>
> Best regards
> Theo
>
> [1]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/SQL-Do-Not-Support-Custom-Trigger-td20932.html
>


  1   2   3   4   5   6   7   8   9   10   >