Re: Broadcasting control messages to a sink

2020-10-16 Thread Jaffe, Julian
Hey Piotr,

Thanks for your help! The main thing I was missing was the .broadcast partition 
operation on a stream (searching for “broadcasting” obviously brought up the 
broadcast state pattern). This coupled with my misunderstanding of an error in 
my code as being an error in Flink code resulted in me making this a much 
harder problem than it needed to be.

For anyone who may find this in the future, Piotr’s suggestion is pretty 
spot-on. I wound up broadcasting (as in the partitioning strategy) my schema 
stream and connecting it to my event stream. I then processed those using a 
CoProcessFunction, using the schema messages to update the parsing for the 
events. I also emitted a side output message when I processed a new schema, 
using the same type as my main output messages. I once again 
broadcast-as-in-partitioning the side output stream, unioned it with my 
processed output from the CoProcessFunction and passed it to my sink, making 
sure to handle control messages before attempting to do any bucketing.
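
For illustration, the "handle control messages before bucketing" step could look roughly like the sketch below. This is a plain-Python simulation and not Flink code: `Message`, `SchemaAwareSink`, and all field names are hypothetical, and a real job would put the same logic inside its sink function.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple


@dataclass
class Message:
    """One output type shared by data records and control messages (hypothetical)."""
    is_control: bool
    schema: Optional[str] = None   # set on control messages only
    bucket: Optional[str] = None   # set on data records only
    payload: Optional[str] = None  # set on data records only


@dataclass
class SchemaAwareSink:
    """Stand-in for a sink that must know the current schema before writing."""
    current_schema: Optional[str] = None
    buckets: Dict[str, List[Tuple[Optional[str], str]]] = field(default_factory=dict)

    def invoke(self, msg: Message) -> None:
        # Control messages are consumed first and never reach the bucketing logic.
        if msg.is_control:
            self.current_schema = msg.schema
            return
        # Data records are bucketed and tagged with the schema in effect.
        self.buckets.setdefault(msg.bucket, []).append((self.current_schema, msg.payload))


# Simulated union of the broadcast control stream and the processed events.
sink = SchemaAwareSink()
for m in [
    Message(is_control=True, schema="v1"),
    Message(is_control=False, bucket="a", payload="e1"),
    Message(is_control=True, schema="v2"),
    Message(is_control=False, bucket="a", payload="e2"),
]:
    sink.invoke(m)
```

Because the control stream is broadcast, every parallel sink instance would see each schema update; the simulation above only models a single instance.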

In poor ASCII art, it looks something like the below:


 _______________       ______________
| Schema Source |     | Event Source |
 ---------------       --------------
        |                     |
    Broadcast                 |
        |     ___________     |
        -----| Processor |-----
              -----------
               |       |
               |       • Control message side output
               |       |
               |   Broadcast
               |       |
               - Union -
                   |
               __________
              |   Sink   |
               ----------

I hope this is helpful to someone.

Julian

From: Piotr Nowojski 
Date: Wednesday, October 14, 2020 at 11:22 PM
To: "Jaffe, Julian" 
Cc: "user@flink.apache.org" 
Subject: Re: Broadcasting control messages to a sink

Hi Julian,

I think the problem is that BroadcastProcessFunction and SinkFunction will be 
executed by separate operators, so they won't be able to share state. If you 
cannot split your logic into two, I think you will have to work around this 
problem differently.

1. Rely on operator chaining and wire both of them together.

If you set up your BroadcastProcessFunction and SinkFunction one after another, 
with the same parallelism, with the default chaining, without any 
rebalance/keyBy in between, you can be sure they will be chained together. So 
the output type of your record between BroadcastProcessFunction and 
SinkFunction, can be a Union type, of a) your actual payload, b) broadcasted 
message. Upon initialization/before processing first record, if you have any 
broadcast state, you would need to forward its content to the downstream 
SinkFunction as well.

2. Another solution is that maybe you can try to embed SinkFunction inside the 
BroadcastProcessFunction? This will require some careful proxying and wrapping 
calls.
3. As always, you can also write a custom operator that will be doing the same 
thing.

For the 2. and 3. I'm not entirely sure if there are some gotchas that I 
haven't thought through (state handling?), so if you can make 1. work for you, 
it will probably be a safer route.

Best,
Piotrek




On Wed, 14 Oct 2020 at 19:42, Jaffe, Julian <julianja...@activision.com> wrote:
Thanks for the suggestion Piotr!

The problem is that the sink needs to have access to the schema (so that it can 
write the schema only once per file instead of record) and thus needs to know 
when the schema has been updated. In this proposed architecture, I think the 
sink would still need to check each record to see if the current schema matches 
the new record or not? The main problem I encountered when playing around with 
broadcast state was that I couldn’t figure out how to access the broadcast 
state within the sink, but perhaps I just haven’t thought about it the right 
way. I’ll meditate on the docs further  

Julian

From: Piotr Nowojski <pnowoj...@apache.org>
Date: Wednesday, October 14, 2020 at 6:35 AM
To: "Jaffe, Julian" <julianja...@activision.com>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Broadcasting control messages to a sink

Hi Julian,

Have you seen Broadcast State [1]? I have never used it personally, but it 
sounds like something you want. Maybe your job should look like:

1. read raw messages from Kafka, without using the schema
2. read schema changes and broadcast them to 3. and 5.
3. deserialize kafka records in BroadcastProcessFunction by using combined 1. 
and 2.
4. do your logic o
5. serialize records using schema in another BroadcastProcessFunction 

PyFlink:

2020-10-16 Thread Schneider, Thilo
Dear list,

I’m having my first go at using Flink and quickly stumbled over a problem I 
find no easy way around. I hope you may help me.

I am trying to read an Avro-encoded Kafka topic. When doing so, I get a 
NoClassDefFoundError: the class org.apache.avro.SchemaBuilder could not be 
found, although it should be included in the provided avro-1.9.2.jar. The jar 
is picked up correctly, as I get “java.lang.ClassNotFoundException: 
org.apache.avro.generic.IndexedRecord” if I remove that dependency.

[…]
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.avro.SchemaBuilder
    at org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:240)
    at org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:179)
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.open(AvroRowDataDeserializationSchema.java:136)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.open(KafkaDeserializationSchemaWrapper.java:46)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:694)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)

The code I am using is the following:

from pyflink.table import EnvironmentSettings, StreamTableEnvironment

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = StreamTableEnvironment.create(environment_settings=env_settings)

# Jars for the Kafka connector, the Avro format, and Avro itself.
jars = ["flink-sql-connector-kafka_2.11-1.11.2.jar",
        "flink-avro-1.11.2-sql-jar.jar",
        "avro-1.9.2.jar"]

jar_base_path = "file:///path/to/my/jars/"
# 'pipeline.jars' takes a semicolon-separated list of jar URLs.
table_env.get_config().get_configuration().set_string(
    'pipeline.jars', ';'.join([jar_base_path + j for j in jars]))

table_env.execute_sql("""
    CREATE TABLE test (
        a STRING,
        b INT,
        c TIMESTAMP
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'my_topic',
        'properties.bootstrap.servers' = 'kafka.broker.com:9192',
        'properties.group.id' = 'something',
        'format' = 'avro'
    )
""")

test = table_env.from_path('test')
test.to_pandas()

Any help would be appreciated.

Thanks in advance
Thilo



Re: Flink Kubernetes / Helm

2020-10-16 Thread Austin Cawley-Edwards
We use the Ververica Platform and have built an operator for it here[1] :)
and use Helm with it as well.

Best,
Austin

[1]: https://github.com/fintechstudios/ververica-platform-k8s-operator


On Fri, Oct 16, 2020 at 3:12 PM Dan Hill  wrote:

> What libraries do people use for running Flink on Kubernetes?
>
> Some links I've found:
>
>- Flink official documentation
>
> 
>- Ververica documentation
>
>- https://github.com/lightbend/flink-operator
>- https://github.com/riskfocus/helm-charts-public
>- https://github.com/docker-flink/examples
>- different K8 proposal
>
> 
>
>


Flink Kubernetes / Helm

2020-10-16 Thread Dan Hill
What libraries do people use for running Flink on Kubernetes?

Some links I've found:

   - Flink official documentation
   

   - Ververica documentation
   
   - https://github.com/lightbend/flink-operator
   - https://github.com/riskfocus/helm-charts-public
   - https://github.com/docker-flink/examples
   - different K8 proposal
   



Un-ignored Parsing Exceptions in the CsvFormat

2020-10-16 Thread Austin Cawley-Edwards
Hey all,

I'm ingesting CSV files with Flink 1.10.2 using SQL and the CSV Format[1].

Even with the `ignoreParseErrors()` set, the job fails when it encounters
some types of malformed rows. The root cause is indeed a `ParseException`,
so I'm wondering if there's anything more I need to do to ignore these
rows. Each field in the schema is a STRING.


I've configured the CSV format and table like so:

tableEnv.connect(
        new FileSystem()
            .path(path)
    )
    .withFormat(
        new Csv()
            .quoteCharacter('"')
            .ignoreParseErrors()
    )
    .withSchema(schema)
    .inAppendMode()


Shot in the dark, but should `RowCsvInputFormat#parseRecord` have a check
to `isLenient()` if there is an unexpected parser position?[2]
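
To make the question concrete, here is a small sketch of the behaviour being asked for, written in plain Python rather than against Flink's classes. `ParseError`, `parse_row`, `read_rows`, and the `lenient` flag are hypothetical stand-ins, and the naive comma split ignores quoting entirely; the only point is that a lenient reader skips a malformed row instead of failing the job.

```python
from typing import Iterable, List, Optional


class ParseError(Exception):
    """Raised for a malformed row when the reader is not lenient (hypothetical)."""


def parse_row(line: str, num_fields: int, lenient: bool) -> Optional[List[str]]:
    # Naive split for illustration only; a real CSV parser must handle quotes.
    fields = line.split(",")
    if len(fields) != num_fields:
        if lenient:
            return None  # signal "skip this row" instead of failing
        raise ParseError(f"expected {num_fields} fields, got {len(fields)}: {line!r}")
    return fields


def read_rows(lines: Iterable[str], num_fields: int, lenient: bool) -> List[List[str]]:
    rows = []
    for line in lines:
        row = parse_row(line, num_fields, lenient)
        if row is not None:
            rows.append(row)
    return rows
```

With `lenient=True` the malformed line is dropped silently; with `lenient=False` the same input raises `ParseError`, which corresponds to the failure shown in the trace below.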

Example error:

2020-10-16 12:50:18
org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception when processing split: null
    at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1098)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1066)
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:351)
Caused by: org.apache.flink.api.common.io.ParseException: Unexpected parser position for column 1 of row '",
https://www.facebook.com/GoingOn-Networks-154758847925524/,https://www.linkedin.com/company/goingon,,
""company,'
    at org.apache.flink.api.java.io.RowCsvInputFormat.parseRecord(RowCsvInputFormat.java:204)
    at org.apache.flink.api.java.io.CsvInputFormat.readRecord(CsvInputFormat.java:111)
    at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:520)
    at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:79)
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:329)


Thanks,
Austin

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#csv-format
[2]:
https://github.com/apache/flink/blob/c09e959cf55c549ca4a3673f72deeb12a34e12f5/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java#L203-L206


ZooKeeper connection SUSPENDING

2020-10-16 Thread Kenzyme
Hi,

Related to 
https://mail-archives.apache.org/mod_mbox/flink-dev/201709.mbox/%3CCA+faj9yvPyzmmLoEWAMPgXDP6kx+0oed1Z5k4s3K9sgiCFyb=w...@mail.gmail.com%3E
 and https://issues.apache.org/jira/browse/FLINK-10052, I was wondering if 
there's a way to prevent Flink instances from failing while doing a rolling 
restart on ZK followers while still keeping the quorum?

This is what was shown in Flink logs while restarting ZK :
ZooKeeper connection SUSPENDING. Changes to the submitted job graphs are not 
monitored (temporarily).

I was able to reproduce this twice with a quorum of 5 ZK nodes while doing some 
ZK maintenance.

Thanks!

Kenzyme Le

Re: Issue with Flink - unrelated executeSql causes other executeSqls to fail.

2020-10-16 Thread Till Rohrmann
Hi Dan,

I think it is a good idea to use an exponential backoff strategy in the
RpcGatewayRetriever. So from my side you can open an issue and a PR for
fixing it.
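
As a rough illustration of the capped schedule proposed in the quoted message below (2ms, 4ms, 8ms, 16ms, 20ms, 20ms), here is a minimal Python sketch. The function name and parameters are hypothetical and this is not the RpcGatewayRetriever API; it only shows the arithmetic of doubling the delay up to a maximum.

```python
from typing import List


def backoff_delays_ms(initial_ms: int = 2, max_ms: int = 20, retries: int = 6) -> List[int]:
    """Return the delay before each retry: doubling from initial_ms, capped at max_ms."""
    delays = []
    delay = initial_ms
    for _ in range(retries):
        delays.append(min(delay, max_ms))
        # Double for the next attempt, but never exceed the cap.
        delay = min(delay * 2, max_ms)
    return delays


print(backoff_delays_ms())
```

Compared with a fixed 20ms retry, the first few attempts happen much sooner, which is what helps short-running tests, while long waits still settle at the existing 20ms interval.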

Cheers,
Till

On Fri, Oct 16, 2020 at 7:24 PM Dan Hill  wrote:

> To be clear, I'd be fine coding this.
>
> On Fri, Oct 16, 2020 at 9:35 AM Dan Hill  wrote:
>
>> Makes sense.  Thanks for the details!
>>
>> I just looked into it.  It's this code in this diff from ~2 years ago.
>>
>> +Till - would you be fine if we change this?  Context: I was able to
>> speed up my test by writing my own future.  I think 20ms retry is long when
>> the test is simple.  One idea is to introduce an exponential backoff up to
>> some max (e.g. start at 2ms, 4ms, 8ms, 16ms, 20ms, 20ms).
>>
>> dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
>>     commonRpcService,
>>     DispatcherGateway.class,
>>     DispatcherId::fromUuid,
>>     20,
>>     Time.milliseconds(20L));
>> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
>>     commonRpcService,
>>     ResourceManagerGateway.class,
>>     ResourceManagerId::fromUuid,
>>     20,
>>     Time.milliseconds(20L));
>>
>>
>> On Fri, Oct 16, 2020 at 2:04 AM Aljoscha Krettek 
>> wrote:
>>
>>> I think it's because there's just so many layers involved and so many
>>> futures that this passes through that it takes some time. Plus it's not
>>> a critical path so no-one took the time to optimize it.
>>>
>>> There's no one single person that I could point you towards because it's
>>> a mix of older code and newer code that has grown over time and
>>> different people have worked on it.
>>>
>>> On 15.10.20 01:56, Dan Hill wrote:
>>> > I think it was MiniClusterJobClient.
>>> >
>>> > Each wait call slowed down by about 20-40ms.  It added about 3% to my
>>> total
>>> > test runtime.  I ended up using getJobStatus because it's cleaner
>>> (better
>>> > than optimizing for latency now).  I was curious what the future
>>> > implementation did (e.g. does it just sleep?) and if it's configurable.
>>> >
>>> > On Tue, Oct 13, 2020 at 5:28 AM Aljoscha Krettek 
>>> > wrote:
>>> >
>>> >> That's interesting!
>>> >>
>>> >> Which JobClient implementation is being used underneath? You're using
>>> >> the MiniCluster resource so it should be MiniClusterJobClient (or
>>> >> PerJobMiniClusterClient as it's called prior to Flink 1.12), right?
>>> >>
>>> >> Also, what does that second mean percentage-wise? Is it more a 1
>>> second
>>> >> improvement on a 60 second total runtime or 10 second total runtime.
>>> >>
>>> >>
>>> >> On 11.10.20 22:17, Dan Hill wrote:
>>> >>> -others.  Any idea who wrote this futures code?  I'm curious how it's
>>> >>> implemented.  My sleep version seems to finish my tests a few tens of
>>> >>> milliseconds faster per call (my overall test suite runs a second
>>> >> faster).
>>> >>> I tried diving deeper into this last night but, once I got a few
>>> layers
>>> >>> deeper, it made sense to ask about it.
>>> >>>
>>> >>> On Sat, Oct 10, 2020 at 10:37 AM Dan Hill 
>>> wrote:
>>> >>>
>>>  No, thanks!  I used JobClient to getJobStatus and sleep if it was
>>> not
>>>  terminal.  I'll switch to this.
>>> 
>>> 
>>>  On Sat, Oct 10, 2020 at 12:50 AM Aljoscha Krettek <
>>> aljos...@apache.org>
>>>  wrote:
>>> 
>>> > Hi Dan,
>>> >
>>> > did you try using the JobClient you can get from the TableResult to
>>> >> wait
>>> > for job completion? You can get a CompletableFuture for the
>>> JobResult
>>> > which should help you.
>>> >
>>> > Best,
>>> > Aljoscha
>>> >
>>> > On 08.10.20 23:55, Dan Hill wrote:
>>> >> I figured out the issue.  The join caused part of the job's
>>> execution
>>> > to be
>>> >> delayed.  I added my own hacky wait condition into the test to
>>> make
>>> >> sure
>>> >> the join job finishes first and it's fine.
>>> >>
>>> >> What common test utilities exist for Flink?  I found
>>> >> flink/flink-test-utils-parent.  I implemented a simple sleep loop
>>> to
>>> > wait
>>> >> for jobs to finish.  I'm guessing this can be done with one of the
>>> >> other
>>> >> utilities.
>>> >>
>>> >> Are there any open source test examples?
>>> >>
>>> >> How are watermarks usually sent with Table API in tests?
>>> >>
>>> >> After I collect some answers, I'm fine updating the Flink testing
>>> >> page.
>>> >>
>>> >
>>> >>
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#testing-flink-jobs
>>> >>
>>> >> On Thu, Oct 8, 2020 at 8:52 AM Austin Cawley-Edwards <
>>> >> austin.caw...@gmail.com> wrote:
>>> >>
>>> >>> Can't comment on the SQL issues, but here's our exact setup 

Re: Effect of renaming a primary key

2020-10-16 Thread Rex Fenley
>Yes, I think Flink can still infer the primary key from the renamed pk column;
the state and checkpoints are not affected by the rename action.

Thanks for the info, that's good news.

>but it is unusual for data from Kafka to be unique, as far as I know

I'm using Debezium CDC so the PK is unique per row update. The PK needs to
be the same per row so that the state represented by that row gets updated
accordingly.

Thanks!

On Fri, Oct 16, 2020 at 12:29 AM Leonard Xu  wrote:

> Hi, Rex
>
>
> , won't that therefore change the key used to reference the state in
> RocksDB and in a checkpoint for the associated table?
>
> How might this affect state storage and checkpointing?
>
> Will the pk that was renamed remain a key for state or is some other
> mechanism used to form a key?
>
>
> Yes, I think Flink can still infer the primary key from the renamed pk column;
> the state and checkpoints are not affected by the rename action.
>
> BTW, Flink uses the primary key in some optimizations, but Flink does not
> check primary key integrity because, unlike a DB, Flink does not own the data.
> So the user should ensure unique key integrity when defining a primary key.
> As far as I know, it is unusual for data from Kafka to be unique, and
> this may lead to unexpected results.
>
> Best,
> Leonard
>
>
>
> tableEnv.executeSql("""
>   CREATE TABLE topic_users (
>     id BIGINT,
>     deleted_at BIGINT,
>     PRIMARY KEY (id) NOT ENFORCED
>   ) WITH (
>     ...
>   )
> """)
> val usersNotDeletedTable =
>   tableEnv
>     .from("topic_users")
>     .select($"*")
>     // Will PK automatically change to "users_user_id"?
>     .renameColumns($"id".as("users_user_id"))
>     .filter($"deleted_at".isNull)
>
> val membershipsNotDeletedTable =
>   membershipsTable // This table also has "id" PK
>     .join(
>       usersNotDeletedTable,
>       $"user_id" === $"users_user_id"
>     )
>     .dropColumns($"users_user_id")
>
> Thanks!
>
> --
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
>
>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend




Error in my streamtableExample(java style),Thanks for your help

2020-10-16 Thread ??????
I want to write a Java version of the StreamTableExample at
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/basics/StreamTableExample.scala


My code is:
https://paste.ubuntu.com/p/WcBb7Rr6gb/


Could you tell me why my code is wrong?


IntelliJ tells me that what I get is a DataStreamSource instead of a DataStream.
How can I fix it?


Thanks for your help.

Re: [DISCUSS] Remove flink-connector-filesystem module.

2020-10-16 Thread Chesnay Schepler
@Seth: Earlier in this discussion it was said that the BucketingSink 
would not be usable in 1.12.


On 10/16/2020 4:25 PM, Seth Wiesman wrote:

+1 It has been deprecated for some time and the StreamingFileSink has
stabilized with a large number of formats and features.

Plus, the bucketing sink only implements a small number of stable
interfaces[1]. I would expect users to continue to use the bucketing sink
from the 1.11 release with future versions for some time.

Seth

https://github.com/apache/flink/blob/2ff3b771cbb091e1f43686dd8e176cea6d435501/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L170-L172

On Thu, Oct 15, 2020 at 2:57 PM Kostas Kloudas  wrote:


@Arvid Heise I also do not remember exactly what were all the
problems. The fact that we added some more bulk formats to the
streaming file sink definitely reduced the non-supported features. In
addition, the latest discussion I found on the topic was [1] and the
conclusion of that discussion seems to be to remove it.

Currently, I cannot find any obvious reason for keeping the
BucketingSink, apart from the fact that we do not have a migration
plan unfortunately. This is why I posted this to dev@ and user@.

Cheers,
Kostas

[1]
https://lists.apache.org/thread.html/r799be74658bc7e169238cc8c1e479e961a9e85ccea19089290940ff0%40%3Cdev.flink.apache.org%3E

On Wed, Oct 14, 2020 at 8:03 AM Arvid Heise  wrote:

I remember this conversation popping up a few times already and I'm in
general a big fan of removing BucketingSink.

However, until now there were a few features lacking in StreamingFileSink
that are present in BucketingSink and that are being actively used (I can't
exactly remember them now, but I can look it up if everyone else is also
suffering from bad memory). Did we manage to add them in the meantime? If
not, then it feels rushed to remove it at this point.

On Tue, Oct 13, 2020 at 2:33 PM Kostas Kloudas wrote:

@Chesnay Schepler  Off the top of my head, I cannot find an easy way
to migrate from the BucketingSink to the StreamingFileSink. It may be
possible but it will require some effort because the logic would be
"read the old state, commit it, and start fresh with the
StreamingFileSink."

On Tue, Oct 13, 2020 at 2:09 PM Aljoscha Krettek 
wrote:

On 13.10.20 14:01, David Anderson wrote:

I thought this was waiting on FLIP-46 -- Graceful Shutdown Handling -- and
in fact, the StreamingFileSink is mentioned in that FLIP as a motivating
use case.

Ah yes, I see FLIP-147 as a more general replacement for FLIP-46. Thanks
for the reminder, we should close FLIP-46 now with an explanatory
message to avoid confusion.


--

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng





Re: [DISCUSS] Remove flink-connector-filesystem module.

2020-10-16 Thread Seth Wiesman
+1 It has been deprecated for some time and the StreamingFileSink has
stabilized with a large number of formats and features.

Plus, the bucketing sink only implements a small number of stable
interfaces[1]. I would expect users to continue to use the bucketing sink
from the 1.11 release with future versions for some time.

Seth

https://github.com/apache/flink/blob/2ff3b771cbb091e1f43686dd8e176cea6d435501/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L170-L172

On Thu, Oct 15, 2020 at 2:57 PM Kostas Kloudas  wrote:

> @Arvid Heise I also do not remember exactly what were all the
> problems. The fact that we added some more bulk formats to the
> streaming file sink definitely reduced the non-supported features. In
> addition, the latest discussion I found on the topic was [1] and the
> conclusion of that discussion seems to be to remove it.
>
> Currently, I cannot find any obvious reason for keeping the
> BucketingSink, apart from the fact that we do not have a migration
> plan unfortunately. This is why I posted this to dev@ and user@.
>
> Cheers,
> Kostas
>
> [1]
> https://lists.apache.org/thread.html/r799be74658bc7e169238cc8c1e479e961a9e85ccea19089290940ff0%40%3Cdev.flink.apache.org%3E
>
> On Wed, Oct 14, 2020 at 8:03 AM Arvid Heise  wrote:
> >
> > I remember this conversation popping up a few times already and I'm in
> > general a big fan of removing BucketingSink.
> >
> > However, until now there were a few features lacking in StreamingFileSink
> > that are present in BucketingSink and that are being actively used (I
> can't
> > exactly remember them now, but I can look it up if everyone else is also
> > suffering from bad memory). Did we manage to add them in the meantime? If
> > not, then it feels rushed to remove it at this point.
> >
> > On Tue, Oct 13, 2020 at 2:33 PM Kostas Kloudas 
> wrote:
> >
> > > @Chesnay Schepler  Off the top of my head, I cannot find an easy way
> > > to migrate from the BucketingSink to the StreamingFileSink. It may be
> > > possible but it will require some effort because the logic would be
> > > "read the old state, commit it, and start fresh with the
> > > StreamingFileSink."
> > >
> > > On Tue, Oct 13, 2020 at 2:09 PM Aljoscha Krettek 
> > > wrote:
> > > >
> > > > On 13.10.20 14:01, David Anderson wrote:
> > > > > I thought this was waiting on FLIP-46 -- Graceful Shutdown
> Handling --
> > > and
> > > > > in fact, the StreamingFileSink is mentioned in that FLIP as a
> > > motivating
> > > > > use case.
> > > >
> > > > Ah yes, I see FLIP-147 as a more general replacement for FLIP-46.
> Thanks
> > > > for the reminder, we should close FLIP-46 now with an explanatory
> > > > message to avoid confusion.
> > >
> >
> >
> > --
> >
> > Arvid Heise | Senior Java Developer
> >
>


Question about Hive streaming

2020-10-16 Thread McClone
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.disableOperatorChaining();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.getConfig().addConfiguration(
        new Configuration()
                .set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(30)));
tEnv.executeSql("CREATE TEMPORARY FUNCTION TestFunca AS 'org.example.flink.TestFunca' LANGUAGE JAVA");
tEnv.executeSql("CREATE TABLE datagen (\n" +
        " name STRING,\n" +
        " pass STRING,\n" +
        " type1 INT,\n" +
        " t1 STRING,\n" +
        " t2 STRING,\n" +
        " ts AS localtimestamp,\n" +
        " WATERMARK FOR ts AS ts\n" +
        ") WITH (\n" +
        " 'connector' = 'datagen',\n" +
        " 'rows-per-second'='1',\n" +
        " 'fields.type1.min'='1',\n" +
        " 'fields.type1.max'='10'\n" +
        ")");

tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tEnv.executeSql("CREATE TABLE hive_table (\n" +
        "  user_id STRING,\n" +
        "  order_amount STRING\n" +
        ") PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (\n" +
        "  'sink.partition-commit.trigger'='partition-time',\n" +
        "  'sink.partition-commit.delay'='1 h',\n" +
        "  'sink.partition-commit.policy.kind'='metastore,success-file'\n" +
        ")");

tEnv.executeSql("insert into hive_table select t1,t2,TestFunca(type1),TestFunca(type1) from datagen");

Caused by: org.apache.flink.table.api.ValidationException: Table options do not contain an option key 'connector' for discovering a connector.
    at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:321)
    at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)
    ... 18 more



Flink SQL 1.11: how to get the execution plan & StreamGraph

2020-10-16 Thread zilong xiao
It seems the interface changed in 1.11: calling
StreamExecutionEnvironment.getExecutionPlan() reports "No operators
defined in streaming topology. Cannot execute." This works fine in 1.10.


Re: TaskManager takes abnormally long time to register with JobManager on Kubernetes for Flink 1.11.0

2020-10-16 Thread Till Rohrmann
Done, you are assigned now Weike.

Cheers,
Till

On Fri, Oct 16, 2020 at 1:33 PM DONG, Weike  wrote:

> Hi Till,
>
> Thank you for the kind reminder, and I have created a JIRA ticket for this
> issue https://issues.apache.org/jira/browse/FLINK-19677
>
> Could you please assign it to me? I will try to submit a PR this weekend
> to fix this : )
>
> Sincerely,
> Weike
>
> On Fri, Oct 16, 2020 at 5:54 PM Till Rohrmann 
> wrote:
>
>> Great, thanks a lot Weike. I think the first step would be to open a JIRA
>> issue, get assigned and then start on fixing it and opening a PR.
>>
>> Cheers,
>> Till
>>
>> On Fri, Oct 16, 2020 at 10:02 AM DONG, Weike 
>> wrote:
>>
>>> Hi all,
>>>
>>> Thanks for all the replies, and I agree with Yang, as we have found that
>>> for a pod without a service (like TaskManager pod), the reverse DNS lookup
>>> would always fail, so this lookup is not necessary for the Kubernetes
>>> environment.
>>>
>>> I am glad to help fix this issue to make Flink better : )
>>>
>>> Best,
>>> Weike
>>>
>>> On Thu, Oct 15, 2020 at 7:57 PM Till Rohrmann 
>>> wrote:
>>>
 Hi Weike,

 thanks for getting back to us with your findings. Looking at the
 `TaskManagerLocation`, we are actually calling
 `InetAddress.getCanonicalHostName` twice for every creation of a
 `TaskManagerLocation` instance. This does not look right.

 I think it should be fine to make the lookup configurable. Moreover,
 one could think about only doing a lazy lookup if the canonical hostname
 is really needed (as far as I can see it is only really needed for input split
 assignments and for the LocationPreferenceSlotSelectionStrategy to
 calculate how many TMs run on the same machine).
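
A minimal sketch of the lazy, configurable lookup idea, written in plain Python for illustration. `LazyHostLocation`, `lookup_enabled`, and the injectable `resolve` callable are hypothetical names and not part of Flink; `socket.getfqdn` is used only as a default resolver.

```python
import socket
from typing import Callable, Optional


class LazyHostLocation:
    """Resolves the hostname at most once, and only when it is first requested."""

    def __init__(
        self,
        ip_address: str,
        resolve: Callable[[str], str] = socket.getfqdn,
        lookup_enabled: bool = True,
    ) -> None:
        self._ip_address = ip_address
        self._resolve = resolve
        self._lookup_enabled = lookup_enabled
        self._hostname: Optional[str] = None  # cached result of the lookup

    @property
    def hostname(self) -> str:
        if not self._lookup_enabled:
            # Lookup switched off by configuration: fall back to the IP address.
            return self._ip_address
        if self._hostname is None:
            # The potentially slow DNS call happens here, not in the constructor.
            self._hostname = self._resolve(self._ip_address)
        return self._hostname
```

Constructing the object never touches DNS, so a slow resolver cannot block the thread that creates it; the cost is paid once by whichever caller first needs the name.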

 Do you want to fix this issue?

 Cheers,
 Till

 On Thu, Oct 15, 2020 at 11:38 AM DONG, Weike 
 wrote:

> Hi Till and community,
>
> By the way, initially I resolved the IPs several times but results
> returned rather quickly (less than 1ms, possibly due to DNS cache on the
> server), so I thought it might not be the DNS issue.
>
> However, after debugging and logging, we found that the lookup time
> exhibits high variance, i.e. it normally completes fast but occasionally
> some slow results block the thread. So an unstable DNS server might
> have a great impact on the performance of Flink job startup.
>
> Best,
> Weike
>
> On Thu, Oct 15, 2020 at 5:19 PM DONG, Weike 
> wrote:
>
>> Hi Till and community,
>>
>> Increasing `kubernetes.jobmanager.cpu` in the configuration alleviates
>> this issue but does not eliminate it.
>>
>> After adding DEBUG logs to the internals of *flink-runtime*, we have
>> found the culprit is
>>
>> inetAddress.getCanonicalHostName()
>>
>> in
>> *org.apache.flink.runtime.taskmanager.TaskManagerLocation#getHostName*
>> and
>> *org.apache.flink.runtime.taskmanager.TaskManagerLocation#getFqdnHostName*,
>> which could take ~ 6 seconds to complete, thus Akka dispatcher(s)
>> are severely blocked by that.
>>
>> By commenting out the two methods, this issue seems to be solved
>> immediately, so I wonder if Flink could provide a configuration parameter
>> to turn off the DNS reverse lookup process, as it seems that Flink jobs
>> could run happily without it.
>>
>> Sincerely,
>> Weike
>>
>>
>> On Tue, Oct 13, 2020 at 6:52 PM Till Rohrmann 
>> wrote:
>>
>>> Hi Weike,
>>>
>>> could you try setting kubernetes.jobmanager.cpu: 4 in your
>>> flink-conf.yaml? I fear that a single CPU is too low for the JobManager
>>> component.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Oct 13, 2020 at 11:33 AM Till Rohrmann 
>>> wrote:
>>>
>>>> Hi Weike,
>>>>
>>>> thanks for posting the logs. I will take a look at them. My
>>>> suspicion would be that there is some operation blocking the
>>>> JobMaster's main thread which causes the registrations from the TMs
>>>> to time out. Maybe the logs allow me to validate/falsify this suspicion.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Mon, Oct 12, 2020 at 10:43 AM DONG, Weike <
>>>> kyled...@connect.hku.hk> wrote:
>>>>
>>>>> Hi community,
>>>>>
>>>>> I have uploaded the log files of JobManager and TaskManager-1-1
>>>>> (one of the 50 TaskManagers) with DEBUG log level and default Flink
>>>>> configuration, and it clearly shows that TaskManager failed to
>>>>> register with JobManager after 10 attempts.
>>>>>
>>>>> Here is the link:
>>>>>
>>>>> JobManager:
>>>>> https://gist.github.com/kylemeow/740c470d9b5a1ab3552376193920adce
>>>>>
>>>>> TaskManager-1-1:
>>>>> https://gist.github.com/kylemeow/41b9a8fe91975875c40afaf58276c2fe
>>>>>
>>>>>
>>>>> Thanks : )

Re: In 1.11.2/flinkSql/batch, tableEnv.getConfig.setNullCheck(false) seems to break group by-s

2020-10-16 Thread Timo Walther

Hi Jon,

I would not recommend using this configuration parameter. It is not 
deprecated yet, but it can be considered legacy code from before we reworked 
the type system.


Regards,
Timo

On 16.10.20 13:23, Kurt Young wrote:

Yes, I think this is a bug, feel free to open a jira and a pull request.

Best,
Kurt


On Fri, Oct 16, 2020 at 4:13 PM Jon Alberdi wrote:

Hello to all,

on flink-1.11.2 the program written at
https://gist.github.com/yetanotherion/d007fa113d97411226eaea4f20cd4c2d
creates unexpected stack traces when the line “// triggerBug…”
is uncommented (some lines of the stack trace are written in the
gist).

(It correctly outputs
“
+---+
| c |
+---+
| 1 |
| 2 |
+---+
“ else)

Is that behavior expected?

If not, do you think a jira should be created to handle that? (I’d
be glad to do so)

Regards,

Ion


Re: TaskManager takes abnormally long time to register with JobManager on Kubernetes for Flink 1.11.0

2020-10-16 Thread DONG, Weike
Hi Till,

Thank you for the kind reminder, and I have created a JIRA ticket for this
issue https://issues.apache.org/jira/browse/FLINK-19677

Could you please assign it to me? I will try to submit a PR this weekend to
fix this : )

Sincerely,
Weike

On Fri, Oct 16, 2020 at 5:54 PM Till Rohrmann  wrote:

> Great, thanks a lot Weike. I think the first step would be to open a JIRA
> issue, get assigned and then start on fixing it and opening a PR.
>
> Cheers,
> Till
>
> On Fri, Oct 16, 2020 at 10:02 AM DONG, Weike 
> wrote:
>
>> Hi all,
>>
>> Thanks for all the replies, and I agree with Yang, as we have found that
>> for a pod without a service (like TaskManager pod), the reverse DNS lookup
>> would always fail, so this lookup is not necessary for the Kubernetes
>> environment.
>>
>> I am glad to help fix this issue to make Flink better : )
>>
>> Best,
>> Weike
>>
>> On Thu, Oct 15, 2020 at 7:57 PM Till Rohrmann 
>> wrote:
>>
>>> Hi Weike,
>>>
>>> thanks for getting back to us with your findings. Looking at the
>>> `TaskManagerLocation`, we are actually calling
>>> `InetAddress.getCanonicalHostName` twice for every creation of a
>>> `TaskManagerLocation` instance. This does not look right.
>>>
>>> I think it should be fine to make the look up configurable. Moreover,
>>> one could think about only doing a lazy look up if the canonical hostname
>>> is really needed (as far as I can see it is only really needed input split
>>> assignments and for the LocationPreferenceSlotSelectionStrategy to
>>> calculate how many TMs run on the same machine).
>>>
>>> Do you want to fix this issue?
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Oct 15, 2020 at 11:38 AM DONG, Weike 
>>> wrote:
>>>
 Hi Till and community,

 By the way, initially I resolved the IPs several times but results
 returned rather quickly (less than 1ms, possibly due to DNS cache on the
 server), so I thought it might not be the DNS issue.

 However, after debugging and logging, it is found that the lookup time
 exhibited high variance, i. e. normally it completes fast but occasionally
 some slow results would block the thread. So an unstable DNS server might
 have a great impact on the performance of Flink job startup.

 Best,
 Weike

 On Thu, Oct 15, 2020 at 5:19 PM DONG, Weike 
 wrote:

> Hi Till and community,
>
> Increasing `kubernetes.jobmanager.cpu` in the configuration makes this
> issue alleviated but not disappeared.
>
> After adding DEBUG logs to the internals of *flink-runtime*, we have
> found the culprit is
>
> inetAddress.getCanonicalHostName()
>
> in
> *org.apache.flink.runtime.taskmanager.TaskManagerLocation#getHostName*
> and
> *org.apache.flink.runtime.taskmanager.TaskManagerLocation#getFqdnHostName*,
> which could take ~ 6 seconds to complete, thus Akka dispatcher(s)
> are severely blocked by that.
>
> By commenting out the two methods, this issue seems to be solved
> immediately, so I wonder if Flink could provide a configuration parameter
> to turn off the DNS reverse lookup process, as it seems that Flink jobs
> could run happily without it.
>
> Sincerely,
> Weike
>
>
> On Tue, Oct 13, 2020 at 6:52 PM Till Rohrmann 
> wrote:
>
>> Hi Weike,
>>
>> could you try setting kubernetes.jobmanager.cpu: 4 in your
>> flink-conf.yaml? I fear that a single CPU is too low for the JobManager
>> component.
>>
>> Cheers,
>> Till
>>
>> On Tue, Oct 13, 2020 at 11:33 AM Till Rohrmann 
>> wrote:
>>
>>> Hi Weike,
>>>
>>> thanks for posting the logs. I will take a look at them. My
>>> suspicion would be that there is some operation blocking the JobMaster's
>>> main thread which causes the registrations from the TMs to time out. 
>>> Maybe
>>> the logs allow me to validate/falsify this suspicion.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Mon, Oct 12, 2020 at 10:43 AM DONG, Weike <
>>> kyled...@connect.hku.hk> wrote:
>>>
 Hi community,

 I have uploaded the log files of JobManager and TaskManager-1-1
 (one of the 50 TaskManagers) with DEBUG log level and default Flink
 configuration, and it clearly shows that TaskManager failed to register
 with JobManager after 10 attempts.

 Here is the link:

 JobManager:
 https://gist.github.com/kylemeow/740c470d9b5a1ab3552376193920adce

 TaskManager-1-1:
 https://gist.github.com/kylemeow/41b9a8fe91975875c40afaf58276c2fe

 Thanks : )

 Best regards,
 Weike


 On Mon, Oct 12, 2020 at 4:14 PM DONG, Weike <
 kyled...@connect.hku.hk> wrote:

> Hi community,
>
> Recently we have noticed a strange behavior for Flink jobs on
> 

Re: In 1.11.2/flinkSql/batch, tableEnv.getConfig.setNullCheck(false) seems to break group by-s

2020-10-16 Thread Kurt Young
Yes, I think this is a bug, feel free to open a jira and a pull request.

Best,
Kurt


On Fri, Oct 16, 2020 at 4:13 PM Jon Alberdi  wrote:

> Hello to all,
>
> on flink-1.11.2 the program  written at
> https://gist.github.com/yetanotherion/d007fa113d97411226eaea4f20cd4c2d
>
> creates unexpected stack traces when the line “// triggerBug…”
> is uncommented (some lines of the stack trace are written in the gist).
>
> (It correctly outputs
> “
> +-----+
> |   c |
> +-----+
> |   1 |
> |   2 |
> +-----+
> “ else)
>
> Is that behavior expected?
>
> If not, do you think a jira should be created to handle that? (I’d be glad
> to do so)
>
>
> Regards,
>
> Ion
>
>
>


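Re:Re: Re: flink1.11: loading an external jar to register a UDF

2020-10-16 Thread chenxuying
1. When the system class loader is used, the interfaces that the job jar exposes for the external UDF jar to implement throw a ClassNotFound exception.
2. What the thread context class loader is.

I don't quite understand these two points. Could you write a code example so I can take a look?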
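
For readers with the same question, here is a minimal sketch of what "thread context class loader" means in plain Java. It is not taken from the thread: the class name `ContextClassLoaderSketch` and both helper methods are hypothetical, and the empty URL array only stands in for a real UDF jar URL. The system class loader is the one that loaded the JVM's application classpath. The context class loader is a per-thread reference that frameworks can set. If the job jar is loaded by a separate user-code class loader (an assumption about the deployment), classes added only to the system class loader cannot see the job jar's interfaces, which would match the ClassNotFound symptom described in the quoted message.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.function.Supplier;

final class ContextClassLoaderSketch {

    /** Builds a loader for the UDF jars whose parent is the current thread context loader. */
    static URLClassLoader newUdfLoader(URL[] jarUrls) {
        // Using the context loader as parent keeps the job's own classes visible to the UDF jar.
        ClassLoader parent = Thread.currentThread().getContextClassLoader();
        return new URLClassLoader(jarUrls, parent);
    }

    /** Runs body with the given loader installed as context loader, then restores the old one. */
    static <T> T withContextClassLoader(ClassLoader loader, Supplier<T> body) {
        Thread thread = Thread.currentThread();
        ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(loader);
        try {
            return body.get();
        } finally {
            thread.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) throws Exception {
        // An empty URL array keeps the sketch runnable; a real job would pass the UDF jar URL here.
        try (URLClassLoader udfLoader = newUdfLoader(new URL[0])) {
            boolean installed = withContextClassLoader(
                    udfLoader,
                    () -> Thread.currentThread().getContextClassLoader() == udfLoader);
            System.out.println("UDF loader installed inside body: " + installed);
        }
    }
}
```

Creating a child loader avoids the reflective `addURL` call on the system class loader used in the quoted code, at the cost of having to install the loader wherever the UDF class is resolved.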


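On 2020-10-15 19:47:20, "amen...@163.com" wrote:
>A follow-up question: when using the thread context class loader, each record is sent three times. Is that caused by adding pipeline.classpaths?
>What other problems might this way of setting up env cause?
>
>best,
>amenhub
> 
>From: amen...@163.com
>Sent: 2020-10-15 19:22
>To: user-zh
>Subject: Re: Re: flink1.11: loading an external jar to register a UDF
>Thank you very much for your reply!
> 
>This is a very good approach for me, but I have one question: can the jar only be loaded with the system class loader?
>When I tried the system class loader, the interfaces that the job jar exposes for the external UDF 
>jar to implement threw a ClassNotFound exception, whereas pointing the class loader at the main class (which I believe means the default thread context class loader is used) avoids the problem.
> 
>Looking forward to your reply, thanks~
> 
>best, 
>amenhub
>From: cxydeve...@163.com
>Sent: 2020-10-15 17:46
>To: user-zh
>Subject: Re: flink1.11: loading an external jar to register a UDF
>Our approach is to set the env configuration via reflection and add pipeline.classpaths.
>The code is as follows: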
>import java.lang.reflect.Field;
>import java.lang.reflect.Method;
>import java.net.URL;
>import java.net.URLClassLoader;
>import java.util.ArrayList;
>import java.util.List;
>import java.util.Map;
>import org.apache.flink.configuration.Configuration;
>import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>import org.apache.flink.table.api.EnvironmentSettings;
>import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>
>public static void main(final String[] args) throws Exception {
>    StreamExecutionEnvironment env =
>        StreamExecutionEnvironment.getExecutionEnvironment();
>    EnvironmentSettings settings =
>        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>    StreamTableEnvironment tableEnvironment =
>        StreamTableEnvironment.create(env, settings);
>    //String path = "file:///D:/cxy/idea_workspace/...xxx.jar";
>    String path = "https://...xxx.jar";
>    loadJar(new URL(path));
>    // Reach into the environment's private Configuration via reflection
>    Field configuration =
>        StreamExecutionEnvironment.class.getDeclaredField("configuration");
>    configuration.setAccessible(true);
>    Configuration o = (Configuration) configuration.get(env);
>    Field confData = Configuration.class.getDeclaredField("confData");
>    confData.setAccessible(true);
>    @SuppressWarnings("unchecked")
>    Map<String, Object> temp = (Map<String, Object>) confData.get(o);
>    // Register the jar under pipeline.classpaths so it is shipped with the job
>    List<String> jarList = new ArrayList<>();
>    jarList.add(path);
>    temp.put("pipeline.classpaths", jarList);
>    tableEnvironment.executeSql("CREATE FUNCTION CxyTestReturnSelf AS " +
>        "'flinksql.function.udf.CxyTestReturnSelf'");
>    tableEnvironment.executeSql("CREATE TABLE sourceTable (\n" +
>        " f_sequence INT,\n" +
>        " f_random INT,\n" +
>        " f_random_str STRING,\n" +
>        " ts AS localtimestamp,\n" +
>        " WATERMARK FOR ts AS ts\n" +
>        ") WITH (\n" +
>        " 'connector' = 'datagen',\n" +
>        " 'rows-per-second'='5',\n" +
>        "\n" +
>        " 'fields.f_sequence.kind'='sequence',\n" +
>        " 'fields.f_sequence.start'='1',\n" +
>        " 'fields.f_sequence.end'='1000',\n" +
>        "\n" +
>        " 'fields.f_random.min'='1',\n" +
>        " 'fields.f_random.max'='1000',\n" +
>        "\n" +
>        " 'fields.f_random_str.length'='10'\n" +
>        ")");
>    tableEnvironment.executeSql("CREATE TABLE sinktable (\n" +
>        "f_random_str STRING" +
>        ") WITH (\n" +
>        "'connector' = 'print'\n" +
>        ")");
>    tableEnvironment.executeSql(
>        "insert into sinktable " +
>        "select CxyTestReturnSelf(f_random_str) " +
>        "from sourceTable");
>}
>
>// Dynamically load the jar
>public static void loadJar(URL jarUrl) {
>    // Get the addURL method from the URLClassLoader class
>    Method method;
>    try {
>        method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
>    } catch (NoSuchMethodException | SecurityException e1) {
>        e1.printStackTrace();
>        return; // nothing to invoke; continuing would hit a NullPointerException
>    }
>    // Remember the method's current accessibility
>    boolean accessible = method.isAccessible();
>    try {
>        // Make the method accessible if it is not already
>        if (!accessible) {
>            method.setAccessible(true);
>        }
>        // Get the system class loader (this cast only succeeds on Java 8,
>        // where the system class loader is a URLClassLoader)
>        URLClassLoader classLoader =
>            (URLClassLoader) ClassLoader.getSystemClassLoader();
>        // Add the jar path to the system class loader's URLs
>        method.invoke(classLoader, jarUrl);
>    } catch (Exception e) {
>        e.printStackTrace();
>    } finally {
>        method.setAccessible(accessible);
>    }
>}
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re: TaskManager takes abnormally long time to register with JobManager on Kubernetes for Flink 1.11.0

2020-10-16 Thread Till Rohrmann
Great, thanks a lot Weike. I think the first step would be to open a JIRA
issue, get assigned and then start on fixing it and opening a PR.

Cheers,
Till

On Fri, Oct 16, 2020 at 10:02 AM DONG, Weike 
wrote:

> Hi all,
>
> Thanks for all the replies, and I agree with Yang, as we have found that
> for a pod without a service (like TaskManager pod), the reverse DNS lookup
> would always fail, so this lookup is not necessary for the Kubernetes
> environment.
>
> I am glad to help fix this issue to make Flink better : )
>
> Best,
> Weike
>
> On Thu, Oct 15, 2020 at 7:57 PM Till Rohrmann 
> wrote:
>
>> Hi Weike,
>>
>> thanks for getting back to us with your findings. Looking at the
>> `TaskManagerLocation`, we are actually calling
>> `InetAddress.getCanonicalHostName` twice for every creation of a
>> `TaskManagerLocation` instance. This does not look right.
>>
>> I think it should be fine to make the look up configurable. Moreover, one
>> could think about only doing a lazy look up if the canonical hostname is
>> really needed (as far as I can see it is only really needed input split
>> assignments and for the LocationPreferenceSlotSelectionStrategy to
>> calculate how many TMs run on the same machine).
>>
>> Do you want to fix this issue?
>>
>> Cheers,
>> Till
>>
>> On Thu, Oct 15, 2020 at 11:38 AM DONG, Weike 
>> wrote:
>>
>>> Hi Till and community,
>>>
>>> By the way, initially I resolved the IPs several times but results
>>> returned rather quickly (less than 1ms, possibly due to DNS cache on the
>>> server), so I thought it might not be the DNS issue.
>>>
>>> However, after debugging and logging, it is found that the lookup time
>>> exhibited high variance, i. e. normally it completes fast but occasionally
>>> some slow results would block the thread. So an unstable DNS server might
>>> have a great impact on the performance of Flink job startup.
>>>
>>> Best,
>>> Weike
>>>
>>> On Thu, Oct 15, 2020 at 5:19 PM DONG, Weike 
>>> wrote:
>>>
 Hi Till and community,

 Increasing `kubernetes.jobmanager.cpu` in the configuration makes this
 issue alleviated but not disappeared.

 After adding DEBUG logs to the internals of *flink-runtime*, we have
 found the culprit is

 inetAddress.getCanonicalHostName()

 in
 *org.apache.flink.runtime.taskmanager.TaskManagerLocation#getHostName*
 and
 *org.apache.flink.runtime.taskmanager.TaskManagerLocation#getFqdnHostName*,
 which could take ~ 6 seconds to complete, thus Akka dispatcher(s)
 are severely blocked by that.

 By commenting out the two methods, this issue seems to be solved
 immediately, so I wonder if Flink could provide a configuration parameter
 to turn off the DNS reverse lookup process, as it seems that Flink jobs
 could run happily without it.

 Sincerely,
 Weike


 On Tue, Oct 13, 2020 at 6:52 PM Till Rohrmann 
 wrote:

> Hi Weike,
>
> could you try setting kubernetes.jobmanager.cpu: 4 in your
> flink-conf.yaml? I fear that a single CPU is too low for the JobManager
> component.
>
> Cheers,
> Till
>
> On Tue, Oct 13, 2020 at 11:33 AM Till Rohrmann 
> wrote:
>
>> Hi Weike,
>>
>> thanks for posting the logs. I will take a look at them. My suspicion
>> would be that there is some operation blocking the JobMaster's main 
>> thread
>> which causes the registrations from the TMs to time out. Maybe the logs
>> allow me to validate/falsify this suspicion.
>>
>> Cheers,
>> Till
>>
>> On Mon, Oct 12, 2020 at 10:43 AM DONG, Weike 
>> wrote:
>>
>>> Hi community,
>>>
>>> I have uploaded the log files of JobManager and TaskManager-1-1 (one
>>> of the 50 TaskManagers) with DEBUG log level and default Flink
>>> configuration, and it clearly shows that TaskManager failed to register
>>> with JobManager after 10 attempts.
>>>
>>> Here is the link:
>>>
>>> JobManager:
>>> https://gist.github.com/kylemeow/740c470d9b5a1ab3552376193920adce
>>>
>>> TaskManager-1-1:
>>> https://gist.github.com/kylemeow/41b9a8fe91975875c40afaf58276c2fe
>>>
>>> Thanks : )
>>>
>>> Best regards,
>>> Weike
>>>
>>>
>>> On Mon, Oct 12, 2020 at 4:14 PM DONG, Weike 
>>> wrote:
>>>
 Hi community,

 Recently we have noticed a strange behavior for Flink jobs on
 Kubernetes per-job mode: when the parallelism increases, the time it 
 takes
 for the TaskManagers to register with *JobManager *becomes
 abnormally long (for a task with parallelism of 50, it could take 60 ~ 
 120
 seconds or even longer for the registration attempt), and usually more 
 than
 10 attempts are needed to finish this registration.

 Because of this, we could not submit a job requiring more than 20
 slots with the 

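flink sql: filtering on the partition column with a window function

2020-10-16 Thread kcz
The column holds both default values and real values, and I want to get the real one. How can this be implemented? I thought of a window function, but found that it cannot filter. Another hacky option is to compute max and min and then pick the value with an IF.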

On memory size settings and estimation

2020-10-16 Thread Kyle Zhang
Hi all,
  Recently I also ran into the fairly common out-of-memory error OutOfMemoryError: Java heap space with JM: 1g and
TM: 2g. Crudely raising them to 2g and 4g was enough to make the job run:
INFO  [] - Loading configuration property:
cluster.termination-message-path, /flink/log/termination.log
INFO  [] - Final TaskExecutor Memory configuration:
INFO  [] -   Total Process Memory:          3.906gb (4194304000 bytes)
INFO  [] -     Total Flink Memory:          3.266gb (3506438138 bytes)
INFO  [] -       Total JVM Heap Memory:     1.508gb (1619001315 bytes)
INFO  [] -         Framework:               128.000mb (134217728 bytes)
INFO  [] -         Task:                    1.383gb (1484783587 bytes)
INFO  [] -       Total Off-heap Memory:     1.758gb (1887436823 bytes)
INFO  [] -         Managed:                 1.306gb (1402575276 bytes)
INFO  [] -         Total JVM Direct Memory: 462.400mb (484861547 bytes)
INFO  [] -           Framework:             128.000mb (134217728 bytes)
INFO  [] -           Task:                  0 bytes
INFO  [] -           Network:               334.400mb (350643819 bytes)
INFO  [] -     JVM Metaspace:               256.000mb (268435456 bytes)
INFO  [] -     JVM Overhead:                400.000mb (419430406 bytes)

Are there any metrics that can be used to estimate in advance how much memory the JM and TM need?

Best
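
As a small aid for reading the log above, the sketch below re-adds the byte counts it reports and shows how the components nest. The figures are copied from the log. The class name `TaskExecutorMemoryCheck` and its method names are made up for illustration. It does not answer the sizing question, but it makes one thing visible: of the roughly 4000 MiB of process memory, only about 1.5 GiB is JVM heap, which is the part an `OutOfMemoryError: Java heap space` is about.

```java
final class TaskExecutorMemoryCheck {
    // Byte counts copied from the "Final TaskExecutor Memory configuration" log lines.
    static final long FRAMEWORK_HEAP = 134_217_728L;
    static final long TASK_HEAP = 1_484_783_587L;
    static final long MANAGED = 1_402_575_276L;
    static final long FRAMEWORK_OFF_HEAP = 134_217_728L;
    static final long TASK_OFF_HEAP = 0L;
    static final long NETWORK = 350_643_819L;
    static final long METASPACE = 268_435_456L;
    static final long OVERHEAD = 419_430_406L;

    static long jvmHeap() {
        return FRAMEWORK_HEAP + TASK_HEAP;
    }

    static long jvmDirect() {
        return FRAMEWORK_OFF_HEAP + TASK_OFF_HEAP + NETWORK;
    }

    static long offHeap() {
        return MANAGED + jvmDirect();
    }

    static long totalFlink() {
        return jvmHeap() + offHeap();
    }

    static long totalProcess() {
        return totalFlink() + METASPACE + OVERHEAD;
    }

    public static void main(String[] args) {
        System.out.println("JVM heap:      " + jvmHeap());
        System.out.println("Off-heap:      " + offHeap());
        System.out.println("Total Flink:   " + totalFlink());
        System.out.println("Total process: " + totalProcess());
        // Share of the process budget that is actually available as heap.
        System.out.printf("Heap share:    %.1f%%%n", 100.0 * jvmHeap() / totalProcess());
    }
}
```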


Password usage in ssl configuration

2020-10-16 Thread V N, Suchithra (Nokia - IN/Bangalore)
Hello,

I have a query regarding the SSL configuration in Flink. With SSL enabled, 
the flink-conf.yaml configuration file contains the cleartext passwords for 
the keystore and truststore files.
If an attacker gains access to this configuration file, the keystore and 
truststore files can be read using these passwords. What is the community's 
approach to protecting these passwords?

Regards,
Suchithra


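Re: Re: Re: flink custom UDF cannot be used after registration

2020-10-16 Thread 罗显宴
I'd like to ask: with this kind of UDF, does it have to be packaged as a jar and uploaded to the cluster to run, or can the SQL code also be submitted directly in sql-client?


罗显宴
Email: 15927482...@163.com

On 2020-10-16 15:45, amen...@163.com wrote:
Yes, I get the same error with TEMPORARY FUNCTION, but it works fine with TEMPORARY SYSTEM. I'm not sure whether this is a Flink bug.

best,
amenhub

From: 史 正超
Sent: 2020-10-16 15:26
To: user-zh@flink.apache.org
Subject: Re: Re: Re: flink custom UDF cannot be used after registration
Try creating it like this, or try a different name:

CREATE TEMPORARY SYSTEM  FUNCTION imei_encrypt AS 
'com.intsig.flink.udf.IMEIEncrypt' LANGUAGE JAVA;

I just created a function named UpperCase and got the same error. After using TEMPORARY SYSTEM to override the system function (which may already exist), it worked. Using a different name also works.


From: 奔跑的小飞袁 
Sent: 2020-10-16 6:47
To: user-zh@flink.apache.org 
Subject: Re: Re: Re: flink custom UDF cannot be used after registration

Yes, the problem was with the arguments I passed.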



--
Sent from: http://apache-flink.147419.n8.nabble.com/


In 1.11.2/flinkSql/batch, tableEnv.getConfig.setNullCheck(false) seems to break group by-s

2020-10-16 Thread Jon Alberdi
Hello to all,
on flink-1.11.2 the program written at 
https://gist.github.com/yetanotherion/d007fa113d97411226eaea4f20cd4c2d
creates unexpected stack traces when the line “// triggerBug…”
is uncommented (some lines of the stack trace are written in the gist).
(It correctly outputs
“
+-----+
|   c |
+-----+
|   1 |
|   2 |
+-----+
“ else)

Is that behavior expected?
If not, do you think a jira should be created to handle that? (I’d be glad to 
do so)

Regards,
Ion



Re: TaskManager takes abnormally long time to register with JobManager on Kubernetes for Flink 1.11.0

2020-10-16 Thread DONG, Weike
Hi all,

Thanks for all the replies, and I agree with Yang, as we have found that
for a pod without a service (like TaskManager pod), the reverse DNS lookup
would always fail, so this lookup is not necessary for the Kubernetes
environment.

I am glad to help fix this issue to make Flink better : )

Best,
Weike
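
One way to picture the "lazy look up" idea from Till's quoted reply below is the following sketch. It is only an illustration and not the code that went into Flink: the class `LazyHostName` and its methods are hypothetical. The only real API used is `InetAddress.getCanonicalHostName()`, the call identified as slow in this thread. The point is that the reverse DNS lookup runs at most once, and only if some caller actually asks for the name.

```java
import java.net.InetAddress;
import java.util.function.Supplier;

final class LazyHostName {
    private final Supplier<String> lookup;
    private volatile String cached;

    LazyHostName(Supplier<String> lookup) {
        this.lookup = lookup;
    }

    /** Wraps the potentially slow reverse DNS lookup without executing it yet. */
    static LazyHostName forAddress(InetAddress address) {
        return new LazyHostName(address::getCanonicalHostName);
    }

    /** Performs the lookup on first use and returns the cached value afterwards. */
    String get() {
        String value = cached;
        if (value == null) {
            synchronized (this) {
                value = cached;
                if (value == null) {
                    value = lookup.get();
                    cached = value;
                }
            }
        }
        return value;
    }

    public static void main(String[] args) {
        LazyHostName name = forAddress(InetAddress.getLoopbackAddress());
        // No DNS traffic has happened so far; the lookup runs inside get().
        System.out.println(name.get());
    }
}
```

Constructing the object stays cheap, so a thread that only creates location objects is never blocked by DNS. A caller that does need the name still pays the lookup cost once, so the call site matters.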

On Thu, Oct 15, 2020 at 7:57 PM Till Rohrmann  wrote:

> Hi Weike,
>
> thanks for getting back to us with your findings. Looking at the
> `TaskManagerLocation`, we are actually calling
> `InetAddress.getCanonicalHostName` twice for every creation of a
> `TaskManagerLocation` instance. This does not look right.
>
> I think it should be fine to make the look up configurable. Moreover, one
> could think about only doing a lazy look up if the canonical hostname is
> really needed (as far as I can see it is only really needed input split
> assignments and for the LocationPreferenceSlotSelectionStrategy to
> calculate how many TMs run on the same machine).
>
> Do you want to fix this issue?
>
> Cheers,
> Till
>
> On Thu, Oct 15, 2020 at 11:38 AM DONG, Weike 
> wrote:
>
>> Hi Till and community,
>>
>> By the way, initially I resolved the IPs several times but results
>> returned rather quickly (less than 1ms, possibly due to DNS cache on the
>> server), so I thought it might not be the DNS issue.
>>
>> However, after debugging and logging, it is found that the lookup time
>> exhibited high variance, i. e. normally it completes fast but occasionally
>> some slow results would block the thread. So an unstable DNS server might
>> have a great impact on the performance of Flink job startup.
>>
>> Best,
>> Weike
>>
>> On Thu, Oct 15, 2020 at 5:19 PM DONG, Weike 
>> wrote:
>>
>>> Hi Till and community,
>>>
>>> Increasing `kubernetes.jobmanager.cpu` in the configuration makes this
>>> issue alleviated but not disappeared.
>>>
>>> After adding DEBUG logs to the internals of *flink-runtime*, we have
>>> found the culprit is
>>>
>>> inetAddress.getCanonicalHostName()
>>>
>>> in
>>> *org.apache.flink.runtime.taskmanager.TaskManagerLocation#getHostName*
>>> and
>>> *org.apache.flink.runtime.taskmanager.TaskManagerLocation#getFqdnHostName*,
>>> which could take ~ 6 seconds to complete, thus Akka dispatcher(s)
>>> are severely blocked by that.
>>>
>>> By commenting out the two methods, this issue seems to be solved
>>> immediately, so I wonder if Flink could provide a configuration parameter
>>> to turn off the DNS reverse lookup process, as it seems that Flink jobs
>>> could run happily without it.
>>>
>>> Sincerely,
>>> Weike
>>>
>>>
>>> On Tue, Oct 13, 2020 at 6:52 PM Till Rohrmann 
>>> wrote:
>>>
 Hi Weike,

 could you try setting kubernetes.jobmanager.cpu: 4 in your
 flink-conf.yaml? I fear that a single CPU is too low for the JobManager
 component.

 Cheers,
 Till

 On Tue, Oct 13, 2020 at 11:33 AM Till Rohrmann 
 wrote:

> Hi Weike,
>
> thanks for posting the logs. I will take a look at them. My suspicion
> would be that there is some operation blocking the JobMaster's main thread
> which causes the registrations from the TMs to time out. Maybe the logs
> allow me to validate/falsify this suspicion.
>
> Cheers,
> Till
>
> On Mon, Oct 12, 2020 at 10:43 AM DONG, Weike 
> wrote:
>
>> Hi community,
>>
>> I have uploaded the log files of JobManager and TaskManager-1-1 (one
>> of the 50 TaskManagers) with DEBUG log level and default Flink
>> configuration, and it clearly shows that TaskManager failed to register
>> with JobManager after 10 attempts.
>>
>> Here is the link:
>>
>> JobManager:
>> https://gist.github.com/kylemeow/740c470d9b5a1ab3552376193920adce
>>
>> TaskManager-1-1:
>> https://gist.github.com/kylemeow/41b9a8fe91975875c40afaf58276c2fe
>>
>> Thanks : )
>>
>> Best regards,
>> Weike
>>
>>
>> On Mon, Oct 12, 2020 at 4:14 PM DONG, Weike 
>> wrote:
>>
>>> Hi community,
>>>
>>> Recently we have noticed a strange behavior for Flink jobs on
>>> Kubernetes per-job mode: when the parallelism increases, the time it 
>>> takes
>>> for the TaskManagers to register with *JobManager *becomes
>>> abnormally long (for a task with parallelism of 50, it could take 60 ~ 
>>> 120
>>> seconds or even longer for the registration attempt), and usually more 
>>> than
>>> 10 attempts are needed to finish this registration.
>>>
>>> Because of this, we could not submit a job requiring more than 20
>>> slots with the default configuration, as the TaskManager would say:
>>>
>>>
 Registration at JobManager 
 (akka.tcp://flink@myjob-201076.default:6123/user/rpc/jobmanager_2)
 attempt 9 timed out after 25600 ms
>>>
>>> Free slot with allocation id 60d5277e138a94fb73fc6691557001e0
 because: The slot 60d5277e138a94fb73fc6691557001e0 has timed 

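Re: Re: flink custom UDF cannot be used after registration

2020-10-16 Thread amen...@163.com
Yes, I get the same error with TEMPORARY FUNCTION, but it works fine with TEMPORARY SYSTEM. I'm not sure whether this is a Flink bug.

best,
amenhub
 
From: 史 正超
Sent: 2020-10-16 15:26
To: user-zh@flink.apache.org
Subject: Re: Re: Re: flink custom UDF cannot be used after registration
Try creating it like this, or try a different name:
 
CREATE TEMPORARY SYSTEM  FUNCTION imei_encrypt AS 
'com.intsig.flink.udf.IMEIEncrypt' LANGUAGE JAVA;
 
I just created a function named UpperCase and got the same error. After using TEMPORARY SYSTEM to override the system function (which may already exist), it worked. Using a different name also works.
 

From: 奔跑的小飞袁 
Sent: 2020-10-16 6:47
To: user-zh@flink.apache.org 
Subject: Re: Re: Re: flink custom UDF cannot be used after registration
 
Yes, the problem was with the arguments I passed.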
 
 
 
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Effect of renaming a primary key

2020-10-16 Thread Leonard Xu
Hi, Rex


> , won't that therefore change the key used to reference the state in RocksDB 
> and in a checkpoint for the associated table?
> How might this affect state storage and checkpointing?
> Will the pk that was renamed remain a key for state or is some other 
> mechanism used to form a key?

Yes, I think Flink can still infer the primary key from the renamed pk column; 
the state and checkpoints are not affected by the rename.

BTW, Flink uses the primary key in some optimizations, but Flink does not check 
primary key integrity because, unlike a database, Flink does not own the data.
So users should ensure the integrity of the unique key when defining a primary 
key. As far as I know, it is unusual for data from Kafka to be unique, and 
this may lead to unexpected results.
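
Since the integrity check is left to the user, one rough way to test the assumption before declaring `PRIMARY KEY ... NOT ENFORCED` is to scan a sample of the topic's keys for repeats. The sketch below is plain Java and makes no use of Flink. The class `PrimaryKeySanityCheck` and the sample ids are invented for illustration. For a changelog-style topic, repeated ids may be legitimate updates, so a non-empty result is a prompt to look closer rather than proof of a problem.

```java
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class PrimaryKeySanityCheck {

    /** Returns every key that occurs more than once, in order of first repeat. */
    static <K> Set<K> duplicateKeys(Iterable<K> keys) {
        Set<K> seen = new HashSet<>();
        Set<K> duplicates = new LinkedHashSet<>();
        for (K key : keys) {
            // add() returns false when the key was already present.
            if (!seen.add(key)) {
                duplicates.add(key);
            }
        }
        return duplicates;
    }

    public static void main(String[] args) {
        // Hypothetical sample of "id" values read from the topic.
        List<Long> sampledIds = List.of(1L, 2L, 1L, 3L, 2L);
        System.out.println("ids seen more than once: " + duplicateKeys(sampledIds));
    }
}
```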

Best,
Leonard   


> 
> tableEnv.executeSql("""
> CREATE TABLE topic_users (
>   id BIGINT,
>   deleted_at BIGINT,
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>   ...
> )
> """)
> val usersNotDeletedTable =
>   tableEnv
> .from("topic_users")
> .select($"*")
> // Will PK automatically change to "users_user_id"?
> .renameColumns($"id".as("users_user_id"))
> .filter($"deleted_at".isNull)
> 
> val membershipsNotDeletedTable =
>   membershipsTable // This table also has "id" PK
> .join(
>   usersNotDeletedTable,
>   $"user_id" === $"users_user_id"
> )
> .dropColumns($"users_user_id")
> 
> Thanks!
> 
> -- 
> Rex Fenley  |  Software Engineer - Mobile and Backend
> 
> Remind.com  |  BLOG   |  
> FOLLOW US   |  LIKE US 
> 


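Re: Re: Re: flink custom UDF cannot be used after registration

2020-10-16 Thread 史 正超
Try creating it like this, or try a different name:

CREATE TEMPORARY SYSTEM  FUNCTION imei_encrypt AS 
'com.intsig.flink.udf.IMEIEncrypt' LANGUAGE JAVA;

I just created a function named UpperCase and got the same error. After using TEMPORARY 
SYSTEM to override the system function (which may already exist), it worked. Using a different name also works.


From: 奔跑的小飞袁 
Sent: 2020-10-16 6:47
To: user-zh@flink.apache.org 
Subject: Re: Re: Re: flink custom UDF cannot be used after registration

Yes, the problem was with the arguments I passed.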



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: Re: flink custom UDF cannot be used after registration

2020-10-16 Thread 奔跑的小飞袁
Yes, the problem was with the arguments I passed.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: Re: flink custom UDF cannot be used after registration

2020-10-16 Thread 奔跑的小飞袁
Yes, this function of mine only needs one argument.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Effect of renaming a primary key

2020-10-16 Thread Rex Fenley
Hello,

When two column names collide in a join, the user is forced to change the
name of one of the columns for the join to be valid. However, if those
columns are primary keys such as "id", won't that therefore change the key
used to reference the state in RocksDB and in a checkpoint for the
associated table? How might this affect state storage and checkpointing?
Will the pk that was renamed remain a key for state or is some other
mechanism used to form a key?

My example

as you can see, I alter "id" to . This is because the following code must
join two tables which both have PK "id".

tableEnv.executeSql("""
CREATE TABLE topic_users (
  id BIGINT,
  deleted_at BIGINT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  ...
)
""")
val usersNotDeletedTable =
  tableEnv
    .from("topic_users")
    .select($"*")
    // Will PK automatically change to "users_user_id"?
    .renameColumns($"id".as("users_user_id"))
    .filter($"deleted_at".isNull)

val membershipsNotDeletedTable =
  membershipsTable // This table also has "id" PK
    .join(
      usersNotDeletedTable,
      $"user_id" === $"users_user_id"
    )
    .dropColumns($"users_user_id")

Thanks!

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com  |  BLOG 
 |  FOLLOW
US   |  LIKE US



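Re: Re: flink custom UDF cannot be used after registration

2020-10-16 Thread Shuai Xia
Hello, if I'm not mistaken, there is only one argument?


--
From: 奔跑的小飞袁 
Sent: Friday, October 16, 2020 14:18
To: user-zh 
Subject: Re: Re: flink custom UDF cannot be used after registration

The complete SQL file being executed: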

SET stream.enableCheckpointing=1000*60;
SET stream.setParallelism=4;

CREATE FUNCTION imei_encrypt AS 'com.intsig.flink.udf.IMEIEncrypt' LANGUAGE
JAVA;

-- Kafka cdbp zdao source table
create TABLE cloud_behavior_source(
operation VARCHAR,
operation_channel VARCHAR,
`time` VARCHAR,
ip VARCHAR,
lat VARCHAR,
lng VARCHAR,
user_id VARCHAR,
device_id VARCHAR,
imei VARCHAR,
targets ARRAY>,
product_name VARCHAR,
product_version VARCHAR,
product_vendor VARCHAR,
platform VARCHAR,
platform_version VARCHAR,
`languaage` VARCHAR,
locale VARCHAR,
other_para MAP
) with (
'connector'='kafka',
'topic'='cloud_behavior',
'properties.bootstrap.servers'='',
'properties.group.id'='testGroup',
'format'='avro',
'scan.startup.mode'='earliest-offset'
);

-- Hbase zdao UV statistics sink table
create TABLE cloud_behavior_sink(
operation VARCHAR,
operation_channel VARCHAR,
`time` VARCHAR,
ip VARCHAR,
lat VARCHAR,
lng VARCHAR,
user_id VARCHAR,
device_id VARCHAR,
imei VARCHAR,
product_name VARCHAR,
product_version VARCHAR,
product_vendor VARCHAR,
platform VARCHAR,
platform_version VARCHAR,
`languaage` VARCHAR,
locale VARCHAR
)with (
'connector'='filesystem',
'path'='hdfs:///data_test/hongliang_song/working_sql_test_parquet.db',
'format'='parquet',
'sink.rolling-policy.file-size'='128MB',
'sink.rolling-policy.rollover-interval'='10min'
);

-- Business logic
insert into cloud_behavior_sink
select
 operation,
 operation_channel,
 `time`,
 ip,
 lat,
 lng,
 user_id,
 device_id,
 imei_encrypt(imei) AS imei,
 product_name,
 product_version,
 product_vendor,
 platform,
 platform_version,
 `languaage`,
 locale
FROM cloud_behavior_source;



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Re: flink custom UDF cannot be used after registration

2020-10-16 Thread 奔跑的小飞袁
The complete SQL file being executed:

SET stream.enableCheckpointing=1000*60;
SET stream.setParallelism=4;

CREATE FUNCTION imei_encrypt AS 'com.intsig.flink.udf.IMEIEncrypt' LANGUAGE
JAVA;

-- Kafka cdbp zdao source table
create TABLE cloud_behavior_source(
operation VARCHAR,
operation_channel VARCHAR,
`time` VARCHAR,
ip VARCHAR,
lat VARCHAR,
lng VARCHAR,
user_id VARCHAR,
device_id VARCHAR,
imei VARCHAR,
targets ARRAY>,
product_name VARCHAR,
product_version VARCHAR,
product_vendor VARCHAR,
platform VARCHAR,
platform_version VARCHAR,
`languaage` VARCHAR,
locale VARCHAR,
other_para MAP
) with (
'connector'='kafka',
'topic'='cloud_behavior',
'properties.bootstrap.servers'='',
'properties.group.id'='testGroup',
'format'='avro',
'scan.startup.mode'='earliest-offset'
);

-- Hbase zdao UV statistics sink table
create TABLE cloud_behavior_sink(
operation VARCHAR,
operation_channel VARCHAR,
`time` VARCHAR,
ip VARCHAR,
lat VARCHAR,
lng VARCHAR,
user_id VARCHAR,
device_id VARCHAR,
imei VARCHAR,
product_name VARCHAR,
product_version VARCHAR,
product_vendor VARCHAR,
platform VARCHAR,
platform_version VARCHAR,
`languaage` VARCHAR,
locale VARCHAR
)with (
'connector'='filesystem',
'path'='hdfs:///data_test/hongliang_song/working_sql_test_parquet.db',
'format'='parquet',
'sink.rolling-policy.file-size'='128MB',
'sink.rolling-policy.rollover-interval'='10min'
);

-- Business logic
insert into cloud_behavior_sink
select
 operation,
 operation_channel,
 `time`,
 ip,
 lat,
 lng,
 user_id,
 device_id,
 imei_encrypt(imei) AS imei,
 product_name,
 product_version,
 product_vendor,
 platform,
 platform_version,
 `languaage`,
 locale
FROM cloud_behavior_source;



--
Sent from: http://apache-flink.147419.n8.nabble.com/


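Re: Flink job fails to run

2020-10-16 Thread gangzi
I'm on hadoop-2.10, with flink-sql-connector-hive-1.2.2_2.11-1.11.1.jar.

> On Oct 16, 2020, at 12:01 PM, Jeff Zhang wrote:
> 
> Are you on hadoop2? As I recall, this only happens with hadoop3.
> 
> 
> gangzi <1139872...@qq.com> wrote on Fri, Oct 16, 2020 at 11:22 AM:
> 
>> The TM's CLASSPATH indeed does not contain hadoop-mapreduce-client-core.jar.
>> Could this be a problem with the hadoop cluster? Or do I have to use the
>> shaded hadoop jar? The official docs no longer recommend the shaded hadoop jar.
>> 
>>> On Oct 16, 2020, at 10:50 AM, Jeff Zhang wrote:
>>> 
>>> Take a look at the TM's log; it contains the CLASSPATH.
>>> 
>>> gangzi <1139872...@qq.com> wrote on Fri, Oct 16, 2020 at 10:11 AM:
>>> 
>>>> Following the official Flink documentation, I ran export
>>>> HADOOP_CLASSPATH=`hadoop classpath` on every node of the hadoop
>>>> cluster, but I get: java.lang.NoClassDefFoundError:
>>>> org/apache/hadoop/mapred/JobConf
>>>> 
>>>> I don't know whether this is a Flink bug. According to the error, the
>>>> jar hadoop-mapreduce-client-core.jar is missing, but that jar is under
>>>> /usr/local/hadoop-2.10.0/share/hadoop/mapreduce/*, and that directory
>>>> is included in HADOOP_CLASSPATH, so in principle it should be loaded.
>>>> 
>>>>> On Oct 16, 2020, at 9:59 AM, Shubin Ruan wrote:
>>>>> 
>>>>> export HADOOP_CLASSPATH=
>>>> 
>>>> 
>>> 
>>> --
>>> Best Regards
>>> 
>>> Jeff Zhang
>> 
>> 
> 
> -- 
> Best Regards
> 
> Jeff Zhang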
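
For anyone debugging a similar NoClassDefFoundError, a quick probe like the one below can confirm whether a given class is visible to a JVM started with the same classpath as the TaskManager. This is a generic sketch and not something suggested in the thread: the class `ClasspathProbe` is hypothetical, and only `org.apache.hadoop.mapred.JobConf` comes from the error message quoted above.

```java
final class ClasspathProbe {

    /** Returns true if the named class can be found by the given loader, without initializing it. */
    static boolean isLoadable(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        // The effective classpath of this JVM, to compare with HADOOP_CLASSPATH.
        System.out.println("java.class.path = " + System.getProperty("java.class.path"));
        System.out.println("JobConf loadable: "
                + isLoadable("org.apache.hadoop.mapred.JobConf", loader));
    }
}
```

Running it with the classpath copied from the TM log shows whether the jar is really missing from the process or only from the shell environment.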



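Re: Re: flink custom UDF cannot be used after registration

2020-10-16 Thread 奔跑的小飞袁
Here is my UDF declaration:
CREATE FUNCTION imei_encrypt AS 'com.intsig.flink.udf.IMEIEncrypt' LANGUAGE
JAVA;
And here is the UDF implementation:
import org.apache.flink.table.functions.ScalarFunction;

public class IMEIEncrypt extends ScalarFunction {

    // eval takes two arguments: the column type and the value to encrypt
    public String eval(String column_type, String value) {
        EncryptUtils encryptUtils = new EncryptUtils();
        return encryptUtils.encrypt(column_type, value);
    }
}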
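
The replies in this thread point at the argument count: the SQL calls `imei_encrypt(imei)` with one argument while `eval` above declares two. The sketch below illustrates that mismatch with plain reflection. It is a simplification and not how Flink resolves UDF calls internally: `EvalArityCheck`, `TwoArgUdf` and `OneArgUdf` are hypothetical stand-ins that do not extend `ScalarFunction`.

```java
import java.util.Arrays;

final class EvalArityCheck {

    /** Stand-in for the UDF in this thread: eval expects two arguments. */
    static final class TwoArgUdf {
        public String eval(String columnType, String value) {
            return columnType + ":" + value;
        }
    }

    /** Stand-in for a UDF whose eval matches a one-argument SQL call. */
    static final class OneArgUdf {
        public String eval(String value) {
            return value;
        }
    }

    /** Returns true if the class has a public eval method taking argCount parameters. */
    static boolean hasEvalWithArity(Class<?> udfClass, int argCount) {
        return Arrays.stream(udfClass.getMethods())
                .anyMatch(m -> m.getName().equals("eval")
                        && m.getParameterCount() == argCount);
    }

    public static void main(String[] args) {
        // A call such as imei_encrypt(imei) passes exactly one argument.
        System.out.println("two-arg eval accepts 1 arg: " + hasEvalWithArity(TwoArgUdf.class, 1));
        System.out.println("one-arg eval accepts 1 arg: " + hasEvalWithArity(OneArgUdf.class, 1));
    }
}
```

Either the SQL call or the `eval` signature has to change so that the two agree.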




--
Sent from: http://apache-flink.147419.n8.nabble.com/