Re: Backpressure and 99th percentile latency

2020-03-05 Thread Arvid Heise
Hi Felipe,

latency under backpressure has to be interpreted carefully. Latency's
semantics actually require that the data source is read in a timely manner;
that is, there is no bottleneck in your pipeline where data is piling up.

Thus, to measure latency in experiments you must ensure that the current
throughput is below the maximum throughput, for example by gradually
increasing the throughput with a generating source or by throttling the
external source. Until you reach the maximum throughput, latency's semantics
are exactly what you expect. Everything after that is more or less just the
reciprocal of backpressure.

If you move away from the theoretical considerations and look at actual
setups, you can easily see why this distinction makes sense: if you have a
low-latency application, you are doomed once you have backpressure (you
cannot hold your SLAs). You would rescale immediately when you see signs of
backpressure (or even earlier). Then, latency always has the desired semantics.
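For illustration, a minimal sketch of such a rate-limited generating source
(the class name and the records-per-second parameter are purely illustrative,
not an existing Flink utility):

import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Emits a fixed number of records per second so an experiment can raise the
// load gradually while staying below the maximum throughput.
public class ThrottledSource implements SourceFunction<Long> {

    private final long recordsPerSecond;
    private volatile boolean running = true;

    public ThrottledSource(long recordsPerSecond) {
        this.recordsPerSecond = recordsPerSecond;
    }

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long value = 0;
        while (running) {
            long start = System.currentTimeMillis();
            // emit this second's budget, then sleep for the rest of the second
            for (long i = 0; i < recordsPerSecond && running; i++) {
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(value++);
                }
            }
            long remaining = 1000 - (System.currentTimeMillis() - start);
            if (remaining > 0) {
                Thread.sleep(remaining);
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}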

On Fri, Mar 6, 2020 at 5:55 AM Zhijiang  wrote:

> Hi Felipe,
>
> Try to answer your below questions.
>
> > I understand that I am tracking latency every 10 seconds for each
> physical operator instance. Is that right?
>
> Generally right. The latency marker is emitted from the source and flows
> through all the intermediate operators until the sink. This interval controls
> the emitting frequency of the source.
>
> > The backpressure goes away but the 99th percentile latency is still the
> same. Why? Does one have no relation to the other?
>
> The latency might be influenced by the buffer flush timeout, network transport
> and load, etc. In the case of backpressure, a huge amount of in-flight data
> accumulates on the network wire, so the latency marker queues up waiting for
> network transport, which can introduce a noticeable delay. The latency marker
> may not even be emitted from the source in time because no buffers are
> temporarily available.
>
> After the backpressure goes away, that does not mean there are no
> accumulated buffers on the network wire, just that they have not reached the
> degree of backpressure. So the latency marker still needs to queue behind the
> accumulated buffers on the wire. And it might take some time to digest the
> previously accumulated buffers completely before the latency relaxes. I guess
> that might be your case. You can monitor the metrics "inputQueueLength" and
> "outputQueueLength" to confirm the status. Anyway, the answer is yes, it is
> related to backpressure, but there might be some delay before you see the
> changes clearly.
>
> >In the end I left the experiment running for more than 2 hours and only
> after about 1.5 hours did the 99th percentile latency go down to milliseconds.
> Is that normal?
>
> I guess it is normal, as mentioned above. Once there are no accumulated
> buffers left in the network stack and no backpressure, it should go down
> to milliseconds.
>
> Best,
> Zhijiang
>
> --
> From:Felipe Gutierrez 
> Send Time:2020 Mar. 6 (Fri.) 05:04
> To:user 
> Subject:Backpressure and 99th percentile latency
>
> Hi,
>
> I am a bit confused about the topic of tracking latency in Flink [1]. It
> says that if I use latency tracking I am measuring Flink's network stack,
> but application code latencies can also influence it. For instance, if I am
> using metrics.latency.granularity: operator (the default) and
> setLatencyTrackingInterval(1). I understand that I am tracking latency
> every 10 seconds for each physical operator instance. Is that right?
>
> In my application, I am tracking the latency of all aggregators. When I
> have a high workload and I can see backpressure in the Flink UI, the 99th
> percentile latency is 13, 25, 21, and 25 seconds. Then I set my aggregator
> to have a larger window. The backpressure goes away but the 99th percentile
> latency is still the same. Why? Does one have no relation to the other?
>
> In the end I left the experiment running for more than 2 hours and only
> after about 1.5 hours did the 99th percentile latency go down to milliseconds.
> Is that normal? Please see the figure attached.
>
> [1]
> https://flink.apache.org/2019/07/23/flink-network-stack-2.html#latency-tracking
>
> Thanks,
> Felipe
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com
>
>
>


Re: Backpressure and 99th percentile latency

2020-03-05 Thread Zhijiang
Hi Felipe,

Try to answer your below questions.

> I understand that I am tracking latency every 10 seconds for each physical
> operator instance. Is that right?

Generally right. The latency marker is emitted from the source and flows through
all the intermediate operators until the sink. This interval controls the
emitting frequency of the source.
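For reference, a minimal sketch of where that interval (and the granularity)
is configured, assuming the DataStream API; the 10-second value just matches
the example above:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// emit a latency marker from each source every 10 seconds (value in milliseconds)
env.getConfig().setLatencyTrackingInterval(10000L);

// the granularity itself is a configuration key, e.g. in flink-conf.yaml:
//   metrics.latency.granularity: operator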

> The backpressure goes away but the 99th percentile latency is still the same.
> Why? Does one have no relation to the other?

The latency might be influenced by the buffer flush timeout, network transport
and load, etc. In the case of backpressure, a huge amount of in-flight data
accumulates on the network wire, so the latency marker queues up waiting for
network transport, which can introduce a noticeable delay. The latency marker may
not even be emitted from the source in time because no buffers are temporarily
available.

After the backpressure goes away, that does not mean there are no accumulated
buffers on the network wire, just that they have not reached the degree of
backpressure. So the latency marker still needs to queue behind the accumulated
buffers on the wire. And it might take some time to digest the previously
accumulated buffers completely before the latency relaxes. I guess that might be
your case. You can monitor the metrics "inputQueueLength" and
"outputQueueLength" to confirm the status. Anyway, the answer is yes, it is
related to backpressure, but there might be some delay before you see the
changes clearly.

>In the end I left the experiment running for more than 2 hours and only after
>about 1.5 hours did the 99th percentile latency go down to milliseconds. Is that
>normal?

I guess it is normal, as mentioned above. Once there are no accumulated buffers
left in the network stack and no backpressure, it should go down to
milliseconds.

Best,
Zhijiang
--
From:Felipe Gutierrez 
Send Time:2020 Mar. 6 (Fri.) 05:04
To:user 
Subject:Backpressure and 99th percentile latency

Hi,

I am a bit confused about the topic of tracking latency in Flink [1]. It says
that if I use latency tracking I am measuring Flink's network stack, but
application code latencies can also influence it. For instance, if I am using
metrics.latency.granularity: operator (the default) and
setLatencyTrackingInterval(1). I understand that I am tracking latency
every 10 seconds for each physical operator instance. Is that right?

In my application, I am tracking the latency of all aggregators. When I have a
high workload and I can see backpressure in the Flink UI, the 99th percentile
latency is 13, 25, 21, and 25 seconds. Then I set my aggregator to have a
larger window. The backpressure goes away but the 99th percentile latency is
still the same. Why? Does one have no relation to the other?

In the end I left the experiment running for more than 2 hours and only after
about 1.5 hours did the 99th percentile latency go down to milliseconds. Is that
normal? Please see the figure attached.

[1] 
https://flink.apache.org/2019/07/23/flink-network-stack-2.html#latency-tracking

Thanks,
Felipe
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez

-- https://felipeogutierrez.blogspot.com



Re: Writing retract streams to Kafka

2020-03-05 Thread Kurt Young
Actually this use case led me to start thinking about one question:
if a watermark is enabled, could we also support GROUP BY event_time instead
of forcing the user to define a window based on the event_time?

GROUP BY a standalone event_time can also be treated as a special window
whose start_time and end_time both equal the event_time. And when the
watermark surpasses the event_time, we can still get the complete data of
such a group, do the required aggregation, and then emit insert-only
results.

That would ease the user's burden of having to define a window when they
already have event time and a watermark defined.
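To make the idea concrete, a rough sketch of the two query shapes as plain
strings (table and column names are borrowed from this thread; the second form
is only the proposal above, not something Flink supports today):

public class GroupByEventTimeSketch {

    // Shape that works today: aggregate over a tumbling event-time window.
    static final String SUPPORTED_TODAY =
        "SELECT itemId, TUMBLE_START(event_time, INTERVAL '5' SECOND), SUM(quantity) "
            + "FROM ItemTransactions "
            + "GROUP BY itemId, TUMBLE(event_time, INTERVAL '5' SECOND)";

    // Proposed shape: a bare event_time in GROUP BY, treated as a zero-length
    // window that closes once the watermark passes event_time, so the result
    // could be emitted insert-only.
    static final String PROPOSED =
        "SELECT itemId, event_time, SUM(quantity) "
            + "FROM ItemTransactions "
            + "GROUP BY itemId, event_time";
}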

Best,
Kurt


On Fri, Mar 6, 2020 at 10:26 AM Jark Wu  wrote:

> Hi Gyula,
>
> Does tumbling 5 seconds for aggregation meet your need? For example:
>
> INSERT INTO QueryResult
> SELECT q.queryId, t.itemId, TUMBLE_START(q.event_time, INTERVAL '5'
> SECOND), sum(t.quantity) AS quantity
> FROM
>   ItemTransactions AS t,
>   Queries AS q
> WHERE
>   t.itemId = q.itemId AND
>   t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time
> GROUP BY
>   t.itemId, q.queryId, TUMBLE(q.event_time, INTERVAL '5' SECOND);
>
> Best,
> Jark
>
> On Thu, 5 Mar 2020 at 23:05, Gyula Fóra  wrote:
>
>> I see, maybe I just don't understand how to properly express what I am
>> trying to compute.
>>
>> Basically I want to aggregate the quantities of the transactions that
>> happened in the 5 seconds before the query.
>> Every query.id belongs to a single query (event_time, itemid) but still
>> I have to group :/
>>
>> Gyula
>>
>> On Thu, Mar 5, 2020 at 3:45 PM Kurt Young  wrote:
>>
>>> I think the issue is not caused by event time interval join, but the
>>> aggregation after the join:
>>> GROUP BY t.itemId, q.event_time, q.queryId;
>>>
>>> In this case, there is still no chance for Flink to determine whether
>>> the groups like (itemId, eventtime, queryId) have complete data or not.
>>> As a comparison, if you change the grouping key to a window based
>>> only on q.event_time, then the query would emit insert-only results.
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Thu, Mar 5, 2020 at 10:29 PM Gyula Fóra  wrote:
>>>
 That's exactly the kind of behaviour I am looking for Kurt ("ignore all
 delete messages").

 As for the data completion, in my above example it is basically an
 event time interval join.
 With watermarks defined Flink should be able to compute results once in
 exactly the same way as for the tumbling window.

 Gyula

 On Thu, Mar 5, 2020 at 3:26 PM Kurt Young  wrote:

> Back to this case, I assume you are expecting something like an "ignore
> all delete messages" flag? With this
> flag turned on, Flink will only send insert messages corresponding to the
> currently correct results to Kafka and
> drop all retractions and deletes on the fly.
>
> Best,
> Kurt
>
>
> On Thu, Mar 5, 2020 at 10:24 PM Kurt Young  wrote:
>
>> > I also don't completely understand at this point why I can write
>> the result of a group, tumble window aggregate to Kafka and not this 
>> window
>> join / aggregate.
>>
>> If you are doing a tumble window aggregate with a watermark enabled,
>> Flink will only fire a final result for
>> each window once; no modifications or retractions will happen after
>> a window is calculated and fired.
>> But with some other arbitrary aggregations, there is not enough
>> information for Flink to determine whether
>> the data is complete or not, so the framework will keep calculating
>> results when receiving new records and
>> retract earlier results by firing retraction/deletion messages.
>>
>> Best,
>> Kurt
>>
>>
>> On Thu, Mar 5, 2020 at 10:13 PM Gyula Fóra 
>> wrote:
>>
>>> Thanks Benoît!
>>>
>>> I can see now how I can implement this myself through the provided
>>> sink interfaces but I was trying to avoid having to write code for this 
>>> :D
>>> My initial motivation was to see whether we are able to write out
>>> any kind of table to Kafka as a simple stream of "upserts".
>>>
>>> I also don't completely understand at this point why I can write the
>>> result of a group, tumble window aggregate to Kafka and not this window
>>> join / aggregate.
>>>
>>> Cheers,
>>> Gyula
>>>
>>> On Thu, Mar 5, 2020 at 3:00 PM Benoît Paris <
>>> benoit.pa...@centraliens-lille.org> wrote:
>>>
 Hi Gyula,

 I'm afraid conversion to see the retractions vs inserts can't be
 done in pure SQL (though I'd love that feature).

 You might want to go lower level and implement a
 RetractStreamTableSink [1][2] that you would wrap around a 
 KafkaTableSink
 [3]. This will give you an emitDataStream(DataStream<Tuple2<Boolean, Row>>
 dataStream), in which the Boolean flag will give you an 'accumulate' 
 or
>

RE: Re: Re: Teradata as JDBC Connection

2020-03-05 Thread Norm Vilmer (Contractor)
Thanks Jark. I'll try removing the inAppendMode() ☺
Regarding the Teradata dialect, crossing my fingers and hoping insert queries 
work.

From: Jark Wu 
Sent: Thursday, March 5, 2020 8:41 PM
To: Norm Vilmer (Contractor) 
Cc: Arvid Heise ; user@flink.apache.org
Subject: EXTERNAL - Re: Re: Teradata as JDBC Connection


Hi Norm,

Here is a documentation for JDBC connector, you can find the supported 
properties there: 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#jdbc-connector

Regarding your exception, you don't need to call `inAppendMode`. The JDBC sink
supports both append-mode and upsert-mode.

Besides that, flink-jdbc currently doesn't support the Teradata dialect or
pluggable dialects; I guess you have to adjust some code in `JDBCDialects`.

Best,
Jark


On Thu, 5 Mar 2020 at 23:46, Norm Vilmer (Contractor) 
mailto:norm.vil...@wnco.com>> wrote:
Thanks for the reply, Arvid. I changed the property names in my 
ConnectorDescriptor subclass to match what the validator wanted and now get:

“Could not find a suitable table factory for 
'org.apache.flink.table.factories.TableSinkFactory' in
the classpath.

Reason: No factory supports all properties.

The matching candidates:
org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory
Unsupported property keys:
update-mode”

The method suggested in the link you sent, registerTableSink, is deprecated in 
1.10, so I was trying to use the following:

final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
final StreamTableEnvironment tableEnv = 
StreamTableEnvironment.create(env);

. . .

tableEnv.connect(new Teradata())
.withSchema(new Schema()
.field("f0", DataTypes.VARCHAR(25))
.field("f1", DataTypes.VARCHAR(10240))
.field("f2", DataTypes.VARCHAR(10240)))
.inAppendMode()
.createTemporaryTable("staging");

Table table = tableEnv.fromDataStream(reportData);
table.insertInto("staging");

Using the connect() method, I can see that the code attempts to use the 
JDBCTableSourceSinkFactory, but does not like ‘update-mode’.

Do you have an example using connect() method? Thanks.

From: Arvid Heise mailto:ar...@ververica.com>>
Sent: Thursday, March 5, 2020 1:15 AM
To: Norm Vilmer (Contractor) mailto:norm.vil...@wnco.com>>
Cc: user@flink.apache.org
Subject: EXTERNAL - Re: Teradata as JDBC Connection


Hi Norm,

the error message already points to the main issue: your property names are not 
correct.
Unsupported property keys:
drivername
update-mode
password
dburl
username

You should use the builder to properly configure the sink [1].

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#jdbcappendtablesink

On Thu, Mar 5, 2020 at 12:58 AM Norm Vilmer (Contractor) 
mailto:norm.vil...@wnco.com>> wrote:
Same error with this change:

public class Teradata extends ConnectorDescriptor {
/**
 * Constructs a {@link ConnectorDescriptor}.
 */
public Teradata() {
super("jdbc", 1, false);
}

@Override
protected Map<String, String> toConnectorProperties() {
Map<String, String> map = new HashMap<>();
map.put(JDBCValidator.CONNECTOR_DRIVER, "com.teradata.jdbc.TeraDriver");
map.put(JDBCValidator.CONNECTOR_URL, 
"jdbc:teradata://myserver/database=mydb,LOGMECH=LDAP");
map.put(JDBCValidator.CONNECTOR_USERNAME, "...");
map.put(JDBCValidator.CONNECTOR_PASSWORD, "...!");
return map;
}
}

-Original Message-
From: Norm Vilmer (Contractor) 
mailto:norm.vil...@wnco.com>>
Sent: Wednesday, March 4, 2020 10:37 AM
To: user@flink.apache.org
Subject: EXTERNAL - Teradata as JDBC Connection


Re: Re: Teradata as JDBC Connection

2020-03-05 Thread Jark Wu
Hi Norm,

Here is a documentation for JDBC connector, you can find the supported
properties there:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#jdbc-connector

Regarding your exception, you don't need to call `inAppendMode`. The JDBC
sink supports both append-mode and upsert-mode.

Besides that, flink-jdbc currently doesn't support the Teradata dialect or
pluggable dialects; I guess you have to adjust some code in `JDBCDialects`.
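For illustration, a minimal sketch of that suggestion applied to the connect()
chain from the mail quoted below, i.e. the same descriptor without the
inAppendMode() call (whether the JDBC factory then accepts the remaining
properties still depends on the dialect support mentioned above):

tableEnv.connect(new Teradata())
    .withSchema(new Schema()
        .field("f0", DataTypes.VARCHAR(25))
        .field("f1", DataTypes.VARCHAR(10240))
        .field("f2", DataTypes.VARCHAR(10240)))
    .createTemporaryTable("staging");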

Best,
Jark


On Thu, 5 Mar 2020 at 23:46, Norm Vilmer (Contractor) 
wrote:

> Thanks for the reply, Arvid. I changed the property names in my
> ConnectorDescriptor subclass to match what the validator wanted and now get:
>
>
>
> “Could not find a suitable table factory for
> 'org.apache.flink.table.factories.TableSinkFactory' in
>
> the classpath.
>
>
>
> Reason: No factory supports all properties.
>
>
>
> The matching candidates:
>
> org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory
>
> Unsupported property keys:
>
> update-mode”
>
>
>
> The method suggested in the link you sent, registerTableSink, is
> deprecated in 1.10, so I was trying to use the following:
>
>
>
> final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>
> final StreamTableEnvironment tableEnv =
> StreamTableEnvironment.create(env);
>
>
>
> . . .
>
>
>
> tableEnv.connect(new Teradata())
>
> .withSchema(new Schema()
>
> .field("f0", DataTypes.VARCHAR(25))
>
> .field("f1", DataTypes.VARCHAR(10240))
>
> .field("f2", DataTypes.VARCHAR(10240)))
>
> .inAppendMode()
>
> .createTemporaryTable("staging");
>
>
>
> Table table = tableEnv.fromDataStream(reportData);
>
> table.insertInto("staging");
>
>
>
> Using the connect() method, I can see that the code attempts to use the
> JDBCTableSourceSinkFactory, but does not like ‘update-mode’.
>
>
>
> Do you have an example using connect() method? Thanks.
>
>
>
> *From:* Arvid Heise 
> *Sent:* Thursday, March 5, 2020 1:15 AM
> *To:* Norm Vilmer (Contractor) 
> *Cc:* user@flink.apache.org
> *Subject:* EXTERNAL - Re: Teradata as JDBC Connection
>
>
>
>
>
>
> Hi Norm,
>
>
>
> the error message already points to the main issue: your property names
> are not correct.
>
>
>
>
>
>
> Unsupported property keys:
> drivername
> update-mode
> password
> dburl
> username
>
>
>
> You should use the builder to properly configure the sink [1].
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#jdbcappendtablesink
> 
>
>
>
> On Thu, Mar 5, 2020 at 12:58 AM Norm Vilmer (Contractor) <
> norm.vil...@wnco.com> wrote:
>
> Same error with this change:
>
> public class Teradata extends ConnectorDescriptor {
> /**
>  * Constructs a {@link ConnectorDescriptor}.
>  */
> public Teradata() {
> super("jdbc", 1, false);
> }
>
> @Override
> protected Map<String, String> toConnectorProperties() {
> Map<String, String> map = new HashMap<>();
> map.put(JDBCValidator.CONNECTOR_DRIVER,
> "com.teradata.jdbc.TeraDriver");
> map.put(JDBCValidator.CONNECTOR_URL,
> "jdbc:teradata://myserver/database=mydb,LOGMECH=LDAP");
> map.put(JDBCValidator.CONNECTOR_USERNAME, "...");
> map.put(JDBCValidator.CONNECTOR_PASSWORD, "...!");
> return map;
> }
> }
>
> -Original Message-
> From: Norm Vilmer (Contractor) 
> Sent: Wednesday, March 4, 2020 10:37 AM
> To: user@flink.apache.org
> Subject: EXTERNAL - Teradata as JDBC Connection
>
>
> Using Flink 1.10 and coding in Java 11, is it possible to write to
> Teradata in append mode? MySQL, PostgreSQL, and Derby are the only
> supported drivers listed. Thanks.
>
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#connectors
>
> I created the ConnectorDescriptor below and am using it from
> tableEnvironment.connect() but get the exception shown below.
>
> public class Te

Re: JobMaster does not register with ResourceManager in high availability setup

2020-03-05 Thread Xintong Song
Hi Abhinav,

Thanks for the log. However, the attached log seems to be incomplete.
The NoResourceAvailableException cannot be found in this log.

Regarding connecting to ResourceManager, the log suggests that:

   - ZK was back to life and connected at 06:29:56.
   2020-02-27 06:29:56.539 [main-EventThread] level=INFO
o.a.f.s.c.o.a.curator.framework.state.ConnectionStateManager  - State
   change: CONNECTED
   - RM registered to ZK and was granted leadership at 06:30:01.
   2020-02-27 06:30:01.677 [flink-akka.actor.default-dispatcher-5]
   level=INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  -
   ResourceManager akka.tcp://flink@JOBMANAGER:6126/user/resourcemanager
   was granted leadership with fencing token a2c453481ea4e0c7722cab1e4dd741db
   - JM requests RM leader address from ZK at 06:30:06.
   2020-02-27 06:30:06.272 [flink-akka.actor.default-dispatcher-17]
   level=INFO  o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  -
   Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
   - The RM leader address will be notified asynchronously, and only after
   that JM will try to connect to RM (printing the "Connecting to
   ResourceManager" log). The attached log ends in 100ms after JM requesting
   RM leader address, which is too short to tell whether the RM is connected
   properly.

Another finding is about the TM registration. According to the log:

   - The parallelism of your job is 20, which means it needs 20 slots to be
   executed.
   - There are only 5 TMs registered. (Searching for "Registering
   TaskManager with ResourceID")
   - Assuming you have the same configurations for JM and TMs (this might
   not always be true), you have one slot per TM.
   599 2020-02-27 06:28:56.495 [main] level=INFO
org.apache.flink.configuration.GlobalConfiguration  - Loading
   configuration property: taskmanager.numberOfTaskSlots, 1
   - That suggests that it is possible that not all the TaskExecutors are
   recovered/reconnected, leading to the NoResourceAvailableException. We
   would need the remaining part of the log (from where the current one ends to
   the NoResourceAvailableException) to tell what happened during the
   scheduling. Also, could you confirm how many TMs you use?



Thank you~

Xintong Song



On Fri, Mar 6, 2020 at 5:55 AM Bajaj, Abhinav 
wrote:

> Hi Xintong,
>
>
>
> Highly appreciate your assistance here.
>
> I am attaching the jobmanager log for reference.
>
>
>
> Let me share my quick responses on what you mentioned.
>
>
>
>
>
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Cannot serve slot
> request, no ResourceManager connected.
>
> *XS**: Sometimes you see this log because the ResourceManager is not yet
> connect when the slot request arrives the SlotPool. If the ResourceManager
> is connected later, the SlotPool will still send the pending slot requests,
> in that case you should find logs for SlotPool requesting slots from
> ResourceManager.*
>
>
>
> *AB*: Yes, I have noticed that behavior in scenarios where
> resourcemanager and jobmanager are connected successfully. The requests
> fail initially and they are served later when they are connected.
>
> I don’t think that happened in this case. But you have access to the
> jobmanager logs to check my understanding.
>
>
>
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Could not allocate all requires slots within timeout of 30 ms……
>
> *XS**: This error message simply means that the slot requests are not
> satisfied in 5min. Various reasons might cause this problem.*
>
>- *The ResourceManager is not connected at all.*
>   - *AB*: I think Resoucemanager is not connected to Jobmaster or
>   vice versa. My basis is *absence* of below logs –
>  - org.apache.flink.runtime.jobmaster.JobMaster  - Connecting to
>  ResourceManager
>  - o.a.flink.runtime.resourcemanager.StandaloneResourceManager  -
>  Registering job manager
>   - *The ResourceManager is connected, but some TaskExecutors are not
>registered due to the ZK problem. *
>   - *AB*: I think the Task Executors were able to register or were in
>   the process of registering with ResourceManager.
>- *ZK recovery takes too much time, so that despite all JM, RM, TMs
>are able to connect to the ZK there might not be enough time to satisfy the
>slot request before the timeout.*
>   - *AB*: To help check that may be you can use this log time
>  - 2020-02-27 06:29:53,732 [myid:1] - INFO
>  [QuorumPeer[myid=1]/0.0.0.0:2181:Follower@64] - FOLLOWING -
>  LEADER ELECTION TOOK - 25069
>  - 2020-02-27 06:29:53,766 [myid:1] - INFO
>  [QuorumPeer[myid=1]/0.0.0.0:2181:Learner@332] - Getting a diff
>  from the leader 0x22bf6
>
> Thanks a lot for looking into this.
>
> ~ Abhinav Bajaj
>
>
>
>
>
> *From: *Xintong Song 
> *Date: *Wednesday, March 4, 2020 at 7:17 PM
> *To: 

Re: Writing retract streams to Kafka

2020-03-05 Thread Jark Wu
Hi Gyula,

Does tumbling 5 seconds for aggregation meet your need? For example:

INSERT INTO QueryResult
SELECT q.queryId, t.itemId, TUMBLE_START(q.event_time, INTERVAL '5'
SECOND), sum(t.quantity) AS quantity
FROM
  ItemTransactions AS t,
  Queries AS q
WHERE
  t.itemId = q.itemId AND
  t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time
GROUP BY
  t.itemId, q.queryId, TUMBLE(q.event_time, INTERVAL '5' SECOND);

Best,
Jark

On Thu, 5 Mar 2020 at 23:05, Gyula Fóra  wrote:

> I see, maybe I just don't understand how to properly express what I am
> trying to compute.
>
> Basically I want to aggregate the quantities of the transactions that
> happened in the 5 seconds before the query.
> Every query.id belongs to a single query (event_time, itemid) but still I
> have to group :/
>
> Gyula
>
> On Thu, Mar 5, 2020 at 3:45 PM Kurt Young  wrote:
>
>> I think the issue is not caused by event time interval join, but the
>> aggregation after the join:
>> GROUP BY t.itemId, q.event_time, q.queryId;
>>
>> In this case, there is still no chance for Flink to determine whether the
>> groups like (itemId, eventtime, queryId) have complete data or not.
>> As a comparison, if you change the grouping key to a window based
>> only on q.event_time, then the query would emit insert-only results.
>>
>> Best,
>> Kurt
>>
>>
>> On Thu, Mar 5, 2020 at 10:29 PM Gyula Fóra  wrote:
>>
>>> That's exactly the kind of behaviour I am looking for Kurt ("ignore all
>>> delete messages").
>>>
>>> As for the data completion, in my above example it is basically an event
>>> time interval join.
>>> With watermarks defined Flink should be able to compute results once in
>>> exactly the same way as for the tumbling window.
>>>
>>> Gyula
>>>
>>> On Thu, Mar 5, 2020 at 3:26 PM Kurt Young  wrote:
>>>
 Back to this case, I assume you are expecting something like an "ignore
 all delete messages" flag? With this
 flag turned on, Flink will only send insert messages corresponding to the
 currently correct results to Kafka and
 drop all retractions and deletes on the fly.

 Best,
 Kurt


 On Thu, Mar 5, 2020 at 10:24 PM Kurt Young  wrote:

> > I also don't completely understand at this point why I can write the
> result of a group, tumble window aggregate to Kafka and not this window
> join / aggregate.
>
> If you are doing a tumble window aggregate with a watermark enabled,
> Flink will only fire a final result for
> each window once; no modifications or retractions will happen after
> a window is calculated and fired.
> But with some other arbitrary aggregations, there is not enough
> information for Flink to determine whether
> the data is complete or not, so the framework will keep calculating
> results when receiving new records and
> retract earlier results by firing retraction/deletion messages.
>
> Best,
> Kurt
>
>
> On Thu, Mar 5, 2020 at 10:13 PM Gyula Fóra 
> wrote:
>
>> Thanks Benoît!
>>
>> I can see now how I can implement this myself through the provided
>> sink interfaces but I was trying to avoid having to write code for this 
>> :D
>> My initial motivation was to see whether we are able to write out any
>> kind of table to Kafka as a simple stream of "upserts".
>>
>> I also don't completely understand at this point why I can write the
>> result of a group, tumble window aggregate to Kafka and not this window
>> join / aggregate.
>>
>> Cheers,
>> Gyula
>>
>> On Thu, Mar 5, 2020 at 3:00 PM Benoît Paris <
>> benoit.pa...@centraliens-lille.org> wrote:
>>
>>> Hi Gyula,
>>>
>>> I'm afraid conversion to see the retractions vs inserts can't be
>>> done in pure SQL (though I'd love that feature).
>>>
>>> You might want to go lower level and implement a
>>> RetractStreamTableSink [1][2] that you would wrap around a 
>>> KafkaTableSink
>>> [3]. This will give you a emitDataStream(DataStream>
>>> dataStream);, in which the Boolean flag will give you an 'accumulate' or
>>> 'retract' signal.
>>> You can then filter the DataStream accordingly before passing to the
>>> KafkaTableSink.
>>>
>>> Hope this helps.
>>>
>>> Best regards
>>> Benoît
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/sinks/RetractStreamTableSink.html
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#retractstreamtablesink
>>> [3]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.html
>>>
>>> On Thu, Mar 5, 2020 at 2:50 PM Gyula Fóra 
>>> wrote:
>>>
 Hi Roman,

 This is the core logic:

 CREATE TABLE QueryResult (
>>>

Re: How to override flink-conf parameters for SQL Client

2020-03-05 Thread Jeff Zhang
There are two kinds of configuration: job level & cluster level. I am afraid we
don't have documentation differentiating them; it depends on how users
understand these configurations. We may need to improve the documentation on that.
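As a rough illustration of the two kinds (assuming the Blink planner; the keys
below are just examples of each category):

EnvironmentSettings settings =
    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

// Job-level: can be adjusted from the program, e.g. via the TableConfig.
tableEnv.getConfig().getConfiguration()
    .setString("table.exec.mini-batch.enabled", "true");

// Cluster-level: process parameters such as taskmanager.memory.process.size or
// taskmanager.numberOfTaskSlots are read when the JM/TM processes start, so a
// job submitted to an already running cluster cannot change them.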

Kurt Young wrote on Fri, Mar 6, 2020 at 8:34 AM:

> If you already have a running Flink cluster and you want to submit another
> job to this cluster, then all the configurations
> related to process parameters, like TM memory, slot number, etc., cannot
> be modified.
>
> Best,
> Kurt
>
>
> On Thu, Mar 5, 2020 at 11:08 PM Gyula Fóra  wrote:
>
>> Kurt can you please explain which conf parameters do you mean?
>>
>> In regular executions (Yarn for instance) we have dynamic config
>> parameters overriding any flink-conf argument.
>> So it is not about setting them in the user code; it should happen
>> before the ClusterDescriptors are created (i.e. together with the
>> CustomCommandLine logic).
>>
>> Gyula
>>
>> On Thu, Mar 5, 2020 at 3:49 PM Kurt Young  wrote:
>>
>>> IIRC the tricky thing here is that not all the config options belonging to
>>> flink-conf.yaml can be adjusted dynamically in the user's program.
>>> So it will end up with some of the configurations being overridable but
>>> some not. The experience is not quite good for users.
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Thu, Mar 5, 2020 at 10:15 PM Jeff Zhang  wrote:
>>>
 Hi Gyula,

 I am integrating Flink with Zeppelin. One feature in Zeppelin is
 that users can override any setting in flink-conf.yaml (actually any
 setting listed here:
 https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html).
 Of course you can run Flink SQL in Zeppelin, and you can also leverage other
 features of Zeppelin, like visualization.

 If you are interested, you could try the master branch of Zeppelin +
 this improvement PR

 https://github.com/apache/zeppelin
 https://github.com/apache/zeppelin/pull/3676
 https://github.com/apache/zeppelin/blob/master/docs/interpreter/flink.md






 Gyula Fóra wrote on Thu, Mar 5, 2020 at 6:51 PM:

> I could basically list a few things I want to set (execution.target
> for example), but it's fair to assume that I would like to be able to set
> anything :)
>
> Gyula
>
> On Thu, Mar 5, 2020 at 11:35 AM Jingsong Li 
> wrote:
>
>> Hi Gyula,
>>
>> Maybe the Blink planner has invoked
>> "StreamExecutionEnvironment.configure"; which planner do you use?
>>
>> But "StreamExecutionEnvironment.configure" only covers part of the
>> configuration, not everything in flink-conf.yaml.
>> So which config do you want to set? I know some configs like
>> "taskmanager.network.blocking-shuffle.compression.enabled" cannot be
>> set.
>>
>> Best,
>> Jingsong Lee
>>
>> On Thu, Mar 5, 2020 at 6:17 PM Jark Wu  wrote:
>>
>>> Hi Gyula,
>>>
>>> Flink configurations can be overridden via
>>> `TableConfig#getConfiguration()`; however, the SQL CLI only allows setting
>>> Table-specific configs.
>>> I will treat it as a bug/improvement of the SQL CLI which should be
>>> fixed in 1.10.1.
>>>
>>> Best,
>>> Jark
>>>
>>> On Thu, 5 Mar 2020 at 18:12, Gyula Fóra 
>>> wrote:
>>>
 Thanks Caizhi,

 This seems like a pretty big shortcoming for any
 multi-user/multi-app environment. I will open a jira for this.

 Gyula

 On Thu, Mar 5, 2020 at 10:58 AM Caizhi Weng 
 wrote:

> Hi Gyula.
>
> I'm afraid there is no way to override all Flink configurations
> currently. The SQL client yaml file can only override some of the Flink
> configurations.
>
> Configuration entries indeed can only set Table-specific configs,
> while deployment entries are used to set the result fetching address
> and
> port. There is currently no way to change the execution target from
> the SQL
> client.
>
> Gyula Fóra wrote on Thu, Mar 5, 2020 at 4:31 PM:
>
>> Hi All!
>>
>> I am trying to understand if there is any way to override flink
>> configuration parameters when starting the SQL Client.
>>
>> It seems that the only way to pass any parameters is through the
>> environment yaml.
>>
>> There I found 2 possible routes:
>>
>> configuration: this doesn't work as it only sets Table specific
>> configs apparently, but maybe I am wrong.
>>
>> deployment: I tried using dynamic properties options here but
>> unfortunately we normalize (lowercase) the YAML keys so it is 
>> impossible to
>> pass options like -yD or -D.
>>
>> Does anyone have any suggestions?
>>
>> Thanks
>> Gyula
>>
>
>>

Re: Single stream, two sinks

2020-03-05 Thread Austin Cawley-Edwards
We have the same setup and it works quite well. One thing to take into
account is that your HTTP call may happen multiple times if you're using
the checkpointing/fault-tolerance mechanism, so it's important that those
calls are idempotent and won't duplicate data.

Also we’ve found that it’s important to make the max number of parallel
requests in your async operator runtime-configurable so you can control
that bottleneck.
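As an illustration, a small sketch of wiring that knob in (the parameter key,
the HttpEnrichFunction class and the Request/Result types are made up for the
example):

ParameterTool params = ParameterTool.fromArgs(args);
int capacity = params.getInt("http.async.capacity", 100);

DataStream<Result> results = AsyncDataStream.unorderedWait(
        requests,                    // upstream DataStream<Request>
        new HttpEnrichFunction(),    // AsyncFunction doing the idempotent HTTP call
        30, TimeUnit.SECONDS,        // timeout per request
        capacity);                   // max number of in-flight requests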

Hope that is helpful!

Austin

On Thu, Mar 5, 2020 at 6:18 PM Gadi Katsovich 
wrote:

> Guys, thanks for the great advice. It works!
> I used HttpAsyncClient from Apache Commons.
> At first I tried to implement the async http client by implementing
> AsyncFunction. I implemented the asyncInvoke method and used
> try-with-resources to instantiate the client (because it's a
> CloseableHttpAsyncClient). That didn't work and I got an "Async function call
> has timed out" exception.
> Then I followed the example in the link and had my async http client
> extend RichAsyncFunction, which opens and closes the http client instance in
> the corresponding methods, and everything started working.
>
> On Tue, Mar 3, 2020 at 7:13 PM John Smith  wrote:
>
>> If I understand correctly he wants the HTTP result in the DB. So I do not
>> think side output works here. The DB would have to be the sink. Also sinks
>> in Flink are the final destination.
>>
>> So it would have to be RabbitMQ -> Some Cool Business Logic Operators
>> Here > Async I/O HTTP Operator -> JDBC Sink.
>>
>> Take look here also:
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html
>>  <--
>> The Example shows database client, but you can easily replace that with
>> HTTP client.
>>
>> But basically...
>> 1- Get input from RabbitMQ Source.
>> 2- Do what ever type of stream computations/business logic you need.
>> 3- Use the Async I/O operator to send HTTP
>> - If HTTP 200 OK create Flink record tagged as SUCCESS and whatever
>> other info you want. Maybe response body.
>> - If NOT HTTP 200 OK create Flink record tagged as FAILED plus other
>> info.
>> 4- Sink the output record from #3 to JDBC.
>>
>> On Sun, 1 Mar 2020 at 10:28, miki haiat  wrote:
>>
>>> So you have a RabbitMQ source and an HTTP sink?
>>> If so you can use side output in order to dump your data to db.
>>>
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
>>>
>>> On Sat, Feb 29, 2020, 23:01 Gadi Katsovich 
>>> wrote:
>>>
 Hi,
 I'm new to flink and am evaluating it to replace our existing streaming
 application.
 The use case I'm working on is reading messages from RabbitMQ queue,
 applying some transformation and filtering logic and sending it via HTTP to
 a 3rd party.
 A must-have requirement of this flow is to write the data that was
 sent to an SQL db, for audit and troubleshooting purposes.
 I'm currently basing my HTTP solution on a PR with needed adjustments:
 https://github.com/apache/flink/pull/5866/files
 How can I add an insertion to a DB after a successful HTTP request?
 Thank you.

>>>


Re: How to override flink-conf parameters for SQL Client

2020-03-05 Thread Kurt Young
If you already have a running Flink cluster and you want to submit another job
to this cluster, then all the configurations
related to process parameters, like TM memory, slot number, etc., cannot be
modified.

Best,
Kurt


On Thu, Mar 5, 2020 at 11:08 PM Gyula Fóra  wrote:

> Kurt can you please explain which conf parameters do you mean?
>
> In regular executions (Yarn for instance) we have dynamic config
> parameters overriding any flink-conf argument.
> So it is not about setting them in the user code; it should happen
> before the ClusterDescriptors are created (i.e. together with the
> CustomCommandLine logic).
>
> Gyula
>
> On Thu, Mar 5, 2020 at 3:49 PM Kurt Young  wrote:
>
>> IIRC the tricky thing here is that not all the config options belonging to
>> flink-conf.yaml can be adjusted dynamically in the user's program.
>> So it will end up with some of the configurations being overridable but
>> some not. The experience is not quite good for users.
>>
>> Best,
>> Kurt
>>
>>
>> On Thu, Mar 5, 2020 at 10:15 PM Jeff Zhang  wrote:
>>
>>> Hi Gyula,
>>>
>>> I am integrating Flink with Zeppelin. One feature in Zeppelin is
>>> that users can override any setting in flink-conf.yaml (actually any
>>> setting listed here:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html).
>>> Of course you can run Flink SQL in Zeppelin, and you can also leverage other
>>> features of Zeppelin, like visualization.
>>>
>>> If you are interested, you could try the master branch of Zeppelin +
>>> this improvement PR
>>>
>>> https://github.com/apache/zeppelin
>>> https://github.com/apache/zeppelin/pull/3676
>>> https://github.com/apache/zeppelin/blob/master/docs/interpreter/flink.md
>>>
>>>
>>>
>>>
>>>
>>>
>>> Gyula Fóra wrote on Thu, Mar 5, 2020 at 6:51 PM:
>>>
 I could basically list a few things I want to set (execution.target for
 example), but it's fair to assume that I would like to be able to set
 anything :)

 Gyula

 On Thu, Mar 5, 2020 at 11:35 AM Jingsong Li 
 wrote:

> Hi Gyula,
>
> Maybe the Blink planner has invoked
> "StreamExecutionEnvironment.configure"; which planner do you use?
>
> But "StreamExecutionEnvironment.configure" only covers part of the
> configuration, not everything in flink-conf.yaml.
> So which config do you want to set? I know some configs like
> "taskmanager.network.blocking-shuffle.compression.enabled" cannot be set.
>
> Best,
> Jingsong Lee
>
> On Thu, Mar 5, 2020 at 6:17 PM Jark Wu  wrote:
>
>> Hi Gyula,
>>
>> Flink configurations can be overridden via
>> `TableConfig#getConfiguration()`; however, the SQL CLI only allows setting
>> Table-specific configs.
>> I will treat it as a bug/improvement of the SQL CLI which should be fixed
>> in 1.10.1.
>>
>> Best,
>> Jark
>>
>> On Thu, 5 Mar 2020 at 18:12, Gyula Fóra  wrote:
>>
>>> Thanks Caizhi,
>>>
>>> This seems like a pretty big shortcoming for any
>>> multi-user/multi-app environment. I will open a jira for this.
>>>
>>> Gyula
>>>
>>> On Thu, Mar 5, 2020 at 10:58 AM Caizhi Weng 
>>> wrote:
>>>
 Hi Gyula.

 I'm afraid there is no way to override all Flink configurations
 currently. The SQL client yaml file can only override some of the Flink
 configurations.

 Configuration entries indeed can only set Table-specific configs,
 while deployment entries are used to set the result fetching address
 and
 port. There is currently no way to change the execution target from
 the SQL
 client.

 Gyula Fóra wrote on Thu, Mar 5, 2020 at 4:31 PM:

> Hi All!
>
> I am trying to understand if there is any way to override flink
> configuration parameters when starting the SQL Client.
>
> It seems that the only way to pass any parameters is through the
> environment yaml.
>
> There I found 2 possible routes:
>
> configuration: this doesn't work as it only sets Table specific
> configs apparently, but maybe I am wrong.
>
> deployment: I tried using dynamic properties options here but
> unfortunately we normalize (lowercase) the YAML keys so it is 
> impossible to
> pass options like -yD or -D.
>
> Does anyone have any suggestions?
>
> Thanks
> Gyula
>

>
> --
> Best, Jingsong Lee
>

>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>>>
>>


Re: Single stream, two sinks

2020-03-05 Thread Gadi Katsovich
Guys, thanks for the great advice. It works!
I used HttpAsyncClient from Apache Commons.
At first I tried to implement the async http client by implementing
AsyncFunction. I implemented the asyncInvoke method and used
try-with-resources to instantiate the client (because it's a
CloseableHttpAsyncClient). That didn't work and I got an "Async function call
has timed out" exception.
Then I followed the example in the link and had my async http client extend
RichAsyncFunction, which opens and closes the http client instance in the
corresponding methods, and everything started working.
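For anyone hitting the same issue, a rough sketch of that structure (the class
name, request URL and String in/out types are only illustrative):

import java.util.Collections;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;

// The async client lives as long as the operator: it is created in open() and
// released in close(), instead of a per-invocation try-with-resources.
public class AsyncHttpFunction extends RichAsyncFunction<String, String> {

    private transient CloseableHttpAsyncClient client;

    @Override
    public void open(Configuration parameters) {
        client = HttpAsyncClients.createDefault();
        client.start();
    }

    @Override
    public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
        HttpGet request = new HttpGet("http://example.com/endpoint?q=" + input);
        client.execute(request, new FutureCallback<HttpResponse>() {
            @Override
            public void completed(HttpResponse response) {
                resultFuture.complete(Collections.singleton(
                        String.valueOf(response.getStatusLine().getStatusCode())));
            }

            @Override
            public void failed(Exception ex) {
                resultFuture.completeExceptionally(ex);
            }

            @Override
            public void cancelled() {
                resultFuture.complete(Collections.emptyList());
            }
        });
    }

    @Override
    public void close() throws Exception {
        if (client != null) {
            client.close();
        }
    }
}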

On Tue, Mar 3, 2020 at 7:13 PM John Smith  wrote:

> If I understand correctly he wants the HTTP result in the DB. So I do not
> think side output works here. The DB would have to be the sink. Also sinks
> in Flink are the final destination.
>
> So it would have to be RabbitMQ -> Some Cool Business Logic Operators
> Here > Async I/O HTTP Operator -> JDBC Sink.
>
> Take look here also:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html
>  <--
> The Example shows database client, but you can easily replace that with
> HTTP client.
>
> But basically...
> 1- Get input from RabbitMQ Source.
> 2- Do what ever type of stream computations/business logic you need.
> 3- Use the Async I/O operator to send HTTP
> - If HTTP 200 OK create Flink record tagged as SUCCESS and whatever
> other info you want. Maybe response body.
> - If NOT HTTP 200 OK create Flink record tagged as FAILED plus other
> info.
> 4- Sink the output record from #3 to JDBC.
>
> On Sun, 1 Mar 2020 at 10:28, miki haiat  wrote:
>
>> So you have a RabbitMQ source and an HTTP sink?
>> If so you can use side output in order to dump your data to db.
>>
>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
>>
>> On Sat, Feb 29, 2020, 23:01 Gadi Katsovich 
>> wrote:
>>
>>> Hi,
>>> I'm new to flink and am evaluating it to replace our existing streaming
>>> application.
>>> The use case I'm working on is reading messages from RabbitMQ queue,
>>> applying some transformation and filtering logic and sending it via HTTP to
>>> a 3rd party.
>>> A must-have requirement of this flow is to write the data that was
>>> sent to an SQL db, for audit and troubleshooting purposes.
>>> I'm currently basing my HTTP solution on a PR with needed adjustments:
>>> https://github.com/apache/flink/pull/5866/files
>>> How can I add an insertion to a DB after a successful HTTP request?
>>> Thank you.
>>>
>>


Backpressure and 99th percentile latency

2020-03-05 Thread Felipe Gutierrez
Hi,

I am a bit confused about the topic of tracking latency in Flink [1]. It
says that if I use latency tracking I am measuring Flink's network stack,
but application code latencies can also influence it. For instance, if I am
using metrics.latency.granularity: operator (the default) and
setLatencyTrackingInterval(1). I understand that I am tracking latency
every 10 seconds for each physical operator instance. Is that right?

In my application, I am tracking the latency of all aggregators. When I
have a high workload and I can see backpressure in the Flink UI, the 99th
percentile latency is 13, 25, 21, and 25 seconds. Then I set my aggregator
to have a larger window. The backpressure goes away but the 99th percentile
latency is still the same. Why? Does one have no relation to the other?

In the end I left the experiment running for more than 2 hours and only
after about 1.5 hours did the 99th percentile latency go down to milliseconds.
Is that normal? Please see the figure attached.

[1]
https://flink.apache.org/2019/07/23/flink-network-stack-2.html#latency-tracking

Thanks,
Felipe
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com


Re: Weird behaviour testing TwoPhaseCommit

2020-03-05 Thread David Magalhães
Awesome Arvid, thanks a lot! :)

And I thought when doing this that I was simplifying the test ...

On Thu, Mar 5, 2020 at 8:27 PM Arvid Heise  wrote:

> Hi David,
>
> bounded sources do not work well with checkpointing. As soon as the source
> is drained, no checkpoints are performed anymore. It's an unfortunate
> limitation that we want to get rid of, but haven't found the time (because
> it requires larger changes).
>
> So for your test to work, you need to add a source that is continuously
> open, but does not output more than one element. Fortunately, there is
> already a working implementation in our test bed.
>
> https://github.com/apache/flink/blob/master/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/FiniteTestSource.java
>
> On Thu, Mar 5, 2020 at 7:54 PM David Magalhães 
> wrote:
>
>> I've implemented a CustomSink with TwoPhaseCommit. To test this I've
>> created a test using this [1] one as a baseline, and it works fine.
>>
>> To test the integration with S3 (and with an exponential back off), I've
>> tried to implement a new test, using the following code:
>>
>> ...
>> val invalidWriter = writer
>>   .asInstanceOf[WindowParquetGenericRecordListFileSink]
>>   .copy(filePath = s"s3a://bucket_that_doesnt_exists/")
>>
>> val records: Iterable[GenericRecord] = Iterable apply {
>> new GenericData.Record(GenericRecordSchema.schema) {
>> put(KEY.name, "x")
>> put(EVENT.name, "record.value()")
>> put(LOGGER_TIMESTAMP.name, "2020-01-01T02:22:23.123456Z")
>> put(EVENT_TYPE.name, "xpto")
>> }
>> }
>>
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>
>> env
>>
>> .enableCheckpointing(1000)
>> .fromElements(records)
>> .addSink(invalidWriter)
>>
>> val task = executor.submit(() => env.execute("s3exponential"))
>> ...
>>
>> This will set up a small environment with one record and enable checkpointing
>> (in order for the TPC to work), and then execute it in another thread so the
>> test can check if the error count is increasing.
>>
>> So, the test has the following behaviour:
>>
>> If I use enableCheckpointing(10), the test passes 9 out of 10 times.
>> If I use other values, like 1000, the test fails most of the time,
>> if not every time.
>>
>> Here is a small example of the log when the test is successful.
>>
>> 2020-03-05 16:04:40,342 DEBUG
>> com.talkdesk.automatebackfill.kratos.writer.WindowParquetGenericRecordListFileSink
>>  - Invoke - f3b667c1-8d75-4ab8-9cab-71dfa6a71271 [1]
>> 2020-03-05 16:04:40,342 DEBUG
>> org.apache.flink.streaming.runtime.tasks.StreamTask   - Starting
>> checkpoint (5) CHECKPOINT on task Sink: Unnamed (2/2)
>> 2020-03-05 16:04:40,342 DEBUG
>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction  -
>> WindowParquetGenericRecordListFileSink 1/2 - checkpoint 5 triggered,
>> flushing transaction
>> 'TransactionHolder{handle=f3b667c1-8d75-4ab8-9cab-71dfa6a71271,
>> transactionStartTime=1583424280304}'
>> ### 2020-03-05 16:04:40,342 DEBUG
>> com.talkdesk.automatebackfill.kratos.writer.WindowParquetGenericRecordListFileSink
>>  - Pre Commit - f3b667c1-8d75-4ab8-9cab-71dfa6a71271 [1] -
>> openedTransactions [1]
>> ### 2020-03-05 16:04:40,342 DEBUG
>> com.talkdesk.automatebackfill.kratos.writer.WindowParquetGenericRecordListFileSink
>>  - operation='preCommit', message='Start writing #1 records'
>>
>> When the test fails, here is some part of the log:
>>
>> 2020-03-05 16:38:44,386 DEBUG
>> com.talkdesk.automatebackfill.kratos.writer.WindowParquetGenericRecordListFileSink
>>  - Invoke - 7c5dd046-54b8-4b13-b360-397da6743b3b [1]
>> 2020-03-05 16:38:44,386 DEBUG
>> org.apache.flink.streaming.runtime.tasks.StreamTask   - Aborting
>> checkpoint via cancel-barrier 5 for task Sink: Unnamed (1/2)
>> 2020-03-05 16:38:44,387 DEBUG
>> org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  - Sink:
>> Unnamed (1/2) (e5699032cb2eb874dcff740b6b7b62f1): End of stream alignment,
>> feeding buffered data back.
>> 2020-03-05 16:38:44,390 DEBUG
>> org.apache.flink.runtime.io.network.partition.ResultPartition  -
>> ReleaseOnConsumptionResultPartition
>> 8a4be598a77d12582a92096db8e6319c@b32dcfe3631e86d18cb893a258a6f0f9
>> [PIPELINED_BOUNDED, 2 subpartitions, 0 pending consumptions]: Received
>> consumed notification for subpartition 0.
>> 2020-03-05 16:38:44,390 DEBUG
>> org.apache.flink.runtime.io.network.partition.ResultPartitionManager  -
>> Received consume notification from ReleaseOnConsumptionResultPartition
>> 8a4be598a77d12582a92096db8e6319c@b32dcfe3631e86d18cb893a258a6f0f9
>> [PIPELINED_BOUNDED, 2 subpartitions, 0 pending consumptions].
>> 2020-03-05 16:38:44,390 DEBUG
>> org.apache.flink.runtime.io.network.partition.ResultPartition  - Source:
>> Collection Source (1/1) (b32dcfe3631e86d18cb893a258a6f0f9): Releasing
>> ReleaseOnConsumptionResultPartition
>> 8a4be598a77d12582a92096db8e6319c@b32dcfe3631e86d18cb893a258a6f0f9
>> [PIPELINED_BOUNDED, 2 subpartitions,

Re: Weird behaviour testing TwoPhaseCommit

2020-03-05 Thread Arvid Heise
Hi David,

bounded sources do not work well with checkpointing. As soon as the source
is drained, no checkpoints are performed anymore. It's an unfortunate
limitation that we want to get rid of, but haven't found the time (because
it requires larger changes).

So for your test to work, you need to add a source that is continuously
open, but does not output more than one element. Fortunately, there is
already a working implementation in our test bed.
https://github.com/apache/flink/blob/master/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/FiniteTestSource.java
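For reference, a much simplified sketch of such a source (unlike
FiniteTestSource it does not wait for a fixed number of checkpoints; it just
emits its element and then stays open until cancelled, which is enough to keep
checkpoints flowing):

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class SingleElementOpenSource<T> implements SourceFunction<T> {

    private final T element;
    private volatile boolean running = true;

    public SingleElementOpenSource(T element) {
        this.element = element;
    }

    @Override
    public void run(SourceContext<T> ctx) throws Exception {
        synchronized (ctx.getCheckpointLock()) {
            ctx.collect(element);
        }
        // stay open so checkpoints keep being triggered for the sink under test
        while (running) {
            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}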

On Thu, Mar 5, 2020 at 7:54 PM David Magalhães 
wrote:

> I've implemented a CustomSink with TwoPhaseCommit. To test this I've
> created a test using this [1] one as a baseline, and it works fine.
>
> To test the integration with S3 (and with an exponential back off), I've
> tried to implement a new test, using the following code:
>
> ...
> val invalidWriter = writer
>   .asInstanceOf[WindowParquetGenericRecordListFileSink]
>   .copy(filePath = s"s3a://bucket_that_doesnt_exists/")
>
> val records: Iterable[GenericRecord] = Iterable apply {
> new GenericData.Record(GenericRecordSchema.schema) {
> put(KEY.name, "x")
> put(EVENT.name, "record.value()")
> put(LOGGER_TIMESTAMP.name, "2020-01-01T02:22:23.123456Z")
> put(EVENT_TYPE.name, "xpto")
> }
> }
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
>
> env
>
> .enableCheckpointing(1000)
> .fromElements(records)
> .addSink(invalidWriter)
>
> val task = executor.submit(() => env.execute("s3exponential"))
> ...
>
> This will set up a small environment with one record and enable checkpointing
> (in order for the TPC to work), and then execute it in another thread so the
> test can check if the error count is increasing.
>
> So, the test has the following behaviour:
>
> If I use enableCheckpointing(10), the test passes 9 out of 10 times.
> If I use other values, like 1000, the test fails most of the time,
> if not every time.
>
> Here is a small example of the log when the test is successful.
>
> 2020-03-05 16:04:40,342 DEBUG
> com.talkdesk.automatebackfill.kratos.writer.WindowParquetGenericRecordListFileSink
>  - Invoke - f3b667c1-8d75-4ab8-9cab-71dfa6a71271 [1]
> 2020-03-05 16:04:40,342 DEBUG
> org.apache.flink.streaming.runtime.tasks.StreamTask   - Starting
> checkpoint (5) CHECKPOINT on task Sink: Unnamed (2/2)
> 2020-03-05 16:04:40,342 DEBUG
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction  -
> WindowParquetGenericRecordListFileSink 1/2 - checkpoint 5 triggered,
> flushing transaction
> 'TransactionHolder{handle=f3b667c1-8d75-4ab8-9cab-71dfa6a71271,
> transactionStartTime=1583424280304}'
> ### 2020-03-05 16:04:40,342 DEBUG
> com.talkdesk.automatebackfill.kratos.writer.WindowParquetGenericRecordListFileSink
>  - Pre Commit - f3b667c1-8d75-4ab8-9cab-71dfa6a71271 [1] -
> openedTransactions [1]
> ### 2020-03-05 16:04:40,342 DEBUG
> com.talkdesk.automatebackfill.kratos.writer.WindowParquetGenericRecordListFileSink
>  - operation='preCommit', message='Start writing #1 records'
>
> When the test fails, here is some part of the log:
>
> 2020-03-05 16:38:44,386 DEBUG
> com.talkdesk.automatebackfill.kratos.writer.WindowParquetGenericRecordListFileSink
>  - Invoke - 7c5dd046-54b8-4b13-b360-397da6743b3b [1]
> 2020-03-05 16:38:44,386 DEBUG
> org.apache.flink.streaming.runtime.tasks.StreamTask   - Aborting
> checkpoint via cancel-barrier 5 for task Sink: Unnamed (1/2)
> 2020-03-05 16:38:44,387 DEBUG
> org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  - Sink:
> Unnamed (1/2) (e5699032cb2eb874dcff740b6b7b62f1): End of stream alignment,
> feeding buffered data back.
> 2020-03-05 16:38:44,390 DEBUG
> org.apache.flink.runtime.io.network.partition.ResultPartition  -
> ReleaseOnConsumptionResultPartition
> 8a4be598a77d12582a92096db8e6319c@b32dcfe3631e86d18cb893a258a6f0f9
> [PIPELINED_BOUNDED, 2 subpartitions, 0 pending consumptions]: Received
> consumed notification for subpartition 0.
> 2020-03-05 16:38:44,390 DEBUG
> org.apache.flink.runtime.io.network.partition.ResultPartitionManager  -
> Received consume notification from ReleaseOnConsumptionResultPartition
> 8a4be598a77d12582a92096db8e6319c@b32dcfe3631e86d18cb893a258a6f0f9
> [PIPELINED_BOUNDED, 2 subpartitions, 0 pending consumptions].
> 2020-03-05 16:38:44,390 DEBUG
> org.apache.flink.runtime.io.network.partition.ResultPartition  - Source:
> Collection Source (1/1) (b32dcfe3631e86d18cb893a258a6f0f9): Releasing
> ReleaseOnConsumptionResultPartition
> 8a4be598a77d12582a92096db8e6319c@b32dcfe3631e86d18cb893a258a6f0f9
> [PIPELINED_BOUNDED, 2 subpartitions, 0 pending consumptions].
>
> Not sure why this behaviour changes with the checkpoint interval, but
> so far I haven't found the reason why "pre commit" isn't executed.
>
> Does anyone have any thoughts, something that I'm missing?
>
> [1]
> https://github.com/apache/

Re: Very large _metadata file

2020-03-05 Thread Jacob Sevart
Thanks, I will monitor that thread.

I'm having a hard time following the serialization code, but if you know
anything about the layout, tell me if this makes sense. What I see in the
hex editor is, first, many HDFS paths. Then gigabytes of unreadable data.
Then finally another HDFS path at the end.

If it is putting state in there, under normal circumstances, does it make
sense that it would be interleaved with metadata? I would expect all the
metadata to come first, and then state.

Jacob

On Thu, Mar 5, 2020 at 10:53 AM Kostas Kloudas  wrote:

> Hi Jacob,
>
> As I said previously I am not 100% sure what can be causing this
> behavior, but this is a related thread here:
>
> https://lists.apache.org/thread.html/r3bfa2a3368a9c7850cba778e4decfe4f6dba9607f32addb69814f43d%40%3Cuser.flink.apache.org%3E
>
> There you can re-post your problem and monitor for answers.
>
> Cheers,
> Kostas
>
> On Wed, Mar 4, 2020 at 7:02 PM Jacob Sevart  wrote:
> >
> > Kostas and Gordon,
> >
> > Thanks for the suggestions! I'm on RocksDB. We don't have that setting
> configured so it should be at the default 1024b. This is the full "state.*"
> section showing in the JobManager UI.
> >
> >
> >
> > Jacob
> >
> > On Wed, Mar 4, 2020 at 2:45 AM Tzu-Li (Gordon) Tai 
> wrote:
> >>
> >> Hi Jacob,
> >>
> >> Apart from what Klou already mentioned, one slightly possible reason:
> >>
> >> If you are using the FsStateBackend, it is also possible that your
> state is small enough to be considered to be stored inline within the
> metadata file.
> >> That is governed by the "state.backend.fs.memory-threshold"
> configuration, with a default value of 1024 bytes, or can also be
> configured with the `fileStateSizeThreshold` argument when constructing the
> `FsStateBackend`.
> >> The purpose of that threshold is to ensure that the backend does not
> create a large amount of very small files, where potentially the file
> pointers are actually larger than the state itself.
> >>
> >> Cheers,
> >> Gordon
> >>
> >>
> >>
> >> On Wed, Mar 4, 2020 at 6:17 PM Kostas Kloudas 
> wrote:
> >>>
> >>> Hi Jacob,
> >>>
> >>> Could you specify which StateBackend you are using?
> >>>
> >>> The reason I am asking is that, from the documentation in [1]:
> >>>
> >>> "Note that if you use the MemoryStateBackend, metadata and savepoint
> >>> state will be stored in the _metadata file. Since it is
> >>> self-contained, you may move the file and restore from any location."
> >>>
> >>> I am also cc'ing Gordon who may know a bit more about state formats.
> >>>
> >>> I hope this helps,
> >>> Kostas
> >>>
> >>> [1]
> https://urldefense.proofpoint.com/v2/url?u=https-3A__ci.apache.org_projects_flink_flink-2Ddocs-2Drelease-2D1.6_ops_state_savepoints.html&d=DwIBaQ&c=r2dcLCtU9q6n0vrtnDw9vg&r=lTq5mEceM-U-tVfWzKBngg&m=awEv6FqKY6dZ8NIA4KEFc_qQ6aadR_jTAWnO17wtAus&s=fw0c-Ct21HHJv4MzZRicIaltqHLQOrNvqchzNgCdwkA&e=
> >>>
> >>> On Wed, Mar 4, 2020 at 1:25 AM Jacob Sevart  wrote:
> >>> >
> >>> > Per the documentation:
> >>> >
> >>> > "The meta data file of a Savepoint contains (primarily) pointers to
> all files on stable storage that are part of the Savepoint, in form of
> absolute paths."
> >>> >
> >>> > I somehow have a _metadata file that's 1.9GB. Running strings on it
> I find 962 strings, most of which look like HDFS paths, which leaves a lot
> of that file-size unexplained. What else is in there, and how exactly could
> this be happening?
> >>> >
> >>> > We're running 1.6.
> >>> >
> >>> > Jacob
> >
> >
> >
> > --
> > Jacob Sevart
> > Software Engineer, Safety
>


-- 
Jacob Sevart
Software Engineer, Safety


Weird behaviour testing TwoPhaseCommit

2020-03-05 Thread David Magalhães
I've implemented a CustomSink with TwoPhaseCommit. To test this I've created
a test using this [1] one as a baseline, and it works fine.

To test the integration with S3 (and with an exponential back off), I've
tried to implement a new test, using the following code:

...
val invalidWriter = writer
  .asInstanceOf[WindowParquetGenericRecordListFileSink]
  .copy(filePath = s"s3a://bucket_that_doesnt_exists/")

val records: Iterable[GenericRecord] = Iterable apply {
  new GenericData.Record(GenericRecordSchema.schema) {
    put(KEY.name, "x")
    put(EVENT.name, "record.value()")
    put(LOGGER_TIMESTAMP.name, "2020-01-01T02:22:23.123456Z")
    put(EVENT_TYPE.name, "xpto")
  }
}

val env = StreamExecutionEnvironment.getExecutionEnvironment

env
  .enableCheckpointing(1000)
  .fromElements(records)
  .addSink(invalidWriter)

val task = executor.submit(() => env.execute("s3exponential"))
...

This will set up a small environment with one record and enable checkpointing
(required for the TPC to work), and then execute in another thread so the
test can check whether the error count is increasing.

So, the test has the following behaviour:

If I use enableCheckpointing(10), the test passes 9 out of 10 times.
If I use other values, like 1000, the test fails most of the time, if not
every time.

Here is a small example of the log when the test is successful.

2020-03-05 16:04:40,342 DEBUG
com.talkdesk.automatebackfill.kratos.writer.WindowParquetGenericRecordListFileSink
 - Invoke - f3b667c1-8d75-4ab8-9cab-71dfa6a71271 [1]
2020-03-05 16:04:40,342 DEBUG
org.apache.flink.streaming.runtime.tasks.StreamTask   - Starting
checkpoint (5) CHECKPOINT on task Sink: Unnamed (2/2)
2020-03-05 16:04:40,342 DEBUG
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction  -
WindowParquetGenericRecordListFileSink 1/2 - checkpoint 5 triggered,
flushing transaction
'TransactionHolder{handle=f3b667c1-8d75-4ab8-9cab-71dfa6a71271,
transactionStartTime=1583424280304}'
### 2020-03-05 16:04:40,342 DEBUG
com.talkdesk.automatebackfill.kratos.writer.WindowParquetGenericRecordListFileSink
 - Pre Commit - f3b667c1-8d75-4ab8-9cab-71dfa6a71271 [1] -
openedTransactions [1]
### 2020-03-05 16:04:40,342 DEBUG
com.talkdesk.automatebackfill.kratos.writer.WindowParquetGenericRecordListFileSink
 - operation='preCommit', message='Start writing #1 records'

When the test fails, here is some part of the log:

2020-03-05 16:38:44,386 DEBUG
com.talkdesk.automatebackfill.kratos.writer.WindowParquetGenericRecordListFileSink
 - Invoke - 7c5dd046-54b8-4b13-b360-397da6743b3b [1]
2020-03-05 16:38:44,386 DEBUG
org.apache.flink.streaming.runtime.tasks.StreamTask   - Aborting
checkpoint via cancel-barrier 5 for task Sink: Unnamed (1/2)
2020-03-05 16:38:44,387 DEBUG
org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  - Sink:
Unnamed (1/2) (e5699032cb2eb874dcff740b6b7b62f1): End of stream alignment,
feeding buffered data back.
2020-03-05 16:38:44,390 DEBUG
org.apache.flink.runtime.io.network.partition.ResultPartition  -
ReleaseOnConsumptionResultPartition
8a4be598a77d12582a92096db8e6319c@b32dcfe3631e86d18cb893a258a6f0f9
[PIPELINED_BOUNDED, 2 subpartitions, 0 pending consumptions]: Received
consumed notification for subpartition 0.
2020-03-05 16:38:44,390 DEBUG
org.apache.flink.runtime.io.network.partition.ResultPartitionManager  -
Received consume notification from ReleaseOnConsumptionResultPartition
8a4be598a77d12582a92096db8e6319c@b32dcfe3631e86d18cb893a258a6f0f9
[PIPELINED_BOUNDED, 2 subpartitions, 0 pending consumptions].
2020-03-05 16:38:44,390 DEBUG
org.apache.flink.runtime.io.network.partition.ResultPartition  - Source:
Collection Source (1/1) (b32dcfe3631e86d18cb893a258a6f0f9): Releasing
ReleaseOnConsumptionResultPartition
8a4be598a77d12582a92096db8e6319c@b32dcfe3631e86d18cb893a258a6f0f9
[PIPELINED_BOUNDED, 2 subpartitions, 0 pending consumptions].

Not sure why this behaviour changes with the checkpoint interval, and so far
I haven't found the reason why "pre commit" isn't executed.

Does anyone have any thoughts on something that I'm missing?

[1]
https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java


Re: Very large _metadata file

2020-03-05 Thread Kostas Kloudas
Hi Jacob,

As I said previously I am not 100% sure what can be causing this
behavior, but this is a related thread here:
https://lists.apache.org/thread.html/r3bfa2a3368a9c7850cba778e4decfe4f6dba9607f32addb69814f43d%40%3Cuser.flink.apache.org%3E

There you can re-post your problem and monitor for answers.

Cheers,
Kostas

On Wed, Mar 4, 2020 at 7:02 PM Jacob Sevart  wrote:
>
> Kostas and Gordon,
>
> Thanks for the suggestions! I'm on RocksDB. We don't have that setting 
> configured so it should be at the default 1024b. This is the full "state.*" 
> section showing in the JobManager UI.
>
>
>
> Jacob
>
> On Wed, Mar 4, 2020 at 2:45 AM Tzu-Li (Gordon) Tai  
> wrote:
>>
>> Hi Jacob,
>>
>> Apart from what Klou already mentioned, one slightly possible reason:
>>
>> If you are using the FsStateBackend, it is also possible that your state is 
>> small enough to be considered to be stored inline within the metadata file.
>> That is governed by the "state.backend.fs.memory-threshold" configuration, 
>> with a default value of 1024 bytes, or can also be configured with the 
>> `fileStateSizeThreshold` argument when constructing the `FsStateBackend`.
>> The purpose of that threshold is to ensure that the backend does not create 
>> a large amount of very small files, where potentially the file pointers are 
>> actually larger than the state itself.
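As a rough illustration of the threshold Gordon describes above, here is a minimal
sketch assuming an FsStateBackend and a placeholder HDFS checkpoint path (4096 bytes
is just an example value; the cluster-wide equivalent is the
state.backend.fs.memory-threshold entry in flink-conf.yaml):

    import java.net.URI;

    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class InlineStateThresholdExample {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // The second constructor argument is fileStateSizeThreshold: state handles smaller
            // than this many bytes are stored inline in the checkpoint/savepoint _metadata file
            // instead of as separate files on the checkpoint storage.
            env.setStateBackend(new FsStateBackend(URI.create("hdfs:///flink/checkpoints"), 4096));
            // ... define and execute the job as usual
        }
    }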
>>
>> Cheers,
>> Gordon
>>
>>
>>
>> On Wed, Mar 4, 2020 at 6:17 PM Kostas Kloudas  wrote:
>>>
>>> Hi Jacob,
>>>
>>> Could you specify which StateBackend you are using?
>>>
>>> The reason I am asking is that, from the documentation in [1]:
>>>
>>> "Note that if you use the MemoryStateBackend, metadata and savepoint
>>> state will be stored in the _metadata file. Since it is
>>> self-contained, you may move the file and restore from any location."
>>>
>>> I am also cc'ing Gordon who may know a bit more about state formats.
>>>
>>> I hope this helps,
>>> Kostas
>>>
>>> [1] 
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/savepoints.html
>>>
>>> On Wed, Mar 4, 2020 at 1:25 AM Jacob Sevart  wrote:
>>> >
>>> > Per the documentation:
>>> >
>>> > "The meta data file of a Savepoint contains (primarily) pointers to all 
>>> > files on stable storage that are part of the Savepoint, in form of 
>>> > absolute paths."
>>> >
>>> > I somehow have a _metadata file that's 1.9GB. Running strings on it I 
>>> > find 962 strings, most of which look like HDFS paths, which leaves a lot 
>>> > of that file-size unexplained. What else is in there, and how exactly 
>>> > could this be happening?
>>> >
>>> > We're running 1.6.
>>> >
>>> > Jacob
>
>
>
> --
> Jacob Sevart
> Software Engineer, Safety


Re: StreamingFileSink Not Flushing All Data

2020-03-05 Thread Kostas Kloudas
Thanks Austin,

If the CompressWriterFactory works for you in 1.10, then you can copy it
as is into 1.9 and use it. The BulkWriter interfaces have not changed between
the versions (as far as I recall). But please keep in mind that there is a
bug in the CompressWriterFactory with a pending PR that fixes it (
https://github.com/apache/flink/pull/11307). So if you copy and try to use
it please include the patch from that PR.

As for the documentation, if you are willing to contribute that would be a
great help. You can open an issue and submit a PR with an example, as done
for the other bulk formats in the documentation here:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html#bulk-encoded-formats

Let us know if it works for you!

Cheers,
Kostas

On Thu, Mar 5, 2020 at 1:43 AM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hey Kostas,
>
> We’re a little bit off from a 1.10 update but I can certainly see if that
> CompressWriterFactory might solve my use case for when we do.
>
> If there is anything I can do to help document that feature, please let me
> know.
>
> Thanks!
>
> Austin
>
> On Wed, Mar 4, 2020 at 4:58 AM Kostas Kloudas  wrote:
>
>> Hi Austin,
>>
>> I will have a look at your repo. In the meantime, given that [1] is
>> already merged in 1.10,
>> would upgrading to 1.10 and using the newly introduced
>> CompressWriterFactory be an option for you?
>>
>> It is unfortunate that this feature was not documented.
>>
>> Cheers,
>> Kostas
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-13634
>>
>>
>> On Tue, Mar 3, 2020 at 11:13 PM Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> Thanks for the docs pointer/ FLIP Rafi, and the workaround strategy
>>> Kostas -- strange though, as I wasn't using a bounded source when I first
>>> ran into this issue. I have updated the example repo to use an unbounded
>>> source[1], and the same file corruption problems remain.
>>>
>>> Anything else I could be doing wrong with the compression stream?
>>>
>>> Thanks again,
>>> Austin
>>>
>>> [1]:
>>> https://github.com/austince/flink-streaming-file-sink-compression/tree/unbounded
>>>
>>> On Tue, Mar 3, 2020 at 3:50 AM Kostas Kloudas 
>>> wrote:
>>>
 Hi Austin and Rafi,

 @Rafi Thanks for providing the pointers!
 Unfortunately there is no progress on the FLIP (or the issue).

 @ Austin In the meantime, what you could do --assuming that your input
 is bounded --  you could simply not stop the job after the whole input is
 processed, then wait until the output is committed, and then cancel the
 job. I know and I agree that this is not an elegant solution but it is a
 temporary workaround.

 Hopefully the FLIP and related issue is going to be prioritised soon.

 Cheers,
 Kostas

 On Tue, Mar 3, 2020 at 8:04 AM Rafi Aroch  wrote:

> Hi,
>
> This happens because StreamingFileSink does not support a finite input
> stream.
> In the docs it's mentioned under "Important Considerations":
>
> [image: image.png]
>
> This behaviour often surprises users...
>
> There's a FLIP and an issue about fixing this. I'm not sure what's the
> status though, maybe Kostas can share.
>
> Thanks,
> Rafi
>
>
> On Mon, Mar 2, 2020 at 5:05 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Hi Dawid and Kostas,
>>
>> Sorry for the late reply + thank you for the troubleshooting. I put
>> together an example repo that reproduces the issue[1], because I did have
>> checkpointing enabled in my previous case -- still must be doing 
>> something
>> wrong with that config though.
>>
>> Thanks!
>> Austin
>>
>> [1]:
>> https://github.com/austince/flink-streaming-file-sink-compression
>>
>>
>> On Mon, Feb 24, 2020 at 5:28 AM Kostas Kloudas 
>> wrote:
>>
>>> Hi Austin,
>>>
>>> Dawid is correct in that you need to enable checkpointing for the
>>> StreamingFileSink to work.
>>>
>>> I hope this solves the problem,
>>> Kostas
>>>
>>> On Mon, Feb 24, 2020 at 11:08 AM Dawid Wysakowicz
>>>  wrote:
>>> >
>>> > Hi Austing,
>>> >
>>> > If I am not mistaken the StreamingFileSink by default flushes on
>>> checkpoints. If you don't have checkpoints enabled it might happen that 
>>> not
>>> all data is flushed.
>>> >
>>> > I think you can also adjust that behavior with:
>>> >
>>> > forBulkFormat(...)
>>> >
>>> > .withRollingPolicy(/* your custom logic */)
>>> >
>>> > I also cc Kostas who should be able to correct me if I am wrong.
>>> >
>>> > Best,
>>> >
>>> > Dawid
>>>
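To illustrate the checkpointing requirement Dawid and Kostas describe above, a minimal
sketch using the simple row format (the bulk/compressed case from this thread follows the
same pattern); the source, output path and checkpoint interval are placeholders:

    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

    public class StreamingFileSinkSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Pending part files are only committed when a checkpoint completes,
            // so checkpointing must be enabled for the data to become visible.
            env.enableCheckpointing(60_000);

            StreamingFileSink<String> sink = StreamingFileSink
                    .forRowFormat(new Path("file:///tmp/output"), new SimpleStringEncoder<String>("UTF-8"))
                    .build();

            env.socketTextStream("localhost", 9999) // placeholder unbounded source
                    .addSink(sink);

            env.execute("streaming-file-sink-sketch");
        }
    }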

Re: Flink Deployment failing with RestClientException

2020-03-05 Thread Andrey Zagrebin
Hi Samir,

It may be a known issue [1][2] where some action during job submission takes
too long but eventually completes in the job manager.
Have you checked the job manager logs for any other failures besides the
"Ask timed out" one?
Have you checked in the Web UI whether all the jobs have in fact been started
despite the client error?

Best,
Andrey

[1] https://issues.apache.org/jira/browse/FLINK-16429 

[2] https://issues.apache.org/jira/browse/FLINK-16018 


> On 5 Mar 2020, at 17:49, Samir Tusharbhai Chauhan 
>  wrote:
> 
> Hi,
> 
> I am having an issue where, after deploying a few jobs, it starts failing
> with the errors below. I don't have this issue in other environments. What
> should I check first in such a scenario?
> 
> My environment is
> Azure Kubernetes 1.15.7
> Flink 1.6.0
> Zookeeper 3.4.10
>  
> 
> The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: Could not submit 
> job (JobID: e83db2da358db355ccdcf6740c6bb134)
> at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:249)
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:486)
> at 
> org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:77)
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:432)
> at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:804)
> at 
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:280)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:215)
> at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1044)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1120)
> at 
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1120)
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to 
> submit JobGraph.
> at 
> org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:379)
> at 
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
> at 
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
> at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at 
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:213)
> at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at 
> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
> at 
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
> at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.CompletionException: 
> org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not 
> complete the operation. Exception is not retryable.
> at 
> java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
> at 
> java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
> at 
> java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
> at 
> java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
> ... 12 more
> Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: 
> Could not complete the operation. Exception is not retryable.
> ... 10 more
> Caused by: java.util.concurrent.CompletionException: 
> org.apache.flink.runtime.rest.util.RestClientException: [Job submission 
> failed.]
> at 
> java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
> at 
> java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
> at 
> java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
> at 
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:953)

How do I get the value of 99th latency inside an operator?

2020-03-05 Thread Felipe Gutierrez
Hi community,

where in the Flink code can I get the value of the 99th percentile latency
(flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency{operator_id="93352199ce18d8917f20fdf82cedb1b4",quantile="0.99"})?

Probably I will have to hack the Flink source code to export those values
to my own operator. Nevertheless, it is what I need.
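One way to get such a percentile inside your own operator without patching Flink is
to track it yourself with a registered Histogram metric; a rough sketch (the input is
assumed to be an event timestamp in milliseconds, and this measures your own latency
rather than Flink's internal latency markers):

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.metrics.Histogram;
    import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;

    public class LatencyTrackingMap extends RichMapFunction<Long, Long> {

        private transient Histogram latencyHistogram;

        @Override
        public void open(Configuration parameters) {
            // Sliding window over the last 500 samples, exposed via the configured metric reporters.
            latencyHistogram = getRuntimeContext()
                    .getMetricGroup()
                    .histogram("eventLatencyMs", new DescriptiveStatisticsHistogram(500));
        }

        @Override
        public Long map(Long eventTimestampMs) {
            latencyHistogram.update(System.currentTimeMillis() - eventTimestampMs);
            // The quantile can also be read directly inside the operator when needed:
            // latencyHistogram.getStatistics().getQuantile(0.99);
            return eventTimestampMs;
        }
    }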

Kind Regards,
Felipe

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com


Flink Deployment failing with RestClientException

2020-03-05 Thread Samir Tusharbhai Chauhan
Hi,
I am having an issue where, after deploying a few jobs, it starts failing with
the errors below. I don't have this issue in other environments. What should I
check first in such a scenario?
My environment is
Azure Kubernetes 1.15.7
Flink 1.6.0
Zookeeper 3.4.10

The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: Could not submit 
job (JobID: e83db2da358db355ccdcf6740c6bb134)
at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:249)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:486)
at 
org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:77)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:432)
at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:804)
at 
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:280)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:215)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1044)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1120)
at 
org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1120)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to 
submit JobGraph.
at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:379)
at 
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at 
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:213)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: 
org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not 
complete the operation. Exception is not retryable.
at 
java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
at 
java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
at 
java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
at 
java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
... 12 more
Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: 
Could not complete the operation. Exception is not retryable.
... 10 more
Caused by: java.util.concurrent.CompletionException: 
org.apache.flink.runtime.rest.util.RestClientException: [Job submission failed.]
at 
java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
at 
java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
at 
java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
at 
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:953)
at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
... 4 more
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Job 
submission failed.]
at 
org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:310)
at 
org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:294)
at 
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
... 5 more


More errors
at java.lang.Thread.run(Thread.java:748)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on 
[Actor[akka://flink/user/dispatcher#-1725880087]] after [1 ms]. 
Sender[null] sent message of type 
"org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
at 
akka.pattern.PromiseAc

RE: Re: Teradata as JDBC Connection

2020-03-05 Thread Norm Vilmer (Contractor)
Thanks for the reply, Arvid. I changed the property names in my 
ConnectorDescriptor subclass to match what the validator wanted and now get:

“Could not find a suitable table factory for 
'org.apache.flink.table.factories.TableSinkFactory' in
the classpath.

Reason: No factory supports all properties.

The matching candidates:
org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory
Unsupported property keys:
update-mode”

The method suggested in the link you sent, registerTableSink, is deprecated in 
1.10, so I was trying to use the following:

final StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

. . .

tableEnv.connect(new Teradata())
    .withSchema(new Schema()
        .field("f0", DataTypes.VARCHAR(25))
        .field("f1", DataTypes.VARCHAR(10240))
        .field("f2", DataTypes.VARCHAR(10240)))
    .inAppendMode()
    .createTemporaryTable("staging");

Table table = tableEnv.fromDataStream(reportData);
table.insertInto("staging");

Using the connect() method, I can see that the code attempts to use the 
JDBCTableSourceSinkFactory, but does not like ‘update-mode’.

Do you have an example using connect() method? Thanks.
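For reference, a rough sketch of the builder-based JDBCAppendTableSink from the page Arvid
linked, as an alternative to the connect() descriptor path; the URL, credentials and INSERT
statement below are placeholders, tableEnv and reportData are the ones from the snippet
above, and registerTableSink is deprecated in 1.10 but still functional:

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;

    JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
            .setDrivername("com.teradata.jdbc.TeraDriver")
            .setDBUrl("jdbc:teradata://myserver/database=mydb,LOGMECH=LDAP")
            .setUsername("...")
            .setPassword("...")
            .setQuery("INSERT INTO staging (f0, f1, f2) VALUES (?, ?, ?)")
            .setParameterTypes(Types.STRING, Types.STRING, Types.STRING)
            .build();

    // Register the sink with an explicit schema, then write the table into it.
    tableEnv.registerTableSink(
            "staging",
            new String[] {"f0", "f1", "f2"},
            new TypeInformation[] {Types.STRING, Types.STRING, Types.STRING},
            sink);

    tableEnv.fromDataStream(reportData).insertInto("staging");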

From: Arvid Heise 
Sent: Thursday, March 5, 2020 1:15 AM
To: Norm Vilmer (Contractor) 
Cc: user@flink.apache.org
Subject: EXTERNAL - Re: Teradata as JDBC Connection

Caution: Sender is from outside SWA. Take caution before opening 
links/attachments or replying with sensitive data. If suspicious, forward to 
'suspici...@wnco.com'.

Hi Norm,

the error message already points to the main issue: your property names are not 
correct.
Unsupported property keys:
drivername
update-mode
password
dburl
username

You should use the builder to properly configure the sink [1].

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#jdbcappendtablesink

On Thu, Mar 5, 2020 at 12:58 AM Norm Vilmer (Contractor) 
mailto:norm.vil...@wnco.com>> wrote:
Same error with this change:

public class Teradata extends ConnectorDescriptor {
    /**
     * Constructs a {@link ConnectorDescriptor}.
     */
    public Teradata() {
        super("jdbc", 1, false);
    }

    @Override
    protected Map<String, String> toConnectorProperties() {
        Map<String, String> map = new HashMap<>();
        map.put(JDBCValidator.CONNECTOR_DRIVER, "com.teradata.jdbc.TeraDriver");
        map.put(JDBCValidator.CONNECTOR_URL,
            "jdbc:teradata://myserver/database=mydb,LOGMECH=LDAP");
        map.put(JDBCValidator.CONNECTOR_USERNAME, "...");
        map.put(JDBCValidator.CONNECTOR_PASSWORD, "...!");
        return map;
    }
}

-Original Message-
From: Norm Vilmer (Contractor) 
mailto:norm.vil...@wnco.com>>
Sent: Wednesday, March 4, 2020 10:37 AM
To: user@flink.apache.org
Subject: EXTERNAL - Teradata as JDBC Connection

Caution: Sender is from outside SWA. Take caution before opening 
links/attachments or replying with sensitive data. If suspicious, forward to 
'suspici...@wnco.com'.

Using Flink 1.10 and coding in Java 11, is it possible to write to Teradata 
in append mode? MySQL, PostgreSQL, and Derby are the only supported drivers 
listed. Thanks.

https://urldefense.proofpoint.com/v2/url?u=https-3A__ci.apache.org_projects_flink_flink-2Ddocs-2Dstable_dev_table_connect.html-23connectors&d=DwIFAg&c=dyyteaO_66X5RejcGgaVFCWGX8V6S6CQobBcYjo__mc&r=a8BqCmWrJ1FuU14JVrlQLeWdeeSBWSiCJA9Y5xTWafg&m=kfV3arAbKYvpd5IvCtggkHsoDXKTgA1RrGMWrbcWZOo&s=n91D15kGNf9TDtKedGYD8EfDYxnvEzY8POgNtSE-icY&e=

I created the ConnectorDescriptor below and am using it from 
tableEnvironment.connect() but get the exception shown below.

public class Teradata extends ConnectorDescriptor {
    /**
     * Constructs a {@link ConnectorDescriptor}.
     */
    public Teradata() {
        super("jdbc", 1, false);
    }

    @Override
    protected Map<String, String> toConnectorProperties() {
        Map<String, String> map = new HashMap<>();
        map.put("Drivername", "com.teradata.jdbc.TeraDriver");
        map.put("DBUrl", "jdbc:teradata://myserver/database=mydb,LOGMECH=LDAP");
        map.put("Username", "...");
        map.put("Password", "...");
        return map;
    }
}

org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a 
suitable table factory for 'org.apache.flink.table.factories.TableSinkFactory' 
in the classpath.

Reason: No factory supports all properties.

The matching can

Re: Setting the operator-id to measure percentile latency over several jobs

2020-03-05 Thread Felipe Gutierrez
Thanks! I was wondering why the operator name is not used for the latency
metrics, since it is used for the other metrics.
But thanks anyway!
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com


On Thu, Mar 5, 2020 at 2:06 PM Khachatryan Roman <
khachatryan.ro...@gmail.com> wrote:

> Hi Felipe,
>
> Please find the answers to your questions below.
>
> > Each "operator_subtask_index" means each instance of the parallel
> physical operator, doesn't it?
> Yes.
> > How can I set a fixed ID for the "operator_id" in my code so I can
> identify quickly which operator I am measuring?
> You are using the correct api (uid(...))
> > What is the hash function used so I can identify my operator?
> Flink uses
> https://guava.dev/releases/18.0/api/docs/com/google/common/hash/Hashing.html#murmur3_128(int)
>
> Regards,
> Roman
>
>
> On Thu, Mar 5, 2020 at 12:45 PM Felipe Gutierrez <
> felipe.o.gutier...@gmail.com> wrote:
>
>> Hi community,
>>
>> I am tracking the latency of operators in Flink according to this
>> reference [1]. When I am using Prometheus+Grafana I can issue a query using
>> "flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency"
>> and I can check the percentiles of each "operator_id" and each
>> "operator_subtask_index". Each "operator_subtask_index" means each instance
>> of the parallel physical operator, doesn't it?
>>
>> How can I set a fixed ID for the "operator_id" in my code so I can
>> identify quickly which operator I am measuring? I used "map(new
>> MyMapUDF()).uid('my-operator-ID')" but it seems that there is a hash
>> function that converts the string to a hash value. What is the hash
>> function used so I can identify my operator? I know that I can use the Rest
>> API [2] and if I name my operator it will have always the same hash when I
>> restart the job, but I would like to set its name.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#latency-tracking
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#rest-api-integration
>> -
>> - Felipe Gutierrez
>> - skype: felipe.o.gutierrez
>> - https://felipeogutierrez.blogspot.com
>>
>
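A small sketch of the uid()/name() calls discussed above (the strings and the toy
pipeline are arbitrary examples); the operator_id in the latency metric then comes from
hashing this uid with the murmur3 function Roman points to:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class OperatorUidSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            DataStream<String> mapped = env
                    .fromElements("a", "b", "c")
                    .map((MapFunction<String, String>) String::toUpperCase)
                    .uid("my-operator-ID")    // stable ID; metrics and state refer to its hashed form
                    .name("my-map-operator"); // display name used in the Web UI and name-based metrics

            mapped.print();
            env.execute("operator-uid-sketch");
        }
    }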


Re: checkpoint _metadata file has >20x different in size among different check-points

2020-03-05 Thread Congxian Qiu
Hi

Maybe the checkpoint contains some ByteStreamStateHandle (inline) state; if you
want to verify this, you can adjust `state.backend.fs.memory-threshold`.
Please be careful when setting this config, because it may produce many small
files.

Best,
Congxian


Arvid Heise  于2020年3月5日周四 上午2:26写道:

> Hi Yu,
>
> are you using incremental checkpoints [1]? If so, then the smaller
> checkpoints would be the deltas and the larger the complete state.
>
> [1]
> https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html
>
> On Wed, Mar 4, 2020 at 6:41 PM Yu Yang  wrote:
>
>> Hi all,
>>
>> We have a flink job that does check-pointing per 10 minutes. We noticed
>> that for the check-points of this job,  the _metadata file size can vary a
>> lot. In some checkpoint, we observe that _metadata file size was >900MB,
>> while in some other check-points of the same job, the _metadata file size
>> is < 4MB.  Any insights on what may cause the difference?
>>
>> Thank you!
>>
>> Regards,
>> -Yu
>>
>


Re: How to override flink-conf parameters for SQL Client

2020-03-05 Thread Gyula Fóra
Kurt, can you please explain which conf parameters you mean?

In regular executions (YARN for instance) we have dynamic config
parameters overriding any flink-conf argument.
So it is not about setting them in the user code; it should happen
before the ClusterDescriptors are created (i.e. together with the
CustomCommandLine logic).

Gyula

On Thu, Mar 5, 2020 at 3:49 PM Kurt Young  wrote:

> IIRC the tricky thing here is that not all the config options belonging to
> flink-conf.yaml can be adjusted dynamically in the user's program.
> So it will end up with some of the configurations being overridable but
> some not. The experience is not great for users.
>
> Best,
> Kurt
>
>
> On Thu, Mar 5, 2020 at 10:15 PM Jeff Zhang  wrote:
>
>> Hi Gyula,
>>
>> I am doing integration Flink with Zeppelin. One feature in Zeppelin is
>> that user could override any features in flink-conf.yaml. (Actually any
>> features here
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html).
>> Of course you can run flink sql in Zeppelin, and could also leverage other
>> features of Zeppelin, like visualization.
>>
>> If you are interested, you could try the master branch of Zeppelin + this
>> improvement PR
>>
>> https://github.com/apache/zeppelin
>> https://github.com/apache/zeppelin/pull/3676
>> https://github.com/apache/zeppelin/blob/master/docs/interpreter/flink.md
>>
>>
>>
>>
>>
>>
>> Gyula Fóra  于2020年3月5日周四 下午6:51写道:
>>
>>> I could basically list a few things I want to set (execution.target for
>>> example), but it's fair to assume that I would like to be able to set
>>> anything :)
>>>
>>> Gyula
>>>
>>> On Thu, Mar 5, 2020 at 11:35 AM Jingsong Li 
>>> wrote:
>>>
 Hi Gyula,

 Maybe Blink planner has invoked "StreamExecutionEnvironment.configure",
 which planner do you use?

 But "StreamExecutionEnvironment.configure" is only for partial
 configuration, can not for all configuration in flink-conf.yaml.
 So what's the config do you want to set? I know some config like
 "taskmanager.network.blocking-shuffle.compression.enabled" can not set

 Best,
 Jingsong Lee

 On Thu, Mar 5, 2020 at 6:17 PM Jark Wu  wrote:

> Hi Gyula,
>
> Flink configurations can be overridden via
> `TableConfig#getConfiguration()`; however, the SQL CLI only allows setting
> Table-specific configs.
> I will treat this as a bug/improvement of the SQL CLI which should be fixed
> in 1.10.1.
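A quick sketch of the programmatic override Jark refers to (the option key is only an
example, and as Kurt notes elsewhere in the thread not every flink-conf.yaml entry is
picked up this way):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class TableConfigOverrideSketch {
        public static void main(String[] args) {
            TableEnvironment tableEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

            // Overrides the value for this session only; it does not rewrite flink-conf.yaml.
            tableEnv.getConfig().getConfiguration()
                    .setString("table.exec.mini-batch.enabled", "true");
        }
    }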
>
> Best,
> Jark
>
> On Thu, 5 Mar 2020 at 18:12, Gyula Fóra  wrote:
>
>> Thanks Caizhi,
>>
>> This seems like a pretty big shortcoming for any multi-user/multi-app
>> environment. I will open a jira for this.
>>
>> Gyula
>>
>> On Thu, Mar 5, 2020 at 10:58 AM Caizhi Weng 
>> wrote:
>>
>>> Hi Gyula.
>>>
>>> I'm afraid there is no way to override all Flink configurations
>>> currently. SQL client yaml file can only override some of the Flink
>>> configurations.
>>>
>>> Configuration entries indeed can only set Table specific configs,
>>> while deployment entries are used to set the result fetching address and
>>> port. There is currently no way to change the execution target from the 
>>> SQL
>>> client.
>>>
>>> Gyula Fóra  于2020年3月5日周四 下午4:31写道:
>>>
 Hi All!

 I am trying to understand if there is any way to override flink
 configuration parameters when starting the SQL Client.

 It seems that the only way to pass any parameters is through the
 environment yaml.

 There I found 2 possible routes:

 configuration: this doesn't work as it only sets Table specific
 configs apparently, but maybe I am wrong.

 deployment: I tried using dynamic properties options here but
 unfortunately we normalize (lowercase) the YAML keys so it is 
 impossible to
 pass options like -yD or -D.

 Does anyone have any suggestions?

 Thanks
 Gyula

>>>

 --
 Best, Jingsong Lee

>>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>


Re: Writing retract streams to Kafka

2020-03-05 Thread Gyula Fóra
I see, maybe I just don't understand how to properly express what I am
trying to compute.

Basically I want to aggregate the quantities of the transactions that
happened in the 5 seconds before the query.
Every query.id belongs to a single query (event_time, itemid) but still I
have to group :/

Gyula

On Thu, Mar 5, 2020 at 3:45 PM Kurt Young  wrote:

> I think the issue is not caused by event time interval join, but the
> aggregation after the join:
> GROUP BY t.itemId, q.event_time, q.queryId;
>
> In this case, there is still no chance for Flink to determine whether the
> groups like (itemId, eventtime, queryId) have complete data or not.
> As a comparison, if you change the grouping key to a window which based
> only on q.event_time, then the query would emit insert only results.
>
> Best,
> Kurt
>
>
> On Thu, Mar 5, 2020 at 10:29 PM Gyula Fóra  wrote:
>
>> That's exactly the kind of behaviour I am looking for Kurt ("ignore all
>> delete messages").
>>
>> As for the data completion, in my above example it is basically an event
>> time interval join.
>> With watermarks defined Flink should be able to compute results once in
>> exactly the same way as for the tumbling window.
>>
>> Gyula
>>
>> On Thu, Mar 5, 2020 at 3:26 PM Kurt Young  wrote:
>>
>>> Back to this case, I assume you are expecting something like "ignore all
>>> delete messages" flag? With this
>>> flag turned on, Flink will only send insert messages which corresponding
>>> current correct results to kafka and
>>> drop all retractions and deletes on the fly.
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Thu, Mar 5, 2020 at 10:24 PM Kurt Young  wrote:
>>>
 > I also don't completely understand at this point why I can write the
 result of a group, tumble window aggregate to Kafka and not this window
 join / aggregate.

 If you are doing a tumble window aggregate with watermark enabled,
 Flink will only fire a final result for
 each window at once, no modification or retractions will happen after a
 window is calculated and fired.
 But with some other arbitrary aggregations, there is not enough
 information for Flink to determine whether
 the data is complete or not, so the framework will keep calculating
 results when receiving new records and
 retract earlier results by firing retraction/deletion messages.

 Best,
 Kurt


 On Thu, Mar 5, 2020 at 10:13 PM Gyula Fóra 
 wrote:

> Thanks Benoît!
>
> I can see now how I can implement this myself through the provided
> sink interfaces but I was trying to avoid having to write code for this :D
> My initial motivation was to see whether we are able to write out any
> kind of table to Kafka as a simple stream of "upserts".
>
> I also don't completely understand at this point why I can write the
> result of a group, tumble window aggregate to Kafka and not this window
> join / aggregate.
>
> Cheers,
> Gyula
>
> On Thu, Mar 5, 2020 at 3:00 PM Benoît Paris <
> benoit.pa...@centraliens-lille.org> wrote:
>
>> Hi Gyula,
>>
>> I'm afraid conversion to see the retractions vs inserts can't be done
>> in pure SQL (though I'd love that feature).
>>
>> You might want to go lower level and implement a
>> RetractStreamTableSink [1][2] that you would wrap around a KafkaTableSink
>> [3]. This will give you an emitDataStream(DataStream<Tuple2<Boolean, Row>>
>> dataStream), in which the Boolean flag will give you an 'accumulate' or
>> 'retract' signal.
>> You can then filter the DataStream accordingly before passing to the
>> KafkaTableSink.
>>
>> Hope this helps.
>>
>> Best regards
>> Benoît
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/sinks/RetractStreamTableSink.html
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#retractstreamtablesink
>> [3]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.html
>>
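For illustration, a minimal sketch of the same accumulate/retract filtering done directly
on the retract stream rather than inside a custom RetractStreamTableSink as Benoît suggests;
the stand-in query, topic and Kafka properties are placeholders, and rows are written as
plain strings only to keep the sketch short:

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;

    public class InsertsOnlyToKafkaSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

            // Stand-in for the join/aggregate from the thread; assumes the tables are registered.
            Table result = tableEnv.sqlQuery("SELECT itemId, quantity FROM ItemTransactions");

            DataStream<String> insertsOnly = tableEnv
                    .toRetractStream(result, Row.class) // DataStream<Tuple2<Boolean, Row>>
                    .filter(change -> change.f0)        // true = accumulate/insert, false = retract/delete
                    .map(change -> change.f1.toString())
                    .returns(Types.STRING);

            Properties kafkaProps = new Properties();
            kafkaProps.setProperty("bootstrap.servers", "localhost:9092");

            insertsOnly.addSink(new FlinkKafkaProducer<String>(
                    "query.output.log.1", new SimpleStringSchema(), kafkaProps));

            env.execute("inserts-only-to-kafka-sketch");
        }
    }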
>> On Thu, Mar 5, 2020 at 2:50 PM Gyula Fóra 
>> wrote:
>>
>>> Hi Roman,
>>>
>>> This is the core logic:
>>>
>>> CREATE TABLE QueryResult (
>>> queryId BIGINT,
>>>   itemId STRING,
>>>   quantity INT
>>> ) WITH (
>>> 'connector.type' = 'kafka',
>>> 'connector.version' = 'universal',
>>> 'connector.topic'   = 'query.output.log.1',
>>> 'connector.properties.bootstrap.servers' = '',
>>> 'format.type' = 'json'
>>> );
>>>
>>> INSERT INTO QueryResult
>>> SELECT q.queryId, t.itemId, sum(t.quantity) AS quantity
>>> FROM
>>>   ItemTransactions AS t,
>>>   Queries AS q
>>> WHERE
>>>   t.itemId = q.itemId AND
>>>   t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND
>>> q.event_time
>>> GR

Re: java.time.LocalDateTime in POJO type

2020-03-05 Thread KristoffSC
Thanks,
do you have any example of how I could use it?

Basically I have a POJO class that has a LocalDateTime field in it.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: How to use self defined json format when create table from kafka stream?

2020-03-05 Thread Kurt Young
User defined formats also sounds like an interesting extension.

Best,
Kurt


On Thu, Mar 5, 2020 at 3:06 PM Jark Wu  wrote:

> Hi Lei,
>
> Currently, Flink SQL doesn't support registering a binlog format (i.e.
> just defining "order_id" and "order_no" while the json schema has other
> binlog fields).
> This is exactly what we want to support in FLIP-105 [1] and FLIP-95.
>
> For now, if you want to consume such json data, you have to define the
> full schema, e.g. "type", "timestamp", and so on...
>
> Btw, what Change Data Capture (CDC) tool are you using?
>
> Best,
> Jark
>
> [1]:
> https://docs.google.com/document/d/1onyIUUdWAHfr_Yd5nZOE7SOExBc6TiW5C4LiL5FrjtQ/edit#
>
>
> On Thu, 5 Mar 2020 at 11:40, wangl...@geekplus.com.cn <
> wangl...@geekplus.com.cn> wrote:
>
>>
>> I want to register a table from mysql binlog like this:
>>
>> tEnv.sqlUpdate("CREATE TABLE order(\n"
>> + "order_id BIGINT,\n"
>> + "order_no VARCHAR,\n"
>> + ") WITH (\n"
>> + "'connector.type' = 'kafka',\n"
>> ...
>> + "'update-mode' = 'append',\n"
>> + "'format.type' = 'json',\n"
>> + "'format.derive-schema' = 'true'\n"
>> + ")");
>>
>> using the following log format:
>>
>> {
>>   "type" : "update",
>>   "timestamp" : 1583373066000,
>>   "binlog_filename" : "mysql-bin.000453",
>>   "binlog_position" : 923020943,
>>   "database" : "wms",
>>   "table_name" : "t_pick_order",
>>   "table_id" : 131936,
>>   "columns" : [ {
>> "id" : 1,
>> "name" : "order_id",
>> "column_type" : -5,
>> "last_value" : 4606458,
>> "value" : 4606458
>>   }, {
>> "id" : 2,
>> "name" : "order_no",
>> "column_type" : 12,
>> "last_value" : "EDBMFSJ1S2003050006628",
>> "value" : "EDBMFSJ1S2003050006628"
>>   }]
>> }
>>
>>
>> Surely the format.type' = 'json',\n" will not parse the result as I
>> expected.
>> Is there any method I can implement this? For example, using a self
>> defined format class.
>>
>> Thanks,
>> Lei
>>
>> --
>> wangl...@geekplus.com.cn
>>
>>
>>


Re: How to override flink-conf parameters for SQL Client

2020-03-05 Thread Kurt Young
IIRC the tricky thing here is that not all the config options belonging to
flink-conf.yaml can be adjusted dynamically in the user's program.
So it will end up with some of the configurations being overridable but
some not. The experience is not great for users.

Best,
Kurt


On Thu, Mar 5, 2020 at 10:15 PM Jeff Zhang  wrote:

> Hi Gyula,
>
> I am doing integration Flink with Zeppelin. One feature in Zeppelin is
> that user could override any features in flink-conf.yaml. (Actually any
> features here
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html).
> Of course you can run flink sql in Zeppelin, and could also leverage other
> features of Zeppelin, like visualization.
>
> If you are interested, you could try the master branch of Zeppelin + this
> improvement PR
>
> https://github.com/apache/zeppelin
> https://github.com/apache/zeppelin/pull/3676
> https://github.com/apache/zeppelin/blob/master/docs/interpreter/flink.md
>
>
>
>
>
>
> Gyula Fóra  于2020年3月5日周四 下午6:51写道:
>
>> I could basically list a few things I want to set (execution.target for
>> example), but it's fair to assume that I would like to be able to set
>> anything :)
>>
>> Gyula
>>
>> On Thu, Mar 5, 2020 at 11:35 AM Jingsong Li 
>> wrote:
>>
>>> Hi Gyula,
>>>
>>> Maybe Blink planner has invoked "StreamExecutionEnvironment.configure",
>>> which planner do you use?
>>>
>>> But "StreamExecutionEnvironment.configure" is only for partial
>>> configuration, can not for all configuration in flink-conf.yaml.
>>> So what's the config do you want to set? I know some config like
>>> "taskmanager.network.blocking-shuffle.compression.enabled" can not set
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Thu, Mar 5, 2020 at 6:17 PM Jark Wu  wrote:
>>>
 Hi Gyula,

 Flink configurations can be overridden via
 `TableConfig#getConfiguration()`; however, the SQL CLI only allows setting
 Table-specific configs.
 I will treat this as a bug/improvement of the SQL CLI which should be fixed
 in 1.10.1.

 Best,
 Jark

 On Thu, 5 Mar 2020 at 18:12, Gyula Fóra  wrote:

> Thanks Caizhi,
>
> This seems like a pretty big shortcoming for any multi-user/multi-app
> environment. I will open a jira for this.
>
> Gyula
>
> On Thu, Mar 5, 2020 at 10:58 AM Caizhi Weng 
> wrote:
>
>> Hi Gyula.
>>
>> I'm afraid there is no way to override all Flink configurations
>> currently. SQL client yaml file can only override some of the Flink
>> configurations.
>>
>> Configuration entries indeed can only set Table specific configs,
>> while deployment entries are used to set the result fetching address and
>> port. There is currently no way to change the execution target from the 
>> SQL
>> client.
>>
>> Gyula Fóra  于2020年3月5日周四 下午4:31写道:
>>
>>> Hi All!
>>>
>>> I am trying to understand if there is any way to override flink
>>> configuration parameters when starting the SQL Client.
>>>
>>> It seems that the only way to pass any parameters is through the
>>> environment yaml.
>>>
>>> There I found 2 possible routes:
>>>
>>> configuration: this doesn't work as it only sets Table specific
>>> configs apparently, but maybe I am wrong.
>>>
>>> deployment: I tried using dynamic properties options here but
>>> unfortunately we normalize (lowercase) the YAML keys so it is 
>>> impossible to
>>> pass options like -yD or -D.
>>>
>>> Does anyone have any suggestions?
>>>
>>> Thanks
>>> Gyula
>>>
>>
>>>
>>> --
>>> Best, Jingsong Lee
>>>
>>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: Writing retract streams to Kafka

2020-03-05 Thread Kurt Young
I think the issue is not caused by event time interval join, but the
aggregation after the join:
GROUP BY t.itemId, q.event_time, q.queryId;

In this case, there is still no chance for Flink to determine whether the
groups like (itemId, eventtime, queryId) have complete data or not.
As a comparison, if you change the grouping key to a window which based
only on q.event_time, then the query would emit insert only results.

Best,
Kurt


On Thu, Mar 5, 2020 at 10:29 PM Gyula Fóra  wrote:

> That's exactly the kind of behaviour I am looking for Kurt ("ignore all
> delete messages").
>
> As for the data completion, in my above example it is basically an event
> time interval join.
> With watermarks defined Flink should be able to compute results once in
> exactly the same way as for the tumbling window.
>
> Gyula
>
> On Thu, Mar 5, 2020 at 3:26 PM Kurt Young  wrote:
>
>> Back to this case, I assume you are expecting something like "ignore all
>> delete messages" flag? With this
>> flag turned on, Flink will only send insert messages which corresponding
>> current correct results to kafka and
>> drop all retractions and deletes on the fly.
>>
>> Best,
>> Kurt
>>
>>
>> On Thu, Mar 5, 2020 at 10:24 PM Kurt Young  wrote:
>>
>>> > I also don't completely understand at this point why I can write the
>>> result of a group, tumble window aggregate to Kafka and not this window
>>> join / aggregate.
>>>
>>> If you are doing a tumble window aggregate with watermark enabled, Flink
>>> will only fire a final result for
>>> each window at once, no modification or retractions will happen after a
>>> window is calculated and fired.
>>> But with some other arbitrary aggregations, there is not enough
>>> information for Flink to determine whether
>>> the data is complete or not, so the framework will keep calculating
>>> results when receiving new records and
>>> retract earlier results by firing retraction/deletion messages.
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Thu, Mar 5, 2020 at 10:13 PM Gyula Fóra  wrote:
>>>
 Thanks Benoît!

 I can see now how I can implement this myself through the provided sink
 interfaces but I was trying to avoid having to write code for this :D
 My initial motivation was to see whether we are able to write out any
 kind of table to Kafka as a simple stream of "upserts".

 I also don't completely understand at this point why I can write the
 result of a group, tumble window aggregate to Kafka and not this window
 join / aggregate.

 Cheers,
 Gyula

 On Thu, Mar 5, 2020 at 3:00 PM Benoît Paris <
 benoit.pa...@centraliens-lille.org> wrote:

> Hi Gyula,
>
> I'm afraid conversion to see the retractions vs inserts can't be done
> in pure SQL (though I'd love that feature).
>
> You might want to go lower level and implement a
> RetractStreamTableSink [1][2] that you would wrap around a KafkaTableSink
> [3]. This will give you an emitDataStream(DataStream<Tuple2<Boolean, Row>>
> dataStream), in which the Boolean flag will give you an 'accumulate' or
> 'retract' signal.
> You can then filter the DataStream accordingly before passing to the
> KafkaTableSink.
>
> Hope this helps.
>
> Best regards
> Benoît
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/sinks/RetractStreamTableSink.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#retractstreamtablesink
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.html
>
> On Thu, Mar 5, 2020 at 2:50 PM Gyula Fóra 
> wrote:
>
>> Hi Roman,
>>
>> This is the core logic:
>>
>> CREATE TABLE QueryResult (
>> queryId BIGINT,
>>   itemId STRING,
>>   quantity INT
>> ) WITH (
>> 'connector.type' = 'kafka',
>> 'connector.version' = 'universal',
>> 'connector.topic'   = 'query.output.log.1',
>> 'connector.properties.bootstrap.servers' = '',
>> 'format.type' = 'json'
>> );
>>
>> INSERT INTO QueryResult
>> SELECT q.queryId, t.itemId, sum(t.quantity) AS quantity
>> FROM
>>   ItemTransactions AS t,
>>   Queries AS q
>> WHERE
>>   t.itemId = q.itemId AND
>>   t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND
>> q.event_time
>> GROUP BY
>>   t.itemId, q.event_time, q.queryId;
>>
>> And the error I get is:
>> org.apache.flink.table.client.gateway.SqlExecutionException: Invalid
>> SQL update statement.
>> at
>> org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:697)
>> at
>> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576)
>> at
>> org.apache.flink.table.client.gateway.local.LocalExecuto

Re: Writing retract streams to Kafka

2020-03-05 Thread Gyula Fóra
That's exactly the kind of behaviour I am looking for Kurt ("ignore all
delete messages").

As for the data completion, in my above example it is basically an event
time interval join.
With watermarks defined Flink should be able to compute results once in
exactly the same way as for the tumbling window.

Gyula

On Thu, Mar 5, 2020 at 3:26 PM Kurt Young  wrote:

> Back to this case, I assume you are expecting something like "ignore all
> delete messages" flag? With this
> flag turned on, Flink will only send insert messages which corresponding
> current correct results to kafka and
> drop all retractions and deletes on the fly.
>
> Best,
> Kurt
>
>
> On Thu, Mar 5, 2020 at 10:24 PM Kurt Young  wrote:
>
>> > I also don't completely understand at this point why I can write the
>> result of a group, tumble window aggregate to Kafka and not this window
>> join / aggregate.
>>
>> If you are doing a tumble window aggregate with watermark enabled, Flink
>> will only fire a final result for
>> each window at once, no modification or retractions will happen after a
>> window is calculated and fired.
>> But with some other arbitrary aggregations, there is not enough
>> information for Flink to determine whether
>> the data is complete or not, so the framework will keep calculating
>> results when receiving new records and
>> retract earlier results by firing retraction/deletion messages.
>>
>> Best,
>> Kurt
>>
>>
>> On Thu, Mar 5, 2020 at 10:13 PM Gyula Fóra  wrote:
>>
>>> Thanks Benoît!
>>>
>>> I can see now how I can implement this myself through the provided sink
>>> interfaces but I was trying to avoid having to write code for this :D
>>> My initial motivation was to see whether we are able to write out any
>>> kind of table to Kafka as a simple stream of "upserts".
>>>
>>> I also don't completely understand at this point why I can write the
>>> result of a group, tumble window aggregate to Kafka and not this window
>>> join / aggregate.
>>>
>>> Cheers,
>>> Gyula
>>>
>>> On Thu, Mar 5, 2020 at 3:00 PM Benoît Paris <
>>> benoit.pa...@centraliens-lille.org> wrote:
>>>
 Hi Gyula,

 I'm afraid conversion to see the retractions vs inserts can't be done
 in pure SQL (though I'd love that feature).

 You might want to go lower level and implement a RetractStreamTableSink
 [1][2] that you would wrap around a KafkaTableSink [3]. This will give you
 an emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream), in which the
 Boolean flag will give you an 'accumulate' or 'retract' signal.
 You can then filter the DataStream accordingly before passing to the
 KafkaTableSink.

 Hope this helps.

 Best regards
 Benoît

 [1]
 https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/sinks/RetractStreamTableSink.html
 [2]
 https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#retractstreamtablesink
 [3]
 https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.html

 On Thu, Mar 5, 2020 at 2:50 PM Gyula Fóra  wrote:

> Hi Roman,
>
> This is the core logic:
>
> CREATE TABLE QueryResult (
> queryId BIGINT,
>   itemId STRING,
>   quantity INT
> ) WITH (
> 'connector.type' = 'kafka',
> 'connector.version' = 'universal',
> 'connector.topic'   = 'query.output.log.1',
> 'connector.properties.bootstrap.servers' = '',
> 'format.type' = 'json'
> );
>
> INSERT INTO QueryResult
> SELECT q.queryId, t.itemId, sum(t.quantity) AS quantity
> FROM
>   ItemTransactions AS t,
>   Queries AS q
> WHERE
>   t.itemId = q.itemId AND
>   t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND
> q.event_time
> GROUP BY
>   t.itemId, q.event_time, q.queryId;
>
> And the error I get is:
> org.apache.flink.table.client.gateway.SqlExecutionException: Invalid
> SQL update statement.
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:697)
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576)
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527)
> at
> org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:548)
> at
> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:310)
> at java.util.Optional.ifPresent(Optional.java:159)
> at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:211)
> at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
> at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
> Caused by: org.apache.flink.table.api.TableExceptio

Re: Writing retract streams to Kafka

2020-03-05 Thread Kurt Young
Back to this case, I assume you are expecting something like "ignore all
delete messages" flag? With this
flag turned on, Flink will only send insert messages which corresponding
current correct results to kafka and
drop all retractions and deletes on the fly.

Best,
Kurt


On Thu, Mar 5, 2020 at 10:24 PM Kurt Young  wrote:

> > I also don't completely understand at this point why I can write the
> result of a group, tumble window aggregate to Kafka and not this window
> join / aggregate.
>
> If you are doing a tumble window aggregate with watermark enabled, Flink
> will only fire a final result for each window once; no modifications or
> retractions will happen after a window is calculated and fired.
> But with some other arbitrary aggregations, there is not enough information
> for Flink to determine whether the data is complete or not, so the framework
> will keep recalculating results when receiving new records and will retract
> earlier results by firing retraction/deletion messages.
>
> Best,
> Kurt
>
>
> On Thu, Mar 5, 2020 at 10:13 PM Gyula Fóra  wrote:
>
>> Thanks Benoît!
>>
>> I can see now how I can implement this myself through the provided sink
>> interfaces but I was trying to avoid having to write code for this :D
>> My initial motivation was to see whether we are able to write out any
>> kind of table to Kafka as a simple stream of "upserts".
>>
>> I also don't completely understand at this point why I can write the
>> result of a group, tumble window aggregate to Kafka and not this window
>> join / aggregate.
>>
>> Cheers,
>> Gyula
>>
>> On Thu, Mar 5, 2020 at 3:00 PM Benoît Paris <
>> benoit.pa...@centraliens-lille.org> wrote:
>>
>>> Hi Gyula,
>>>
>>> I'm afraid conversion to see the retractions vs inserts can't be done in
>>> pure SQL (though I'd love that feature).
>>>
>>> You might want to go lower level and implement a RetractStreamTableSink
>>> [1][2] that you would wrap around a KafkaTableSink [3]. This will give you
>>> an emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream), in which the
>>> Boolean flag will give you an 'accumulate' or 'retract' signal.
>>> You can then filter the DataStream accordingly before passing to the
>>> KafkaTableSink.
>>>
>>> Hope this helps.
>>>
>>> Best regards
>>> Benoît
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/sinks/RetractStreamTableSink.html
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#retractstreamtablesink
>>> [3]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.html
>>>
>>> On Thu, Mar 5, 2020 at 2:50 PM Gyula Fóra  wrote:
>>>
 Hi Roman,

 This is the core logic:

 CREATE TABLE QueryResult (
   queryId  BIGINT,
   itemId   STRING,
   quantity INT
 ) WITH (
 'connector.type' = 'kafka',
 'connector.version' = 'universal',
 'connector.topic'   = 'query.output.log.1',
 'connector.properties.bootstrap.servers' = '',
 'format.type' = 'json'
 );

 INSERT INTO QueryResult
 SELECT q.queryId, t.itemId, sum(t.quantity) AS quantity
 FROM
   ItemTransactions AS t,
   Queries AS q
 WHERE
   t.itemId = q.itemId AND
   t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND
 q.event_time
 GROUP BY
   t.itemId, q.event_time, q.queryId;

 And the error I get is:
 org.apache.flink.table.client.gateway.SqlExecutionException: Invalid
 SQL update statement.
 at
 org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:697)
 at
 org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576)
 at
 org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527)
 at
 org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:548)
 at
 org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:310)
 at java.util.Optional.ifPresent(Optional.java:159)
 at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:211)
 at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
 at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
 at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
 Caused by: org.apache.flink.table.api.TableException:
 AppendStreamTableSink requires that Table has only insert changes.
 at
 org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:123)
 at
 org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
 at
 org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)

 I am wondering what could I d

Re: Writing retract streams to Kafka

2020-03-05 Thread Kurt Young
> I also don't completely understand at this point why I can write the
result of a group, tumble window aggregate to Kafka and not this window
join / aggregate.

If you are doing a tumble window aggregate with watermark enabled, Flink will
only fire a final result for each window once; no modifications or retractions
will happen after a window is calculated and fired.
But with some other arbitrary aggregations, there is not enough information
for Flink to determine whether the data is complete or not, so the framework
will keep recalculating results when receiving new records and will retract
earlier results by firing retraction/deletion messages.

Best,
Kurt
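
For illustration, a minimal sketch (not from this thread) of the insert-only shape Kurt
describes, written against the Blink planner in Flink 1.10. The ItemTransactions table and
its columns are taken from the DDL quoted below; the sink table WindowedTotals, the
registration of both tables, and a watermark on event_time are assumptions:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TumbleWindowToKafkaSketch {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Assumed: ItemTransactions (with a watermark on event_time) and an append-only
        // Kafka table WindowedTotals were registered earlier, e.g. via sqlUpdate(CREATE TABLE ...).
        // A tumbling event-time window fires exactly once per window when the watermark
        // passes its end, so the result is insert-only and an AppendStreamTableSink accepts it.
        tableEnv.sqlUpdate(
            "INSERT INTO WindowedTotals " +
            "SELECT t.itemId, TUMBLE_END(t.event_time, INTERVAL '5' SECOND) AS windowEnd, " +
            "       SUM(t.quantity) AS quantity " +
            "FROM ItemTransactions AS t " +
            "GROUP BY t.itemId, TUMBLE(t.event_time, INTERVAL '5' SECOND)");

        tableEnv.execute("tumble-window-to-kafka");
    }
}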


On Thu, Mar 5, 2020 at 10:13 PM Gyula Fóra  wrote:

> Thanks Benoît!
>
> I can see now how I can implement this myself through the provided sink
> interfaces but I was trying to avoid having to write code for this :D
> My initial motivation was to see whether we are able to write out any kind
> of table to Kafka as a simple stream of "upserts".
>
> I also don't completely understand at this point why I can write the
> result of a group, tumble window aggregate to Kafka and not this window
> join / aggregate.
>
> Cheers,
> Gyula
>
> On Thu, Mar 5, 2020 at 3:00 PM Benoît Paris <
> benoit.pa...@centraliens-lille.org> wrote:
>
>> Hi Gyula,
>>
>> I'm afraid conversion to see the retractions vs inserts can't be done in
>> pure SQL (though I'd love that feature).
>>
>> You might want to go lower level and implement a RetractStreamTableSink
>> [1][2] that you would wrap around a KafkaTableSink [3]. This will give you
>> an emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream), in which the
>> Boolean flag will give you an 'accumulate' or 'retract' signal.
>> You can then filter the DataStream accordingly before passing to the
>> KafkaTableSink.
>>
>> Hope this helps.
>>
>> Best regards
>> Benoît
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/sinks/RetractStreamTableSink.html
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#retractstreamtablesink
>> [3]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.html
>>
>> On Thu, Mar 5, 2020 at 2:50 PM Gyula Fóra  wrote:
>>
>>> Hi Roman,
>>>
>>> This is the core logic:
>>>
>>> CREATE TABLE QueryResult (
>>>   queryId  BIGINT,
>>>   itemId   STRING,
>>>   quantity INT
>>> ) WITH (
>>> 'connector.type' = 'kafka',
>>> 'connector.version' = 'universal',
>>> 'connector.topic'   = 'query.output.log.1',
>>> 'connector.properties.bootstrap.servers' = '',
>>> 'format.type' = 'json'
>>> );
>>>
>>> INSERT INTO QueryResult
>>> SELECT q.queryId, t.itemId, sum(t.quantity) AS quantity
>>> FROM
>>>   ItemTransactions AS t,
>>>   Queries AS q
>>> WHERE
>>>   t.itemId = q.itemId AND
>>>   t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND
>>> q.event_time
>>> GROUP BY
>>>   t.itemId, q.event_time, q.queryId;
>>>
>>> And the error I get is:
>>> org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL
>>> update statement.
>>> at
>>> org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:697)
>>> at
>>> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576)
>>> at
>>> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527)
>>> at
>>> org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:548)
>>> at
>>> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:310)
>>> at java.util.Optional.ifPresent(Optional.java:159)
>>> at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:211)
>>> at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
>>> at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
>>> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
>>> Caused by: org.apache.flink.table.api.TableException:
>>> AppendStreamTableSink requires that Table has only insert changes.
>>> at
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:123)
>>> at
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>> at
>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>
>>> I am wondering what could I do to just simply pump the result updates to
>>> Kafka here.
>>>
>>> Gyula
>>>
>>> On Thu, Mar 5, 2020 at 2:37 PM Khachatryan Roman <
>>> khachatryan.ro...@gmail.com> wrote:
>>>
 Hi Gyula,

 Could you provide the code of your Flink program, the error with
 stacktrace and the Flink version?

 Thanks,
 Roman


 On Thu, Mar 5, 2020 at 2:17 PM Gyula Fóra  wrote:

> Hi All!
>
> Excuse my stupid question, I 

Re: Rocksdb Serialization issue

2020-03-05 Thread David Morin
Yes Arvid, the Sink is keyed by a String dbName::tableName
Kafka is the input, but to init the state we have to read Hive delta files
before consuming Kafka records. These are ORC files we have to read to init
the state, with one directory per table.
A key (primary key) is only in one bucket file. So to init the state per
table (per keyedstream in fact) we've created a pool of threads to read
more than one bucket file in parallel.
This task is performed when the first record for one table is read from
kafka and if the state for this table does not exist (So this code is in
the process method)
Then, we can snapshot the state and reuse it but this task of init must be
done at least once.
We can have more than one instance of the Sink, but each task is in one JVM
and we can't have more than one task for one table (keyed stream) at the
moment. Sharding has been developed but not yet tested.
We use a YARN session and we specified the --slots option to force one task
per taskmanager because we use a lib (dependency) that is not thread safe.
So if I'm right we can't read the same bucket file on multiple parallel
sinks at the moment.
But yes to make this task of state init per table faster, I've naively
created this pool of threads.
If I can keep this as a workaround it would be great (while waiting for a
better solution: sharding, State Processor API, ...).
I'm open to any suggestion for the short or the long term.

Thanks

On Thu, Mar 5, 2020 at 2:35 PM Arvid Heise  wrote:

> Hi David,
>
> could you please explain what you are actually trying to achieve?
>
> It seems like you are reading in the SinkFunction#open some files from S3
> and put it into state (bootstrapping?)
> How many instances of the sink are executed?
> How do you shard the buckets / e.g. how do you avoid reading the same file
> on multiple parallel sinks?
> Is your sink running in a keyed context? Maybe even provide the general
> pipeline.
>
> On Thu, Mar 5, 2020 at 2:29 PM David Morin 
> wrote:
>
>> Hello Arvid,
>>
>> After some investigations with the help of my colleague we finally found
>> the root cause.
>> In order to improve the init of the state, I've created some threads to
>> parallelize the read of bucket files.
>> This is a temporary solution because I've planned to use the State
>> Processor API.
>> Here after an abstract of the code:
>>
>> ExecutorService executorService = Executors.newFixedThreadPool(initStateMaxThreads);
>> for (FileStatus bucketFile : xxx) {
>>     executorService.submit(() -> {
>>         try {
>>             readBucketFct(XXX); // Update the state with the bucket content
>>             ...
>>         } catch (Exception e) {
>>         }
>>     });
>> }
>> executorService.shutdown();
>> boolean terminated = executorService.awaitTermination(initStateTimeoutSeconds, TimeUnit.SECONDS);
>> if ((!terminated) || (readMetaErrors.get() > 0)) {
>>     throw new SinkException("Init state failed...");
>> }
>>
>> After some tests: if I use one thread in my executorService it works. But
>> with 2 threads the job fails.
>> Can I mitigate this behaviour (while waiting for the switch to the State
>> Processor API)?
>>
>> Thanks
>> David
>>
>>
>> On Thu, Mar 5, 2020 at 8:06 AM Arvid Heise  wrote:
>>
>>> Hi David,
>>>
>>> the obvious reason is that your state stored an enum value that is not
>>> present anymore. It tries to deserialize the 512th entry in your enum that
>>> is not available.
>>>
>>> However, since it's highly unlikely that you actually have that many
>>> enum values in the same enum class, we are actually looking at a corrupt
>>> stream, which is hard to fix. Could you describe which state you have?
>>>
>>> Did you upgrade Flink or your application? If it's Flink, it's a bug. If
>>> it's application, it may be that state is incompatible and would need to be
>>> migrated.
>>>
>>> Did you restart from checkpoint or savepoint?
>>>
>>> On Thu, Mar 5, 2020 at 1:14 AM David Morin 
>>> wrote:
>>>
 Hello,

 I have this Exception in my datastream app and I can't find the root
 cause.
 I consume data from Kafka and it fails when I try to get a value from
 my MapState in RocksDB.
 It was working in previous release of my app but I can't find the cause
 of this error.

 java.lang.ArrayIndexOutOfBoundsException: 512
 at
 org.apache.flink.api.common.typeutils.base.EnumSerializer.deserialize(EnumSerializer.java:130)
 at
 org.apache.flink.api.common.typeutils.base.EnumSerializer.deserialize(EnumSerializer.java:50)
 at
 org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:143)
 at
 org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:37)
 at
 org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserValue(RocksDBMapState.java:344)
 at
 org.apache.flink.contrib.streaming.state.RocksDBMapState.get(RocksDBMapState.java

Re: How to override flink-conf parameters for SQL Client

2020-03-05 Thread Jeff Zhang
Hi Gyula,

I am working on integrating Flink with Zeppelin. One feature in Zeppelin is
that users can override any setting in flink-conf.yaml (actually any of the
settings listed here:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html).
Of course you can run Flink SQL in Zeppelin, and you can also leverage other
features of Zeppelin, like visualization.

If you are interested, you could try the master branch of Zeppelin + this
improvement PR

https://github.com/apache/zeppelin
https://github.com/apache/zeppelin/pull/3676
https://github.com/apache/zeppelin/blob/master/docs/interpreter/flink.md






On Thu, Mar 5, 2020 at 6:51 PM Gyula Fóra  wrote:

> I could basically list a few things I want to set (execution.target for
> example), but it's fair to assume that I would like to be able to set
> anything :)
>
> Gyula
>
> On Thu, Mar 5, 2020 at 11:35 AM Jingsong Li 
> wrote:
>
>> Hi Gyula,
>>
>> Maybe the Blink planner has invoked "StreamExecutionEnvironment.configure";
>> which planner do you use?
>>
>> But "StreamExecutionEnvironment.configure" only covers part of the
>> configuration; it cannot set everything in flink-conf.yaml.
>> So which config do you want to set? I know some configs, like
>> "taskmanager.network.blocking-shuffle.compression.enabled", cannot be set.
>>
>> Best,
>> Jingsong Lee
>>
>> On Thu, Mar 5, 2020 at 6:17 PM Jark Wu  wrote:
>>
>>> Hi Gyula,
>>>
>>> Flink configurations can be overridden via
>>> `TableConfig#getConfiguration()`; however, the SQL CLI only allows setting
>>> Table-specific configs.
>>> I would consider this a bug/improvement of the SQL CLI which should be
>>> fixed in 1.10.1.
>>>
>>> Best,
>>> Jark
>>>
>>> On Thu, 5 Mar 2020 at 18:12, Gyula Fóra  wrote:
>>>
 Thanks Caizhi,

 This seems like a pretty big shortcoming for any multi-user/multi-app
 environment. I will open a jira for this.

 Gyula

 On Thu, Mar 5, 2020 at 10:58 AM Caizhi Weng 
 wrote:

> Hi Gyula.
>
> I'm afraid there is no way to override all Flink configurations
> currently. SQL client yaml file can only override some of the Flink
> configurations.
>
> Configuration entries indeed can only set Table specific configs,
> while deployment entries are used to set the result fetching address and
> port. There is currently no way to change the execution target from the 
> SQL
> client.
>
> On Thu, Mar 5, 2020 at 4:31 PM Gyula Fóra  wrote:
>
>> Hi All!
>>
>> I am trying to understand if there is any way to override flink
>> configuration parameters when starting the SQL Client.
>>
>> It seems that the only way to pass any parameters is through the
>> environment yaml.
>>
>> There I found 2 possible routes:
>>
>> configuration: this doesn't work as it only sets Table specific
>> configs apparently, but maybe I am wrong.
>>
>> deployment: I tried using dynamic properties options here but
>> unfortunately we normalize (lowercase) the YAML keys so it is impossible 
>> to
>> pass options like -yD or -D.
>>
>> Does anyone have any suggestions?
>>
>> Thanks
>> Gyula
>>
>
>>
>> --
>> Best, Jingsong Lee
>>
>

-- 
Best Regards

Jeff Zhang


Re: Writing retract streams to Kafka

2020-03-05 Thread Gyula Fóra
Thanks Benoît!

I can see now how I can implement this myself through the provided sink
interfaces but I was trying to avoid having to write code for this :D
My initial motivation was to see whether we are able to write out any kind
of table to Kafka as a simple stream of "upserts".

I also don't completely understand at this point why I can write the result
of a group, tumble window aggregate to Kafka and not this window join /
aggregate.

Cheers,
Gyula

On Thu, Mar 5, 2020 at 3:00 PM Benoît Paris <
benoit.pa...@centraliens-lille.org> wrote:

> Hi Gyula,
>
> I'm afraid conversion to see the retractions vs inserts can't be done in
> pure SQL (though I'd love that feature).
>
> You might want to go lower level and implement a RetractStreamTableSink
> [1][2] that you would wrap around a KafkaTableSink [3]. This will give you
> an emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream), in which the
> Boolean flag will give you an 'accumulate' or 'retract' signal.
> You can then filter the DataStream accordingly before passing to the
> KafkaTableSink.
>
> Hope this helps.
>
> Best regards
> Benoît
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/sinks/RetractStreamTableSink.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#retractstreamtablesink
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.html
>
> On Thu, Mar 5, 2020 at 2:50 PM Gyula Fóra  wrote:
>
>> Hi Roman,
>>
>> This is the core logic:
>>
>> CREATE TABLE QueryResult (
>>   queryId  BIGINT,
>>   itemId   STRING,
>>   quantity INT
>> ) WITH (
>> 'connector.type' = 'kafka',
>> 'connector.version' = 'universal',
>> 'connector.topic'   = 'query.output.log.1',
>> 'connector.properties.bootstrap.servers' = '',
>> 'format.type' = 'json'
>> );
>>
>> INSERT INTO QueryResult
>> SELECT q.queryId, t.itemId, sum(t.quantity) AS quantity
>> FROM
>>   ItemTransactions AS t,
>>   Queries AS q
>> WHERE
>>   t.itemId = q.itemId AND
>>   t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time
>> GROUP BY
>>   t.itemId, q.event_time, q.queryId;
>>
>> And the error I get is:
>> org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL
>> update statement.
>> at
>> org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:697)
>> at
>> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576)
>> at
>> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527)
>> at
>> org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:548)
>> at
>> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:310)
>> at java.util.Optional.ifPresent(Optional.java:159)
>> at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:211)
>> at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
>> at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
>> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
>> Caused by: org.apache.flink.table.api.TableException:
>> AppendStreamTableSink requires that Table has only insert changes.
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:123)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>> at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>
>> I am wondering what could I do to just simply pump the result updates to
>> Kafka here.
>>
>> Gyula
>>
>> On Thu, Mar 5, 2020 at 2:37 PM Khachatryan Roman <
>> khachatryan.ro...@gmail.com> wrote:
>>
>>> Hi Gyula,
>>>
>>> Could you provide the code of your Flink program, the error with
>>> stacktrace and the Flink version?
>>>
>>> Thanks,
>>> Roman
>>>
>>>
>>> On Thu, Mar 5, 2020 at 2:17 PM Gyula Fóra  wrote:
>>>
 Hi All!

 Excuse my stupid question, I am pretty new to the Table/SQL API and I
 am trying to play around with it implementing and running a few use-cases.

 I have a simple window join + aggregation, grouped on some id that I
 want to write to Kafka but I am hitting the following error:

 "AppendStreamTableSink requires that Table has only insert changes."

 If I understand correctly the problem here is that since updates are
 possible within a single group, we have a retract stream and the Kafka Sink
 cannot handle that. I tried to search for the solution but I haven't found
 any satisfying answers.

 How can I simply tell the INSERT logic to ignore previous values and
 just always keep sending the latest (like you would see it on the CLI
 output).

 Thank you!
 Gyula

>>>
>
> --
> Benoît Paris
> Ingén

Re: Writing retract streams to Kafka

2020-03-05 Thread Benoît Paris
Hi Gyula,

I'm afraid conversion to see the retractions vs inserts can't be done in
pure SQL (though I'd love that feature).

You might want to go lower level and implement a RetractStreamTableSink
[1][2] that you would wrap around a KafkaTableSink [3]. This will give you
an emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream), in which the
Boolean flag will give you an 'accumulate' or 'retract' signal.
You can then filter the DataStream accordingly before passing to the
KafkaTableSink.

Hope this helps.

Best regards
Benoît

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/sinks/RetractStreamTableSink.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#retractstreamtablesink
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.html
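
To make the wrapper idea concrete, here is a minimal, untested sketch against the Flink 1.10
interfaces. The class name is made up, and instead of wrapping a KafkaTableSink directly it
hands the filtered inserts to any SinkFunction<Row> (for example an already configured
FlinkKafkaProducer), so it does not rely on KafkaTableSink internals:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sinks.RetractStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;

public class InsertOnlyRetractSink implements RetractStreamTableSink<Row> {

    private final TableSchema schema;
    private final SinkFunction<Row> insertSink; // e.g. an already configured FlinkKafkaProducer<Row>

    public InsertOnlyRetractSink(TableSchema schema, SinkFunction<Row> insertSink) {
        this.schema = schema;
        this.insertSink = insertSink;
    }

    @Override
    public TypeInformation<Row> getRecordType() {
        return schema.toRowType();
    }

    @Override
    public TableSchema getTableSchema() {
        return schema;
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
        // Keep only accumulate messages (flag == true), strip the flag and forward
        // the plain rows to the wrapped insert-only sink.
        return dataStream
                .filter(change -> change.f0)
                .map(change -> change.f1)
                .returns(getRecordType())
                .addSink(insertSink);
    }

    @Override
    public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
        consumeDataStream(dataStream); // deprecated variant, kept for completeness
    }

    @Override
    public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        return this; // sketch: the schema is fixed at construction time
    }
}

Such a sink could then be registered with tableEnv.registerTableSink("QueryResult",
new InsertOnlyRetractSink(schema, kafkaProducer)) and used as the INSERT INTO target.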

On Thu, Mar 5, 2020 at 2:50 PM Gyula Fóra  wrote:

> Hi Roman,
>
> This is the core logic:
>
> CREATE TABLE QueryResult (
>   queryId  BIGINT,
>   itemId   STRING,
>   quantity INT
> ) WITH (
> 'connector.type' = 'kafka',
> 'connector.version' = 'universal',
> 'connector.topic'   = 'query.output.log.1',
> 'connector.properties.bootstrap.servers' = '',
> 'format.type' = 'json'
> );
>
> INSERT INTO QueryResult
> SELECT q.queryId, t.itemId, sum(t.quantity) AS quantity
> FROM
>   ItemTransactions AS t,
>   Queries AS q
> WHERE
>   t.itemId = q.itemId AND
>   t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time
> GROUP BY
>   t.itemId, q.event_time, q.queryId;
>
> And the error I get is:
> org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL
> update statement.
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:697)
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576)
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527)
> at
> org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:548)
> at
> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:310)
> at java.util.Optional.ifPresent(Optional.java:159)
> at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:211)
> at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
> at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
> Caused by: org.apache.flink.table.api.TableException:
> AppendStreamTableSink requires that Table has only insert changes.
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:123)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>
> I am wondering what could I do to just simply pump the result updates to
> Kafka here.
>
> Gyula
>
> On Thu, Mar 5, 2020 at 2:37 PM Khachatryan Roman <
> khachatryan.ro...@gmail.com> wrote:
>
>> Hi Gyula,
>>
>> Could you provide the code of your Flink program, the error with
>> stacktrace and the Flink version?
>>
>> Thanks,
>> Roman
>>
>>
>> On Thu, Mar 5, 2020 at 2:17 PM Gyula Fóra  wrote:
>>
>>> Hi All!
>>>
>>> Excuse my stupid question, I am pretty new to the Table/SQL API and I am
>>> trying to play around with it implementing and running a few use-cases.
>>>
>>> I have a simple window join + aggregation, grouped on some id that I
>>> want to write to Kafka but I am hitting the following error:
>>>
>>> "AppendStreamTableSink requires that Table has only insert changes."
>>>
>>> If I understand correctly the problem here is that since updates are
>>> possible within a single group, we have a retract stream and the Kafka Sink
>>> cannot handle that. I tried to search for the solution but I haven't found
>>> any satisfying answers.
>>>
>>> How can I simply tell the INSERT logic to ignore previous values and
>>> just always keep sending the latest (like you would see it on the CLI
>>> output).
>>>
>>> Thank you!
>>> Gyula
>>>
>>

-- 
Benoît Paris
Ingénieur Machine Learning Explicable
Tél : +33 6 60 74 23 00
http://benoit.paris
http://explicable.ml


Re: Writing retract streams to Kafka

2020-03-05 Thread Gyula Fóra
Hi Roman,

This is the core logic:

CREATE TABLE QueryResult (
  queryId  BIGINT,
  itemId   STRING,
  quantity INT
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic'   = 'query.output.log.1',
'connector.properties.bootstrap.servers' = '',
'format.type' = 'json'
);

INSERT INTO QueryResult
SELECT q.queryId, t.itemId, sum(t.quantity) AS quantity
FROM
  ItemTransactions AS t,
  Queries AS q
WHERE
  t.itemId = q.itemId AND
  t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time
GROUP BY
  t.itemId, q.event_time, q.queryId;

And the error I get is:
org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL
update statement.
at
org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:697)
at
org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576)
at
org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527)
at
org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:548)
at
org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:310)
at java.util.Optional.ifPresent(Optional.java:159)
at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:211)
at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
Caused by: org.apache.flink.table.api.TableException: AppendStreamTableSink
requires that Table has only insert changes.
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:123)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)

I am wondering what could I do to just simply pump the result updates to
Kafka here.

Gyula

On Thu, Mar 5, 2020 at 2:37 PM Khachatryan Roman <
khachatryan.ro...@gmail.com> wrote:

> Hi Gyula,
>
> Could you provide the code of your Flink program, the error with
> stacktrace and the Flink version?
>
> Thanks,
> Roman
>
>
> On Thu, Mar 5, 2020 at 2:17 PM Gyula Fóra  wrote:
>
>> Hi All!
>>
>> Excuse my stupid question, I am pretty new to the Table/SQL API and I am
>> trying to play around with it implementing and running a few use-cases.
>>
>> I have a simple window join + aggregation, grouped on some id that I want
>> to write to Kafka but I am hitting the following error:
>>
>> "AppendStreamTableSink requires that Table has only insert changes."
>>
>> If I understand correctly the problem here is that since updates are
>> possible within a single group, we have a retract stream and the Kafka Sink
>> cannot handle that. I tried to search for the solution but I haven't found
>> any satisfying answers.
>>
>> How can I simply tell the INSERT logic to ignore previous values and just
>> always keep sending the latest (like you would see it on the CLI output).
>>
>> Thank you!
>> Gyula
>>
>


Re: Writing retract streams to Kafka

2020-03-05 Thread Khachatryan Roman
Hi Gyula,

Could you provide the code of your Flink program, the error with stacktrace
and the Flink version?

Thanks,
Roman


On Thu, Mar 5, 2020 at 2:17 PM Gyula Fóra  wrote:

> Hi All!
>
> Excuse my stupid question, I am pretty new to the Table/SQL API and I am
> trying to play around with it implementing and running a few use-cases.
>
> I have a simple window join + aggregation, grouped on some id that I want
> to write to Kafka but I am hitting the following error:
>
> "AppendStreamTableSink requires that Table has only insert changes."
>
> If I understand correctly the problem here is that since updates are
> possible within a single group, we have a retract stream and the Kafka Sink
> cannot handle that. I tried to search for the solution but I haven't found
> any satisfying answers.
>
> How can I simply tell the INSERT logic to ignore previous values and just
> always keep sending the latest (like you would see it on the CLI output).
>
> Thank you!
> Gyula
>


Re: Rocksdb Serialization issue

2020-03-05 Thread Arvid Heise
Hi David,

could you please explain what you are actually trying to achieve?

It seems like you are reading in the SinkFunction#open some files from S3
and put it into state (bootstrapping?)
How many instances of the sink are executed?
How do you shard the buckets / e.g. how do you avoid reading the same file
on multiple parallel sinks?
Is your sink running in a keyed context? Maybe even provide the general
pipeline.

On Thu, Mar 5, 2020 at 2:29 PM David Morin 
wrote:

> Hello Arvid,
>
> After some investigations with the help of my colleague we finally found
> the root cause.
> In order to improve the init of the state, I've created some threads to
> parallelize the read of bucket files.
> This is a temporary solution because I've planned to use the State
> Processor API.
> Here after an abstract of the code:
>
> ExecutorService executorService = Executors.newFixedThreadPool(initStateMaxThreads);
> for (FileStatus bucketFile : xxx) {
>     executorService.submit(() -> {
>         try {
>             readBucketFct(XXX); // Update the state with the bucket content
>             ...
>         } catch (Exception e) {
>         }
>     });
> }
> executorService.shutdown();
> boolean terminated = executorService.awaitTermination(initStateTimeoutSeconds, TimeUnit.SECONDS);
> if ((!terminated) || (readMetaErrors.get() > 0)) {
>     throw new SinkException("Init state failed...");
> }
>
> After some tests: if I use one thread in my executorService it works. But
> with 2 threads the job fails.
> Can I mitigate this behaviour (while waiting for the switch to the State
> Processor API)?
>
> Thanks
> David
>
>
> On Thu, Mar 5, 2020 at 8:06 AM Arvid Heise  wrote:
>
>> Hi David,
>>
>> the obvious reason is that your state stored an enum value that is not
>> present anymore. It tries to deserialize the 512th entry in your enum that
>> is not available.
>>
>> However, since it's highly unlikely that you actually have that many enum
>> values in the same enum class, we are actually looking at a corrupt stream,
>> which is hard to fix. Could you describe which state you have?
>>
>> Did you upgrade Flink or your application? If it's Flink, it's a bug. If
>> it's application, it may be that state is incompatible and would need to be
>> migrated.
>>
>> Did you restart from checkpoint or savepoint?
>>
>> On Thu, Mar 5, 2020 at 1:14 AM David Morin 
>> wrote:
>>
>>> Hello,
>>>
>>> I have this Exception in my datastream app and I can't find the root
>>> cause.
>>> I consume data from Kafka and it fails when I try to get a value from my
>>> MapState in RocksDB.
>>> It was working in previous release of my app but I can't find the cause
>>> of this error.
>>>
>>> java.lang.ArrayIndexOutOfBoundsException: 512
>>> at
>>> org.apache.flink.api.common.typeutils.base.EnumSerializer.deserialize(EnumSerializer.java:130)
>>> at
>>> org.apache.flink.api.common.typeutils.base.EnumSerializer.deserialize(EnumSerializer.java:50)
>>> at
>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:143)
>>> at
>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:37)
>>> at
>>> org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserValue(RocksDBMapState.java:344)
>>> at
>>> org.apache.flink.contrib.streaming.state.RocksDBMapState.get(RocksDBMapState.java:123)
>>> at
>>> org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
>>> ..
>>>
>>> Flink version: 1.9.2
>>>
>>>
>>>


Re: Rocksdb Serialization issue

2020-03-05 Thread David Morin
Hello Arvid,

After some investigations with the help of my colleague we finally found
the root cause.
In order to improve the init of the state, I've created some threads to
parallelize the read of bucket files.
This is a temporary solution because I've planned to use the State
Processor API.
Here after an abstract of the code:
















ExecutorService executorService = Executors.newFixedThreadPool(initStateMaxThreads);
for (FileStatus bucketFile : xxx) {
    executorService.submit(() -> {
        try {
            readBucketFct(XXX); // Update the state with the bucket content
            ...
        } catch (Exception e) {
        }
    });
}
executorService.shutdown();
boolean terminated = executorService.awaitTermination(initStateTimeoutSeconds, TimeUnit.SECONDS);
if ((!terminated) || (readMetaErrors.get() > 0)) {
    throw new SinkException("Init state failed...");
}

After some tests: if I use one thread in my executorService it works. But
with 2 threads the job fails.
Can I mitigate this behaviour (while waiting for the switch to the State
Processor API)?

Thanks
David
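
One possible mitigation, sketched below under the assumption that the failures come from
several threads touching the RocksDB MapState concurrently (Flink keyed state is not meant
to be accessed from user threads): let the worker threads only read and parse the bucket
files, and apply all MapState updates from the calling task thread. The String/Long types,
readBucketEntries and the Hadoop FileStatus are placeholders standing in for the real ones.

import org.apache.flink.api.common.state.MapState;
import org.apache.hadoop.fs.FileStatus;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class StateInitSketch {

    /** Placeholder: parse one bucket file into plain key/value pairs; no state access here. */
    private Map<String, Long> readBucketEntries(FileStatus bucketFile) {
        return new HashMap<>();
    }

    /** Worker threads only read and parse; the calling task thread is the only one writing to state. */
    void initState(List<FileStatus> bucketFiles, MapState<String, Long> state,
                   int initStateMaxThreads, long initStateTimeoutSeconds) throws Exception {
        ExecutorService executorService = Executors.newFixedThreadPool(initStateMaxThreads);
        List<Future<Map<String, Long>>> futures = new ArrayList<>();
        for (FileStatus bucketFile : bucketFiles) {
            Callable<Map<String, Long>> task = () -> readBucketEntries(bucketFile);
            futures.add(executorService.submit(task));
        }
        executorService.shutdown();
        if (!executorService.awaitTermination(initStateTimeoutSeconds, TimeUnit.SECONDS)) {
            throw new IllegalStateException("Init state timed out");
        }
        for (Future<Map<String, Long>> future : futures) {
            // Future#get rethrows any parsing failure from the worker threads.
            for (Map.Entry<String, Long> entry : future.get().entrySet()) {
                state.put(entry.getKey(), entry.getValue()); // single-threaded state update
            }
        }
    }
}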


On Thu, Mar 5, 2020 at 8:06 AM Arvid Heise  wrote:

> Hi David,
>
> the obvious reason is that your state stored an enum value that is not
> present anymore. It tries to deserialize the 512th entry in your enum that
> is not available.
>
> However, since it's highly unlikely that you actually have that many enum
> values in the same enum class, we are actually looking at a corrupt stream,
> which is hard to fix. Could you describe which state you have?
>
> Did you upgrade Flink or your application? If it's Flink, it's a bug. If
> it's application, it may be that state is incompatible and would need to be
> migrated.
>
> Did you restart from checkpoint or savepoint?
>
> On Thu, Mar 5, 2020 at 1:14 AM David Morin 
> wrote:
>
>> Hello,
>>
>> I have this Exception in my datastream app and I can't find the root
>> cause.
>> I consume data from Kafka and it fails when I try to get a value from my
>> MapState in RocksDB.
>> It was working in previous release of my app but I can't find the cause
>> of this error.
>>
>> java.lang.ArrayIndexOutOfBoundsException: 512
>> at
>> org.apache.flink.api.common.typeutils.base.EnumSerializer.deserialize(EnumSerializer.java:130)
>> at
>> org.apache.flink.api.common.typeutils.base.EnumSerializer.deserialize(EnumSerializer.java:50)
>> at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:143)
>> at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:37)
>> at
>> org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserValue(RocksDBMapState.java:344)
>> at
>> org.apache.flink.contrib.streaming.state.RocksDBMapState.get(RocksDBMapState.java:123)
>> at
>> org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
>> ..
>>
>> Flink version: 1.9.2
>>
>>
>>


Writing retract streams to Kafka

2020-03-05 Thread Gyula Fóra
Hi All!

Excuse my stupid question, I am pretty new to the Table/SQL API and I am
trying to play around with it implementing and running a few use-cases.

I have a simple window join + aggregation, grouped on some id that I want
to write to Kafka but I am hitting the following error:

"AppendStreamTableSink requires that Table has only insert changes."

If I understand correctly the problem here is that since updates are
possible within a single group, we have a retract stream and the Kafka Sink
cannot handle that. I tried to search for the solution but I haven't found
any satisfying answers.

How can I simply tell the INSERT logic to ignore previous values and just
always keep sending the latest (like you would see it on the CLI output).

Thank you!
Gyula


Re: Setting the operator-id to measure percentile latency over several jobs

2020-03-05 Thread Khachatryan Roman
Hi Felipe,

Please find the answers to your questions below.

> Each "operator_subtask_index" means each instance of the parallel
physical operator, doesn't it?
Yes.
> How can I set a fixed ID for the "operator_id" in my code so I can
identify quickly which operator I am measuring?
You are using the correct API (uid(...)).
> What is the hash function used so I can identify my operator?
Flink uses
https://guava.dev/releases/18.0/api/docs/com/google/common/hash/Hashing.html#murmur3_128(int)

Regards,
Roman
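
For reference, the hash can be reproduced with Guava directly. The assumption here (based
on reading Flink's StreamGraphHasherV2, so verify the value against the REST API) is that a
user-specified uid is hashed with murmur3_128 seeded with 0 over its UTF-8 bytes, and the
lower-case hex string of that hash is what shows up as "operator_id" in the metric name:

import com.google.common.hash.Hashing;
import java.nio.charset.StandardCharsets;

public class OperatorIdFromUid {
    public static void main(String[] args) {
        String uid = "my-operator-ID"; // the value passed to .uid(...) in the job
        // Assumption: murmur3_128 with seed 0 over the UTF-8 uid string.
        String operatorId = Hashing.murmur3_128(0)
                .hashString(uid, StandardCharsets.UTF_8)
                .toString(); // HashCode#toString() renders lower-case hex
        System.out.println(operatorId);
    }
}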


On Thu, Mar 5, 2020 at 12:45 PM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> Hi community,
>
> I am tracking the latency of operators in Flink according to this
> reference [1]. When I am using Prometheus+Grafana I can issue a query using
> "flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency"
> and I can check the percentiles of each "operator_id" and each
> "operator_subtask_index". Each "operator_subtask_index" means each instance
> of the parallel physical operator, doesn't it?
>
> How can I set a fixed ID for the "operator_id" in my code so I can
> identify quickly which operator I am measuring? I used "map(new
> MyMapUDF()).uid('my-operator-ID')" but it seems that there is a hash
> function that converts the string to a hash value. What is the hash
> function used so I can identify my operator? I know that I can use the Rest
> API [2] and if I name my operator it will always have the same hash when I
> restart the job, but I would like to set its name.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#latency-tracking
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#rest-api-integration
> -
> - Felipe Gutierrez
> - skype: felipe.o.gutierrez
> - https://felipeogutierrez.blogspot.com
>


Setting the operator-id to measure percentile latency over several jobs

2020-03-05 Thread Felipe Gutierrez
Hi community,

I am tracking the latency of operators in Flink according to this reference
[1]. When I am using Prometheus+Grafana I can issue a query using
"flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency"
and I can check the percentiles of each "operator_id" and each
"operator_subtask_index". Each "operator_subtask_index" means each instance
of the parallel physical operator, doesn't it?

How can I set a fixed ID for the "operator_id" in my code so I can identify
quickly which operator I am measuring? I used "map(new
MyMapUDF()).uid('my-operator-ID')" but it seems that there is a hash
function that converts the string to a hash value. What is the hash
function used so I can identify my operator? I know that I can use the Rest
API [2] and if I name my operator it will always have the same hash when I
restart the job, but I would like to set its name.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#latency-tracking
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#rest-api-integration
-
- Felipe Gutierrez
- skype: felipe.o.gutierrez
- https://felipeogutierrez.blogspot.com


Re: How to override flink-conf parameters for SQL Client

2020-03-05 Thread Gyula Fóra
I could basically list a few things I want to set (execution.target for
example), but it's fair to assume that I would like to be able to set
anything :)

Gyula

On Thu, Mar 5, 2020 at 11:35 AM Jingsong Li  wrote:

> Hi Gyula,
>
> Maybe the Blink planner has invoked "StreamExecutionEnvironment.configure";
> which planner do you use?
>
> But "StreamExecutionEnvironment.configure" only covers part of the
> configuration; it cannot set everything in flink-conf.yaml.
> So which config do you want to set? I know some configs, like
> "taskmanager.network.blocking-shuffle.compression.enabled", cannot be set.
>
> Best,
> Jingsong Lee
>
> On Thu, Mar 5, 2020 at 6:17 PM Jark Wu  wrote:
>
>> Hi Gyula,
>>
>> Flink configurations can be overridden via
>> `TableConfig#getConfiguration()`; however, the SQL CLI only allows setting
>> Table-specific configs.
>> I would consider this a bug/improvement of the SQL CLI which should be
>> fixed in 1.10.1.
>>
>> Best,
>> Jark
>>
>> On Thu, 5 Mar 2020 at 18:12, Gyula Fóra  wrote:
>>
>>> Thanks Caizhi,
>>>
>>> This seems like a pretty big shortcoming for any multi-user/multi-app
>>> environment. I will open a jira for this.
>>>
>>> Gyula
>>>
>>> On Thu, Mar 5, 2020 at 10:58 AM Caizhi Weng 
>>> wrote:
>>>
 Hi Gyula.

 I'm afraid there is no way to override all Flink configurations
 currently. SQL client yaml file can only override some of the Flink
 configurations.

 Configuration entries indeed can only set Table specific configs, while
 deployment entries are used to set the result fetching address and port.
 There is currently no way to change the execution target from the SQL
 client.

 On Thu, Mar 5, 2020 at 4:31 PM Gyula Fóra  wrote:

> Hi All!
>
> I am trying to understand if there is any way to override flink
> configuration parameters when starting the SQL Client.
>
> It seems that the only way to pass any parameters is through the
> environment yaml.
>
> There I found 2 possible routes:
>
> configuration: this doesn't work as it only sets Table specific
> configs apparently, but maybe I am wrong.
>
> deployment: I tried using dynamic properties options here but
> unfortunately we normalize (lowercase) the YAML keys so it is impossible 
> to
> pass options like -yD or -D.
>
> Does anyone have any suggestions?
>
> Thanks
> Gyula
>

>
> --
> Best, Jingsong Lee
>


Re: How to override flink-conf parameters for SQL Client

2020-03-05 Thread Jingsong Li
Hi Gyula,

Maybe the Blink planner has invoked "StreamExecutionEnvironment.configure";
which planner do you use?

But "StreamExecutionEnvironment.configure" only covers part of the
configuration; it cannot set everything in flink-conf.yaml.
So which config do you want to set? I know some configs, like
"taskmanager.network.blocking-shuffle.compression.enabled", cannot be set.

Best,
Jingsong Lee

On Thu, Mar 5, 2020 at 6:17 PM Jark Wu  wrote:

> Hi Gyula,
>
> Flink configurations can be overridden via
> `TableConfig#getConfiguration()`; however, the SQL CLI only allows setting
> Table-specific configs.
> I would consider this a bug/improvement of the SQL CLI which should be
> fixed in 1.10.1.
>
> Best,
> Jark
>
> On Thu, 5 Mar 2020 at 18:12, Gyula Fóra  wrote:
>
>> Thanks Caizhi,
>>
>> This seems like a pretty big shortcoming for any multi-user/multi-app
>> environment. I will open a jira for this.
>>
>> Gyula
>>
>> On Thu, Mar 5, 2020 at 10:58 AM Caizhi Weng  wrote:
>>
>>> Hi Gyula.
>>>
>>> I'm afraid there is no way to override all Flink configurations
>>> currently. SQL client yaml file can only override some of the Flink
>>> configurations.
>>>
>>> Configuration entries indeed can only set Table specific configs, while
>>> deployment entries are used to set the result fetching address and port.
>>> There is currently no way to change the execution target from the SQL
>>> client.
>>>
>>> On Thu, Mar 5, 2020 at 4:31 PM Gyula Fóra  wrote:
>>>
 Hi All!

 I am trying to understand if there is any way to override flink
 configuration parameters when starting the SQL Client.

 It seems that the only way to pass any parameters is through the
 environment yaml.

 There I found 2 possible routes:

 configuration: this doesn't work as it only sets Table specific configs
 apparently, but maybe I am wrong.

 deployment: I tried using dynamic properties options here but
 unfortunately we normalize (lowercase) the YAML keys so it is impossible to
 pass options like -yD or -D.

 Does anyone have any suggestions?

 Thanks
 Gyula

>>>

-- 
Best, Jingsong Lee


Re: How to override flink-conf parameters for SQL Client

2020-03-05 Thread Jark Wu
Hi Gyula,

Flink configurations can be overridden via `TableConfig#getConfiguration()`;
however, the SQL CLI only allows setting Table-specific configs.
I would consider this a bug/improvement of the SQL CLI which should be fixed
in 1.10.1.

Best,
Jark
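
For completeness, a small sketch of the override Jark refers to when using the Table API
directly (outside the SQL Client); the option keys below are just examples of Blink planner
settings:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TableConfigOverrideSketch {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // TableConfig#getConfiguration() returns a mutable Configuration, so options
        // can be set programmatically before any query is submitted.
        tableEnv.getConfig().getConfiguration()
                .setString("table.exec.mini-batch.enabled", "true");      // example key
        tableEnv.getConfig().getConfiguration()
                .setString("table.exec.mini-batch.allow-latency", "5 s"); // example key
    }
}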

On Thu, 5 Mar 2020 at 18:12, Gyula Fóra  wrote:

> Thanks Caizhi,
>
> This seems like a pretty big shortcoming for any multi-user/multi-app
> environment. I will open a jira for this.
>
> Gyula
>
> On Thu, Mar 5, 2020 at 10:58 AM Caizhi Weng  wrote:
>
>> Hi Gyula.
>>
>> I'm afraid there is no way to override all Flink configurations
>> currently. SQL client yaml file can only override some of the Flink
>> configurations.
>>
>> Configuration entries indeed can only set Table specific configs, while
>> deployment entries are used to set the result fetching address and port.
>> There is currently no way to change the execution target from the SQL
>> client.
>>
>> On Thu, Mar 5, 2020 at 4:31 PM Gyula Fóra  wrote:
>>
>>> Hi All!
>>>
>>> I am trying to understand if there is any way to override flink
>>> configuration parameters when starting the SQL Client.
>>>
>>> It seems that the only way to pass any parameters is through the
>>> environment yaml.
>>>
>>> There I found 2 possible routes:
>>>
>>> configuration: this doesn't work as it only sets Table specific configs
>>> apparently, but maybe I am wrong.
>>>
>>> deployment: I tried using dynamic properties options here but
>>> unfortunately we normalize (lowercase) the YAML keys so it is impossible to
>>> pass options like -yD or -D.
>>>
>>> Does anyone have any suggestions?
>>>
>>> Thanks
>>> Gyula
>>>
>>


Re: How to override flink-conf parameters for SQL Client

2020-03-05 Thread Gyula Fóra
Thanks Caizhi,

This seems like a pretty big shortcoming for any multi-user/multi-app
environment. I will open a jira for this.

Gyula

On Thu, Mar 5, 2020 at 10:58 AM Caizhi Weng  wrote:

> Hi Gyula.
>
> I'm afraid there is no way to override all Flink configurations currently.
> SQL client yaml file can only override some of the Flink configurations.
>
> Configuration entries indeed can only set Table specific configs, while
> deployment entries are used to set the result fetching address and port.
> There is currently no way to change the execution target from the SQL
> client.
>
> On Thu, Mar 5, 2020 at 4:31 PM Gyula Fóra  wrote:
>
>> Hi All!
>>
>> I am trying to understand if there is any way to override flink
>> configuration parameters when starting the SQL Client.
>>
>> It seems that the only way to pass any parameters is through the
>> environment yaml.
>>
>> There I found 2 possible routes:
>>
>> configuration: this doesn't work as it only sets Table specific configs
>> apparently, but maybe I am wrong.
>>
>> deployment: I tried using dynamic properties options here but
>> unfortunately we normalize (lowercase) the YAML keys so it is impossible to
>> pass options like -yD or -D.
>>
>> Does anyone have any suggestions?
>>
>> Thanks
>> Gyula
>>
>


Re: [DISCUSS] FLIP-111: Docker image unification

2020-03-05 Thread Yang Wang
 Hi Andrey,


Thanks for driving this significant FLIP. From the user ML, we can also see
that there are many users running Flink in container environments, so the
docker image will be a very basic requirement. Just as you say, we should
provide a unified place for all the various usages (e.g. session, job, native
K8s, swarm, etc.).


> About docker utils

I really like the idea of providing some utils for the Dockerfile and entry point.
The `flink_docker_utils` will make it easier to build the image. I am not sure
about `flink_docker_utils start_jobmaster`, though. Do you mean that when we
build a docker image, we need to add `RUN flink_docker_utils start_jobmaster`
in the Dockerfile? Why do we need this?


> About docker entry point

I agree with you that the docker entry point could be more powerful with more
functionality. Mostly, it is about overriding the config options. If we
support dynamic properties, I think it is more convenient for users, without
any learning curve:
`docker run flink session_jobmanager -D rest.bind-port=8081`


> About the logging

Updating the `log4j-console.properties` to support multiple appenders is a
better option. Currently, the native K8s docs suggest that users debug the
logs in this way [1]. However, there are also some problems: the stderr and
stdout of the JM/TM processes cannot be forwarded to the docker container
console.


[1].
https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html#log-files


Best,
Yang




On Wed, Mar 4, 2020 at 5:34 PM Andrey Zagrebin  wrote:

> Hi All,
>
> If you have ever touched the docker topic in Flink, you
> probably noticed that we have multiple places in docs and repos which
> address its various concerns.
>
> We have prepared a FLIP [1] to simplify how users perceive the docker topic
> in Flink. It mostly advocates an approach of extending the official Flink
> image from Docker Hub. For convenience, it can come with a set of
> bash utilities and documented examples of their usage. The utilities allow
> to:
>
>- run the docker image in various modes (single job, session master,
>task manager etc)
>- customise the extending Dockerfile
>- and its entry point
>
> Eventually, the FLIP suggests to remove all other user facing Dockerfiles
> and building scripts from Flink repo, move all docker docs to
> apache/flink-docker and adjust existing docker use cases to refer to this
> new approach (mostly Kubernetes now).
>
> The first contributed version of Flink docker integration also contained
> example and docs for the integration with Bluemix in IBM cloud. We also
> suggest to maintain it outside of Flink repository (cc Markus Müller).
>
> Thanks,
> Andrey
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-111%3A+Docker+image+unification
>


Re: How to override flink-conf parameters for SQL Client

2020-03-05 Thread Caizhi Weng
Hi Gyula.

I'm afraid there is no way to override all Flink configurations currently.
SQL client yaml file can only override some of the Flink configurations.

Configuration entries indeed can only set Table specific configs, while
deployment entries are used to set the result fetching address and port.
There is currently no way to change the execution target from the SQL
client.

On Thu, Mar 5, 2020 at 4:31 PM Gyula Fóra  wrote:

> Hi All!
>
> I am trying to understand if there is any way to override flink
> configuration parameters when starting the SQL Client.
>
> It seems that the only way to pass any parameters is through the
> environment yaml.
>
> There I found 2 possible routes:
>
> configuration: this doesn't work as it only sets Table specific configs
> apparently, but maybe I am wrong.
>
> deployment: I tried using dynamic properties options here but
> unfortunately we normalize (lowercase) the YAML keys so it is impossible to
> pass options like -yD or -D.
>
> Does anyone have any suggestions?
>
> Thanks
> Gyula
>


How to override flink-conf parameters for SQL Client

2020-03-05 Thread Gyula Fóra
Hi All!

I am trying to understand if there is any way to override flink
configuration parameters when starting the SQL Client.

It seems that the only way to pass any parameters is through the
environment yaml.

There I found 2 possible routes:

configuration: this doesn't work as it only sets Table specific configs
apparently, but maybe I am wrong.

deployment: I tried using dynamic properties options here but unfortunately
we normalize (lowercase) the YAML keys so it is impossible to pass options
like -yD or -D.

Does anyone have any suggestions?

Thanks
Gyula