Re: [Flink SQL] Lookup join hbase problem

2021-06-27 Thread JING ZHANG
Hi houyin,
> It may be because the conditions in the WHERE clause are being pushed down
> as a predicate into the join clause?
Yes. After pushdown, the HBase lookup keys are `rowKey` and `city_code`, which
triggers the above exception.
> How can I solve this problem?
Because only constant values and field input refs can be parsed as lookup
keys, you could apply some operation to t2.city_code, for example change
`AND t1.city_code = t2.city_code` to `AND UPPER(t1.city_code) =
UPPER(t2.city_code)` or `AND CAST(t1.city_code AS VARCHAR) = CAST(t2.city_code AS VARCHAR)`.
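For example, a minimal sketch of the adjusted query, reusing the table and
column names from your original post (treat it as an illustration rather than
a verified fix, since planner behavior may differ across versions):

insert into sink_kafka(user_ucid,city_code,`source`,system_type)
SELECT t1.ucid AS user_ucid,
 t1.city_code,
 t1.`source`,
 t1.system_type
FROM tmp_ucid_check t1
LEFT JOIN dim_hbase_valid_im_commercial_cust_di_cache
FOR SYSTEM_TIME AS OF t1.proctime AS t2
ON concat(t1.ucid,'&',t1.city_code) = t2.rowKey
WHERE t2.city_code IS NOT NULL
-- wrapping both sides keeps this equality from being parsed as a second lookup key
AND UPPER(t1.city_code) = UPPER(t2.city_code);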

Best regards,
JING ZHANG

On Mon, 28 Jun 2021 at 12:48, 纳兰清风  wrote:

> Hi,
>
>   When I was using an HBase table as my lookup table, I got this error:
>
> Caused by: java.lang.IllegalArgumentException: Currently, HBase table
> can only be lookup by single row key.
> at
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
>
> My SQL is
>
> insert into sink_kafka(user_ucid,city_code,`source`,system_type)
> SELECT t1.ucid AS user_ucid,
>  t1.city_code,
>  t1.`source`,
>  t1.system_type
> FROM tmp_ucid_check t1
> LEFT JOIN dim_hbase_valid_im_commercial_cust_di_cache
> for SYSTEM_TIME AS OF t1.proctime AS t2
> ON concat(t1.ucid,'&',t1.city_code) = t2.rowKey
> WHERE t2.city_code is NOT null
> AND t1.city_code = t2.city_code;
>
> It may be because the conditions in the WHERE clause are being pushed down
> as a predicate into the join clause?
> How can I solve this problem?
>
> Thank you
>
>
>
>


Event Time Timers in Process functions when time characteristic is ProcessingTime

2021-06-27 Thread Deniz Koçak
Hi,

When I set the time characteristic to ProcessingTime in the environment
configuration via the setStreamTimeCharacteristic(...) call, I cannot see
watermarks in the Flink UI. I think this is expected, because watermarks are
disabled in the source (we use Kafka as the source)?

Another point: can I use event-time timers (registered via
ctx.timerService().registerEventTimeTimer(...)) even if I set the time
characteristic to ProcessingTime? As far as I understood from the
documentation, event-time timers need watermarks, which advance the operator
time, so I wonder whether I can use event-time timers when ProcessingTime is
selected in the environment. Also, when I set the time characteristic to
EventTime, can I use both processing-time timers and event-time timers
without any problem?

Thanks,


Re: Exception in snapshotState suppresses subsequent checkpoints

2021-06-27 Thread Yun Tang
Hi Tao,

I'm afraid your Flink job is continuously under high backpressure, so all
subsequent checkpoints never ran 'FromElementsFunctionT#snapshotState', which
means the code that throws the exception was never executed. You could check
those expired checkpoints to see whether the tasks containing
'FromElementsFunctionT' ever completed.

Best
Yun Tang

From: tao xiao 
Sent: Saturday, June 26, 2021 16:40
To: user 
Subject: Re: Exception in snapshotState suppresses subsequent checkpoints

Btw, here is the checkpoint-related log:

[2021-06-26 16:08:52,352] INFO Triggering checkpoint 1 (type=CHECKPOINT) @ 
1624694932345 for job afde4a82f41e8284cb0bfff20497a5cc. 
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator)
[2021-06-26 16:08:52,372] INFO Could not complete snapshot 1 for operator 
Source: Custom Source -> Sink: Print to Std. Out (1/1)#0. Failure reason: 
Checkpoint was declined. 
(org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl)
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete 
snapshot 1 for operator Source: Custom Source -> Sink: Print to Std. Out 
(1/1)#0. Failure reason: Checkpoint was declined.
at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:241)
 ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162)
 ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
 ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:685)
 ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:606)
 ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:571)
 ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298)
 ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1003)
 ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
 ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:993)
 ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:912)
 ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$8(StreamTask.java:880)
 ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
 [flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) 
[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
 [flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
 [flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
 [flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573) 
[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) 
[flink-runtime_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) 
[flink-runtime_2.11-1.12.1.jar:1.12.1]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_261]
Caused by: org.apache.flink.util.SerializedThrowable: npe
at 
com.smartnews.dp.kafka.sample.flink.FromElementsFunctionT.snapshotState(FromElementsFunctionT.java:111)
 ~[classes/:?]
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
 ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
 ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at 
org.a

Re: [Flink SQL] Lookup join hbase problem

2021-06-27 Thread Jark Wu
Yes. Currently, the HBase lookup source only supports lookups on the rowkey.
If there is more than one condition in the join ON clause, it may fail.

We should support looking up HBase on multiple fields (via Get#setFilter).
Feel free to open an issue.

Best,
Jark

On Mon, 28 Jun 2021 at 12:48, 纳兰清风  wrote:

> Hi,
>
>   When I was using an HBase table as my lookup table, I got this error:
>
> Caused by: java.lang.IllegalArgumentException: Currently, HBase table
> can only be lookup by single row key.
> at
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
>
> My SQL is
>
> insert into sink_kafka(user_ucid,city_code,`source`,system_type)
> SELECT t1.ucid AS user_ucid,
>  t1.city_code,
>  t1.`source`,
>  t1.system_type
> FROM tmp_ucid_check t1
> LEFT JOIN dim_hbase_valid_im_commercial_cust_di_cache
> for SYSTEM_TIME AS OF t1.proctime AS t2
> ON concat(t1.ucid,'&',t1.city_code) = t2.rowKey
> WHERE t2.city_code is NOT null
> AND t1.city_code = t2.city_code;
>
> It may be because the conditions in the WHERE clause are being pushed down
> as a predicate into the join clause?
> How can I solve this problem?
>
> Thank you
>
>
>
>


Re: Questions about UPDATE_BEFORE row kind in ElasticSearch sink

2021-06-27 Thread Jark Wu
UPDATE_BEFORE is required in cases such as Aggregation with Filter. For
example:

SELECT *
FROM (
  SELECT word, count(*) as cnt
  FROM T
  GROUP BY word
) WHERE cnt < 3;
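
To make this concrete (my own illustrative trace, not from the issue): assume
the word 'hello' arrives three times. The change stream produced by the
aggregation and flowing into the outer filter (cnt < 3) would look roughly
like this:

  +I (hello, 1)   passes the filter, emitted downstream
  -U (hello, 1)   UPDATE_BEFORE, retracts the previously emitted row
  +U (hello, 2)   passes the filter, emitted downstream
  -U (hello, 2)   UPDATE_BEFORE, retracts (hello, 2)
  +U (hello, 3)   cnt >= 3 now, filtered out, nothing emitted

Without the UPDATE_BEFORE rows, the sink would keep (hello, 2) even though
'hello' no longer satisfies cnt < 3, which is why they cannot simply be
dropped in such cases.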

There is more discussion in this issue:
https://issues.apache.org/jira/browse/FLINK-9528

Best,
Jark

On Mon, 28 Jun 2021 at 13:52, Kai Fu  wrote:

> Hi team,
>
> We notice that the UPDATE_BEFORE row kind is not dropped but is treated as
> DELETE in the code.
> We're aware that this is useful for retracting output records in some cases,
> but we cannot come up with such a scenario; could anyone name a few such
> cases?
>
> The other thing we want to do is drop the UPDATE_BEFORE row kind in the ES
> connector to reduce the sink traffic, since almost all of our records are
> updates. In our case, the records are generated by joining a couple of
> upsert-kafka data sources. Only the primary key participates in the join
> condition for all join cases, with some granularity/cardinality fan-out in
> the middle. We want to know whether it impacts the final result correctness
> if we drop the records with UPDATE_BEFORE row kind.
>
> --
> *Best wishes,*
> *- Kai*
>


Questions about UPDATE_BEFORE row kind in ElasticSearch sink

2021-06-27 Thread Kai Fu
Hi team,

We notice that the UPDATE_BEFORE row kind is not dropped but is treated as
DELETE in the code.
We're aware that this is useful for retracting output records in some cases,
but we cannot come up with such a scenario; could anyone name a few such
cases?

The other thing we want to do is drop the UPDATE_BEFORE row kind in the ES
connector to reduce the sink traffic, since almost all of our records are
updates. In our case, the records are generated by joining a couple of
upsert-kafka data sources. Only the primary key participates in the join
condition for all join cases, with some granularity/cardinality fan-out in
the middle. We want to know whether it impacts the final result correctness
if we drop the records with UPDATE_BEFORE row kind.

-- 
*Best wishes,*
*- Kai*


Re: [Question] Why doesn't Flink use Calcite adapter?

2021-06-27 Thread JING ZHANG
Hi guangyuan,
This is an interesting and broad topic. I'll try to give my opinion based on
my limited knowledge.

Flink introduces dynamic sources to read from external systems[1]. The Flink
connector modules are completely decoupled from Calcite. There are two
benefits:
(1) Users who need to develop a custom, user-defined connector require no
background knowledge of Calcite.
(2) It removes unnecessary external dependencies from the Flink connector modules.

Besides, since Flink is a distributed engine for stateful computations over
*unbounded and bounded* data streams, there is more to take into consideration
when connecting to an external system, for example how to read data with
multiple parallel instances, or how to store metadata in state so it can be
recovered after failover.
I list a few such issues below (see the short DDL sketch after the list).
They are strongly tied to the Flink engine and are not defined in Calcite's
built-in adapters.
(1) Required: define how to read from the external storage system
 1.1 scan all rows, or look up rows by one or more keys
 1.2 if scan mode is chosen, define how to split the source and how to store
metadata in state so that it can be restored after recovery from failover
(2) Required: map data types of the external system to the Flink data type
system
(3) Optional: for planner optimization, implement ability interfaces,
e.g. SupportsProjectionPushDown / SupportsFilterPushDown and so on
(4) Optional: define encoding/decoding formats
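
As a rough user-facing illustration of where these pieces show up (the table
name, topic and address below are made up), the DDL that declares a dynamic
table covers the type mapping in its schema and picks the connector and the
decoding format in the WITH clause:

-- hypothetical example: names, topic and address are placeholders
CREATE TABLE user_events (
  user_id   BIGINT,
  city_code STRING,
  proctime AS PROCTIME()   -- processing-time attribute, e.g. for lookup joins
) WITH (
  'connector' = 'kafka',                              -- which dynamic source to use
  'topic' = 'user_events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'                                   -- decoding format, point (4) above
);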

Hope it helps. Please correct me if I'm wrong.

Best regards,
JING ZHANG

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sourcessinks/

On Mon, 28 Jun 2021 at 08:28, Israel Ekpo  wrote:

> Maybe this question would be better addressed to the DEV list.
>
> On Fri, Jun 25, 2021 at 11:57 PM guangyuan wang 
> wrote:
>
>>
>> 
>>
>> I have read the design doc of the Flink planner recently. I've found that
>> Flink only uses Calcite as an SQL optimizer: it translates an optimized
>> RelNode into a Flink (or Blink) RelNode, and then converts that into the
>> physical plan. Why doesn't Flink implement Calcite adapters? Wouldn't that
>> be an easier way to use Calcite?
>>
>>
>> The link to the Calcite adapter docs: calcite.apache.org/docs/adapter.html.
>>
>


[Flink SQL] Lookup join hbase problem

2021-06-27 Thread 纳兰清风
Hi,


  When I was using an HBase table as my lookup table, I got this error:


Caused by: java.lang.IllegalArgumentException: Currently, HBase table can 
only be lookup by single row key.
at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)


My SQL is


insert into sink_kafka(user_ucid,city_code,`source`,system_type)
SELECT t1.ucid AS user_ucid,
 t1.city_code,
 t1.`source`,
 t1.system_type
FROM tmp_ucid_check t1
LEFT JOIN dim_hbase_valid_im_commercial_cust_di_cache for SYSTEM_TIME AS OF 
t1.proctime AS t2
ON concat(t1.ucid,'&',t1.city_code) = t2.rowKey
WHERE t2.city_code is NOT null
AND t1.city_code = t2.city_code; 


It may be because the conditions in the WHERE clause are being pushed down as
a predicate into the join clause?
How can I solve this problem?


Thank you

Re: [Question] Why doesn't Flink use Calcite adapter?

2021-06-27 Thread Israel Ekpo
Maybe this question would be better addressed to the DEV list.

On Fri, Jun 25, 2021 at 11:57 PM guangyuan wang 
wrote:

>
> 
>
> I have read the design doc of the Flink planner recently. I've found that
> Flink only uses Calcite as an SQL optimizer: it translates an optimized
> RelNode into a Flink (or Blink) RelNode, and then converts that into the
> physical plan. Why doesn't Flink implement Calcite adapters? Wouldn't that
> be an easier way to use Calcite?
>
>
> The link to the Calcite adapter docs: calcite.apache.org/docs/adapter.html.
>


Re: Yarn Application Crashed?

2021-06-27 Thread Thomas Wang
Just found some additional info. It looks like one of the EC2 instances got
terminated at the time the crash happened and this job had 7 Task Managers
running on that EC2 instance. Now I suspect it's possible that when Yarn
tried to migrate the Task Managers, there were no idle containers as this
job was using around 99% of the entire cluster. However, in that case,
shouldn't Yarn wait for containers to become available? I'm not quite sure
how Flink would behave in this case. Could someone provide some insights
here? Thanks.

Thomas

On Sun, Jun 27, 2021 at 4:24 PM Thomas Wang  wrote:

> Hi,
>
> I recently experienced a job crash due to the underlying Yarn application
> failing for some reason. Here is the only error message I saw. It seems I
> can no longer see any of the Flink job logs.
>
> Application application_1623861596410_0010 failed 1 times (global limit
> =2; local limit is =1) due to ApplicationMaster for attempt
> appattempt_1623861596410_0010_01 timed out. Failing the application.
>
> I was running the Flink job using the Yarn session mode with the following
> command.
>
> export HADOOP_CLASSPATH=`hadoop classpath` &&
> /usr/lib/flink/bin/yarn-session.sh -jm 7g -tm 7g -s 4 --detached
>
> I didn't have HA setup, but I believe the underlying Yarn application
> caused the crash because if, for some reason, the Flink job failed, the
> Yarn application should still survive. Please correct me if this is not the
> right assumption.
>
> My question is how I should find the root cause in this case and what's
> the recommended way to avoid this going forward?
>
> Thanks.
>
> Thomas
>


Yarn Application Crashed?

2021-06-27 Thread Thomas Wang
Hi,

I recently experienced a job crash due to the underlying Yarn application
failing for some reason. Here is the only error message I saw. It seems I
can no longer see any of the Flink job logs.

Application application_1623861596410_0010 failed 1 times (global limit =2;
local limit is =1) due to ApplicationMaster for attempt
appattempt_1623861596410_0010_01 timed out. Failing the application.

I was running the Flink job using the Yarn session mode with the following
command.

export HADOOP_CLASSPATH=`hadoop classpath` &&
/usr/lib/flink/bin/yarn-session.sh -jm 7g -tm 7g -s 4 --detached

I didn't have HA setup, but I believe the underlying Yarn application
caused the crash because if, for some reason, the Flink job failed, the
Yarn application should still survive. Please correct me if this is not the
right assumption.

My question is how I should find the root cause in this case and what's the
recommended way to avoid this going forward?

Thanks.

Thomas