Re: flink1.12.2 "Failed to execute job"

2021-04-14 Thread 太平洋
Thanks. My program reads hundreds of small files from S3 via SQL. What happens
during the instantiation of the SplitEnumerator? What can I do to reduce the
time now?


------ Original Message ------
From:
"Becket Qin"



Re: Streaming job with a MySQL JDBC source: Execution is in FINISHED state and no checkpoints can be created

2021-04-14 Thread JasonLee
Hi,

You need to use the Temporal Table Join syntax. The official documentation describes the details:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/joins.html
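
For illustration, here is a minimal PyFlink-style sketch of the lookup-style temporal join that page describes. All table names, columns, and connector options below are made up and need to be adapted to the actual Kafka/MySQL setup; the point is only the FOR SYSTEM_TIME AS OF pattern, which keeps the MySQL side as a per-record lookup instead of a bounded source that finishes.

```python
# Hypothetical sketch: enrich a Kafka stream with a MySQL table through a
# processing-time temporal (lookup) join, so the job stays RUNNING and
# can take checkpoints.
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(
    EnvironmentSettings.new_instance().in_streaming_mode()
    .use_blink_planner().build())

t_env.execute_sql("""
    CREATE TABLE orders (
        order_id STRING,
        currency STRING,
        proc_time AS PROCTIME()
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'orders',
        'properties.bootstrap.servers' = 'localhost:9092',
        'format' = 'json'
    )
""")

t_env.execute_sql("""
    CREATE TABLE currency_rates (
        currency STRING,
        rate DECIMAL(10, 4),
        PRIMARY KEY (currency) NOT ENFORCED
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://localhost:3306/mydb',
        'table-name' = 'currency_rates'
    )
""")

# FOR SYSTEM_TIME AS OF performs a per-record lookup against MySQL at
# processing time instead of scanning it once as a bounded stream.
t_env.sql_query("""
    SELECT o.order_id, o.currency, r.rate
    FROM orders AS o
    JOIN currency_rates FOR SYSTEM_TIME AS OF o.proc_time AS r
    ON o.currency = r.currency
""").execute().print()
```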



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: PyFlink Vectorized UDF throws NullPointerException

2021-04-14 Thread Dian Fu
Great! Thanks for letting me know~

> On Apr 15, 2021, at 11:01 AM, Yik San Chan wrote:
> 
> Hi Dian,
> 
> Thanks for the reminder. Yes, the original udf implementation does not 
> satisfy the input and output type requirements. After adding a unit test, I 
> was able to find what was wrong and fix my UDF implementation. Here is the new 
> implementation FYI.
> 
> @udf(result_type=DataTypes.DOUBLE(), func_type="pandas")
> def predict(users, items):
>     n_users, n_items = 943, 1682
>     model = MatrixFactorization(n_users, n_items)
>     model.load_state_dict(torch.load("model.pth"))
>     return pd.Series(model(users, items).detach().numpy())
> 
> And here is the unit test.
> 
> def test_predict():
>     f = predict._func
>     users = pd.Series([1, 2, 3])
>     items = pd.Series([1, 4, 9])
>     preds = f(users, items)
>     assert isinstance(preds, pd.Series)
>     assert len(preds) == 3
> 
> Thank you so much!
> 
> Best,
> Yik San
> 
> On Wed, Apr 14, 2021 at 11:03 PM Dian Fu wrote:
> Hi Yik San,
> 
> 1) There are two kinds of Python UDFs in PyFlink: 
> - General Python UDFs, which process input elements on a per-row basis. That is,
> they process one row at a time.
> - Pandas UDFs, which process input elements in batches.
> So you are correct that you need to use Pandas UDF for your requirements.
> 
> 2) For Pandas UDF, the input type for each input argument is Pandas.Series 
> and the result type should also be a Pandas.Series. Besides, the length of 
> the result should be the same as the inputs. Could you check if this is the 
> case for your Pandas UDF implementation? 
> 
> Regards,
> Dian
> 
> 
> On Wed, Apr 14, 2021 at 9:44 PM Yik San Chan wrote:
> The question is cross-posted on Stack Overflow:
> https://stackoverflow.com/questions/67092978/pyflink-vectorized-udf-throws-nullpointerexception
> 
> I have an ML model that takes two numpy.ndarray - `users` and `items` - and 
> returns a numpy.ndarray `predictions`. In normal Python code, I would do:
> 
> ```python
> model = load_model()
> 
> df = load_data() # the DataFrame includes 4 columns, namely, user_id, 
> movie_id, rating, and timestamp
> 
> users = df.user_id.values
> items = df.movie_id.values
> 
> predictions = model(users, items)
> ```
> 
> I am looking into porting this code into Flink to leverage its distributed 
> nature. My assumption is: by distributing the prediction workload on multiple 
> Flink nodes, I should be able to run the whole prediction faster.
> 
> So I compose a PyFlink job. Note I implement a UDF called `predict` to run 
> the prediction.
> 
> ```python
> # batch_prediction.py
> 
> model = load_model()
> 
> settings = EnvironmentSettings.new_instance().use_blink_planner().build()
> exec_env = StreamExecutionEnvironment.get_execution_environment()
> t_env = StreamTableEnvironment.create(exec_env, environment_settings=settings)
> 
> SOURCE_DDL = """
> CREATE TABLE source (
> user_id INT,
> movie_id INT,
> rating TINYINT,
> event_ms BIGINT
> ) WITH (
> 'connector' = 'filesystem',
> 'format' = 'csv',
> 'csv.field-delimiter' = '\t',
> 'path' = 'ml-100k/u1.test'
> )
> """
> 
> SINK_DDL = """
> CREATE TABLE sink (
> prediction DOUBLE
> ) WITH (
> 'connector' = 'print'
> )
> """
> 
> t_env.execute_sql(SOURCE_DDL)
> t_env.execute_sql(SINK_DDL)
> t_env.execute_sql(
> "INSERT INTO sink SELECT PREDICT(user_id, movie_id) FROM source"
> ).wait()
> ```
> 
> Here is the UDF.
> 
> ```python
> # batch_prediction.py (cont)
> 
> @udf(result_type=DataTypes.DOUBLE())
> def predict(user, item):
>     return model([user], [item]).item()
> 
> t_env.create_temporary_function("predict", predict)
> ```
> 
> The job runs fine. However, the prediction actually runs on each and every 
> row of the `source` table, which is not performant. Instead, I want to split 
> the 80,000 (user_id, movie_id) pairs into, let's say, 100 batches, with each 
> batch having 800 rows. The job triggers the `model(users, items)` function 
> 100 times (= # of batch), where both `users` and `items` have 800 elements.
> 
> I couldn't find a way to do this. By looking at the 
> [docs](https://ci.apache.org/projects/flink/flink-docs-stable/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html
>  
> ),
>  vectorized user-defined functions may work.
> 
> ```python
> # batch_prediction.py (snippet)
> 
> # I add the func_type="pandas"
> @udf(result_type=DataTypes.DOUBLE(), func_type="pandas")
> def predict(user, item):
>     ...
> ```
> 
> Unfortunately, it doesn't.
> 
> ```
> > python batch_prediction.py
> ...
> Traceback (most recent call last):
>   File "batch_prediction.py", line 55, in 
> "INSERT INTO sink SELECT PREDICT(user_id, 

Re: Submitting a flink-sql job: cannot deploy to the YARN cluster

2021-04-14 Thread Rui Li
You can use the container ID shown in the log to look up the corresponding container logs.

On Wed, Apr 14, 2021 at 8:00 PM 张锴  wrote:

> When connecting to Hive using flink-sql, the following error occurred:
>  The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The main method
> caused an error: Could not deploy Yarn job cluster.
> at
>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
> at
>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
> at
>
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
> at
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
> at
>
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1688)
> at
>
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
> Caused by: org.apache.flink.client.deployment.ClusterDeploymentException:
> Could not deploy Yarn job cluster.
> at
>
> org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:481)
> at
>
> org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:81)
> at
>
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1905)
> at
>
> org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
> at
>
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
> at
>
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1782)
> at
>
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1765)
> at com.erwan.flinksql.FlinkConnectHive.main(FlinkConnectHive.java:49)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
> ... 11 more
> Caused by:
> org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The
> YARN application unexpectedly switched to state FAILED during deployment.
> Diagnostics from YARN: Application application_1618298202025_0017 failed 1
> times (global limit =2; local limit is =1) due to AM Container for
> appattempt_1618298202025_0017_01 exited with  exitCode: 2
> Failing this attempt.Diagnostics: [2021-04-14 19:04:02.506]Exception from
> container-launch.
> Container id: container_e13_1618298202025_0017_01_01
> Exit code: 2.
>
> Since the cause is not obvious, it is hard to troubleshoot and I am not sure where the problem actually lies. Is there any way to pinpoint the issue?
>


-- 
Best regards!
Rui Li


Re: PyFlink Vectorized UDF throws NullPointerException

2021-04-14 Thread Yik San Chan
Hi Dian,

Thanks for the reminder. Yes, the original udf implementation does not
satisfy the input and output type requirements. After adding a unit test, I
was able to find what was wrong and fix my UDF implementation. Here is the
new implementation FYI.

@udf(result_type=DataTypes.DOUBLE(), func_type="pandas")
def predict(users, items):
    n_users, n_items = 943, 1682
    model = MatrixFactorization(n_users, n_items)
    model.load_state_dict(torch.load("model.pth"))
    return pd.Series(model(users, items).detach().numpy())

And here is the unit test.

def test_predict():
    f = predict._func
    users = pd.Series([1, 2, 3])
    items = pd.Series([1, 4, 9])
    preds = f(users, items)
    assert isinstance(preds, pd.Series)
    assert len(preds) == 3

Thank you so much!

Best,
Yik San

On Wed, Apr 14, 2021 at 11:03 PM Dian Fu  wrote:

> Hi Yik San,
>
> 1) There are two kinds of Python UDFs in PyFlink:
> - General Python UDFs, which process input elements on a per-row basis. That is,
> they process one row at a time.
> - Pandas UDFs, which process input elements in batches.
> So you are correct that you need to use Pandas UDF for your requirements.
>
> 2) For Pandas UDF, the input type for each input argument is Pandas.Series
> and the result type should also be a Pandas.Series. Besides, the length of
> the result should be the same as the inputs. Could you check if this is the
> case for your Pandas UDF implementation?
>
> Regards,
> Dian
>
>
> On Wed, Apr 14, 2021 at 9:44 PM Yik San Chan 
> wrote:
>
>> The question is cross-posted on Stack Overflow:
>> https://stackoverflow.com/questions/67092978/pyflink-vectorized-udf-throws-nullpointerexception
>>
>> I have an ML model that takes two numpy.ndarray - `users` and `items` -
>> and returns a numpy.ndarray `predictions`. In normal Python code, I would
>> do:
>>
>> ```python
>> model = load_model()
>>
>> df = load_data() # the DataFrame includes 4 columns, namely, user_id,
>> movie_id, rating, and timestamp
>>
>> users = df.user_id.values
>> items = df.movie_id.values
>>
>> predictions = model(users, items)
>> ```
>>
>> I am looking into porting this code into Flink to leverage its
>> distributed nature. My assumption is: by distributing the prediction
>> workload on multiple Flink nodes, I should be able to run the whole
>> prediction faster.
>>
>> So I compose a PyFlink job. Note I implement a UDF called `predict` to
>> run the prediction.
>>
>> ```python
>> # batch_prediction.py
>>
>> model = load_model()
>>
>> settings = EnvironmentSettings.new_instance().use_blink_planner().build()
>> exec_env = StreamExecutionEnvironment.get_execution_environment()
>> t_env = StreamTableEnvironment.create(exec_env,
>> environment_settings=settings)
>>
>> SOURCE_DDL = """
>> CREATE TABLE source (
>> user_id INT,
>> movie_id INT,
>> rating TINYINT,
>> event_ms BIGINT
>> ) WITH (
>> 'connector' = 'filesystem',
>> 'format' = 'csv',
>> 'csv.field-delimiter' = '\t',
>> 'path' = 'ml-100k/u1.test'
>> )
>> """
>>
>> SINK_DDL = """
>> CREATE TABLE sink (
>> prediction DOUBLE
>> ) WITH (
>> 'connector' = 'print'
>> )
>> """
>>
>> t_env.execute_sql(SOURCE_DDL)
>> t_env.execute_sql(SINK_DDL)
>> t_env.execute_sql(
>> "INSERT INTO sink SELECT PREDICT(user_id, movie_id) FROM source"
>> ).wait()
>> ```
>>
>> Here is the UDF.
>>
>> ```python
>> # batch_prediction.py (cont)
>>
>> @udf(result_type=DataTypes.DOUBLE())
>> def predict(user, item):
>>     return model([user], [item]).item()
>>
>> t_env.create_temporary_function("predict", predict)
>> ```
>>
>> The job runs fine. However, the prediction actually runs on each and
>> every row of the `source` table, which is not performant. Instead, I want
>> to split the 80,000 (user_id, movie_id) pairs into, let's say, 100 batches,
>> with each batch having 800 rows. The job triggers the `model(users, items)`
>> function 100 times (= # of batch), where both `users` and `items` have 800
>> elements.
>>
>> I couldn't find a way to do this. By looking at the [docs](
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html),
>> vectorized user-defined functions may work.
>>
>> ```python
>> # batch_prediction.py (snippet)
>>
>> # I add the func_type="pandas"
>> @udf(result_type=DataTypes.DOUBLE(), func_type="pandas")
>> def predict(user, item):
>>     ...
>> ```
>>
>> Unfortunately, it doesn't.
>>
>> ```
>> > python batch_prediction.py
>> ...
>> Traceback (most recent call last):
>>   File "batch_prediction.py", line 55, in 
>> "INSERT INTO sink SELECT PREDICT(user_id, movie_id) FROM source"
>>   File
>> "/usr/local/anaconda3/envs/flink-ml/lib/python3.7/site-packages/pyflink/table/table_result.py",
>> line 76, in wait
>> get_method(self._j_table_result, "await")()
>>   File
>> "/usr/local/anaconda3/envs/flink-ml/lib/python3.7/site-packages/py4j/java_gateway.py",
>> line 1286, in __call__
>> answer, self.gateway_client, self.target_id, self.name)
>>   File
>> 

Re: Re: pyflink run error: Function class 'class org.apache.flink.table.functions.python.PythonScalarFunction' is not serializable

2021-04-14 Thread Dian Fu
Could you first try running it locally, to narrow down the scope? Judging from the error, it is thrown at compile/registration time, so it should be reproducible on the machine you are currently submitting the job from.
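
As an illustration of the "run it locally first" suggestion, here is a stripped-down, hypothetical reproduction script with the IPy lookup stubbed out. Running it with plain `python repro.py` (no `flink run -m yarn-cluster`) uses a local mini cluster, so if the ValidationException already appears here, the problem is in the local Java environment rather than in the YARN deployment:

```python
# repro.py - minimal local reproduction sketch (file name is assumed).
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import DataTypes, EnvironmentSettings, StreamTableEnvironment
from pyflink.table.udf import udf

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(
    env,
    environment_settings=EnvironmentSettings.new_instance()
    .in_streaming_mode().use_blink_planner().build())

@udf(input_types=[DataTypes.STRING(), DataTypes.STRING()],
     result_type=DataTypes.INT())
def judge_ip(src_ip, dst_ip):
    # Stub body: the real IPy range check is not needed to reproduce the
    # registration/serialization error.
    return 1

# This is the call that throws in the report. If it also fails here, check
# `java -version` / JAVA_HOME as seen by pyflink on this machine.
t_env.register_function('judge_ip', judge_ip)
print("judge_ip registered OK")
```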

On Thu, Apr 15, 2021 at 10:24 AM magichuang  wrote:

> Hi, the cluster consists of 6 CentOS 7.8 cloud servers; on 5 of them the java version is "1.8.0_202", and on 1 of them it is java
> version "16" 2021-03-16. Does that matter? I submitted from a "1.8.0_202" machine.
>
> Submit command: /opt/flink-1.11.2/bin/flink run -m yarn-cluster -ynm traffic -ys 1
> -ytm 1024m -p 1 -py traffic.py
>
>
>
>
> > ------ Original Message ------
> > From: "Dian Fu" 
> > Sent: 2021-04-14 23:11:57
> > To: user-zh 
> > Cc:
> > Subject: Re: pyflink run error: Function class 'class
> org.apache.flink.table.functions.python.PythonScalarFunction' is not
> serializable
> >
> > What JDK version are you using? It looks like a Java environment issue. There is a similar question [1]; see whether it helps.
> >
> > [1]
> >
> https://stackoverflow.com/questions/41265266/how-to-solve-inaccessibleobjectexception-unable-to-make-member-accessible-m
> >
> > On Wed, Apr 14, 2021 at 4:58 PM magichuang wrote:
> >
> > > Flink version: 1.11.2, Python version 3.6, apache-flink==1.11.2, running Flink on
> > > YARN in per-job mode
> > >
> > > The program is written with pyflink: it reads data from Kafka, uses a UDF
> to check whether an IP falls inside an IP range (the third-party package is IPy), and then writes back to Kafka
> > >
> > >
> > >
> > >
> > > Main code
> > >
> > >
> t_env.get_config().get_configuration().set_string('taskmanager.memory.task.off-heap.size',
> > > '128m')
> > >
> > > t_env.get_config().get_configuration().set_string("pipeline.jars",
> > >
> "file:///opt/flink-1.11.2/lib/flink-sql-connector-kafka_2.11-1.11.0.jar; ")
> > >
> > >
> t_env.get_config().get_configuration().set_string("pipeline.classpaths",
> > >
> "file:///opt/flink-1.11.2/lib/flink-sql-connector-kafka_2.11-1.11.0.jar; ")
> > >
> > >
> > >
> > >
> > > t_env.add_python_archive("venv.zip")
> > >
> > > t_env.get_config().set_python_executable("venv.zip/venv/bin/python")
> > >
> > >
> > >
> > >
> > > @udf(input_types=[DataTypes.STRING(), DataTypes.STRING()],
> > > result_type=DataTypes.INT())
> > >
> > > def judge_ip(src_ip, dst_ip):
> > >
> > > import IPy
> > >
> > > .
> > >
> > > t_env.register_function('judge_ip', judge_ip)
> > >
> > >
> > >
> > > Below is the main error message
> > >
> > > Traceback (most recent call last):
> > >
> > > File "traffic-tuple-sf.py", line 59, in
> > >
> > > t_env.register_function('judge_ip', judge_ip)
> > >
> > > File
> > >
> "/opt/flink-1.11.2/opt/python/pyflink.zip/pyflink/table/table_environment.py",
> > > line 876, in register_function
> > >
> > > File
> > >
> "/opt/flink-1.11.2/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
> > > line 1286, in __call__
> > >
> > > File
> > > "/opt/flink-1.11.2/opt/python/pyflink.zip/pyflink/util/exceptions.py",
> line
> > > 147, in deco
> > >
> > > File
> > > "/opt/flink-1.11.2/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py",
> line
> > > 328, in get_return_value
> > >
> > > py4j.protocol.Py4JJavaError: An error occurred while calling
> > > o5.registerFunction.
> > >
> > > : org.apache.flink.table.api.ValidationException: Function class 'class
> > > org.apache.flink.table.functions.python.PythonScalarFunction' is not
> > > serializable. Make sure that the class is self-contained (i.e. no
> > > references to outer classes) and all inner fields are serializable as
> well.
> > >
> > > at
> > >
> org.apache.flink.table.functions.UserDefinedFunctionHelper.cleanFunction(UserDefinedFunctionHelper.java:349)
> > >
> > > at
> > >
> org.apache.flink.table.functions.UserDefinedFunctionHelper.prepareInstance(UserDefinedFunctionHelper.java:204)
> > >
> > > at
> > >
> org.apache.flink.table.catalog.FunctionCatalog.registerTempSystemScalarFunction(FunctionCatalog.java:383)
> > >
> > > at
> > >
> org.apache.flink.table.api.internal.TableEnvironmentImpl.registerFunction(TableEnvironmentImpl.java:357)
> > >
> > > at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> > > Method)
> > >
> > > at
> > >
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:78)
> > >
> > > at
> > >
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > >
> > > at java.base/java.lang.reflect.Method.invoke(Method.java:567)
> > >
> > > at
> > >
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> > >
> > > at
> > >
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> > >
> > > at
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> > >
> > > at
> > >
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> > >
> > > at
> > >
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> > >
> > > at
> > >
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> > >
> > > at java.base/java.lang.Thread.run(Thread.java:831)
> > >
> > > Caused by: java.lang.reflect.InaccessibleObjectException: Unable to
> make
> > > field private final byte[] java.lang.String.value accessible: module
> > > java.base does not "opens java.lang" to unnamed 

Re: Re: pyflink run error: Function class 'class org.apache.flink.table.functions.python.PythonScalarFunction' is not serializable

2021-04-14 Thread magichuang
Hi, the cluster consists of 6 CentOS 7.8 cloud servers; on 5 of them the java version is "1.8.0_202", and on 1 of them it is java version
"16" 2021-03-16. Does that matter? I submitted from a "1.8.0_202" machine.

Submit command: /opt/flink-1.11.2/bin/flink run -m yarn-cluster -ynm traffic -ys 1 -ytm
1024m -p 1 -py traffic.py




> ------ Original Message ------
> From: "Dian Fu" 
> Sent: 2021-04-14 23:11:57
> To: user-zh 
> Cc:
> Subject: Re: pyflink run error: Function class 'class 
> org.apache.flink.table.functions.python.PythonScalarFunction' is not 
> serializable
>
> What JDK version are you using? It looks like a Java environment issue. There is a similar question [1]; see whether it helps.
>
> [1]
> https://stackoverflow.com/questions/41265266/how-to-solve-inaccessibleobjectexception-unable-to-make-member-accessible-m
>
> On Wed, Apr 14, 2021 at 4:58 PM magichuang wrote:
>
> > Flink version: 1.11.2, Python version 3.6, apache-flink==1.11.2, running Flink on
> > YARN in per-job mode
> >
> > The program is written with pyflink: it reads data from Kafka, uses a UDF to check whether an IP falls inside an IP range (the third-party package is IPy), and then writes back to Kafka
> >
> >
> >
> >
> > Main code
> >
> > t_env.get_config().get_configuration().set_string('taskmanager.memory.task.off-heap.size',
> > '128m')
> >
> > t_env.get_config().get_configuration().set_string("pipeline.jars",
> > "file:///opt/flink-1.11.2/lib/flink-sql-connector-kafka_2.11-1.11.0.jar; ")
> >
> > t_env.get_config().get_configuration().set_string("pipeline.classpaths",
> > "file:///opt/flink-1.11.2/lib/flink-sql-connector-kafka_2.11-1.11.0.jar; ")
> >
> >
> >
> >
> > t_env.add_python_archive("venv.zip")
> >
> > t_env.get_config().set_python_executable("venv.zip/venv/bin/python")
> >
> >
> >
> >
> > @udf(input_types=[DataTypes.STRING(), DataTypes.STRING()],
> > result_type=DataTypes.INT())
> >
> > def judge_ip(src_ip, dst_ip):
> >
> > import IPy
> >
> > .
> >
> > t_env.register_function('judge_ip', judge_ip)
> >
> >
> >
> > Below is the main error message
> >
> > Traceback (most recent call last):
> >
> > File "traffic-tuple-sf.py", line 59, in
> >
> > t_env.register_function('judge_ip', judge_ip)
> >
> > File
> > "/opt/flink-1.11.2/opt/python/pyflink.zip/pyflink/table/table_environment.py",
> > line 876, in register_function
> >
> > File
> > "/opt/flink-1.11.2/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
> > line 1286, in __call__
> >
> > File
> > "/opt/flink-1.11.2/opt/python/pyflink.zip/pyflink/util/exceptions.py", line
> > 147, in deco
> >
> > File
> > "/opt/flink-1.11.2/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line
> > 328, in get_return_value
> >
> > py4j.protocol.Py4JJavaError: An error occurred while calling
> > o5.registerFunction.
> >
> > : org.apache.flink.table.api.ValidationException: Function class 'class
> > org.apache.flink.table.functions.python.PythonScalarFunction' is not
> > serializable. Make sure that the class is self-contained (i.e. no
> > references to outer classes) and all inner fields are serializable as well.
> >
> > at
> > org.apache.flink.table.functions.UserDefinedFunctionHelper.cleanFunction(UserDefinedFunctionHelper.java:349)
> >
> > at
> > org.apache.flink.table.functions.UserDefinedFunctionHelper.prepareInstance(UserDefinedFunctionHelper.java:204)
> >
> > at
> > org.apache.flink.table.catalog.FunctionCatalog.registerTempSystemScalarFunction(FunctionCatalog.java:383)
> >
> > at
> > org.apache.flink.table.api.internal.TableEnvironmentImpl.registerFunction(TableEnvironmentImpl.java:357)
> >
> > at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> > Method)
> >
> > at
> > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:78)
> >
> > at
> > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >
> > at java.base/java.lang.reflect.Method.invoke(Method.java:567)
> >
> > at
> > org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> >
> > at
> > org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> >
> > at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> >
> > at
> > org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> >
> > at
> > org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> >
> > at
> > org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> >
> > at java.base/java.lang.Thread.run(Thread.java:831)
> >
> > Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make
> > field private final byte[] java.lang.String.value accessible: module
> > java.base does not "opens java.lang" to unnamed module @1311d9fb
> >
> > at
> > java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:357)
> >
> > at
> > java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:297)
> >
> > at java.base/java.lang.reflect.Field.checkCanSetAccessible(Field.java:177)
> >
> > at java.base/java.lang.reflect.Field.setAccessible(Field.java:171)
> >
> > at 

Re: flink1.12.2 "Failed to execute job"

2021-04-14 Thread Becket Qin
Hi,

Piotr is correct. The cause of this issue is likely because the
instantiation of the SplitEnumerator is done in the JM main thread.
FLINK-22282 has been created to address this issue.

Thanks,

Jiangjie (Becket) Qin

On Wed, Apr 14, 2021 at 10:32 PM Piotr Nowojski 
wrote:

> Hi,
>
> I haven't found anything strange in the logs (I've received logs in a
> separate message). It looks like the problem is that split enumeration is
> taking a long time, and currently this is being done in the Job Manager's
> main thread, blocking other things from executing. For the time being I
> think the only thing you can do is to either speed up the split enumeration
> (probably difficult) or increase the timeouts that are failing. I don't
> know if there is some other workaround at the moment (Becket?).
>
> Piotrek
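
For reference, the "increase the timeouts that are failing" workaround is a flink-conf.yaml change. Which option actually matters depends on the exact exception (it is truncated at the end of the log below), so the keys and values here are only the usual candidates, not a confirmed fix:

```
# flink-conf.yaml - illustrative values only; size them to how long the
# split enumeration actually takes.
web.timeout: 300000        # REST endpoint timeout, in milliseconds
akka.ask.timeout: 300 s    # RPC ask timeout between runtime components
client.timeout: 600 s      # client-side job submission timeout
```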
>
> On Wed, Apr 14, 2021 at 15:57, Piotr Nowojski wrote:
>
>> Hey,
>>
>> could you provide full logs from both task managers and job managers?
>>
>> Piotrek
>>
>> On Wed, Apr 14, 2021 at 15:43, 太平洋 <495635...@qq.com> wrote:
>>
>>> After submitting the job, I received a 'Failed to execute job' error, and the
>>> time between initialization and scheduling lasted 214 s. What happened
>>> during this period?
>>>
>>> version: flink: 1.12.2
>>> deployment: k8s standalone
>>> logs:
>>>
>>> 2021-04-14 12:47:58,547 WARN
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer [] -
>>> Property [transaction.timeout.ms] not specified. Setting it to 360
>>> ms
>>> 2021-04-14 12:48:04,175 INFO
>>> org.apache.flink.client.deployment.application.executors.
>>> EmbeddedExecutor [] - Job 1276000e99efdb77bdae0df88ab91da3 is submitted.
>>> 2021-04-14 12:48:04,175 INFO
>>> org.apache.flink.client.deployment.application.executors.
>>> EmbeddedExecutor [] - Submitting Job with JobId=1276000e99
>>> efdb77bdae0df88ab91da3.
>>> 2021-04-14 12:48:04,249 INFO org.apache.flink.runtime.dispatcher.
>>> StandaloneDispatcher [] - Received JobGraph submission 
>>> 1276000e99efdb77bdae0df88ab91da3
>>> (Prediction Program).
>>> 2021-04-14 12:48:04,249 INFO org.apache.flink.runtime.dispatcher.
>>> StandaloneDispatcher [] - Submitting job 1276000e99efdb77bdae0df88ab91da3
>>> (Prediction Program).
>>> 2021-04-14 12:48:04,250 INFO org.apache.flink.runtime.rpc.akka.
>>> AkkaRpcService [] - Starting RPC endpoint for
>>> org.apache.flink.runtime.jobmaster.JobMaster at
>>> akka://flink/user/rpc/jobmanager_8 .
>>> 2021-04-14 12:48:04,251 INFO org.apache.flink.runtime.jobmaster.
>>> JobMaster [] - Initializing job Prediction Program (1276000e99
>>> efdb77bdae0df88ab91da3).
>>> 2021-04-14 12:48:04,251 INFO org.apache.flink.runtime.jobmaster.
>>> JobMaster [] - Using restart back off time strategy
>>> NoRestartBackoffTimeStrategy for Prediction Program (1276000e99
>>> efdb77bdae0df88ab91da3).
>>> 2021-04-14 12:48:04,251 INFO org.apache.flink.runtime.jobmaster.
>>> JobMaster [] - Running initialization on master for job Prediction
>>> Program (1276000e99efdb77bdae0df88ab91da3).
>>> 2021-04-14 12:48:04,252 INFO org.apache.flink.runtime.jobmaster.
>>> JobMaster [] - Successfully ran initialization on master in 0 ms.
>>> 2021-04-14 12:48:04,254 INFO org.apache.flink.runtime.scheduler.adapter.
>>> DefaultExecutionTopology [] - Built 10 pipelined regions in 0 ms
>>> 2021-04-14 12:48:04,255 INFO org.apache.flink.runtime.jobmaster.
>>> JobMaster [] - Using application-defined state backend:
>>> org.apache.flink.streaming.api.operators.sorted.state.
>>> BatchExecutionStateBackend@3ea8cd5a
>>> 2021-04-14 12:48:04,255 INFO org.apache.flink.runtime.checkpoint.
>>> CheckpointCoordinator [] - No checkpoint found during restore.
>>> 2021-04-14 12:48:04,255 INFO org.apache.flink.runtime.jobmaster.
>>> JobMaster [] - Using failover strategy
>>> org.apache.flink.runtime.executiongraph.failover.flip1.
>>> RestartPipelinedRegionFailoverStrategy@26845997 for Prediction Program (
>>> 1276000e99efdb77bdae0df88ab91da3).
>>> 2021-04-14 12:48:04,255 INFO org.apache.flink.runtime.jobmaster.
>>> JobManagerRunnerImpl [] - JobManager runner for job Prediction Program (
>>> 1276000e99efdb77bdae0df88ab91da3) was granted leadership with session
>>> id ---- at akka.tcp://flink@flink
>>> -jobmanager:6123/user/rpc/jobmanager_8.
>>> 2021-04-14 12:48:04,255 INFO org.apache.flink.runtime.jobmaster.
>>> JobMaster [] - Starting execution of job Prediction Program 
>>> (1276000e99efdb77bdae0df88ab91da3)
>>> under job master id .
>>> 2021-04-14 12:48:04,255 INFO
>>> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] -
>>> Starting split enumerator for source Source: 
>>> TableSourceScan(table=[[default_catalog,
>>> default_database, cpu_util, filter=[], project=[instance_id, value,
>>> timestamp]]], fields=[instance_id, value, timestamp]) -> 
>>> Calc(select=[instance_id,
>>> value, timestamp], where=[(timestamp > 1618145278)]) ->
>>> SinkConversionToDataPoint -> Map.
>>> org.apache.flink.util.FlinkException: 

Re: Extract/Interpret embedded byte data from a record

2021-04-14 Thread Sumeet Malhotra
Additional observation: From the Flink repo, the file
"flink-python/pyflink/table/table.py" seems to support map(), flat_map()
and other row based operations although the 1.12 documentation doesn't
reflect that. Is that correct? From the code, it appears that these
operations are supported in Python.
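
Since this thread is about PyFlink, here is a minimal sketch of the "deserialize in a subsequent step" idea from Piotrek's reply, done with a scalar UDF. The inner field name `score` is made up, and only the JSON branch is filled in; the Avro branch would need the inner schema and a decoder such as fastavro:

```python
import json

from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(result_type=DataTypes.STRING())
def extract_score(result_bytes, result_encoding):
    # Interpret the embedded `Result` bytes according to `ResultEncoding`.
    if result_encoding == 'application/json':
        payload = json.loads(result_bytes.decode('utf-8'))
        return str(payload.get('score'))  # pick whichever inner field you need
    if result_encoding == 'application/avro':
        # Decode against the inner Result schema here (e.g. with fastavro).
        return None
    return None

# Usage sketch:
# t_env.create_temporary_function("extract_score", extract_score)
# t_env.sql_query("SELECT Name, ID, extract_score(Result, ResultEncoding) FROM src")
```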

Thanks,
Sumeet

On Thu, Apr 15, 2021 at 6:31 AM Sumeet Malhotra 
wrote:

> Thanks Piotrek! I forgot to mention that I'm using PyFlink and mostly
> Table APIs. The documentation (
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#row-based-operations)
> suggests that Map() function is not currently supported in Python. So, what
> do you think would be my options here. Should I convert to a data stream to
> perform this in Python?
>
> Thanks again,
> Sumeet
>
>
> On Wed, Apr 14, 2021 at 7:09 PM Piotr Nowojski 
> wrote:
>
>> Hi,
>>
>> One thing that you can do is to read this record using Avro keeping
>> `Result` as `bytes` and in a subsequent mapping function, you could change
>> the record type and deserialize the result. In Data Stream API:
>>
>> source.map(new MapFunction<record_with_raw_result, record_with_deserialized_result>() { ... })
>>
>> Best,
>> Piotrek
>>
>> On Wed, Apr 14, 2021 at 03:17, Sumeet Malhotra wrote:
>>
>>> Hi,
>>>
>>> I'm reading data from Kafka, which is Avro encoded and has the following
>>> general schema:
>>>
>>> {
>>>   "name": "SomeName",
>>>   "doc": "Avro schema with variable embedded encodings",
>>>   "type": "record",
>>>   "fields": [
>>> {
>>>   "name": "Name",
>>>   "doc": "My name",
>>>   "type": "string"
>>> },
>>> {
>>>   "name": "ID",
>>>   "doc": "My ID",
>>>   "type": "string"
>>> },
>>> {
>>>   "name": "Result",
>>>   "doc": "Result data, could be encoded differently",
>>>   "type": "bytes"
>>> },
>>> {
>>>   "name": "ResultEncoding",
>>>   "doc": "Result encoding media type (e.g. application/avro,
>>> application/json)",
>>>   "type": "string"
>>> },
>>>   ]
>>> }
>>>
>>> Basically, the "Result" field is bytes whose interpretation depends upon
>>> the "ResultEncoding" field i.e. either avro or json. The "Result" byte
>>> stream has its own well defined schema also.
>>>
>>> My use case involves extracting/aggregating data from within the
>>> embedded "Result" field. What would be the best approach to perform this
>>> runtime decoding and extraction of fields from the embedded byte data?
>>> Would user defined functions help in this case?
>>>
>>> Thanks in advance!
>>> Sumeet
>>>
>>>


WindowFunction is stuck until next message is processed although Watermark with idle timeout is applied.

2021-04-14 Thread Sung Gon Yi
Hello,

I have a question about watermark with idle timeout.

I made an example about it:
https://github.com/skonmeme/rare_stream/blob/main/src/main/scala/com/skonuniverse/flink/RareStreamWithIdealTimeout.scala


There is a WindowFunction with a 5-second tumbling window, and messages arrive
every 120 seconds. The idle timeout is set to 30 seconds.

However, when running, the first message was only processed after 120 seconds,
that is, once the next message had arrived.

Please tell me what I am misunderstanding about the idle timeout and how to
solve this problem.

Thanks,
Sung Gon

Re: Extract/Interpret embedded byte data from a record

2021-04-14 Thread Sumeet Malhotra
Thanks Piotrek! I forgot to mention that I'm using PyFlink and mostly Table
APIs. The documentation (
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#row-based-operations)
suggests that Map() function is not currently supported in Python. So, what
do you think would be my options here. Should I convert to a data stream to
perform this in Python?

Thanks again,
Sumeet


On Wed, Apr 14, 2021 at 7:09 PM Piotr Nowojski  wrote:

> Hi,
>
> One thing that you can do is to read this record using Avro keeping
> `Result` as `bytes` and in a subsequent mapping function, you could change
> the record type and deserialize the result. In Data Stream API:
>
> source.map(new MapFunction<record_with_raw_result, record_with_deserialized_result>() { ... })
>
> Best,
> Piotrek
>
> On Wed, Apr 14, 2021 at 03:17, Sumeet Malhotra wrote:
>
>> Hi,
>>
>> I'm reading data from Kafka, which is Avro encoded and has the following
>> general schema:
>>
>> {
>>   "name": "SomeName",
>>   "doc": "Avro schema with variable embedded encodings",
>>   "type": "record",
>>   "fields": [
>> {
>>   "name": "Name",
>>   "doc": "My name",
>>   "type": "string"
>> },
>> {
>>   "name": "ID",
>>   "doc": "My ID",
>>   "type": "string"
>> },
>> {
>>   "name": "Result",
>>   "doc": "Result data, could be encoded differently",
>>   "type": "bytes"
>> },
>> {
>>   "name": "ResultEncoding",
>>   "doc": "Result encoding media type (e.g. application/avro,
>> application/json)",
>>   "type": "string"
>> },
>>   ]
>> }
>>
>> Basically, the "Result" field is bytes whose interpretation depends upon
>> the "ResultEncoding" field i.e. either avro or json. The "Result" byte
>> stream has its own well defined schema also.
>>
>> My use case involves extracting/aggregating data from within the embedded
>> "Result" field. What would be the best approach to perform this runtime
>> decoding and extraction of fields from the embedded byte data? Would user
>> defined functions help in this case?
>>
>> Thanks in advance!
>> Sumeet
>>
>>


Flink 1.11 FlinkKafkaConsumer not propagating watermarks

2021-04-14 Thread Edward Bingham
Hi everyone,

I'm seeing some strange behavior from FlinkKafkaConsumer. I wrote up some
Flink processors using Flink 1.12, and tried to get them working on Amazon
EMR. However Amazon EMR only supports Flink 1.11.2 at the moment. When I
went to downgrade, I found, inexplicably, that watermarks were no longer
propagating.

There is only one partition on the topic, and parallelism is set to 1. Is
there something I'm missing here? I feel like I'm going a bit crazy.

I've cross-posted this on stackoverflow, but I figure the mailing list is
probably a better avenue for this question.

Thanks,
Ned


Here's the output for Flink 1.12 (correctly propagating the watermark):

input 1
(name=input, internal=false, partitions=(partition=0,
leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092
(id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)),
authorizedOperations=null)
{
  "nodes" : [ {
"id" : 1,
"type" : "Source: Custom Source",
"pact" : "Data Source",
"contents" : "Source: Custom Source",
"parallelism" : 1
  }, {
"id" : 2,
"type" : "Process",
"pact" : "Operator",
"contents" : "Process",
"parallelism" : 1,
"predecessors" : [ {
  "id" : 1,
  "ship_strategy" : "FORWARD",
  "side" : "second"
} ]
  } ]
}
input 1
(name=input, internal=false, partitions=(partition=0,
leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092
(id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)),
authorizedOperations=null)
Assigning timestamp 8640
Source [timestamp=8640 watermark=-9223372036854775808] "test message"
Emitting watermark 0
Assigning timestamp 86400
Source [timestamp=86400 watermark=0] "test message"
Emitting watermark 77760
Assigning timestamp 864000
Source [timestamp=864000 watermark=77760] "test message"
Emitting watermark 855360
Assigning timestamp 864
Source [timestamp=864 watermark=855360] "test message"
Emitting watermark 8631360
Assigning timestamp 9223372036854775807
Source [timestamp=9223372036854775807 watermark=8631360] "test message"
Emitting watermark 9223372036768375807

And here is the output for Flink 1.11 (not propagating the watermark):

input 1
(name=input, internal=false, partitions=(partition=0,
leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092
(id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)),
authorizedOperations=null)
{
  "nodes" : [ {
"id" : 1,
"type" : "Source: Custom Source",
"pact" : "Data Source",
"contents" : "Source: Custom Source",
"parallelism" : 1
  }, {
"id" : 2,
"type" : "Process",
"pact" : "Operator",
"contents" : "Process",
"parallelism" : 1,
"predecessors" : [ {
  "id" : 1,
  "ship_strategy" : "FORWARD",
  "side" : "second"
} ]
  } ]
}
input 1
(name=input, internal=false, partitions=(partition=0,
leader=mycomputer:9092 (id: 0 rack: null), replicas=mycomputer:9092
(id: 0 rack: null), isr=mycomputer:9092 (id: 0 rack: null)),
authorizedOperations=null)
Assigning timestamp 8640
Source [timestamp=0 watermark=-9223372036854775808] "test message"
Emitting watermark 0
Assigning timestamp 86400
Source [timestamp=0 watermark=-9223372036854775808] "test message"
Emitting watermark 77760
Assigning timestamp 864000
Source [timestamp=0 watermark=-9223372036854775808] "test message"
Emitting watermark 855360
Assigning timestamp 864
Source [timestamp=0 watermark=-9223372036854775808] "test message"
Emitting watermark 8631360
Assigning timestamp 9223372036854775807
Source [timestamp=0 watermark=-9223372036854775808] "test message"
Emitting watermark 9223372036768375807

Here's the integration test that exposes it:

package mytest;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.FileInputStream;
import java.io.InputStream;
import java.io.IOException;

import java.nio.file.Files;
import java.nio.file.Paths;

import java.text.SimpleDateFormat;

import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.MockTime;
import kafka.utils.TestUtils;
import kafka.zk.EmbeddedZookeeper;

import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.JobExecutionResult;

Re: Get consumed Kafka offsets from Flink kafka source

2021-04-14 Thread bat man
Thanks Piotrek for the references.

Cheers.
Hemant

On Wed, Apr 14, 2021 at 7:18 PM Piotr Nowojski  wrote:

> Hi,
>
> Depending on how you configured your FlinkKafkaSource, you can make the
> source commit consumed offsets back to Kafka. So one way to examine
> them would be to check those offsets in Kafka (I don't know how offhand, but I'm
> pretty sure there is a way to do it).
>
> Secondly, if you want to examine Flink's checkpoint state you can use
> State Processor API to do that [1]. As far as I know you could hook up your
> checkpointed data to Table API/SQL and use SQL to query/analyse the state.
>
> Best
> Piotrek
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>
> On Wed, Apr 14, 2021 at 11:25, bat man wrote:
>
>> Hi All,
>>
>> Is there any way I can inspect/query the checkpointed data? The scenario is
>> like this -
>>
>> We have a high volume of data coming into the data stream pipeline, with
>> Kafka as the source. In case it fails because of bad data, I want to
>> analyse the data that caused the issue. It could be that some data source
>> starts sending bad data, so I want to go to that particular offset in Kafka
>> and do some analysis before I restart the job from the checkpointed data.
>>
>> Can anyone suggest how this can be achieved.
>>
>> Thanks,
>> Hemant
>>
>>
>>


Re: Nondeterministic results with SQL job when parallelism is > 1

2021-04-14 Thread Dylan Forciea
On a side note - I changed to use batch mode per your suggestion, Timo, and
my job ran much faster and with deterministic counts with parallelism turned
on. So I'll probably utilize that for now. However, it would still be nice to
dig into why streaming isn't working, in case I need it in the future.
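
For reference, a minimal sketch of the batch-mode switch mentioned above, written in PyFlink notation purely for illustration (the Java/Scala EnvironmentSettings builder is analogous):

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

# Batch execution mode: with bounded inputs the planner can compute the
# joins without the streaming retraction/ordering effects discussed in
# this thread.
settings = EnvironmentSettings.new_instance() \
    .in_batch_mode() \
    .use_blink_planner() \
    .build()
t_env = TableEnvironment.create(settings)
```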

Dylan

On 4/14/21, 10:27 AM, "Dylan Forciea"  wrote:

Timo,

Here is the plan (hopefully I properly cleansed it of company proprietary 
info without garbling it)

Dylan

== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.sink], fields=[id, 
attr, attr_mapped])
+- LogicalProject(id=[CASE(IS NOT NULL($0), $0, $2)], attr=[CASE(IS NOT 
NULL($3), $3, $1)], attr_mapped=[CASE(IS NOT NULL($6), $6, IS NOT NULL($3), $3, 
$1)])
   +- LogicalJoin(condition=[=($4, $5)], joinType=[left])
  :- LogicalProject(id1=[$0], attr=[$1], id2=[$2], attr0=[$3], 
$f4=[CASE(IS NOT NULL($3), $3, $1)])
  :  +- LogicalJoin(condition=[=($0, $2)], joinType=[full])
  : :- LogicalTableScan(table=[[default_catalog, default_database, 
table1]])
  : +- LogicalAggregate(group=[{0}], attr=[MAX($1)])
  :+- LogicalProject(id2=[$1], attr=[$0])
  :   +- LogicalTableScan(table=[[default_catalog, 
default_database, table2]])
  +- LogicalTableScan(table=[[default_catalog, default_database, 
table3]])

== Optimized Logical Plan ==
Sink(table=[default_catalog.default_database.sink], fields=[id, attr, 
attr_mapped], changelogMode=[NONE])
+- Calc(select=[CASE(IS NOT NULL(id1), id1, id2) AS id, CASE(IS NOT 
NULL(attr0), attr0, attr) AS attr, CASE(IS NOT NULL(attr_mapped), attr_mapped, 
IS NOT NULL(attr0), attr0, attr) AS attr_mapped], changelogMode=[I,UB,UA,D])
   +- Join(joinType=[LeftOuterJoin], where=[=($f4, attr)], select=[id1, 
attr, id2, attr0, $f4, attr, attr_mapped], leftInputSpec=[HasUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UB,UA,D])
  :- Exchange(distribution=[hash[$f4]], changelogMode=[I,UB,UA,D])
  :  +- Calc(select=[id1, attr, id2, attr0, CASE(IS NOT NULL(attr0), 
attr0, attr) AS $f4], changelogMode=[I,UB,UA,D])
  : +- Join(joinType=[FullOuterJoin], where=[=(id1, id2)], 
select=[id1, attr, id2, attr0], leftInputSpec=[JoinKeyContainsUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UB,UA,D])
  ::- Exchange(distribution=[hash[id1]], changelogMode=[I])
  ::  +- TableSourceScan(table=[[default_catalog, 
default_database, table1]], fields=[id1, attr], changelogMode=[I])
  :+- Exchange(distribution=[hash[id2]], 
changelogMode=[I,UB,UA])
  :   +- GroupAggregate(groupBy=[id2], select=[id2, MAX(attr) 
AS attr], changelogMode=[I,UB,UA])
  :  +- Exchange(distribution=[hash[id2]], 
changelogMode=[I])
  : +- TableSourceScan(table=[[default_catalog, 
default_database, table2]], fields=[attr, id2], changelogMode=[I])
  +- Exchange(distribution=[hash[attr]], changelogMode=[I])
 +- TableSourceScan(table=[[default_catalog, default_database, 
table3]], fields=[attr, attr_mapped], changelogMode=[I])

== Physical Execution Plan ==
Stage 1 : Data Source
content : Source: TableSourceScan(table=[[default_catalog, 
default_database, table1]], fields=[id1, attr])

Stage 3 : Data Source
content : Source: TableSourceScan(table=[[default_catalog, 
default_database, table2]], fields=[attr, id2])

Stage 5 : Attr
content : GroupAggregate(groupBy=[id2], select=[id2, MAX(attr) 
AS attr])
ship_strategy : HASH

Stage 7 : Attr
content : Join(joinType=[FullOuterJoin], where=[(id1 = 
id2)], select=[id1, attr, id2, attr0], 
leftInputSpec=[JoinKeyContainsUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey])
ship_strategy : HASH

Stage 8 : Attr
content : Calc(select=[id1, attr, id2, attr0, 
(attr0 IS NOT NULL CASE attr0 CASE attr) AS $f4])
ship_strategy : FORWARD

Stage 10 : Data Source
content : Source: TableSourceScan(table=[[default_catalog, 
default_database, table3]], fields=[attr, attr_mapped])

Stage 12 : Attr
content : Join(joinType=[LeftOuterJoin], where=[($f4 = attr)], 
select=[id1, attr, id2, attr0, $f4, attr, attr_mapped], 
leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
ship_strategy : HASH

Stage 13 : Attr
content : Calc(select=[(id1 IS NOT NULL CASE id1 CASE 
id2) AS id, (attr0 IS NOT NULL CASE attr0 CASE attr) AS attr, (attr_mapped IS 
NOT NULL CASE attr_mapped CASE attr0 IS NOT NULL CASE attr0 CASE attr) AS 
attr_mapped])
ship_strategy : FORWARD


Re: Re: JSON source for pyflink stream

2021-04-14 Thread G . G . M . 5611
Thanks to everyone for the tips. They help a lot. I'll try the Table API first, and if that doesn't succeed I'll do as Klemens says.

Cheers,

Giacomo

 
 

Sent: Wednesday, 14 April 2021 at 16:18
From: "Dian Fu" 
To: "Klemens Muthmann" 
Cc: "Yik San Chan" , g.g.m.5...@web.de, "user" 
Subject: Re: JSON source for pyflink stream


Hi Giacomo,

All the connectors supported in the Table & SQL API can be used in the PyFlink Table API, so you could use the filesystem connector from PyFlink. AFAIK, the filesystem connector supports newline-delimited JSON in Flink 1.12. You could refer to [1] for more details.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html#file-formats
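
As an illustration of that filesystem + 'json' combination, here is a hypothetical PyFlink snippet for the tweet files from the question below. Only a few of the tweet attributes are modelled, the path is a placeholder, and the file is assumed to contain one JSON object per line; nested JSON objects map to ROW types:

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(
    EnvironmentSettings.new_instance().in_streaming_mode()
    .use_blink_planner().build())

t_env.execute_sql("""
    CREATE TABLE tweets (
        `data` ROW<
            `text` STRING,
            author_id STRING,
            id STRING,
            created_at STRING,
            lang STRING
        >
    ) WITH (
        'connector' = 'filesystem',
        'path' = 'file:///path/to/tweets.jsonl',
        'format' = 'json'
    )
""")

t_env.sql_query("SELECT `data`.author_id, `data`.`text` FROM tweets").execute().print()
```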

Regards,
Dian
 


On Wed, Apr 14, 2021 at 3:09 PM Klemens Muthmann  wrote:


Hi,
 

We are loading our JSON from a Mongo database, but we also found no readily available way to stream JSON data into a Flink pipeline. I guess this would be hard to implement, since you have to know details about the JSON structure to do it. So I guess your best bet would be to implement your own input source, which can stream in your file and create results based on the JSON structure. We are not using Pyflink, so I cannot give any details about this, but it should not matter which language you use.

 

Just implement a Source reading your input and employ any JSON parser you like, creating for example domain objects with the same attributes as your JSON structure and forward those into your Flink Pipeline for further processing.

 


Regards

Klemens



 

On 14.04.2021 at 04:40, Yik San Chan wrote:
 


Hi Giacomo,
 

I think you can try using the Flink SQL connector. For JSON input such as {"a": 1, "b": {"c": 2, "d": 3}}, you can do:

 

CREATE TABLE data (
  a INT,
  b ROW<c INT, d INT>
) WITH (...)

 

Let me know if that helps.

 

Best,

Yik San

 


On Wed, Apr 14, 2021 at 2:00 AM  wrote:




Hi,
I'm new to Flink and I am trying to create a stream from locally downloaded tweets. The tweets are in json format, like in this example:
 
{"data":{"text":"Polsek Kakas Cegah Covid-19 https://t.co/ADjEgpt7bC","public_metrics":"retweet_count":0,"reply_count":0,"like_count":0,"quote_count":0},
"author_id":"1367839185764151302","id":"1378275866279469059","created_at":"2021-04-03T09:19:08.000Z","source":"Twitter for Android","lang":"in"},
"includes":{"users":[{"protected":false,"id":"1367839185764151302","name":"Nathan Pareda","created_at":"2021-03-05T14:07:56.000Z",
"public_metrics":{"followers_count":0,"following_count":0,"tweet_count":557,"listed_count":0},
"username":"NathanPareda"}]},"matching_rules":[{"id":1378112825051246596,"tag":"coronavirus"}]}
 
I would like to do it in Python using Pyflink, but could also use Java if there is no reasonable way to do it in Python. I've been looking at different options for loading these objects into a stream, but am not sure what to do. Here's my situation so far:
 
1. There doesn't seem to be a fitting connector. The filesystem-connector doesn't seem to support json format.
2. I've seen in the archive of this mailing list that some recommend using the Table API. But I am not sure if this is a viable option given how nested the json objects are.
3. I could of course try to implement a custom DataSource, but that seems to be quite difficult so I'd only consider this if there's no other way.

I'll be very grateful for any kind of input.
Cheers,
Giacomo
 

Re: In Flink SQL for kafka avro based table , is there support for FORWARD_TRANSITIVE schema change?

2021-04-14 Thread Arvid Heise
Hi Agnelo,

if you reprocess all data and delete all old records with the old schema,
then you just have to add the new schema to the DDL and it will work.

If you have records with old and new schema in your topic, you need to
attach the schema information to the records. Avro records themselves do
not have any metadata and Flink (or any other consumer) cannot convert them
in any way.
The usual approach to attach the schema information is to use the schema
registry which Flink also supports.
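
For illustration only (this is not what the current setup does): if the producers published through a Confluent schema registry, the table could switch its format to 'avro-confluent' so that every record is decoded with the writer schema it was actually produced with. Shown here as a PyFlink-style snippet; the registry URL is a placeholder:

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(
    EnvironmentSettings.new_instance().in_streaming_mode()
    .use_blink_planner().build())

t_env.execute_sql("""
    CREATE TABLE topic1 (
        field1 STRING NOT NULL,
        field2 STRING NOT NULL
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'topic1',
        'properties.bootstrap.servers' = 'localhost:8082',
        'properties.group.id' = 'topic1',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'avro-confluent',
        'avro-confluent.schema-registry.url' = 'http://localhost:8081'
    )
""")
```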

On Wed, Apr 14, 2021 at 5:22 PM Agnelo Dcosta 
wrote:

> Hi Arvid, thanks for the reply.
> We are following the 1.12 documentation here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/avro.html#data-type-mapping
> *Currently, the Avro schema is always derived from table schema.
> Explicitly defining an Avro schema is not supported yet.*
>
> And the issue is that we need to have our producers use a newer topic
> schema (one additional column). Flink will also have this new column
> eventually. However, for the period after deploying the producer changes and
> before the Flink changes, Flink will crash with a deserialization failure.
> I am trying to see if there is any setting that could enable Flink to continue
> reading the new schema without having that field specified in the table
> definition.
>
>
> On Wed, Apr 14, 2021 at 12:46 AM Arvid Heise  wrote:
>
>> For any schema change to be gracefully supported, you need to know both
>> schemas (old + new) on the reader side (=Flink). I'm curious how Flink should
>> know the old schema, as you only provide the new schema, right?
>>
>> Usually, you use the schema registry, such that each record has it's own
>> schema attached to it (=old). You opted to not go for it, so I'm not sure
>> how Flink is supposed to know the old schema.
>>
>> On Wed, Apr 14, 2021 at 9:09 AM Agnelo Dcosta 
>> wrote:
>>
>>> Hi Arvid,
>>> > writer schema encoded if you are using no schema registry?
>>> on the producer side we are using node with
>>> https://registry.npmjs.org/avsc/-/avsc-5.5.3.tgz
>>> and https://registry.npmjs.org/sinek/-/sinek-9.1.0.tgz libraries to
>>> publish messages. We specify the avro schema file to encode messages in
>>> avro format.
>>>
>>> >Flink know with which schema the data has been written so that it can
>>> map it to the new schema?
>>> With 1.11 we used to specify the schema file as part of the flink sql
>>> table definition.
>>> However with 1.12 the schema is derived from the message/table
>>> definition. We do not specify any schema as such.
>>>
>>>
>>> On Tue, Apr 13, 2021 at 11:58 PM Arvid Heise  wrote:
>>>
 Hi Agnelo,

 How is the writer schema encoded if you are using no schema registry?
 Or phrased differently: how does Flink know with which schema the data has
 been written so that it can map it to the new schema?

 On Wed, Apr 14, 2021 at 8:33 AM Agnelo Dcosta <
 agnelo.m.dco...@gmail.com> wrote:

> Hi, we are using Flink SQL 1.12 and have a couple of tables created
> from kafka topics. Format is avro (not confluent avro) and no schema
> registry as such.
>
> In flink 1.11 we used to specify the schema, however in 1.12 the
> schema is derived from the message itself.
>
> Is it possible for the producers to start sending new fields without
> changes in the flink app?
>
>
>
> For example :
>
> {
>
>   "name": "topic1",
>
>   "type": "record",
>
>   "fields": [
>
>   {
>
>   "name": "field1",
>
>   "type": "string"
>
> },
>
> {
>
>   "name": "field2",
>
>   "type": "string"
>
> },
>
> {
>
>   *"name": "field3",*
>
> *  "type": "string"*
>
> },
>
> ]
>
> }
>
>
>
> Flink table has:
>
> CREATE TABLE topic1(\n"
>
> + " field1 string not null \n"
>
> + " ,field2 string not null \n"
>
> "'connector' = 'kafka' \n"
>
>  + ",'topic' = 'topic1' \n"
>
>  + ",'scan.startup.mode' = 'latest-offset' \n"
>
>  + ",'properties.group.id' = 'topic1' \n"
>
>  + ",'properties.bootstrap.servers' = 'localhost:8082' \n"
>
>   + ",'properties.enable.auto.commit' = 'true' \n"
>
>  + ",'format' = 'avro' \n";
>
>
>
> With above settings I get a deserialization error:
>
>
>
> *java.io.IOException: Failed to deserialize Avro record.*
>
> *at
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:104)
> ~[flink-sql-avro-1.12.0.jar:1.12.0]*
>
> *at
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:44)
> 

Flink Hadoop config on docker-compose

2021-04-14 Thread Flavio Pompermaier
Hi everybody,
I'm trying to set up reading from HDFS using docker-compose and Flink
1.11.3.
If I pass 'env.hadoop.conf.dir' and 'env.yarn.conf.dir'
using FLINK_PROPERTIES (under environment section of the docker-compose
service) I see in the logs the following line:

"Could not find Hadoop configuration via any of the supported method"

If I'm not wrong, this means that the HADOOP_CONF_DIR is actually not
generated by the run scripts.
Indeed, If I add HADOOP_CONF_DIR and YARN_CONF_DIR (always under
environment section of the docker-compose service) I don't see that line.

Is this the expected behavior?

Below the relevant docker-compose service I use (I've removed the content
of HADOOP_CLASSPATH content because is too long and I didn't report the
taskmanager that is similar):

flink-jobmanager:
container_name: flink-jobmanager
build:
  context: .
  dockerfile: Dockerfile.flink
  args:
FLINK_VERSION: 1.11.3-scala_2.12-java11
image: 'flink-test:1.11.3-scala_2.12-java11'
ports:
  - "8091:8081"
  - "8092:8082"
command: jobmanager
environment:
  - |
FLINK_PROPERTIES=
jobmanager.rpc.address: flink-jobmanager
rest.port: 8081
historyserver.web.port: 8082
web.upload.dir: /opt/flink
env.hadoop.conf.dir: /opt/hadoop/conf
env.yarn.conf.dir: /opt/hadoop/conf
  - |
HADOOP_CLASSPATH=...
  - HADOOP_CONF_DIR=/opt/hadoop/conf
  - YARN_CONF_DIR=/opt/hadoop/conf
volumes:
  - 'flink_shared_folder:/tmp/test'
  - 'flink_uploads:/opt/flink/flink-web-upload'
  - 'flink_hadoop_conf:/opt/hadoop/conf'
  - 'flink_hadoop_libs:/opt/hadoop-3.2.1/share'


Thanks in advance for any support,
Flavio


Re: Submitting a flink-sql job: cannot deploy to the YARN cluster

2021-04-14 Thread Jeff Zhang
Check the YARN application log.

张锴 wrote on Wednesday, April 14, 2021 at 8:00 PM:

> When connecting to Hive using flink-sql, the following error occurred:
>  The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The main method
> caused an error: Could not deploy Yarn job cluster.
> at
>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
> at
>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
> at
>
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
> at
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
> at
>
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1688)
> at
>
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
> Caused by: org.apache.flink.client.deployment.ClusterDeploymentException:
> Could not deploy Yarn job cluster.
> at
>
> org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:481)
> at
>
> org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:81)
> at
>
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1905)
> at
>
> org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
> at
>
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
> at
>
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1782)
> at
>
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1765)
> at com.erwan.flinksql.FlinkConnectHive.main(FlinkConnectHive.java:49)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
> ... 11 more
> Caused by:
> org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The
> YARN application unexpectedly switched to state FAILED during deployment.
> Diagnostics from YARN: Application application_1618298202025_0017 failed 1
> times (global limit =2; local limit is =1) due to AM Container for
> appattempt_1618298202025_0017_01 exited with  exitCode: 2
> Failing this attempt.Diagnostics: [2021-04-14 19:04:02.506]Exception from
> container-launch.
> Container id: container_e13_1618298202025_0017_01_01
> Exit code: 2.
>
> Since the cause of the error is not obvious, it is hard to troubleshoot, and I'm not sure where exactly the problem lies. Is there any way to pinpoint the issue?
>


-- 
Best Regards

Jeff Zhang


Re: Nondeterministic results with SQL job when parallelism is > 1

2021-04-14 Thread Dylan Forciea
Timo,

Here is the plan (hopefully I properly cleansed it of company proprietary info 
without garbling it)

Dylan

== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.sink], fields=[id, attr, 
attr_mapped])
+- LogicalProject(id=[CASE(IS NOT NULL($0), $0, $2)], attr=[CASE(IS NOT 
NULL($3), $3, $1)], attr_mapped=[CASE(IS NOT NULL($6), $6, IS NOT NULL($3), $3, 
$1)])
   +- LogicalJoin(condition=[=($4, $5)], joinType=[left])
  :- LogicalProject(id1=[$0], attr=[$1], id2=[$2], attr0=[$3], $f4=[CASE(IS 
NOT NULL($3), $3, $1)])
  :  +- LogicalJoin(condition=[=($0, $2)], joinType=[full])
  : :- LogicalTableScan(table=[[default_catalog, default_database, 
table1]])
  : +- LogicalAggregate(group=[{0}], attr=[MAX($1)])
  :+- LogicalProject(id2=[$1], attr=[$0])
  :   +- LogicalTableScan(table=[[default_catalog, 
default_database, table2]])
  +- LogicalTableScan(table=[[default_catalog, default_database, table3]])

== Optimized Logical Plan ==
Sink(table=[default_catalog.default_database.sink], fields=[id, attr, 
attr_mapped], changelogMode=[NONE])
+- Calc(select=[CASE(IS NOT NULL(id1), id1, id2) AS id, CASE(IS NOT 
NULL(attr0), attr0, attr) AS attr, CASE(IS NOT NULL(attr_mapped), attr_mapped, 
IS NOT NULL(attr0), attr0, attr) AS attr_mapped], changelogMode=[I,UB,UA,D])
   +- Join(joinType=[LeftOuterJoin], where=[=($f4, attr)], select=[id1, attr, 
id2, attr0, $f4, attr, attr_mapped], leftInputSpec=[HasUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UB,UA,D])
  :- Exchange(distribution=[hash[$f4]], changelogMode=[I,UB,UA,D])
  :  +- Calc(select=[id1, attr, id2, attr0, CASE(IS NOT NULL(attr0), attr0, 
attr) AS $f4], changelogMode=[I,UB,UA,D])
  : +- Join(joinType=[FullOuterJoin], where=[=(id1, id2)], select=[id1, 
attr, id2, attr0], leftInputSpec=[JoinKeyContainsUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UB,UA,D])
  ::- Exchange(distribution=[hash[id1]], changelogMode=[I])
  ::  +- TableSourceScan(table=[[default_catalog, default_database, 
table1]], fields=[id1, attr], changelogMode=[I])
  :+- Exchange(distribution=[hash[id2]], changelogMode=[I,UB,UA])
  :   +- GroupAggregate(groupBy=[id2], select=[id2, MAX(attr) AS 
attr], changelogMode=[I,UB,UA])
  :  +- Exchange(distribution=[hash[id2]], changelogMode=[I])
  : +- TableSourceScan(table=[[default_catalog, 
default_database, table2]], fields=[attr, id2], changelogMode=[I])
  +- Exchange(distribution=[hash[attr]], changelogMode=[I])
 +- TableSourceScan(table=[[default_catalog, default_database, 
table3]], fields=[attr, attr_mapped], changelogMode=[I])

== Physical Execution Plan ==
Stage 1 : Data Source
content : Source: TableSourceScan(table=[[default_catalog, 
default_database, table1]], fields=[id1, attr])

Stage 3 : Data Source
content : Source: TableSourceScan(table=[[default_catalog, 
default_database, table2]], fields=[attr, id2])

Stage 5 : Attr
content : GroupAggregate(groupBy=[id2], select=[id2, MAX(attr) 
AS attr])
ship_strategy : HASH

Stage 7 : Attr
content : Join(joinType=[FullOuterJoin], where=[(id1 = 
id2)], select=[id1, attr, id2, attr0], 
leftInputSpec=[JoinKeyContainsUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey])
ship_strategy : HASH

Stage 8 : Attr
content : Calc(select=[id1, attr, id2, attr0, 
(attr0 IS NOT NULL CASE attr0 CASE attr) AS $f4])
ship_strategy : FORWARD

Stage 10 : Data Source
content : Source: TableSourceScan(table=[[default_catalog, 
default_database, table3]], fields=[attr, attr_mapped])

Stage 12 : Attr
content : Join(joinType=[LeftOuterJoin], where=[($f4 = attr)], 
select=[id1, attr, id2, attr0, $f4, attr, attr_mapped], 
leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
ship_strategy : HASH

Stage 13 : Attr
content : Calc(select=[(id1 IS NOT NULL CASE id1 CASE 
id2) AS id, (attr0 IS NOT NULL CASE attr0 CASE attr) AS attr, (attr_mapped IS 
NOT NULL CASE attr_mapped CASE attr0 IS NOT NULL CASE attr0 CASE attr) AS 
attr_mapped])
ship_strategy : FORWARD

Stage 14 : Data Sink
content : Sink: 
Sink(table=[default_catalog.default_database.sink], fields=[id, attr, 
attr_mapped])
ship_strategy : FORWARD

On 4/14/21, 10:08 AM, "Timo Walther"  wrote:

Can you share the resulting plan with us? Ideally with the ChangelogMode 
detail enabled as well.

statementSet.explain(...)

Maybe this could help.

Regards,
Timo



On 14.04.21 16:47, Dylan 

Re: In Flink SQL for kafka avro based table , is there support for FORWARD_TRANSITIVE schema change?

2021-04-14 Thread Agnelo Dcosta
Hi Arvid, thanks for the reply.
We are following the 1.12 documentation here:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/avro.html#data-type-mapping
*Currently, the Avro schema is always derived from table schema. Explicitly
defining an Avro schema is not supported yet.*

And the issue is that we need our producers to start using a newer topic
schema (one additional column). Flink will also get this new column
eventually. However, in the window after the producer change is deployed and
before the Flink change is, Flink will crash with a deserialization failure.
We are trying to see if there is any setting that would let Flink keep
reading the new schema without that field being specified in the table
definition.
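
For reference, one route that matches the schema-registry suggestion quoted below is the `avro-confluent` format that ships with Flink 1.12: each record carries a schema id, the writer schema is fetched from the registry, and Avro schema resolution maps it onto the reader schema derived from the table, so a forward-compatible producer change (like one added column) no longer breaks deserialization. A rough PyFlink-flavoured sketch, assuming a Confluent Schema Registry is actually available and the flink-sql-avro-confluent-registry jar is on the classpath; the URLs and field names below are placeholders, not values from this thread:

```python
# Sketch only: assumes an existing TableEnvironment `t_env`, a reachable
# Confluent Schema Registry, and the flink-sql-avro-confluent-registry jar.
t_env.execute_sql("""
    CREATE TABLE topic1 (
        field1 STRING,
        field2 STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'topic1',
        'properties.bootstrap.servers' = 'localhost:8082',
        'properties.group.id' = 'topic1',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'avro-confluent',
        'avro-confluent.schema-registry.url' = 'http://localhost:8081'
    )
""")
```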


On Wed, Apr 14, 2021 at 12:46 AM Arvid Heise  wrote:

> For any schema change to be gracefully supported. You need to know both
> schemas (old + new) on reader side (=Flink). I'm curious how Flink should
> know the old schema as you only provide the new schema, right?
>
> Usually, you use the schema registry, such that each record has it's own
> schema attached to it (=old). You opted to not go for it, so I'm not sure
> how Flink is supposed to know the old schema.
>
> On Wed, Apr 14, 2021 at 9:09 AM Agnelo Dcosta 
> wrote:
>
>> Hi Arvid,
>> > writer schema encoded if you are using no schema registry?
>> on the producer side we are using node with
>> https://registry.npmjs.org/avsc/-/avsc-5.5.3.tgz
>> and https://registry.npmjs.org/sinek/-/sinek-9.1.0.tgz libraries to
>> publish messages. We specify the avro schema file to encode messages in
>> avro format.
>>
>> >Flink know with which schema the data has been written so that it can
>> map it to the new schema?
>> With 1.11 we used to specify the schema file as part of the flink sql
>> table definition.
>> However with 1.12 the schema is derived from the message/table
>> definition. We do not specify any schema as such.
>>
>>
>> On Tue, Apr 13, 2021 at 11:58 PM Arvid Heise  wrote:
>>
>>> Hi Agnelo,
>>>
>>> How is the writer schema encoded if you are using no schema registry? Or
>>> phrased differently: how does Flink know with which schema the data has
>>> been written so that it can map it to the new schema?
>>>
>>> On Wed, Apr 14, 2021 at 8:33 AM Agnelo Dcosta 
>>> wrote:
>>>
 Hi, we are using Flink SQL 1.12 and have a couple of tables created
 from kafka topics. Format is avro (not confluent avro) and no schema
 registry as such.

 In flink 1.11 we used to specify the schema, however in 1.12 the schema
 is derived from the message itself.

 Is it possible for the producers to start sending new fields without
 changes in the flink app?



 For example :

 {

   "name": "topic1",

   "type": "record",

   "fields": [

   {

   "name": "field1",

   "type": "string"

 },

 {

   "name": "field2",

   "type": "string"

 },

 {

   *"name": "field3",*

 *  "type": "string"*

 },

 ]

 }



 Flink table has:

 CREATE TABLE topic1(\n"

 + " field1 string not null \n"

 + " ,field2 string not null \n"

 "'connector' = 'kafka' \n"

  + ",'topic' = 'topic1' \n"

  + ",'scan.startup.mode' = 'latest-offset' \n"

  + ",'properties.group.id' = 'topic1' \n"

  + ",'properties.bootstrap.servers' = 'localhost:8082' \n"

   + ",'properties.enable.auto.commit' = 'true' \n"

  + ",'format' = 'avro' \n";



 With above settings I get a deserialization error:



 *java.io.IOException: Failed to deserialize Avro record.*

 *at
 org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:104)
 ~[flink-sql-avro-1.12.0.jar:1.12.0]*

 *at
 org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:44)
 ~[flink-sql-avro-1.12.0.jar:1.12.0]*

 *at
 org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
 ~[flink-core-1.12.0.jar:1.12.0]*

 *at
 org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
 ~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]*

 *at
 org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:177)
 ~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]*

 *at
 

Re: pyflink reports: Function class 'class org.apache.flink.table.functions.python.PythonScalarFunction' is not serializable

2021-04-14 Thread Dian Fu
What is your JDK version? This looks like a problem with the Java environment. There is a similar question here [1]; take a look and see whether it helps.

[1]
https://stackoverflow.com/questions/41265266/how-to-solve-inaccessibleobjectexception-unable-to-make-member-accessible-m
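
For what it's worth, the quoted InaccessibleObjectException ("module java.base does not \"opens java.lang\" to unnamed module") is typical of running on a recent JDK (16 or newer), while Flink 1.11 is only tested against Java 8 and 11. If switching the JVM is not an option, a commonly used workaround (an assumption here, not something verified in this thread) is to open the package to reflection, e.g. by adding `--add-opens java.base/java.lang=ALL-UNNAMED` to the JVM options via `env.java.opts` in flink-conf.yaml.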

On Wed, Apr 14, 2021 at 4:58 PM magichuang  wrote:

> Flink version: 1.11.2, Python version 3.6, apache-flink==1.11.2, running Flink on
> YARN in per-job mode
>
> The program is developed with PyFlink: it reads data from Kafka, uses a UDF to check whether an IP falls within an IP range (the third-party package IPy is used for this), and then writes the results back to Kafka.
>
>
>
>
> Main code:
>
> t_env.get_config().get_configuration().set_string('taskmanager.memory.task.off-heap.size',
> '128m')
>
> t_env.get_config().get_configuration().set_string("pipeline.jars",
> "file:///opt/flink-1.11.2/lib/flink-sql-connector-kafka_2.11-1.11.0.jar; ")
>
> t_env.get_config().get_configuration().set_string("pipeline.classpaths",
> "file:///opt/flink-1.11.2/lib/flink-sql-connector-kafka_2.11-1.11.0.jar; ")
>
>
>
>
> t_env.add_python_archive("venv.zip")
>
> t_env.get_config().set_python_executable("venv.zip/venv/bin/python")
>
>
>
>
> @udf(input_types=[DataTypes.STRING(), DataTypes.STRING()],
> result_type=DataTypes.INT())
>
> def judge_ip(src_ip, dst_ip):
>
> import IPy
>
> .
>
> t_env.register_function('judge_ip', judge_ip)
>
>
>
> The main error message is below:
>
> Traceback (most recent call last):
>
> File "traffic-tuple-sf.py", line 59, in 
>
> t_env.register_function('judge_ip', judge_ip)
>
> File
> "/opt/flink-1.11.2/opt/python/pyflink.zip/pyflink/table/table_environment.py",
> line 876, in register_function
>
> File
> "/opt/flink-1.11.2/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
> line 1286, in __call__
>
> File
> "/opt/flink-1.11.2/opt/python/pyflink.zip/pyflink/util/exceptions.py", line
> 147, in deco
>
> File
> "/opt/flink-1.11.2/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line
> 328, in get_return_value
>
> py4j.protocol.Py4JJavaError: An error occurred while calling
> o5.registerFunction.
>
> : org.apache.flink.table.api.ValidationException: Function class 'class
> org.apache.flink.table.functions.python.PythonScalarFunction' is not
> serializable. Make sure that the class is self-contained (i.e. no
> references to outer classes) and all inner fields are serializable as well.
>
> at
> org.apache.flink.table.functions.UserDefinedFunctionHelper.cleanFunction(UserDefinedFunctionHelper.java:349)
>
> at
> org.apache.flink.table.functions.UserDefinedFunctionHelper.prepareInstance(UserDefinedFunctionHelper.java:204)
>
> at
> org.apache.flink.table.catalog.FunctionCatalog.registerTempSystemScalarFunction(FunctionCatalog.java:383)
>
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.registerFunction(TableEnvironmentImpl.java:357)
>
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
>
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:78)
>
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.base/java.lang.reflect.Method.invoke(Method.java:567)
>
> at
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>
> at
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>
> at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>
> at
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>
> at
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>
> at
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>
> at java.base/java.lang.Thread.run(Thread.java:831)
>
> Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make
> field private final byte[] java.lang.String.value accessible: module
> java.base does not "opens java.lang" to unnamed module @1311d9fb
>
> at
> java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:357)
>
> at
> java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:297)
>
> at java.base/java.lang.reflect.Field.checkCanSetAccessible(Field.java:177)
>
> at java.base/java.lang.reflect.Field.setAccessible(Field.java:171)
>
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:104)
>
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>
> at
> org.apache.flink.table.functions.UserDefinedFunctionHelper.cleanFunction(UserDefinedFunctionHelper.java:346)
>
> ... 14 more
>
>
>
>
> Could anyone take a look at where the problem is and how it should be fixed? Thanks.
>
>
>
>
>
>
>
>


Re: PyFlink Vectorized UDF throws NullPointerException

2021-04-14 Thread Dian Fu
Hi Yik San,

1) There are two kinds of Python UDFs in PyFlink:
- General Python UDFs which process input elements at row basis. That is,
it will process one row at a time.
-  Pandas UDFs which process input elements at batch basis.
So you are correct that you need to use Pandas UDF for your requirements.

2) For Pandas UDF, the input type for each input argument is Pandas.Series
and the result type should also be a Pandas.Series. Besides, the length of
the result should be the same as the inputs. Could you check if this is the
case for your Pandas UDF implementation?
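
A minimal sketch of that contract (illustrative only, not the code discussed in this thread): every argument arrives as a pandas.Series covering one batch of rows, and the function must return a pandas.Series of exactly the same length.

```python
from pyflink.table import DataTypes
from pyflink.table.udf import udf


@udf(result_type=DataTypes.DOUBLE(), func_type="pandas")
def ratio(a, b):
    # `a` and `b` are pandas.Series for one input batch; the returned
    # Series must have the same length as the inputs.
    return (a / b).astype("float64")
```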

Regards,
Dian


On Wed, Apr 14, 2021 at 9:44 PM Yik San Chan 
wrote:

> The question is cross-posted on Stack Overflow
> https://stackoverflow.com/questions/67092978/pyflink-vectorized-udf-throws-nullpointerexception
> .
>
> I have a ML model that takes two numpy.ndarray - `users` and `items` - and
> returns an numpy.ndarray `predictions`. In normal Python code, I would do:
>
> ```python
> model = load_model()
>
> df = load_data() # the DataFrame includes 4 columns, namely, user_id,
> movie_id, rating, and timestamp
>
> users = df.user_id.values
> items = df.movie_id.values
>
> predictions = model(users, items)
> ```
>
> I am looking into porting this code into Flink to leverage its distributed
> nature. My assumption is: by distributing the prediction workload on
> multiple Flink nodes, I should be able to run the whole prediction faster.
>
> So I compose a PyFlink job. Note I implement an UDF called `predict` to
> run the prediction.
>
> ```python
> # batch_prediction.py
>
> model = load_model()
>
> settings = EnvironmentSettings.new_instance().use_blink_planner().build()
> exec_env = StreamExecutionEnvironment.get_execution_environment()
> t_env = StreamTableEnvironment.create(exec_env,
> environment_settings=settings)
>
> SOURCE_DDL = """
> CREATE TABLE source (
> user_id INT,
> movie_id INT,
> rating TINYINT,
> event_ms BIGINT
> ) WITH (
> 'connector' = 'filesystem',
> 'format' = 'csv',
> 'csv.field-delimiter' = '\t',
> 'path' = 'ml-100k/u1.test'
> )
> """
>
> SINK_DDL = """
> CREATE TABLE sink (
> prediction DOUBLE
> ) WITH (
> 'connector' = 'print'
> )
> """
>
> t_env.execute_sql(SOURCE_DDL)
> t_env.execute_sql(SINK_DDL)
> t_env.execute_sql(
> "INSERT INTO sink SELECT PREDICT(user_id, movie_id) FROM source"
> ).wait()
> ```
>
> Here is the UDF.
>
> ```python
> # batch_prediction.py (cont)
>
> @udf(result_type=DataTypes.DOUBLE())
> def predict(user, item):
> return model([user], [item]).item()
>
> t_env.create_temporary_function("predict", predict)
> ```
>
> The job runs fine. However, the prediction actually runs on each and every
> row of the `source` table, which is not performant. Instead, I want to
> split the 80,000 (user_id, movie_id) pairs into, let's say, 100 batches,
> with each batch having 800 rows. The job triggers the `model(users, items)`
> function 100 times (= # of batch), where both `users` and `items` have 800
> elements.
>
> I couldn't find a way to do this. By looking at the [docs](
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html),
> vectorized user-defined functions may work.
>
> ```python
> # batch_prediction.py (snippet)
>
> # I add the func_type="pandas"
> @udf(result_type=DataTypes.DOUBLE(), func_type="pandas")
> def predict(user, item):
> ...
> ```
>
> Unfortunately, it doesn't.
>
> ```
> > python batch_prediction.py
> ...
> Traceback (most recent call last):
>   File "batch_prediction.py", line 55, in 
> "INSERT INTO sink SELECT PREDICT(user_id, movie_id) FROM source"
>   File
> "/usr/local/anaconda3/envs/flink-ml/lib/python3.7/site-packages/pyflink/table/table_result.py",
> line 76, in wait
> get_method(self._j_table_result, "await")()
>   File
> "/usr/local/anaconda3/envs/flink-ml/lib/python3.7/site-packages/py4j/java_gateway.py",
> line 1286, in __call__
> answer, self.gateway_client, self.target_id, self.name)
>   File
> "/usr/local/anaconda3/envs/flink-ml/lib/python3.7/site-packages/pyflink/util/exceptions.py",
> line 147, in deco
> return f(*a, **kw)
>   File
> "/usr/local/anaconda3/envs/flink-ml/lib/python3.7/site-packages/py4j/protocol.py",
> line 328, in get_return_value
> format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling o51.await.
> : java.util.concurrent.ExecutionException:
> org.apache.flink.table.api.TableException: Failed to wait job finish
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> at
> org.apache.flink.table.api.internal.TableResultImpl.awaitInternal(TableResultImpl.java:119)
> at
> org.apache.flink.table.api.internal.TableResultImpl.await(TableResultImpl.java:86)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> 

Re: Nondeterministic results with SQL job when parallelism is > 1

2021-04-14 Thread Dylan Forciea
Piotrek,

I am looking at the count of records present in the sink table in Postgres 
after the entire job completes, not the number of inserts/retracts. I can see 
as the job runs that records are added and removed from the “sink” table. With 
parallelism set to 1, it always comes out to the same number (which is 
consistent with the number of ids in the source tables “table1” and “table2”), 
at about 491k records in table “sink” when the job is complete. With the 
parallelism set to 16, the “sink” table will have somewhere around 360k records 
+/- 20k when the job is complete. I truncate the “sink” table before I run the 
job, and this is a test environment where the source databases are static.

I removed my line for setting to Batch mode per Timo’s suggestion, and am still 
running with MAX which should have deterministic output.

Dylan

From: Piotr Nowojski 
Date: Wednesday, April 14, 2021 at 9:38 AM
To: Dylan Forciea 
Cc: "user@flink.apache.org" 
Subject: Re: Nondeterministic results with SQL job when parallelism is > 1

Hi Dylan,

But if you are running your query in Streaming mode, aren't you counting 
retractions from the FULL JOIN? AFAIK in Streaming mode in FULL JOIN, when the 
first record comes in it will be immediately emitted with NULLs (not matched, 
as the other table is empty). Later if a matching record is received from the 
second table, the previous result will be retracted and the new one, updated, 
will be re-emitted. Maybe this is what you are observing in the varying output?

Maybe you could try to analyse how the results differ between different runs?

Best,
Piotrek

Wed, Apr 14, 2021 at 16:22 Dylan Forciea <dy...@oseberg.io> wrote:
I replaced the FIRST_VALUE with MAX to ensure that the results should be 
identical even in their content, and my problem still remains – I end up with a 
nondeterministic count of records being emitted into the sink when the 
parallelism is over 1, and that count is about 20-25% short (and not 
consistent) of what comes out consistently when parallelism is set to 1.

Dylan

From: Dylan Forciea <dy...@oseberg.io>
Date: Wednesday, April 14, 2021 at 9:08 AM
To: Piotr Nowojski <pnowoj...@apache.org>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Nondeterministic results with SQL job when parallelism is > 1

Piotrek,

I was actually originally using a group function that WAS deterministic (but 
was a custom UDF I made), but chose something here built in. By 
non-deterministic, I mean that the number of records coming out is not 
consistent. Since the FIRST_VALUE here is on an attribute that is not part of 
the key, that shouldn’t affect the number of records coming out I wouldn’t 
think.

Dylan

From: Piotr Nowojski <pnowoj...@apache.org>
Date: Wednesday, April 14, 2021 at 9:06 AM
To: Dylan Forciea <dy...@oseberg.io>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Nondeterministic results with SQL job when parallelism is > 1

Hi,

Yes, it looks like your query is non deterministic because of `FIRST_VALUE` 
used inside `GROUP BY`. If you have many different parallel sources, each time 
you run your query your first value might be different. If that's the case, you 
could try to confirm it with even smaller query:

   SELECT
  id2,
  FIRST_VALUE(attr) AS attr
FROM table2
GROUP BY id2

Best,
Piotrek

Wed, Apr 14, 2021 at 14:45 Dylan Forciea <dy...@oseberg.io> wrote:
I am running Flink 1.12.2, and I was trying to up the parallelism of my Flink 
SQL job to see what happened. However, once I did that, my results became 
nondeterministic. This happens whether I set the 
table.exec.resource.default-parallelism config option or I set the default 
local parallelism to something higher than 1. I would end up with less records 
in the end, and each time I ran the output record count would come out 
differently.

I managed to distill an example, as pasted below (with attribute names changed 
to protect company proprietary info), that causes the issue. I feel like I 
managed to get it to happen with a LEFT JOIN rather than a FULL JOIN, but the 
distilled version wasn’t giving me wrong results with that. Maybe it has to do 
with joining to a table that was formed using a GROUP BY? Can somebody tell if 
I’m doing something that is known not to work, or if I have run across a bug?

Regards,
Dylan Forciea


object Job {
  def main(args: Array[String]): Unit = {
StreamExecutionEnvironment.setDefaultLocalParallelism(1)

val settings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
val streamTableEnv = StreamTableEnvironment.create(streamEnv, settings)

val configuration = streamTableEnv.getConfig().getConfiguration()

Re: Nondeterministic results with SQL job when parallelism is > 1

2021-04-14 Thread Piotr Nowojski
Hi Dylan,

But if you are running your query in Streaming mode, aren't you counting
retractions from the FULL JOIN? AFAIK in Streaming mode in FULL JOIN, when
the first record comes in it will be immediately emitted with NULLs (not
matched, as the other table is empty). Later if a matching record is
received from the second table, the previous result will be retracted and
the new one, updated, will be re-emitted. Maybe this is what you are
observing in the varying output?

Maybe you could try to analyse how the results differ between different
runs?

Best,
Piotrek

Wed, Apr 14, 2021 at 16:22 Dylan Forciea wrote:

> I replaced the FIRST_VALUE with MAX to ensure that the results should be
> identical even in their content, and my problem still remains – I end up
> with a nondeterministic count of records being emitted into the sink when
> the parallelism is over 1, and that count is about 20-25% short (and not
> consistent) of what comes out consistently when parallelism is set to 1.
>
>
>
> Dylan
>
>
>
> *From: *Dylan Forciea 
> *Date: *Wednesday, April 14, 2021 at 9:08 AM
> *To: *Piotr Nowojski 
> *Cc: *"user@flink.apache.org" 
> *Subject: *Re: Nondeterministic results with SQL job when parallelism is
> > 1
>
>
>
> Piotrek,
>
>
>
> I was actually originally using a group function that WAS deterministic
> (but was a custom UDF I made), but chose something here built in. By
> non-deterministic, I mean that the number of records coming out is not
> consistent. Since the FIRST_VALUE here is on an attribute that is not part
> of the key, that shouldn’t affect the number of records coming out I
> wouldn’t think.
>
>
>
> Dylan
>
>
>
> *From: *Piotr Nowojski 
> *Date: *Wednesday, April 14, 2021 at 9:06 AM
> *To: *Dylan Forciea 
> *Cc: *"user@flink.apache.org" 
> *Subject: *Re: Nondeterministic results with SQL job when parallelism is
> > 1
>
>
>
> Hi,
>
>
>
> Yes, it looks like your query is non deterministic because of
> `FIRST_VALUE` used inside `GROUP BY`. If you have many different parallel
> sources, each time you run your query your first value might be different.
> If that's the case, you could try to confirm it with even smaller query:
>
>
>
>SELECT
>   id2,
>   FIRST_VALUE(attr) AS attr
> FROM table2
> GROUP BY id2
>
>
>
> Best,
>
> Piotrek
>
>
>
Wed, Apr 14, 2021 at 14:45 Dylan Forciea wrote:
>
> I am running Flink 1.12.2, and I was trying to up the parallelism of my
> Flink SQL job to see what happened. However, once I did that, my results
> became nondeterministic. This happens whether I set the
> table.exec.resource.default-parallelism config option or I set the default
> local parallelism to something higher than 1. I would end up with less
> records in the end, and each time I ran the output record count would come
> out differently.
>
>
>
> I managed to distill an example, as pasted below (with attribute names
> changed to protect company proprietary info), that causes the issue. I feel
> like I managed to get it to happen with a LEFT JOIN rather than a FULL
> JOIN, but the distilled version wasn’t giving me wrong results with that.
> Maybe it has to do with joining to a table that was formed using a GROUP
> BY? Can somebody tell if I’m doing something that is known not to work, or
> if I have run across a bug?
>
>
>
> Regards,
>
> Dylan Forciea
>
>
>
>
>
> object Job {
>
>   def main(args: Array[String]): Unit = {
>
> StreamExecutionEnvironment.setDefaultLocalParallelism(1)
>
>
>
> val settings = EnvironmentSettings
> .newInstance().useBlinkPlanner().inStreamingMode().build()
>
> val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
>
> val streamTableEnv = StreamTableEnvironment.create(streamEnv,
> settings)
>
>
>
> val configuration = streamTableEnv.getConfig().getConfiguration()
>
> configuration.setInteger("table.exec.resource.default-parallelism", 16
> )
>
>
>
> streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH);
>
>
>
> streamTableEnv.executeSql(
>
>   """
>
>   CREATE TABLE table1 (
>
> id1 STRING PRIMARY KEY NOT ENFORCED,
>
> attr STRING
>
>   ) WITH (
>
> 'connector' = 'jdbc',
>
> 'url' = 'jdbc:postgresql://…',
>
> 'table-name' = 'table1’,
>
> 'username' = 'username',
>
> 'password' = 'password',
>
> 'scan.fetch-size' = '500',
>
> 'scan.auto-commit' = 'false'
>
>   )""")
>
>
>
> streamTableEnv.executeSql(
>
>   """
>
>   CREATE TABLE table2 (
>
> attr STRING,
>
> id2 STRING
>
>   ) WITH (
>
> 'connector' = 'jdbc',
>
> 'url' = 'jdbc:postgresql://…',
>
> 'table-name' = 'table2',
>
> 'username' = 'username',
>
> 'password' = 'password',
>
> 'scan.fetch-size' = '500',
>
> 'scan.auto-commit' = 'false'
>
>   )""")
>
>
>
> streamTableEnv.executeSql(
>
>   """
>
>   CREATE TABLE table3 (
>
> attr STRING 

Re: flink1.12.2 "Failed to execute job"

2021-04-14 Thread Piotr Nowojski
Hi,

I haven't found anything strange in the logs (I've received logs in a
separate message). It looks like the problem is that split enumeration is
taking a long time, and currently this is being done in the Job Manager's
main thread, blocking other things from executing. For the time being I
think the only thing you can do is to either speed up the split enumeration
(probably difficult) or increase the timeouts that are failing. I don't
know if there is some other workaround at the moment (Becket?).
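
(The timeouts in question are cluster configuration values in flink-conf.yaml. Which one actually fires depends on the deployment, but `akka.ask.timeout` and `web.timeout` are the usual candidates to raise; this is an assumption based on the description above, not something confirmed in the thread.)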

Piotrek

Wed, Apr 14, 2021 at 15:57 Piotr Nowojski wrote:

> Hey,
>
> could you provide full logs from both task managers and job managers?
>
> Piotrek
>
Wed, Apr 14, 2021 at 15:43 太平洋 <495635...@qq.com> wrote:
>
>> After submit job, I received 'Failed to execute job' error. And the time
>> between initialization and scheduling last 214s. What has happened
>> during this period?
>>
>> version: flink: 1.12.2
>> deployment: k8s standalone
>> logs:
>>
>> 2021-04-14 12:47:58,547 WARN org.apache.flink.streaming.connectors.kafka.
>> FlinkKafkaProducer [] - Property [transaction.timeout.ms] not specified.
>> Setting it to 360 ms
>> 2021-04-14 12:48:04,175 INFO
>> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor
>> [] - Job 1276000e99efdb77bdae0df88ab91da3 is submitted.
>> 2021-04-14 12:48:04,175 INFO
>> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor
>> [] - Submitting Job with JobId=1276000e99efdb77bdae0df88ab91da3.
>> 2021-04-14 12:48:04,249 INFO org.apache.flink.runtime.dispatcher.
>> StandaloneDispatcher [] - Received JobGraph submission 
>> 1276000e99efdb77bdae0df88ab91da3
>> (Prediction Program).
>> 2021-04-14 12:48:04,249 INFO org.apache.flink.runtime.dispatcher.
>> StandaloneDispatcher [] - Submitting job 1276000e99efdb77bdae0df88ab91da3
>> (Prediction Program).
>> 2021-04-14 12:48:04,250 INFO org.apache.flink.runtime.rpc.akka.
>> AkkaRpcService [] - Starting RPC endpoint for
>> org.apache.flink.runtime.jobmaster.JobMaster at
>> akka://flink/user/rpc/jobmanager_8 .
>> 2021-04-14 12:48:04,251 INFO org.apache.flink.runtime.jobmaster.JobMaster
>> [] - Initializing job Prediction Program (1276000e99
>> efdb77bdae0df88ab91da3).
>> 2021-04-14 12:48:04,251 INFO org.apache.flink.runtime.jobmaster.JobMaster
>> [] - Using restart back off time strategy NoRestartBackoffTimeStrategy
>> for Prediction Program (1276000e99efdb77bdae0df88ab91da3).
>> 2021-04-14 12:48:04,251 INFO org.apache.flink.runtime.jobmaster.JobMaster
>> [] - Running initialization on master for job Prediction Program (
>> 1276000e99efdb77bdae0df88ab91da3).
>> 2021-04-14 12:48:04,252 INFO org.apache.flink.runtime.jobmaster.JobMaster
>> [] - Successfully ran initialization on master in 0 ms.
>> 2021-04-14 12:48:04,254 INFO org.apache.flink.runtime.scheduler.adapter.
>> DefaultExecutionTopology [] - Built 10 pipelined regions in 0 ms
>> 2021-04-14 12:48:04,255 INFO org.apache.flink.runtime.jobmaster.JobMaster
>> [] - Using application-defined state backend:
>> org.apache.flink.streaming.api.operators.sorted.state.
>> BatchExecutionStateBackend@3ea8cd5a
>> 2021-04-14 12:48:04,255 INFO org.apache.flink.runtime.checkpoint.
>> CheckpointCoordinator [] - No checkpoint found during restore.
>> 2021-04-14 12:48:04,255 INFO org.apache.flink.runtime.jobmaster.JobMaster
>> [] - Using failover strategy
>> org.apache.flink.runtime.executiongraph.failover.flip1.
>> RestartPipelinedRegionFailoverStrategy@26845997 for Prediction Program (
>> 1276000e99efdb77bdae0df88ab91da3).
>> 2021-04-14 12:48:04,255 INFO org.apache.flink.runtime.jobmaster.
>> JobManagerRunnerImpl [] - JobManager runner for job Prediction Program (
>> 1276000e99efdb77bdae0df88ab91da3) was granted leadership with session id
>> ---- at akka.tcp://flink@flink
>> -jobmanager:6123/user/rpc/jobmanager_8.
>> 2021-04-14 12:48:04,255 INFO org.apache.flink.runtime.jobmaster.JobMaster
>> [] - Starting execution of job Prediction Program 
>> (1276000e99efdb77bdae0df88ab91da3)
>> under job master id .
>> 2021-04-14 12:48:04,255 INFO org.apache.flink.runtime.source.coordinator.
>> SourceCoordinator [] - Starting split enumerator for source Source:
>> TableSourceScan(table=[[default_catalog, default_database, cpu_util,
>> filter=[], project=[instance_id, value, timestamp]]], fields=[instance_id,
>> value, timestamp]) -> Calc(select=[instance_id, value, timestamp], 
>> where=[(timestamp
>> > 1618145278)]) -> SinkConversionToDataPoint -> Map.
>> org.apache.flink.util.FlinkException: Failed to execute job 'Prediction
>> Program'. at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1918)
>> at
>> org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
>> at
>> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
>> at
>> 

Re: Nondeterministic results with SQL job when parallelism is > 1

2021-04-14 Thread Dylan Forciea
I replaced the FIRST_VALUE with MAX to ensure that the results should be 
identical even in their content, and my problem still remains – I end up with a 
nondeterministic count of records being emitted into the sink when the 
parallelism is over 1, and that count is about 20-25% short (and not 
consistent) of what comes out consistently when parallelism is set to 1.

Dylan

From: Dylan Forciea 
Date: Wednesday, April 14, 2021 at 9:08 AM
To: Piotr Nowojski 
Cc: "user@flink.apache.org" 
Subject: Re: Nondeterministic results with SQL job when parallelism is > 1

Piotrek,

I was actually originally using a group function that WAS deterministic (but 
was a custom UDF I made), but chose something here built in. By 
non-deterministic, I mean that the number of records coming out is not 
consistent. Since the FIRST_VALUE here is on an attribute that is not part of 
the key, that shouldn’t affect the number of records coming out I wouldn’t 
think.

Dylan

From: Piotr Nowojski 
Date: Wednesday, April 14, 2021 at 9:06 AM
To: Dylan Forciea 
Cc: "user@flink.apache.org" 
Subject: Re: Nondeterministic results with SQL job when parallelism is > 1

Hi,

Yes, it looks like your query is non deterministic because of `FIRST_VALUE` 
used inside `GROUP BY`. If you have many different parallel sources, each time 
you run your query your first value might be different. If that's the case, you 
could try to confirm it with even smaller query:

   SELECT
  id2,
  FIRST_VALUE(attr) AS attr
FROM table2
GROUP BY id2

Best,
Piotrek

Wed, Apr 14, 2021 at 14:45 Dylan Forciea <dy...@oseberg.io> wrote:
I am running Flink 1.12.2, and I was trying to up the parallelism of my Flink 
SQL job to see what happened. However, once I did that, my results became 
nondeterministic. This happens whether I set the 
table.exec.resource.default-parallelism config option or I set the default 
local parallelism to something higher than 1. I would end up with less records 
in the end, and each time I ran the output record count would come out 
differently.

I managed to distill an example, as pasted below (with attribute names changed 
to protect company proprietary info), that causes the issue. I feel like I 
managed to get it to happen with a LEFT JOIN rather than a FULL JOIN, but the 
distilled version wasn’t giving me wrong results with that. Maybe it has to do 
with joining to a table that was formed using a GROUP BY? Can somebody tell if 
I’m doing something that is known not to work, or if I have run across a bug?

Regards,
Dylan Forciea


object Job {
  def main(args: Array[String]): Unit = {
StreamExecutionEnvironment.setDefaultLocalParallelism(1)

val settings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
val streamTableEnv = StreamTableEnvironment.create(streamEnv, settings)

val configuration = streamTableEnv.getConfig().getConfiguration()
configuration.setInteger("table.exec.resource.default-parallelism", 16)

streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH);

streamTableEnv.executeSql(
  """
  CREATE TABLE table1 (
id1 STRING PRIMARY KEY NOT ENFORCED,
attr STRING
  ) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://…',
'table-name' = 'table1’,
'username' = 'username',
'password' = 'password',
'scan.fetch-size' = '500',
'scan.auto-commit' = 'false'
  )""")

streamTableEnv.executeSql(
  """
  CREATE TABLE table2 (
attr STRING,
id2 STRING
  ) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://…',
'table-name' = 'table2',
'username' = 'username',
'password' = 'password',
'scan.fetch-size' = '500',
'scan.auto-commit' = 'false'
  )""")

streamTableEnv.executeSql(
  """
  CREATE TABLE table3 (
attr STRING PRIMARY KEY NOT ENFORCED,
attr_mapped STRING
  ) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://…',
'table-name' = ‘table3',
'username' = ‘username',
'password' = 'password',
'scan.fetch-size' = '500',
'scan.auto-commit' = 'false'
  )""")

streamTableEnv.executeSql("""
  CREATE TABLE sink (
id STRING PRIMARY KEY NOT ENFORCED,
attr STRING,
attr_mapped STRING
  ) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://…,
'table-name' = 'sink',
'username' = 'username',
'password' = 'password',
'scan.fetch-size' = '500',
'scan.auto-commit' = 'false'
  )""")

val view =
  streamTableEnv.sqlQuery("""
  SELECT
COALESCE(t1.id1, t2.id2) AS id,
COALESCE(t2.attr, t1.attr) AS operator,
COALESCE(t3.attr_mapped, t2.attr, t1.attr) AS attr_mapped
  FROM table1 

Re: JSON source for pyflink stream

2021-04-14 Thread Dian Fu
Hi Giacomo,

All the connectors supported by the Table & SQL API can also be used from the
PyFlink Table API, so you could use the filesystem connector in PyFlink.
AFAIK, the filesystem connector supports newline-delimited JSON in
Flink 1.12. You could refer to [1] for more details.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html#file-formats
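
A minimal PyFlink sketch of that combination (the path, table name and fields are placeholders; the file is assumed to contain one JSON object per line, and `t_env` to be an existing TableEnvironment on Flink 1.12):

```python
t_env.execute_sql("""
    CREATE TABLE tweets (
        `data` ROW<id STRING, author_id STRING, created_at STRING, lang STRING>
    ) WITH (
        'connector' = 'filesystem',
        'path' = 'file:///path/to/tweets',
        'format' = 'json'
    )
""")
```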

Regards,
Dian

On Wed, Apr 14, 2021 at 3:09 PM Klemens Muthmann 
wrote:

> Hi,
>
> We are loading our JSON from a Mongo Database. But we also found no
> readily available way to stream JSON Data into a Flink Pipeline. I guess
> this would be hard to implement since you have to know details about the
> JSON structure to do this. So I guess your best bet would be to implement
> your own input source, which can stream in your file and create results
> based on the JSON structure. We are not using Pyflink so I can not give any
> details about this, but it should not matter, which language you use.
>
> Just implement a Source reading your input and employ any JSON parser you
> like, creating for example domain objects with the same attributes as your
> JSON structure and forward those into your Flink Pipeline for further
> processing.
>
> Regards
> Klemens
>
> Am 14.04.2021 um 04:40 schrieb Yik San Chan :
>
> Hi Giacomo,
>
> I think you can try using Flink SQL connector. For JSON input such as
> {"a": 1, "b": {"c": 2, {"d": 3}}}, you can do:
>
> CREATE TABLE data (
>   a INT,
>   b ROW>
> ) WITH (...)
>
> Let me know if that helps.
>
> Best,
> Yik San
>
> On Wed, Apr 14, 2021 at 2:00 AM  wrote:
>
>> Hi,
>> I'm new to Flink and I am trying to create a stream from locally
>> downloaded tweets. The tweets are in json format, like in this example:
>>
>> {"data":{"text":"Polsek Kakas Cegah Covid-19 https://t.co/ADjEgpt7bC
>> 
>> ","public_metrics":"retweet_count":0,"reply_count":0,"like_count":0,"quote_count":0},
>> "author_id":"1367839185764151302","id":"1378275866279469059","created_at":"2021-04-03T09:19:08.000Z","source":"Twitter
>> for Android","lang":"in"},
>> "includes":{"users":[{"protected":false,"id":"1367839185764151302","name":"Nathan
>> Pareda","created_at":"2021-03-05T14:07:56.000Z",
>>
>> "public_metrics":{"followers_count":0,"following_count":0,"tweet_count":557,"listed_count":0},
>>
>> "username":"NathanPareda"}]},"matching_rules":[{"id":1378112825051246596,"tag":"coronavirus"}]}
>>
>> I would like to do it in Python using Pyflink, but could also use Java if
>> there is no reasonable way to do it in Python. I've been looking at
>> different options for loading these objects into a stream, but am not sure
>> what to do. Here's my situation so far:
>>
>> 1. There doesn't seem to be a fitting connector. The filesystem-connector
>> doesn't seem to support json format.
>> 2. I've seen in the archive of this mailing list that some reccomend to
>> use the Table API. But I am not sure if this is a viable option given how
>> nested the json objects are.
>> 3. I could of course try to implement a custom DataSource, but that seems
>> to be quite difficult so I'd only consider this if there's no other way.
>>
>> I'll be very grateful for any kind of input.
>> Cheers,
>> Giacomo
>>
>>
>
>


Re: Nondeterministic results with SQL job when parallelism is > 1

2021-04-14 Thread Timo Walther

Hi Dylan,

streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH);

is currently not supported by the Table & SQL API. For now,

val settings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()


determines the mode. Thus, I would remove the line again.

If you want to use `inBatchMode()`, you can use the unified 
TableEnvironment that is not connected to the StreamExecutionEnvironment:


TableEnvironment.create(settings);

So Piotrek's answer hopefully makes more sense now.

Regards,
Timo


On 14.04.21 16:08, Dylan Forciea wrote:

Piotrek,

I was actually originally using a group function that WAS deterministic 
(but was a custom UDF I made), but chose something here built in. By 
non-deterministic, I mean that the number of records coming out is not 
consistent. Since the FIRST_VALUE here is on an attribute that is not 
part of the key, that shouldn’t affect the number of records coming out 
I wouldn’t think.


Dylan

*From: *Piotr Nowojski 
*Date: *Wednesday, April 14, 2021 at 9:06 AM
*To: *Dylan Forciea 
*Cc: *"user@flink.apache.org" 
*Subject: *Re: Nondeterministic results with SQL job when parallelism is > 1

Hi,

Yes, it looks like your query is non deterministic because of 
`FIRST_VALUE` used inside `GROUP BY`. If you have many different 
parallel sources, each time you run your query your first value might be 
different. If that's the case, you could try to confirm it with even 
smaller query:


        SELECT
           id2,
           FIRST_VALUE(attr) AS attr
         FROM table2
         GROUP BY id2

Best,

Piotrek

Wed, Apr 14, 2021 at 14:45 Dylan Forciea wrote:


I am running Flink 1.12.2, and I was trying to up the parallelism of
my Flink SQL job to see what happened. However, once I did that, my
results became nondeterministic. This happens whether I set the
table.exec.resource.default-parallelism config option or I set the
default local parallelism to something higher than 1. I would end up
with less records in the end, and each time I ran the output record
count would come out differently.

I managed to distill an example, as pasted below (with attribute
names changed to protect company proprietary info), that causes the
issue. I feel like I managed to get it to happen with a LEFT JOIN
rather than a FULL JOIN, but the distilled version wasn’t giving me
wrong results with that. Maybe it has to do with joining to a table
that was formed using a GROUP BY? Can somebody tell if I’m doing
something that is known not to work, or if I have run across a bug?

Regards,

Dylan Forciea

object Job {

def main(args: Array[String]): Unit = {

StreamExecutionEnvironment.setDefaultLocalParallelism(1)

val settings =

EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()

val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment

val streamTableEnv = StreamTableEnvironment.create(streamEnv, settings)

val configuration = streamTableEnv.getConfig().getConfiguration()


configuration.setInteger("table.exec.resource.default-parallelism", 16)


     streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH);

     streamTableEnv.executeSql(

"""

   CREATE TABLE table1 (

     id1 STRING PRIMARY KEY NOT ENFORCED,

     attr STRING

   ) WITH (

     'connector' = 'jdbc',

     'url' = 'jdbc:postgresql://…',

     'table-name' = 'table1’,

     'username' = 'username',

     'password' = 'password',

     'scan.fetch-size' = '500',

     'scan.auto-commit' = 'false'

   )""")

     streamTableEnv.executeSql(

"""

   CREATE TABLE table2 (

     attr STRING,

     id2 STRING

   ) WITH (

     'connector' = 'jdbc',

     'url' = 'jdbc:postgresql://…',

     'table-name' = 'table2',

     'username' = 'username',

     'password' = 'password',

     'scan.fetch-size' = '500',

     'scan.auto-commit' = 'false'

   )""")

     streamTableEnv.executeSql(

"""

   CREATE TABLE table3 (

     attr STRING PRIMARY KEY NOT ENFORCED,

     attr_mapped STRING

   ) WITH (

     'connector' = 'jdbc',

     'url' = 'jdbc:postgresql://…',

     'table-name' = ‘table3',

     'username' = ‘username',

     'password' = 'password',

     'scan.fetch-size' = '500',

     'scan.auto-commit' = 'false'

   )""")

     streamTableEnv.executeSql("""

   CREATE TABLE sink (

     id STRING PRIMARY KEY NOT ENFORCED,

     attr STRING,

     attr_mapped STRING

   ) WITH (

     'connector' = 'jdbc',

     'url' = 'jdbc:postgresql://…,

     'table-name' = 'sink',

     

Re: Nondeterministic results with SQL job when parallelism is > 1

2021-04-14 Thread Dylan Forciea
Piotrek,

I was actually originally using a group function that WAS deterministic (but 
was a custom UDF I made), but chose something here built in. By 
non-deterministic, I mean that the number of records coming out is not 
consistent. Since the FIRST_VALUE here is on an attribute that is not part of 
the key, that shouldn’t affect the number of records coming out I wouldn’t 
think.

Dylan

From: Piotr Nowojski 
Date: Wednesday, April 14, 2021 at 9:06 AM
To: Dylan Forciea 
Cc: "user@flink.apache.org" 
Subject: Re: Nondeterministic results with SQL job when parallelism is > 1

Hi,

Yes, it looks like your query is non deterministic because of `FIRST_VALUE` 
used inside `GROUP BY`. If you have many different parallel sources, each time 
you run your query your first value might be different. If that's the case, you 
could try to confirm it with even smaller query:

   SELECT
  id2,
  FIRST_VALUE(attr) AS attr
FROM table2
GROUP BY id2

Best,
Piotrek

Wed, Apr 14, 2021 at 14:45 Dylan Forciea <dy...@oseberg.io> wrote:
I am running Flink 1.12.2, and I was trying to up the parallelism of my Flink 
SQL job to see what happened. However, once I did that, my results became 
nondeterministic. This happens whether I set the 
table.exec.resource.default-parallelism config option or I set the default 
local parallelism to something higher than 1. I would end up with less records 
in the end, and each time I ran the output record count would come out 
differently.

I managed to distill an example, as pasted below (with attribute names changed 
to protect company proprietary info), that causes the issue. I feel like I 
managed to get it to happen with a LEFT JOIN rather than a FULL JOIN, but the 
distilled version wasn’t giving me wrong results with that. Maybe it has to do 
with joining to a table that was formed using a GROUP BY? Can somebody tell if 
I’m doing something that is known not to work, or if I have run across a bug?

Regards,
Dylan Forciea


object Job {
  def main(args: Array[String]): Unit = {
StreamExecutionEnvironment.setDefaultLocalParallelism(1)

val settings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
val streamTableEnv = StreamTableEnvironment.create(streamEnv, settings)

val configuration = streamTableEnv.getConfig().getConfiguration()
configuration.setInteger("table.exec.resource.default-parallelism", 16)

streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH);

streamTableEnv.executeSql(
  """
  CREATE TABLE table1 (
id1 STRING PRIMARY KEY NOT ENFORCED,
attr STRING
  ) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://…',
'table-name' = 'table1’,
'username' = 'username',
'password' = 'password',
'scan.fetch-size' = '500',
'scan.auto-commit' = 'false'
  )""")

streamTableEnv.executeSql(
  """
  CREATE TABLE table2 (
attr STRING,
id2 STRING
  ) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://…',
'table-name' = 'table2',
'username' = 'username',
'password' = 'password',
'scan.fetch-size' = '500',
'scan.auto-commit' = 'false'
  )""")

streamTableEnv.executeSql(
  """
  CREATE TABLE table3 (
attr STRING PRIMARY KEY NOT ENFORCED,
attr_mapped STRING
  ) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://…',
'table-name' = ‘table3',
'username' = ‘username',
'password' = 'password',
'scan.fetch-size' = '500',
'scan.auto-commit' = 'false'
  )""")

streamTableEnv.executeSql("""
  CREATE TABLE sink (
id STRING PRIMARY KEY NOT ENFORCED,
attr STRING,
attr_mapped STRING
  ) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://…,
'table-name' = 'sink',
'username' = 'username',
'password' = 'password',
'scan.fetch-size' = '500',
'scan.auto-commit' = 'false'
  )""")

val view =
  streamTableEnv.sqlQuery("""
  SELECT
COALESCE(t1.id1, t2.id2) AS id,
COALESCE(t2.attr, t1.attr) AS operator,
COALESCE(t3.attr_mapped, t2.attr, t1.attr) AS attr_mapped
  FROM table1 t1
  FULL JOIN (
SELECT
  id2,
  FIRST_VALUE(attr) AS attr
FROM table2
GROUP BY id2
  ) t2
   ON (t1.id1 = t2.id2)
  LEFT JOIN table3 t3
ON (COALESCE(t2.attr, t1.attr) = t3.attr)""")
streamTableEnv.createTemporaryView("view", view)

val statementSet = streamTableEnv.createStatementSet()
statementSet.addInsertSql("""
  INSERT INTO sink SELECT * FROM view
""")

statementSet.execute().await()
  }
}




Re: Nondeterministic results with SQL job when parallelism is > 1

2021-04-14 Thread Piotr Nowojski
Hi,

Yes, it looks like your query is non deterministic because of `FIRST_VALUE`
used inside `GROUP BY`. If you have many different parallel sources, each
time you run your query your first value might be different. If that's the
case, you could try to confirm it with even smaller query:

   SELECT
  id2,
  FIRST_VALUE(attr) AS attr
FROM table2
GROUP BY id2

Best,
Piotrek

Wed, Apr 14, 2021 at 14:45 Dylan Forciea wrote:

> I am running Flink 1.12.2, and I was trying to up the parallelism of my
> Flink SQL job to see what happened. However, once I did that, my results
> became nondeterministic. This happens whether I set the
> table.exec.resource.default-parallelism config option or I set the default
> local parallelism to something higher than 1. I would end up with less
> records in the end, and each time I ran the output record count would come
> out differently.
>
>
>
> I managed to distill an example, as pasted below (with attribute names
> changed to protect company proprietary info), that causes the issue. I feel
> like I managed to get it to happen with a LEFT JOIN rather than a FULL
> JOIN, but the distilled version wasn’t giving me wrong results with that.
> Maybe it has to do with joining to a table that was formed using a GROUP
> BY? Can somebody tell if I’m doing something that is known not to work, or
> if I have run across a bug?
>
>
>
> Regards,
>
> Dylan Forciea
>
>
>
>
>
> object Job {
>
>   def main(args: Array[String]): Unit = {
>
> StreamExecutionEnvironment.setDefaultLocalParallelism(1)
>
>
>
> val settings = EnvironmentSettings
> .newInstance().useBlinkPlanner().inStreamingMode().build()
>
> val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
>
> val streamTableEnv = StreamTableEnvironment.create(streamEnv,
> settings)
>
>
>
> val configuration = streamTableEnv.getConfig().getConfiguration()
>
> configuration.setInteger("table.exec.resource.default-parallelism", 16
> )
>
>
>
> streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH);
>
>
>
> streamTableEnv.executeSql(
>
>   """
>
>   CREATE TABLE table1 (
>
> id1 STRING PRIMARY KEY NOT ENFORCED,
>
> attr STRING
>
>   ) WITH (
>
> 'connector' = 'jdbc',
>
> 'url' = 'jdbc:postgresql://…',
>
> 'table-name' = 'table1’,
>
> 'username' = 'username',
>
> 'password' = 'password',
>
> 'scan.fetch-size' = '500',
>
> 'scan.auto-commit' = 'false'
>
>   )""")
>
>
>
> streamTableEnv.executeSql(
>
>   """
>
>   CREATE TABLE table2 (
>
> attr STRING,
>
> id2 STRING
>
>   ) WITH (
>
> 'connector' = 'jdbc',
>
> 'url' = 'jdbc:postgresql://…',
>
> 'table-name' = 'table2',
>
> 'username' = 'username',
>
> 'password' = 'password',
>
> 'scan.fetch-size' = '500',
>
> 'scan.auto-commit' = 'false'
>
>   )""")
>
>
>
> streamTableEnv.executeSql(
>
>   """
>
>   CREATE TABLE table3 (
>
> attr STRING PRIMARY KEY NOT ENFORCED,
>
> attr_mapped STRING
>
>   ) WITH (
>
> 'connector' = 'jdbc',
>
> 'url' = 'jdbc:postgresql://…',
>
> 'table-name' = ‘table3',
>
> 'username' = ‘username',
>
> 'password' = 'password',
>
> 'scan.fetch-size' = '500',
>
> 'scan.auto-commit' = 'false'
>
>   )""")
>
>
>
> streamTableEnv.executeSql("""
>
>   CREATE TABLE sink (
>
> id STRING PRIMARY KEY NOT ENFORCED,
>
> attr STRING,
>
> attr_mapped STRING
>
>   ) WITH (
>
> 'connector' = 'jdbc',
>
> 'url' = 'jdbc:postgresql://…,
>
> 'table-name' = 'sink',
>
> 'username' = 'username',
>
> 'password' = 'password',
>
> 'scan.fetch-size' = '500',
>
> 'scan.auto-commit' = 'false'
>
>   )""")
>
>
>
> val view =
>
>   streamTableEnv.sqlQuery("""
>
>   SELECT
>
> COALESCE(t1.id1, t2.id2) AS id,
>
> COALESCE(t2.attr, t1.attr) AS operator,
>
> COALESCE(t3.attr_mapped, t2.attr, t1.attr) AS attr_mapped
>
>   FROM table1 t1
>
>   FULL JOIN (
>
> SELECT
>
>   id2,
>
>   FIRST_VALUE(attr) AS attr
>
> FROM table2
>
> GROUP BY id2
>
>   ) t2
>
>ON (t1.id1 = t2.id2)
>
>   LEFT JOIN table3 t3
>
> ON (COALESCE(t2.attr, t1.attr) = t3.attr)""")
>
> streamTableEnv.createTemporaryView("view", view)
>
>
>
> val statementSet = streamTableEnv.createStatementSet()
>
> statementSet.addInsertSql("""
>
>   INSERT INTO sink SELECT * FROM view
>
> """)
>
>
>
> statementSet.execute().await()
>
>   }
>
> }
>
>
>
>
>


Re: flink1.12.2 "Failed to execute job"

2021-04-14 Thread Piotr Nowojski
Hey,

could you provide full logs from both task managers and job managers?

Piotrek

Wed, Apr 14, 2021 at 15:43 太平洋 <495635...@qq.com> wrote:

> After submit job, I received 'Failed to execute job' error. And the time
> between initialization and scheduling last 214s. What has happened during
> this period?
>
> version: flink: 1.12.2
> deployment: k8s standalone
> logs:
>
> 2021-04-14 12:47:58,547 WARN org.apache.flink.streaming.connectors.kafka.
> FlinkKafkaProducer [] - Property [transaction.timeout.ms] not specified.
> Setting it to 360 ms
> 2021-04-14 12:48:04,175 INFO
> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor
> [] - Job 1276000e99efdb77bdae0df88ab91da3 is submitted.
> 2021-04-14 12:48:04,175 INFO
> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor
> [] - Submitting Job with JobId=1276000e99efdb77bdae0df88ab91da3.
> 2021-04-14 12:48:04,249 INFO org.apache.flink.runtime.dispatcher.
> StandaloneDispatcher [] - Received JobGraph submission 
> 1276000e99efdb77bdae0df88ab91da3
> (Prediction Program).
> 2021-04-14 12:48:04,249 INFO org.apache.flink.runtime.dispatcher.
> StandaloneDispatcher [] - Submitting job 1276000e99efdb77bdae0df88ab91da3
> (Prediction Program).
> 2021-04-14 12:48:04,250 INFO org.apache.flink.runtime.rpc.akka.
> AkkaRpcService [] - Starting RPC endpoint for
> org.apache.flink.runtime.jobmaster.JobMaster at
> akka://flink/user/rpc/jobmanager_8 .
> 2021-04-14 12:48:04,251 INFO org.apache.flink.runtime.jobmaster.JobMaster
> [] - Initializing job Prediction Program (1276000e99
> efdb77bdae0df88ab91da3).
> 2021-04-14 12:48:04,251 INFO org.apache.flink.runtime.jobmaster.JobMaster
> [] - Using restart back off time strategy NoRestartBackoffTimeStrategy for
> Prediction Program (1276000e99efdb77bdae0df88ab91da3).
> 2021-04-14 12:48:04,251 INFO org.apache.flink.runtime.jobmaster.JobMaster
> [] - Running initialization on master for job Prediction Program (
> 1276000e99efdb77bdae0df88ab91da3).
> 2021-04-14 12:48:04,252 INFO org.apache.flink.runtime.jobmaster.JobMaster
> [] - Successfully ran initialization on master in 0 ms.
> 2021-04-14 12:48:04,254 INFO org.apache.flink.runtime.scheduler.adapter.
> DefaultExecutionTopology [] - Built 10 pipelined regions in 0 ms
> 2021-04-14 12:48:04,255 INFO org.apache.flink.runtime.jobmaster.JobMaster
> [] - Using application-defined state backend:
> org.apache.flink.streaming.api.operators.sorted.state.
> BatchExecutionStateBackend@3ea8cd5a
> 2021-04-14 12:48:04,255 INFO org.apache.flink.runtime.checkpoint.
> CheckpointCoordinator [] - No checkpoint found during restore.
> 2021-04-14 12:48:04,255 INFO org.apache.flink.runtime.jobmaster.JobMaster
> [] - Using failover strategy
> org.apache.flink.runtime.executiongraph.failover.flip1.
> RestartPipelinedRegionFailoverStrategy@26845997 for Prediction Program (
> 1276000e99efdb77bdae0df88ab91da3).
> 2021-04-14 12:48:04,255 INFO org.apache.flink.runtime.jobmaster.
> JobManagerRunnerImpl [] - JobManager runner for job Prediction Program (
> 1276000e99efdb77bdae0df88ab91da3) was granted leadership with session id
> ---- at akka.tcp://flink@flink-jobmanager:
> 6123/user/rpc/jobmanager_8.
> 2021-04-14 12:48:04,255 INFO org.apache.flink.runtime.jobmaster.JobMaster
> [] - Starting execution of job Prediction Program 
> (1276000e99efdb77bdae0df88ab91da3)
> under job master id .
> 2021-04-14 12:48:04,255 INFO org.apache.flink.runtime.source.coordinator.
> SourceCoordinator [] - Starting split enumerator for source Source:
> TableSourceScan(table=[[default_catalog, default_database, cpu_util,
> filter=[], project=[instance_id, value, timestamp]]], fields=[instance_id,
> value, timestamp]) -> Calc(select=[instance_id, value, timestamp], 
> where=[(timestamp
> > 1618145278)]) -> SinkConversionToDataPoint -> Map.
> org.apache.flink.util.FlinkException: Failed to execute job 'Prediction
> Program'. at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1918)
> at
> org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
> at
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1782)
> at com.jd.app.StreamingJob.main(StreamingJob.java:265) at
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498) at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
> at
> 

Re: Call run() of another SourceFunction inside run()?

2021-04-14 Thread Piotr Nowojski
Hi,

I think it should work. At least off the top of my head, I do not see
any reason why it shouldn't.

Just make sure that you are proxying all relevant methods, not only those
defined in `SourceFunction`. For example `FlinkKafkaConsumer` is
implementing/extending: `RichParallelSourceFunction`,
`CheckpointListener`, `CheckpointedFunction` and  `ResultTypeQueryable`,
so if you want to wrap `FlinkKafkaConsumer`, you would need to proxy all of
those interfaces/calls from your `WrappingSourceFunction` to the
`innerSourceFunction`.
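
As an illustration only (not from the original thread), a minimal Java sketch of such a forwarding wrapper around a `FlinkKafkaConsumer` could look roughly like the following. The output type is kept identical to the inner type for brevity, so the `SourceContext` wrapper from the question is omitted:

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

// Sketch: forwards every call the runtime may make to the wrapped Kafka source.
public class WrappingSourceFunction<T> extends RichParallelSourceFunction<T>
        implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<T> {

    private final FlinkKafkaConsumer<T> inner;

    public WrappingSourceFunction(FlinkKafkaConsumer<T> inner) {
        this.inner = inner;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        inner.setRuntimeContext(getRuntimeContext()); // the inner rich function needs the context too
        inner.open(parameters);
    }

    @Override
    public void run(SourceFunction.SourceContext<T> ctx) throws Exception {
        inner.run(ctx); // blocks until the inner source finishes or is cancelled
    }

    @Override
    public void cancel() {
        inner.cancel();
    }

    @Override
    public void close() throws Exception {
        inner.close();
    }

    // Without forwarding these, the Kafka consumer would never snapshot or restore its offsets.
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        inner.snapshotState(context);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        inner.initializeState(context);
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        inner.notifyCheckpointComplete(checkpointId);
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return inner.getProducedType();
    }
}
```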

Best,
Piotrek

On Wed, Apr 14, 2021 at 11:36, Schneider, Jochen
wrote:

> Hi!
>
> To work around FLINK-2491
>  which causes
> checkpointing issues for us I am trying to chain SourceFunctions so that
> the first one never quits. The basic idea is as follows:
>
> class WrappingSourceFunction(innerSourceFunction: SourceFunction[Inner]) 
> extends SourceFunction[Outer] {
>
>   override def run(outerCtx: SourceContext[Outer]): Unit = {
>
> outerCtx.collect(...)
>
>
>
>
>
> val innerCtx: SourceContext[Inner] = new SourceContextWrapper(outerCtx)
>
> innerSourceFunction.run(innerCtx)
>
>   }
>
>
>
>   override def cancel() = innerSourceFunction.cancel()
>
> }
>
> Is it ok to call run() of a different SourceFunction inside of run() and
> implement my own SourceContext delegating to another one? It works for a
> small test running on a local Flink environment, but I am wondering if
> there could be any issues doing that on production.
>
> Thanks,
>
> Jochen
>


Re: Get consumed Kafka offsets from Flink kafka source

2021-04-14 Thread Piotr Nowojski
Hi,

It depends on how you configured your FlinkKafkaSource, but you can make the
source commit the consumed offsets back to Kafka. So one way to examine
them would be to check those offsets in Kafka (I don't know how, but I'm
pretty sure there is a way to do it).
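
For example (a sketch only, with placeholder broker address and group id), the committed offsets can be inspected with Kafka's admin API, which is essentially what `kafka-consumer-groups.sh --describe` does:

```java
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ShowCommittedOffsets {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // "my-flink-group" stands for the group.id configured on the FlinkKafkaConsumer
            Map<TopicPartition, OffsetAndMetadata> offsets =
                    admin.listConsumerGroupOffsets("my-flink-group")
                            .partitionsToOffsetAndMetadata()
                            .get();
            offsets.forEach((tp, meta) -> System.out.println(tp + " -> " + meta.offset()));
        }
    }
}
```

Note that the connector only commits offsets back to Kafka when offset committing is enabled (on checkpoint completion, or via auto-commit when checkpointing is off), so an empty result here does not necessarily mean the job made no progress.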

Secondly, if you want to examine Flink's checkpoint state you can use State
Processor API to do that [1]. As far as I know you could hook up your
checkpointed data to Table API/SQL and use SQL to query/analyse the state.

Best
Piotrek

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html

On Wed, Apr 14, 2021 at 11:25, bat man wrote:

> Hi All,
>
> Is there any way I can inspect/query the checkpointed data. Scenario is
> like this -
>
> We have a high volume of data coming in the data stream pipeline for which
> kafka is source, in case if fails bcoz of bad data I want to analyse the
> data which caused the issue. It could be that some data source starts
> sending bad data so I want to go in kafka to that particular offset and do
> some analysis before I start the job with checkpointed data.
>
> Can anyone suggest how this can be achieved.
>
> Thanks,
> Hemant
>
>
>


PyFlink Vectorized UDF throws NullPointerException

2021-04-14 Thread Yik San Chan
The question is cross-posted on Stack Overflow
https://stackoverflow.com/questions/67092978/pyflink-vectorized-udf-throws-nullpointerexception
.

I have a ML model that takes two numpy.ndarray - `users` and `items` - and
returns an numpy.ndarray `predictions`. In normal Python code, I would do:

```python
model = load_model()

df = load_data() # the DataFrame includes 4 columns, namely, user_id,
movie_id, rating, and timestamp

users = df.user_id.values
items = df.movie_id.values

predictions = model(users, items)
```

I am looking into porting this code into Flink to leverage its distributed
nature. My assumption is: by distributing the prediction workload on
multiple Flink nodes, I should be able to run the whole prediction faster.

So I compose a PyFlink job. Note I implement an UDF called `predict` to run
the prediction.

```python
# batch_prediction.py

model = load_model()

settings = EnvironmentSettings.new_instance().use_blink_planner().build()
exec_env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(exec_env,
environment_settings=settings)

SOURCE_DDL = """
CREATE TABLE source (
user_id INT,
movie_id INT,
rating TINYINT,
event_ms BIGINT
) WITH (
'connector' = 'filesystem',
'format' = 'csv',
'csv.field-delimiter' = '\t',
'path' = 'ml-100k/u1.test'
)
"""

SINK_DDL = """
CREATE TABLE sink (
prediction DOUBLE
) WITH (
'connector' = 'print'
)
"""

t_env.execute_sql(SOURCE_DDL)
t_env.execute_sql(SINK_DDL)
t_env.execute_sql(
"INSERT INTO sink SELECT PREDICT(user_id, movie_id) FROM source"
).wait()
```

Here is the UDF.

```python
# batch_prediction.py (cont)

@udf(result_type=DataTypes.DOUBLE())
def predict(user, item):
return model([user], [item]).item()

t_env.create_temporary_function("predict", predict)
```

The job runs fine. However, the prediction actually runs on each and every
row of the `source` table, which is not performant. Instead, I want to
split the 80,000 (user_id, movie_id) pairs into, let's say, 100 batches,
with each batch having 800 rows. The job triggers the `model(users, items)`
function 100 times (= # of batch), where both `users` and `items` have 800
elements.

I couldn't find a way to do this. By looking at the [docs](
https://ci.apache.org/projects/flink/flink-docs-stable/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html),
vectorized user-defined functions may work.

```python
# batch_prediction.py (snippet)

# I add the func_type="pandas"
@udf(result_type=DataTypes.DOUBLE(), func_type="pandas")
def predict(user, item):
...
```

Unfortunately, it doesn't.

```
> python batch_prediction.py
...
Traceback (most recent call last):
  File "batch_prediction.py", line 55, in 
"INSERT INTO sink SELECT PREDICT(user_id, movie_id) FROM source"
  File
"/usr/local/anaconda3/envs/flink-ml/lib/python3.7/site-packages/pyflink/table/table_result.py",
line 76, in wait
get_method(self._j_table_result, "await")()
  File
"/usr/local/anaconda3/envs/flink-ml/lib/python3.7/site-packages/py4j/java_gateway.py",
line 1286, in __call__
answer, self.gateway_client, self.target_id, self.name)
  File
"/usr/local/anaconda3/envs/flink-ml/lib/python3.7/site-packages/pyflink/util/exceptions.py",
line 147, in deco
return f(*a, **kw)
  File
"/usr/local/anaconda3/envs/flink-ml/lib/python3.7/site-packages/py4j/protocol.py",
line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o51.await.
: java.util.concurrent.ExecutionException:
org.apache.flink.table.api.TableException: Failed to wait job finish
at
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at
org.apache.flink.table.api.internal.TableResultImpl.awaitInternal(TableResultImpl.java:119)
at
org.apache.flink.table.api.internal.TableResultImpl.await(TableResultImpl.java:86)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.TableException: Failed to wait job
finish
at

flink1.12.2 "Failed to execute job"

2021-04-14 Thread 太平洋
After submitting the job, I received a 'Failed to execute job' error, and the time
between initialization and scheduling lasted 214s. What happened during
this period?



version: flink: 1.12.2
deployment: k8s standalone
logs:


2021-04-14 12:47:58,547 WARN  
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer [] - Property 
[transaction.timeout.ms] not specified. Setting it to 3600000 ms
2021-04-14 12:48:04,175 INFO  
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - 
Job 1276000e99efdb77bdae0df88ab91da3 is submitted.
2021-04-14 12:48:04,175 INFO  
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - 
Submitting Job with JobId=1276000e99efdb77bdae0df88ab91da3.
2021-04-14 12:48:04,249 INFO  
org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Received 
JobGraph submission 1276000e99efdb77bdae0df88ab91da3 (Prediction Program).
2021-04-14 12:48:04,249 INFO  
org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Submitting 
job 1276000e99efdb77bdae0df88ab91da3 (Prediction Program).
2021-04-14 12:48:04,250 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  
   [] - Starting RPC endpoint for 
org.apache.flink.runtime.jobmaster.JobMaster at 
akka://flink/user/rpc/jobmanager_8 .
2021-04-14 12:48:04,251 INFO  org.apache.flink.runtime.jobmaster.JobMaster  
   [] - Initializing job Prediction Program 
(1276000e99efdb77bdae0df88ab91da3).
2021-04-14 12:48:04,251 INFO  org.apache.flink.runtime.jobmaster.JobMaster  
   [] - Using restart back off time strategy 
NoRestartBackoffTimeStrategy for Prediction Program 
(1276000e99efdb77bdae0df88ab91da3).
2021-04-14 12:48:04,251 INFO  org.apache.flink.runtime.jobmaster.JobMaster  
   [] - Running initialization on master for job Prediction Program 
(1276000e99efdb77bdae0df88ab91da3).
2021-04-14 12:48:04,252 INFO  org.apache.flink.runtime.jobmaster.JobMaster  
   [] - Successfully ran initialization on master in 0 ms.
2021-04-14 12:48:04,254 INFO  
org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - Built 
10 pipelined regions in 0 ms
2021-04-14 12:48:04,255 INFO  org.apache.flink.runtime.jobmaster.JobMaster  
   [] - Using application-defined state backend: 
org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionStateBackend@3ea8cd5a
2021-04-14 12:48:04,255 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - No checkpoint 
found during restore.
2021-04-14 12:48:04,255 INFO  org.apache.flink.runtime.jobmaster.JobMaster  
   [] - Using failover strategy 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@26845997
 for Prediction Program (1276000e99efdb77bdae0df88ab91da3).
2021-04-14 12:48:04,255 INFO  
org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl  [] - JobManager 
runner for job Prediction Program (1276000e99efdb77bdae0df88ab91da3) was 
granted leadership with session id ---- at 
akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_8.
2021-04-14 12:48:04,255 INFO  org.apache.flink.runtime.jobmaster.JobMaster  
   [] - Starting execution of job Prediction Program 
(1276000e99efdb77bdae0df88ab91da3) under job master id 
.
2021-04-14 12:48:04,255 INFO  
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Starting 
split enumerator for source Source: TableSourceScan(table=[[default_catalog, 
default_database, cpu_util, filter=[], project=[instance_id, value, 
timestamp]]], fields=[instance_id, value, timestamp]) ->
Calc(select=[instance_id, value, timestamp], where=[(timestamp >
1618145278)]) -> SinkConversionToDataPoint -> Map.
org.apache.flink.util.FlinkException: Failed to execute job 'Prediction 
Program'.   at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1918)
 at 
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
 at 
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
   at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1782)
  at com.jd.app.StreamingJob.main(StreamingJob.java:265)  at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)  at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)   
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498) at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
 at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
  at 

Re: Extract/Interpret embedded byte data from a record

2021-04-14 Thread Piotr Nowojski
Hi,

One thing that you can do is to read this record using Avro keeping
`Result` as `bytes` and in a subsequent mapping function, you could change
the record type and deserialize the result. In Data Stream API:

source.map(new MapFunction { ...} )
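
Purely as an illustration (the field names "Result" and "ResultEncoding" come from the schema above, while the embedded result schema and the "score" field are hypothetical placeholders), such a mapping function might look like this, assuming the outer record arrives as an Avro `GenericRecord` with `Result` kept as bytes:

```java
import java.nio.ByteBuffer;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Decodes the embedded "Result" payload based on "ResultEncoding" and extracts one value.
public class DecodeEmbeddedResult extends RichMapFunction<GenericRecord, Double> {

    private final String resultSchemaJson; // schema of the embedded Result payload (assumed known)

    private transient ObjectMapper jsonMapper;
    private transient GenericDatumReader<GenericRecord> avroReader;

    public DecodeEmbeddedResult(String resultSchemaJson) {
        this.resultSchemaJson = resultSchemaJson;
    }

    @Override
    public void open(Configuration parameters) {
        jsonMapper = new ObjectMapper();
        Schema resultSchema = new Schema.Parser().parse(resultSchemaJson);
        avroReader = new GenericDatumReader<>(resultSchema);
    }

    @Override
    public Double map(GenericRecord outer) throws Exception {
        ByteBuffer buf = (ByteBuffer) outer.get("Result"); // Avro "bytes" arrives as a ByteBuffer
        byte[] payload = new byte[buf.remaining()];
        buf.get(payload);

        String encoding = outer.get("ResultEncoding").toString();
        if ("application/json".equals(encoding)) {
            return jsonMapper.readTree(payload).path("score").asDouble();
        } else { // application/avro
            GenericRecord embedded =
                    avroReader.read(null, DecoderFactory.get().binaryDecoder(payload, null));
            return ((Number) embedded.get("score")).doubleValue();
        }
    }
}
```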

Best,
Piotrek

On Wed, Apr 14, 2021 at 03:17, Sumeet Malhotra
wrote:

> Hi,
>
> I'm reading data from Kafka, which is Avro encoded and has the following
> general schema:
>
> {
>   "name": "SomeName",
>   "doc": "Avro schema with variable embedded encodings",
>   "type": "record",
>   "fields": [
> {
>   "name": "Name",
>   "doc": "My name",
>   "type": "string"
> },
> {
>   "name": "ID",
>   "doc": "My ID",
>   "type": "string"
> },
> {
>   "name": "Result",
>   "doc": "Result data, could be encoded differently",
>   "type": "bytes"
> },
> {
>   "name": "ResultEncoding",
>   "doc": "Result encoding media type (e.g. application/avro,
> application/json)",
>   "type": "string"
> },
>   ]
> }
>
> Basically, the "Result" field is bytes whose interpretation depends upon
> the "ResultEncoding" field i.e. either avro or json. The "Result" byte
> stream has its own well defined schema also.
>
> My use case involves extracting/aggregating data from within the embedded
> "Result" field. What would be the best approach to perform this runtime
> decoding and extraction of fields from the embedded byte data? Would user
> defined functions help in this case?
>
> Thanks in advance!
> Sumeet
>
>


yarn-application模式下pipeline.classpaths绑定的资源包,应该一起上传

2021-04-14 Thread todd
The resources configured via pipeline.classpaths need to be loaded from the container's local filesystem, but they are not uploaded when the client submits the job.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


yarn-application模式下pipeline.classpaths绑定的资源包,应该一起上传

2021-04-14 Thread todd





--
Sent from: http://apache-flink.147419.n8.nabble.com/


????1.13??Changelog????????

2021-04-14 Thread ????buaa
flink1.13??changelog

Nondeterministic results with SQL job when parallelism is > 1

2021-04-14 Thread Dylan Forciea
I am running Flink 1.12.2, and I was trying to up the parallelism of my Flink 
SQL job to see what happened. However, once I did that, my results became 
nondeterministic. This happens whether I set the 
table.exec.resource.default-parallelism config option or I set the default 
local parallelism to something higher than 1. I would end up with fewer records 
in the end, and each time I ran the output record count would come out 
differently.

I managed to distill an example, as pasted below (with attribute names changed 
to protect company proprietary info), that causes the issue. I feel like I 
managed to get it to happen with a LEFT JOIN rather than a FULL JOIN, but the 
distilled version wasn’t giving me wrong results with that. Maybe it has to do 
with joining to a table that was formed using a GROUP BY? Can somebody tell me if 
I'm doing something that is known not to work, or if I have run across a bug?

Regards,
Dylan Forciea


object Job {
  def main(args: Array[String]): Unit = {
StreamExecutionEnvironment.setDefaultLocalParallelism(1)

val settings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
val streamTableEnv = StreamTableEnvironment.create(streamEnv, settings)

val configuration = streamTableEnv.getConfig().getConfiguration()
configuration.setInteger("table.exec.resource.default-parallelism", 16)

streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH);

streamTableEnv.executeSql(
  """
  CREATE TABLE table1 (
id1 STRING PRIMARY KEY NOT ENFORCED,
attr STRING
  ) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://…',
'table-name' = 'table1',
'username' = 'username',
'password' = 'password',
'scan.fetch-size' = '500',
'scan.auto-commit' = 'false'
  )""")

streamTableEnv.executeSql(
  """
  CREATE TABLE table2 (
attr STRING,
id2 STRING
  ) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://…',
'table-name' = 'table2',
'username' = 'username',
'password' = 'password',
'scan.fetch-size' = '500',
'scan.auto-commit' = 'false'
  )""")

streamTableEnv.executeSql(
  """
  CREATE TABLE table3 (
attr STRING PRIMARY KEY NOT ENFORCED,
attr_mapped STRING
  ) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://…',
'table-name' = 'table3',
'username' = 'username',
'password' = 'password',
'scan.fetch-size' = '500',
'scan.auto-commit' = 'false'
  )""")

streamTableEnv.executeSql("""
  CREATE TABLE sink (
id STRING PRIMARY KEY NOT ENFORCED,
attr STRING,
attr_mapped STRING
  ) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://…',
'table-name' = 'sink',
'username' = 'username',
'password' = 'password',
'scan.fetch-size' = '500',
'scan.auto-commit' = 'false'
  )""")

val view =
  streamTableEnv.sqlQuery("""
  SELECT
COALESCE(t1.id1, t2.id2) AS id,
COALESCE(t2.attr, t1.attr) AS operator,
COALESCE(t3.attr_mapped, t2.attr, t1.attr) AS attr_mapped
  FROM table1 t1
  FULL JOIN (
SELECT
  id2,
  FIRST_VALUE(attr) AS attr
FROM table2
GROUP BY id2
  ) t2
   ON (t1.id1 = t2.id2)
  LEFT JOIN table3 t3
ON (COALESCE(t2.attr, t1.attr) = t3.attr)""")
streamTableEnv.createTemporaryView("view", view)

val statementSet = streamTableEnv.createStatementSet()
statementSet.addInsertSql("""
  INSERT INTO sink SELECT * FROM view
""")

statementSet.execute().await()
  }
}




提交flink-sql 出现无法部署到yarn集群

2021-04-14 Thread 张锴
When connecting to Hive via Flink SQL, the following error occurred:
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: Could not deploy Yarn job cluster.
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
at
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1688)
at
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: org.apache.flink.client.deployment.ClusterDeploymentException:
Could not deploy Yarn job cluster.
at
org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:481)
at
org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:81)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1905)
at
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
at
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1782)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1765)
at com.erwan.flinksql.FlinkConnectHive.main(FlinkConnectHive.java:49)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
... 11 more
Caused by:
org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The
YARN application unexpectedly switched to state FAILED during deployment.
Diagnostics from YARN: Application application_1618298202025_0017 failed 1
times (global limit =2; local limit is =1) due to AM Container for
appattempt_1618298202025_0017_01 exited with  exitCode: 2
Failing this attempt.Diagnostics: [2021-04-14 19:04:02.506]Exception from
container-launch.
Container id: container_e13_1618298202025_0017_01_01
Exit code: 2。

Since the cause of the error is not obvious, it is hard to troubleshoot and I am not sure where exactly the problem lies. Is there any way to pinpoint the issue?


flink1.12版本,yarn-application模式Flink web ui看不到日志

2021-04-14 Thread todd
On YARN there are only .out and .error log files, but the log panel in the Flink web UI does not show any log content.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Call run() of another SourceFunction inside run()?

2021-04-14 Thread Schneider, Jochen
Hi!

To work around FLINK-2491 
which causes checkpointing issues for us I am trying to chain SourceFunctions 
so that the first one never quits. The basic idea is as follows:

class WrappingSourceFunction(innerSourceFunction: SourceFunction[Inner]) 
extends SourceFunction[Outer] {

  override def run(outerCtx: SourceContext[Outer]): Unit = {

outerCtx.collect(...)





val innerCtx: SourceContext[Inner] = new SourceContextWrapper(outerCtx)

innerSourceFunction.run(innerCtx)

  }



  override def cancel() = innerSourceFunction.cancel()

}

Is it ok to call run() of a different SourceFunction inside of run() and 
implement my own SourceContext delegating to another one? It works for a small 
test running on a local Flink environment, but I am wondering if there could be 
any issues doing that on production.

Thanks,

Jochen


Re: Flink docker 1.11.3 actually runs 1.11.2

2021-04-14 Thread Chesnay Schepler
Works properly for me. I think your suspicion about the .env is correct; 
it is probably not considered when checking whether something has 
changed, so docker just re-uses the previous image.


On 4/13/2021 9:51 PM, Flavio Pompermaier wrote:

Hi Chesnay,
my tests were done using docker-compose (with the command 
'docker-compose up --build -d flink-jobmanager flink-taskmanager').
These are the necessary files (./flink/db-libs/* contains the jdbc 
libraries I use while /opt/flink/data is used as a volume to share 
files with other dockers):
PS: previously I used FLINK_VERSION=1.11-scala_2.12-java11 in the .env 
file... so if you didn't encounter the problem, it's probably caused by 
some docker-compose internal that is not overriding the old 
image/container (I'm indeed relatively new to docker and docker-compose..)

---
.env
---
FLINK_VERSION=1.11.3-scala_2.12-java11


docker-compose.yml


version: '3'
services:
  flink-jobmanager:
    container_name: flink-jobmanager
    build:
      context: .
      dockerfile: Dockerfile.flink
      args:
        - FLINK_VERSION=${FLINK_VERSION}
    image: 'flink-test:${FLINK_VERSION}'
    ports:
      - "8091:8081"
      - "8092:8082"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: flink-jobmanager
        rest.port: 8081
        historyserver.web.port: 8082
        web.upload.dir: /opt/flink
    volumes:
      - '/opt/flink/data:/tmp/flink'
    networks:
      - test-network
  flink-taskmanager:
    container_name: flink-taskmanager
    build:
      context: .
      dockerfile: Dockerfile.flink
      args:
        - FLINK_VERSION=${FLINK_VERSION}
    image: 'flink-test:${FLINK_VERSION}'
    depends_on:
      - flink-jobmanager
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: flink-jobmanager
        taskmanager.memory.process.size: 1728m
        taskmanager.numberOfTaskSlots: 2
    volumes:
      - '/opt/flink/data:/tmp/flink'
    networks:
      - test-network

networks:
  test-network:
    driver: bridge

---
Dockerfile.flink
---

ARG FLINK_VERSION
FROM flink:$FLINK_VERSION

USER root
RUN set -ex; apt-get update; apt-get -y install openssh-client ssh # python

USER flink
WORKDIR /opt/flink

COPY flink/db-libs/* lib/

Thanks for the support,
Flavio

On Tue, Apr 13, 2021 at 7:43 PM Chesnay Schepler wrote:


Please provide steps to reproduce the issue.

I can't see anything wrong in the dockerfiles (they reference the
correct release url), and the referenced release correctly identifies
itself as 1.11.3 .
I also started a container with the image, started a jobmanager,
and the
logs show 1.11.3 like they are supposed to do.

On 4/13/2021 6:31 PM, Flavio Pompermaier wrote:
> Hi to all,
> I've just build a docker that use the image
> flink:1.11.3-scala_2.12-java11 but the web UI (and logs too)
display
> Flink 1.11.2 (Commit: fe36135). Was there an error with the release?
>
> Best,
> Flavio




--
Flavio Pompermaier
Development Department

OKKAM S.r.l.
Tel. +(39) 0461 041809





Get consumed Kafka offsets from Flink kafka source

2021-04-14 Thread bat man
Hi All,

Is there any way I can inspect/query the checkpointed data. Scenario is
like this -

We have a high volume of data coming in the data stream pipeline for which
kafka is source, in case if fails bcoz of bad data I want to analyse the
data which caused the issue. It could be that some data source starts
sending bad data so I want to go in kafka to that particular offset and do
some analysis before I start the job with checkpointed data.

Can anyone suggest how this can be achieved.

Thanks,
Hemant


pyflink 运行提示:Function class 'class org.apache.flink.table.functions.python.PythonScalarFunction' is not serializable

2021-04-14 Thread magichuang
Flink version: 1.11.2, Python version 3.6, apache-flink==1.11.2, running Flink on
YARN in per-job mode.

The program is developed with PyFlink: it reads data from Kafka, uses a UDF to check whether an IP address falls within an IP range (the third-party package IPy is imported for this), and then writes the result back to Kafka.




Main code:

t_env.get_config().get_configuration().set_string('taskmanager.memory.task.off-heap.size',
 '128m')

t_env.get_config().get_configuration().set_string("pipeline.jars", 
"file:///opt/flink-1.11.2/lib/flink-sql-connector-kafka_2.11-1.11.0.jar; ")

t_env.get_config().get_configuration().set_string("pipeline.classpaths", 
"file:///opt/flink-1.11.2/lib/flink-sql-connector-kafka_2.11-1.11.0.jar; ")




t_env.add_python_archive("venv.zip")

t_env.get_config().set_python_executable("venv.zip/venv/bin/python")




@udf(input_types=[DataTypes.STRING(), DataTypes.STRING()], 
result_type=DataTypes.INT())

def judge_ip(src_ip, dst_ip):

import IPy

.

t_env.register_function('judge_ip', judge_ip)



The main error messages are below:

Traceback (most recent call last):

File "traffic-tuple-sf.py", line 59, in 

t_env.register_function('judge_ip', judge_ip)

File 
"/opt/flink-1.11.2/opt/python/pyflink.zip/pyflink/table/table_environment.py", 
line 876, in register_function

File "/opt/flink-1.11.2/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", 
line 1286, in __call__

File "/opt/flink-1.11.2/opt/python/pyflink.zip/pyflink/util/exceptions.py", 
line 147, in deco

File "/opt/flink-1.11.2/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", 
line 328, in get_return_value

py4j.protocol.Py4JJavaError: An error occurred while calling 
o5.registerFunction.

: org.apache.flink.table.api.ValidationException: Function class 'class 
org.apache.flink.table.functions.python.PythonScalarFunction' is not 
serializable. Make sure that the class is self-contained (i.e. no references to 
outer classes) and all inner fields are serializable as well.

at 
org.apache.flink.table.functions.UserDefinedFunctionHelper.cleanFunction(UserDefinedFunctionHelper.java:349)

at 
org.apache.flink.table.functions.UserDefinedFunctionHelper.prepareInstance(UserDefinedFunctionHelper.java:204)

at 
org.apache.flink.table.catalog.FunctionCatalog.registerTempSystemScalarFunction(FunctionCatalog.java:383)

at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.registerFunction(TableEnvironmentImpl.java:357)

at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)

at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:78)

at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.base/java.lang.reflect.Method.invoke(Method.java:567)

at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)

at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)

at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)

at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)

at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)

at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)

at java.base/java.lang.Thread.run(Thread.java:831)

Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make field 
private final byte[] java.lang.String.value accessible: module java.base does 
not "opens java.lang" to unnamed module @1311d9fb

at 
java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:357)

at 
java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:297)

at java.base/java.lang.reflect.Field.checkCanSetAccessible(Field.java:177)

at java.base/java.lang.reflect.Field.setAccessible(Field.java:171)

at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:104)

at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)

at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)

at 
org.apache.flink.table.functions.UserDefinedFunctionHelper.cleanFunction(UserDefinedFunctionHelper.java:346)

... 14 more




Could anyone please take a look at where the problem is and how to fix it? Thanks!









Re:使用了流应用中使用了mysql jdbc的source,Execution处于FINISHED状态无法生成检查点

2021-04-14 Thread anonnius
+1, I am running into this as well.
On 2021-01-21 17:52:06, "刘海" wrote:
>HI!
>I ran into a problem while testing:
>In a streaming application I use a MySQL JDBC source as a dimension table, and to improve processing efficiency I use the lookup cache. The registered table is below:
>bsTableEnv.executeSql("CREATE TABLE tm_dealers (dealer_code STRING,is_valid 
>DECIMAL(10,0),proctime AS PROCTIME(),PRIMARY KEY (dealer_code) NOT ENFORCED\n" 
>+
>") WITH (" +
>"'connector' = 'jdbc'," +
>"'url' = 'jdbc:mysql://10.0.15.83:3306/flink-test?useSSL=false'," +
>"'table-name' = 'tm_dealers'," +
>"'driver' = 'com.mysql.cj.jdbc.Driver'," +
>"'username' = 'root'," +
>"'password' = 'Cdh2020:1'," +
>"'lookup.cache.max-rows' = '500',"+
>"'lookup.cache.ttl' = '1800s',"+
>"'sink.buffer-flush.interval' = '60s'"+
>")");
>
>
>I found that with this setup the checkpoint configuration no longer takes effect and no checkpoints are triggered. The log reports the following error:
>job bad9f419433f78d24e703e659b169917 is not in state RUNNING but FINISHED 
>instead. Aborting checkpoint.
>
>
>Looking at the graph in the web UI, that Execution is in FINISHED state, and a FINISHED vertex cannot participate in checkpointing. Is there any other way around this?
>
>
>I would appreciate some guidance, many thanks!
>| |
>刘海
>|
>|
>liuha...@163.com
>|
>Signature customized with NetEase Mail Master


Re: Flink 1.11.4?

2021-04-14 Thread Roman Khachatryan
Hi Yuval,

I'd expect 1.13 to be available in 2-3 weeks (there are no exact estimates).

Regards,
Roman

On Tue, Apr 13, 2021 at 12:08 PM Yuval Itzchakov  wrote:
>
> Roman, is there an ETA on 1.13?
>
> On Mon, Apr 12, 2021, 16:17 Roman Khachatryan  wrote:
>>
>> Hi Maciek,
>>
>> There are no specific plans for 1.11.4 yet as far as I know.
>> The official policy is to support the current and previous minor
>> release [1]. So 1.12 and 1.13 will be officially supported once 1.13
>> is released.
>> However, it's likely that 1.11.4 will still be released.
>>
>> [1]
>> https://flink.apache.org/downloads.html#update-policy-for-old-releases
>>
>> Regards,
>> Roman
>>
>>
>> On Mon, Apr 12, 2021 at 10:35 AM Maciek Próchniak  wrote:
>> >
>> > Hello,
>> >
>> > I'd like to ask if there are any plans to release 1.11.4 - I understand
>> > it will be last bugfix release for 1.11.x branch, as 1.13.0 is "just
>> > round the corner"?
>> >
>> > There are a few fixes we'd like to use - e.g.
>> > https://issues.apache.org/jira/browse/FLINK-9844,
>> > https://issues.apache.org/jira/browse/FLINK-21164
>> >
>> >
>> > thanks,
>> >
>> > maciek
>> >


Re: In Flink SQL for kafka avro based table , is there support for FORWARD_TRANSITIVE schema change?

2021-04-14 Thread Arvid Heise
For any schema change to be gracefully supported, you need to know both
schemas (old + new) on the reader side (= Flink). I'm curious how Flink should
know the old schema, as you only provide the new schema, right?

Usually, you use the schema registry, so that each record has its own
schema attached to it (= the old one). You opted not to go for it, so I'm not sure
how Flink is supposed to know the old schema.
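
To illustrate the point (a sketch of Avro's own API, not of Flink internals): Avro schema resolution needs both schemas, which is exactly what is missing here.

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;

public class SchemaResolutionSketch {
    // Resolving old data against a new schema requires the writer schema AND the reader schema.
    public static GenericRecord decode(byte[] payload, Schema writerSchema, Schema readerSchema)
            throws Exception {
        GenericDatumReader<GenericRecord> reader =
                new GenericDatumReader<>(writerSchema, readerSchema);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(payload, null);
        return reader.read(null, decoder);
    }
}
```

With a schema registry the writer schema is looked up from the id carried with each record; without one, the plain 'avro' format has to assume the data was written with exactly the schema derived from the table definition.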

On Wed, Apr 14, 2021 at 9:09 AM Agnelo Dcosta 
wrote:

> Hi Arvid,
> > writer schema encoded if you are using no schema registry?
> on the producer side we are using node with
> https://registry.npmjs.org/avsc/-/avsc-5.5.3.tgz
> and https://registry.npmjs.org/sinek/-/sinek-9.1.0.tgz libraries to
> publish messages. We specify the avro schema file to encode messages in
> avro format.
>
> >Flink know with which schema the data has been written so that it can
> map it to the new schema?
> With 1.11 we used to specify the schema file as part of the flink sql
> table definition.
> However with 1.12 the schema is derived from the message/table definition.
> We do not specify any schema as such.
>
>
> On Tue, Apr 13, 2021 at 11:58 PM Arvid Heise  wrote:
>
>> Hi Agnelo,
>>
>> How is the writer schema encoded if you are using no schema registry? Or
>> phrased differently: how does Flink know with which schema the data has
>> been written so that it can map it to the new schema?
>>
>> On Wed, Apr 14, 2021 at 8:33 AM Agnelo Dcosta 
>> wrote:
>>
>>> Hi, we are using Flink SQL 1.12 and have a couple of tables created from
>>> kafka topics. Format is avro (not confluent avro) and no schema registry as
>>> such.
>>>
>>> In flink 1.11 we used to specify the schema, however in 1.12 the schema
>>> is derived from the message itself.
>>>
>>> Is it possible for the producers to start sending new fields without
>>> changes in the flink app?
>>>
>>>
>>>
>>> For example :
>>>
>>> {
>>>
>>>   "name": "topic1",
>>>
>>>   "type": "record",
>>>
>>>   "fields": [
>>>
>>>   {
>>>
>>>   "name": "field1",
>>>
>>>   "type": "string"
>>>
>>> },
>>>
>>> {
>>>
>>>   "name": "field2",
>>>
>>>   "type": "string"
>>>
>>> },
>>>
>>> {
>>>
>>>   *"name": "field3",*
>>>
>>> *  "type": "string"*
>>>
>>> },
>>>
>>> ]
>>>
>>> }
>>>
>>>
>>>
>>> Flink table has:
>>>
>>> CREATE TABLE topic1(\n"
>>>
>>> + " field1 string not null \n"
>>>
>>> + " ,field2 string not null \n"
>>>
>>> "'connector' = 'kafka' \n"
>>>
>>>  + ",'topic' = 'topic1' \n"
>>>
>>>  + ",'scan.startup.mode' = 'latest-offset' \n"
>>>
>>>  + ",'properties.group.id' = 'topic1' \n"
>>>
>>>  + ",'properties.bootstrap.servers' = 'localhost:8082' \n"
>>>
>>>   + ",'properties.enable.auto.commit' = 'true' \n"
>>>
>>>  + ",'format' = 'avro' \n";
>>>
>>>
>>>
>>> With above settings I get a deserialization error:
>>>
>>>
>>>
>>> *java.io.IOException: Failed to deserialize Avro record.*
>>>
>>> *at
>>> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:104)
>>> ~[flink-sql-avro-1.12.0.jar:1.12.0]*
>>>
>>> *at
>>> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:44)
>>> ~[flink-sql-avro-1.12.0.jar:1.12.0]*
>>>
>>> *at
>>> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
>>> ~[flink-core-1.12.0.jar:1.12.0]*
>>>
>>> *at
>>> org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
>>> ~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]*
>>>
>>> *at
>>> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:177)
>>> ~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]*
>>>
>>> *at
>>> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:137)
>>> ~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]*
>>>
>>> *at
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:761)
>>> ~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]*
>>>
>>> *at
>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>>> ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]*
>>>
>>> *at
>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>>> ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]*
>>>
>>


Re: [BULK]Re: [SURVEY] Remove Mesos support

2021-04-14 Thread Matthias Pohl
Thanks for everyone's feedback. I'm gonna initiate a vote in a separate
thread.

On Mon, Mar 29, 2021 at 9:18 AM Robert Metzger  wrote:

> +1
>
>
>
> On Mon, Mar 29, 2021 at 5:44 AM Yangze Guo  wrote:
>
> > +1
> >
> > Best,
> > Yangze Guo
> >
> > On Mon, Mar 29, 2021 at 11:31 AM Xintong Song 
> > wrote:
> > >
> > > +1
> > > It's already a matter of fact for a while that we no longer port new
> > features to the Mesos deployment.
> > >
> > > Thank you~
> > >
> > > Xintong Song
> > >
> > >
> > >
> > > On Fri, Mar 26, 2021 at 10:37 PM Till Rohrmann 
> > wrote:
> > >>
> > >> +1 for officially deprecating this component for the 1.13 release.
> > >>
> > >> Cheers,
> > >> Till
> > >>
> > >> On Thu, Mar 25, 2021 at 1:49 PM Konstantin Knauf 
> > wrote:
> > >>>
> > >>> Hi Matthias,
> > >>>
> > >>> Thank you for following up on this. +1 to officially deprecate Mesos
> > in the code and documentation, too. It will be confusing for users if
> this
> > diverges from the roadmap.
> > >>>
> > >>> Cheers,
> > >>>
> > >>> Konstantin
> > >>>
> > >>> On Thu, Mar 25, 2021 at 12:23 PM Matthias Pohl <
> matth...@ververica.com>
> > wrote:
> > 
> >  Hi everyone,
> >  considering the upcoming release of Flink 1.13, I wanted to revive
> the
> >  discussion about the Mesos support ones more. Mesos is also already
> > listed
> >  as deprecated in Flink's overall roadmap [1]. Maybe, it's time to
> > align the
> >  documentation accordingly to make it more explicit?
> > 
> >  What do you think?
> > 
> >  Best,
> >  Matthias
> > 
> >  [1] https://flink.apache.org/roadmap.html#feature-radar
> > 
> >  On Wed, Oct 28, 2020 at 9:40 AM Till Rohrmann  >
> > wrote:
> > 
> >  > Hi Oleksandr,
> >  >
> >  > yes you are right. The biggest problem is at the moment the lack
> of
> > test
> >  > coverage and thereby confidence to make changes. We have some e2e
> > tests
> >  > which you can find here [1]. These tests are, however, quite
> coarse
> > grained
> >  > and are missing a lot of cases. One idea would be to add a Mesos
> > e2e test
> >  > based on Flink's end-to-end test framework [2]. I think what needs
> > to be
> >  > done there is to add a Mesos resource and a way to submit jobs to
> a
> > Mesos
> >  > cluster to write e2e tests.
> >  >
> >  > [1] https://github.com/apache/flink/tree/master/flink-jepsen
> >  > [2]
> >  >
> >
> https://github.com/apache/flink/tree/master/flink-end-to-end-tests/flink-end-to-end-tests-common
> >  >
> >  > Cheers,
> >  > Till
> >  >
> >  > On Tue, Oct 27, 2020 at 12:29 PM Oleksandr Nitavskyi <
> >  > o.nitavs...@criteo.com> wrote:
> >  >
> >  >> Hello Xintong,
> >  >>
> >  >> Thanks for the insights and support.
> >  >>
> >  >> Browsing the Mesos backlog and didn't identify anything critical,
> > which
> >  >> is left there.
> >  >>
> >  >> I see that there are were quite a lot of contributions to the
> > Flink Mesos
> >  >> in the recent version:
> >  >> https://github.com/apache/flink/commits/master/flink-mesos.
> >  >> We plan to validate the current Flink master (or release 1.12
> > branch) our
> >  >> Mesos setup. In case of any issues, we will try to propose
> changes.
> >  >> My feeling is that our test results shouldn't affect the Flink
> 1.12
> >  >> release cycle. And if any potential commits will land into the
> > 1.12.1 it
> >  >> should be totally fine.
> >  >>
> >  >> In the future, we would be glad to help you guys with any
> >  >> maintenance-related questions. One of the highest priorities
> > around this
> >  >> component seems to be the development of the full e2e test.
> >  >>
> >  >> Kind Regards
> >  >> Oleksandr Nitavskyi
> >  >> 
> >  >> From: Xintong Song 
> >  >> Sent: Tuesday, October 27, 2020 7:14 AM
> >  >> To: dev ; user 
> >  >> Cc: Piyush Narang 
> >  >> Subject: [BULK]Re: [SURVEY] Remove Mesos support
> >  >>
> >  >> Hi Piyush,
> >  >>
> >  >> Thanks a lot for sharing the information. It would be a great
> > relief that
> >  >> you are good with Flink on Mesos as is.
> >  >>
> >  >> As for the jira issues, I believe the most essential ones should
> > have
> >  >> already been resolved. You may find some remaining open issues
> > here [1],
> >  >> but not all of them are necessary if we decide to keep Flink on
> > Mesos as is.
> >  >>
> >  >> At the moment and in the short future, I think helps are mostly
> > needed on
> >  >> testing the upcoming release 1.12 with Mesos use cases. The
> > community is
> >  >> currently actively preparing the new release, and hopefully we
> > could come
> >  >> up with a release candidate early next month. It would be greatly
> >  >> appreciated if you fork as experienced Flink on Mesos users can
> > 

关于Flink SQL中Interval Join使用时watermark的疑惑

2021-04-14 Thread xuty
I defined two Kafka tables in Flink SQL (table A and table B) with the debezium-json format, and I want to perform an interval join. The SQL looks roughly like this:

select * from A left join B 
on A.id = B.id
and B.dt BETWEEN A.dt and A.dt + INTERVAL '30' SECOND 

First question: I want to explicitly define watermarks on tables A and B (the dt field is the event time) to handle possible out-of-order data, but I get the error below and do not quite understand it. Does Flink
SQL currently not support defining watermarks for an interval join?

Interval Join doesn't support consuming update and delete changes

Second question: if no watermark is explicitly defined on A and B, the job runs successfully. Will a watermark be generated automatically from the interval condition and used to remove expired state?

Latency question: if stream B is delayed and no new data arrives, will stream A keep accumulating state beyond the watermark? Is there anything configurable similar to the state TTL that can be defined in the DataStream API?

Out-of-order question: in a test, an old record arrived on stream B and it still joined with records in stream A that in theory should already have expired. I am not sure whether this is related to some configuration, or whether the state simply was not cleaned up in time according to the watermark. An explanation would be appreciated.




--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: JSON source for pyflink stream

2021-04-14 Thread Klemens Muthmann
Hi,

We are loading our JSON from a Mongo Database. But we also found no readily 
available way to stream JSON Data into a Flink Pipeline. I guess this would be 
hard to implement since you have to know details about the JSON structure to do 
this. So I guess your best bet would be to implement your own input source, 
which can stream in your file and create results based on the JSON structure. 
We are not using PyFlink so I cannot give any details about this, but it 
should not matter which language you use.

Just implement a Source reading your input and employ any JSON parser you like, 
creating for example domain objects with the same attributes as your JSON 
structure and forward those into your Flink Pipeline for further processing.
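
For what it is worth, a minimal sketch of such a custom source in Java (assuming one JSON document per line in a local file, and only a few of the tweet fields; the path is a placeholder) could look like this:

```java
import java.io.BufferedReader;
import java.nio.file.Files;
import java.nio.file.Paths;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Reads newline-delimited JSON tweets from a local file and emits only the fields
// the pipeline cares about.
public class TweetSource implements SourceFunction<TweetSource.Tweet> {

    public static class Tweet {
        public String id;
        public String text;
        public String lang;
    }

    private final String path;
    private volatile boolean running = true;

    public TweetSource(String path) {
        this.path = path;
    }

    @Override
    public void run(SourceContext<Tweet> ctx) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        try (BufferedReader reader = Files.newBufferedReader(Paths.get(path))) {
            String line;
            while (running && (line = reader.readLine()) != null) {
                JsonNode data = mapper.readTree(line).path("data");
                Tweet t = new Tweet();
                t.id = data.path("id").asText();
                t.text = data.path("text").asText();
                t.lang = data.path("lang").asText();
                ctx.collect(t);
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```

It can then be plugged in with `env.addSource(new TweetSource("tweets.jsonl"))` and processed with regular operators.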

Regards
Klemens

> On 14.04.2021 at 04:40, Yik San Chan wrote:
> 
> Hi Giacomo,
> 
> I think you can try using Flink SQL connector. For JSON input such as {"a": 
> 1, "b": {"c": 2, {"d": 3}}}, you can do:
> 
> CREATE TABLE data (
>   a INT,
>   b ROW<c INT, d INT>
> ) WITH (...)
> 
> Let me know if that helps.
> 
> Best,
> Yik San
> 
> On Wed, Apr 14, 2021 at 2:00 AM  wrote:
> Hi,
> I'm new to Flink and I am trying to create a stream from locally downloaded 
> tweets. The tweets are in json format, like in this example:
>  
> {"data":{"text":"Polsek Kakas Cegah Covid-19 https://t.co/ADjEgpt7bC 
> ","public_metrics":"retweet_count":0,"reply_count":0,"like_count":0,"quote_count":0},
> "author_id":"1367839185764151302","id":"1378275866279469059","created_at":"2021-04-03T09:19:08.000Z","source":"Twitter
>  for Android","lang":"in"},
> "includes":{"users":[{"protected":false,"id":"1367839185764151302","name":"Nathan
>  Pareda","created_at":"2021-03-05T14:07:56.000Z",
> "public_metrics":{"followers_count":0,"following_count":0,"tweet_count":557,"listed_count":0},
> "username":"NathanPareda"}]},"matching_rules":[{"id":1378112825051246596,"tag":"coronavirus"}]}
>  
> I would like to do it in Python using Pyflink, but could also use Java if 
> there is no reasonable way to do it in Python. I've been looking at different 
> options for loading these objects into a stream, but am not sure what to do. 
> Here's my situation so far:
>  
> 1. There doesn't seem to be a fitting connector. The filesystem-connector 
> doesn't seem to support json format.
> 2. I've seen in the archive of this mailing list that some reccomend to use 
> the Table API. But I am not sure if this is a viable option given how nested 
> the json objects are.
> 3. I could of course try to implement a custom DataSource, but that seems to 
> be quite difficult so I'd only consider this if there's no other way.
> 
> I'll be very grateful for any kind of input.
> Cheers,
> Giacomo
>  



Re: In Flink SQL for kafka avro based table , is there support for FORWARD_TRANSITIVE schema change?

2021-04-14 Thread Agnelo Dcosta
Hi Arvid,
> writer schema encoded if you are using no schema registry?
on the producer side we are using node with
https://registry.npmjs.org/avsc/-/avsc-5.5.3.tgz
and https://registry.npmjs.org/sinek/-/sinek-9.1.0.tgz libraries to publish
messages. We specify the avro schema file to encode messages in avro format.

>Flink know with which schema the data has been written so that it can map
it to the new schema?
With 1.11 we used to specify the schema file as part of the flink sql table
definition.
However with 1.12 the schema is derived from the message/table definition.
We do not specify any schema as such.
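
In case it helps, a hedged sketch of the registry-based variant Arvid mentions (option names as documented for Flink 1.12's avro-confluent format; the broker, registry address, and topic are placeholders):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class RegistryBackedTable {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // With 'avro-confluent', each Kafka record carries the id of the schema it was
        // written with, so the registry supplies the writer schema and new optional
        // fields can be resolved against the table schema below.
        tEnv.executeSql(
                "CREATE TABLE topic1 (\n"
                + "  field1 STRING NOT NULL,\n"
                + "  field2 STRING NOT NULL\n"
                + ") WITH (\n"
                + "  'connector' = 'kafka',\n"
                + "  'topic' = 'topic1',\n"
                + "  'properties.bootstrap.servers' = 'localhost:8082',\n"
                + "  'scan.startup.mode' = 'latest-offset',\n"
                + "  'format' = 'avro-confluent',\n"
                + "  'avro-confluent.schema-registry.url' = 'http://localhost:8081'\n"
                + ")");
    }
}
```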


On Tue, Apr 13, 2021 at 11:58 PM Arvid Heise  wrote:

> Hi Agnelo,
>
> How is the writer schema encoded if you are using no schema registry? Or
> phrased differently: how does Flink know with which schema the data has
> been written so that it can map it to the new schema?
>
> On Wed, Apr 14, 2021 at 8:33 AM Agnelo Dcosta 
> wrote:
>
>> Hi, we are using Flink SQL 1.12 and have a couple of tables created from
>> kafka topics. Format is avro (not confluent avro) and no schema registry as
>> such.
>>
>> In flink 1.11 we used to specify the schema, however in 1.12 the schema
>> is derived from the message itself.
>>
>> Is it possible for the producers to start sending new fields without
>> changes in the flink app?
>>
>>
>>
>> For example :
>>
>> {
>>
>>   "name": "topic1",
>>
>>   "type": "record",
>>
>>   "fields": [
>>
>>   {
>>
>>   "name": "field1",
>>
>>   "type": "string"
>>
>> },
>>
>> {
>>
>>   "name": "field2",
>>
>>   "type": "string"
>>
>> },
>>
>> {
>>
>>   *"name": "field3",*
>>
>> *  "type": "string"*
>>
>> },
>>
>> ]
>>
>> }
>>
>>
>>
>> Flink table has:
>>
>> CREATE TABLE topic1(\n"
>>
>> + " field1 string not null \n"
>>
>> + " ,field2 string not null \n"
>>
>> "'connector' = 'kafka' \n"
>>
>>  + ",'topic' = 'topic1' \n"
>>
>>  + ",'scan.startup.mode' = 'latest-offset' \n"
>>
>>  + ",'properties.group.id' = 'topic1' \n"
>>
>>  + ",'properties.bootstrap.servers' = 'localhost:8082' \n"
>>
>>   + ",'properties.enable.auto.commit' = 'true' \n"
>>
>>  + ",'format' = 'avro' \n";
>>
>>
>>
>> With above settings I get a deserialization error:
>>
>>
>>
>> *java.io.IOException: Failed to deserialize Avro record.*
>>
>> *at
>> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:104)
>> ~[flink-sql-avro-1.12.0.jar:1.12.0]*
>>
>> *at
>> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:44)
>> ~[flink-sql-avro-1.12.0.jar:1.12.0]*
>>
>> *at
>> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
>> ~[flink-core-1.12.0.jar:1.12.0]*
>>
>> *at
>> org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
>> ~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]*
>>
>> *at
>> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:177)
>> ~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]*
>>
>> *at
>> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:137)
>> ~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]*
>>
>> *at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:761)
>> ~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]*
>>
>> *at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>> ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]*
>>
>> *at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>> ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]*
>>
>


Re: In Flink SQL for kafka avro based table , is there support for FORWARD_TRANSITIVE schema change?

2021-04-14 Thread Arvid Heise
Hi Agnelo,

How is the writer schema encoded if you are using no schema registry? Or
phrased differently: how does Flink know with which schema the data has
been written so that it can map it to the new schema?

On Wed, Apr 14, 2021 at 8:33 AM Agnelo Dcosta 
wrote:

> Hi, we are using Flink SQL 1.12 and have a couple of tables created from
> kafka topics. Format is avro (not confluent avro) and no schema registry as
> such.
>
> In flink 1.11 we used to specify the schema, however in 1.12 the schema is
> derived from the message itself.
>
> Is it possible for the producers to start sending new fields without
> changes in the flink app?
>
>
>
> For example :
>
> {
>
>   "name": "topic1",
>
>   "type": "record",
>
>   "fields": [
>
>   {
>
>   "name": "field1",
>
>   "type": "string"
>
> },
>
> {
>
>   "name": "field2",
>
>   "type": "string"
>
> },
>
> {
>
>   *"name": "field3",*
>
> *  "type": "string"*
>
> },
>
> ]
>
> }
>
>
>
> Flink table has:
>
> CREATE TABLE topic1(\n"
>
> + " field1 string not null \n"
>
> + " ,field2 string not null \n"
>
> "'connector' = 'kafka' \n"
>
>  + ",'topic' = 'topic1' \n"
>
>  + ",'scan.startup.mode' = 'latest-offset' \n"
>
>  + ",'properties.group.id' = 'topic1' \n"
>
>  + ",'properties.bootstrap.servers' = 'localhost:8082' \n"
>
>   + ",'properties.enable.auto.commit' = 'true' \n"
>
>  + ",'format' = 'avro' \n";
>
>
>
> With above settings I get a deserialization error:
>
>
>
> *java.io.IOException: Failed to deserialize Avro record.*
>
> *at
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:104)
> ~[flink-sql-avro-1.12.0.jar:1.12.0]*
>
> *at
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:44)
> ~[flink-sql-avro-1.12.0.jar:1.12.0]*
>
> *at
> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
> ~[flink-core-1.12.0.jar:1.12.0]*
>
> *at
> org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
> ~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]*
>
> *at
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:177)
> ~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]*
>
> *at
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:137)
> ~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]*
>
> *at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:761)
> ~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]*
>
> *at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]*
>
> *at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]*
>


Re: 关于状态恢复后计算加快的疑问

2021-04-14 Thread 克兰弗
I think I have found the cause: the Kafka producer's production rate is relatively low, so during normal processing Flink's throughput is limited by it. When recovering after a failure, the earlier data is re-consumed and there is plenty of data already available on the producer side, so Flink's processing rate increases.

On 2021/04/14 04:14:30, "刘阳buaa" <1844061...@qq.com> wrote: 
> Hi!
> I wrote a stateful version of WordCount with a Kafka source and simulated an operator failure via the InterruptException thrown in the attached source code (the exception is thrown after the operator has run for 20s). I found that recovery takes about 1s, while catching up on the data takes only about 200ms, and at some point the job slows down again to roughly the pre-failure processing speed. What mechanism explains this?


In Flink SQL for kafka avro based table , is there support for FORWARD_TRANSITIVE schema change?

2021-04-14 Thread Agnelo Dcosta
Hi, we are using Flink SQL 1.12 and have a couple of tables created from
kafka topics. Format is avro (not confluent avro) and no schema registry as
such.

In flink 1.11 we used to specify the schema, however in 1.12 the schema is
derived from the message itself.

Is it possible for the producers to start sending new fields without
changes in the flink app?



For example :

{

  "name": "topic1",

  "type": "record",

  "fields": [

  {

  "name": "field1",

  "type": "string"

},

{

  "name": "field2",

  "type": "string"

},

{

  *"name": "field3",*

*  "type": "string"*

},

]

}



Flink table has:

CREATE TABLE topic1(\n"

+ " field1 string not null \n"

+ " ,field2 string not null \n"

"'connector' = 'kafka' \n"

 + ",'topic' = 'topic1' \n"

 + ",'scan.startup.mode' = 'latest-offset' \n"

 + ",'properties.group.id' = 'topic1' \n"

 + ",'properties.bootstrap.servers' = 'localhost:8082' \n"

  + ",'properties.enable.auto.commit' = 'true' \n"

 + ",'format' = 'avro' \n";



With above settings I get a deserialization error:



*java.io.IOException: Failed to deserialize Avro record.*

*at
org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:104)
~[flink-sql-avro-1.12.0.jar:1.12.0]*

*at
org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:44)
~[flink-sql-avro-1.12.0.jar:1.12.0]*

*at
org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
~[flink-core-1.12.0.jar:1.12.0]*

*at
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]*

*at
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:177)
~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]*

*at
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:137)
~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]*

*at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:761)
~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]*

*at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]*

*at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]*