Re: AllwindowStream and RichReduceFunction

2020-07-27 Thread Aljoscha Krettek

I think that should work with an aggregate() instead of reduce().

Best,
Aljoscha

On 24.07.20 17:02, Flavio Pompermaier wrote:

In my reduce function I want to compute some aggregation on the sub-results
of a map-partition (that I tried to migrate from DataSet to DataStream
without success).
The original code was something like:

  input.mapPartition(new RowToStringSketches(sketchMapSize)) //
 .reduce(new SketchesStringReducer()) //
 .map(new SketchesStringToStatsPojo(colIndex, topK));

I asked about the simulation of the mapPartition function in the streaming
env in another thread in the mailing list [1] because I was not able to
test it... it seems that the program was exiting before being able to process
anything.
So I gave up on replacing the DataSet API with the DataStream API for the moment... it
seems that there are too many things still to migrate.
Btw, this is the reduce function:

public class SketchesStringReducer extends
    RichReduceFunction<Tuple2<byte[], byte[]>> {
   private static final long serialVersionUID = 1L;

   private transient ArrayOfItemsSerDe<String> serDe;

   @Override
   public void open(Configuration parameters) throws Exception {
     this.serDe = new ArrayOfStringsSerDe();
   }

   @Override
   public Tuple2<byte[], byte[]> reduce(Tuple2<byte[], byte[]> t1,
       Tuple2<byte[], byte[]> t2) throws Exception {
     // merge HLL
     final HllSketch hll1 = HllSketch.heapify(Memory.wrap(t1.f0));
     final HllSketch hll2 = HllSketch.heapify(Memory.wrap(t2.f0));
     final Union union = new Union(hll1.getLgConfigK());
     union.update(hll1);
     union.update(hll2);
     final byte[] hllSketchBytes = union.getResult().toCompactByteArray();

     // merge Item
     final ItemsSketch<String> s1 =
         ItemsSketch.getInstance(Memory.wrap(t1.f1), serDe);
     final ItemsSketch<String> s2 =
         ItemsSketch.getInstance(Memory.wrap(t2.f1), serDe);
     final byte[] itemSketchBytes = s1.merge(s2).toByteArray(serDe);
     return new Tuple2<>(hllSketchBytes, itemSketchBytes);
   }
}
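
For reference, a minimal sketch of how the same merge logic could be expressed
through aggregate(), as suggested at the top of the thread. This is not the original
poster's code: it assumes the same Tuple2<byte[], byte[]> element type, creates the
serde locally inside the merge (so no transient field or open() is needed), and uses
the Apache DataSketches 1.x package names (older releases live under com.yahoo.*).

```java
import org.apache.datasketches.ArrayOfItemsSerDe;
import org.apache.datasketches.ArrayOfStringsSerDe;
import org.apache.datasketches.frequencies.ItemsSketch;
import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.hll.Union;
import org.apache.datasketches.memory.Memory;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;

public class SketchesStringAggregator implements
    AggregateFunction<Tuple2<byte[], byte[]>, Tuple2<byte[], byte[]>, Tuple2<byte[], byte[]>> {

  @Override
  public Tuple2<byte[], byte[]> createAccumulator() {
    return null; // empty accumulator; the first added element becomes the accumulator
  }

  @Override
  public Tuple2<byte[], byte[]> add(Tuple2<byte[], byte[]> value, Tuple2<byte[], byte[]> acc) {
    return acc == null ? value : mergePair(acc, value);
  }

  @Override
  public Tuple2<byte[], byte[]> getResult(Tuple2<byte[], byte[]> acc) {
    return acc;
  }

  @Override
  public Tuple2<byte[], byte[]> merge(Tuple2<byte[], byte[]> a, Tuple2<byte[], byte[]> b) {
    if (a == null) {
      return b;
    }
    return b == null ? a : mergePair(a, b);
  }

  private static Tuple2<byte[], byte[]> mergePair(Tuple2<byte[], byte[]> t1, Tuple2<byte[], byte[]> t2) {
    // merge HLL sketches
    final HllSketch hll1 = HllSketch.heapify(Memory.wrap(t1.f0));
    final HllSketch hll2 = HllSketch.heapify(Memory.wrap(t2.f0));
    final Union union = new Union(hll1.getLgConfigK());
    union.update(hll1);
    union.update(hll2);
    final byte[] hllBytes = union.getResult().toCompactByteArray();

    // merge ItemsSketches; the serde is created here, so nothing transient is needed
    final ArrayOfItemsSerDe<String> serDe = new ArrayOfStringsSerDe();
    final ItemsSketch<String> s1 = ItemsSketch.getInstance(Memory.wrap(t1.f1), serDe);
    final ItemsSketch<String> s2 = ItemsSketch.getInstance(Memory.wrap(t2.f1), serDe);
    return new Tuple2<>(hllBytes, s1.merge(s2).toByteArray(serDe));
  }
}
```

It would then be plugged in via something like
windowedStream.aggregate(new SketchesStringAggregator()) instead of reduce(...).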

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Is-there-a-way-to-use-stream-API-with-this-program-td36715.html#a36767

On Mon, Jul 20, 2020 at 6:32 PM Aljoscha Krettek 
wrote:


What are you trying to do in the ReduceFunction? Without knowing the
code, maybe an aggregate(AggregateFunction) is the solution.

Best,
Aljoscha

On 20.07.20 18:03, Flavio Pompermaier wrote:

Thanks Aljoscha for the reply. So what can I do in my reduce function that
contains transient variables (i.e. not serializable)?

On Mon, Jul 20, 2020 at 4:38 PM Aljoscha Krettek 
wrote:


Hi Flavio,

the reason is that under the covers the ReduceFunction will be used as
the ReduceFunction of a ReducingState. And those cannot be rich
functions because we cannot provide all the required context "inside"
the state backend.

You can see how the ReduceFunction is used to create a
ReducingStateDescriptor here:



https://github.com/apache/flink/blob/0c43649882c831c1ec88f4e33d8a59b1cbf5f2fe/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java#L300


Best,
Aljoscha

On 16.07.20 16:28, Flavio Pompermaier wrote:

Hi to all,
I'm trying to apply a rich reduce function after a countWindowAll but Flink
says
"ReduceFunction of reduce can not be a RichFunction. Please use
reduce(ReduceFunction, WindowFunction) instead."

Is there any good reason for this? Or am I doing something wrong?

Best,
Flavio


Re: improve the performance of flink sql job which lookup 40+ table

2020-07-27 Thread Jark Wu
Hi,

Yes, currently, multiple lookup joins are not parallel and execute one by
one.
Async lookup + cache is the suggested way to improve performance.
If the lookup tables are not large, you can also implement an ALL cache for
the LookupTableSource to cache all the data in the database, and reload it
periodically.
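
A minimal sketch of that ALL-cache idea (all names are hypothetical, and the actual
loading of rows, e.g. a full JDBC scan, is left as a placeholder):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

public class AllCacheLookupFunction extends TableFunction<Row> {

  private static final long RELOAD_INTERVAL_MS = 10 * 60 * 1000L; // assumed reload period

  private transient Map<String, List<Row>> cache;
  private transient long lastLoadTs;

  @Override
  public void open(FunctionContext context) throws Exception {
    reload();
  }

  // called once per probe-side row; the argument is the lookup join key
  public void eval(String key) {
    if (System.currentTimeMillis() - lastLoadTs > RELOAD_INTERVAL_MS) {
      reload();
    }
    List<Row> rows = cache.get(key);
    if (rows != null) {
      rows.forEach(this::collect);
    }
  }

  private void reload() {
    Map<String, List<Row>> fresh = new HashMap<>();
    // loadAllRows() stands for loading the whole dimension table from the database
    for (Row row : loadAllRows()) {
      fresh.computeIfAbsent((String) row.getField(0), k -> new ArrayList<>()).add(row);
    }
    cache = fresh;
    lastLoadTs = System.currentTimeMillis();
  }

  private List<Row> loadAllRows() {
    // placeholder: run e.g. "SELECT * FROM dim_table" and map each record to a Row
    return Collections.emptyList();
  }
}
```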

In Flink 1.12, we will support temporal join with changelogs [1], which will join
the changelog stream instead of looking up the database;
this will greatly improve performance and provide the lowest latency.

Best,
Jark

[1]:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-132+Temporal+Table+DDL




On Tue, 28 Jul 2020 at 11:43, snack white  wrote:

> Hi,
>   My Flink version is 1.10, running in per-job mode. My SQL looks like:
>
> ```
> select
>   column1, t2.xx2, t3.xx3,t4.xx4
>   …  t40.xx40
> from
>   main_table
>   left join lookup_1 FOR SYSTEM_TIME AS OF t1.proc_time AS t2 on t1.xx=
> t2.xx
>   left join lookup_2 FOR SYSTEM_TIME AS OF t1.proc_time AS t3 on t1.xx=
> t3.xx
>   left join lookup_3 FOR SYSTEM_TIME AS OF t1.proc_time AS t4 on t1.xx=
> t4.xx
>   left join lookup_4 FOR SYSTEM_TIME AS OF t1.proc_time AS t5 on t1.xx=
> t5.xx
>   left join lookup_5 FOR SYSTEM_TIME AS OF t1.proc_time AS t6 on t1.xx=
> t6.xx
>   left join lookup_6 FOR SYSTEM_TIME AS OF t1.proc_time AS t7 on t1.xx=
> t7.xx
> ...
>
>   left join lookup_40 FOR SYSTEM_TIME AS OF t1.proc_time AS t40 on t1.xx=
> t40.xx
> ```
>
> I have already implemented the async lookup feature, but that is not enough; maybe
> the current lookup joins are executed serially rather than in parallel?
>
> Now I need help with how I can improve the performance of my SQL job.
>
> Best
> White
>
>
>


Re: Is there a way to use stream API with this program?

2020-07-27 Thread Piotr Nowojski
MAX_WATERMARK should be emitted automatically by the
WatermarkAssignerOperator.

Piotrek
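
For illustration, a minimal sketch (not the thread's actual job) of the trigger idea
discussed below: register an event-time timer at Long.MAX_VALUE, which fires when
that final MAX_WATERMARK is emitted at the end of the bounded input.

```java
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

public class FireAtEndOfInputTrigger extends Trigger<Object, GlobalWindow> {

  private static final long serialVersionUID = 1L;

  @Override
  public TriggerResult onElement(Object element, long timestamp, GlobalWindow window,
      TriggerContext ctx) {
    // this timer only fires once the final MAX_WATERMARK (Long.MAX_VALUE) arrives
    ctx.registerEventTimeTimer(Long.MAX_VALUE);
    return TriggerResult.CONTINUE;
  }

  @Override
  public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
    return time == Long.MAX_VALUE ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
  }

  @Override
  public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
    return TriggerResult.CONTINUE;
  }

  @Override
  public void clear(GlobalWindow window, TriggerContext ctx) {
    ctx.deleteEventTimeTimer(Long.MAX_VALUE);
  }
}
```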

On Mon, 27 Jul 2020 at 09:16, Flavio Pompermaier  wrote:

> Yes it could... Where should I emit the MAX_WATERMARK, and how do I detect
> that the input reached its end?
>
> On Sat, Jul 25, 2020 at 8:08 PM David Anderson 
> wrote:
>
>> In this use case, couldn't the custom trigger register an event time
>> timer for MAX_WATERMARK, which would be triggered when the bounded input
>> reaches its end?
>>
>> David
>>
>> On Mon, Jul 20, 2020 at 5:47 PM Piotr Nowojski 
>> wrote:
>>
>>> Hi,
>>>
>>> I'm afraid that there is no out-of-the-box way of doing this. I've
>>> created a ticket [1] to write down and document a discussion that we had
>>> about this issue in the past.
>>>
>>> The issue is that currently, untriggered processing time timers are
>>> ignored on end of input and it seems like there might be no one single
>>> perfect way to handle it for all of the cases, but it probably needs to be
>>> customized.
>>>
>>> Maybe you could:
>>> 1. extend `WindowOperator`  (`MyWindowOperator`)
>>> 2. implement
>>> `org.apache.flink.streaming.api.operators.BoundedOneInput#endInput` in your
>>> `MyWindowOperator`
>>> 3. Inside `MyWindowOperator#endInput`  invoke
>>> `internalTimerService.forEachProcessingTimeTimer(...)` and:
>>>   a) manually trigger timers `WindowOperator#onProcessingTime`
>>>   b) delete manually triggered timer
>>>
>>> Piotrek
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-18647
>>>
>>> On Fri, 17 Jul 2020 at 10:30, Flavio Pompermaier  wrote:
>>>
 Hi to all,
 I was trying to port another job we have that use dataset API to
 datastream.
 The legacy program was doing basically a
 dataset.mapPartition().reduce() so I tried to replicate this thing with a

  final BasicTypeInfo<Double> columnType = BasicTypeInfo.DOUBLE_TYPE_INFO;
  final DataStream<Row> inputStream = env.fromElements(//
      Row.of(1.0), //
      Row.of(2.0), //
      Row.of(3.0), //
      Row.of(5.0), //
      Row.of(6.0)).returns(new RowTypeInfo(columnType));
  inputStream.map(new SubtaskIndexAssigner(columnType))
      .keyBy(t -> t.f0)
      .window(GlobalWindows.create())
      .trigger(PurgingTrigger.of(
          CountWithTimeoutTriggerPartition.of(Time.seconds(5), 100L)))
      .process(..)

 Unfortunately the program exits before reaching the Process function
 (moreover I need to add another window + trigger after it before adding the
 reduce function).
 Is there a way to do this with the DataStream API, or should I still use the
 DataSet API for the moment (until batch is fully supported)? I
 append all the code required to test the job in the footer.

 Best,
 Flavio

 -

 package org.apache.flink.stats.sketches;

 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.java.io.PrintingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import
 org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import
 org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Collector;

 public class Test {
   public static void main(String[] args) throws Exception {
 StreamExecutionEnvironment env =
 StreamExecutionEnvironment.getExecutionEnvironment();
 env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 env.setParallelism(1);

 final BasicTypeInfo columnType =
 BasicTypeInfo.DOUBLE_TYPE_IN

Re: How to stream CSV from S3?

2020-07-27 Thread Jingsong Li
Hi John,

Do you mean you want to read S3 CSV files using partition/bucket pruning?

If just using the DataSet API, you can use CsvInputFormat to read csv files.

If you want to use the Table/SQL API: in 1.10, the CSV format in Table does not
support partitioned tables, so the only way is to specify the partition/bucket
path and read that single directory.

In 1.11, the Table/SQL filesystem connector with the CSV format supports
partitioned tables, with complete partition semantics.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html
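
For illustration, a rough sketch of the 1.11 DDL shape (table name, columns,
bucket path and partition column are all made up). It only shows how a partitioned
CSV directory on S3 can be declared and queried; it does not claim that new files
are tailed continuously, and it assumes an S3 filesystem plugin is installed.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CsvOnS3Sketch {
  public static void main(String[] args) {
    TableEnvironment tEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

    tEnv.executeSql(
        "CREATE TABLE orders_csv (" +
        "  order_id STRING," +
        "  amount DOUBLE," +
        "  dt STRING" +
        ") PARTITIONED BY (dt) WITH (" +
        "  'connector' = 'filesystem'," +
        "  'path' = 's3://my-bucket/orders/'," +
        "  'format' = 'csv'" +
        ")");

    // partition pruning: only the dt=2020-07-27 directory is read
    tEnv.executeSql("SELECT * FROM orders_csv WHERE dt = '2020-07-27'").print();
  }
}
```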

Best,
Jingsong

On Mon, Jul 27, 2020 at 10:54 PM John Smith  wrote:

> Hi, using Flink 1.10
>
> 1- How do we go about reading CSV files that are copied to s3 buckets?
> 2- Is there a source that can tail S3 and start reading a CSV when it is
> copied to S3?
> 3- Is that part of the table APIs?
>


-- 
Best, Jingsong Lee


improve the performance of flink sql job which lookup 40+ table

2020-07-27 Thread snack white
Hi,
  My Flink version is 1.10, running in per-job mode. My SQL looks like:

```
select
  column1, t2.xx2, t3.xx3,t4.xx4
  …  t40.xx40
from
  main_table 
  left join lookup_1 FOR SYSTEM_TIME AS OF t1.proc_time AS t2 on t1.xx= t2.xx
  left join lookup_2 FOR SYSTEM_TIME AS OF t1.proc_time AS t3 on t1.xx= t3.xx
  left join lookup_3 FOR SYSTEM_TIME AS OF t1.proc_time AS t4 on t1.xx= t4.xx
  left join lookup_4 FOR SYSTEM_TIME AS OF t1.proc_time AS t5 on t1.xx= t5.xx
  left join lookup_5 FOR SYSTEM_TIME AS OF t1.proc_time AS t6 on t1.xx= t6.xx
  left join lookup_6 FOR SYSTEM_TIME AS OF t1.proc_time AS t7 on t1.xx= t7.xx
...

  left join lookup_40 FOR SYSTEM_TIME AS OF t1.proc_time AS t40 on t1.xx= t40.xx
```

I have already implemented the async lookup feature, but that is not enough; maybe the
current lookup joins are executed serially rather than in parallel?

Now I need help with how I can improve the performance of my SQL job.

Best 
White 




Re: Flink Session TM Logs

2020-07-27 Thread 范超
Hi Richard,
Maybe you can try using the CLI
“yarn logs -applicationId yourYarnAppId”
to check your logs, or just find your app logs in the YARN web UI.

From: Richard Moorhead [mailto:richard.moorh...@gmail.com]
Sent: Friday, July 24, 2020 23:55
To: user 
Subject: Flink Session TM Logs

When running a flink session on YARN, task manager logs for a job are not 
available after completion. How do we locate these logs?



Unable to submit high parallelism job in cluster

2020-07-27 Thread Annemarie Burger
Hi,

I am running Flink on a cluster with 24 workers, each with 16 cores.
Starting the cluster works fine and the Web interface confirms there are 384
slots working. Executing my code with parallelism 24 works fine, but when I
try a higher parallelism, eg. 384, the job never succeeds in submitting.
Also submitting from the web interface does not start the job, nor gives any
errors. I also tried starting 4 1-slot taskmanagers on each machine, and
executing with parallelism 96, but same problem. The code is not very
complicated, with the logical graph having only 3 steps. 
Attached is a file with the jstacks of the CliFrontend that is using CPU,
and the StandaloneSessionClusterEntrypoint, as well as the jstack of the
TaskManagerRunner on a remote machine(cloud-12). The jstacks are all from
this last scenario, when executing from command line.
 
My relevant conf is as follows: 

queryable-state.enable: true
jobmanager.rpc.address: cloud-11
jobmanager.rpc.port: 6123
taskmanager.heap.mb: 28672
jobmanager.heap.mb: 14240
taskmanager.memory.fraction: 0.7
taskmanager.network.numberOfBuffers: 16384
taskmanager.network.bufferSizeInBytes: 16384
taskmanager.memory.task.off-heap.size: 4000m
taskmanager.memory.managed.size: 1m
#taskmanager.numberOfTaskSlots: 16 #for normal setup
taskmanager.numberOfTaskSlots: 1 #for when setting multiple taskmanagers per
machine. 

Am I doing something wrong?
Thanks in advance!

  jstack.jstack (attachment)



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Count of records coming into the Flink App from KDS and at each step through Execution Graph

2020-07-27 Thread Vijay Balakrishnan
Hi All,
I am looking at the Custom User Metrics to count incoming records by an
incoming attribute, event_name, and aggregate it over 5 secs.
I looked at
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#reporter
.
I am trying to figure out which one to use: Counter or Meter.
If using Counter, how do I reset it after 5 secs?
If using Meter, which measures avg throughput, how do I specify a
duration like 5 secs? markEvent(long n)?

I am also trying to collect the total count of events across all TaskManagers.
Do I collect at flink_taskmanager_job_task__numrecordsIn
or
flink_taskmanager_job_task_operator__numrecordsIn? (so
at the task or operator level?)

Or should I use User variables like below:

counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetricsKey", "MyMetricsValue") //specify my value for
each custom event_name here- I might not know all custom event_names
in advance
  .counter("myCounter");


Pardon my confusion here.
TIA,
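
For what it's worth, a minimal sketch of lazily registering one user counter per
event_name with the addGroup(key, value) pattern from the snippet above; the
Tuple2<String, Double> element shape and all metric names are assumptions. Counters
are cumulative, so a per-5-second view is normally derived on the Prometheus/Grafana
side (e.g. a rate over the scrape interval) rather than by resetting the counter.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

public class EventNameCountingMap
    extends RichMapFunction<Tuple2<String, Double>, Tuple2<String, Double>> {

  private static final long serialVersionUID = 1L;

  // one Counter per distinct event_name, created on first sight
  private transient Map<String, Counter> countersByEventName;

  @Override
  public void open(Configuration parameters) {
    countersByEventName = new HashMap<>();
  }

  @Override
  public Tuple2<String, Double> map(Tuple2<String, Double> event) {
    String eventName = event.f0; // assumed: f0 carries the event_name attribute
    countersByEventName
        .computeIfAbsent(eventName, name ->
            getRuntimeContext()
                .getMetricGroup()
                .addGroup("event_name", name) // shows up as an "event_name" label in Prometheus
                .counter("numEvents"))
        .inc();
    return event;
  }
}
```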

On Mon, Jul 27, 2020 at 10:00 AM Vijay Balakrishnan 
wrote:

> Hi David,
> Thanks for your reply.
> I am already using the PrometheusReporter. I am trying to figure out how
> to dig into the application data and count grouped by an attribute called
> event_name in the incoming application data and report to Grafana via
> Prometheus.
>
> I see the following at a high level
> task_numRecordsIn
> task_numRecordsOut
> ..operator_numLateRecordsDropped
>
> Trying to dig deeper than this numRecordsIn to get grouped by event_name
> attribute coming in the Input record every 5 secs.
> TIA,
>
> On Sat, Jul 25, 2020 at 10:55 AM David Anderson 
> wrote:
>
>> Setting up a Flink metrics dashboard in Grafana requires setting up and
>> configuring one of Flink's metrics reporters [1] that is supported by
>> Grafana as a data source. That means your options for a metrics reporter
>> are Graphite, InfluxDB, Prometheus, or the Prometheus push reporter.
>>
>> If you want reporting every 5 seconds, with the push based reporters
>> that's something you would configure in flink-conf.yaml, whereas with
>> Prometheus you'll need to configure the scrape interval in the prometheus
>> config file. For more on using Flink with Prometheus, see the blog post by
>> Maximilian Bode [2].
>>
>> Best,
>> David
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#reporter
>> [2]
>> https://flink.apache.org/features/2019/03/11/prometheus-monitoring.html
>>
>> On Fri, Jul 24, 2020 at 12:57 AM Vijay Balakrishnan 
>> wrote:
>>
>>> Hi,
>>> I am trying to figure out how many records came into the Flink App from
>>> KDS and how many records got moved to the next step or was dropped by the
>>> watermarks.
>>>
>>> I see on the Ui Table for *Source. Records Sent* with a total and the
>>> next step *Filter->FlatMap operator with a Records Received *total. How
>>> can I get these metric values for me to display In Grafana for eg. as I
>>> want to know a count for each 5 secs, how many records came in and how many
>>> were filtered out by the watermark or my Custom Filter operator etc  ?
>>>
>>> I looked at the breakdown of the Source__Custom_Source in Metrics as
>>> show in the attached pic. It has values like 0.NumRecordsIn and
>>> 0.NumRecordsOut and so on from 0 to 9 for the parallelism 10 I specified.
>>> It also has various breakdowns like 0.Timestamps/Watermarks.numRecordsIn
>>> and 0.Timestamps/Watermarks.numRecordsOut
>>>
>>> Attached are some screenshots of the Flink DashBoard UI.
>>>
>>> TIA,
>>>
>>>


Re: Count of records coming into the Flink App from KDS and at each step through Execution Graph

2020-07-27 Thread Vijay Balakrishnan
Hi David,
Thanks for your reply.
I am already using the PrometheusReporter. I am trying to figure out how to
dig into the application data and count grouped by an attribute called
event_name in the incoming application data and report to Grafana via
Prometheus.

I see the following at a high level
task_numRecordsIn
task_numRecordsOut
..operator_numLateRecordsDropped

Trying to dig deeper than this numRecordsIn to get grouped by event_name
attribute coming in the Input record every 5 secs.
TIA,

On Sat, Jul 25, 2020 at 10:55 AM David Anderson 
wrote:

> Setting up a Flink metrics dashboard in Grafana requires setting up and
> configuring one of Flink's metrics reporters [1] that is supported by
> Grafana as a data source. That means your options for a metrics reporter
> are Graphite, InfluxDB, Prometheus, or the Prometheus push reporter.
>
> If you want reporting every 5 seconds, with the push based reporters
> that's something you would configure in flink-conf.yaml, whereas with
> Prometheus you'll need to configure the scrape interval in the prometheus
> config file. For more on using Flink with Prometheus, see the blog post by
> Maximilian Bode [2].
>
> Best,
> David
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#reporter
> [2]
> https://flink.apache.org/features/2019/03/11/prometheus-monitoring.html
>
> On Fri, Jul 24, 2020 at 12:57 AM Vijay Balakrishnan 
> wrote:
>
>> Hi,
>> I am trying to figure out how many records came into the Flink App from
>> KDS and how many records got moved to the next step or was dropped by the
>> watermarks.
>>
>> I see on the Ui Table for *Source. Records Sent* with a total and the
>> next step *Filter->FlatMap operator with a Records Received *total. How
>> can I get these metric values for me to display In Grafana for eg. as I
>> want to know a count for each 5 secs, how many records came in and how many
>> were filtered out by the watermark or my Custom Filter operator etc  ?
>>
>> I looked at the breakdown of the Source__Custom_Source in Metrics as show
>> in the attached pic. It has values like 0.NumRecordsIn and 0.NumRecordsOut
>> and so on from 0 to 9 for the parallelism 10 I specified. It also has
>> various breakdowns like 0.Timestamps/Watermarks.numRecordsIn and
>> 0.Timestamps/Watermarks.numRecordsOut
>>
>> Attached are some screenshots of the Flink DashBoard UI.
>>
>> TIA,
>>
>>


[ANNOUNCE] Weekly Community Update 2020/29-30

2020-07-27 Thread Konstantin Knauf
Dear community,

happy to share an update for the last two weeks with the release of Apache
Flink 1.11.1, planning for Flink 1.12, a proposal for better
interoperability with Microsoft Azure services, a few blog posts and more.

Flink Development
==

* [releases] Flink 1.11.1 was released as a quick follow up to the Flink
1.11.0 release mostly fixing some critical issues in the Table API/SQL
ecosystem. [1]

* [releases] Robert started a thread to collect the different
topics/features that are planned for Flink 1.12. Robert & Dian will be our
release managers for this one. They propose a feature freeze around the end
of September. [2]

* [connectors] Israel Ekpo started a thread to discuss the contribution of
multiple connectors for Microsoft Azure services including Data Lake Store
Gen 2 (Filesystem), Azure Cosmos DB  (DataStream) and Azure Event Hub
(DataStream). [3]

* [sql] Seth has started a small discussion on how to handle timestamps if
a "datagen" table is created based on an existing table using the LIKE
clause. [4]

* [connectors] Benchao raised the point that the semantic of
InputFormat#nextRecord returning null is inconsistent throughout the code
case and would like to align these. No feedback so far. [5]

* [development process] Andrey reminds everyone to assign the "starter"
label to Jira issues, which are a good pick for new contributors to Apache
Flink. [6]

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Apache-Flink-1-11-1-released-tp43335.html
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Planning-Flink-1-12-tp43348.html
[3]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Adding-Azure-Platform-Support-in-DataStream-Table-and-SQL-Connectors-tp43342.html
[4]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Discuss-Handling-of-Timestamp-in-DataGen-table-created-via-LIKE-td43433.html
[5]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Align-the-semantic-of-returning-null-from-InputFormat-nextRecord-tp43379.html
[6]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/REMINDER-Use-starter-labels-for-Jira-issues-where-it-makes-sense-tp43284.html

Notable Bugs
==

* [FLINK-18705] [FLINK-18700] [1.11.1] For those of you, who are trying out
the new Debezium format, check out the limitations reported in [7,8].
* [FLINK-18656] [1.11.1] The checkpoint start delay metric is always zero
when unaligned checkpoints are used. [9]

[7] https://issues.apache.org/jira/browse/FLINK-18705
[8] https://issues.apache.org/jira/browse/FLINK-18700
[9] https://issues.apache.org/jira/browse/FLINK-18656

Events, Blog Posts, Misc
===

 * Two new posts on the Flink blog:
*  Dawid gives an overview over (external) catalogs (e.g.
HiveMetastore, PostgreSQL) in Flink. [10]
*  Kostas introduces the newly added "Application Mode" and contrasts
it to the two existing modes: "Session Mode" & "Per-Job Mode". [11]

* In this blog post Eric J. Bruno of Dell explains in detail how Apache
Flink can be used for complex event processing and streaming analytics. [12]

* On the 23rd, the Apache Flink meetup group in Seoul hosted a virtual
session with talks by SK Telecom (Korean), HyperConnect (Korean) and
Ververica (English). It is available on Youtube [13].

* We have published the training program for Flink Forward Global taking
place on the 20th & 21st of October. [14] There will be six different
courses offered over these two days:
* Flink Development (2 days)
* SQL Development (2 days)
* Runtime & Operations (1 day)
* Stateful Functions (1 day)
* Tuning & Troubleshooting (introduction and advanced, 1 day each).

[10] https://flink.apache.org/2020/07/23/catalogs.html
[11] https://flink.apache.org/news/2020/07/14/application-mode.html
[12]
https://blogs.oracle.com/javamagazine/streaming-analytics-with-java-and-apache-flink
[13] https://www.youtube.com/watch?v=HWTb5kn4LvE
[14] https://www.flink-forward.org/global-2020/training-program

Cheers,

Konstantin

-- 

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk


JobManager refusing connections when running many jobs in parallel?

2020-07-27 Thread Hailu, Andreas
Hi team,

We've observed that when we submit a decent number of jobs in parallel from a 
single Job Master, we encounter job failures due with Connection Refused 
exceptions. We've seen this behavior start at 30 jobs running in parallel. It's 
seemingly transient, however, as upon several retries the job succeeds. The 
surface level error varies, but digging deeper in stack traces it looks to stem 
from the Job Manager no longer accepting connections.

I've included a couple of examples below from failed jobs' driver logs, with 
different errors stemming from a connection refused error:

First example: 15 Task Managers/2 cores/4096 Job Manager memory/12288 Task 
Manager memory - 30 jobs submitted in parallel, each with parallelism of 1
Job Manager is running @ d43723-563.dc.gs.com: Using job manager web tracking 
url "Job Manager Web Interface" (http://d43723-563.dc.gs.com:41268) 
org.apache.flink.client.program.ProgramInvocationException: Could not retrieve 
the execution result. (JobID: 1dfef6303cf0e888231d4c57b4b4e0e6)
at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:255)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326)
at 
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
...
Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: 
Could not complete the operation. Number of retries has been exhausted.
at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:273)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at 
org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:341)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:121)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:327)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:343)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
... 1 more
Caused by: java.util.concurrent.CompletionException: 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:
 Connection refused: d43723-563.dc.gs.com/10.47.126.221:41268
at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943)
at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
... 16 more
Caused by: 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:
 Connection refused: d43723-563.dc.gs.com/10.47.126.221:41268
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at 
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)
... 6 more
Caused by: java.net.ConnectException: Connection refused

Second example: 30 Task Managers/2 cores/4096 Job Manager memory/12288 Task 
Manager memory - 60 jobs submitted in parallel, each with parallelism of 1
Job Manager is running @ d43723-484.dc.gs.com: Using job manager web tracking 
url http://d

Re:  problem with build from source flink 1.11

2020-07-27 Thread Timo Walther

Great to hear. Thanks for letting us know.

Regards,
Timo

On 27.07.20 17:58, Felipe Lolas wrote:

Seems fixed!

I was replacing only flink-dist.jar. Replacing all the compiled 
jars from the flink-1.11.0 binary distribution fixed the issue.


Thanks!

El 27 de julio de 2020 4:28, Felipe Lolas  escribió:


Hi!! Timo and Chesnay:

Thanks for helping!!!

Here is the full stack trace:
2020-07-27 05:27:38,661 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job 
insert-into_default_catalog.default_database.print_table 
(ca40bd10a729f5cad56a7db6bef17a6f) switched from state FAILING to FAILED.
org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:503)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:386)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
~[?:1.8.0_112]
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
~[?:1.8.0_112]
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_112]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_112]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
Caused by: java.lang.NoSuchMethodError: 
java.nio.ByteBuffer.position(I)Ljava/nio/ByteBuffer;
at 
org.apache.flink.core.memory.DataOutputSerializer.wrapAsByteBuffer(DataOutputSerializer.java:65)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 

Re: problem with build from source flink 1.11

2020-07-27 Thread Felipe Lolas

Seems fixed!

I was replacing only flink-dist.jar. Replacing all the compiled jars from the 
flink-1.11.0 binary distribution fixed the issue.

Thanks!

El 27 de julio de 2020 4:28, Felipe Lolas  escribió:

Hi!! Timo and Chesnay:

Thanks for helping!!!

Here is the full stack trace:
2020-07-27 05:27:38,661 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job 
insert-into_default_catalog.default_database.print_table 
(ca40bd10a729f5cad56a7db6bef17a6f) switched from state FAILING to FAILED.
org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:503)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:386)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
~[?:1.8.0_112]
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
~[?:1.8.0_112]
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_112]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_112]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
Caused by: java.lang.NoSuchMethodError: 
java.nio.ByteBuffer.position(I)Ljava/nio/ByteBuffer;
at 
org.apache.flink.core.memory.DataOutputSerializer.wrapAsByteBuffer(DataOutputSerializer.java:65)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.(SpanningRecordSerial

Re: problem with build from source flink 1.11

2020-07-27 Thread Felipe Lolas

Hi!! Timo and Chesnay:

Thanks for helping!!!

Here is the full stack trace:
2020-07-27 05:27:38,661 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job 
insert-into_default_catalog.default_database.print_table 
(ca40bd10a729f5cad56a7db6bef17a6f) switched from state FAILING to FAILED.
org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:503)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:386)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
~[?:1.8.0_112]
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
~[?:1.8.0_112]
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_112]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_112]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
Caused by: java.lang.NoSuchMethodError: 
java.nio.ByteBuffer.position(I)Ljava/nio/ByteBuffer;
at 
org.apache.flink.core.memory.DataOutputSerializer.wrapAsByteBuffer(DataOutputSerializer.java:65)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.<init>(SpanningRecordSerializer.java:50)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.<init>(RecordWriter.java:98)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
  

How to stream CSV from S3?

2020-07-27 Thread John Smith
Hi, using Flink 1.10

1- How do we go about reading CSV files that are copied to s3 buckets?
2- Is there a source that can tail S3 and start reading a CSV when it is
copied to S3?
3- Is that part of the table APIs?


Re: Flink 1.11.0 UDAF fails with "No match found for function signature fun()"

2020-07-27 Thread Timo Walther

Hi Dmytro,

aggregate functions will support the new type system in Flink 1.12. 
Until then, they cannot be used with the new `call()` syntax as 
anonymous functions. In order to use the old type system, you need to 
register the function explicitly using SQL `CREATE FUNCTION a AS 
'myFunc'` and then use it in `call("myFunc", ...)`.


The mentioned "No match found for function signature fun()" was 
a bug that got fixed in 1.11.1:


https://issues.apache.org/jira/browse/FLINK-18520

This bug only exists for catalog functions, not temporary system functions.

Regards,
Timo
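
For illustration, a minimal sketch of that registration path. The function class
name and the table "A" are placeholders (use whatever AggregateFunction class you
actually have on the classpath); CREATE TEMPORARY SYSTEM FUNCTION is used here
because, as noted above, the lookup bug only affected catalog functions.

```java
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class RegisterUdafBySql {
  public static void main(String[] args) {
    TableEnvironment tEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

    // 'com.example.udf.MyMaxAggFunction' stands in for your aggregate function class
    tEnv.executeSql(
        "CREATE TEMPORARY SYSTEM FUNCTION max_value AS 'com.example.udf.MyMaxAggFunction'");

    // assumes a table A(symbol STRING, price DOUBLE) has been registered as in the thread
    Table bySql = tEnv.sqlQuery("SELECT symbol, max_value(price) FROM A GROUP BY symbol");
    Table byTableApi = tEnv.from("A")
        .groupBy($("symbol"))
        .select($("symbol"), call("max_value", $("price")));
  }
}
```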


On 27.07.20 16:35, Dmytro Dragan wrote:

Hi All,

I see strange behavior of UDAF functions:

Let`s say we have a simple table:

EnvironmentSettings settings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();

TableEnvironment t = TableEnvironment.create(settings);

Table table = t.fromValues(DataTypes.ROW(
     DataTypes.FIELD("price", DataTypes.DOUBLE().notNull()),
     DataTypes.FIELD("symbol", DataTypes.STRING().notNull())
     ),
     row(1.0, "S"), row(2.0, "S"));
t.createTemporaryView("A", table);

As an example we will use a built-in function under a new name:

t.createTemporaryFunction("max_value", new 
MaxWithRetractAggFunction.DoubleMaxWithRetractAggFunction());


Using Table API we can write:

t.createTemporaryView("B", table
     .groupBy($("symbol"))
     .select($("symbol"), call("max_value", $("price")))
);

and get:

org.apache.flink.table.api.TableException: Aggregate functions are not 
updated to the new type system yet.


Using SQL API we can write:

t.createTemporaryView("B", t.sqlQuery("select max_value(price) from A 
group by symbol"));


and get:

org.apache.flink.table.api.ValidationException: SQL validation failed. 
 From line 1, column 8 to line 1, column 23: No match found for function 
signature max_value()


Calling the built-in max function instead of the provided alias produces 
correct results.


In addition,  non-retract implementation of max function 
(MaxAggFunction.DoubleMaxAggFunction) would produce:


org.apache.flink.table.api.ValidationException: Could not register 
temporary catalog function 'default_catalog.default_database.max_value' 
due to implementation errors.


because DoubleMaxAggFunction is not serializable.

Am I missing something?





Flink 1.11.0 UDAF fails with "No match found for function signature fun()"

2020-07-27 Thread Dmytro Dragan
Hi All,

I see strange behavior of UDAF functions:

Let`s say we have a simple table:
EnvironmentSettings settings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment t = TableEnvironment.create(settings);


Table table = t.fromValues(DataTypes.ROW(
DataTypes.FIELD("price", DataTypes.DOUBLE().notNull()),
DataTypes.FIELD("symbol", DataTypes.STRING().notNull())
),
row(1.0, "S"), row(2.0, "S"));
t.createTemporaryView("A", table);

As an example we will use a built-in function under a new name:

t.createTemporaryFunction("max_value", new 
MaxWithRetractAggFunction.DoubleMaxWithRetractAggFunction());

Using Table API we can write:

t.createTemporaryView("B", table
.groupBy($("symbol"))
.select($("symbol"),call("max_value", $("price")))
);
and get:
org.apache.flink.table.api.TableException: Aggregate functions are not updated 
to the new type system yet.

Using SQL API we can write:

t.createTemporaryView("B", t.sqlQuery("select max_value(price) from A group by 
symbol"));
and get:
org.apache.flink.table.api.ValidationException: SQL validation failed. From 
line 1, column 8 to line 1, column 23: No match found for function signature 
max_value()

Calling the built-in max function instead of the provided alias produces correct 
results.

In addition,  non-retract implementation of max function 
(MaxAggFunction.DoubleMaxAggFunction) would produce:
org.apache.flink.table.api.ValidationException: Could not register temporary 
catalog function 'default_catalog.default_database.max_value' due to 
implementation errors.
because DoubleMaxAggFunction is not serializable.

Am I missing something?






Re: problem with build from source flink 1.11

2020-07-27 Thread Chesnay Schepler

@Timo maven 3.2.5 is the recommended Maven version for building Flink.

@Felipe Can you provide us the full stacktrace? This could be a library 
issue in regards to JDK compatibility.


On 27/07/2020 15:23, Timo Walther wrote:

Hi Felipe,

are you sure that Maven and the TaskManagers are using the JDK version 
that you mentioned?


Usually, a `mvn clean install` in the `.../flink/` directory should 
succeed without any problems. Also your Maven version seems pretty 
old. I'm using Apache Maven 3.6.3 for example.


The NoSuchMethodError indicates that there is some version mismatch. 
It seems that this version mismatch is related to your JDK version. 
Maybe your task managers run a different version?


Let me know if this helped.

Regards,
Timo


On 27.07.20 12:09, Felipe Lolas wrote:

Hi,

I'm Felipe, and I just started learning Flink a few weeks ago (moving Spark 
streaming workloads).


Now I'm currently testing some changes in flink-yarn, but when using 
my built flink-dist.jar, the job in the TaskManager fails because of: 
java.lang.NoSuchMethodError: 
java.nio.ByteBuffer.position(I)Ljava/nio/ByteBuffer;


*env(build)*
flink-1.11.0
maven 3.2.5
jdk 1.8
macOS

*command(into flink-parent)*

mvn clean install -DskipTests -Dfast

*env*
yarn application mode
cdh 6.2.1
can anyone help me?

Thank you!
Cheers,
Felipe L







Re: problem with build from source flink 1.11

2020-07-27 Thread Timo Walther

Hi Felipe,

are you sure that Maven and the TaskManagers are using the JDK version 
that you mentioned?


Usually, a `mvn clean install` in the `.../flink/` directory should 
succeed without any problems. Also your Maven version seems pretty old. 
I'm using Apache Maven 3.6.3 for example.


The NoSuchMethodError indicates that there is some version mismatch. It 
seems that this version mismatch is related to your JDK version. Maybe 
your task managers run a different version?


Let me know if this helped.

Regards,
Timo


On 27.07.20 12:09, Felipe Lolas wrote:

Hi,

I'm Felipe, and I just started learning Flink a few weeks ago (moving Spark 
streaming workloads).


Now I'm currently testing some changes in flink-yarn, but when using my 
built flink-dist.jar, the job in the TaskManager fails because of: 
java.lang.NoSuchMethodError: 
java.nio.ByteBuffer.position(I)Ljava/nio/ByteBuffer;


*env(build)*
flink-1.11.0
maven 3.2.5
jdk 1.8
macOS

*command(into flink-parent)*

mvn clean install -DskipTests -Dfast

*env*
yarn application mode
cdh 6.2.1
can anyone help me?

Thank you!
Cheers,
Felipe L




Email from kandy.wang

2020-07-27 Thread kandy.wang



Re: Pyflink 1.10.0 issue on cluster

2020-07-27 Thread Xingbo Huang
Hi,
You can use the `set_python_requirements` method to specify your
requirements.txt; you can refer to the documentation [1] for details.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/python/dependency_management.html#python-dependency

Best,
Xingbo

On Mon, Jul 27, 2020 at 8:29 PM rookieCOder  wrote:

> Hi,
> And I've got another question.
> If I use user-defined function in pyflink, which only depends library A.
> And
> what the flink does is using the udf in tables.
> Does that mean I only need to install library A on the slaves?
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Pyflink 1.10.0 issue on cluster

2020-07-27 Thread rookieCOder
Hi,
And I've got another question.
If I use user-defined function in pyflink, which only depends library A. And
what the flink does is using the udf in tables.
Does that mean I only need to install library A on the slaves?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Parquet batch table sink in Flink 1.11

2020-07-27 Thread Flavio Pompermaier
I think that's not true when you need to integrate Flink into an existing
data lake... I think it should be very straightforward (in my opinion) to
read/write Parquet data with objects serialized with
Avro/Thrift/Protobuf, or at least to reuse Hadoop input/output formats with
the Table API. At the moment I have to go through a lot of custom code that
uses the Hadoop formats, and it is a lot of code just to read and write
Thrift- or Avro-serialized objects in Parquet folders.
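
For reference, a sketch of the wrapping Jingsong suggests below (schema, path and
dataset are placeholders; it assumes flink-hadoop-compatibility and parquet-avro on
the classpath, and flink-avro for GenericRecord serialization):

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.parquet.avro.AvroParquetOutputFormat;

public class ParquetAvroSinkSketch {

  // writes a DataSet of Avro GenericRecords as Parquet files under outputPath
  public static void writeAsParquet(
      DataSet<GenericRecord> records, Schema schema, String outputPath) throws Exception {

    Job job = Job.getInstance();
    AvroParquetOutputFormat.setSchema(job, schema);
    FileOutputFormat.setOutputPath(job, new Path(outputPath));

    // AvroParquetOutputFormat is generic in recent parquet-avro versions;
    // drop the type argument if yours is not
    HadoopOutputFormat<Void, GenericRecord> parquetFormat =
        new HadoopOutputFormat<>(new AvroParquetOutputFormat<GenericRecord>(), job);

    records
        // Hadoop output formats are key/value based; Parquet ignores the key
        .map(new MapFunction<GenericRecord, Tuple2<Void, GenericRecord>>() {
          @Override
          public Tuple2<Void, GenericRecord> map(GenericRecord record) {
            return new Tuple2<>(null, record);
          }
        })
        .output(parquetFormat);
  }
}
```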

On Wed, Jul 22, 2020 at 3:35 AM Jingsong Li  wrote:

> In table/SQL,
>
> I think we don't need a source/sink for `AvroParquetOutputFormat`, because
> the data structure is always Row or RowData, should not be a avro object.
>
> Best,
> Jingsong
>
> On Tue, Jul 21, 2020 at 8:09 PM Flavio Pompermaier 
> wrote:
>
>> This is what I actually do but I was hoping to be able to get rid of the
>> HadoopOutputForma and be able to use a  more comfortable Source/Sink
>> implementation.
>>
>> On Tue, Jul 21, 2020 at 12:38 PM Jingsong Li 
>> wrote:
>>
>>> Hi Flavio,
>>>
>>> AvroOutputFormat only supports writing Avro files.
>>> I think you can use `AvroParquetOutputFormat` as a hadoop output format,
>>> and wrap it through Flink `HadoopOutputFormat`.
>>>
>>> Best,
>>> Jingsong
>>>
>>> On Fri, Jul 17, 2020 at 11:59 PM Flavio Pompermaier <
>>> pomperma...@okkam.it> wrote:
>>>
 Hi to all,
 is there a way to write out Parquet-Avro data using
 BatchTableEnvironment with Flink 1.11?
 At the moment I'm using the hadoop ParquetOutputFormat but I hope to be
 able to get rid of it sooner or later..I saw that there's the
 AvroOutputFormat but no support for it using Parquet.

 Best,
 Flavio

>>>
>>>
>>> --
>>> Best, Jingsong Lee
>>>
>>
>
> --
> Best, Jingsong Lee
>


Re: Pyflink 1.10.0 issue on cluster

2020-07-27 Thread Xingbo Huang
Yes. You are right.

Best,
Xingbo

On Mon, Jul 27, 2020 at 6:30 PM rookieCOder  wrote:

> Hi, Xingbo
> Thanks for your reply.
> So the point is that simply link the source or the sink to the master's
> local file system will cause the error that the slaves cannot read the
> source/sink files? Thus the simplest solution is to make sure that slaves
> have access to the master's local filesystem (by nfs or hdfs)?
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: How to get CLI parameters when deploy on yarn cluster

2020-07-27 Thread 范超
Thanks Jake, I’ll try it out. It worked!

From: Jake [mailto:ft20...@qq.com]
Sent: Monday, July 27, 2020 18:33
To: 范超 
Cc: user (user@flink.apache.org) 
Subject: Re: How to get CLI parameters when deploy on yarn cluster

Hi fanchao

You can use params after jar file.

/usr/local/flink/bin/flink run -m yarn-cluster -d -p 3 
~/project/test/app/test.jar param1 param2 param3

https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/yarn_setup.html

Jake


On Jul 27, 2020, at 6:19 PM, 范超  wrote:

Hi, Flink community

I’m a starter at Flink, and don’t know how to pass parameters to my jar file, 
where I want to start the job in detached mode on the YARN cluster.
Here is my shell code:

/usr/local/flink/bin/flink run -m yarn-cluster -d -p 3 
~/project/test/app/test.jar -runat=test 2>&1

In my jar file, the code will use a different config.properties file depending on 
the “runat” CLI parameter, but I don’t know how to get this CLI parameter.
Also, since I have two environments, one for testing and the 
other for production, each with its own property file, how can I choose between 
them using a CLI option?

Thanks a lot , Any help is appreciated.

Chao fan




Re: How to get CLI parameters when deploy on yarn cluster

2020-07-27 Thread Jake
Hi fanchao

You can use params after jar file.

/usr/local/flink/bin/flink run -m yarn-cluster -d -p 3 
~/project/test/app/test.jar param1 param2 param3

https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/yarn_setup.html
 


Jake
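
For reference, a minimal sketch of reading such a parameter inside main(). It
assumes the job is submitted with "--runat test" (ParameterTool parses "--key value"
/ "-key value" pairs) and that per-environment property files exist under a path of
your choosing.

```java
import org.apache.flink.api.java.utils.ParameterTool;

public class JobEntryPoint {

  public static void main(String[] args) throws Exception {
    // e.g. flink run -m yarn-cluster -d -p 3 test.jar --runat test
    ParameterTool params = ParameterTool.fromArgs(args);
    String runAt = params.get("runat", "test"); // "test" is just a fallback default

    // hypothetical naming scheme for per-environment property files
    String configPath = "/path/to/config-" + runAt + ".properties";
    ParameterTool config = ParameterTool.fromPropertiesFile(configPath);

    // ... build the execution environment and job using 'config' ...
  }
}
```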

> On Jul 27, 2020, at 6:19 PM, 范超  wrote:
> 
> Hi, Flink community
> 
> I’m a starter at Flink, and don’t know how to pass parameters to my jar 
> file, where I want to start the job in detached mode on the YARN cluster.
> Here is my shell code:
> 
> /usr/local/flink/bin/flink run -m yarn-cluster -d -p 3 
> ~/project/test/app/test.jar -runat=test 2>&1
> 
> In my jar file, the code will use a different config.properties file depending on 
> the “runat” CLI parameter, but I don’t know how to get this CLI parameter.
> Also, since I have two environments, one for testing and the 
> other for production, each with its own property file, how can I choose between 
> them using a CLI option?
> 
> Thanks a lot , Any help is appreciated.
> 
> Chao fan
> 



Re: Is outputting from components other than sinks or side outputs a no-no ?

2020-07-27 Thread Tom Fennelly
No, I think David answered the specific question that I asked i.e. is it
okay (or not) for operators other than sinks and side outputs to do I/O.
Purging DLQ entries is something we'll need to be able to do anyway (for
some scenarios - aside from successful checkpoint retries) and I
specifically wasn't asking about sink functions.

On Mon, Jul 27, 2020 at 10:08 AM Stephen Connolly 
wrote:

> I am not 100% certain that David is talking about the same pattern of
> usage that you are Tom.
>
> David, the pattern Tom is talking about is something like this...
>
> try {
>   do something with record
> } catch (SomeException e) {
>   push record to DLQ
> }
>
> My concern is that if we have a different failure, or even a restart from
> checkpoint because say the task manager OOM'd or was killed... now the
> record is replayed... and this time the "do something with record"
> succeeded... but it's still on the DLQ from last time
>
> If the DLQ is a flink native output that pushes to an exactly-once sink
> then you do not have that issue. When you roll the side-output behind
> flinks back... then you have to take all those potentials into account
> which significantly complicates the code
>
> On 2020/07/27 07:45:27, Tom Fennelly  wrote:
> > Thank you David.
> >
> > In the case we have in mind it should only happen literally on the very
> > rare Exception i.e. in some cases if somehow an uncaught exception
> occurs,
> > we want to send the record to a DLQ and handle the retry manually Vs
> > checkpointing and restarting.
> >
> > Regards,
> >
> > Tom.
> >
> >
> > On Sun, Jul 26, 2020 at 1:14 PM David Anderson 
> > wrote:
> >
> > > Every job is required to have a sink, but there's no requirement that
> all
> > > output be done via sinks. It's not uncommon, and doesn't have to cause
> > > problems, to have other operators that do I/O.
> > >
> > > What can be problematic, however, is doing blocking I/O. While your
> user
> > > function is blocked, the function will exert back pressure, and
> checkpoint
> > > barriers will be unable to make any progress. This sometimes leads to
> > > checkpoint timeouts and job failures. So it's recommended to make any
> I/O
> > > you do asynchronous, using an AsyncFunction [1] or something similar.
> > >
> > > Note that the asynchronous i/o function stores the records for
> in-flight
> > > asynchronous requests in checkpoints, and restores/re-triggers the
> requests
> > > when recovering from a failure. This might lead to duplicate results
> if you
> > > are using it to do non-idempotent database writes. If you need
> > > transactions, use a sink that offers them.
> > >
> > > [1]
> > >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html
> > > <
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/asyncio.html
> >
> > >
> > > Best,
> > > David
> > >
> > > On Sun, Jul 26, 2020 at 11:08 AM Tom Fennelly  >
> > > wrote:
> > >
> > >> Hi.
> > >>
> > >> What are the negative side effects of (for example) a filter function
> > >> occasionally making a call out to a DB ? Is this a big no-no and
> should all
> > >> outputs be done through sinks and side outputs, no exceptions ?
> > >>
> > >> Regards,
> > >>
> > >> Tom.
> > >>
> > >
> >
>


Re: Pyflink 1.10.0 issue on cluster

2020-07-27 Thread rookieCOder
Hi, Xingbo
Thanks for your reply.
So the point is that simply linking the source or the sink to the master's
local file system causes the error because the slaves cannot read the
source/sink files? Thus the simplest solution is to make sure that the slaves
have access to the master's local filesystem (via NFS or HDFS)?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


How to get CLI parameters when deploy on yarn cluster

2020-07-27 Thread 范超
Hi, Flink community

I’m a starter at Flink and don’t know how to pass parameters to my jar file, 
where I want to start the job in detached mode on the YARN cluster.
Here is my shell code:

/usr/local/flink/bin/flink run -m yarn-cluster -d -p 3 
~/project/test/app/test.jar -runat=test 2>&1

In my jar file, the code will use a different config.properties file depending on 
the “runat” CLI parameter, but I don’t know how to get this CLI parameter.
Also, if I have two environments (one property file for testing and one for 
production), could you please tell me how to start the job with the right one 
using a CLI option?

Thanks a lot , Any help is appreciated.

Chao fan

Re: Pyflink 1.10.0 issue on cluster

2020-07-27 Thread Xingbo Huang
Hi rookieCOder,
You need to make sure that your files can be read by each slaves, so an
alternative solution is to put your files on hdfs

Best,
Xingbo

rookieCOder  wrote on Mon, Jul 27, 2020 at 5:49 PM:

> 'm coding with pyflink 1.10.0 and building cluster with flink 1.10.0
> I define the source and the sink as following:
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2674/%E6%97%A0%E6%A0%87%E9%A2%98.png>
>
> When I run this code only on master, it's OK. When I run this code on
> cluster, with 1 master and 1 salve, and I submit the task on master like
> this:
> sudo flink-1.10.0/bin/flink run -py main.py
> And error occurs like:
> Caused by: java.io.FileNotFoundException: The provided file path
> /opt/raw_data/input_json_65adbe54-cfdc-11ea-9d47-020012970011 does not
> exist.
> This file is stored on master's local file system. It seems that the slaves
> read
> their own file system instead of the master's. Or maybe there are other
> reasons.
> The question is how can I avoid the error?
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


problem with build from source flink 1.11

2020-07-27 Thread Felipe Lolas
Hi,

I'm Felipe; I just started learning Flink a few weeks ago (moving Spark Streaming 
workloads). 

I'm currently testing some changes in flink-yarn, but when using my 
self-built flink-dist.jar, the job in the TaskManager fails with: 
java.lang.NoSuchMethodError: 
java.nio.ByteBuffer.position(I)Ljava/nio/ByteBuffer;

env(build)
flink-1.11.0
maven 3.2.5
jdk 1.8
macox

command(into flink-parent)

mvn clean install -DskipTests -Dfast

env
yarn application mode
cdh 6.2.1

can anyone help me?

Thank you!
Cheers,
Felipe L

Pyflink 1.10.0 issue on cluster

2020-07-27 Thread rookieCOder
I'm coding with pyflink 1.10.0 and building a cluster with flink 1.10.0
I define the source and the sink as following:

 
When I run this code only on master, it's OK. When I run this code on
cluster, with 1 master and 1 slave, and I submit the task on the master like
this:
sudo flink-1.10.0/bin/flink run -py main.py 
And error occurs like:
Caused by: java.io.FileNotFoundException: The provided file path
/opt/raw_data/input_json_65adbe54-cfdc-11ea-9d47-020012970011 does not
exist.
This file is stored on master's local file system. It seems that the slaves
read
their own file system instead of the master's. Or maybe there are other
reasons.
The question is how can I avoid the error?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Kafka connector with PyFlink

2020-07-27 Thread Timo Walther

Hi Dian,

we had this discussion in the past. Yes, it might help in certain cases. 
But on the other hand, the current behaviour also helps in finding version 
mismatches when people have misconfigured their dependencies. Different JVM 
versions should not result in incompatible classes, as the default 
serialVersionUID computation is standardized, no?


Regards,
Timo
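
For reference, a minimal sketch of what pinning the serialization contract looks like; the class below is a made-up stand-in, not the actual RowType$RowField:

import java.io.Serializable;

// With an explicit serialVersionUID, Java serialization no longer derives the
// ID from the class shape and compiler, so client and server can exchange
// instances as long as the fields stay compatible.
public class RowFieldLike implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String name;
    private final String description;

    public RowFieldLike(String name, String description) {
        this.name = name;
        this.description = description;
    }
}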

On 27.07.20 10:53, Dian Fu wrote:
@Wojtek I just found that the serialVersionUID is not defined in 
org.apache.flink.table.types.logical.RowType$RowField, so you have to 
make sure that the JDK version is the same between the client side and 
the server side. Could you check that?


@Timo I think we should define the serialVersionUID for all the classes 
which implement Serializable. What do you think?


Regards,
Dian

在 2020年7月27日,下午4:38,Timo Walther > 写道:


Hi,

the InvalidClassException indicates that you are using different 
versions of the same class. Are you sure you are using the same Flink 
minor version (including the Scala suffix) for all dependencies and 
Kubernetes?


Regards,
Timo


On 27.07.20 09:51, Wojciech Korczyński wrote:

Hi,
when I try it locally it runs well. The problem is when I run it 
using Kubernetes. I don't know how to make Flink and Kubernetes go 
well together in that case.

Best, Wojtek
    On Fri, Jul 24, 2020 at 17:51 Xingbo Huang > wrote:

   Hi Wojciech,
   In many cases, you can make sure that your code can run correctly in
   local mode, and then submit the job to the cluster for testing. For
   how to add jar packages in local mode, you can refer to the doc[1].
   Besides, you'd better use blink planner which is the default
   planner. For how to use blink planner, you can refer to the doc[2]
   [1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/python/common_questions.html#adding-jar-files
   [2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#create-a-tableenvironment
   Best,
   Xingbo
    Wojciech Korczyński  wrote on Fri, Jul 24, 2020 at 9:40 PM:
   Hi,
   I've done like you recommended:
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.dataset import ExecutionEnvironment
    from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment, StreamTableEnvironment, ScalarFunction
    from pyflink.table.descriptors import Schema, OldCsv, FileSystem, Kafka, Json, Csv
    from pyflink.table.udf import udf
   exec_env = StreamExecutionEnvironment.get_execution_environment()
   t_config = TableConfig()
   t_env = StreamTableEnvironment.create(exec_env, t_config)
   INPUT_TABLE ="my_topic"
   INPUT_TOPIC ="my-topic"
   LOCAL_KAFKA ='my-cluster-kafka-bootstrap:9092'
   OUTPUT_TABLE ="my_topic_output"
   OUTPUT_TOPIC ="my-topic-output"
   ddl_source =f"""
   CREATE TABLE {INPUT_TABLE}(
   message STRING
   ) WITH (
   'connector' = 'kafka',
   'topic' = '{INPUT_TOPIC}',
   'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
   'format' = 'json'
   )
   """
   ddl_sink =f"""
   CREATE TABLE {OUTPUT_TABLE}(
   message STRING
   ) WITH (
   'connector' = 'kafka',
   'topic' = '{OUTPUT_TOPIC}',
   'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
   'format' = 'json'
   )
   """
   t_env.execute_sql(ddl_source)
   t_env.execute_sql(ddl_sink)
   result = t_env.execute_sql(f"""
   INSERT INTO {OUTPUT_TABLE}
   SELECT message
   FROM {INPUT_TABLE}
   """)
   result.get_job_client().get_job_execution_result().result()
   I think it is correctly written.
   However after deploying that job I'm getting an error:
   wojtek@wojtek-B365M-D3H:~/PycharmProjects/k8s_kafka_demo$ 
/home/wojtek/flink/build-target/bin/flink run -m $(minikube ip):30081 
-py kafka2flink.py -j jar/flink-sql-connector-kafka_2.11-1.11.0.jar

   WARNING: An illegal reflective access operation has occurred
   WARNING: Illegal reflective access by 
org.apache.flink.api.java.ClosureCleaner 
(file:/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/lib/flink-dist_2.11-1.11-SNAPSHOT.jar) 
to field java.util.Properties.serialVersionUID
   WARNING: Please consider reporting this to the maintainers of 
org.apache.flink.api.java.ClosureCleaner
   WARNING: Use --illegal-access=warn to enable warnings of 
further illegal reflective access operations
   WARNING: All illegal access operations will be denied in a 
future release

   Job has been submitted with JobID d23fe59415e9c9d79d15a1cf7e5409aa
   Traceback (most recent call last):
  File "kafka2flink.py", line 62, in 
    result.get_job_client().get_job_execution_result().result()
  File 
"/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/fl

Re: Flink 1.11 Simple pipeline (data stream -> table with egg -> data stream) failed

2020-07-27 Thread Timo Walther

Hi Dmytro,

one major difference between the legacy and Blink planners is that the Blink 
planner is not built on top of the DataStream API. It uses features of lower 
levels (StreamOperator, Transformation). In the mid-term we want to 
remove the check and make Table API and DataStream API 100% back-and-forth 
compatible for batch and streaming.


"there is no way to create/retract stream": What are you planning to do 
with the created stream? If you want to sink it into an external system, 
the new FLIP-95 sinks support all changelog semantics now.


Regards,
Timo
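
For illustration, a minimal sketch of the unified TableEnvironment in batch mode mentioned above; the inline VALUES query is made up and does not reproduce the pipeline discussed below:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UnifiedBatchSketch {
    public static void main(String[] args) {
        // Blink planner in batch mode, without a StreamExecutionEnvironment underneath
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // self-contained aggregation on inline VALUES, printed to stdout
        tEnv.executeSql(
                "SELECT s, COUNT(*) AS cnt "
              + "FROM (VALUES ('1'), ('2'), ('2')) AS t(s) "
              + "GROUP BY s")
            .print();
    }
}

Note that toRetractStream is not available on this unified TableEnvironment; results would go out through INSERT INTO statements or be collected from the TableResult.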


On 24.07.20 17:49, Dmytro Dragan wrote:

Hi Timo,
Thank you for response.

Well, it was working.
We have a number of pipelines in production which reuse DataStream and Table 
API parts on Flink 1.10, both for stream and batch.
The same simple case without aggregation also works in Flink 1.11.

But let`s assume there are some incompatible changes and such an approach would 
not work anymore.

In the case of TableEnvironment there is no way to create a retract stream.
I would assume that it is possible to wrap the stream in a bounded 
StreamTableSource/StreamTableSink
and use the deprecated TableEnvironment methods to register them, but I wonder if 
there is a better way to do it.

It sounds quite strange that, with the Blink planner optimising pipelines for 
both stream and batch jobs,
there is a necessity to write the same things with the DataStream and DataSet APIs.


On 24/07/2020, 15:36, "Timo Walther"  wrote:

 Hi Dmytro,
 
 `StreamTableEnvironment` does not support batch mode currently. Only

 `TableEnvironment` supports the unified story. I saw that you disabled
 the check in the `create()` method. This check exists for a reason.
 
 For batch execution, the planner sets specific properties on the stream

 graph that the StreamExecutionEnvironment cannot handle (e.g. blocking
 inputs). My guess would be that this is the reason for your exception.
 
 Have you tried to use the unified `TableEnvironment`?
 
 Regards,

 Timo
 
 
 
 
 On 23.07.20 15:14, Dmytro Dragan wrote:

 > Hi All,
 >
 > We are working on migration existing pipelines from Flink 1.10 to Flink
 > 1.11.
 >
 > We are using Blink planner and have unified pipelines which can be used
 > in stream and batch mode.
 >
 > Stream pipelines works as expected, but batch once fail on Flink 1.11 if
 > they have any table aggregation transformation.
 >
 > Simple example of failed pipeline:
 >
 > StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 > env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 >
 > TableConfig tableConfig = new TableConfig();
 > tableConfig.setIdleStateRetentionTime(
 >     org.apache.flink.api.common.time.Time.minutes(10),
 >     org.apache.flink.api.common.time.Time.minutes(30)
 > );
 > EnvironmentSettings settings =
 >     EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
 >
 > // is created using work around with ignoring settings.isStreamingMode() check
 > StreamTableEnvironment tEnv = create(env, settings, tableConfig);
 >
 > DataStreamSource streamSource = env.fromCollection(asList(new A("1"), new A("2")));
 >
 > Table table = tEnv.fromDataStream(streamSource);
 > tEnv.createTemporaryView("A", table);
 >
 > String sql = "select s from A group by s";
 >
 > tEnv
 >     .toRetractStream(tEnv.sqlQuery(sql), Row.class)
 >     .flatMap(new RetractFlatMap())
 >     .map(Row::toString)
 >     .addSink(new TestSinkFunction<>());
 >
 > env.execute("");
 >
 > values.forEach(System.out::println);
 >
 > Exception:
 >
 > Caused by: java.lang.IllegalStateException: Trying to consume an input
 > partition whose producer is not ready (result type: BLOCKING, partition
 > consumable: false, producer state: DEPLOYING, partition id:
 > 9eb6904501e90d90797a264aeb95a7c2#0@9c8833afe58af5854324c882252c267b).
 >
 >  at
 > 
org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.handleConsumedPartitionShuffleDescriptorErrors(TaskDeploymentDescriptorFactory.java:242)
 >
 >  …
 >
 > Adding StreamTableEnvironment execute does not help.
 >
 > Could you please advise what I`m missing?
 >
 
 
 





Re: Is outputting from components other than sinks or side outputs a no-no ?

2020-07-27 Thread Stephen Connolly
I am not 100% certain that David is talking about the same pattern of usage 
that you are Tom.

David, the pattern Tom is talking about is something like this...

try {
  do something with record
} catch (SomeException e) {
  push record to DLQ
}

My concern is that if we have a different failure, or even a restart from 
checkpoint because say the task manager OOM'd or was killed... now the record 
is replayed... and this time the "do something with record" succeeded... but 
it's still on the DLQ from last time

If the DLQ is a flink native output that pushes to an exactly-once sink then 
you do not have that issue. When you roll the side-output behind flinks back... 
then you have to take all those potentials into account which significantly 
complicates the code
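
For comparison, a minimal sketch of the Flink-native side-output variant being contrasted with ad-hoc I/O in the catch block; the record type, the tag name and the downstream DLQ sink are assumptions, not code from this thread:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class DlqSideOutputSketch {

    // anonymous subclass so the element type information is retained
    static final OutputTag<String> DLQ_TAG = new OutputTag<String>("dlq") {};

    public static SingleOutputStreamOperator<String> process(DataStream<String> input) {
        SingleOutputStreamOperator<String> main = input.process(
                new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String record, Context ctx, Collector<String> out) {
                        try {
                            out.collect(doSomethingWith(record));   // placeholder for the real logic
                        } catch (Exception e) {
                            // route the failure to the side output; the actual DLQ write happens
                            // in a sink attached to getSideOutput(DLQ_TAG), so it participates in
                            // checkpointing instead of being an out-of-band write
                            ctx.output(DLQ_TAG, record);
                        }
                    }
                });

        // main.getSideOutput(DLQ_TAG).addSink(new MyDlqSink());   // hypothetical DLQ sink
        return main;
    }

    private static String doSomethingWith(String record) {
        return record.toUpperCase();
    }
}

If the attached sink is transactional, records routed to the DLQ roll back together with the checkpoint, which avoids the stale-entry problem described above.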

On 2020/07/27 07:45:27, Tom Fennelly  wrote: 
> Thank you David.
> 
> In the case we have in mind it should only happen literally on the very
> rare Exception i.e. in some cases if somehow an uncaught exception occurs,
> we want to send the record to a DLQ and handle the retry manually Vs
> checkpointing and restarting.
> 
> Regards,
> 
> Tom.
> 
> 
> On Sun, Jul 26, 2020 at 1:14 PM David Anderson 
> wrote:
> 
> > Every job is required to have a sink, but there's no requirement that all
> > output be done via sinks. It's not uncommon, and doesn't have to cause
> > problems, to have other operators that do I/O.
> >
> > What can be problematic, however, is doing blocking I/O. While your user
> > function is blocked, the function will exert back pressure, and checkpoint
> > barriers will be unable to make any progress. This sometimes leads to
> > checkpoint timeouts and job failures. So it's recommended to make any I/O
> > you do asynchronous, using an AsyncFunction [1] or something similar.
> >
> > Note that the asynchronous i/o function stores the records for in-flight
> > asynchronous requests in checkpoints, and restores/re-triggers the requests
> > when recovering from a failure. This might lead to duplicate results if you
> > are using it to do non-idempotent database writes. If you need
> > transactions, use a sink that offers them.
> >
> > [1]
> > https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html
> > 
> >
> > Best,
> > David
> >
> > On Sun, Jul 26, 2020 at 11:08 AM Tom Fennelly 
> > wrote:
> >
> >> Hi.
> >>
> >> What are the negative side effects of (for example) a filter function
> >> occasionally making a call out to a DB ? Is this a big no-no and should all
> >> outputs be done through sinks and side outputs, no exceptions ?
> >>
> >> Regards,
> >>
> >> Tom.
> >>
> >
> 


Re: Unable to deduce RocksDB api calls in streaming.

2020-07-27 Thread Timo Walther

Hi Aviral,

as far as I know we are not calling the RocksDB API to perform snapshots. As 
the Stack Overflow answer also indicates, most of the snapshotting is done 
outside of RocksDB by just dealing with the SST files. Have you checked 
the available metrics in the web UI?


https://ci.apache.org/projects/flink/flink-docs-master/monitoring/checkpoint_monitoring.html

But maybe a RocksDB expert (in CC) knows more about this topic.

Regards,
Timo

On 27.07.20 00:02, Aviral Srivastava wrote:

Hi all!

I want to profile the time taken to make snapshot calls to RocksDB when 
using Flink in streaming mode.


I have forked the flink core repo, added the example of fraud detection, 
configured the state backend and checkpointing. The program is running 
successfully.


I have also been able to import the code base into IntelliJ IDE and 
debug it. However, I could not trace the function calls to the RocksDB. 
Is there any documentation that would help me out in this regard? Or can 
someone help me here?


Corresponding question on SO: 
https://stackoverflow.com/questions/63103295/what-are-the-api-calls-made-to-create-snapshot-while-using-rocksdb-as-the-state


Best,
Aviral Srivastava





Re: Kafka connector with PyFlink

2020-07-27 Thread Timo Walther

Hi,

the InvalidClassException indicates that you are using different 
versions of the same class. Are you sure you are using the same Flink 
minor version (including the Scala suffix) for all dependencies and 
Kubernetes?


Regards,
Timo


On 27.07.20 09:51, Wojciech Korczyński wrote:

Hi,

when I try it locally it runs well. The problem is when I run it 
using Kubernetes. I don't know how to make Flink and Kubernetes go well 
together in that case.


Best, Wojtek

On Fri, Jul 24, 2020 at 17:51 Xingbo Huang > wrote:


Hi Wojciech,
In many cases, you can make sure that your code can run correctly in
local mode, and then submit the job to the cluster for testing. For
how to add jar packages in local mode, you can refer to the doc[1].
Besides, you'd better use blink planner which is the default
planner. For how to use blink planner, you can refer to the doc[2]

[1]

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/python/common_questions.html#adding-jar-files
[2]

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#create-a-tableenvironment

Best,
Xingbo

Wojciech Korczyński <wojciech.korczyn...@alphamoon.ai> wrote on Fri, Jul 24, 2020 at 9:40 PM:

Hi,

I've done like you recommended:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment, StreamTableEnvironment, ScalarFunction
from pyflink.table.descriptors import Schema, OldCsv, FileSystem, Kafka, Json, Csv
from pyflink.table.udf import udf

exec_env = StreamExecutionEnvironment.get_execution_environment()
t_config = TableConfig()
t_env = StreamTableEnvironment.create(exec_env, t_config)

INPUT_TABLE ="my_topic"
INPUT_TOPIC ="my-topic"
LOCAL_KAFKA ='my-cluster-kafka-bootstrap:9092'
OUTPUT_TABLE ="my_topic_output"
OUTPUT_TOPIC ="my-topic-output"

ddl_source =f"""
CREATE TABLE {INPUT_TABLE}(
message STRING
) WITH (
'connector' = 'kafka',
'topic' = '{INPUT_TOPIC}',
'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
'format' = 'json'
)
"""

ddl_sink =f"""
CREATE TABLE {OUTPUT_TABLE}(
message STRING
) WITH (
'connector' = 'kafka',
'topic' = '{OUTPUT_TOPIC}',
'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
'format' = 'json'
)
"""

t_env.execute_sql(ddl_source)
t_env.execute_sql(ddl_sink)

result = t_env.execute_sql(f"""
INSERT INTO {OUTPUT_TABLE}
SELECT message
FROM {INPUT_TABLE}
""")

result.get_job_client().get_job_execution_result().result()

I think it is correctly written.

However after deploying that job I'm getting an error:

wojtek@wojtek-B365M-D3H:~/PycharmProjects/k8s_kafka_demo$ 
/home/wojtek/flink/build-target/bin/flink run -m $(minikube ip):30081 -py 
kafka2flink.py -j jar/flink-sql-connector-kafka_2.11-1.11.0.jar
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by 
org.apache.flink.api.java.ClosureCleaner 
(file:/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/lib/flink-dist_2.11-1.11-SNAPSHOT.jar)
 to field java.util.Properties.serialVersionUID
WARNING: Please consider reporting this to the maintainers of 
org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further 
illegal reflective access operations
WARNING: All illegal access operations will be denied in a future 
release
Job has been submitted with JobID d23fe59415e9c9d79d15a1cf7e5409aa
Traceback (most recent call last):
   File "kafka2flink.py", line 62, in 
     result.get_job_client().get_job_execution_result().result()
   File 
"/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/pyflink.zip/pyflink/common/completable_future.py",
 line 78, in result
   File 
"/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
 line 1286, in __call__
   File 
"/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/pyflink.zip/pyflink/util/exceptions.py",
 line 147, in deco
   File 
"/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py",
 line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o52.get.
: java.util.concurrent.ExecutionException: 
org.apache.flink.client.program.ProgramIn

Re: Flink Session TM Logs

2020-07-27 Thread Yang Wang
Let me share another method for accessing the finished TaskManager logs
on YARN.

Currently, the logs are aggregated to HDFS only when a YARN application has
finished, failed, or been killed. That means that
if the Flink application is still running, you can still use the YARN
NodeManager web UI to access the TM logs. Please
build the URL with the following schema.

http:///node/containerlogs/container_xx//taskmanager.log



Best,
Yang

Robert Metzger  wrote on Mon, Jul 27, 2020 at 2:42 PM:

> Hi Richard,
> thanks for forwarding my answer to the list!
>
> I fear that Flink does not have a built-in solution for serving the logs
> of a finished TaskManager while a YARN session is still running.
>
> I agree with Yangze that you probably have to rely on an external logging
> service, such as ElasticSearch or Splunk to index your log events.
> Maybe there's also some tooling from Cloudera specifically made for YARN?
>
>
>
> On Mon, Jul 27, 2020 at 3:46 AM Yangze Guo  wrote:
>
>> Hi, Richard
>>
>> Before the session has been terminated, you could not fetch the
>> terminated TM logs. One possible solution could be leveraging the
>> log4j2 appenders[1]. Flink uses log4j2 as default in the latest
>> release 1.11.
>>
>> [1] https://logging.apache.org/log4j/2.x/manual/appenders.html
>>
>> Best,
>> Yangze Guo
>>
>> On Sat, Jul 25, 2020 at 2:37 AM Richard Moorhead
>>  wrote:
>> >
>> >
>> >
>> > -- Forwarded message -
>> > From: Robert Metzger 
>> > Date: Fri, Jul 24, 2020 at 1:09 PM
>> > Subject: Re: Flink Session TM Logs
>> > To: Richard Moorhead 
>> >
>> >
>> > I accidentally replied to you directly, not to the user@ mailing list.
>> Is it okay for you to publish the thread on the list again?
>> >
>> >
>> >
>> > On Fri, Jul 24, 2020 at 8:01 PM Richard Moorhead <
>> richard.moorh...@gmail.com> wrote:
>> >>
>> >> It is enabled. The issue is that for a long running flink session
>> -which may execute many jobs- the task managers, after a job is completed,
>> are gone, and their logs arent available.
>> >>
>> >> What I have noticed is that when the session is terminated I am able
>> to find the logs in the job history server under the associated yarn
>> application id.
>> >>
>> >> On Fri, Jul 24, 2020 at 12:51 PM Robert Metzger 
>> wrote:
>> >>>
>> >>> Hi Richard,
>> >>>
>> >>> you need to enable YARN log aggregation to access logs of finished
>> YARN applications.
>> >>>
>> >>> On Fri, Jul 24, 2020 at 5:58 PM Richard Moorhead <
>> richard.moorh...@gmail.com> wrote:
>> 
>>  When running a flink session on YARN, task manager logs for a job
>> are not available after completion. How do we locate these logs?
>> 
>>
>


Re: Kafka connector with PyFlink

2020-07-27 Thread Wojciech Korczyński
Hi,

when I try it locally it runs well. The problem is when I run it
using Kubernetes. I don't know how to make Flink and Kubernetes go well
together in that case.

Best, Wojtek

On Fri, Jul 24, 2020 at 17:51 Xingbo Huang  wrote:

> Hi Wojciech,
> In many cases, you can make sure that your code can run correctly in local
> mode, and then submit the job to the cluster for testing. For how to add
> jar packages in local mode, you can refer to the doc[1].
> Besides, you'd better use blink planner which is the default planner. For
> how to use blink planner, you can refer to the doc[2]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/python/common_questions.html#adding-jar-files
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#create-a-tableenvironment
>
> Best,
> Xingbo
>
> Wojciech Korczyński  wrote on Fri, Jul 24, 2020 at 9:40 PM:
>
>> Hi,
>>
>> I've done like you recommended:
>>
>> from pyflink.datastream import StreamExecutionEnvironment
>> from pyflink.dataset import ExecutionEnvironment
>> from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment, 
>> StreamTableEnvironment, ScalarFunction
>> from pyflink.table.descriptors import Schema, OldCsv, FileSystem, Kafka, 
>> Json, Csv
>> from pyflink.table.udf import udf
>>
>> exec_env = StreamExecutionEnvironment.get_execution_environment()
>> t_config = TableConfig()
>> t_env = StreamTableEnvironment.create(exec_env, t_config)
>>
>> INPUT_TABLE = "my_topic"
>> INPUT_TOPIC = "my-topic"
>> LOCAL_KAFKA = 'my-cluster-kafka-bootstrap:9092'
>> OUTPUT_TABLE = "my_topic_output"
>> OUTPUT_TOPIC = "my-topic-output"
>>
>> ddl_source = f"""
>>CREATE TABLE {INPUT_TABLE} (
>>message STRING
>>) WITH (
>>'connector' = 'kafka',
>>'topic' = '{INPUT_TOPIC}',
>>'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
>>'format' = 'json'
>>)
>>"""
>>
>> ddl_sink = f"""
>>CREATE TABLE {OUTPUT_TABLE} (
>>message STRING
>>) WITH (
>>'connector' = 'kafka',
>>'topic' = '{OUTPUT_TOPIC}',
>>'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
>>'format' = 'json'
>>)
>>"""
>>
>> t_env.execute_sql(ddl_source)
>> t_env.execute_sql(ddl_sink)
>>
>> result = t_env.execute_sql(f"""
>> INSERT INTO {OUTPUT_TABLE}
>> SELECT message
>> FROM {INPUT_TABLE}
>> """)
>>
>> result.get_job_client().get_job_execution_result().result()
>>
>> I think it is correctly written.
>>
>> However after deploying that job I'm getting an error:
>>
>> wojtek@wojtek-B365M-D3H:~/PycharmProjects/k8s_kafka_demo$ 
>> /home/wojtek/flink/build-target/bin/flink run -m $(minikube ip):30081 -py 
>> kafka2flink.py -j jar/flink-sql-connector-kafka_2.11-1.11.0.jar
>> WARNING: An illegal reflective access operation has occurred
>> WARNING: Illegal reflective access by 
>> org.apache.flink.api.java.ClosureCleaner 
>> (file:/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/lib/flink-dist_2.11-1.11-SNAPSHOT.jar)
>>  to field java.util.Properties.serialVersionUID
>> WARNING: Please consider reporting this to the maintainers of 
>> org.apache.flink.api.java.ClosureCleaner
>> WARNING: Use --illegal-access=warn to enable warnings of further illegal 
>> reflective access operations
>> WARNING: All illegal access operations will be denied in a future release
>> Job has been submitted with JobID d23fe59415e9c9d79d15a1cf7e5409aa
>> Traceback (most recent call last):
>>   File "kafka2flink.py", line 62, in 
>> result.get_job_client().get_job_execution_result().result()
>>   File 
>> "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/pyflink.zip/pyflink/common/completable_future.py",
>>  line 78, in result
>>   File 
>> "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
>>  line 1286, in __call__
>>   File 
>> "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/pyflink.zip/pyflink/util/exceptions.py",
>>  line 147, in deco
>>   File 
>> "/home/wojtek/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py",
>>  line 328, in get_return_value
>> py4j.protocol.Py4JJavaError: An error occurred while calling o52.get.
>> : java.util.concurrent.ExecutionException: 
>> org.apache.flink.client.program.ProgramInvocationException: Job failed 
>> (JobID: d23fe59415e9c9d79d15a1cf7e5409aa)
>>  at 
>> java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
>>  at 
>> java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
>>  at 
>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
>> Method)
>>  at 
>> java.base/jdk.internal.reflect.NativeMethodAccessor

Re: Is outputting from components other than sinks or side outputs a no-no ?

2020-07-27 Thread Tom Fennelly
Thank you David.

In the case we have in mind it should only happen on the very rare exception,
i.e. if somehow an uncaught exception occurs, we want to send the record to a
DLQ and handle the retry manually vs. checkpointing and restarting.

Regards,

Tom.


On Sun, Jul 26, 2020 at 1:14 PM David Anderson 
wrote:

> Every job is required to have a sink, but there's no requirement that all
> output be done via sinks. It's not uncommon, and doesn't have to cause
> problems, to have other operators that do I/O.
>
> What can be problematic, however, is doing blocking I/O. While your user
> function is blocked, the function will exert back pressure, and checkpoint
> barriers will be unable to make any progress. This sometimes leads to
> checkpoint timeouts and job failures. So it's recommended to make any I/O
> you do asynchronous, using an AsyncFunction [1] or something similar.
>
> Note that the asynchronous i/o function stores the records for in-flight
> asynchronous requests in checkpoints, and restores/re-triggers the requests
> when recovering from a failure. This might lead to duplicate results if you
> are using it to do non-idempotent database writes. If you need
> transactions, use a sink that offers them.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html
> 
>
> Best,
> David
>
> On Sun, Jul 26, 2020 at 11:08 AM Tom Fennelly 
> wrote:
>
>> Hi.
>>
>> What are the negative side effects of (for example) a filter function
>> occasionally making a call out to a DB ? Is this a big no-no and should all
>> outputs be done through sinks and side outputs, no exceptions ?
>>
>> Regards,
>>
>> Tom.
>>
>
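
As a rough illustration of the AsyncFunction approach recommended in the quoted reply above, a minimal sketch; the lookup method, the timeout and the capacity are assumed values, not a vetted implementation:

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class AsyncDbLookupSketch extends RichAsyncFunction<String, String> {

    @Override
    public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
        // run the (blocking) client call on another thread so the operator never blocks
        CompletableFuture
                .supplyAsync(() -> queryDb(key))
                .thenAccept(value -> resultFuture.complete(Collections.singleton(value)));
    }

    private String queryDb(String key) {
        return "value-for-" + key;   // placeholder for the real lookup
    }

    // wiring: at most 100 in-flight requests, 5 second timeout (assumed values)
    public static DataStream<String> enrich(DataStream<String> input) {
        return AsyncDataStream.unorderedWait(
                input, new AsyncDbLookupSketch(), 5, TimeUnit.SECONDS, 100);
    }
}

In a real job the thread pool would typically be created in open() and shut down in close(); the common pool above is only to keep the sketch short.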


Re: Is there a way to use stream API with this program?

2020-07-27 Thread Flavio Pompermaier
Yes it could... Where should I emit the MAX_WATERMARK, and how do I detect
that the input has reached its end?
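
For context, a rough sketch of the kind of trigger suggested in the quoted reply below: when a bounded input finishes, Flink emits a final watermark of Long.MAX_VALUE, so an event-time timer registered for that timestamp fires at end of input. This only illustrates the idea and has not been tested against this pipeline:

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

public class EndOfInputTrigger extends Trigger<Object, GlobalWindow> {
    private static final long serialVersionUID = 1L;

    @Override
    public TriggerResult onElement(Object element, long timestamp, GlobalWindow window, TriggerContext ctx) {
        // registering the same timestamp repeatedly is de-duplicated by the timer service
        ctx.registerEventTimeTimer(Long.MAX_VALUE);
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
        // fires only when the final MAX_VALUE watermark arrives, i.e. at end of input
        return time == Long.MAX_VALUE ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) {
        ctx.deleteEventTimeTimer(Long.MAX_VALUE);
    }
}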

On Sat, Jul 25, 2020 at 8:08 PM David Anderson 
wrote:

> In this use case, couldn't the custom trigger register an event time timer
> for MAX_WATERMARK, which would be triggered when the bounded input reaches
> its end?
>
> David
>
> On Mon, Jul 20, 2020 at 5:47 PM Piotr Nowojski 
> wrote:
>
>> Hi,
>>
>> I'm afraid that there is not out of the box way of doing this. I've
>> created a ticket [1] to write down and document a discussion that we had
>> about this issue in the past.
>>
>> The issue is that currently, untriggered processing time timers are
>> ignored on end of input and it seems like there might be no one single
>> perfect way to handle it for all of the cases, but it probably needs to be
>> customized.
>>
>> Maybe you could:
>> 1. extend `WindowOperator`  (`MyWindowOperator`)
>> 2. implement
>> `org.apache.flink.streaming.api.operators.BoundedOneInput#endInput` in your
>> `MyWindowOperator`
>> 3. Inside `MyWindowOperator#endInput`  invoke
>> `internalTimerService.forEachProcessingTimeTimer(...)` and:
>>   a) manually trigger timers `WindowOperator#onProcessingTime`
>>   b) delete manually triggered timer
>>
>> Piotrek
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-18647
>>
>> pt., 17 lip 2020 o 10:30 Flavio Pompermaier 
>> napisał(a):
>>
>>> Hi to all,
>>> I was trying to port another job we have that use dataset API to
>>> datastream.
>>> The legacy program was doing basically a dataset.mapPartition().reduce()
>>> so I tried to replicate this thing with a
>>>
>>>  final BasicTypeInfo columnType = BasicTypeInfo.DOUBLE_TYPE_INFO;
>>>   final DataStream input = env.fromElements(//
>>> Row.of(1.0), //
>>> Row.of(2.0), //
>>> Row.of(3.0), //
>>> Row.of(5.0), //
>>> Row.of(6.0)).returns(new RowTypeInfo(columnType));
>>>  inputStream.map(new SubtaskIndexAssigner(columnType))
>>> .keyBy(t -> t.f0)
>>> .window(GlobalWindows.create())
>>>
>>> .trigger(PurgingTrigger.of(CountWithTimeoutTriggerPartition.of(Time.seconds(5),
>>> 100L))).
>>> .process(..)
>>>
>>> Unfortunately the program exits before reaching the Process function
>>> (moreover I need to add another window + trigger after it before adding the
>>> reduce function).
>>> Is there a way to do this with the DataStream API or should I still use
>>> DataSet API for the moment (when the batch will be fully supported)? I
>>> append to the footer all the code required to test the job.
>>>
>>> Best,
>>> Flavio
>>>
>>> -
>>>
>>> package org.apache.flink.stats.sketches;
>>>
>>> import org.apache.flink.api.common.functions.ReduceFunction;
>>> import org.apache.flink.api.common.functions.RichMapFunction;
>>> import org.apache.flink.api.common.state.ReducingState;
>>> import org.apache.flink.api.common.state.ReducingStateDescriptor;
>>> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
>>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>>> import org.apache.flink.api.common.typeutils.base.LongSerializer;
>>> import org.apache.flink.api.java.io.PrintingOutputFormat;
>>> import org.apache.flink.api.java.tuple.Tuple2;
>>> import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
>>> import org.apache.flink.api.java.typeutils.RowTypeInfo;
>>> import org.apache.flink.api.java.typeutils.TupleTypeInfo;
>>> import org.apache.flink.configuration.Configuration;
>>> import org.apache.flink.streaming.api.TimeCharacteristic;
>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>> import
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import
>>> org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
>>> import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
>>> import org.apache.flink.streaming.api.windowing.time.Time;
>>> import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
>>> import org.apache.flink.streaming.api.windowing.triggers.Trigger;
>>> import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
>>> import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
>>> import org.apache.flink.types.Row;
>>> import org.apache.flink.util.Collector;
>>>
>>> public class Test {
>>>   public static void main(String[] args) throws Exception {
>>> StreamExecutionEnvironment env =
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>>> env.setParallelism(1);
>>>
>>> final BasicTypeInfo columnType =
>>> BasicTypeInfo.DOUBLE_TYPE_INFO;
>>> final DataStream input = env.fromElements(//
>>> Row.of(1.0), //
>>> Row.of(2.0), //
>>> Row.of(3.0), //
>>> Row.of(5.0), //
>>> Row.of(6.0)).returns(new RowTypeInfo(columnType));
>>> final DataStream out = input.map(