Re: BIGDECIMAL data handling

2022-04-07 Thread Francesco Guardiani
Is there any reason for not using DECIMAL provided by Flink SQL?
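
For example, something along these lines (a sketch against a hypothetical table; note that Flink's DECIMAL caps precision at 38, while BIGNUMERIC allows more, so values may need to be narrowed):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class BigNumericAsDecimal {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Expose the BIGNUMERIC column to Flink SQL as DECIMAL (precision/scale are illustrative)
        tableEnv.executeSql(
                "CREATE TABLE bq_source ("
                        + "  id STRING,"
                        + "  amount DECIMAL(38, 9)"
                        + ") WITH ("
                        + "  'connector' = '...'" // your BigQuery connector and its options go here
                        + ")");
    }
}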

On Tue, Apr 5, 2022 at 4:06 PM Anitha Thankappan <
anitha.thankap...@quantiphi.com> wrote:

> Hi Martijn,
>
> I am using flink version 1.11.0.
> The flink application code snippet is like:
>
> [image: image.png]
>
> The Error I am receiving while providing BIGDECIMAL as datatype is :
> [image: image.png]
>
> Can I use unregistered structured custom data types in DDLs like Create
> Table?
>
> Thanks and Regards,
> Anitha Thankappan
>
>
> On Tue, Apr 5, 2022 at 7:21 PM Martijn Visser 
> wrote:
>
>> Hi Anitha,
>>
>> Looking at BigQuery's documentation, they're aliasing it as BIGDECIMAL
>> [1]. According to Flink's documentation, you can create an unregistered
>> structured type as a user-defined data type [2]. You're mentioning that
>> you've failed to implement this, but what is the issue that you're running
>> into? BigDecimal is mentioned in Flink's documentation.
>>
>> Best regards,
>>
>> Martijn Visser
>> https://twitter.com/MartijnVisser82
>> https://github.com/MartijnVisser
>>
>> [1]
>> https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
>> [2]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/types/#user-defined-data-types
>>
>>
>> On Tue, 5 Apr 2022 at 15:41, Anitha Thankappan <
>> anitha.thankap...@quantiphi.com> wrote:
>>
>>> Hi,
>>>
>>> I have developed a Flink connector for GCP BigQuery.
>>> When reading BIGNUMERIC data from a BigQuery table, we
>>> didn't find any matching data type in Flink. We also failed to implement a
>>> user-defined data type for BIGNUMERIC.
>>>
>>> Please let me know if there is any way to handle this.
>>>
>>> Thanks in Advance,
>>> Anitha Thankappan
>>>
>>>
>>
>


Re: HOP_PROCTIME is returning null

2022-04-07 Thread Francesco Guardiani
Maybe the reason is that HOP_PROCTIME gets the name of the column?
Can you share the query and the plan?
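
For reference, this is roughly how HOP_PROCTIME is usually used in a group window (a sketch; assumes a hypothetical table `orders` with a declared processing-time attribute `proc_time`, e.g. `proc_time AS PROCTIME()`):

// tableEnv is an existing StreamTableEnvironment
Table result = tableEnv.sqlQuery(
        "SELECT user_id,"
                + "  COUNT(*) AS cnt,"
                + "  HOP_PROCTIME(proc_time, INTERVAL '30' SECOND, INTERVAL '30' SECOND) AS w_proctime"
                + " FROM orders"
                + " GROUP BY user_id, HOP(proc_time, INTERVAL '30' SECOND, INTERVAL '30' SECOND)");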

On Mon, Apr 4, 2022 at 3:41 PM Surendra Lalwani 
wrote:

> Hi Team,
>
> HOP_PROCTIME in Flink version 1.13.6 is returning null, while in previous
> versions it used to output a time attribute. Any idea why this behaviour is
> observed?
>
> If anybody has any alternative, it will be highly appreciable.
>
> Example: HOP_PROCTIME(PROCTIME() , INTERVAL '30' SECOND, INTERVAL '30'
> SECOND)
>
> Thanks and Regards ,
> Surendra Lalwani
>
>


Re: python table api

2022-04-07 Thread Francesco Guardiani
As Dian said, your INSERT INTO query is just selecting records from the
source table into the print table, forwarding them without any computation whatsoever.

Please check out [1] and [2] to learn how to develop queries that perform
aggregations over windows. Note that the second method (window TVF) is
preferred and recommended over the first.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-agg/#group-window-aggregation
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-tvf/

On Thu, Apr 7, 2022 at 3:09 AM Dian Fu  wrote:

> You have not configured the tumbling window at all. Please refer to [1]
> for more details.
>
> Regards,
> Dian
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-agg/#group-window-aggregation
>
> On Wed, Apr 6, 2022 at 10:46 PM ivan.ros...@agilent.com <
> ivan.ros...@agilent.com> wrote:
>
>> Hello,
>>
>>
>>
>> I’m trying to understand tumbling windows at the level of the Python
>> Table API. For this short example:
>>
>>
>>
>> Input csv:
>>
>> 2022-01-01 10:00:23.0, "data line 3"
>>
>> 2022-01-01 10:00:24.0, "data line 4"
>>
>> 2022-01-01 10:00:18.0, "data line 1"
>>
>> 2022-01-01 10:00:25.0, "data line 5"
>>
>> 2022-01-01 10:00:26.0, "data line 6"
>>
>> 2022-01-01 10:00:27.0, "data line 7"
>>
>> 2022-01-01 10:00:22.0, "data line 2"
>>
>> 2022-01-01 10:00:28.0, "data line 8"
>>
>> 2022-01-01 10:00:29.0, "data line 9"
>>
>> 2022-01-01 10:00:30.0, "data line 10"
>>
>> Print output:
>>
>> +I[2022-01-01T10:00:23,  "data line 3"]
>>
>> +I[2022-01-01T10:00:24,  "data line 4"]
>>
>> +I[2022-01-01T10:00:18,  "data line 1"]
>>
>> +I[2022-01-01T10:00:25,  "data line 5"]
>>
>> +I[2022-01-01T10:00:26,  "data line 6"]
>>
>> +I[2022-01-01T10:00:27,  "data line 7"]
>>
>> +I[2022-01-01T10:00:28,  "data line 8"]
>>
>> +I[2022-01-01T10:00:29,  "data line 9"]
>>
>> *+I[2022-01-01T10:00:22,  "data line 2"]*
>>
>> +I[2022-01-01T10:00:30,  "data line 10"]
>>
>>
>>
>> Below, I’m trying to process this data in 5 second windows.  So I would
>> at least expect not to see the bold line above, in print output.
>>
>>
>>
>> Am I not really configuring tumbling windows in the source table?
>>
>>
>>
>> from pyflink.table import EnvironmentSettings, TableEnvironment
>>
>> t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
>>
>> t_env.get_config().get_configuration().set_string("parallelism.default",
>> "1")
>>
>>
>>
>> t_env.execute_sql("""
>>
>> create table source (
>>
>> ts TIMESTAMP(3),
>>
>> data STRING,
>>
>> WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
>>
>> ) with (
>>
>> 'connector' = 'filesystem',
>>
>> 'format' = 'csv',
>>
>> 'path' = '{}'
>>
>> )
>>
>> """.format("source.csv"))
>>
>>
>>
>> t_env.execute_sql("""
>>
>> CREATE TABLE print (
>>
>> ts TIMESTAMP(3),
>>
>> data STRING
>>
>> ) WITH (
>>
>> 'connector' = 'print'
>>
>> )
>>
>> """)
>>
>>
>>
>> t_env.execute_sql("INSERT INTO print SELECT * FROM source").wait()
>>
>>
>>
>>
>>
>> Thank you,
>>
>>
>>
>> Ivan
>>
>


Re: Fwd: [DISCUSS] Flink's supported APIs and Hive query syntax

2022-03-10 Thread Francesco Guardiani
> We still need some work to make the Hive dialect purely rely on public
APIs, and the Hive connector should be decoupled from the table planner.

From the table perspective, I think this is the big pain point at the
moment. First of all, when we talk about the Hive syntax, we're really
talking about the Hive connector, as my understanding is that without the
Hive connector in the classpath you can't use the Hive syntax [1].

The Hive connector relies heavily on internals [2], and this is an
important struggle for the table project, as it sometimes impedes and slows
down development of new features and creates a huge maintenance burden for
table developers [3]. The planner itself has some classes specific to Hive
[4], making the codebase of the planner more complex than it already is.
Some of these are just legacy, others exist because some abstractions are
missing on the table planner side, but those just need some work.

So I agree with Jark: when the two Hive modules (connector-hive and
sql-parser-hive) reach a point where they don't depend at all on
flink-table-planner, like every other connector (except for testing of
course), we should be good to move them to a separate repo and continue
committing to them. But right now I don't think it's a good idea to add new
features on top, as it will only create more maintenance burden both for
Hive developers and for table developers.

My concern with this plan is: how realistic is it to fix all the planner
internal leaks in the existing Hive connector/parser? To me this seems like
a huge task, including a non-trivial amount of work to stabilize and design
new entry points in the Table API.

[1] HiveParser
<https://github.com/apache/flink/blob/a5847e3871ffb9515af9c754bd10c42611976c82/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java>
[2] HiveParserCalcitePlanner
<https://github.com/apache/flink/blob/6628237f72d818baec094a2426c236480ee33380/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java>
[3] Just talking about code coupling, not even mentioning problems like
dependencies and security updates
[4] HiveAggSqlFunction
<https://github.com/apache/flink/blob/ab70dcfa19827febd2c3cdc5cb81e942caa5b2f0/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/utils/HiveAggSqlFunction.java>

On Thu, Mar 10, 2022 at 9:05 AM Martijn Visser 
wrote:

> Thank you Yuxia for volunteering, that's really much appreciated. It would
> be great if you can create an umbrella ticket for that.
>
> It would be great to get some insights from currently Flink and Hive users
> which versions are being used.
> @Jark I would indeed deprecate the old Hive versions in Flink 1.15 and
> then drop them in Flink 1.16. That would also remove some tech debt and
> make it less work with regards to externalizing connectors.
>
> Best regards,
>
> Martijn
>
> On Thu, 10 Mar 2022 at 07:39, Jark Wu  wrote:
>
>> Thanks Martijn for the reply and summary.
>>
>> I totally agree with your plan and thank Yuxia for volunteering the Hive
>> tech debt issue.
>> I think we can create an umbrella issue for this and target version 1.16.
>> We can discuss
>> details and create subtasks there.
>>
>> Regarding dropping old Hive versions, I'm also fine with that. But I
>> would like to investigate
>> some Hive users first to see whether it's acceptable at this point. My
>> first thought was we
>> can deprecate the old Hive versions in 1.15, and we can discuss dropping
>> it in 1.16 or 1.17.
>>
>> Best,
>> Jark
>>
>>
>> On Thu, 10 Mar 2022 at 14:19, 罗宇侠(莫辞) 
>> wrote:
>>
>>> Thanks Martijn for your insights.
>>>
>>> About the tech debt/maintenance with regards to Hive query syntax, I
>>> would like to chip-in and expect it can be resolved for Flink 1.16.
>>>
>>> Best regards,
>>>
>>> Yuxia
>>>
>>> ------ Original Message ------
>>> *From:* Martijn Visser
>>> *Sent:* Thu Mar 10 04:03:34 2022
>>> *To:* User
>>> *Subject:* Fwd: [DISCUSS] Flink's supported APIs and Hive query syntax
>>>
>>>> (Forwarding this also to the User mailing list as I made a typo when
>>>> replying to this email thread)
>>>>
>>>> -- Forwarded message -
>>>> From: Martijn Visser 
>>>> Date: Wed, 9 Mar 2022 at 20:57
>>>> Subject: Re: [DISCUSS] Flink's supported APIs and Hive query syntax
>>>> To: dev , Francesco Guardiani <
>>>> france...@ververica.com>, Timo Walther , <
>>>> us...@flink.apache.org

Re: Help with pom dependencies for Flink with Table API

2022-03-03 Thread Francesco Guardiani
Hi,
The move of
org.apache.flink.connector.file.table.factories.BulkReaderFormatFactory was
done in master a couple of months ago by me, and it should only be in 1.15+.

Could it be that you're somehow mixing master snapshots with 1.14.x? Are you
trying to run the job on a cluster using a Flink distribution downloaded
from the website whose version is a snapshot and not 1.14?
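
One quick way to check which Flink version is actually on the classpath at runtime is something like this (a sketch; EnvironmentInformation ships with flink-runtime, which flink-clients pulls in):

import org.apache.flink.runtime.util.EnvironmentInformation;

public class PrintFlinkVersion {
    public static void main(String[] args) {
        // Prints the version of the flink-runtime jar that was actually loaded
        System.out.println("Flink version on classpath: " + EnvironmentInformation.getVersion());
    }
}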

FG

On Wed, Mar 2, 2022 at 6:44 PM Adesh Dsilva  wrote:

> Hello,
>
> I think I accidentally posted this question on the wrong email list (dev)
> so I am posting it again here.
>
>
> I am struggling to run my test Flink project with Table API
>
> I am trying to run a simple piece of code:
>
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setRuntimeMode(RuntimeExecutionMode.BATCH);
>
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>
> Path in = Path.fromLocalFile(new File("part-v001-o000-r-00330.avro"));
> AvroInputFormat<BidSample> users = new AvroInputFormat<>(in, BidSample.class);
> DataStream<BidSample> bidsDS = env.createInput(users);
>
> Table bidsTable = tableEnv.fromDataStream(bidsDS);
> bidsTable.printSchema();
>
>
>
> And here is my pom dependencies
>
> 
>   <dependency>
>      <groupId>org.apache.flink</groupId>
>      <artifactId>flink-streaming-java_2.12</artifactId>
>      <version>1.14.3</version>
>   </dependency>
>   <dependency>
>      <groupId>org.apache.flink</groupId>
>      <artifactId>flink-clients_2.12</artifactId>
>      <version>1.14.3</version>
>   </dependency>
>   <dependency>
>      <groupId>org.apache.flink</groupId>
>      <artifactId>flink-table-api-java-bridge_2.12</artifactId>
>      <version>1.14.3</version>
>   </dependency>
>   <dependency>
>      <groupId>org.apache.flink</groupId>
>      <artifactId>flink-table-planner_2.12</artifactId>
>      <version>1.14.3</version>
>   </dependency>
>   <dependency>
>      <groupId>org.apache.flink</groupId>
>      <artifactId>flink-table-common</artifactId>
>      <version>1.14.3</version>
>   </dependency>
>   <dependency>
>      <groupId>org.apache.flink</groupId>
>      <artifactId>flink-table-api-java</artifactId>
>      <version>1.14.3</version>
>   </dependency>
>
>
>
>
> But I keep getting below error:
> Caused by: java.lang.ClassNotFoundException:
> org.apache.flink.connector.file.table.factories.BulkReaderFormatFactory
> at
> java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:766)
> at
> java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
>
>
>
> Any help to know why this is happening and fix it would be much
> appreciated.
>
> Many thanks,
> Adesh DSilva
>
>


Re: Apache Flink - Continuously monitoring directory using filesystem connector - 1.14.3

2022-02-17 Thread Francesco Guardiani
Hi,
Filesystem source directory watching is going to be available from 1.15:
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/#directory-watching
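
With 1.15, the table definition from your snippet below would then look roughly like this (a sketch; the `source.monitor-interval` option comes from the linked page and is not available in 1.14.3):

tEnv.executeSql(
        "CREATE TEMPORARY TABLE events ("
                + "  `event_id` STRING"
                + ") WITH ("
                + "  'connector' = 'filesystem',"
                + "  'path' = './src/main/resources/events/',"
                + "  'format' = 'json',"
                + "  'source.monitor-interval' = '10s'" // keep scanning the directory for new files
                + ")");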

FG

On Fri, Feb 18, 2022 at 1:28 AM M Singh  wrote:

> Hi:
>
> I have a simple application and am using file system connector to monitor
> a directory and then print to the console (using datastream).  However, the
> application stops after reading the file in the directory (at the moment I
> have a single file in the directory).   I am using Apache Flink version
> 1.14.3.
>
> I believe there is a configuration option to be used in the 'with' clause
> but I could not find the right config - I tried 'streaming-source.enable'
> = 'true' but that results in an exception.
>
> I have also tried using EnvironmentSettings in streaming mode (as shown
> below) but still the application stops after reading the file in the
> directory.
>
> Here is the code segment:
>
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>
> public class TestApplication {
>
> public static void main(String [] args) throws Exception {
> StreamExecutionEnvironment see =
> StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings settings =
> EnvironmentSettings.newInstance().inStreamingMode().build();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(see,
> settings);
>
> tEnv.executeSql(
> "  CREATE TEMPORARY TABLE events (" +
> "  `event_id` STRING" +
> ")" +
> "WITH (" +
> "  'connector' = 'filesystem'," +
> "  'path' = './src/main/resources/events/'," +
> "  'format' = 'json'" +
> ")"
> );
>
> Table events = tEnv.sqlQuery(
> "SELECT * from events"
> );
> tEnv.toDataStream(events).print("events");
>
> see.execute();
> }
> }
>
> Here is the console output:
>
> events:7> +I[8b8fabde-45f5-4e94-b6af-7cd1396a11e9]
>
> Process finished with exit code 0
>
>
> Thanks
>


Re: SQL / Table Api lag() over partition by ... and windowing

2022-02-17 Thread Francesco Guardiani
Why do you need MapPartitionFunction?

On Wed, Feb 16, 2022 at 7:02 PM HG  wrote:

> Thanks
>
> Would the option for datastream be to write a MapPartitionFunction?
>
> Op wo 16 feb. 2022 om 16:35 schreef Francesco Guardiani <
> france...@ververica.com>:
>
>> > Which does not work since it cannot find lag function :-(
>>
>> lag and over are not supported at the moment with Table, so you need to
>> use SQL for that.
>>
>> > *Will this obey the watermark strategy of the original Datastream?
>> (see further below)*
>>
>> Yes. The code at the end of the mail is correct and should work fine. I
>> have just one comment, if you're using this DataStream only to create the
>> Table instance, you could also just define the watermark using the Schema
>> builder itself, as described here:
>> https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/Schema.Builder.html#watermark-java.lang.String-org.apache.flink.table.expressions.Expression-
>>
>> On Wed, Feb 16, 2022 at 2:35 PM HG  wrote:
>>
>>> Hello all
>>>
>>> I need to calculate the difference in time between ordered rows per
>>> transactionId. All events should arrive within the timeframe set by the
>>> out-of-orderness ( a couple of minutes). Events outside this time should be
>>> ignored.
>>>
>>> In SQL this would be :
>>> select transactionId  , handlingTime , *handlingTime -
>>> lag(handlingTime,1) over (partition by transactionId order by handlingTime)
>>> as elapsedTime*
>>> from table
>>>
>>> When I code :
>>> Table result = tableEnv.sqlQuery("select transactionId, handlingTime, 
>>> *handlingTime
>>> - if(null(lag(handlingTime) over (partition by transactionId order by
>>> handlingTime),handlingTime) as elapsedTime* from tupled3DsTableVw")
>>>
>>> *Will this obey the watermark strategy of the original Datastream? (see
>>> further below)*
>>> I have also tried to use the Table Api with a session window like :
>>> Table t = tupled3DsTable
>>>.window(Session.withGap(lit(2).minutes()).on($("handlingTime")).as(
>>> "w")).groupBy($("transactionId"), $("w"))
>>>.select($("handlingTime"), $("transactionId"), $("originalEvent"), $(
>>> "handlingTime").max().over($("w")));
>>> This gives:
>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>> method caused an error: Could not resolve over call.
>>> at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
>>>
>>> and also :
>>> Table t = tupled3DsTable
>>> .window(Over.partitionby($("transactionId")).orderBy($(
>>> "handlingTime")).as("w")).select($("handlingTime"), $("transactionId"),
>>> $("originalEvent"), $("handlingTime").lag().as("previousHandlingTime"));
>>> Which does not work since it cannot find lag function :-(
>>>
>>> In java I have the following setup:
>>> WatermarkStrategy> wmstrategy =
>>> WatermarkStrategy
>>> .>> String>>forBoundedOutOfOrderness(Duration.ofSeconds(Integer.parseInt(outOfOrderness)))
>>> .withIdleness(Duration.ofSeconds(Integer.parseInt(idleness)))
>>> .withTimestampAssigner(new 
>>> SerializableTimestampAssigner>> String, String>>() {
>>> @Override
>>> public long extractTimestamp(Tuple3
>>> element, long handlingTime) {
>>> return element.f0;
>>>  }});
>>>
>>> DataStream> tuple3dswm = 
>>> tuple3ds.assignTimestampsAndWatermarks(wmstrategy);
>>>
>>> Table tupled3DsTable = tableEnv.fromDataStream(tuple3ds, 
>>> Schema.newBuilder().column("f0","TIMESTAMP_LTZ(3)").column("f1","STRING").column("f2","STRING").watermark("f0",
>>>  "SOURCE_WATERMARK()")
>>> .build()).as("handlingTime", "transactionId", "originalEvent");
>>>
>>>
>>>
>>>
>>>
>>>


Re: Apache Flink - User Defined Functions - Exception when passing all arguments

2022-02-17 Thread Francesco Guardiani
Hi,

The SQL syntax is not supported, as the SQL standard itself does not allow
it. It sounds strange that it fails at the validation phase rather than during
parsing, but it shouldn't work anyway.

I suggest just using the Table API for that, as it's richer. You can even
use withColumns(range(...)), which gives you more control.
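
For example (a sketch; assumes test_table has at least three columns, adjust the range to your schema):

import static org.apache.flink.table.api.Expressions.*;

Table concat = tEnv.from("test_table")
        .select(call(ConcatenateFunction.class, withColumns(range(1, 3))));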

Hope it helps,
FG

On Thu, Feb 17, 2022 at 1:34 AM M Singh  wrote:

> Hi:
>
> I have a simple concatenate UDF (for testing purpose) defined as:
>
> public static class ConcatenateFunction extends ScalarFunction {
> public String eval(@DataTypeHint(inputGroup = InputGroup.ANY)
> Object ... inputs) {
> return Arrays.stream(inputs).map(i -> i.toString()).collect(
> Collectors.joining(","));
> }
> }
>
>
> and register it with the streaming table env:
>
> tEnv.createTemporarySystemFunction("concatenateFunction",
> ConcatenateFunction.class);
>
> However when I call the function as shown below - I get an exception
> indicating that the '*' is  an unknown identifier as shown below.
>
> Table concat = tEnv.sqlQuery(
> "SELECT  concatenateFunction(*) " +
> "FROM test_table"
> );
>
>
> I am printing the rows at the end of the test application.
>
> The exception is:
>
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> SQL validation failed. At line 1, column 29: Unknown identifier '*'
>
>
> The document (User-defined Functions
> )
> shows how to call the function with all args using scala/java :
>
> env.from("MyTable").select(call(MyConcatFunction.class, $("*")));
>
>
> But I could not find how to call the UDF using SQL syntax as shown above
> (select concatenateFunction(*) from test_table).
>
> Can you please let me know if there a way to pass all arguments to a UDF
> in SQL  ?
>
> Thanks
>


Re: Exception Help

2022-02-16 Thread Francesco Guardiani
Are you sure you're always matching the output row type provided by
DynamicTableFactory
<https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/factories/DynamicTableFactory.Context.html#getPhysicalRowDataType-->
?

Also looking at the javadocs it seems like you can use both internal and
external types, depending on your preference:

* AsyncTableFunction
<https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/functions/AsyncTableFunction.html>
* AsyncTableFunctionProvider

<https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/connector/source/AsyncTableFunctionProvider.html>
Not sure how I can help more without looking at the full code; perhaps you
can provide a fully working reproducer?

FG

On Wed, Feb 16, 2022 at 4:15 PM Jonathan Weaver 
wrote:

> No, I'm creating a custom SQL lookup table (which uses
> AsyncTableFunction) which requires the internal types.
>
> I implement
> the LookupTableSource, AsyncTableFunction, DynamicTableSourceFactory
> trio as per the examples in the docs.
>
> My construction is the equivalent of this, and it still errors with that
> exception when using exactly this.
>
>   Map<StringData, ArrayData> foo = new HashMap<StringData, ArrayData>();
>   foo.put(
>   StringData.fromString("foo"),
>   new GenericArrayData(new Object[]
> {StringData.fromString("bar")}));
>   MapData mapColumn = new GenericMapData(foo);
>
>   return (RowData) new GenericRowData(new Object[] { mapColumn });
>
>
>
>
> On Wed, Feb 16, 2022 at 8:02 AM Francesco Guardiani <
> france...@ververica.com> wrote:
>
>> Hi,
>>
>> From what I understand, you're creating a scalar function taking a string
>> with json and then converting it to a map using a custom function.
>>
>> Assuming I understood correctly, I think the problem here is that you're
>> using internal data types for UDFs, which is discouraged in most of the use
>> cases. Rather than using StringData, MapData, ArrayData etc you should just
>> use Java's String, Map and arrays. Check out this particular paragraph of
>> our docs that shows using complex types for scalar functions:
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#type-inference.
>> Please try to convert
>> Looking only at the exception you provide here, it definitely seems like
>> a wrong usage of the internal data types, like that Tuple2 inserted into a
>> GenericMapData. There are no checks in GenericMapData to check that you're
>> constructing it with the correct types, and since Tuple2 is not a correct
>> type, the serializer just fails hard.
>>
>> Please correct me if I misunderstood what you're doing, and in case
>> provide more info about what your goal and how you've implemented the job.
>>
>> FG
>>
>> On Wed, Feb 16, 2022 at 4:02 AM Jonathan Weaver 
>> wrote:
>>
>>> I've narrowed it down to a TableSource that is returning a MAP type as a
>>> column. Only errors when the column is referenced, and not on the first
>>> row, but somewhere in the stream of rows.
>>>
>>> On 1.15 master branch (I need the new JSON features in 1.15 for this
>>> project so riding the daily snapshot during development)
>>>
>>> In catalog column is defined as
>>> .column("vc", DataTypes.MAP(DataTypes.STRING(),
>>> DataTypes.ARRAY(DataTypes.STRING(
>>>
>>> My TableFunction is returning the following for the column
>>>
>>>   return new GenericMapData(
>>>   fields.toJavaMap(
>>>   v ->
>>>   new Tuple2(
>>>   StringData.fromString(v.getKey()),
>>>   new GenericArrayData(
>>>   v.getValue().isArray()
>>>   ? List.ofAll(() ->
>>> ((ArrayNode) v.getValue()).elements())
>>>   .map(vv ->
>>> StringData.fromString(vv.asText()))
>>>
>>> .toJavaArray(StringData[]::new)
>>>   :
>>> List.of(StringData.fromString(v.getValue().asText()))
>>>
>>> .toJavaArray(StringData[]::new);
>>> });
&

Re: SQL / Table Api lag() over partition by ... and windowing

2022-02-16 Thread Francesco Guardiani
> Which does not work since it cannot find lag function :-(

lag and over are not supported at the moment with Table, so you need to use
SQL for that.
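
Putting it together, a sketch of the SQL version (names taken from your mail; the elapsed value is NULL for the first event of each transactionId unless you wrap the LAG call in IFNULL/COALESCE, and TIMESTAMPDIFF may need a cast depending on the exact timestamp type):

Table result = tableEnv.sqlQuery(
        "SELECT transactionId, handlingTime,"
                + "  TIMESTAMPDIFF(SECOND,"
                + "    LAG(handlingTime) OVER (PARTITION BY transactionId ORDER BY handlingTime),"
                + "    handlingTime) AS elapsedSeconds"
                + " FROM tupled3DsTableVw");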

> *Will this obey the watermark strategy of the original Datastream? (see
further below)*

Yes. The code at the end of the mail is correct and should work fine. I
have just one comment: if you're using this DataStream only to create the
Table instance, you could also just define the watermark using the Schema
builder itself, as described here:
https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/Schema.Builder.html#watermark-java.lang.String-org.apache.flink.table.expressions.Expression-

On Wed, Feb 16, 2022 at 2:35 PM HG  wrote:

> Hello all
>
> I need to calculate the difference in time between ordered rows per
> transactionId. All events should arrive within the timeframe set by the
> out-of-orderness ( a couple of minutes). Events outside this time should be
> ignored.
>
> In SQL this would be :
> select transactionId  , handlingTime , *handlingTime -
> lag(handlingTime,1) over (partition by transactionId order by handlingTime)
> as elapsedTime*
> from table
>
> When I code :
> Table result = tableEnv.sqlQuery("select transactionId, handlingTime, 
> *handlingTime
> - if(null(lag(handlingTime) over (partition by transactionId order by
> handlingTime),handlingTime) as elapsedTime* from tupled3DsTableVw")
>
> *Will this obey the watermark strategy of the original Datastream? (see
> further below)*
> I have also tried to use the Table Api with a session window like :
> Table t = tupled3DsTable
>.window(Session.withGap(lit(2).minutes()).on($("handlingTime")).as("w"
> )).groupBy($("transactionId"), $("w"))
>.select($("handlingTime"), $("transactionId"), $("originalEvent"), $(
> "handlingTime").max().over($("w")));
> This gives:
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: Could not resolve over call.
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
>
> and also :
> Table t = tupled3DsTable
> .window(Over.partitionby($("transactionId")).orderBy($(
> "handlingTime")).as("w")).select($("handlingTime"), $("transactionId"), $(
> "originalEvent"), $("handlingTime").lag().as("previousHandlingTime"));
> Which does not work since it cannot find lag function :-(
>
> In java I have the following setup:
> WatermarkStrategy<Tuple3<Long, String, String>> wmstrategy =
> WatermarkStrategy
> .<Tuple3<Long, String, String>>forBoundedOutOfOrderness(Duration.ofSeconds(Integer.parseInt(outOfOrderness)))
> .withIdleness(Duration.ofSeconds(Integer.parseInt(idleness)))
> .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<Long, String, String>>() {
> @Override
> public long extractTimestamp(Tuple3<Long, String, String>
> element, long handlingTime) {
> return element.f0;
>  }});
>
> DataStream<Tuple3<Long, String, String>> tuple3dswm =
> tuple3ds.assignTimestampsAndWatermarks(wmstrategy);
>
> Table tupled3DsTable = tableEnv.fromDataStream(tuple3ds, 
> Schema.newBuilder().column("f0","TIMESTAMP_LTZ(3)").column("f1","STRING").column("f2","STRING").watermark("f0",
>  "SOURCE_WATERMARK()")
> .build()).as("handlingTime", "transactionId", "originalEvent");
>
>
>
>
>
>


Re: Joining Flink tables with different watermark delay

2022-02-16 Thread Francesco Guardiani
>https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/formats/parquet/>
>requires specifying the column names and data types. However, in our case
>we use the Protobuf schema to read the schema for a parquet file. Also,
>some values in the parquet file need some custom type conversion ( int64 ->
>timestamp, for example).
>
>
> I had a question with regards to this point you mentioned :
> *> In other words, it won't drop the content of kafkaTable immediately,
> but after both streams are at "the same point in time" (defined by the
> watermarks of both streams).*
> Does it mean that the output of the join will be flushed to the sink at
> the period defined by the minimum watermark ? That is, 60 minutes in the
> above case ?
> Also, I read here
> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/joins/#:~:text=Since%20time%20attributes%20are%20quasi%2Dmonotonic%20increasing%2C%20Flink%20can%20remove%20old%20values%20from%20its%20state%20without%20affecting%20the%20correctness%20of%20the%20result.>
>  that
> Flink will remove old data from its state in case of interval joins. Does
> this mean that data present in both the tables will be removed after the
> minimum watermark delay ( 60 minutes in this case) ?
>
> Regards,
> Meghajit
>
> On Mon, Feb 14, 2022 at 8:13 PM Francesco Guardiani <
> france...@ververica.com> wrote:
>
>> Hi,
>>
>> So my understanding of your query is that you want to do a join first,
>> and then group by a 60 minutes distance and aggregate them. Please correct
>> me if I'm wrong.
>>
>> First of all, the query you've posted is incorrect and should fail, as
>> its plan is invalid because it's using a regular join. Regular joins cannot
>> be concatenated with other "time operations" like a group by window, as
>> they don't produce any watermark.
>>
>> My suggestion for your query is to use an interval join
>> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/joins/#interval-joins>
>> first, and then a group window. For example:
>>
>> SELECT TUMBLE_START(file_time, INTERVAL '60' MINUTE) AS event_time,
>> MAX(TIMESTAMPDIFF(MINUTE, file_time, kafka_time))
>> FROM (
>>   SELECT fileTable.id AS id, fileTable.event_time AS file_time,
>> kafkaTable.event_time AS kafka_time
>>   FROM fileTable, kafkaTable
>>   WHERE fileTable.id = kafkaTable.id AND fileTable.event_time BETWEEN
>> kafkaTable.event_time - INTERVAL '1' HOUR AND kafkaTable.event_time
>> )
>> GROUP BY id, TUMBLE(file_time, INTERVAL '60' MINUTE)
>>
>> This produces the correct result, as the interval join will produce the
>> cartesian product of the events at a maximum distance of 1 hour between
>> them, and at runtime they'll emit the minimum watermark between the two
>> inputs. In other words, it won't drop the content of kafkaTable
>> immediately, but after both streams are at "the same point in time"
>> (defined by the watermarks of both streams).
>> After the cartesian product is emitted from the interval join, the group
>> by will be executed.
>>
>> I also have some tips:
>>
>> * As described here
>> <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-agg/#group-window-aggregation>,
>> we have deprecated the syntax `GROUP BY WINDOW`, you should use windowing
>> TVF instead
>> <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-agg/#windowing-tvfs>
>> * You can directly use Window joins
>> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-join/>
>> as well for your query, as they're meant exactly to cover your use case
>> * Any particular reason you're creating the input tables from DataStream
>> instead than creating them directly from Table API using either CREATE
>> TABLE
>> <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#create-table>
>> or TableDescriptor?
>>
>> Hope it helps,
>> FG
>>
>>
>> On Mon, Feb 14, 2022 at 8:39 AM Meghajit Mazumdar <
>> meghajit.mazum...@gojek.com> w

Re: Exception Help

2022-02-16 Thread Francesco Guardiani
Hi,

From what I understand, you're creating a custom scalar function that takes a
string containing JSON and converts it to a map.

Assuming I understood correctly, I think the problem here is that you're
using internal data types for UDFs, which is discouraged in most use
cases. Rather than using StringData, MapData, ArrayData, etc., you should just
use Java's String, Map and arrays. Check out this particular paragraph of
our docs that shows using complex types for scalar functions:
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#type-inference.
Please try to convert your function to use these external types.
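
For illustration, a minimal sketch of a scalar function built only with external types (hypothetical function and input format, just to show plain String/Map/array usage):

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;

public class ParseAttributesFunction extends ScalarFunction {
    // Parses "key=a|b;key2=c" style input into a MAP<STRING, ARRAY<STRING>>
    public @DataTypeHint("MAP<STRING, ARRAY<STRING>>") Map<String, String[]> eval(String input) {
        Map<String, String[]> result = new HashMap<>();
        if (input == null) {
            return result;
        }
        for (String entry : input.split(";")) {
            String[] kv = entry.split("=", 2);
            if (kv.length == 2) {
                result.put(kv[0], kv[1].split("\\|"));
            }
        }
        return result;
    }
}
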
Looking only at the exception you provided here, it definitely seems like a
wrong usage of the internal data types, like that Tuple2 inserted into a
GenericMapData. GenericMapData performs no checks that you're
constructing it with the correct types, and since Tuple2 is not a valid
type there, the serializer just fails hard.

Please correct me if I misunderstood what you're doing, and in that case provide
more info about your goal and how you've implemented the job.

FG

On Wed, Feb 16, 2022 at 4:02 AM Jonathan Weaver 
wrote:

> I've narrowed it down to a TableSource that is returning a MAP type as a
> column. Only errors when the column is referenced, and not on the first
> row, but somewhere in the stream of rows.
>
> On 1.15 master branch (I need the new JSON features in 1.15 for this
> project so riding the daily snapshot during development)
>
> In catalog column is defined as
> .column("vc", DataTypes.MAP(DataTypes.STRING(),
> DataTypes.ARRAY(DataTypes.STRING(
>
> My TableFunction is returning the following for the column
>
>   return new GenericMapData(
>   fields.toJavaMap(
>   v ->
>   new Tuple2(
>   StringData.fromString(v.getKey()),
>   new GenericArrayData(
>   v.getValue().isArray()
>   ? List.ofAll(() -> ((ArrayNode)
> v.getValue()).elements())
>   .map(vv ->
> StringData.fromString(vv.asText()))
>
> .toJavaArray(StringData[]::new)
>   :
> List.of(StringData.fromString(v.getValue().asText()))
>
> .toJavaArray(StringData[]::new);
> });
>
> Where it's basically looping over a jackson JsonNode parsed from a DB
> table and returning as a MAP (the keys and values are sparse amongst
> hundreds of possibilities). The values in the Json are either a single text
> value, or an array of text values so I'm just turning all values into an
> array.
>
> There are around ~190 key-values in the map on average.
>
> The SQL that references the column is just
>
> COALESCE(ELEMENT(vc [ 'ARCHIVE_TASKING' ]), product_type) type,
>
> So looks up a specific key and uses it if it exists, otherwise coalesces
> to a generic string.
>
> And I keep getting this exception during the processing on a random row.
>
> Caused by: java.lang.IndexOutOfBoundsException: offset=0, targetOffset=24,
> numBytes=8, address=16, targetAddress=16
> at
> org.apache.flink.core.memory.MemorySegment.copyTo(MemorySegment.java:1441)
> at
> org.apache.flink.table.data.writer.AbstractBinaryWriter.writeSegmentsToVarLenPart(AbstractBinaryWriter.java:249)
> at
> org.apache.flink.table.data.writer.AbstractBinaryWriter.writeString(AbstractBinaryWriter.java:93)
> at
> org.apache.flink.table.data.writer.BinaryArrayWriter.writeString(BinaryArrayWriter.java:30)
> at
> org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:140)
> at
> org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.toBinaryArray(ArrayDataSerializer.java:210)
> at
> org.apache.flink.table.data.writer.AbstractBinaryWriter.writeArray(AbstractBinaryWriter.java:109)
> at
> org.apache.flink.table.data.writer.BinaryArrayWriter.writeArray(BinaryArrayWriter.java:30)
> at
> org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:147)
> at
> org.apache.flink.table.runtime.typeutils.MapDataSerializer.toBinaryMap(MapDataSerializer.java:175)
> at
> org.apache.flink.table.runtime.typeutils.MapDataSerializer.copy(MapDataSerializer.java:109)
> at TableCalcMapFunction$130.flatMap_split26(Unknown Source)
> at TableCalcMapFunction$130.flatMap(Unknown Source)
> at
> org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner$TemporalTableCalcResultFuture.complete(AsyncLookupJoinWithCalcRunner.java:119)
>
> Is that enough context or is there something else I can give you all?
>
> Thanks!
>
>
>
>
> On Tue, Feb 15, 2022 at 1:24 PM Sid Kal  wrote:
>
>> Hi Jonathan,
>>
>> It would be better if you describe your scenario along with the code. It
>> would be easier for the community to help.
>>
>> On Tue, 15 Feb 2022, 23:33 Jonathan Weaver, 
>> wrote:
>>
>>> I'm getting the following exception 

Re: table api watermarks, timestamps, outoforderness and head aches

2022-02-14 Thread Francesco Guardiani
Yep, every operator usually cleans up the state of records past a received watermark.

On Mon, Feb 14, 2022 at 4:03 PM HG  wrote:

> Will keys that are out dated disappear?
>
> It is infact a kind of sessions window that can start at any time.
> Constantly new keys will appear.
>
>
>
>
>
>
> On Mon, Feb 14, 2022, 15:57 Francesco Guardiani 
> wrote:
>
>> Hi,
>>
>> - bounded out of orderness: This means that you have essentially a stream
>> where events can come late of a certain amount of time, compared to the
>> "newest" event received. For example, with a bounded out of orderness of 5
>> minutes, you essentially say to Flink that your stream can receive an event
>> of time 1PM today, and then immediately after that you can still receive
>> another one of time 1PM - 5 minutes, and Flink should consider it. But if
>> you rather receive one with time 1PM - 6mins, then Flink will consider this
>> one as "late" and drop it. This is essentially the way Flink is able to not
>> retain indefinitely your events.
>> - with idleness: Because the stream generator needs new records to come
>> in before advancing the stream, if your stream is stale, then no watermark
>> is produced, that means that records after that watermark will not be
>> processed.
>>
>> Reading your requirement, my understanding is that your input stream,
>> that is InputTable, requires a bounded out of orderness of 5 minutes.
>> For idleness, it really depends on whether your load can become stale at
>> some point in time or not: if your stream can be stale for long period of
>> times (say for a couple of days nothing is produced), then you should set
>> an idleness which after that, a watermark is produced.
>>
>> Idleness is
>>
>> On Fri, Feb 11, 2022 at 2:53 PM HG  wrote:
>>
>>> Hi,
>>>
>>> I am getting a headache when thinking about watermarks and timestamps.
>>> My application reads events from Kafka  (they are in json format) as a
>>> Datastream
>>> Events can be keyed by a transactionId and have a event timestamp
>>> (handlingTime)
>>>
>>> All events belonging to a single transactionId will arrive in a window
>>> of a couple of minutes (say max 5 minutes).
>>> As soon as this 5 minutes has passed it should calculate the differences
>>> in timestamp between the ordered events, add that elapsed time to every
>>> event and emit them to the Sink.
>>>
>>> I basically want to use the table api to do
>>> "SELECT transactionId, handlingTime, handlingTime - lag(handlingTime)
>>> over (partition by transactionId order by handlingTime) as elapsedTime,
>>> originalEvent FROM InputTable"
>>>
>>> After the result of this query has been pushed to the Sink all data with
>>> respect to this transactionId can be discarded.
>>>
>>> What kind of watermark do I need to use?
>>> - bounded out of orderness?
>>> - with idleness?
>>> - ...
>>>
>>> Late events can be ignored. They will rarely happen.
>>>
>>> Regards Hans-Peter
>>>
>>>
>>>
>>>


Re: select($("transactionId)) : cannot find symbol symbol: method $(java.lang.String)

2022-02-14 Thread Francesco Guardiani
Yep, that should do it. Perhaps you'd be willing to contribute to that docs
page by adding the import? :)

On Mon, Feb 14, 2022 at 4:34 PM HG  wrote:

> The static was missing 
>
> import static org.apache.flink.table.api.Expressions.*;
>
>
>
>
> Op ma 14 feb. 2022 om 15:45 schreef Francesco Guardiani <
> france...@ververica.com>:
>
>> >   symbol:   method $(java.lang.String)
>> >  location: class esl.job.cag.verwerkingstijden.CagVerwerkingsTijden
>>
>> What is esl.job.cag.verwerkingstijden.CagVerwerkingsTijden? Sounds like
>> a bad import? Also, have you checked you have Flink deps aligned?
>>
>> On Mon, Feb 14, 2022 at 3:17 PM HG  wrote:
>>
>>>
>>> Hi,
>>>
>>> When I do :
>>>
>>> Table counts = t
>>> .groupBy($("transactionId"))
>>> .select($("transactionId"),
>>> $("handlingTime").sum().as("summedhandlingTime"));
>>>
>>> The code below fails with :
>>>
>>> cannot find symbol
>>> .select($("transactionId"),
>>> $("handlingTime").sum().as("summedhandlingTime"));
>>> ^
>>>   symbol:   method $(java.lang.String)
>>>   location: class esl.job.cag.verwerkingstijden.CagVerwerkingsTijden
>>>
>>>
>>> The same is true when I copy the code below from
>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/common/
>>>
>>> DataStream> stream1 = env.fromElements(new 
>>> Tuple2<>(1, "hello"));
>>> DataStream> stream2 = env.fromElements(new 
>>> Tuple2<>(1, "hello"));
>>>
>>> // explain Table API
>>> Table table1 = tableEnv.fromDataStream(stream1, $("count"), 
>>> $("word"));
>>> Table table2 = tableEnv.fromDataStream(stream2, $("count"), 
>>> $("word"));
>>> Table table = table1
>>> .where($("word").like("F%"))
>>> .unionAll(table2);
>>>
>>>
>>> Anyone a clue?
>>>
>>>
>>> Regards Hans
>>>
>>>


Re: table api watermarks, timestamps, outoforderness and head aches

2022-02-14 Thread Francesco Guardiani
Hi,

- bounded out of orderness: This means that you have essentially a stream
where events can arrive late by a certain amount of time, compared to the
"newest" event received. For example, with a bounded out of orderness of 5
minutes, you essentially say to Flink that your stream can receive an event
with time 1PM today, and then immediately after that you can still receive
another one with time 1PM - 5 minutes, and Flink should consider it. But if
you instead receive one with time 1PM - 6 minutes, then Flink will consider
this one as "late" and drop it. This is essentially the way Flink avoids
retaining your events indefinitely.
- with idleness: Because the watermark generator needs new records to come in
before advancing the watermark, if your stream is stale, then no watermark is
produced, which means that records waiting on that watermark will not be
processed.

Reading your requirement, my understanding is that your input stream, that
is InputTable, requires a bounded out of orderness of 5 minutes. For
idleness, it really depends on whether your load can become stale at some
point in time or not: if your stream can be stale for long periods of time
(say nothing is produced for a couple of days), then you should also set an
idleness timeout, after which the stream is marked as idle so watermarks can
still advance.
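
For example, a minimal sketch of such a strategy (hypothetical event type, getter and durations):

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

WatermarkStrategy<MyEvent> strategy =
        WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofMinutes(5))
                .withIdleness(Duration.ofMinutes(10))
                .withTimestampAssigner((event, ts) -> event.getHandlingTime()); // epoch millis, hypothetical getter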

Idleness is

On Fri, Feb 11, 2022 at 2:53 PM HG  wrote:

> Hi,
>
> I am getting a headache when thinking about watermarks and timestamps.
> My application reads events from Kafka  (they are in json format) as a
> Datastream
> Events can be keyed by a transactionId and have a event timestamp
> (handlingTime)
>
> All events belonging to a single transactionId will arrive in a window of
> a couple of minutes (say max 5 minutes).
> As soon as this 5 minutes has passed it should calculate the differences
> in timestamp between the ordered events, add that elapsed time to every
> event and emit them to the Sink.
>
> I basically want to use the table api to do
> "SELECT transactionId, handlingTime, handlingTime - lag(handlingTime) over
> (partition by transactionId order by handlingTime) as elapsedTime,
> originalEvent FROM InputTable"
>
> After the result of this query has been pushed to the Sink all data with
> respect to this transactionId can be discarded.
>
> What kind of watermark do I need to use?
> - bounded out of orderness?
> - with idleness?
> - ...
>
> Late events can be ignored. They will rarely happen.
>
> Regards Hans-Peter
>
>
>
>


Re: select($("transactionId)) : cannot find symbol symbol: method $(java.lang.String)

2022-02-14 Thread Francesco Guardiani
>   symbol:   method $(java.lang.String)
>  location: class esl.job.cag.verwerkingstijden.CagVerwerkingsTijden

What is esl.job.cag.verwerkingstijden.CagVerwerkingsTijden? It sounds like a
bad import. Also, have you checked that your Flink dependencies are aligned?
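
For completeness, the expression DSL ($(...), lit(...), call(...), ...) needs a static import, and a missing one is usually the cause of exactly that error:

import static org.apache.flink.table.api.Expressions.*;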

On Mon, Feb 14, 2022 at 3:17 PM HG  wrote:

>
> Hi,
>
> When I do :
>
> Table counts = t
> .groupBy($("transactionId"))
> .select($("transactionId"),
> $("handlingTime").sum().as("summedhandlingTime"));
>
> The code below fails with :
>
> cannot find symbol
> .select($("transactionId"),
> $("handlingTime").sum().as("summedhandlingTime"));
> ^
>   symbol:   method $(java.lang.String)
>   location: class esl.job.cag.verwerkingstijden.CagVerwerkingsTijden
>
>
> The same is true when I copy the code below from
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/common/
>
> DataStream<Tuple2<Integer, String>> stream1 = env.fromElements(new
> Tuple2<>(1, "hello"));
> DataStream<Tuple2<Integer, String>> stream2 = env.fromElements(new
> Tuple2<>(1, "hello"));
>
> // explain Table API
> Table table1 = tableEnv.fromDataStream(stream1, $("count"), 
> $("word"));
> Table table2 = tableEnv.fromDataStream(stream2, $("count"), 
> $("word"));
> Table table = table1
> .where($("word").like("F%"))
> .unionAll(table2);
>
>
> Anyone a clue?
>
>
> Regards Hans
>
>


Re: Joining Flink tables with different watermark delay

2022-02-14 Thread Francesco Guardiani
Hi,

So my understanding of your query is that you want to do a join first, and
then group the results into 60-minute windows and aggregate them. Please
correct me if I'm wrong.

First of all, the query you've posted is incorrect and should fail, as its
plan is invalid because it's using a regular join. Regular joins cannot be
concatenated with other "time operations" like a group by window, as they
don't produce any watermark.

My suggestion for your query is to use an interval join
<https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/joins/#interval-joins>
first, and then a group window. For example:

SELECT TUMBLE_START(file_time, INTERVAL '60' MINUTE) AS event_time,
MAX(TIMESTAMPDIFF(MINUTE, file_time, kafka_time))
FROM (
  SELECT fileTable.id AS id, fileTable.event_time AS file_time,
kafkaTable.event_time AS kafka_time
  FROM fileTable, kafkaTable
  WHERE fileTable.id = kafkaTable.id AND fileTable.event_time BETWEEN
kafkaTable.event_time - INTERVAL '1' HOUR AND kafkaTable.event_time
)
GROUP BY id, TUMBLE(file_time, INTERVAL '60' MINUTE)

This produces the correct result, as the interval join will produce the
cartesian product of the events at a maximum distance of 1 hour between
them, and at runtime they'll emit the minimum watermark between the two
inputs. In other words, it won't drop the content of kafkaTable
immediately, but after both streams are at "the same point in time"
(defined by the watermarks of both streams).
After the cartesian product is emitted from the interval join, the group by
will be executed.

I also have some tips:

* As described here
<https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-agg/#group-window-aggregation>,
we have deprecated the syntax `GROUP BY WINDOW`; you should use windowing
TVF instead
<https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-tvf/>
* You can directly use Window joins
<https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-join/>
as well for your query, as they're meant exactly to cover your use case
* Any particular reason you're creating the input tables from DataStream
instead of creating them directly from Table API using either CREATE TABLE
<https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#create-table>
or TableDescriptor?

Hope it helps,
FG


On Mon, Feb 14, 2022 at 8:39 AM Meghajit Mazumdar <
meghajit.mazum...@gojek.com> wrote:

> Hello,
>
> We are creating two data streams in our Flink application. Both of them
> are then formed into two Tables. The first data stream has a watermark
> delay of 24 hours while the second stream has a watermark delay of 60
> minutes. The watermark used is of BoundedOutOfOrderness strategy and uses a
> particular event_time field present within the the records themselves to
> assign watermarks.
>
> For example,
>
> DataStream fileStream = env.fromSource(
> fileSource,
> getWatermarkStrategy(8640), // custom function,
> watermark of 24 hours in ms
> "fileSource");
> Table firstTable = tableEnv.fromDataStream(fileStream, apiExpressions);
> tableEnv.createTemporaryView("fileTable", firstTable);
>
> DataStream kafkaStream = env.fromSource(
> kafkaSource,
> getWatermarkStrategy(360), // custom function, 
> watermark
> of 60 minutes in ms
> "kafkaSource");
> Table secondTable = tableEnv.fromDataStream(kafkaStream, apiExpressions);
> tableEnv.createTemporaryView("kafkaTable", secondTable);
>
> Now we want to write a continuous SQL query to join
> 
>  firstTable and secondTable with a TumbleWindow of 60 minutes
>
> "SELECT TUMBLE_START(fileTable.rowtime, INTERVAL '60' MINUTE) AS
> event_time,
> MAX(TIMESTAMPDIFF(MINUTE, fileTable.event_time, kafkaTable.event_time))," +
> "FROM fileTable, kafkaTable " +
> "where fileTable.id = kafkaTable.id " +
> "group by TUMBLE(fileTable.rowtime, INTERVAL '60'
> MINUTE)"
>
> What we want to know is, will a join or aggregation queries work correctly
> between the two tables.  Is it the case that the contents of  kafkaTable
> will be purged immediately after 60 minutes and hence a join/aggregation
> might not give correct results ?
> Will there be a data loss if tables with different watermark delays are
> joined ?
>
> --
> *Regards,*
> *Meghajit*
>


Re: Json deserialisation with .jsonValue vs format=json in Table API

2022-02-03 Thread Francesco Guardiani
Hi,

I think the more stable option would be the first one, as it also gives you
more flexibility. Reading the row as a string and then parsing it in a query
definitely costs more, and makes it less straightforward to use the other
schema features of the Table API, such as watermark definitions, primary keys, etc.

I guess you can implement it straightforwardly by subclassing the existing
JSON format provided by Flink, in particular
JsonRowDataDeserializationSchema.

A third solution would be to create a SplitFunction, like the one you
created, which directly performs the parsing, outputting rows rather than
strings. This removes the double-parsing issue, but still creates problems
when interacting with other schema features.
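
A rough sketch of the first option (a hypothetical wrapper that splits on '\n' and delegates each line to whatever JSON DeserializationSchema you already have; the DynamicTableFactory wiring needed to expose it as a format is omitted):

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Collector;

public class MultilineJsonDeserializationSchema implements DeserializationSchema<RowData> {

    // The per-line JSON deserializer, e.g. the one produced by the built-in 'json' format
    private final DeserializationSchema<RowData> inner;

    public MultilineJsonDeserializationSchema(DeserializationSchema<RowData> inner) {
        this.inner = inner;
    }

    @Override
    public void open(InitializationContext context) throws Exception {
        inner.open(context);
    }

    @Override
    public RowData deserialize(byte[] message) throws IOException {
        throw new UnsupportedOperationException("Use deserialize(byte[], Collector) instead");
    }

    @Override
    public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
        // Split the incoming message on '\n' and parse each line as a separate JSON record
        for (String line : new String(message, StandardCharsets.UTF_8).split("\n")) {
            RowData row = inner.deserialize(line.getBytes(StandardCharsets.UTF_8));
            if (row != null) {
                out.collect(row);
            }
        }
    }

    @Override
    public boolean isEndOfStream(RowData nextElement) {
        return false;
    }

    @Override
    public TypeInformation<RowData> getProducedType() {
        return inner.getProducedType();
    }
}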

Hope it helps,
FG

On Thu, Feb 3, 2022 at 3:56 PM Илья Соин  wrote:

> Hi,
>
> I’m using the Table / SQL API.
>
> I have a stream of strings, where each message contains several json
> strings separated by "\n”.
> For example:
> {“timestamp”: “2021-01-01T00:00:00”, person: {“name”: “Vasya”}}\n
> {“timestamp”: “2021-01-01T01:00:00”, person: {“name”: “Max” }}
>
> I would like to split each message by “\n”, parse each string as a json
> object and get some of the fields.
>
> AFIK there are 2 ways to do it:
>
> 1) Write custom deserialiser and provide it in source table DDL, i.e.
> CREATE TABLE source (
> timestamp STRING,
> person: ROW(name STRING)
> )
> WITH(‘format’ = ‘multiline-json’, …);
>
> 2) Use ‘format’ = ‘raw’ and extract the needed fields using .jsonValue,
> i.e.
>
> CREATE TABLE source (
> row STRING
> );
>
> env.from("source")
> .joinLateral(
> call(SplitFunction.class, $("row"), "\n").as(“msg")
> )
> .select(
>  $("msg").jsonValue("$.timestamp", DataTypes.STRING()),
>  $("msg").jsonValue(“$.person.name",
> DataTypes.STRING()).as(“name”)
>);
>
> In 2), will each call of .jsonValue parse the string all over again or
> will it reuse the same JsonNode object internally? Which option better fits
> my problem?
>
> __
> Best, Ilya


Re: Creating Flink SQL Row with named fields

2022-02-03 Thread Francesco Guardiani
Hi,

Unfortunately at the moment, creating a row with named fields is not
possible from the ROW constructor.

One solution could be to wrap it in a cast, like: CAST((f0 + 12, 'Hello
world') AS ROW<i INT, s STRING>)
Or you could create a UDF and use the @DataTypeHint to define the row
return type, with named fields.
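
For the UDF option, a minimal sketch (hypothetical function, field names chosen freely):

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;

public class NamedRowFunction extends ScalarFunction {
    // The hint gives the nested fields proper names instead of EXPR$0 / EXPR$1
    public @DataTypeHint("ROW<nested_id INT, nested_some_name STRING>") Row eval(Integer id, String someName) {
        return Row.of(id, someName);
    }
}

After registering it with createTemporarySystemFunction you can call it in place of the ROW constructor.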

Feel free to open an issue about that

FG

On Wed, Feb 2, 2022 at 5:18 PM Vladislav Keda <
vladislav.k...@glowbyteconsulting.com> wrote:

> Hi,
>
> I'm trying to create Row(..) using Flink SQL, but I can't assign names to
> its fields.
>
>
> *For example:*Input table1 structure:* (id INT, some_name STRING)*
> Query:  *select *, ROW(id, some_name) as row1 from table1*
> Output result structure:
> *(id  INT , some_name  STRING, row1 ROW (EXPR$0 INT, EXPR$1 STRING))*
>
> *Each nested field has a name like EXPR$ that does not satisfy me.*
>
> *If I write, for example:*Input table1 structure:* (id INT, some_name
> STRING)*
> Query:  *select *, ROW(id as nested_id, some_name as nested_some_name) as
> row1 from table1*
> Output result structure: *(id  INT , some_name  STRING, row1 ROW (EXPR$0
> INT, EXPR$1 STRING))*
>
>
> *I will get an exception like: *
>
>
>
>
>
>
>
>
> *Caused by: org.apache.flink.table.api.SqlParserException: SQL parse
> failed. Encountered "as" at line 1, column 20.Was expecting one of:")"
> ..."," ...at
> org.apache.flink.table.planner.parse.CalciteParser.parse(CalciteParser.java:56)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:98)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:704)
> at
> ru.glowbyte.streaming.core.operators.internal.sql.SqlDrivenOperator.sqlQuery(SqlDrivenOperator.java:159)
> ... 59 more*
>
> How can I set the name for the field?
>
> Flink version - 1.13.3.
>
> ---
>
> Best Regards,
> Vladislav Keda
>


Re: Future support for custom FileEnumerator in FileSource?

2022-02-02 Thread Francesco Guardiani
Hi,
From what I see here
https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/connector/file/src/AbstractFileSource.AbstractFileSourceBuilder.html#setFileEnumerator-org.apache.flink.connector.file.src.enumerate.FileEnumerator.Provider-
the file enumerator can be setup with the FileSourceBuilder:

fileSourceBuilder.setFileEnumerator(new FileEnumerator.Provider() {
@Override
public FileEnumerator create() {
// Do something
return null;
}
})


Hope it helps,
FG


Re: How to proper hashCode() for keys.

2022-02-02 Thread Francesco Guardiani
Hi,
your hashCode and equals seem correct. Can you post a minimal stream
pipeline reproducer using this class?
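
Something like the following minimal sketch would already help (hypothetical values, keyed by the class itself):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyReproducer {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(
                        new MyEventCountKey("2022-02-01T00:00", "example.com", "click"),
                        new MyEventCountKey("2022-02-01T00:00", "example.com", "view"))
                .keyBy(key -> key) // exercises MyEventCountKey's hashCode()/equals()
                .map(key -> key.toString())
                .print();

        env.execute("key reproducer");
    }
}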

FG

On Tue, Feb 1, 2022 at 8:39 PM John Smith  wrote:

> Hi, getting java.lang.IllegalArgumentException: Key group 39 is not in
> KeyGroupRange{startKeyGroup=96, endKeyGroup=103}. Unless you're directly
> using low level state access APIs, this is most likely caused by
> non-deterministic shuffle key (hashCode and equals implementation).
>
> This is my class, is my hashCode deterministic?
>
> public final class MyEventCountKey {
> private final String countDateTime;
> private final String domain;
> private final String event;
>
> public MyEventCountKey(final String countDateTime, final String domain, 
> final String event) {
> this.countDateTime = countDateTime;
> this.domain = domain;
> this.event = event;
> }
>
> public String getCountDateTime() {
> return countDateTime;
> }
>
> public String getDomain() {
> return domain;
> }
>
> public String getEven() {
> return event;
> }
>
> @Override
> public String toString() {
> return countDateTime + "|" + domain + "|" + event;
> }
>
> @Override
> public boolean equals(Object o) {
> if (this == o) return true;
> if (o == null || getClass() != o.getClass()) return false;
> MyEventCountKey that = (MyEventCountKey) o;
> return countDateTime.equals(that.countDateTime) &&
> domain.equals(that.domain) &&
> event.equals(that.event);
> }
>
> @Override
> public int hashCode() {
> final int prime = 31;
> int result = 1;
> result = prime * result + countDateTime.hashCode();
> result = prime * result + domain.hashCode();
> result = prime * result +  event.hashCode();
> return result;
> }
> }
>
>


Re: KafkaSource vs FlinkKafkaConsumer010

2022-02-01 Thread Francesco Guardiani
I think the FlinkKafkaConsumer010 you're talking about is the old source
API. You should only use KafkaSource now, as it uses the new source
infrastructure.
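
With the new API, a minimal sketch looks like this (broker and topic names are
placeholders, env is your StreamExecutionEnvironment):

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("localhost:9092")
        .setTopics("input-topic")
        .setGroupId("my-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

DataStream<String> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");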

On Tue, Feb 1, 2022 at 9:02 AM HG  wrote:

> Hello Francesco
> Perhaps I copied the wrong link of 1.2.
> But there is also
> https://nightlies.apache.org/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.html
>
> It seems there are 2 ways to use Kafka
>
> KafkaSource<String> source = KafkaSource.<String>builder()
> .setBootstrapServers(brokers)
> .setTopics("input-topic")
> .setGroupId("my-group")
> .setStartingOffsets(OffsetsInitializer.earliest())
> .setValueOnlyDeserializer(new SimpleStringSchema())
> .build();
>
> And like this:
>
> Properties kafkaProperties = new Properties();
> kafkaProperties.put("bootstrap.servers",kafkaBootstrapServers);
> kafkaProperties.put("group.id",kafkaGroupID);
> kafkaProperties.put("auto.offset.reset",kafkaAutoOffsetReset);
> FlinkKafkaConsumer010<String> kafkaConsumer = new 
> FlinkKafkaConsumer010<>(kafkaTopic, new SimpleStringSchema(), 
> kafkaProperties);
> kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
>
>
> There is even a FlinkKafkaConsumer011
>
> Which one is preferable ? Or have they different use cases?
>
> Regards Hans
>
>
> Op di 1 feb. 2022 om 08:55 schreef Francesco Guardiani <
> france...@ververica.com>:
>
>> The latter link you posted refers to a very old Flink release. You should
>> use the first link, which refers to the latest release.
>>
>> FG
>>
>> On Tue, Feb 1, 2022 at 8:20 AM HG  wrote:
>>
>>> Hello all
>>>
>>> I am confused.
>>> What is the difference between KafkaSource as defined in :
>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/
>>> and FlinkKafkaConsumer010 as defined in
>>> https://nightlies.apache.org/flink/flink-docs-release-
>>> 1.2/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.html
>>> <https://nightlies.apache.org/flink/flink-docs-release-1.2/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.html>
>>>
>>> When should I use which?
>>>
>>> Regards Hans
>>>
>>


Re: KafkaSource vs FlinkKafkaConsumer010

2022-01-31 Thread Francesco Guardiani
The latter link you posted refers to a very old Flink release. You should
use the first link, which refers to the latest release.

FG

On Tue, Feb 1, 2022 at 8:20 AM HG  wrote:

> Hello all
>
> I am confused.
> What is the difference between KafkaSource as defined in :
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/
> and FlinkKafkaConsumer010 as defined in
> https://nightlies.apache.org/flink/flink-docs-release-
> 1.2/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.html
> 
>
> When should I use which?
>
> Regards Hans
>


Re: [DISCUSS] Deprecate/remove Twitter connector

2022-01-31 Thread Francesco Guardiani
> Shameless plug:  Maybe the Wikipedia EventStreams
<https://wikitech.wikimedia.org/wiki/Event_Platform/EventStreams> SSE API
<https://stream.wikimedia.org/?doc#/streams> would make for a great
connector example in Flink?

Sounds like a great idea! Do you have a ready-to-use Java client for that?

On Mon, Jan 31, 2022 at 3:47 PM Jing Ge  wrote:

> Thanks @Martijn for driving this! +1 for deprecating and removing it. All
> the concerns mentioned previously are valid. It is good to know that the
> upcoming connector template/archetype will help the user for the kickoff.
> Beyond that, speaking of using a real connector as a sample, since Flink is
> heading towards the unified batch and stream processing, IMHO, it would be
> nice to pick up a feasible connector for this trend to let the user get a
> sample close to the use cases.
>
> Best regards
> Jing
>
> On Mon, Jan 31, 2022 at 3:07 PM Andrew Otto  wrote:
>
>> Shameless plug:  Maybe the Wikipedia EventStreams
>> <https://wikitech.wikimedia.org/wiki/Event_Platform/EventStreams> SSE API
>> <https://stream.wikimedia.org/?doc#/streams> would make for a great
>> connector example in Flink?
>>
>> :D
>>
>> On Mon, Jan 31, 2022 at 5:41 AM Martijn Visser 
>> wrote:
>>
>>> Hi all,
>>>
>>> Thanks for your feedback. It's not about having this connector in the
>>> main repo, that has been voted on already. This is strictly about the
>>> connector itself, since it's not maintained and most probably also can't be
>>> used due to changes in Twitter's API that aren't reflected in our connector
>>> implementation. Therefore I propose to remove it.
>>>
>>> Fully agree on the template part, what's good to know is that a
>>> connector template/archetype is part of the goals for the external
>>> connector repository.
>>>
>>> Best regards,
>>>
>>> Martijn
>>>
>>> On Mon, 31 Jan 2022 at 11:32, Francesco Guardiani <
>>> france...@ververica.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I agree with the concern about having this connector in the main repo.
>>>> But I think in general it doesn't harm to have a sample connector to show
>>>> how to develop a custom connector, and I think that the Twitter connector
>>>> can be a good candidate for such a template. It needs rework for sure, as
>>>> it has evident issues, notably it doesn't work with table.
>>>>
>>>> So i understand if we wanna remove what we have right now, but I think
>>>> we should have some replacement for a "connector template", which is both
>>>> ready to use and easy to hack to build your own connector starting from it.
>>>> Twitter API is a good example for such a template, as it's both "related"
>>>> to the known common use cases of Flink and because is quite simple to get
>>>> started with.
>>>>
>>>> FG
>>>>
>>>> On Sun, Jan 30, 2022 at 12:31 PM David Anderson 
>>>> wrote:
>>>>
>>>>> I agree.
>>>>>
>>>>> The Twitter connector is used in a few (unofficial) tutorials, so if
>>>>> we remove it that will make it more difficult for those tutorials to be
>>>>> maintained. On the other hand, if I recall correctly, that connector uses
>>>>> V1 of the Twitter API, which has been deprecated, so it's really not very
>>>>> useful even for that purpose.
>>>>>
>>>>> David
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Jan 21, 2022 at 9:34 AM Martijn Visser 
>>>>> wrote:
>>>>>
>>>>>> Hi everyone,
>>>>>>
>>>>>> I would like to discuss deprecating Flinks' Twitter connector [1].
>>>>>> This was one of the first connectors that was added to Flink, which could
>>>>>> be used to access the tweets from Twitter. Given the evolution of Flink
>>>>>> over Twitter, I don't think that:
>>>>>>
>>>>>> * Users are still using this connector at all
>>>>>> * That the code for this connector should be in the main Flink
>>>>>> codebase.
>>>>>>
>>>>>> Given the circumstances, I would propose to deprecate and remove this
>>>>>> connector. I'm looking forward to your thoughts. If you agree, please 
>>>>>> also
>>>>>> let me know if you think we should first deprecate it in Flink 1.15 and
>>>>>> remove it in a version after that, or if you think we can remove it
>>>>>> directly.
>>>>>>
>>>>>> Best regards,
>>>>>>
>>>>>> Martijn Visser
>>>>>> https://twitter.com/MartijnVisser82
>>>>>>
>>>>>> [1]
>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/twitter/
>>>>>>
>>>>>>


Re: [DISCUSS] Deprecate/remove Twitter connector

2022-01-31 Thread Francesco Guardiani
Hi,

I agree with the concern about having this connector in the main repo. But
I think in general it doesn't harm to have a sample connector to show how
to develop a custom connector, and I think that the Twitter connector can
be a good candidate for such a template. It needs rework for sure, as it
has evident issues, notably it doesn't work with the Table API.

So I understand if we want to remove what we have right now, but I think we
should have some replacement for a "connector template", which is both
ready to use and easy to hack to build your own connector starting from it.
The Twitter API is a good example for such a template, as it's both "related"
to the known common use cases of Flink and quite simple to get
started with.

FG

On Sun, Jan 30, 2022 at 12:31 PM David Anderson 
wrote:

> I agree.
>
> The Twitter connector is used in a few (unofficial) tutorials, so if we
> remove it that will make it more difficult for those tutorials to be
> maintained. On the other hand, if I recall correctly, that connector uses
> V1 of the Twitter API, which has been deprecated, so it's really not very
> useful even for that purpose.
>
> David
>
>
>
> On Fri, Jan 21, 2022 at 9:34 AM Martijn Visser 
> wrote:
>
>> Hi everyone,
>>
>> I would like to discuss deprecating Flinks' Twitter connector [1]. This
>> was one of the first connectors that was added to Flink, which could be
>> used to access the tweets from Twitter. Given the evolution of Flink over
>> Twitter, I don't think that:
>>
>> * Users are still using this connector at all
>> * That the code for this connector should be in the main Flink codebase.
>>
>> Given the circumstances, I would propose to deprecate and remove this
>> connector. I'm looking forward to your thoughts. If you agree, please also
>> let me know if you think we should first deprecate it in Flink 1.15 and
>> remove it in a version after that, or if you think we can remove it
>> directly.
>>
>> Best regards,
>>
>> Martijn Visser
>> https://twitter.com/MartijnVisser82
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/twitter/
>>
>>


Re: adding elapsed times to events that form a transaction

2022-01-07 Thread Francesco Guardiani
So in Flink we essentially have 2 main APIs to define stream topologies:
one is DataStream and the other one is Table API. My guess is that right
now you're trying to use DataStream with the Kafka connector.

DataStream allows you to statically define a stream topology, with an API
in a similar fashion to Java Streams or RxJava.
Table API on the other hand gives you the ability to define stream jobs
using SQL, where you can easily perform operations such as joins over
windows.

Flink is definitely able to solve your use case with both APIs. You can
also mix the two APIs in the same application and solve it in the way you
prefer.
I suggest you start by looking at the documentation of Table API
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/overview/
and then, for your specific use case, check
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-tvf/
.
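
For instance, a rough sketch of the Window TVF approach (table and column
names are just assumptions, the table is assumed to have a watermark on ts,
and tableEnv is your TableEnvironment): per transaction and per window, the
elapsed time between the first and the last event.

tableEnv.executeSql(
        "SELECT transaction_id, window_start, window_end, " +
        "       TIMESTAMPDIFF(SECOND, MIN(ts), MAX(ts)) AS elapsed_seconds " +
        "FROM TABLE(TUMBLE(TABLE events, DESCRIPTOR(ts), INTERVAL '10' MINUTES)) " +
        "GROUP BY transaction_id, window_start, window_end")
    .print();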

Hope it helps.
FG

On Fri, Jan 7, 2022 at 10:58 AM HG  wrote:

> Hi Francesco.
>
> I am not using anything right now apart from Kafka.
> Just need to know whether Flink is capable of doing this and trying to
> understand the documentation and terminology etc.
> I grapple a bit to understand the whole picture.
>
> Thanks
>
> Regards Hans
>
> Op vr 7 jan. 2022 om 09:24 schreef Francesco Guardiani <
> france...@ververica.com>:
>
>> Hi,
>> Are you using SQL or DataStream? For SQL you can use the Window TVF
>> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-tvf/>
>> feature, where the window size is the "max" elapsed time, and then inside
>> the window you pick the beginning and end event and join them.
>>
>> Hope it helps,
>> FG
>>
>> On Thu, Jan 6, 2022 at 3:25 PM HG  wrote:
>>
>>> Hello all,
>>>
>>> My question is basically whether it is possible to group events by a key
>>> (these will belong to a specific transaction) and then calculate the
>>> elapsed times between them based on a timestamp that is present in the
>>> event.
>>> So a transaction my have x events all timestamped and with the
>>> transaction_id as key.
>>> Is it possible to
>>> 1. group them by the key
>>> 2. order by the timestamp,
>>> 3. calculate the elapsed times between the steps/event
>>> 4. add that elapsed time to the step/event
>>> 5. output the modified events to the sink
>>>
>>>
>>>
>>> Regards Hans
>>>
>>


Re: Moving off of TypeInformation in Flink 1.11

2022-01-07 Thread Francesco Guardiani
Hi Sofya,
DataStream API doesn't use DataTypes, but it still uses TypeInformation.
DataTypes and LogicalTypes are relevant only for Table API.

If I understood what you're trying to do, you don't need to manually
transform to Row; you only need to define the Schema when crossing the
boundary from DataStream to Table API through
StreamTableEnvironment#fromDataStream.


Look at the javadoc of this method:
https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.html#fromDataStream-org.apache.flink.streaming.api.datastream.DataStream-org.apache.flink.table.api.Schema-
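
A rough sketch of what I mean (column names and types are assumptions taken
from your error message, messageStream is your DataStream of Message objects):

Table table = tableEnv.fromDataStream(
        messageStream,
        Schema.newBuilder()
                .column("my_field_1", DataTypes.INT().notNull())
                .column("my_other_field", DataTypes.BIGINT())
                .build());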

Hope it helps,
FG

On Thu, Jan 6, 2022 at 4:06 PM Sofya T. Irwin  wrote:

> Hi,
>
> I’m moving my Flink 1.11 application onto the Blink Table Planner; and off
>  of TypeInformation and onto DataTypes in preparation for upgrading Flink
>  to Flink 1.13 or higher.
>
> I’m having trouble moving off of TypeInformation.
>
> Specifically I have a section of code that maps a DataStream[Message] to a
> DataStream[Row]:
>
>   implicit val typeInformation: TypeInformation[Row] =
>  myObject.getProducedType
>   val resultStream: DataStream[Row] = dataStream.map(msg =>
> myTransform(msg))
>
> Note that myTransform() takes in a Message object and returns a Row object.
> Message is an internal class that we are using.
> The resultStream:DataStream[Row] is passed as a StreamTableSource[Row]
> later.
>
> If I comment out the implicit val above, I get a failure:
>
>   TableSource of type com.MyTableSourceFromDataStream returned a
> DataStream of data type
>   GenericType that does not match with the
> data type
>   ROW<`my_field_1` INT NOT NULL, ... `my_other_field` BIGINT> declared by
> the
>   TableSource.getProducedDataType() method. Please validate the
> implementation of the TableSource.
>
> I checked the Flink 1.11.4, Flink 1.13, and most recent sources and it
> seems that the implementation of DataStream.map() is not changed and still
> uses TypeInformation.
>
> https://github.com/apache/flink/blob/master/flink
> -streaming-scala/src/main/scala/org/apache/flink
> /streaming/api/scala/DataStream.scala#L657
>
> Based on the code above it seems that the issue is that Flink's
> DataStream.map function uses TypeInformation.
>
> I’m not sure if there’s an equivalent DataType implicit that I should be
> declaring instead. Or if I should be using some function other than map
>
> Do you have any suggestions for how to proceed? I'd like to completely
> move off of TypeInformation in my app.
>
> Thanks,
> Sofya
>


Re: adding elapsed times to events that form a transaction

2022-01-07 Thread Francesco Guardiani
Hi,
Are you using SQL or DataStream? For SQL you can use the Window TVF
<https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-tvf/>
feature, where the window size is the "max" elapsed time, and then inside
the window you pick the beginning and end event and join them.

Hope it helps,
FG

On Thu, Jan 6, 2022 at 3:25 PM HG  wrote:

> Hello all,
>
> My question is basically whether it is possible to group events by a key
> (these will belong to a specific transaction) and then calculate the
> elapsed times between them based on a timestamp that is present in the
> event.
> So a transaction my have x events all timestamped and with the
> transaction_id as key.
> Is it possible to
> 1. group them by the key
> 2. order by the timestamp,
> 3. calculate the elapsed times between the steps/event
> 4. add that elapsed time to the step/event
> 5. output the modified events to the sink
>
>
>
> Regards Hans
>


Re: Provide DataTypeHint for ScalarUDF where the return type is Object[]

2022-01-03 Thread Francesco Guardiani
Hi,

Can you provide a reproducer? Sounds like a bug, but I might be wrong.

I wonder why you need List, can't you infer the type?

In any case, you can work around this issue by overriding the method
UserDefinedFunction#getTypeInference to return a custom TypeInference,
which you can build with your own output TypeStrategy.
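
Something along these lines (structural sketch only, the concrete array type
below is an assumption):

@Override
public TypeInference getTypeInference(DataTypeFactory typeFactory) {
    return TypeInference.newBuilder()
            .outputTypeStrategy(
                    TypeStrategies.explicit(DataTypes.ARRAY(DataTypes.STRING())))
            .build();
}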

FG

On Mon, Jan 3, 2022 at 11:13 AM Arujit Pradhan 
wrote:

> Hey team,
>
> We are migrating our Flink codes from Flink-1.9 to Flink-1.14 and as a
> part of this, we are updating a bunch of UDFs. Wanted to understand, how to
> provide *data type hints for the UDFs which return Object[]*.
>
> For example, if the return type is simply Object something like this works.
>
> *@DataTypeHint(inputGroup = InputGroup.ANY)*
>
> But could not find any examples on how to add type hints in case of an
> eval function returning Object[]. If we explicitly, return something like
> List instead of providing type hints works, but this might cause
> issues downstream since lots of running jobs currently use in-built UDFs
> like `*Cardinality*` on the result of the UDFs which fails if the return
> type is List.
>
> Thanks in advance.
> //arujit
>
>


Re: Defining a RowType for FileSource - Parquet

2021-12-17 Thread Francesco Guardiani
Hi,

This API is directly used by the Table API to create a filesystem source with
the Parquet format. So I guess there are no other solutions at the moment, but
first of all I wonder: is there any way you can get the schema out of a
Parquet file to infer the parsing function?

FG


On Fri, Dec 17, 2021 at 11:01 AM Krzysztof Chmielewski <
krzysiek.chmielew...@gmail.com> wrote:

> Hi,
> I'm looking at Flink doc for DataStream usage of FileSource with Parquet
> file [1]
>
> In that example we have to manually define the row format while definign
> the source, like so:
>
> final LogicalType[] fieldTypes =
>   new LogicalType[] {
>   new DoubleType(), new IntType(), new VarCharType()
>   };
>
> I'm wondering what is the recommend pattern for cases where Parquet row
> has many columns, for example 100. Do we have to define them all by hand?
>
> Regards,
> Krzysztof Chmielewski
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/formats/parquet/
>


Re: Parquet schema per bucket in Streaming File Sink

2021-11-30 Thread Francesco Guardiani
Hi Zack,

> I want to customize this job to "explode" the map as column names and
values

You can do this in a SELECT statement, manually extracting the map values
using the map access built-in syntax, e.g.:

SELECT mymap['a'] AS a, mymap['b'] AS b

> specifically the BucketAssigner and the CheckpointRollingPolicy both
appear to be required to have a bucketId of a String.

I wonder if what you're looking for is the PARTITIONED BY feature:

CREATE TABLE MySinkTable (
  ...
) PARTITIONED BY (partitionKey1, partitionKey2)

Does this solves your use case?

FG


On Tue, Nov 30, 2021 at 7:13 AM Zack Loebel  wrote:

> Hey all,
>
> I have a job which writes data that is a similar shape to a location in
> s3. Currently it writes a map of data with each row. I want to customize
> this job to "explode" the map as column names and values, these are
> consistent for a single bucket. Is there any way to do this? Provide a
> custom parquet schema per bucket within a single dynamic sink?
>
> I've started looking at the changes within the main codebase to make this
> feasible. It seems straightforward to provide the bucketId to the
> writerFactory, and the bucketId could be a type containing the relevant
> schema information.
> Although it appears that the BulkFormatBuilder has several spots where
> BucketId appears to be required to be a String: specifically
> the BucketAssigner and the CheckpointRollingPolicy both appear to be
> required to have a bucketId of a String.
>
> I'm curious if this is a change the community would be open to, and or if
> there is another way to accomplish what I'm looking for that I've missed.
>
> Thanks,
> Zack
>
>


Re: Table API Filesystem connector - disable interval rolling policy

2021-11-22 Thread Francesco Guardiani
Hi,
Looking at the code, there is no ability to disable the rollover-interval.

But I'm wondering, what are you trying to do? Write a file up to the
configured file-size? Note that if you're using auto compaction, on every
checkpoint you'll have a rollover, regardless of the rollover-interval.

I cc'ed Fabian in the discussion, who has better knowledge than me on
file sink related topics.

FG

On Mon, Nov 22, 2021 at 3:51 PM Matthias Pohl 
wrote:

> Hi Kamil,
> by looking at the code I'd say that the only option you have is to
> increase the parameter you already mentioned to a very high number. But I'm
> not sure about the side effects. I'm gonna add Francesco to this thread.
> Maybe he has better ideas on how to answer your question.
>
> Best,
> Matthias
>
> On Mon, Nov 22, 2021 at 10:32 AM Kamil ty  wrote:
>
>> Hey all,
>>
>> I wanted to know if there is a way to disable the interval rolling policy
>> in the Table API filesystem connector.
>> From flink docs: FileSystem | Apache Flink
>> 
>> The key to change the interval: sink.rolling-policy.rollover-interval
>> Is it possible to fully disable this rolling policy or the only solution
>> is to set a very big duration?
>>
>> Best Regards
>> Kamil
>>
>


Re: Table to DataStream conversion and field ordering

2021-11-22 Thread Francesco Guardiani
From a first look, this seems to be a bug. If not, it's certainly a feature
worth supporting.

Mind opening an issue with a reproducer?


On Thu, Nov 18, 2021 at 1:37 PM Oliver Moser  wrote:

> Hi!
>
> I'm running into a problem when converting back and forth from a streaming 
> table to a data stream. Given the following
> table DDL:
>
>create table masterdata
>(
>   facility text,
>   manufacturer text,
>   serial integer,
>   latitude double precision,
>   longitude double precision,
>   elevation double precision
>);
>
> and a corresponding POJO
>
>public class MasterData {
>
>   public Double elevation;
>
>   public String facility;
>
>   public Double latitude;
>
>   public Double longitude;
>
>   public String manufacturer;
>
>   public Long serial;
>
>   // getter/setter omitted
>
>}
>
> I register the database using JdbcCatalog like this:
>
>JdbcCatalog catalog = new JdbcCatalog(name, defaultDatabase, username, 
> password, baseUrl);
>tableEnv.registerCatalog("cat", catalog);
>tableEnv.useCatalog("cat");
>
> and if I try to create a table with either "SELECT * FROM masterdata" or via
>
>tableEnv.from("masterdata");
>
> It will bail out with an exception similar to
>
>Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> Column types of query result and sink for
>registered table 'cat.postgres.Unregistered_DataStream_Sink_1' do not 
> match.
>
>Cause: Incompatible types for sink column 'elevation' at position 1.
>
>Query schema: [facility: STRING, manufacturer: STRING, serial: INT, 
> latitude: DOUBLE, longitude: DOUBLE, elevation: DOUBLE]
>Sink schema:  [elevation: DOUBLE, facility: STRING, latitude: DOUBLE, 
> longitude: DOUBLE, manufacturer: STRING, serial: BIGINT]
>
> If i explicitly set the order of the columns in the SELECT like this:
>
>tableEnv.sqlQuery("SELECT 
> elevation,facility,latitude,longitude,manufacturer,serial from masterdata");
>
> it works. In the debugger I can see that "queryFields" and "sinkField" in the 
> call to DynamicSinkUtils.validateSchemaAndApplyImplicitCast
> () are not aligned, i.e. the order of the fields in those two lists are not 
> the same, hence the exception.
>
> According to relevant note in the docs [1]:
>
>the planner reorders fields and inserts implicit casts where possible to 
> convert internal
>data structures to the desired structured type
>
> Which makes me think that as long as the names of the fields in the POJO 
> correspond to the column names in the table,
> the planner should take care of reordering and the explicit "SELECT 
> elevation, ..." should not be needed.
>
> What am I missing?
>
> Thanks!
>
> Oliver
>
> [1] 
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/data_stream_api/#examples-for-todatastream
>
>
>


Re: Flink SQL build-in function questions.

2021-11-12 Thread Francesco Guardiani
Yep, I agree with waiting for Calcite to support it. As a temporary
workaround you can define your own UDFs with that functionality (see the
sketch below).

I also wonder: are the bitwise operators defined in the ANSI SQL
specification? Or should we just follow the common-sense behavior of the
databases supporting them?
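
For instance, a minimal sketch of such a UDF (the function name and the null
handling are my own choices):

import org.apache.flink.table.functions.ScalarFunction;

// Bitwise AND as a user-defined scalar function.
public class BitAndFunction extends ScalarFunction {
    public Long eval(Long a, Long b) {
        if (a == null || b == null) {
            return null;
        }
        return a & b;
    }
}

Register it with tableEnv.createTemporarySystemFunction("BITAND",
BitAndFunction.class) and call it as BITAND(f0, f1) in SQL.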

On Fri, Nov 12, 2021 at 9:54 AM JIN FENG  wrote:

> Sure, I can take a try. Before starting the work, we should discuss the
> api of bit operation function. There are two alternatives
> 1. add some built in functions include bitAnd,bitNot,bitOr,bitXor
> 2. support  &, |, ^, ~ operators in calcite first. Currently, there is a
> relative jira https://issues.apache.org/jira/browse/CALCITE-3732 .
>
> The second would be better, But we should wait for the calcite community
> to  support bit function operators first.
>
> On Fri, Nov 12, 2021 at 1:12 PM JIN FENG  wrote:
>
>> Sure, I can take a try. Before starting the work, we should discuss the
>> api of bit operation function. There are two alternatives
>> 1. add some built in functions include bitAnd,bitNot,bitOr,bitXor
>> 2. support  &, |, ^, ~ operators in calcite first. Currently, there is a
>> relative jira https://issues.apache.org/jira/browse/CALCITE-3732 .
>>
>> The second would be better, But we should wait for the calcite community
>> to  support bit function operators first.
>>
>> On Thu, Nov 11, 2021 at 4:13 PM Martijn Visser 
>> wrote:
>>
>>> Hi,
>>>
>>> I don't think there's currently anyone in the community who is working
>>> on the bit operation functions. Would you be interested and able to make a
>>> contribution on that?
>>>
>>> Best regards,
>>>
>>> Martijn
>>>
>>> On Thu, 11 Nov 2021 at 03:54, JIN FENG  wrote:
>>>
 hi all,
 I met two problems when I use FlinkSQL.

 1.   Is there any plan  to support bit operation functions ?  Currently
 there is some jira mentioned about this,
 https://issues.apache.org/jira/browse/FLINK-14990 ,
 https://issues.apache.org/jira/browse/FLINK-12451  But It seems that
 it hasn't been updated for a long time.

 2.  When I use hive module functions,  percentile_approx would fail
 because of  https://issues.apache.org/jira/browse/FLINK-15855 .   Is
 there any other way to use the percentile functions ?




 thanks,

 jinfeng

>>>


Re: Access to GlobalJobParameters From DynamicTableSourceFactory

2021-11-09 Thread Francesco Guardiani
Have you tried this?

context.getConfiguration().get(org.apache.flink.configuration.PipelineOptions.GLOBAL_JOB_PARAMETERS)
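
i.e., inside createDynamicTableSource(Context context), a sketch (the key name
is hypothetical):

Map<String, String> globalParams =
        context.getConfiguration().get(PipelineOptions.GLOBAL_JOB_PARAMETERS);
String myParam = globalParams.get("my.custom.param");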



On Tue, Nov 9, 2021 at 3:59 PM Krzysztof Chmielewski <
krzysiek.chmielew...@gmail.com> wrote:

> Hi,
> Is there a way to access GlobalJobParameters registered as
> env.getConfig().setGlobalJobParameters(parameters);
>
> from DynamicTableSourceFactory implementation?
> To be more specific
> from DynamicTableSourceFactory::createDynamicTableSource method.
>
> The Context parameter of createDynamicTableSource has access only
> to ReadableConfig object which does not have GlobalParameters.
>
> Cheers,
> Krzysztof Chmielewski
>


Re: Need help with window TopN query

2021-11-04 Thread Francesco Guardiani
As a rule of thumb, I would first try to check that Flink ingests correctly
your csv. Perhaps try to run just a select on your input and see if the
input is parsed as expected and is ordered.
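
For example (assuming the table name from your DDL, tableEnv is your
TableEnvironment):

tableEnv.executeSql("SELECT * FROM visits").print();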

On Thu, Nov 4, 2021 at 12:47 PM Martijn Visser 
wrote:

> Hi Pavel,
>
> There's a Flink SQL recipe in the Flink SQL Cookbook for a Window TopN,
> see
> https://github.com/ververica/flink-sql-cookbook/blob/main/aggregations-and-analytics/11_window_top_n/11_window_top_n.md.
> I think that could help you with your use case too.
>
> Best regards,
>
> Martijn
>
> On Thu, 4 Nov 2021 at 12:42, Pavel Penkov  wrote:
>
>> When the query changed to
>>
>> SELECT user_id, ts, rownum
>> FROM (
>>   SELECT user_id, ts, ROW_NUMBER() OVER (PARTITION BY window_start,
>> window_end, user_id ORDER BY ts ASC) as rownum
>>   FROM TABLE(
>> TUMBLE(TABLE visits, DESCRIPTOR(ts), INTERVAL '24' HOURS))
>> )
>> WHERE rownum = 1
>>
>> runs but doesn't produce any results. I've tried different window sizes,
>> the source file is sorted by timestamp.
>>
>> On Thu, Nov 4, 2021 at 1:42 PM Francesco Guardiani <
>> france...@ververica.com> wrote:
>>
>>> I think the issue here is that the nested select is selecting all the
>>> fields produced by the TVF, including window_time (which is implicitly
>>> added by the TVF as described here
>>> <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-tvf/#window-functions>).
>>> Because of that, the planner cannot resolve the timestamp to use as
>>> event-time in the result stream. Try to select only the fields you need in
>>> the nested SELECT, e.g.:
>>>
>>> SELECT *
>>> FROM (
>>>   SELECT user_id, ts, ROW_NUMBER() OVER (PARTITION BY window_start,
>>> window_end, user_id ORDER BY ts ASC) as rownum
>>>   FROM TABLE(
>>> TUMBLE(TABLE visits, DESCRIPTOR(ts), INTERVAL '24' HOURS))
>>> )
>>> WHERE rownum = 1
>>>
>>> On Thu, Nov 4, 2021 at 11:18 AM Pavel Penkov 
>>> wrote:
>>>
>>>> I'm trying to express a supposedly simple query with Flink SQL - log
>>>> the first visit a day for each user. Source table is defined like
>>>>
>>>> CREATE TABLE visits (user_id int, ts timestamp(3), WATERMARK FOR ts AS
>>>> ts)
>>>> WITH ('connector' = 'filesystem',
>>>> 'path' = 'file:///visits.csv',
>>>> 'format' = 'csv')
>>>>
>>>> The query I came with up is
>>>>
>>>> SELECT *
>>>> FROM (
>>>>   SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end,
>>>> user_id ORDER BY ts ASC) as rownum
>>>>   FROM TABLE(
>>>> TUMBLE(TABLE visits, DESCRIPTOR(ts), INTERVAL '24' HOURS))
>>>> )
>>>> WHERE rownum = 1
>>>>
>>>> But it fails with error
>>>> [ERROR] Could not execute SQL statement. Reason:
>>>> org.apache.flink.table.api.TableException: Found more than one rowtime
>>>> field: [ts, window_time] in the query when insert into
>>>> 'default_catalog.default_database.Unregistered_Collect_Sink_6'.
>>>> Please select the rowtime field that should be used as event-time
>>>> timestamp for the DataStream by casting all other fields to TIMESTAMP.
>>>>
>>>> Any ideas on how to fix this?
>>>>
>>>


Re: Need help with window TopN query

2021-11-04 Thread Francesco Guardiani
I think the issue here is that the nested select is selecting all the
fields produced by the TVF, including window_time (which is implicitly
added by the TVF as described here
<https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-tvf/#window-functions>).
Because of that, the planner cannot resolve the timestamp to use as
event-time in the result stream. Try to select only the fields you need in
the nested SELECT, e.g.:

SELECT *
FROM (
  SELECT user_id, ts, ROW_NUMBER() OVER (PARTITION BY window_start,
window_end, user_id ORDER BY ts ASC) as rownum
  FROM TABLE(
TUMBLE(TABLE visits, DESCRIPTOR(ts), INTERVAL '24' HOURS))
)
WHERE rownum = 1

On Thu, Nov 4, 2021 at 11:18 AM Pavel Penkov  wrote:

> I'm trying to express a supposedly simple query with Flink SQL - log the
> first visit a day for each user. Source table is defined like
>
> CREATE TABLE visits (user_id int, ts timestamp(3), WATERMARK FOR ts AS ts)
> WITH ('connector' = 'filesystem',
> 'path' = 'file:///visits.csv',
> 'format' = 'csv')
>
> The query I came with up is
>
> SELECT *
> FROM (
>   SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end,
> user_id ORDER BY ts ASC) as rownum
>   FROM TABLE(
> TUMBLE(TABLE visits, DESCRIPTOR(ts), INTERVAL '24' HOURS))
> )
> WHERE rownum = 1
>
> But it fails with error
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.TableException: Found more than one rowtime
> field: [ts, window_time] in the query when insert into
> 'default_catalog.default_database.Unregistered_Collect_Sink_6'.
> Please select the rowtime field that should be used as event-time
> timestamp for the DataStream by casting all other fields to TIMESTAMP.
>
> Any ideas on how to fix this?
>


Re: Flink sink data to DB and then commit data to Kafka

2021-11-03 Thread Francesco Guardiani
An alternative is to use a CDC tool like Debezium to stream your table
changes, and then ingest that stream using Flink to push data later to
Kafka.
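
A rough sketch of the ingestion side (table name, topic and columns are
assumptions; Debezium is expected to write its change events to that Kafka
topic):

tableEnv.executeSql(
        "CREATE TABLE orders_cdc (" +
        "  order_id BIGINT," +
        "  amount DECIMAL(10, 2)" +
        ") WITH (" +
        "  'connector' = 'kafka'," +
        "  'topic' = 'dbserver.inventory.orders'," +
        "  'properties.bootstrap.servers' = 'localhost:9092'," +
        "  'properties.group.id' = 'flink-cdc-reader'," +
        "  'format' = 'debezium-json'" +
        ")");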

On Wed, Nov 3, 2021 at 6:17 AM Guowei Ma  wrote:

> Hi, Qihua
>
> AFAIK there is no way to do it. Maybe you need to implement a "new" sink
> to archive this target.
>
> Best,
> Guowei
>
>
> On Wed, Nov 3, 2021 at 12:40 PM Qihua Yang  wrote:
>
>> Hi,
>>
>> Our flink application has two sinks(DB and kafka topic). We want to push
>> same data to both sinks. Is it possible to push data to kafka topic only
>> after data is pushed to DB successfully? If the commit to DB fail, we don't
>> want those data is pushed to kafka.
>>
>> Thanks,
>> Qihua
>>
>


Re: Flink-1.12 Sql on Job two SQL sink control order

2021-10-14 Thread Francesco Guardiani
I'm not aware of any way to control the sink order; AFAIK each
Table#executeInsert will generate a separate job on its own. You may be
able to hack around it with a custom DynamicTableSink that, for each
record, sends it to TiDB and then to Kafka.

May I ask why you need that? If the notification system after the Kafka
sink depends on TiDB, perhaps you need a retry system there that can wait
for TiDB to ingest and process those data?

On Thu, Oct 14, 2021 at 10:40 AM WuKong  wrote:

> Hi all:
>  I have two Flink SQL , the same source  from Kafka,  and one SQL
> sink data into Tidb ,another one SQL sink Kafka to notify downstream
> system, how can I control the sink order , I wish If source Kafka data
> come, first sink Tidb and after that sink Kafka .
>
> --
> ---
> Best,
> WuKong
>


Re: How to deserialize Avro enum type in Flink SQL?

2021-10-14 Thread Francesco Guardiani
It reproduces on my machine, so I've opened a JIRA issue about that:
FLINK-24544 <https://issues.apache.org/jira/browse/FLINK-24544>.
Unfortunately, I don't have any ready-to-use workarounds for you.

On Wed, Oct 13, 2021 at 8:43 PM Dongwon Kim  wrote:

> Can you provide a minimal reproducer (without confluent schema registry)
>> with a valid input?
>>
>
> Please download and unzip the attached file.
>
>- src/main/avro/MyProtocol.avdl
>   - MyRecord, MyEntry, and the MyEnumType is defined
>   - "mvn generate-sources" will auto-generate Java classes under
>   "target/generated-sources"
>- "org.example.fs" contains
>   - "org.example.fs.Writer" which writes a single record of MyRecord
>   type to "output.avro"
>   - "org.example.fs.Reader" which reads the record from "output.avro"
>   - "org.example.fs.ExampleFromFileSystem" executes CREATE TABLE
>   defined in "my_table.ddl" and shows that it successfully deserialize
>   MyRecord from a Avro record written in a file as you mentioned.
>- "org.example.kafka.ExampleFromKafkaAndSR" does almost the same as
>"org.example.fs.ExampleFromFileSystem" except that it reads from Kafka and
>looks up the schema from Schema Registry
>   - However, it produces the same exception unlike
>   ExampleFromFileSystem
>   - What I produced to a Kafka topic is {"entries": [{"type":
>   "TypeVal1"}, {"type": "TypeVal2"}, {"type": "TypeVal2"}]} which is a 
> Avro
>   record saved on output.avro.
>   - The size of "output.avro" is 321 bytes on the disk while the size
>   of the value of a Kafka record is 10 bytes.
>
> Hope this provides enough information.
>
> Best,
>
> Dongwon
>
> On Wed, Oct 13, 2021 at 4:50 PM Francesco Guardiani <
> france...@ververica.com> wrote:
>
>> First of all, are you sure the input data is correct? From the stacktrace
>> it seems to me the issue might be that the input data is invalid.
>>
>> Looking at the code of AvroToRowDataConverters, It sounds like STRING
>> should work with avro enums. Can you provide a minimal reproducer (without
>> confluent schema registry) with a valid input?
>>
>> On Tue, Oct 12, 2021 at 6:19 PM Dongwon Kim 
>> wrote:
>>
>>> Hi community,
>>>
>>> Can I get advice on this question?
>>>
>>> Another user just sent me an email asking whether I found a solution or
>>> a workaround for this question, but I'm still stuck there.
>>>
>>> Any suggestions?
>>>
>>> Thanks in advance,
>>>
>>> Dongwon
>>>
>>> -- Forwarded message -
>>> From: Dongwon Kim 
>>> Date: Mon, Aug 9, 2021 at 7:26 PM
>>> Subject: How to deserialize Avro enum type in Flink SQL?
>>> To: user 
>>>
>>>
>>> Hi community,
>>>
>>> I have a Kafka topic where the schema of its values is defined by the
>>> "MyRecord" record in the following Avro IDL and registered to the Confluent
>>> Schema Registry.
>>>
>>>> @namespace("my.type.avro")
>>>> protocol MyProtocol {
>>>>   enum MyEnumType {
>>>> TypeVal1, TypeVal2
>>>>   }
>>>>   record MyEntry {
>>>> MyEnumType type;
>>>>   }
>>>>   record MyRecord {
>>>> array entries;
>>>>   }
>>>> }
>>>
>>>
>>> To read from the topic, I've defined the following DDL:
>>>
>>>> CREATE TABLE my_table
>>>
>>> (
>>>> `entries` ARRAY>>> *`type` ??? (This is the main question)*
>>>> >>
>>>> ) WITH (
>>>> 'connector' = 'kafka',
>>>> 'topic' = 'my-topic',
>>>> 'properties.bootstrap.servers' = '...:9092',
>>>> 'scan.startup.mode' = 'latest-offset',
>>>> 'value.format' = 'avro-confluent',
>>>> 'value.avro-confluent.schema-registry.url' = 'http://...:8081'
>>>>
>>> )
>>>
>>>
>>> And I run the following query :
>>>
>>>> SELECT * FROM my_table
>>>
>>>
>>> Now I got the following messages in Flink-1.13.1 when I use *STRING*
>>> for the type:
>>>
>>>> *Caused by: java.io.IOException: Failed to deserialize Avro record.*

Re: How to deserialize Avro enum type in Flink SQL?

2021-10-13 Thread Francesco Guardiani
First of all, are you sure the input data is correct? From the stacktrace
it seems to me the issue might be that the input data is invalid.

Looking at the code of AvroToRowDataConverters, it sounds like STRING
should work with Avro enums. Can you provide a minimal reproducer (without
confluent schema registry) with a valid input?

On Tue, Oct 12, 2021 at 6:19 PM Dongwon Kim  wrote:

> Hi community,
>
> Can I get advice on this question?
>
> Another user just sent me an email asking whether I found a solution or a
> workaround for this question, but I'm still stuck there.
>
> Any suggestions?
>
> Thanks in advance,
>
> Dongwon
>
> -- Forwarded message -
> From: Dongwon Kim 
> Date: Mon, Aug 9, 2021 at 7:26 PM
> Subject: How to deserialize Avro enum type in Flink SQL?
> To: user 
>
>
> Hi community,
>
> I have a Kafka topic where the schema of its values is defined by the
> "MyRecord" record in the following Avro IDL and registered to the Confluent
> Schema Registry.
>
>> @namespace("my.type.avro")
>> protocol MyProtocol {
>>   enum MyEnumType {
>> TypeVal1, TypeVal2
>>   }
>>   record MyEntry {
>> MyEnumType type;
>>   }
>>   record MyRecord {
>> array entries;
>>   }
>> }
>
>
> To read from the topic, I've defined the following DDL:
>
>> CREATE TABLE my_table
>
> (
>> `entries` ARRAY> *`type` ??? (This is the main question)*
>> >>
>> ) WITH (
>> 'connector' = 'kafka',
>> 'topic' = 'my-topic',
>> 'properties.bootstrap.servers' = '...:9092',
>> 'scan.startup.mode' = 'latest-offset',
>> 'value.format' = 'avro-confluent',
>> 'value.avro-confluent.schema-registry.url' = 'http://...:8081'
>>
> )
>
>
> And I run the following query :
>
>> SELECT * FROM my_table
>
>
> Now I got the following messages in Flink-1.13.1 when I use *STRING* for
> the type:
>
>> *Caused by: java.io.IOException: Failed to deserialize Avro record.*
>>   at
>> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
>>   at
>> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)
>>   at
>> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
>>   at
>> org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
>>   at
>> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
>>   at
>> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
>>   at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
>>   at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>>   at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>>   at
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
>> *Caused by: org.apache.avro.AvroTypeException: Found
>> my.type.avro.MyEnumType, expecting union*
>>   at
>> org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
>>   at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
>>   at
>> org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:275)
>>   at
>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
>>   at
>> org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:298)
>>   at
>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183)
>>   at
>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>>   at
>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
>>   at
>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>>   at
>> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
>>   at
>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
>>   at
>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>>   at
>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>>   at
>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>   at
>> org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:81)
>>   at
>> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
>>   ... 9 more
>
> The reason I use the STRING type is just for fast-prototyping.
>
> While reading through [1], I've been thinking about using *RAW('class',
> 'snapshot')* where 'class' is my.type.avro.MyEnumType, but I'm not sure
> 

Re: OVER IGNORE NULLS support

2021-10-11 Thread Francesco Guardiani
Seems like IGNORE NULLS is specified in the SQL:2008 spec (paragraph 6.10);
the opposite is called RESPECT NULLS:

<null treatment> ::=
RESPECT NULLS | IGNORE NULLS

Perhaps this is worth supporting. I've opened an issue for that: FLINK-24499
<https://issues.apache.org/jira/browse/FLINK-24499>. @Adrian, are you
interested in contributing to this issue?



On Sat, Oct 9, 2021 at 4:32 AM Caizhi Weng  wrote:

> Hi!
>
> Currently all built-in aggregate functions ignore null input values, so I
> guess this is the reason why Flink didn't support this syntax.
>
> I'm sort of curious about this syntax. Does it come from the SQL standard?
> What's the opposite of IGNORE NULLS? Is there a NOT IGNORE NULLS and if the
> user specifies this an exception will be thrown when a null value is
> encountered?
>
> Adrian Bednarz  于2021年10月8日周五 下午9:22写道:
>
>> Hi,
>>
>> we've been trying to run a query similar to
>> SELECT id, type, LAG(id) IGNORE NULLS OVER (PARTITION BY type ORDER BY
>> ts) AS lastId
>>   FROM Events
>>
>> A query without IGNORE NULLS clause executes just fine. This syntax is
>> supported by Calcite and our clients expect it to work. Our platform uses
>> FlinkSQL to execute certain types of queries and currently such syntax
>> causes jobs to fail with NPE. Here's a stack trace
>>
>> Exception in thread "main"
>> org.apache.flink.table.api.ValidationException: SQL validation failed. null
>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:164)
>> at
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:215)
>> at
>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
>> at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:736)
>> at com.example.OverIgnoreNullsJob.main(OverIgnoreNullsJob.java:37)
>> Caused by: java.lang.NullPointerException
>> at java.base/java.util.Objects.requireNonNull(Objects.java:221)
>> at org.apache.calcite.sql.SqlBasicCall.setOperator(SqlBasicCall.java:67)
>> at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:530)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5710)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5697)
>> at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1736)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1727)
>> at
>> org.apache.calcite.sql.type.SqlTypeUtil.deriveType(SqlTypeUtil.java:178)
>> at
>> org.apache.calcite.sql.type.FamilyOperandTypeChecker.checkSingleOperandType(FamilyOperandTypeChecker.java:71)
>> at
>> org.apache.calcite.sql.type.FamilyOperandTypeChecker.checkOperandTypes(FamilyOperandTypeChecker.java:122)
>> at
>> org.apache.calcite.sql.SqlOperator.checkOperandTypes(SqlOperator.java:679)
>> at
>> org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:444)
>> at
>> org.apache.calcite.sql.SqlOverOperator.deriveType(SqlOverOperator.java:86)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5710)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5697)
>> at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1736)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1727)
>> at org.apache.calcite.sql.SqlAsOperator.deriveType(SqlAsOperator.java:133)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5710)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5697)
>> at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1736)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1727)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:421)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4061)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3347)
>> at
>> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
>> at
>> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
>> at
>> 

Re: Built-in functions to manipulate MULTISET type

2021-09-21 Thread Francesco Guardiani
Hi, for type strategies you can check out
org.apache.flink.table.types.inference.InputTypeStrategies. They are pretty
extensive and cover most use cases. In your case, this function
probably requires the COMMON type strategy. If you want to roll out your
own type inference, look at the
org.apache.flink.table.types.inference.strategies package, where you can find
all the type strategies we have inside it.

On Mon, Sep 20, 2021 at 4:39 PM Seth Wiesman  wrote:

> The type strategy can be generic over the input and output types, so you
> can write something generic that say given a multiset of some type T this
> function returns an array of some type T. This is the exact same logic
> built-in functions use and is just as expressive as anything Flink could
> provide.
>
> Seth
>
> On Mon, Sep 20, 2021 at 1:26 AM Kai Fu  wrote:
>
>> Hi Seth,
>>
>> This is really helpful and inspiring, thank you for the information.
>>
>> On Sun, Sep 19, 2021 at 11:06 PM Seth Wiesman 
>> wrote:
>>
>>> Hi,
>>>
>>> I agree it would be great to see these functions built-in, but you do
>>> not need to write a UDF for each type. You can overload a UDFs type
>>> inference and have the same capabilities as built-in functions, which means
>>> supporting generics.
>>>
>>>
>>> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/functions/LastDatedValueFunction.java
>>>
>>> On Sat, Sep 18, 2021 at 7:42 AM Yuval Itzchakov 
>>> wrote:
>>>
 Hi Jing,

 I recall there is already an open ticket for built-in aggregate
 functions

 On Sat, Sep 18, 2021, 15:08 JING ZHANG  wrote:

> Hi Yuval,
> You could open a JIRA to track this if you think some functions should
> be added as built-in functions in Flink.
>
> Best,
> JING ZHANG
>
> Yuval Itzchakov  于2021年9月18日周六 下午3:33写道:
>
>> The problem with defining a UDF is that you have to create one
>> overload per key type in the MULTISET. It would be very convenient to 
>> have
>> functions like Snowflakes ARRAY_AGG.
>>
>> On Sat, Sep 18, 2021, 05:43 JING ZHANG  wrote:
>>
>>> Hi Kai,
>>> AFAIK, there is no built-in function to extract the keys in MULTISET
>>> 
>>>  to
>>> be an ARRAY. Define a UTF is a good solution.
>>>
>>> Best,
>>> JING ZHANG
>>>
>>> Kai Fu  于2021年9月18日周六 上午7:35写道:
>>>
 Hi team,

 We want to know if there is any built-in function to extract the
 keys in MULTISET
 
 to be an ARRAY. There is no such function as far as we can find, 
 except to
 define a simple wrapper UDF for that, please advise.

 --
 *Best wishes,*
 *- Kai*

>>>
>>
>> --
>> *Best wishes,*
>> *- Kai*
>>
>


Fwd: Understading Flink statefun deployment

2020-06-09 Thread Francesco Guardiani
Hi everybody,
I'm quite new to Flink and Flink Statefun and I'm trying to understand the
deployment techniques on k8s.
I wish to understand if it's feasible to deploy a statefun project
separating the different functions on separate deployments (in order to
have some functions as remote and some as embedded) all connected to the
same master. The idea is that I can scale the deployments independently
using the Kubernetes HPA and these instances cooperate automatically using
the same master. For example, given a flow like kafka -> fn a -> fn b ->
kafka:

* Remote function A (plus ingress) in deployment fn-a, where the function
process is deployed as another container in the same pod
* embedded function B (plus egress) in deployment fn-b
* master deployment in flink-master

Does that make sense at all in the Flink architecture? If it's feasible, do
you have any examples?

FG

-- 
Francesco Guardiani
Website: https://slinkydeveloper.com/
Twitter: https://twitter.com/SlinkyGuardiani

Github: https://github.com/slinkydeveloper