Re: Socket stream source in Python?

2022-01-30 Thread Philippe Rigaux

Thank you so much. 

> On 31 Jan 2022, at 01:11, Francis Conroy wrote:
> 
> Hi Philippe,
> after checking the Flink master source, I think you're right: there is
> currently no binding from Python to Flink's socketTextStream (via py4j) in
> PyFlink. The py4j interface isn't too complicated to modify for some tasks,
> and I suspect it should be fairly trivial to extend PyFlink to support
> this. I imagine you could take read_text_file in
> 'stream_execution_environment.py' as a starting point.
> Happy to provide some more information on this if you'd like.
> 
> Kind regards,
> Francis
> 
> On Sat, 29 Jan 2022 at 01:20, Philippe Rigaux wrote:
> Hi there
> 
> I would like to use a socket stream as input for my Flink workflow in Python. 
> This works in Scala with the socketTextStream() method, for instance:
> 
> val stream = senv.socketTextStream("localhost", 9000, '\n')
> 
> I cannot find an equivalent in PyFlink, although it is briefly mentioned in 
> the documentation. 
> 
> Any help is much appreciated.
> 
> Philippe
> 



Re: Socket stream source in Python?

2022-01-30 Thread Francis Conroy
Hi Philippe,
after checking the Flink master source, I think you're right: there is
currently no binding from Python to Flink's socketTextStream (via py4j) in
PyFlink. The py4j interface isn't too complicated to modify for some tasks,
and I suspect it should be fairly trivial to extend PyFlink to support
this. I imagine you could take read_text_file in
'stream_execution_environment.py' as a starting point.
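For instance, here is a minimal sketch of what such a binding might look like,
assuming the same py4j delegation pattern that read_text_file uses (the method
name socket_text_stream and its defaults are hypothetical, not an existing
PyFlink API):

    # Hypothetical addition to StreamExecutionEnvironment in
    # pyflink/datastream/stream_execution_environment.py
    def socket_text_stream(self, hostname, port, delimiter='\n', max_retry=0):
        # Mirrors the Java socketTextStream(String, int, String, long)
        # method by delegating through the py4j gateway.
        from pyflink.datastream.data_stream import DataStream
        j_data_stream = self._j_stream_execution_environment.socketTextStream(
            hostname, port, delimiter, max_retry)
        return DataStream(j_data_stream)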
Happy to provide some more information on this if you'd like.

Kind regards,
Francis

On Sat, 29 Jan 2022 at 01:20, Philippe Rigaux 
wrote:

> Hi there
>
> I would like to use a socket stream as input for my Flink workflow in
> Python. This works in Scala with the socketTextStream() method, for instance:
>
> val stream = senv.socketTextStream("localhost", 9000, '\n')
>
> I cannot find an equivalent in PyFlink, although it is briefly mentioned
> in the documentation.
>
> Any help is much appreciated.
>
> Philippe
>
>



Re: MAP data type (PyFlink)

2022-01-30 Thread Francis Conroy
Hi Philippe,

I don't think it's quite that simple, unfortunately. A Python dict can map
from any hashable type to any value, whereas the 'equivalent' Java type, Map,
requires all keys to be of the same type and all values to be of the same
type. You cannot specify multiple types for the keys or values in one map
object, which makes me think that Types.MAP is not actually what you are
looking for.
If you have multiple named fields with different types, you could use a Row
instead: for a Row you specify a name and a value type for each column.
I'm very green in the Flink/Java area, but I hope that at least gives you
something to move forward with for now.
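As a concrete illustration, here is a sketch of both options for a value like
{'url': '...', 'count': 2}, using the Types factory from
pyflink.common.typeinfo:

    from pyflink.common.typeinfo import Types

    # A MAP needs one key type and one value type shared by all entries,
    # so mixed value types (str and int) do not fit:
    map_type = Types.MAP(Types.STRING(), Types.STRING())

    # A ROW gives each named field its own type:
    row_type = Types.ROW_NAMED(['url', 'count'],
                               [Types.STRING(), Types.INT()])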

On Sat, 29 Jan 2022 at 02:12, Philippe Rigaux 
wrote:

> Hello
>
> I want to send and receive Python dict values. According to the PyFlink
> doc, this is specified with Types.MAP(). Unfortunately I found no example
> of the required arguments, and I am stuck with the following error:
>
> TypeError: MAP() missing 2 required positional arguments: 'key_type_info'
> and 'value_type_info'
>
> How should I specify, for instance, the type for {'url': '', 'count': 2}?
>
> Thanks for your help.
>
> Philippe
>
>



Re: How to get FileName when using FileSink in Flink

2022-01-30 Thread Guowei Ma
Hi Kartik,

FileSink does not expose the file name to the user right now.
Would you like to share the scenario in which you need the file name?

Best,
Guowei


Sent from my iPhone

> On 30 Jan 2022, at 6:38 PM, Kartik Khare wrote:
> 
> Hi,
> For my use case, I want to get the part file name that is being created in
> HDFS when using the file sink. I looked at the code and can't find a way to
> extend one of the existing Bucket classes to achieve this. What I am
> looking for is the output of the `assemblePartFilePath` function.
> 
> Regards,
> Kartik


Re: Inaccurate checkpoint trigger time

2022-01-30 Thread Yun Tang
Hi Paul,

I think Robert's idea might be right.

From the log you pasted, the checkpoint interval is 2m30s. Chk-5 was triggered
at 16:42:23 and completed at 16:42:42.
In the normal case, chk-6 would be triggered near 16:44:53 (16:42:23 plus the
2m30s interval). However, chk-6 was actually triggered at 16:46:02, which is
obviously not the normal case.

I think your analysis is not correct, given the log line below:
2022-01-27 16:46:02,693 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 6 (type=CHECKPOINT) @ 1643273162422 for job 
3a57fdaa16502c411a46471bba595d7c.

If you translate the Unix timestamp 1643273162422 to Beijing time, you can see
it is actually 2022-01-27 16:46:02.
If Zookeeper were really slow to respond, the Unix timestamp would be much
earlier than the logged time [1].
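You can verify the conversion with a quick check (a sketch in Python; the
timestamp is in milliseconds and Beijing time is UTC+8):

    from datetime import datetime, timezone, timedelta

    beijing = timezone(timedelta(hours=8))
    # 1643273162422 ms since the epoch, rendered in UTC+8
    print(datetime.fromtimestamp(1643273162422 / 1000, tz=beijing))
    # -> 2022-01-27 16:46:02.422000+08:00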

Flink has since been improved so that checkpointing is backpressured on slow
cleanup [2].



[1] 
https://github.com/apache/flink/blob/90e850301e672fc0da293abc55eb446f7ec68ffa/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L540
[2] https://issues.apache.org/jira/browse/FLINK-17073

Best
Yun Tang




From: Robert Metzger 
Sent: Friday, January 28, 2022 21:53
To: Paul Lam 
Cc: Yun Tang ; user 
Subject: Re: Inaccurate checkpoint trigger time

Hi Paul,

where are you storing your checkpoints, and what's their size?

IIRC, Flink won't trigger a new checkpoint before the old ones have been
cleaned up, and if your checkpoints are large and stored on S3, it can take a
while to clean them up (especially with the Hadoop S3 plugin; the Presto S3
plugin is faster).




On Thu, Jan 27, 2022 at 10:56 AM Paul Lam <paullin3...@gmail.com> wrote:
Hi Yun,

Sorry for the late reply. I finally found some time to investigate this problem 
further. I upgraded the job to 1.14.0, but it’s still the same.

I've checked the debug logs, and I found that Zookeeper delivers the watched
event for checkpoint ID changes very late [1]. Each time a checkpoint
finishes, it takes minutes before the Zookeeper client notices that the
checkpoint ID has changed.

I suspect the checkpoint coordinator is blocking on incrementing the
checkpoint ID on Zookeeper [2], but with no luck so far: there aren't many
relevant logs that can help me prove it.

What do you think of this? Thanks a lot!

[1] https://gist.github.com/link3280/5072a054a43b40ba28891837a8fdf995
[2] 
https://github.com/apache/flink/blob/90e850301e672fc0da293abc55eb446f7ec68ffa/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L743

Best,
Paul Lam

On 23 Nov 2021, at 16:49, Paul Lam <paullin3...@gmail.com> wrote:

Hi Yun,

Thanks a lot for your pointers! I’ll try it out as you suggested and then get 
back to you.

Best,
Paul Lam

On 23 Nov 2021, at 16:32, Yun Tang <myas...@live.com> wrote:

Hi Paul,

This is really weird. From what I know, flink-1.11.0 has a problem with
handling the min-pause time [1], and this should be resolved in flink-1.12.1.

Could you enable the debug log level for org.apache.flink.runtime.checkpoint,
and use jmap or byteman to inspect the field values of
CheckpointCoordinator#lastCheckpointCompletionRelativeTime,
CheckpointRequestDecider#minPauseBetweenCheckpoints, and
SystemClock#relativeTimeMillis in the method
CheckpointRequestDecider#nextTriggerDelayMillis [2], to check for any
unexpected behavior?
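For the logging part, a minimal sketch of enabling that logger, assuming the
Log4j 2 properties format that Flink ships in conf/log4j.properties:

    # Log checkpoint coordination at DEBUG level
    logger.checkpoint.name = org.apache.flink.runtime.checkpoint
    logger.checkpoint.level = DEBUG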


[1] https://issues.apache.org/jira/browse/FLINK-18856
[2] 
https://github.com/apache/flink/blob/90e850301e672fc0da293abc55eb446f7ec68ffa/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java#L182


Best
Yun Tang


From: Paul Lam <paullin3...@gmail.com>
Sent: Tuesday, November 23, 2021 14:35
To: user <user@flink.apache.org>
Subject: Inaccurate checkpoint trigger time

Hi,

Recently I’ve noticed a job has nondeterministic checkpoint trigger time.

The job is using Flink 1.12.1 with FsStateBackend and has a parallelism of
650. It was configured to trigger a checkpoint every 150 seconds, with 0
minimum pause and no concurrent checkpoints. However, there are obvious
deviations in the checkpoint trigger times, as the actual interval varies
from 30 seconds to 6 minutes.
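That setup corresponds to configuration like the following (a sketch, assuming
the flink-conf.yaml keys of Flink 1.12):

    state.backend: filesystem
    execution.checkpointing.interval: 150 s
    execution.checkpointing.min-pause: 0
    execution.checkpointing.max-concurrent-checkpoints: 1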

The jobmanager logs look good, and no error logs are found. Some of the output
is as follows:

2021-11-23 13:51:46,438 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
checkpoint 1446 for job f432b8d90859db54f7a79ff29a563ee4 (47142264825 bytes in 
22166 ms).
2021-11-23 13:57:21,021 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 1447 (type=CHECKPOINT) @ 1637647040653 for job 
f432b8d90859db54f7a79ff29a563ee4.
2021-11-23 13:57:43,761 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
checkpoint 1447 for job f432b8d90859db54f7a79ff29a563ee4 (46563195101 bytes in 
21813 ms).
2021-11-23 13:59:09,387 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 

Re: [DISCUSS] Deprecate/remove Twitter connector

2022-01-30 Thread David Anderson
I agree.

The Twitter connector is used in a few (unofficial) tutorials, so removing it
would make those tutorials harder to maintain. On the other hand, if I recall
correctly, that connector uses V1 of the Twitter API, which has been
deprecated, so it's really not very useful even for that purpose.

David



On Fri, Jan 21, 2022 at 9:34 AM Martijn Visser 
wrote:

> Hi everyone,
>
> I would like to discuss deprecating Flink's Twitter connector [1]. This
> was one of the first connectors added to Flink, and it could be used to
> access tweets from Twitter. Given how both Flink and Twitter have evolved,
> I don't think that:
>
> * Users are still using this connector at all
> * The code for this connector should be in the main Flink codebase.
>
> Given the circumstances, I would propose to deprecate and remove this
> connector. I'm looking forward to your thoughts. If you agree, please also
> let me know if you think we should first deprecate it in Flink 1.15 and
> remove it in a version after that, or if you think we can remove it
> directly.
>
> Best regards,
>
> Martijn Visser
> https://twitter.com/MartijnVisser82
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/twitter/
>
>


Re: Stack Overflow Question - Deserialization schema for multiple topics

2022-01-30 Thread David Anderson
Hussein,

To use a JsonRowDeserializationSchema you'll need to use the Table API, and
not DataStream.

You'll want to use a JsonRowSchemaConverter to convert your JSON schema
into the TypeInformation needed by Flink, which is done for you by
the JsonRowDeserializationSchema builder:

json_row_schema = JsonRowDeserializationSchema.builder() \
    .json_schema(json_schema_string) \
    .build()

Given that schema, you can pass it to the constructor for the Kafka consumer:

kafka_consumer = FlinkKafkaConsumer("stream-source", json_row_schema,
                                    kafka_props)

To read from 3 different topics, you can either instantiate three different
sources, or specify that a single source is to be used to read from
multiple topics, which you can do by passing a list of strings as the
topics parameter.
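Putting the pieces together, here is a sketch that reads the schema from a
file and subscribes one consumer to three topics (the file name, topic names,
and Kafka properties are placeholders, this assumes the three topics share one
schema, and the import paths are those of the PyFlink releases current at the
time, around 1.14):

    from pyflink.common.serialization import JsonRowDeserializationSchema
    from pyflink.datastream.connectors import FlinkKafkaConsumer

    # Read the (large) JSON schema from a file instead of inlining it
    with open('schema.json') as f:
        json_schema_string = f.read()

    json_row_schema = JsonRowDeserializationSchema.builder() \
        .json_schema(json_schema_string) \
        .build()

    kafka_props = {'bootstrap.servers': 'localhost:9092',
                   'group.id': 'my-group'}

    # One source reading from multiple topics; alternatively, instantiate
    # one FlinkKafkaConsumer per topic if the schemas differ.
    kafka_consumer = FlinkKafkaConsumer(
        ['topic-a', 'topic-b', 'topic-c'],
        json_row_schema,
        kafka_props)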

Regards,
David

On Fri, Jan 28, 2022 at 12:07 PM Hussein El Ghoul 
wrote:

> Hello,
>
> How do I specify the deserialization schema for multiple Kafka topics using
> Flink (Python)?
>
> I want to read from multiple Kafka topics with JSON schema using
> FlinkKafkaConsumer, and I assume that I need to use
> JsonRowDeserializationSchema to deserialize the data. The schema of the
> topics is very large (around 1500 lines for each topic), so I want to read
> it from a file instead of manually typing the types in the program. How can
> I do that?
>
> 1. How to specify deserialization schema for multiple topics (3 topics)
> 2. How to read the JSON schema from a file?
>
>
> https://stackoverflow.com/q/70892579/13067721?sem=2
>
> Thanks in advance,
> Hussein
> Quiqup - Data Engineer


How to get FileName when using FileSink in Flink

2022-01-30 Thread Kartik Khare
Hi,
For my use case, I want to get the part file name that is being created in
HDFS when using the file sink. I looked at the code and can't find a way to
extend one of the existing Bucket classes to achieve this. What I am looking
for is the output of the `assemblePartFilePath` function.

Regards,
Kartik