Re: Initializing broadcast state

2021-01-26 Thread Nick Bendtner
Thanks a lot Guowei, that makes sense. I will go with the caching approach.
Can you point me to an example that shows the most efficient way to cache
elements?
Thanks a ton for your help.

Best,
Nick
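
For illustration, a minimal sketch of the caching approach discussed in this
thread: buffer keyed elements in a ListState until the broadcast side has
data, then replay them with `applyToKeyedState`. It assumes a
`KeyedBroadcastProcessFunction` keyed by String; the element types, state
names, and the pass-through "business logic" are placeholders, not the
author's actual job.

```
import org.apache.flink.api.common.state.KeyedStateFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class CachingBroadcastFunction
        extends KeyedBroadcastProcessFunction<String, String, String, String> {

    // Hypothetical descriptors; the MapStateDescriptor must be the one passed
    // to broadcast() on the rule/control stream.
    private final MapStateDescriptor<String, String> ruleDescriptor =
            new MapStateDescriptor<>("rules", Types.STRING, Types.STRING);
    private final ListStateDescriptor<String> bufferDescriptor =
            new ListStateDescriptor<>("buffer", Types.STRING);

    @Override
    public void processElement(String value, ReadOnlyContext ctx,
                               Collector<String> out) throws Exception {
        if (ctx.getBroadcastState(ruleDescriptor).immutableEntries().iterator().hasNext()) {
            out.collect(value); // broadcast state is available: apply the business logic
        } else {
            // cache the element in keyed state until the broadcast side has data
            getRuntimeContext().getListState(bufferDescriptor).add(value);
        }
    }

    @Override
    public void processBroadcastElement(String rule, Context ctx,
                                        Collector<String> out) throws Exception {
        ctx.getBroadcastState(ruleDescriptor).put(rule, rule);
        // drain the elements cached before the first broadcast element arrived
        ctx.applyToKeyedState(bufferDescriptor,
                new KeyedStateFunction<String, ListState<String>>() {
                    @Override
                    public void process(String key, ListState<String> buffered) throws Exception {
                        Iterable<String> cached = buffered.get();
                        if (cached != null) {
                            for (String element : cached) {
                                out.collect(element);
                            }
                        }
                        buffered.clear();
                    }
                });
    }
}
```

It would be wired up roughly as
`input.keyBy(x -> x).connect(rules.broadcast(ruleDescriptor)).process(new CachingBroadcastFunction())`.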

On Mon, Jan 25, 2021 at 10:38 PM Guowei Ma  wrote:

> Hi,Nick
> I do not think you could update the `myState` in
> `processBroadcastElement`, because you need a key to update the keyed
> state, and there is no key in `processBroadcastElement`.
> Best,
> Guowei
>
>
> On Tue, Jan 26, 2021 at 6:31 AM Nick Bendtner  wrote:
>
>> Hi Guowei,
>> I am not using a keyed broadcast function, I use [1].  My question is,
>> can a non broadcast state, for instance value state/map state be updated
>> whenever I get a broadcast event in *processBroadcastElement*. This way
>> the state updates are consistent since each instance of the task gets the
>> same broadcast element.
>>
>> ```
>> private MapState myState;
>>
>> @Override
>>public void processElement(InputType value, ReadOnlyContext ctx,
>> Collector out) throws Exception {
>>  // Iterate over map state.
>>myState.iterator().forEach(entry -> ())// Business logic
>>
>>// Do things
>>}
>>
>>@Override
>>public void processBroadcastElement(BroadcastedStateType value,
>> Context ctx, Collector out) throws Exception {
>>  // update map state which is not a broadcast state. Same update in
>> every sub operator
>>state.put(value.ID(), value.state());   // Update the mapState
>> with value from broadcast
>>}
>>
>>
>>@Override
>>  public void snapshotState(FunctionSnapshotContext context) throws
>> Exception {
>>
>>  // called when it's time to save state
>>
>>  myState.clear();
>>
>>  // Update myState with current application state
>>
>>  }
>>
>>  @Override
>>  public void initializeState(FunctionInitializationContext context)
>> throws Exception {
>>
>>  // called when things start up, possibly recovering from an error
>>
>>  descriptor = new MapStateDescriptor<>("state", Types.STRING,
>> Types.POJO(BroadcastedStateType.class));
>>
>>  myState = context.getKeyedStateStore().getMapState(descriptor);
>>
>>  if (context.isRestored()) {
>>
>>  // restore application state from myState
>>
>>  }
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/co/BroadcastProcessFunction.html
>> .
>>
>>
>> Best,
>> Nick.
>>
>> On Sun, Jan 24, 2021 at 11:54 PM Guowei Ma  wrote:
>>
>>> Hi,Nick
>>> Normally you could not iterate all the keyed states, but the
>>> `BroadCastState` & `applyTokeyedState` could do that.
>>> For example, before you get the broadcast side elements you might choose
>>> to cache the non-broadcast element to the keyed state. After the broadcast
>>> elements arrive you need to use `applyTokeyedState`[1] to iterate all the
>>> elements you "cached" in the keyed state and do your business logic.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/broadcast_state.html
>>>
>>> Best,
>>> Guowei
>>>
>>>
>>> On Mon, Jan 25, 2021 at 12:59 PM Nick Bendtner 
>>> wrote:
>>>
>>>> Thanks Guowei. Another question I have is, what is the use of a
>>>> broadcast state when I can update a map state or value state inside of the
>>>> process broadcast element method and use that state to do a lookup in the
>>>> process element method like this example
>>>>
>>>> https://stackoverflow.com/questions/58307154/initialize-the-content-of-a-mapstate
>>>>
>>>>
>>>> Best,
>>>> Nick
>>>> On Sun, Jan 24, 2021 at 9:23 PM Guowei Ma  wrote:
>>>>
>>>>> Hi, Nick
>>>>>   You might need to handle it yourself If you have to process an
>>>>> element only after you get the broadcast state.
>>>>>   For example, you could “cache” the element to the state and handle
>>>>> it when the element from the broadcast side elements are arrived. 
>>>>> Specially
>>>>> if you are using the `KeyedBroadcastProcessFunction` you could use the
>>>>> `applyToKeyedState` to access the element you cache before.
>>>>>
>>>>> Best,
>>>>> Guowei
>>>>>
>>>>>
>>>>> On Mon, Jan 25, 2021 at 10:24 AM Nick Bendtner 
>>>>> wrote:
>>>>>
>>>>>> Hi guys,
>>>>>> What is the way to initialize broadcast state(say with default
>>>>>> values) before the first element shows up in the broadcasting stream? I 
>>>>>> do
>>>>>> a lookup on the broadcast state to process transactions which come from
>>>>>> another stream. The problem is the broadcast state is empty until the 
>>>>>> first
>>>>>> element shows up.
>>>>>>
>>>>>>
>>>>>> Best,
>>>>>> Nick.
>>>>>>
>>>>>


Re: Initializing broadcast state

2021-01-25 Thread Nick Bendtner
Hi Guowei,
I am not using a keyed broadcast function; I use [1]. My question is: can a
non-broadcast state, for instance a value state or map state, be updated
whenever I get a broadcast event in *processBroadcastElement*? This way the
state updates are consistent, since each instance of the task gets the same
broadcast element.

```
private MapState<String, BroadcastedStateType> myState;
private MapStateDescriptor<String, BroadcastedStateType> descriptor;

@Override
public void processElement(InputType value, ReadOnlyContext ctx,
                           Collector<OutputType> out) throws Exception {
    // Iterate over the map state.
    myState.iterator().forEachRemaining(entry -> {
        // Business logic
    });

    // Do things
}

@Override
public void processBroadcastElement(BroadcastedStateType value, Context ctx,
                                    Collector<OutputType> out) throws Exception {
    // Update the map state, which is not a broadcast state. The same update
    // happens in every sub operator.
    myState.put(value.ID(), value.state()); // update the map state with the value from broadcast
}

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    // Called when it's time to save state.
    myState.clear();
    // Update myState with the current application state.
}

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    // Called when things start up, possibly recovering from an error.
    descriptor = new MapStateDescriptor<>("state", Types.STRING,
            Types.POJO(BroadcastedStateType.class));
    myState = context.getKeyedStateStore().getMapState(descriptor);

    if (context.isRestored()) {
        // Restore application state from myState.
    }
}
```

[1]
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/co/BroadcastProcessFunction.html
.


Best,
Nick.
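
As a point of comparison, a minimal, self-contained sketch (hypothetical
String element types and state name) of keeping the lookup data in the
broadcast state itself instead of a separate MapState: the broadcast state is
writable in `processBroadcastElement` and readable in `processElement`, and
since every parallel instance receives the same broadcast elements, every
instance ends up with the same contents.

```
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class LookupBroadcastFunction extends BroadcastProcessFunction<String, String, String> {

    // Must match the descriptor passed to DataStream#broadcast(...) on the control stream.
    private final MapStateDescriptor<String, String> descriptor =
            new MapStateDescriptor<>("lookup", Types.STRING, Types.STRING);

    @Override
    public void processElement(String value, ReadOnlyContext ctx,
                               Collector<String> out) throws Exception {
        // Read-only access on the data side.
        String enrichment = ctx.getBroadcastState(descriptor).get(value);
        out.collect(enrichment != null ? value + ":" + enrichment : value + ":unknown");
    }

    @Override
    public void processBroadcastElement(String value, Context ctx,
                                        Collector<String> out) throws Exception {
        // Writable access on the broadcast side; the same update runs on every sub task.
        ctx.getBroadcastState(descriptor).put(value, value.toUpperCase());
    }
}
```

Wired up roughly as
`events.connect(control.broadcast(descriptor)).process(new LookupBroadcastFunction())`;
the broadcast state is also checkpointed and restored by Flink, which is the
main practical difference from a hand-maintained operator-local map.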

On Sun, Jan 24, 2021 at 11:54 PM Guowei Ma  wrote:

> Hi,Nick
> Normally you could not iterate all the keyed states, but the
> `BroadcastState` & `applyToKeyedState` could do that.
> For example, before you get the broadcast side elements you might choose
> to cache the non-broadcast elements in the keyed state. After the broadcast
> elements arrive you can use `applyToKeyedState`[1] to iterate all the
> elements you "cached" in the keyed state and apply your business logic.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/broadcast_state.html
>
> Best,
> Guowei
>
>
> On Mon, Jan 25, 2021 at 12:59 PM Nick Bendtner  wrote:
>
>> Thanks Guowei. Another question I have is, what is the use of a broadcast
>> state when I can update a map state or value state inside of the process
>> broadcast element method and use that state to do a lookup in the process
>> element method like this example
>>
>> https://stackoverflow.com/questions/58307154/initialize-the-content-of-a-mapstate
>>
>>
>> Best,
>> Nick
>> On Sun, Jan 24, 2021 at 9:23 PM Guowei Ma  wrote:
>>
>>> Hi, Nick
>>>   You might need to handle it yourself If you have to process an element
>>> only after you get the broadcast state.
>>>   For example, you could “cache” the element to the state and handle it
>>> when the element from the broadcast side elements are arrived. Specially if
>>> you are using the `KeyedBroadcastProcessFunction` you could use the
>>> `applyToKeyedState` to access the element you cache before.
>>>
>>> Best,
>>> Guowei
>>>
>>>
>>> On Mon, Jan 25, 2021 at 10:24 AM Nick Bendtner 
>>> wrote:
>>>
>>>> Hi guys,
>>>> What is the way to initialize broadcast state(say with default values)
>>>> before the first element shows up in the broadcasting stream? I do a lookup
>>>> on the broadcast state to process transactions which come from another
>>>> stream. The problem is the broadcast state is empty until the first element
>>>> shows up.
>>>>
>>>>
>>>> Best,
>>>> Nick.
>>>>
>>>


Re: Initializing broadcast state

2021-01-24 Thread Nick Bendtner
Thanks Guowei. Another question I have is: what is the use of a broadcast
state when I can update a map state or value state inside the process
broadcast element method and use that state to do a lookup in the process
element method, as in this example?
https://stackoverflow.com/questions/58307154/initialize-the-content-of-a-mapstate


Best,
Nick
On Sun, Jan 24, 2021 at 9:23 PM Guowei Ma  wrote:

> Hi, Nick
>   You might need to handle it yourself If you have to process an element
> only after you get the broadcast state.
>   For example, you could “cache” the element to the state and handle it
> when the element from the broadcast side elements are arrived. Specially if
> you are using the `KeyedBroadcastProcessFunction` you could use the
> `applyToKeyedState` to access the element you cache before.
>
> Best,
> Guowei
>
>
> On Mon, Jan 25, 2021 at 10:24 AM Nick Bendtner  wrote:
>
>> Hi guys,
>> What is the way to initialize broadcast state(say with default values)
>> before the first element shows up in the broadcasting stream? I do a lookup
>> on the broadcast state to process transactions which come from another
>> stream. The problem is the broadcast state is empty until the first element
>> shows up.
>>
>>
>> Best,
>> Nick.
>>
>


Initializing broadcast state

2021-01-24 Thread Nick Bendtner
Hi guys,
What is the way to initialize broadcast state (say, with default values)
before the first element shows up in the broadcasting stream? I do a lookup
on the broadcast state to process transactions which come from another
stream. The problem is that the broadcast state is empty until the first
element shows up.


Best,
Nick.


Re: Status of a job when a kafka source dies

2020-08-13 Thread Nick Bendtner
Hi Piotr,
Sorry for the late reply. So the poll does not throw an exception when a
broker goes down. In Spring they solve it by generating an event [1] whenever
this happens, and you can intercept this event. consumer.timeout.ms does help
to some extent, but if the source topic does not receive any messages within
the specified value then it still throws an exception.

Best,
Nick.


[1]
https://docs.spring.io/spring-kafka/api/org/springframework/kafka/event/NonResponsiveConsumerEvent.html
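
For reference, the Kafka client timeouts Piotr mentions in the quoted reply
below are passed to the consumer through the Properties object given to the
FlinkKafkaConsumer. A minimal sketch assuming the universal
flink-connector-kafka, with placeholder broker, group, and topic names;
whether a broker outage actually surfaces as an exception (and therefore
triggers Flink's failure handling) depends on the Kafka client version, so
treat the values as something to experiment with rather than a recommendation.

```
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaSourceWithTimeouts {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker-1:9093"); // placeholder
        props.setProperty("group.id", "flink-audit-consumer");   // placeholder
        // Timeout mentioned in the thread; how the client reacts when brokers
        // are unreachable is version dependent, so this is an assumption to verify.
        props.setProperty("request.timeout.ms", "30000");

        env.addSource(new FlinkKafkaConsumer<>("audit-topic", new SimpleStringSchema(), props))
           .print();

        env.execute("kafka-source-with-timeouts");
    }
}
```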

On Wed, Aug 5, 2020 at 1:30 PM Piotr Nowojski  wrote:

> Hi Nick,
>
> Could you elaborate more, what event and how would you like Flink to
> handle? Is there some kind of Kafka's API that can be used to listen to
> such kind of events? Becket, do you maybe know something about this?
>
> As a side note Nick, can not you configure some timeouts [1] in the
> KafkaConsumer? Like `request.timeout.ms` or `consumer.timeout.ms`? But as
> I wrote before, that would be more a question to Kafka guys.
>
> Piotrek
>
> [1] http://kafka.apache.org/20/documentation/
>
> śr., 5 sie 2020 o 19:58 Nick Bendtner  napisał(a):
>
>> +user group.
>>
>> On Wed, Aug 5, 2020 at 12:57 PM Nick Bendtner  wrote:
>>
>>> Thanks Piotr but shouldn't this event be handled by the
>>> FlinkKafkaConsumer since the poll happens inside the FlinkKafkaConsumer.
>>> How can I catch this event in my code since I don't have control over the
>>> poll.
>>>
>>> Best,
>>> Nick.
>>>
>>> On Wed, Aug 5, 2020 at 12:14 PM Piotr Nowojski 
>>> wrote:
>>>
>>>> Hi Nick,
>>>>
>>>> What Aljoscha was trying to say is that Flink is not trying to do any
>>>> magic. If `KafkaConsumer` - which is being used under the hood of
>>>> `FlinkKafkaConsumer` connector - throws an exception, this
>>>> exception bubbles up causing the job to failover. If the failure is handled
>>>> by the `KafkaConsumer` silently, that's what's happening. As we can in the
>>>> TM log that you attached, the latter seems to be happening - note that the
>>>> warning is logged by "org.apache.kafka.clients.NetworkClient" package, so
>>>> that's not the code we (Flink developers) control.
>>>>
>>>> If you want to change this behaviour, unless someone here on this
>>>> mailing list just happens to know the answer, the better place to ask such
>>>> a question on the Kafka mailing list. Maybe there is some way to configure
>>>> this.
>>>>
>>>> And sorry I don't know much about neither the KafkaConsumer nor the
>>>> KafkaBrokers configuration :(
>>>>
>>>> Piotrek
>>>>
>>>> wt., 4 sie 2020 o 22:04 Nick Bendtner  napisał(a):
>>>>
>>>>> Hi,
>>>>> I don't observe this behaviour though, we use flink 1.7.2 . I stopped
>>>>> kafka and zookeeper on all broker nodes. On the flink side, I see the
>>>>> messages in the log ( data is obfuscated) . There are no error logs. The
>>>>> kafka consumer properties are
>>>>>
>>>>> 1. "bootstrap.servers"
>>>>>
>>>>> 2. "zookeeper.connect
>>>>>
>>>>> 3. "auto.offset.reset"
>>>>>
>>>>> 4. "group.id"
>>>>>
>>>>> 5."security.protocol"
>>>>>
>>>>>
>>>>> The flink consumer starts consuming data as soon as the kafka comes
>>>>> back up. So I want to know in what scenario/kafka consumer config will the
>>>>> job go to failed state after a finite number of restart attempts from
>>>>> checkpoint.
>>>>>
>>>>>
>>>>> TM log.
>>>>> 2020-08-04 19:50:55,539 WARN  org.apache.kafka.clients.NetworkClient
>>>>>  - [Consumer clientId=consumer-5,
>>>>> groupId=flink-AIP-QA-Audit-Consumer] Connection to node 1003 (
>>>>> yyyrspapd036.xxx.com/ss.mm.120.124:9093) could not be established.
>>>>> Broker may not be available.
>>>>> 2020-08-04 19:50:55,540 WARN  org.apache.kafka.clients.NetworkClient
>>>>>  - [Consumer clientId=consumer-4,
>>>>> groupId=flink-AIP-QA-Audit-Consumer] Connection to node 1002 (
>>>>> yyyrspapd037.xxx.com/ss.mm.120.125:9093) could not be established.
>>>>> Broker may not be available.
>>>>> 2020-08-04 19:50:55,791 WARN  org.apache.kaf

Re: Status of a job when a kafka source dies

2020-08-05 Thread Nick Bendtner
+user group.

On Wed, Aug 5, 2020 at 12:57 PM Nick Bendtner  wrote:

> Thanks Piotr but shouldn't this event be handled by the FlinkKafkaConsumer
> since the poll happens inside the FlinkKafkaConsumer. How can I catch this
> event in my code since I don't have control over the poll.
>
> Best,
> Nick.
>
> On Wed, Aug 5, 2020 at 12:14 PM Piotr Nowojski 
> wrote:
>
>> Hi Nick,
>>
>> What Aljoscha was trying to say is that Flink is not trying to do any
>> magic. If `KafkaConsumer` - which is being used under the hood of
>> `FlinkKafkaConsumer` connector - throws an exception, this
>> exception bubbles up causing the job to failover. If the failure is handled
>> by the `KafkaConsumer` silently, that's what's happening. As we can in the
>> TM log that you attached, the latter seems to be happening - note that the
>> warning is logged by "org.apache.kafka.clients.NetworkClient" package, so
>> that's not the code we (Flink developers) control.
>>
>> If you want to change this behaviour, unless someone here on this mailing
>> list just happens to know the answer, the better place to ask such a
>> question on the Kafka mailing list. Maybe there is some way to configure
>> this.
>>
>> And sorry I don't know much about neither the KafkaConsumer nor the
>> KafkaBrokers configuration :(
>>
>> Piotrek
>>
>> wt., 4 sie 2020 o 22:04 Nick Bendtner  napisał(a):
>>
>>> Hi,
>>> I don't observe this behaviour though, we use flink 1.7.2 . I stopped
>>> kafka and zookeeper on all broker nodes. On the flink side, I see the
>>> messages in the log ( data is obfuscated) . There are no error logs. The
>>> kafka consumer properties are
>>>
>>> 1. "bootstrap.servers"
>>>
>>> 2. "zookeeper.connect
>>>
>>> 3. "auto.offset.reset"
>>>
>>> 4. "group.id"
>>>
>>> 5."security.protocol"
>>>
>>>
>>> The flink consumer starts consuming data as soon as the kafka comes back
>>> up. So I want to know in what scenario/kafka consumer config will the job
>>> go to failed state after a finite number of restart attempts from
>>> checkpoint.
>>>
>>>
>>> TM log.
>>> 2020-08-04 19:50:55,539 WARN  org.apache.kafka.clients.NetworkClient
>>>- [Consumer clientId=consumer-5,
>>> groupId=flink-AIP-QA-Audit-Consumer] Connection to node 1003 (
>>> yyyrspapd036.xxx.com/ss.mm.120.124:9093) could not be established.
>>> Broker may not be available.
>>> 2020-08-04 19:50:55,540 WARN  org.apache.kafka.clients.NetworkClient
>>>- [Consumer clientId=consumer-4,
>>> groupId=flink-AIP-QA-Audit-Consumer] Connection to node 1002 (
>>> yyyrspapd037.xxx.com/ss.mm.120.125:9093) could not be established.
>>> Broker may not be available.
>>> 2020-08-04 19:50:55,791 WARN  org.apache.kafka.clients.NetworkClient
>>>- [Consumer clientId=consumer-4,
>>> groupId=flink-AIP-QA-Audit-Consumer] Connection to node 1004 (
>>> yyyrspapd035.xxx.com/ss.mm.120.123:9093) could not be established.
>>> Broker may not be available.
>>> 2020-08-04 19:50:55,791 WARN  org.apache.kafka.clients.NetworkClient
>>>- [Consumer clientId=consumer-6,
>>> groupId=flink-AIP-QA-Audit-Consumer] Connection to node 1003 (
>>> yyyrspapd036.xxx.com/ss.mm.120.124:9093) could not be established.
>>> Broker may not be available.
>>>
>>> Best,
>>> Nick
>>>
>>> On Mon, Jul 20, 2020 at 10:27 AM Aljoscha Krettek 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Flink doesn't do any special failure-handling or retry logic, so it’s
>>>> up
>>>> to how the KafkaConsumer is configured via properties. In general Flink
>>>> doesn’t try to be smart: when something fails an exception fill bubble
>>>> up that will fail this execution of the job. If checkpoints are enabled
>>>> this will trigger a restore, this is controlled by the restart
>>>> strategy.
>>>> If that eventually gives up the job fill go to “FAILED” and stop.
>>>>
>>>> This is the relevant section of the docs:
>>>>
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/task_failure_recovery.html
>>>>
>>>> Best,
>>>> Aljoscha
>>>>
>>>> On 15.07.20 17:42, Nick Bendtner wrote:
>>>> > Hi guys,
>>>> > I want to know what is the default behavior of Kafka source when a
>>>> kafka
>>>> > cluster goes down during streaming. Will the job status go to failing
>>>> or is
>>>> > the exception caught and there is a back off before the source tries
>>>> to
>>>> > poll for more events ?
>>>> >
>>>> >
>>>> > Best,
>>>> > Nick.
>>>> >
>>>>
>>>>


Re: Status of a job when a kafka source dies

2020-08-04 Thread Nick Bendtner
Hi,
I don't observe this behaviour though; we use Flink 1.7.2. I stopped Kafka
and ZooKeeper on all broker nodes. On the Flink side, I see the messages
below in the log (data is obfuscated). There are no error logs. The Kafka
consumer properties are:

1. "bootstrap.servers"

2. "zookeeper.connect

3. "auto.offset.reset"

4. "group.id"

5."security.protocol"


The Flink consumer starts consuming data as soon as Kafka comes back up. So I
want to know under what scenario/Kafka consumer config the job will go to a
FAILED state after a finite number of restart attempts from checkpoint.


TM log.
2020-08-04 19:50:55,539 WARN  org.apache.kafka.clients.NetworkClient
 - [Consumer clientId=consumer-5,
groupId=flink-AIP-QA-Audit-Consumer] Connection to node 1003 (
yyyrspapd036.xxx.com/ss.mm.120.124:9093) could not be established. Broker
may not be available.
2020-08-04 19:50:55,540 WARN  org.apache.kafka.clients.NetworkClient
 - [Consumer clientId=consumer-4,
groupId=flink-AIP-QA-Audit-Consumer] Connection to node 1002 (
yyyrspapd037.xxx.com/ss.mm.120.125:9093) could not be established. Broker
may not be available.
2020-08-04 19:50:55,791 WARN  org.apache.kafka.clients.NetworkClient
 - [Consumer clientId=consumer-4,
groupId=flink-AIP-QA-Audit-Consumer] Connection to node 1004 (
yyyrspapd035.xxx.com/ss.mm.120.123:9093) could not be established. Broker
may not be available.
2020-08-04 19:50:55,791 WARN  org.apache.kafka.clients.NetworkClient
 - [Consumer clientId=consumer-6,
groupId=flink-AIP-QA-Audit-Consumer] Connection to node 1003 (
yyyrspapd036.xxx.com/ss.mm.120.124:9093) could not be established. Broker
may not be available.

Best,
Nick
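
As Aljoscha's reply below explains, whether the job ever reaches FAILED is
governed by the restart strategy once an exception actually surfaces from the
source. A minimal sketch of pinning that down explicitly; the attempt count,
delay, and the trivial pipeline are placeholders:

```
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FiniteRestartsJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // After 3 failed restart attempts (10 seconds apart) the job transitions to FAILED.
        // This only kicks in if the source actually throws; the NetworkClient warnings
        // above are swallowed inside the Kafka client and do not fail the job.
        env.setRestartStrategy(
                RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));

        env.fromElements("placeholder").print(); // stand-in for the real Kafka pipeline

        env.execute("finite-restarts-job");
    }
}
```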

On Mon, Jul 20, 2020 at 10:27 AM Aljoscha Krettek 
wrote:

> Hi,
>
> Flink doesn't do any special failure-handling or retry logic, so it’s up
> to how the KafkaConsumer is configured via properties. In general Flink
> doesn’t try to be smart: when something fails, an exception will bubble
> up that will fail this execution of the job. If checkpoints are enabled
> this will trigger a restore; this is controlled by the restart strategy.
> If that eventually gives up, the job will go to “FAILED” and stop.
>
> This is the relevant section of the docs:
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/task_failure_recovery.html
>
> Best,
> Aljoscha
>
> On 15.07.20 17:42, Nick Bendtner wrote:
> > Hi guys,
> > I want to know what is the default behavior of Kafka source when a kafka
> > cluster goes down during streaming. Will the job status go to failing or
> is
> > the exception caught and there is a back off before the source tries to
> > poll for more events ?
> >
> >
> > Best,
> > Nick.
> >
>
>


Status of a job when a kafka source dies

2020-07-15 Thread Nick Bendtner
Hi guys,
I want to know what the default behavior of the Kafka source is when a Kafka
cluster goes down during streaming. Will the job status go to FAILING, or is
the exception caught with a back-off before the source tries to poll for
more events?


Best,
Nick.


Non parallel file sources

2020-06-23 Thread Nick Bendtner
Hi guys,
What is the best way to process a file from a Unix file system, since there
is no guarantee as to which task manager will be assigned to process the
file? We run Flink in standalone mode. We currently follow the brute-force
approach of copying the file to every task manager; is there a better way
to do this?


Best,
Nick.


State backend considerations

2020-06-21 Thread Nick Bendtner
Hi guys,
I have a few questions on state backends.
Is there a guideline on how big the state has to be before it makes sense to
use RocksDB rather than the FsStateBackend? Is there an analysis of the
latency of a full checkpoint with the FsStateBackend as the state size
grows?


Best,
Nick.


Re: kerberos integration with flink

2020-06-01 Thread Nick Bendtner
Hi Guo,
The auto renewal happens fine; however, I want to generate a new ticket with
a new renew-until period so that the job can run longer than 7 days. I am
talking about the second paragraph of your email: I have set a custom cache
by setting KRB5CCNAME. I just want to make sure that Krb5LoginModule does a
re-login like you said. I think it does, because I generated a new ticket
while the Flink job was running and the job continues to auto-renew the new
ticket. Let me know if you can think of any pitfalls. Once again I really
want to thank you for your help and your time.

Best,
Nick.

On Mon, Jun 1, 2020 at 12:29 AM Yangze Guo  wrote:

> Hi, Nick.
>
> Do you mean that you manually execute "kinit -R" to renew the ticket cache?
> If that is the case, Flink already sets the "renewTGT" to true. If
> everything is ok, you do not need to do it yourself. However, it seems
> this mechanism has a bug and this bug is not fixed in all JDK
> versions. Please refer to [1].
>
> If you mean that you generate a new ticket cache in the same place(by
> default /tmp/krb5cc_uid), I'm not sure will Krb5LoginModule re-login
> with your new ticket cache. I'll try to do a deeper investigation.
>
> [1] https://bugs.openjdk.java.net/browse/JDK-8058290.
>
> Best,
> Yangze Guo
>
> On Sat, May 30, 2020 at 3:07 AM Nick Bendtner  wrote:
> >
> > Hi Guo,
> > Thanks again for your inputs. If I periodically renew the kerberos cache
> using an external process(kinit) on all flink nodes in standalone mode,
> will the cluster still be short lived or will the new ticket in the cache
> be used and the cluster can live till the end of the new expiry ?
> >
> > Best,
> > Nick.
> >
> > On Sun, May 24, 2020 at 9:15 PM Yangze Guo  wrote:
> >>
> >> Yes, you can use kinit. But AFAIK, if you deploy Flink on Kubernetes
> >> or Mesos, Flink will not ship the ticket cache. If you deploy Flink on
> >> Yarn, Flink will acquire delegation tokens with your ticket cache and
> >> set tokens for job manager and task executor. As the document said,
> >> the main drawback is that the cluster is necessarily short-lived since
> >> the generated delegation tokens will expire (typically within a week).
> >>
> >> Best,
> >> Yangze Guo
> >>
> >> On Sat, May 23, 2020 at 1:23 AM Nick Bendtner 
> wrote:
> >> >
> >> > Hi Guo,
> >> > Even for HDFS I don't really need to set
> "security.kerberos.login.contexts" . As long as there is the right ticket
> in the ticket cache before starting the flink cluster it seems to work
> fine. I think even [4] from your reference seems to do the same thing. I
> have defined own ticket cache specifically for flink cluster by setting
> this environment variable. Before starting the cluster I create a ticket by
> using kinit.
> >> > This is how I make flink read this cache.
> >> > export KRB5CCNAME=/home/was/Jaas/krb5cc . I think even flink tries to
> find the location of ticket cache using this variable [1].
> >> > Do you see any problems in setting up hadoop security module this way
> ? And thanks a lot for your help.
> >> >
> >> > [1]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/security/KerberosUtils.java
> >> >
> >> > Best,
> >> > Nick
> >> >
> >> >
> >> >
> >> > On Thu, May 21, 2020 at 9:54 PM Yangze Guo 
> wrote:
> >> >>
> >> >> Hi, Nick,
> >> >>
> >> >> From my understanding, if you configure the
> >> >> "security.kerberos.login.keytab", Flink will add the
> >> >> AppConfigurationEntry of this keytab to all the apps defined in
> >> >> "security.kerberos.login.contexts". If you define
> >> >> "java.security.auth.login.config" at the same time, Flink will also
> >> >> keep the configuration in it. For more details, see [1][2].
> >> >>
> >> >> If you want to use this keytab to interact with HDFS, HBase and Yarn,
> >> >> you need to set "security.kerberos.login.contexts". See [3][4].
> >> >>
> >> >> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/security-kerberos.html#jaas-security-module
> >> >> [2]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java
> >> >> [3]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/security-kerberos.html#hadoop-se

Re: kerberos integration with flink

2020-05-29 Thread Nick Bendtner
Hi Guo,
Thanks again for your inputs. If I periodically renew the Kerberos cache
using an external process (kinit) on all Flink nodes in standalone mode,
will the cluster still be short-lived, or will the new ticket in the cache
be used so the cluster can live until the end of the new expiry?

Best,
Nick.

On Sun, May 24, 2020 at 9:15 PM Yangze Guo  wrote:

> Yes, you can use kinit. But AFAIK, if you deploy Flink on Kubernetes
> or Mesos, Flink will not ship the ticket cache. If you deploy Flink on
> Yarn, Flink will acquire delegation tokens with your ticket cache and
> set tokens for job manager and task executor. As the document said,
> the main drawback is that the cluster is necessarily short-lived since
> the generated delegation tokens will expire (typically within a week).
>
> Best,
> Yangze Guo
>
> On Sat, May 23, 2020 at 1:23 AM Nick Bendtner  wrote:
> >
> > Hi Guo,
> > Even for HDFS I don't really need to set
> "security.kerberos.login.contexts" . As long as there is the right ticket
> in the ticket cache before starting the flink cluster it seems to work
> fine. I think even [4] from your reference seems to do the same thing. I
> have defined own ticket cache specifically for flink cluster by setting
> this environment variable. Before starting the cluster I create a ticket by
> using kinit.
> > This is how I make flink read this cache.
> > export KRB5CCNAME=/home/was/Jaas/krb5cc . I think even flink tries to
> find the location of ticket cache using this variable [1].
> > Do you see any problems in setting up hadoop security module this way ?
> And thanks a lot for your help.
> >
> > [1]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/security/KerberosUtils.java
> >
> > Best,
> > Nick
> >
> >
> >
> > On Thu, May 21, 2020 at 9:54 PM Yangze Guo  wrote:
> >>
> >> Hi, Nick,
> >>
> >> From my understanding, if you configure the
> >> "security.kerberos.login.keytab", Flink will add the
> >> AppConfigurationEntry of this keytab to all the apps defined in
> >> "security.kerberos.login.contexts". If you define
> >> "java.security.auth.login.config" at the same time, Flink will also
> >> keep the configuration in it. For more details, see [1][2].
> >>
> >> If you want to use this keytab to interact with HDFS, HBase and Yarn,
> >> you need to set "security.kerberos.login.contexts". See [3][4].
> >>
> >> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/security-kerberos.html#jaas-security-module
> >> [2]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java
> >> [3]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/security-kerberos.html#hadoop-security-module
> >> [4]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
> >>
> >> Best,
> >> Yangze Guo
> >>
> >> On Thu, May 21, 2020 at 11:06 PM Nick Bendtner 
> wrote:
> >> >
> >> > Hi guys,
> >> > Is there any difference in providing kerberos config to the flink jvm
> using this method in the flink configuration?
> >> >
> >> > env.java.opts:  -Dconfig.resource=qa.conf
> -Djava.library.path=/usr/mware/flink-1.7.2/simpleapi/lib/
> -Djava.security.auth.login.config=/usr/mware/flink-1.7.2/Jaas/kafka-jaas.conf
> -Djava.security.krb5.conf=/usr/mware/flink-1.7.2/Jaas/krb5.conf
> >> >
> >> > Is there any difference in doing it this way vs providing it from
> security.kerberos.login.keytab .
> >> >
> >> > Best,
> >> >
> >> > Nick.
>


Re: kerberos integration with flink

2020-05-22 Thread Nick Bendtner
Hi Guo,
Even for HDFS I don't really need to set "security.kerberos.login.contexts".
As long as the right ticket is in the ticket cache before starting the Flink
cluster, it seems to work fine. I think even [4] from your reference seems
to do the same thing. I have defined my own ticket cache specifically for
the Flink cluster by setting this environment variable. Before starting the
cluster I create a ticket by using kinit.
This is how I make Flink read this cache:
export KRB5CCNAME=/home/was/Jaas/krb5cc . I think even Flink tries to find
the location of the ticket cache using this variable [1].
Do you see any problems in setting up the Hadoop security module this way?
And thanks a lot for your help.

[1]
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/security/KerberosUtils.java

Best,
Nick



On Thu, May 21, 2020 at 9:54 PM Yangze Guo  wrote:

> Hi, Nick,
>
> From my understanding, if you configure the
> "security.kerberos.login.keytab", Flink will add the
> AppConfigurationEntry of this keytab to all the apps defined in
> "security.kerberos.login.contexts". If you define
> "java.security.auth.login.config" at the same time, Flink will also
> keep the configuration in it. For more details, see [1][2].
>
> If you want to use this keytab to interact with HDFS, HBase and Yarn,
> you need to set "security.kerberos.login.contexts". See [3][4].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/security-kerberos.html#jaas-security-module
> [2]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java
> [3]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/security-kerberos.html#hadoop-security-module
> [4]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
>
> Best,
> Yangze Guo
>
> On Thu, May 21, 2020 at 11:06 PM Nick Bendtner  wrote:
> >
> > Hi guys,
> > Is there any difference in providing kerberos config to the flink jvm
> using this method in the flink configuration?
> >
> > env.java.opts:  -Dconfig.resource=qa.conf
> -Djava.library.path=/usr/mware/flink-1.7.2/simpleapi/lib/
> -Djava.security.auth.login.config=/usr/mware/flink-1.7.2/Jaas/kafka-jaas.conf
> -Djava.security.krb5.conf=/usr/mware/flink-1.7.2/Jaas/krb5.conf
> >
> > Is there any difference in doing it this way vs providing it from
> security.kerberos.login.keytab .
> >
> > Best,
> >
> > Nick.
>


kerberos integration with flink

2020-05-21 Thread Nick Bendtner
Hi guys,
Is there any difference in providing kerberos config to the flink jvm using
this method in the flink configuration?

env.java.opts:  -Dconfig.resource=qa.conf
-Djava.library.path=/usr/mware/flink-1.7.2/simpleapi/lib/
-Djava.security.auth.login.config=/usr/mware/flink-1.7.2/Jaas/kafka-jaas.conf
-Djava.security.krb5.conf=/usr/mware/flink-1.7.2/Jaas/krb5.conf

Is there any difference in doing it this way vs. providing it via
security.kerberos.login.keytab?

Best,

Nick.


Re: ProducerRecord with Kafka Sink for 1.8.0

2020-05-14 Thread Nick Bendtner
Hi Gary,
I have used this technique before. I deleted the flink-avro jar from lib and
packed it into the application jar, and there are no problems.

Best,
Nick

On Thu, May 14, 2020 at 6:11 AM Gary Yao  wrote:

> Its because the flink distribution of the cluster is 1.7.2. We use a
>> standalone cluster , so in the lib directory in flink the artifact is
>> flink-core-1.7.2.jar . I need to pack flink-core-1.9.0.jar from application
>> and use child first class loading to use newer version of flink-core.
>>
>
> Do you have experience with this technique in production? In general I do
> not think this can work; a job pretending to run a newer version of Flink
> generally cannot communicate with an older JobManager, which normally does
> not even run user code.
>
> If you are stuck with Flink 1.8, maybe it is an option for you to backport
> FLINK-11693 to Flink 1.8 yourself and build a custom Kafka connector.
>
>
> On Tue, May 12, 2020 at 10:04 PM Nick Bendtner  wrote:
> >
> > Hi Gary,
> > Its because the flink distribution of the cluster is 1.7.2. We use a
> standalone cluster , so in the lib directory in flink the artifact is
> flink-core-1.7.2.jar . I need to pack flink-core-1.9.0.jar from application
> and use child first class loading to use newer version of flink-core. If I
> have it as provided scope, sure it will work in IntelliJ but not outside of
> it .
> >
> > Best,
> > Nick
> >
> > On Tue, May 12, 2020 at 2:53 PM Gary Yao  wrote:
> >>
> >> Hi Nick,
> >>
> >> Can you explain why it is required to package flink-core into your
> >> application jar? Usually flink-core is a dependency with provided
> >> scope [1]
> >>
> >> Best,
> >> Gary
> >>
> >> [1]
> https://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html#dependency-scope
> >>
> >> On Tue, May 12, 2020 at 5:41 PM Nick Bendtner 
> wrote:
> >> >
> >> > Hi Gary,
> >> > Thanks for the info. I am aware this feature is available in 1.9.0
> onwards. Our cluster is still very old and have CICD challenges,I was
> hoping not to bloat up the application jar by packaging even flink-core
> with it. If its not possible to do this with older version without writing
> our own kafka sink implementation similar to the flink provided version in
> 1.9.0 then I think we will pack flink-core 1.9.0 with the application and
> follow the approach that you suggested. Thanks again for getting back to me
> so quickly.
> >> >
> >> > Best,
> >> > Nick
> >> >
> >> > On Tue, May 12, 2020 at 3:37 AM Gary Yao  wrote:
> >> >>
> >> >> Hi Nick,
> >> >>
> >> >> Are you able to upgrade to Flink 1.9? Beginning with Flink 1.9 you
> can use
> >> >> KafkaSerializationSchema to produce a ProducerRecord [1][2].
> >> >>
> >> >> Best,
> >> >> Gary
> >> >>
> >> >> [1] https://issues.apache.org/jira/browse/FLINK-11693
> >> >> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.html
> >> >>
> >> >> On Mon, May 11, 2020 at 10:59 PM Nick Bendtner 
> wrote:
> >> >> >
> >> >> > Hi guys,
> >> >> > I use 1.8.0 version for flink-connector-kafka. Do you have any
> recommendations on how to produce a ProducerRecord from a kafka sink.
> Looking to add support to kafka headers therefore thinking about
> ProducerRecord. If you have any thoughts its highly appreciated.
> >> >> >
> >> >> > Best,
> >> >> > Nick.
>


Re: ProducerRecord with Kafka Sink for 1.8.0

2020-05-12 Thread Nick Bendtner
Hi Gary,
It's because the Flink distribution of the cluster is 1.7.2. We use a
standalone cluster, so in the lib directory in Flink the artifact is
flink-core-1.7.2.jar. I need to pack flink-core-1.9.0.jar into the
application and use child-first class loading to use the newer version of
flink-core. If I have it as provided scope, sure, it will work in IntelliJ
but not outside of it.

Best,
Nick

On Tue, May 12, 2020 at 2:53 PM Gary Yao  wrote:

> Hi Nick,
>
> Can you explain why it is required to package flink-core into your
> application jar? Usually flink-core is a dependency with provided
> scope [1]
>
> Best,
> Gary
>
> [1]
> https://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html#dependency-scope
>
> On Tue, May 12, 2020 at 5:41 PM Nick Bendtner  wrote:
> >
> > Hi Gary,
> > Thanks for the info. I am aware this feature is available in 1.9.0
> onwards. Our cluster is still very old and have CICD challenges,I was
> hoping not to bloat up the application jar by packaging even flink-core
> with it. If its not possible to do this with older version without writing
> our own kafka sink implementation similar to the flink provided version in
> 1.9.0 then I think we will pack flink-core 1.9.0 with the application and
> follow the approach that you suggested. Thanks again for getting back to me
> so quickly.
> >
> > Best,
> > Nick
> >
> > On Tue, May 12, 2020 at 3:37 AM Gary Yao  wrote:
> >>
> >> Hi Nick,
> >>
> >> Are you able to upgrade to Flink 1.9? Beginning with Flink 1.9 you can
> use
> >> KafkaSerializationSchema to produce a ProducerRecord [1][2].
> >>
> >> Best,
> >> Gary
> >>
> >> [1] https://issues.apache.org/jira/browse/FLINK-11693
> >> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.html
> >>
> >> On Mon, May 11, 2020 at 10:59 PM Nick Bendtner 
> wrote:
> >> >
> >> > Hi guys,
> >> > I use 1.8.0 version for flink-connector-kafka. Do you have any
> recommendations on how to produce a ProducerRecord from a kafka sink.
> Looking to add support to kafka headers therefore thinking about
> ProducerRecord. If you have any thoughts its highly appreciated.
> >> >
> >> > Best,
> >> > Nick.
>


Re: ProducerRecord with Kafka Sink for 1.8.0

2020-05-12 Thread Nick Bendtner
Hi Gary,
Thanks for the info. I am aware this feature is available from 1.9.0 onwards.
Our cluster is still very old and has CI/CD challenges, so I was hoping not
to bloat up the application jar by packaging even flink-core with it. If it's
not possible to do this with the older version without writing our own Kafka
sink implementation similar to the Flink-provided one in 1.9.0, then I think
we will pack flink-core 1.9.0 with the application and follow the approach
that you suggested. Thanks again for getting back to me so quickly.

Best,
Nick
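
For completeness, a minimal sketch of the KafkaSerializationSchema approach
Gary points to in the quoted reply below (Flink 1.9+). The element type,
topic, and header name are placeholders:

```
import java.nio.charset.StandardCharsets;

import javax.annotation.Nullable;

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class HeaderAwareSerializationSchema implements KafkaSerializationSchema<String> {

    private final String topic;

    public HeaderAwareSerializationSchema(String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
        ProducerRecord<byte[], byte[]> record =
                new ProducerRecord<>(topic, element.getBytes(StandardCharsets.UTF_8));
        // Kafka headers can be attached directly on the ProducerRecord.
        record.headers().add("source", "flink-job".getBytes(StandardCharsets.UTF_8));
        return record;
    }
}
```

The schema would then be handed to the 1.9 universal producer, roughly as
`new FlinkKafkaProducer<>("audit-topic", new HeaderAwareSerializationSchema("audit-topic"), props, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)`.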

On Tue, May 12, 2020 at 3:37 AM Gary Yao  wrote:

> Hi Nick,
>
> Are you able to upgrade to Flink 1.9? Beginning with Flink 1.9 you can use
> KafkaSerializationSchema to produce a ProducerRecord [1][2].
>
> Best,
> Gary
>
> [1] https://issues.apache.org/jira/browse/FLINK-11693
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.html
>
> On Mon, May 11, 2020 at 10:59 PM Nick Bendtner  wrote:
> >
> > Hi guys,
> > I use 1.8.0 version for flink-connector-kafka. Do you have any
> recommendations on how to produce a ProducerRecord from a kafka sink.
> Looking to add support to kafka headers therefore thinking about
> ProducerRecord. If you have any thoughts its highly appreciated.
> >
> > Best,
> > Nick.
>


ProducerRecord with Kafka Sink for 1.8.0

2020-05-11 Thread Nick Bendtner
Hi guys,
I use version 1.8.0 of flink-connector-kafka. Do you have any
recommendations on how to produce a ProducerRecord from a Kafka sink? I am
looking to add support for Kafka headers, therefore I am thinking about
ProducerRecord. If you have any thoughts they are highly appreciated.

Best,
Nick.


Using flink-connector-kafka-1.9.1 with flink-core-1.7.2

2020-05-06 Thread Nick Bendtner
Hi guys,
I am using Flink version 1.7.2. I have to deserialize data from Kafka into
consumer records, therefore I decided to update flink-connector-kafka to
1.9.1, which provides support for ConsumerRecord. We use child-first class
loading. However, it seems I have a compatibility issue, as I get this
exception:

Exception in thread "main" java.lang.NoClassDefFoundError:
org/apache/flink/api/common/ExecutionConfig$ClosureCleanerLevel

Any tricks to make this work without changing the version of flink-core?


Best,
Nick.


Casting from org.apache.flink.api.java.tuple.Tuple2 to scala.Product; using java and scala in flink job

2020-05-05 Thread Nick Bendtner
Hi guys,
In our Flink job we use a Java source for deserializing a message from Kafka
using a Kafka deserializer. The signature is as below:

public class CustomAvroDeserializationSchema implements
    KafkaDeserializationSchema<Tuple2<EventMetaData, GenericRecord>>

The other parts of the streaming job are in Scala. When data has to be
serialized I get this exception:

java.lang.RuntimeException: org.apache.flink.api.java.tuple.Tuple2 cannot be cast to scala.Product
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)

Here is how I provide the type info for serialization in the Java
deserialization class:

@Override
public TypeInformation<Tuple2<EventMetaData, GenericRecord>> getProducedType() {
    return new TupleTypeInfo<Tuple2<EventMetaData, GenericRecord>>(
        TypeInformation.of(EventMetaData.class),
        new GenericRecordAvroTypeInfo(this.writer));
}

Here is how I add the kafka source in scala :

private[flink] def sourceType(
  deserialization: KafkaDeserializationSchema[(EventMetaData, GenericRecord)],
  properties: Properties): FlinkKafkaConsumer[(EventMetaData,
GenericRecord)] = {
  val consumer = new FlinkKafkaConsumer[(EventMetaData, GenericRecord)](
source.asJava,
deserialization,
properties)
  consumer
}

Any ideas or thoughts on how to interoperate between the Java Tuple2 and a
Scala case class? Also, we are using version 1.9.1 of flink-connector-kafka
while the rest of the cluster uses Flink version 1.7.2.

Best,
Nick.


Re: Help with flink hdfs sink

2020-03-25 Thread Nick Bendtner
Thank you so much guys. I used "hdfs://nameservice/path/of/your/file"; it
works fine for me now.

Best,
Nick
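
For reference, a Java rendering of the sink discussed in this thread with a
working path. It is a sketch only: "nameservice" stands in for the HA name
service defined in hdfs-site.xml (with HADOOP_CONF_DIR set,
"hdfs:///tmp/auditlog/" against the defaultFS works as well), and the
bounded source is just a placeholder for the real stream.

```
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

public class HdfsSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> result = env.fromElements("audit-1", "audit-2"); // placeholder stream

        StreamingFileSink<String> sink = StreamingFileSink
                // "nameservice" must match the HA name service in hdfs-site.xml.
                .forRowFormat(new Path("hdfs://nameservice/tmp/auditlog/"),
                        new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(DefaultRollingPolicy.create()
                        .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                        .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                        .withMaxPartSize(1024 * 1024 * 1024)
                        .build())
                .build();

        result.addSink(sink).name("HDFSSink");
        env.execute("hdfs-sink-sketch");
    }
}
```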

On Fri, Mar 20, 2020 at 3:48 AM Yang Wang  wrote:

> I think Jingsong is right. You miss a slash in your HDFS path.
>
> Usually a HDFS path is like this "hdfs://nameservice/path/of/your/file".
> And the nameservice could be omitted if you want to use the defaultFS
> configured in the core-site.xml.
>
>
> Best,
> Yang
>
> Jingsong Li  于2020年3月20日周五 上午10:09写道:
>
>> Hi Nick,
>>
>> You can try "new Path("hdfs:///tmp/auditlog/")". There is one additional
>> / after hdfs://, which is a protocol name.
>>
>> Best,
>> Jingsong Lee
>>
>> On Fri, Mar 20, 2020 at 3:13 AM Nick Bendtner  wrote:
>>
>>> Hi guys,
>>> I am using flink version 1.7.2.
>>> I am trying to write to hdfs sink from my flink job. I
>>> setup HADOOP_HOME. Here is the debug log for this :
>>>
>>> 2020-03-19 18:59:34,316 DEBUG org.apache.flink.runtime.util.HadoopUtils 
>>> - Cannot find hdfs-default configuration-file path in Flink 
>>> config.
>>> 2020-03-19 18:59:34,317 DEBUG org.apache.flink.runtime.util.HadoopUtils 
>>> - Cannot find hdfs-site configuration-file path in Flink 
>>> config.
>>> 2020-03-19 18:59:34,317 DEBUG org.apache.flink.runtime.util.HadoopUtils 
>>> - Adding /home/was/HDFSConf/conf/core-site.xml to hadoop 
>>> configuration
>>> 2020-03-19 18:59:34,317 DEBUG org.apache.flink.runtime.util.HadoopUtils 
>>> - Adding /home/was/HDFSConf/conf/hdfs-site.xml to hadoop 
>>> configuration
>>> 2020-03-19 18:59:34,344 INFO  
>>> org.apache.flink.runtime.security.modules.HadoopModule- Hadoop user 
>>> set to kafka (auth:KERBEROS)
>>>
>>>
>>> This is what my streaming file sink code looks like.
>>>
>>>
>>> val sink: StreamingFileSink[String] = StreamingFileSink
>>>   .forRowFormat(new Path("hdfs://tmp/auditlog/"), new 
>>> SimpleStringEncoder[String]("UTF-8"))
>>>   .withRollingPolicy(DefaultRollingPolicy.create()
>>> .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
>>> .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
>>> .withMaxPartSize(1024
>>>   * 1024 * 1024)
>>> .build())
>>>   .build()
>>>
>>> result.addSink(sink).name("HDFSSink")
>>>
>>>
>>> When I run the job I get this error stack trace :
>>>
>>>  INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph- 
>>> Sink: HDFSSink (1/1) (27b62d6294da47491041d750daf421a0) switched from 
>>> RUNNING to FAILED.
>>> java.io.IOException: Cannot instantiate file system for URI: 
>>> hdfs://tmp/auditlog
>>> at 
>>> org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
>>> at 
>>> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399)
>>> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
>>> at 
>>> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.(Buckets.java:112)
>>> at 
>>> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:242)
>>> at 
>>> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)
>>> at 
>>> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>>> at 
>>> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>>> at 
>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>>> at 
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.Illegal

Help with flink hdfs sink

2020-03-19 Thread Nick Bendtner
Hi guys,
I am using Flink version 1.7.2.
I am trying to write to an HDFS sink from my Flink job. I set up HADOOP_HOME.
Here is the debug log for this:

2020-03-19 18:59:34,316 DEBUG
org.apache.flink.runtime.util.HadoopUtils - Cannot
find hdfs-default configuration-file path in Flink config.
2020-03-19 18:59:34,317 DEBUG
org.apache.flink.runtime.util.HadoopUtils - Cannot
find hdfs-site configuration-file path in Flink config.
2020-03-19 18:59:34,317 DEBUG
org.apache.flink.runtime.util.HadoopUtils - Adding
/home/was/HDFSConf/conf/core-site.xml to hadoop configuration
2020-03-19 18:59:34,317 DEBUG
org.apache.flink.runtime.util.HadoopUtils - Adding
/home/was/HDFSConf/conf/hdfs-site.xml to hadoop configuration
2020-03-19 18:59:34,344 INFO
org.apache.flink.runtime.security.modules.HadoopModule- Hadoop
user set to kafka (auth:KERBEROS)


This is what my streaming file sink code looks like.


val sink: StreamingFileSink[String] = StreamingFileSink
  .forRowFormat(new Path("hdfs://tmp/auditlog/"),
    new SimpleStringEncoder[String]("UTF-8"))
  .withRollingPolicy(DefaultRollingPolicy.create()
    .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
    .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
    .withMaxPartSize(1024 * 1024 * 1024)
    .build())
  .build()

result.addSink(sink).name("HDFSSink")


When I run the job I get this error stack trace :

 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph-
Sink: HDFSSink (1/1) (27b62d6294da47491041d750daf421a0) switched from
RUNNING to FAILED.
java.io.IOException: Cannot instantiate file system for URI: hdfs://tmp/auditlog
at 
org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
at 
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.(Buckets.java:112)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:242)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException:
java.net.UnknownHostException: tmp
at 
org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:378)
at 
org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:320)
at 
org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:687)
at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:628)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:149)


Why is it trying to connect to /tmp? Is it not supposed to get the namenodes
from core-site.xml and hdfs-site.xml?

Can you please help with the correct way to configure the HDFS sink.


Best,

Nick.


Re: Providing hdfs name node IP for streaming file sink

2020-03-02 Thread Nick Bendtner
Thanks a lot Yang. What are your thoughts on catching the exception when a
name node is down and retrying with the secondary name node ?

Best,
Nick.

On Sun, Mar 1, 2020 at 9:05 PM Yang Wang  wrote:

> Hi Nick,
>
> Certainly you could directly use "namenode:port" as the schema of you HDFS
> path.
> Then the hadoop configs(e.g. core-site.xml, hdfs-site.xml) will not be
> necessary.
> However, that also means you could benefit from the HDFS
> high-availability[1].
>
> If your HDFS cluster is HA configured, i strongly suggest you to set the
> "HADOOP_CONF_DIR"
> for your Flink application. Both the client and cluster(JM/TM) side need
> to be set. Then
> your HDFS path could be specified like this "hdfs://myhdfs/flink/test".
> Given that "myhdfs"
> is the name service configured in hdfs-site.xml.
>
>
> Best,
> Yang
>
>
>
> [1].
> http://hadoop.apache.org/docs/r2.8.5/hadoop-project-dist/hadoop-hdfs/HDFSHighAvailabilityWithQJM.html
>
> Nick Bendtner  于2020年2月29日周六 上午6:00写道:
>
>> To add to this question, do I need to setup env.hadoop.conf.dir to point
>> to the hadoop config for instance env.hadoop.conf.dir=/etc/hadoop/ for
>> the jvm ? Or is it possible to write to hdfs without any external hadoop
>> config like core-site.xml, hdfs-site.xml ?
>>
>> Best,
>> Nick.
>>
>>
>>
>> On Fri, Feb 28, 2020 at 12:56 PM Nick Bendtner 
>> wrote:
>>
>>> Hi guys,
>>> I am trying to write to hdfs from streaming file sink. Where should I
>>> provide the IP address of the name node ? Can I provide it as a part of the
>>> flink-config.yaml file or should I provide it like this :
>>>
>>> final StreamingFileSink sink = StreamingFileSink
>>> .forBulkFormat(hdfs://namenode:8020/flink/test, 
>>> ParquetAvroWriters.forGenericRecord(schema))
>>>
>>> .build();
>>>
>>>
>>> Best,
>>> Nick
>>>
>>>
>>>


Re: Providing hdfs name node IP for streaming file sink

2020-02-28 Thread Nick Bendtner
To add to this question, do I need to set up env.hadoop.conf.dir to point to
the Hadoop config, for instance env.hadoop.conf.dir=/etc/hadoop/, for the
JVM? Or is it possible to write to HDFS without any external Hadoop config
like core-site.xml and hdfs-site.xml?

Best,
Nick.



On Fri, Feb 28, 2020 at 12:56 PM Nick Bendtner  wrote:

> Hi guys,
> I am trying to write to hdfs from streaming file sink. Where should I
> provide the IP address of the name node ? Can I provide it as a part of the
> flink-config.yaml file or should I provide it like this :
>
> final StreamingFileSink sink = StreamingFileSink
>   .forBulkFormat(hdfs://namenode:8020/flink/test, 
> ParquetAvroWriters.forGenericRecord(schema))
>
>   .build();
>
>
> Best,
> Nick
>
>
>


Providing hdfs name node IP for streaming file sink

2020-02-28 Thread Nick Bendtner
Hi guys,
I am trying to write to HDFS from a streaming file sink. Where should I
provide the IP address of the name node? Can I provide it as part of the
flink-conf.yaml file, or should I provide it like this:

final StreamingFileSink<GenericRecord> sink = StreamingFileSink
    .forBulkFormat(new Path("hdfs://namenode:8020/flink/test"),
        ParquetAvroWriters.forGenericRecord(schema))
    .build();


Best,
Nick