Hi,
Since the State Processor API currently doesn't support window states, what is the
recommended way to debug window states?
The background is that I have a SQL job whose state, mainly the window aggregation
operator's state, keeps increasing steadily.
The things I’ve checked:
- Outputs are as expected.
-
Thank you Xingbo, this will certainly help!
On Wed, Jul 15, 2020 at 7:39 AM Xingbo Huang wrote:
> Hi Manas,
>
> I have created an issue[1] to add the related doc
>
> [1] https://issues.apache.org/jira/browse/FLINK-18598
>
> Best,
> Xingbo
>
> On Tue, Jul 14, 2020 at 4:15 PM, Manas Kale wrote:
>
>> Thank you for the
I think this might be a bug in `tableEnv.fromValues`.
Could you try to remove the DataType parameter, and let the framework
derive the types?
final Table inputTable = tableEnv.fromValues(
Row.of(1L, "Hello"), //
Row.of(2L, "Hello"), //
Row.of(3L, ""), //
Row.of(4L,
Hi, Flavio
I reproduced your issue, and I think it is a bug. But I'm not sure whether it
comes from Calcite itself or from the shaded Calcite in the Flink Table Planner
module.
Maybe Danny can help explain more.
CC: Danny
Best
Leonard Xu
> On Jul 14, 2020, at 23:06, Flavio Pompermaier wrote:
>
> If I
Hi Manas,
If you want to return a RowType from a Python UDF, you can use the Row class,
which extends Python's tuple.
You can import Row with: from pyflink.table import Row
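A minimal sketch of such a UDF (the UDF name and the row fields below are
hypothetical, just to illustrate the idea):

from pyflink.table import DataTypes, Row
from pyflink.table.udf import udf

# Python UDF whose result type is ROW<id BIGINT, name STRING>;
# it simply wraps its two inputs into a Row.
@udf(input_types=[DataTypes.BIGINT(), DataTypes.STRING()],
     result_type=DataTypes.ROW([
         DataTypes.FIELD("id", DataTypes.BIGINT()),
         DataTypes.FIELD("name", DataTypes.STRING())]))
def to_row(i, s):
    return Row(i, s)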
Best,
Xingbo
On Mon, Jul 6, 2020 at 8:08 PM, Manas Kale wrote:
> I also tried doing this by using a User Defined Func
Hi Manas,
I have created an issue[1] to add the related doc
[1] https://issues.apache.org/jira/browse/FLINK-18598
Best,
Xingbo
On Tue, Jul 14, 2020 at 4:15 PM, Manas Kale wrote:
> Thank you for the quick reply Xingbo!
> Is there some documented webpage example that I can refer to in the
> future for the latest p
First of all, MongoDB itself does not support transactions. But you can
initialize a buffer to hold each row when a transaction begins, and write all
the buffered records to the database once the transaction completes.
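A minimal, non-Flink sketch of that buffering idea (the class and method names
are hypothetical; in a DataStream job the same hooks would live in a
TwoPhaseCommitSinkFunction subclass):

from pymongo import MongoClient

class BufferedMongoWriter:
    # Buffers rows per "transaction" and flushes them to MongoDB on commit.
    def __init__(self, uri, db, collection):
        self._coll = MongoClient(uri)[db][collection]
        self._buffer = []

    def begin_transaction(self):
        # start a new transaction with an empty buffer
        self._buffer = []

    def write(self, row):
        # rows are only buffered here, nothing is written yet
        self._buffer.append(row)

    def commit(self):
        # flush all buffered rows to MongoDB in one batch
        if self._buffer:
            self._coll.insert_many(self._buffer)
        self._buffer = []

    def abort(self):
        # drop the buffered rows without writing them
        self._buffer = []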
On Wed, Jul 15, 2020 at 9:28 AM, C DINESH wrote:
> Hello all,
>
> Can we implement TwoPhaseCommitPro
Hi Jesse,
For how to add jar packages, you can refer to the PyFlink Common Questions
doc[1]. PyFlink 1.10 and 1.11 differ in the way jar packages are added, and the
document introduces this in detail.
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/p
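For PyFlink 1.11, a rough sketch of the configuration-based approach (the jar
path below is a placeholder; please check the doc above for the exact details
and for the 1.10 variant):

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(
    env,
    environment_settings=EnvironmentSettings.new_instance()
        .in_streaming_mode().use_blink_planner().build())

# Point pipeline.jars at the connector/format jars; multiple jars are
# separated by ';' and must be given as file:// URLs.
t_env.get_config().get_configuration().set_string(
    "pipeline.jars", "file:///path/to/your-connector.jar")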
Hello all,
Can we implement the TwoPhaseCommitProtocol for MongoDB to get EXACTLY_ONCE
semantics? Will there be any limitations?
Thanks,
Dinesh.
Hello Guys,
I am trying to launch a Flink app on a remote server, but I get this
error message.
org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error:
org.apache.flink.client.program.ProgramInvocationException: Job failed
(JobID: 8bf7f299746e051ea7b94afd07e2
Hello Flink, I have some questions regarding the guidelines on configuring the
restart strategy.
I was testing a job with the following setup:
1. There are many tasks, but currently I'm running with a parallelism of only 2
and plenty of task slots (4 TMs with 4 task slots each).
2. It's ran
Figured it out: I pulled StructuredOptionsSplitter into a debugger and was able
to get it working with:
-Dkubernetes.jobmanager.annotations="\"KEY:\"\"V:A:L:U:E\"\"\""
Best
kevin
I am trying to read from Kafka using the PyFlink Table API on EMR 5.30, Flink 1.10
(and I have reproduced the error with 1.11). I am getting the following error
using either `flink run -py` or pyflink-shell.sh (the error message below was
generated in the pyflink shell):
>>> Kafka()
Traceb
Hi,
How do we escape : in map type param?
For example, we have:
-Dkubernetes.jobmanager.annotations=KEY:V:A:L:U:E
Which should result in {“KEY”: “V:A:L:U:E”}.
Best,
kevin
Hello!
We are experiencing occasional backpressure on a Window function in our
pipeline. The window is on a KeyedStream and is triggered by an
EventTimeSessionWindows.withGap(Time.seconds(30)). The prior step does a fanout
and we use the window to sort things into batches based on the Key for t
Thanks for your help. I had to create a dependency like this to reference
the flink-avro module found in the flink-formats pom. Learn something new
every day.
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-avro</artifactId>
    <version>1.11.0</version>
    <scope>provided</scope>
</dependency>
On Tue, Jul 14, 2020 at 11:43 AM Chesnay Schepler
wrote:
> flink-formats is a pom
Hi Antonio,
Yes, you are right. Revisiting your question, I'm wondering whether it's
possible to partition speeds and nodes in the same way (stably across
iterations)? (I'm assuming a distributed setup)
If not, each iteration would have to wait for *all* subtasks of the
previous iteration to finis
flink-formats is a pom artifact, meaning that there are no jars for it.
You should add a dependency to the specific format(s) you are interested
in, like flink-formats-csv.
On 14/07/2020 17:41, Daves Open wrote:
I added flink-formats to my pom.xml file, but the jar files are not
found. I che
I added flink-formats to my pom.xml file, but the jar files are not found.
I checked in mvnrepository and the pom entry exists, but there are no
files. Can you please confirm that the jars for flink-format are available
in the repos? The following is my pom entry:
org.apache.flink
flink
If I use
final Table inputTable = tableEnv.fromValues(
DataTypes.ROW(
DataTypes.FIELD("col1", DataTypes.STRING().notNull()),
DataTypes.FIELD("col2", DataTypes.STRING().notNull())
), ..
tableEnv.executeSql(//
"CREATE TABLE `out` (" +
"co
Sorry, obviously " 'format' = 'parquet'" + is without comment :D
On Tue, Jul 14, 2020 at 4:48 PM Flavio Pompermaier
wrote:
> Hi to all,
> I'm trying to test write to parquet using the following code but I have an
> error:
>
> final TableEnvironment tableEnv =
> DatalinksExecutionEnvironment.ge
Hi to all,
I'm trying to test write to parquet using the following code but I have an
error:
final TableEnvironment tableEnv =
DatalinksExecutionEnvironment.getBatchTableEnv();
final Table inputTable = tableEnv.fromValues(//
DataTypes.ROW(//
DataTypes.FIELD("col1", DataTyp
Thanks for the quick replies Dawid and Leonard... I had both flink-json
JARs for 1.10 and 1.11. I deleted 1.10 and now it works!
On Tue, Jul 14, 2020 at 4:17 PM Leonard Xu wrote:
> Hi, Kale
>
> I think you’re using correct TIMESTAMP Data type in JSON format, and this
> should work properly.
> But
I'm getting this error when creating a savepoint. I've read in
https://issues.apache.org/jira/browse/FLINK-16193 that it's caused by
unstable hashcode or equals on the key, or improper use of
reinterpretAsKeyedStream.
My key is a string and I don't use reinterpretAsKeyedStream, so what's
going on?
Hi, Kale
I think you're using the correct TIMESTAMP data type in the JSON format, and this
should work properly.
But it looks like you're using an old version of the `flink-json` dependency,
judging from the log. Could you check that the version of `flink-json` is 1.11.0?
Best,
Leonard Xu
> On Jul 14, 2020, at 18:07, Manas Kale wrote:
>
Forget my previous message. This is most probably some class conflict.
The SQL_TIMESTAMP_FORMAT field was added in 1.11. It looks as if you
were using an old version of the TimeFormats class from an earlier version
of Flink.
Best,
Dawid
On 14/07/2020 12:07, Manas Kale wrote:
> Hi,
> I am trying to
Can you try changing the precision to 6, or changing the format in the JSON to
produce only 3 fractional digits? As described in the JSON
docs[1], the expected default format for timestamps is:
yyyy-MM-dd HH:mm:ss.s{precision}
Best,
Dawid
[1]
https://ci.apache.org/projects/flink/flink-docs-release-
Hi,
I am trying to parse this JSON message:
{"monitorId": 789, "deviceId": "abcd", "data": 144.0, "state": 2,
"time_st": "2020-07-14 15:15:19.60"}
using pyFlink 1.11 DDL with this code:
ddl_source = f"""
CREATE TABLE {INPUT_TABLE} (
`monitorId` STRING,
`deviceId` STRING,
Hi Congxian, sorry for the late reply.
I'm using the filesystem state backend with an S3 path as the default in
flink-conf.yaml (state.backend: filesystem).
The Flink version I'm using is 1.10.1.
By "The task manager did not clean up the state", I mean what the
taskmanager was writing on disk the
Thank you for the quick reply Xingbo!
Is there a documented webpage or example that I can refer to in the future
for the latest PyFlink 1.11 API? I couldn't find anything related to
awaiting asynchronous results.
Thanks,
Manas
On Tue, Jul 14, 2020 at 1:29 PM Xingbo Huang wrote:
> Hi Manas,
>
>
Hi Manas,
I tested your code, and there are no errors. Because execute_sql is an
asynchronous method, you need to wait on the result through the returned
TableResult; you can try the following code:
from pyflink.datastream import StreamExecutionEnvironment,
TimeCharacteristic
from pyflink.table import StreamTableEnviron
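A rough sketch of the waiting pattern, as far as I understand the 1.11 API (the
table names below are placeholders, and the exact calls may differ):

# Assumes a StreamTableEnvironment `t_env` and source/sink tables already exist.
table_result = t_env.execute_sql(
    "INSERT INTO sink_table SELECT * FROM source_table")
# execute_sql() only submits the job; block until it finishes:
table_result.get_job_client().get_job_execution_result().result()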
Hi Roman,
Thank you for your quick reply, but as far as I know broadcast variables
cannot be written to; my problem is that I need to update the value of the
speed variable so I can use it in the next iteration.
Iterate only has one input dataset and propagates it to the next iteration
using closeWith(),
Hi Manas,
Yes, this is a bug which I have also encountered in the Descriptor API, but
I couldn't find the corresponding issue. You can create an issue to report
this problem. There are similar bugs in the current Descriptor API, so DDL
is the more recommended way. Now the community has started a discuss
Hi,
I am trying to get a simple streaming job running in pyFlink and understand
the new 1.11 API. I just want to read from and write to kafka topics.
Previously I was using t_env.execute("jobname"), register_table_source()
and register_table_sink() but in 1.11 all 3 were deprecated and replaced by