Parsing a JSON array string as a Flink SQL Array data type

2022-07-12 Thread Abhishek Rai
Hello! I'm trying to use the new JSON functions in Flink 1.15 to parse JSON input data. In particular, using JSON_QUERY, I'm able to extract out JSON array elements from a larger JSON record. However, this function returns the JSON array as a string. I'd like to run this array through the SQL
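
A minimal sketch of the behavior described above (Flink 1.15 Table API, Java); the JSON payload and column name are made up for illustration. JSON_QUERY does extract the nested array, but the result comes back as a JSON-encoded STRING rather than an ARRAY type:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class JsonQueryArrayExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // 'tags' is returned as the string '["a","b","c"]', not as ARRAY<STRING>,
        // which is the limitation the question is about.
        tEnv.executeSql(
                "SELECT JSON_QUERY('{\"id\": 1, \"tags\": [\"a\", \"b\", \"c\"]}', '$.tags') AS tags"
        ).print();
    }
}
```

Getting from that string to a real ARRAY value is the open question in the thread; a small scalar UDF that parses the string is one common workaround.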

Re: Flink taskmanager in crash loop

2021-08-17 Thread Abhishek Rai
//org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) java.base@11.0.11/java.lang.Thread.run(Unknown Source) On Tue, Aug 17, 2021 at 9:22 AM Abhishek Rai wrote: > Thanks Yangze, indeed, I see the following in the log about 10s before the > final crash (masked some sensitive data using `

Re: Flink taskmanager in crash loop

2021-08-17 Thread Abhishek Rai
Abhishek, > > Do you see something like "Fatal error occurred while executing the > TaskManager" in your log or would you like to provide the whole task > manager log? > > Best, > Yangze Guo > > On Tue, Aug 17, 2021 at 5:17 AM Abhishek Rai > wrote: > > > >

Flink taskmanager in crash loop

2021-08-16 Thread Abhishek Rai
Hello, in our production environment running Flink 1.13 (Scala 2.11), where Flink had been running a dozen or so jobs without issue for quite a while, the taskmanager started crash looping with a period of ~4 minutes per crash. The stack trace is not very informative, therefore

Re: Re: Re: Using Kafka as bounded source with DataStream API in batch mode (Flink 1.12)

2021-02-04 Thread Abhishek Rai
I had a similar need recently and ended up using KafkaDeserializationSchemaWrapper to wrap a given DeserializationSchema. The resulting KafkaDeserializationSchema[Wrapper] can be passed directly to the `FlinkKafkaConsumer` constructor. ``` class BoundingDeserializationSchema extends
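
The quoted code is cut off above, so here is a rough, hypothetical sketch of the same idea in Java against Flink 1.12's KafkaDeserializationSchema interface; the bounding condition (an end timestamp) and all names are illustrative, not the original code:

```java
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Wraps a plain DeserializationSchema and declares end-of-stream once records
// pass a configured bound, so the consumer can terminate in batch-style runs.
public class BoundingDeserializationSchema<T> implements KafkaDeserializationSchema<T> {

    private final DeserializationSchema<T> inner;
    private final long endTimestampMillis;
    private boolean boundReached = false;

    public BoundingDeserializationSchema(DeserializationSchema<T> inner, long endTimestampMillis) {
        this.inner = inner;
        this.endTimestampMillis = endTimestampMillis;
    }

    @Override
    public T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        // Once a record passes the bound, mark the stream as finished.
        if (record.timestamp() >= endTimestampMillis) {
            boundReached = true;
        }
        return inner.deserialize(record.value());
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return boundReached;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return inner.getProducedType();
    }
}
```

The wrapped schema can then be handed straight to the `FlinkKafkaConsumer(topic, deserializer, props)` constructor, as the message describes.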

Re: Publishing a table to Kafka

2021-01-20 Thread Abhishek Rai
Thanks Leonard, we are working towards the 1.12 upgrade and should be able to try upsert-kafka after that. > Your first workaround should have worked, but it looks like an exception > was thrown in the type conversion phase; could you share your table schema and > query that can reproduce the issue.

Publishing a table to Kafka

2021-01-13 Thread Abhishek Rai
Hello, I'm using Flink 1.11.2 where I have a SQL backed `Table` which I'm trying to write to Kafka. I'm using `KafkaTableSourceSinkFactory` which ends up instantiating a table sink of type `KafkaTableSink`. Since this sink is an `AppendStreamTableSink`, I cannot write to it using a generic table
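
For context, a minimal sketch of the upsert-kafka route suggested in the reply above (the connector is available from Flink 1.12 onward, hence the upgrade); table name, schema, and broker address are placeholders. Unlike the append-only Kafka table sink from this question, upsert-kafka accepts updating results keyed by a primary key:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UpsertKafkaSinkSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // An updating query (e.g. a GROUP BY aggregate) can be inserted into this
        // table, whereas the plain 'kafka' connector only takes append-only input.
        tEnv.executeSql(
                "CREATE TABLE sink_topic (" +
                "  id BIGINT," +
                "  cnt BIGINT," +
                "  PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'upsert-kafka'," +
                "  'topic' = 'sink_topic'," +
                "  'properties.bootstrap.servers' = 'localhost:9092'," +
                "  'key.format' = 'json'," +
                "  'value.format' = 'json'" +
                ")");
    }
}
```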

Re: Snowflake access through JDBC

2021-01-13 Thread Abhishek Rai
/flink-docs-stable/api/java/org/apache/flink/connector/jdbc/table/JdbcTableSource.html > [4] > https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/connector/jdbc/JdbcInputFormat.html > > Regards, > Roman > > > On Fri, Dec 18, 2020 at 4:55 PM Abh

Snowflake access through JDBC

2020-12-18 Thread Abhishek Rai
Hello, I'm trying to create a `StreamTableSource` for Snowflake using `JdbcTableSourceSinkFactory.createStreamTableSource` (in package org.apache.flink.connector.jdbc.table) but it fails with the following error message due to `JdbcDialects` not having a dialect for Snowflake. My goal is to
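
A rough sketch of the JdbcInputFormat fallback that the reply above links to; it takes a raw query and a driver class, so it does not go through the `JdbcDialects` lookup that fails here. The Snowflake driver class, URL, and row schema below are illustrative assumptions, not tested values:

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.connector.jdbc.JdbcInputFormat;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SnowflakeJdbcInputSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Expected column types of the query result, used for deserialization.
        RowTypeInfo rowType = new RowTypeInfo(Types.LONG, Types.STRING);

        JdbcInputFormat input = JdbcInputFormat.buildJdbcInputFormat()
                .setDrivername("net.snowflake.client.jdbc.SnowflakeDriver")        // assumed driver class
                .setDBUrl("jdbc:snowflake://<account>.snowflakecomputing.com/")     // placeholder URL
                .setUsername("<user>")
                .setPassword("<password>")
                .setQuery("SELECT id, name FROM my_table")
                .setRowTypeInfo(rowType)
                .finish();

        env.createInput(input, rowType).print();
        env.execute("snowflake-jdbc-read");
    }
}
```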

Restore from savepoint through Java API

2020-06-11 Thread Abhishek Rai
Hello, I'm writing a test for my custom sink function. The function is stateful and relies on checkpoint restores for maintaining consistency with the external system that it's writing to. For integration testing of the sink function, I have a MiniCluster based environment inside a single JVM
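
Not from the thread itself, but one way a MiniCluster-based test can restore from a savepoint programmatically is to attach SavepointRestoreSettings to the JobGraph before submission; a rough sketch under that assumption, with the pipeline and savepoint path supplied by the test:

```java
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SavepointRestoreSketch {

    // Builds the job under test and marks it to restore from the given savepoint.
    public static JobGraph jobGraphFromSavepoint(String savepointPath) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // ... build the pipeline containing the stateful sink under test here ...

        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
        jobGraph.setSavepointRestoreSettings(
                SavepointRestoreSettings.forPath(savepointPath, /* allowNonRestoredState */ true));
        return jobGraph;
    }
}
```

The resulting graph can then be submitted through the test cluster's client, e.g. `MiniClusterWithClientResource#getClusterClient().submitJob(jobGraph)`, so the sink starts from the saved state rather than from empty state.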