flink 1.11.2 sql reading datahub and sls

2020-11-21 Thread ????
Hi all,
  Flink 1.11.2
Question:
  When joining DataHub and SLS sources with Flink SQL, which
connector should be used?

Re: Question about Flink on K8S Deploy Job Cluster deployment

2020-11-21 Thread RS
taskmanager-job-deployment.yaml and jobmanager-job.yaml only need to be deployed once to start the services.

When an actual job is started afterwards, it occupies slots, so fewer slots are available.
When the job finishes, its slots are released and become available again, so they can serve the next job.
If you run out of slots, you simply don't have enough resources and need to reconfigure taskmanager-job-deployment.yaml.
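
For reference, a minimal, abridged sketch of the knobs involved (assumptions:
the official flink:1.11.2 image and its FLINK_PROPERTIES mechanism; the values
match the setup described in the quoted mail below):

  # taskmanager-job-deployment.yaml (abridged sketch, not a complete manifest)
  apiVersion: apps/v1
  kind: Deployment
  metadata:
    name: flink-taskmanager
  spec:
    replicas: 2                        # total slots = replicas * slots per TM
    selector:
      matchLabels:
        app: flink
        component: taskmanager
    template:
      metadata:
        labels:
          app: flink
          component: taskmanager
      spec:
        containers:
          - name: taskmanager
            image: flink:1.11.2
            args: ["taskmanager"]
            env:
              - name: FLINK_PROPERTIES
                value: "taskmanager.numberOfTaskSlots: 8"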

On 2020-11-20 15:20:58, "WeiXubin" <18925434...@163.com> wrote:
>We plan to use the Flink on K8S Job Cluster (per-job) deployment mode. We use
>taskmanager-job-deployment.yaml to start TaskManagers on K8S, with 2 replicas
>and 8 slots per TaskManager. We think of the TaskManagers as a resource pool:
>when a job starts, it is automatically assigned the TaskManagers it needs, and
>returns them when it is done.
>
>When we start a job with jobmanager-job.yaml (the job needs only one slot), we
>find that the job occupies both TaskManagers, even though it does not need that
>many slots. As a result, no TaskManager is available when the second job
>starts, which wastes resources.
>
>Question:
>Does per-job mode require creating taskmanager-job-deployment.yaml and
>jobmanager-job.yaml for every job, with those TaskManagers belonging to that
>job and destroyed once it finishes? But that would mean creating and
>destroying TaskManagers every time.
>
>Thanks,
>Bin
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Lateral join not finding correlate variable

2020-11-21 Thread Dylan Forciea
Godfrey,

Glad I could help! I suspected that was what the problem was. I have made a 
view in my postgres database to perform the inner lateral join, so that should 
let me work around this for the time being.
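
For reference, a hypothetical sketch of a Postgres view performing the inner
lateral split (table and column names are borrowed from the reproduction code
below; the actual view is not shown in this thread):

  CREATE VIEW table2_split AS
  SELECT trim(a.attr1) AS attr1, p.attr2, p.attr3, p.attr4
  FROM table2 p
  CROSS JOIN LATERAL unnest(string_to_array(p.attr1_source, ';')) AS a(attr1);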

Thanks,
Dylan

From: godfrey he <godfre...@gmail.com>
Date: Friday, November 20, 2020 at 1:09 AM
To: Dylan Forciea <dy...@oseberg.io>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Lateral join not finding correlate variable

Hi Dylan,

I have reproduced your issue based on your code.
Currently Flink does not support such a nested correlate pattern query.
I have created an issue to track this [1].
Thanks for your report and help.

[1] https://issues.apache.org/jira/browse/FLINK-20255

Best,
Godfrey

Dylan Forciea <dy...@oseberg.io> wrote on Thursday, November 19, 2020 at 12:10 PM:
Godfrey,

I confirmed that in Flink 1.11.2 and in 1.12-SNAPSHOT I get the stack trace 
running exactly this code:

import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row
import org.apache.flink.table.annotation.FunctionHint
import org.apache.flink.table.annotation.DataTypeHint
import org.apache.flink.table.functions.TableFunction


@FunctionHint(output = new DataTypeHint("ROW"))
class SplitStringToRows extends TableFunction[Row] {
  def eval(str: String, separator: String = ";"): Unit = {
    if (str != null) {
      str.split(separator).foreach(s => collect(Row.of(s.trim())))
    }
  }
}
object Job {

  def main(args: Array[String]): Unit = {
    val settings =
      EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
val streamTableEnv = StreamTableEnvironment.create(streamEnv, settings)

streamTableEnv.createTemporarySystemFunction(
  "SplitStringToRows",
  classOf[SplitStringToRows]
) // Class defined in previous email

    streamTableEnv.executeSql(
      """
      CREATE TABLE table1 (
        id_source BIGINT PRIMARY KEY,
        attr1_source STRING,
        attr2 STRING
      ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://host.domain.com/db1?ssl=true',
        'table-name' = '',
        'username' = '',
        'password' = '',
        'scan.fetch-size' = '500',
        'scan.auto-commit' = 'false')
    """)

    streamTableEnv.executeSql(
      """
      CREATE TABLE table2 (
        attr1_source STRING,
        attr2 STRING,
        attr3 DECIMAL,
        attr4 DATE
      ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://host.domain.com/db1?ssl=true',
        'table-name' = '',
        'username' = '',
        'password' = '',
        'scan.fetch-size' = '500',
        'scan.auto-commit' = 'false')
    """)

val q1 = streamTableEnv.sqlQuery("""
  SELECT
id_source AS id,
attr1_source AS attr1,
attr2
  FROM table1
""")
streamTableEnv.createTemporaryView("view1", q1)

    val q2 = streamTableEnv.sqlQuery(
      """
      SELECT
        a.attr1 AS attr1,
        attr2,
        attr3,
        attr4
      FROM table2 p,
        LATERAL TABLE(SplitStringToRows(p.attr1_source, ';')) AS a(attr1)
    """)
streamTableEnv.createTemporaryView("view2", q2)

val q3 = streamTableEnv.sqlQuery("""
SELECT
  w.attr1,
  p.attr3
FROM view1 w
LEFT JOIN LATERAL (
  SELECT
attr1,
attr3
  FROM (
SELECT
  attr1,
  attr3,
  ROW_NUMBER() OVER (
PARTITION BY attr1
ORDER BY
  attr4 DESC NULLS LAST,
  w.attr2 = attr2 DESC NULLS LAST
  ) AS row_num
  FROM view2)
  WHERE row_num = 1) p
ON (w.attr1 = p.attr1)
""")
streamTableEnv.createTemporaryView("view3", q3)

val view3 = streamTableEnv.from("view3")

view3
  .toRetractStream[Row]
  .writeAsCsv("./view3.csv", WriteMode.OVERWRITE)
  .setParallelism(1)

streamEnv.execute()
  }
}

Thanks,
Dylan Forciea

From: godfrey he <godfre...@gmail.com>
Date: Wednesday, November 18, 2020 at 8:29 PM
To: Dylan Forciea <dy...@oseberg.io>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Lateral join not finding correlate variable

Dylan,

Thanks for your feedback. If the planner encounters an
"unexpected correlate variable $cor2 in the plan" exception,
there's a high probability that FlinkDecorrelateProgram has some bugs
or the query pattern is not supported yet. I tried to use the JDBC connector
as the input tables, but I still can't reproduce the exception. Could you
provide your full code, including DDL, query, etc.?
Thanks so much.

Best,
Godfrey




Re: Non uniform distribution of subtasks even with cluster.evenly-spread-out-slots

2020-11-21 Thread Till Rohrmann
Hi Harshit,

the cluster.evenly-spread-out-slots strategy works the following way. If
you schedule a task w/o preferred inputs (e.g. no inputs or too many inputs
(I think the threshold is 8 inputs)), then it will pick a slot from a
TaskManager with the lowest utilization. If, however, the task has an input
preference, then the input preference has precedence over the utilization.
In fact, the utilization is only a tie breaker when there are multiple
slots local to the inputs or on the same hosts. Moreover,
the spread out strategy only works for the currently available set of
TaskManagers. If you are running on Yarn, Mesos or K8s where Flink can
dynamically allocate new TMs, then the system will first fill up the
available TMs before it allocates new ones.
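
The option itself is set in flink-conf.yaml:

  cluster.evenly-spread-out-slots: true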

The cluster.evenly-spread-out-slots strategy won't give you a guaranteed
spread out of slots but acts more as a heuristic. If you want all your
tasks to be evenly spread out across the cluster, then I would suggest
setting the parallelism of your sources to a multiple of the number
of TMs. That way the sources will be spread out and thus also the consumers
of these sources.
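
As a minimal sketch of that suggestion (the numbers and the generateSequence
source are made-up placeholders for the real job):

  import org.apache.flink.streaming.api.scala._

  object EvenSpreadSketch {
    def main(args: Array[String]): Unit = {
      val env = StreamExecutionEnvironment.getExecutionEnvironment
      val numTaskManagers = 16 // assumption for illustration
      env
        .generateSequence(0L, 1000000L)      // stands in for the real source
        .setParallelism(numTaskManagers * 2) // a multiple of #TMs spreads sources
        .map(_.toString)
        .print()
      env.execute("even-spread-sketch")
    }
  }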

For your other questions:

1. The key group allocation will be non-uniform iff max-parallelism %
parallelism != 0. If this is the case, then some tasks will have one
additional key group compared to the other tasks. Depending on the actual
values and the number of slots s on a TaskManager, it can happen that
you process s more key groups on TM1 compared to TM2 (see the sketch
after this list).
2. No, the behaviour is still the same.
3. In order to answer this question you would have to give us a bit more
information about the actual distribution of tasks (which tasks are running
where).
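
To make the arithmetic in (1) concrete, here is a small sketch; the range
computation mirrors, to my understanding, what Flink's KeyGroupRangeAssignment
does, so treat it as an illustration rather than the exact code:

  object KeyGroupSkew {
    def main(args: Array[String]): Unit = {
      val maxParallelism = 128
      val parallelism = 3 // 128 % 3 != 0, so the split cannot be uniform
      for (idx <- 0 until parallelism) {
        val start = (idx * maxParallelism + parallelism - 1) / parallelism
        val end = ((idx + 1) * maxParallelism - 1) / parallelism
        println(s"subtask $idx -> key groups [$start, $end] (${end - start + 1})")
      }
      // prints 43, 43 and 42 key groups: two subtasks carry one extra key group
    }
  }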

Cheers,
Till

On Fri, Nov 20, 2020 at 6:46 PM Harshit Hajela wrote:

> Hi Flink Community,
>
> I'm currently running a heavy flink job on Flink 1.9.3 that has a lot of
> subtasks and observing some subtask distribution issues. The job in
> question has 9288 sub tasks and they are running on a large set of TMs
> (total available slots are 1792).
>
> I'm using the *cluster.evenly-spread-out-slots* configuration option to
> have the slots be allocated uniformly but I am still seeing non-uniform
> subtask distribution that seems to be affecting performance. It looks like
> some TMs are being overloaded and seeing a much greater than uniform
> allocation of subtasks. I've been trying to reproduce this situation at a
> smaller scale but have not been successful in doing so.
>
> As part of debugging the scheduling process when trying to reproduce this
> at a smaller scale I observed that the non-location preference
> selectWithoutLocationPreference override introduced by the evenly spread
> out strategy option is not being invoked at all as the execution vertices
> still have a location preference to be assigned the same slots as their
> input vertices.
>
> This happens at job startup time and not during recovery, so I'm not sure
> if recovery is where the non preference code path is invoked. In essence
> the only impact of using the evenly spread out strategy seems to be a
> slightly different candidate score calculation.
>
> I wanted to know:-
> 1. Is the evenly spread out strategy the right option to choose for
> achieving the uniform distribution of subtasks?
> 2. Is the observed scheduling behaviour expected for the evenly spread out
> strategy? When do we expect the non location preference code path to be
> invoked? For us this only happens on sources since they have no incoming
> edges.
>
> Apart from this I am still trying to understand the nature of scheduling
> in Flink and how that could bring about this situation, I was wondering if
> there were known issues or peculiarities of the Flink job scheduler that
> could lead to this situation occurring. For example I'm looking at the
> known issues mentioned in the ticket
> https://issues.apache.org/jira/browse/FLINK-11815.
>
> I was hoping to understand :-
> 1. The conditions that would give rise to these kinds of situations or how
> to verify if we are running into them. For example, how to verify that key
> group allocation is non-uniform
> 2. If these issues have been addressed in subsequent versions of flink
> 3. If there is any other information about the nature of scheduling jobs
> in flink that could give rise to the non-uniform distribution observed.
>
> Please let me know if further information needs to be provided.
>
> Thanks,
> Harshit
>


Re: Force Join Unique Key

2020-11-21 Thread Jark Wu
Hi Rex,

The join key has already been used to organize records. As I said before,
"the join key is the key of the keyed states". So an iteration over the
MapState is actually a range scan (a scan over the join-key prefix). However,
this performs a "seek" operation, which is rather slow compared to a "get".

Best,
Jark

On Sat, 21 Nov 2020 at 09:47, Rex Fenley  wrote:

> I have a few more questions.
>
> Even if a join has no unique keys, couldn't the join key be used to
> organize records into a tree of groups of records per join key so that
> lookups are faster?
>
> I also have been looking at RocksDB docs and it looks like it has a
> RangeScan operation. I'm guessing then join keys could also be hashed in
> such a way to enable faster lookup by RangeScan. I also noticed mention of
> Prefix Iterators, which might actually do what I'm suggesting.
>
> Have either of these been considered?
>
> Thanks!
>
> On Thu, Nov 19, 2020 at 6:51 PM Rex Fenley  wrote:
>
>> I'm reading your response as rocksdb having to seek across the whole
>> dataset for the whole table, which we hope to avoid.
>>
>> What are the rules for the unique key and unique join key inference?
>> Maybe we can reorganize our plan to allow it to infer unique keys more
>> correctly.
>>
>> Thanks
>>
>> On Wed, Nov 18, 2020 at 9:50 PM Jark Wu  wrote:
>>
>>> Yes, exactly. The rocksdb has to "seek" data sets because it doesn't
>>> know how many entries are under the join key.
>>>
>>> On Thu, 19 Nov 2020 at 13:38, Rex Fenley  wrote:
>>>
 Ok, but if there is only 1 row per Join key on either side of the join,
 then wouldn't "iterate all the values in the MapState under the current
 key" effectively be "iterate 1 value in MapState under the current key"
 which would be O(1)? Or are you saying that it must seek across the entire
 dataset for the whole table even for that 1 row on either side of the join?

 Thanks for the help so far!

 On Wed, Nov 18, 2020 at 6:30 PM Jark Wu  wrote:

> Actually, if there is no unique key, it's not O(1), because there
> may be multiple rows joined by the join key, i.e. we iterate all the
> values in the MapState under the current key; this is a "seek" operation
> on rocksdb, which is not efficient.
>
> Are you asking where the join key is set? The join key is set by the
> framework via `AbstractStreamOperator#setKeyContextElement1`.
>
> Best,
> Jark
>
> On Thu, 19 Nov 2020 at 03:18, Rex Fenley  wrote:
>
>> Thanks for the info.
>>
>> So even if there is no unique key inferred for a Row, the set of rows
>> to join on each Join key should effectively still be an O(1) lookup if 
>> the
>> join key is unique right?
>>
>> Also, I've been digging around the code to find where the lookup of
>> rows for a join key happens and haven't come across anything. Mind 
>> pointing
>> me in the right direction?
>>
>> Thanks!
>>
>> cc Brad
>>
>> On Wed, Nov 18, 2020 at 7:39 AM Jark Wu  wrote:
>>
>>> Hi Rex,
>>>
>>> Currently, the join operator may use 3 kinds of state structure
>>> depending on the input key and join key information.
>>>
>>> 1) input doesn't have a unique key => MapState<RowData, Integer>,
>>> where the map key is the input row and the map value is the number
>>> of equal rows.
>>>
>>> 2) input has a unique key, but the unique key is not a subset of the join
>>> key => MapState<RowData, RowData>;
>>> this is better than the above one, because it has a shorter map key and
>>> is more efficient when retracting records.
>>>
>>> 3) input has a unique key, and the unique key is a subset of the join
>>> key => ValueState<RowData>;
>>> this is the best performance, because it only performs a "get"
>>> operation rather than "seek" on rocksdb
>>>  for each record of the other input side.
>>>
>>> Note: the join key is the key of the keyed states.
>>>
>>> You can see the implementation differences
>>> in 
>>> org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews.
>>>
>>> Best,
>>> Jark
>>>
>>> On Wed, 18 Nov 2020 at 02:30, Rex Fenley  wrote:
>>>
 Ok, what are the performance consequences then of having a join
 with NoUniqueKey if the left side's key actually is unique in practice?

 Thanks!


 On Tue, Nov 17, 2020 at 7:35 AM Jark Wu  wrote:

> Hi Rex,
>
> Currently, the unique key is inferred by the optimizer. However,
> the inference is not perfect.
> There are known issues where the unique key is not derived
> correctly, e.g. FLINK-20036 (was this opened by you?). If you think
> you have the same case, please open an issue.
>
> Query hint is a nice way for this, but it is not supported yet.
> We have an issue to track