Re: apply with fold- and window function

2016-11-14 Thread Anchit Jatana
Hi Stephan,

I faced a similar issue; the way I implemented this (though a workaround) is
by making the input to the fold function, i.e. the initial value to fold,
symmetric to what goes into the window function.

I made the initial value to the fold function a tuple, with all indices that
are not yet required/available set to 'null'.

The idea was to pass a consistent tuple into both functions and let each
function operate on/update the index of its choice in the tuple.

The tuple returned after both operations then has all of its index values set
up.

val DEFAULT_ACCUMULATOR_VALUE = (null, List[String]())

.apply(DEFAULT_ACCUMULATOR_VALUE,
       new MyFoldFunction(),    // operates on/folds values into index 1, i.e. the list
       new MyWindowFunction())  // simply puts in the key at index 0

In the end I achieve the aggregation through the fold function plus the final
per-window element processing through the window function.
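
To make the above concrete, here is a rough, self-contained Scala sketch of how
the two functions can share that tuple. The element type (String), key type,
window size and class names are assumptions for illustration, not my actual job:

import org.apache.flink.api.common.functions.FoldFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object FoldPlusWindowSketch {

  // Shared accumulator shape: _1 = key (filled by the window function), _2 = folded list of values.
  type Acc = (String, List[String])
  val DEFAULT_ACCUMULATOR_VALUE: Acc = (null, List[String]())

  // Folds every incoming element into index 1 (the list); index 0 is left untouched.
  class MyFoldFunction extends FoldFunction[String, Acc] {
    override def fold(acc: Acc, value: String): Acc = (acc._1, value :: acc._2)
  }

  // Receives the folded accumulator once per window and fills the key into index 0.
  class MyWindowFunction extends WindowFunction[Acc, Acc, String, TimeWindow] {
    override def apply(key: String, window: TimeWindow,
                       folded: Iterable[Acc], out: Collector[Acc]): Unit =
      folded.foreach(acc => out.collect((key, acc._2)))
  }

  def attach(keyed: KeyedStream[String, String]): DataStream[Acc] =
    keyed
      .timeWindow(Time.minutes(10))
      .apply(DEFAULT_ACCUMULATOR_VALUE, new MyFoldFunction(), new MyWindowFunction())
}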

Hope this helps!

Regards,
Anchit





--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/apply-with-fold-and-window-function-tp10092p10110.html


Re: Apache Flink Introduction Guide

2016-11-11 Thread Anchit Jatana
Hi Manish,

Appreciate the way you presented Apache Flink. While it reads like an 'Intro'
for beginners, I would really encourage you to highlight/present some of the
groundbreaking features that Flink offers for stream processing, such as:

-> Explicit handling of time with its notion of 'event time', plus expressive
and flexible windowing capabilities

-> State management and fault tolerance through its lightweight
checkpointing/snapshotting mechanism

-> Performance, in terms of low latency, high throughput and back-pressure
handling, plus comparative benchmarks against other engines on the market

Anyone who reads the blog should not just take the text as a 'Hello World' for
'another new' technology in town, but should take it seriously, seeing how it
is better and how it beats the contemporary processing engines, so that the
reader understands why it is worth delving deeper into the topic and expanding
their knowledge of Flink, which is going to be the next most widely adopted
engine in the industry.

Regards,
Anchit




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Apache-Flink-Introduction-Guide-tp10041p10050.html


Flink 1.1.3 | Shutting down YarnClusterClient from the client shutdown hook | happening frequently

2016-11-09 Thread Anchit Jatana
Hi All,

I'm running my Flink application on YARN. It frequently gets suspended, though
gracefully. Below is a snippet of the error; I'm attaching the full JobManager
log to help debug. Please help me identify the cause and resolve the issue.

Thank you

Regards,
Anchit

Error snippet:

2016-11-09 03:15:01,238 INFO  org.apache.flink.yarn.YarnClusterClient - Shutting down YarnClusterClient from the client shutdown hook
2016-11-09 03:15:01,242 INFO  org.apache.flink.yarn.YarnClusterClient - Sending shutdown request to the Application Master
2016-11-09 03:15:01,248 INFO  org.apache.flink.yarn.YarnClusterClient - Start application client.
2016-11-09 03:15:01,260 INFO  org.apache.flink.yarn.ApplicationClient - Notification about new leader address akka.tcp://flink@10.60.200.106:36465/user/jobmanager with session ID null.
2016-11-09 03:15:01,263 INFO  org.apache.flink.yarn.ApplicationClient - Sending StopCluster request to JobManager.
2016-11-09 03:15:01,264 INFO  org.apache.flink.yarn.ApplicationClient - Received address of new leader akka.tcp://flink@10.60.200.106:36465/user/jobmanager with session ID null.
2016-11-09 03:15:01,265 INFO  org.apache.flink.yarn.ApplicationClient - Disconnect from JobManager null.
2016-11-09 03:15:01,269 INFO  org.apache.flink.yarn.ApplicationClient - Trying to register at JobManager akka.tcp://flink@10.60.200.106:36465/user/jobmanager.
2016-11-09 03:15:01,279 INFO  org.apache.flink.yarn.ApplicationClient - Successfully registered at the ResourceManager using JobManager Actor[akka.tcp://flink@10.60.200.106:36465/user/jobmanager#918758350]
2016-11-09 03:15:02,282 INFO  org.apache.flink.yarn.ApplicationClient - Sending StopCluster request to JobManager.
2016-11-09 03:15:02,295 INFO  org.apache.flink.yarn.YarnClusterClient - Deleting files in hdfs://ldnsns/user/a12345/.flink/application_1478099802210_11790
2016-11-09 03:15:02,319 INFO  org.apache.flink.runtime.client.JobClientActor - 11/09/2016 03:15:02   Job execution switched to status SUSPENDED.





--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-1-1-3-Shutting-down-YarnClusterClient-from-the-client-shutdown-hook-happening-frequently-tp10019.html


Re: Flink Application on YARN failed on losing Job Manager | No recovery | Need help debug the cause from logs

2016-11-03 Thread Anchit Jatana
Hi Maximilian,

Thanks for your response. I'm running the application on a YARN cluster in
'yarn-cluster' mode, i.e. using the 'flink run -m yarn-cluster ..' command. Is
there anything more that I need to configure apart from setting the
'yarn.application-attempts: 10' property inside conf/flink-conf.yaml?

Just wished to confirm whether there is anything more I need to configure to
set up HA in 'yarn-cluster' mode.
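
For reference, the entries I have in mind for conf/flink-conf.yaml are roughly
the following; the ZooKeeper/HDFS values are placeholders and the key names
assume the Flink 1.1.x-style 'recovery.*' options:

yarn.application-attempts: 10

recovery.mode: zookeeper
recovery.zookeeper.quorum: zk-host-1:2181,zk-host-2:2181,zk-host-3:2181
recovery.zookeeper.path.root: /flink
recovery.zookeeper.storageDir: hdfs:///flink/recovery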

Thank you

Regards,
Anchit



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Application-on-YARN-failed-on-losing-Job-Manager-No-recovery-Need-help-debug-the-cause-from-los-tp9839p9887.html


Flink Application on YARN failed on losing Job Manager | No recovery | Need help debug the cause from logs

2016-11-02 Thread Anchit Jatana
Hi All,

I started my Flink application on YARN using flink run -m yarn-cluster; after
running smoothly for 20 hrs it failed. Ideally the application should have
recovered on losing the JobManager (which runs in the same container as the
application master), given the fault-tolerant nature of Flink on YARN, but it
didn't recover and failed.

Please help me debug the logs. 

Thank you

Regards,
Anchit

Below are the logs:

2016-11-01 14:12:37,592 INFO  org.apache.flink.runtime.client.JobClientActor - 11/01/2016 14:12:36   Parse & Map Record - (Visitor ID, Product List) -> Filtering None Objects -> Fetching Output(148/200) switched to RUNNING
2016-11-02 10:16:42,960 INFO  org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider  - Failing over to rm1
2016-11-02 10:17:24,026 INFO  org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider  - Failing over to rm2
2016-11-02 10:17:40,882 INFO  org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider  - Failing over to rm1
2016-11-02 10:24:41,964 WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@10.66.245.26:47722] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
2016-11-02 10:24:56,311 WARN  Remoting - Tried to associate with unreachable remote address [akka.tcp://flink@10.66.245.26:47722]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: /10.66.245.26:47722
2016-11-02 10:24:56,315 INFO  org.apache.flink.runtime.client.JobClientActor - Lost connection to JobManager akka.tcp://flink@10.66.245.26:47722/user/jobmanager. Triggering connection timeout.
2016-11-02 10:24:56,315 INFO  org.apache.flink.runtime.client.JobClientActor - Disconnect from JobManager Actor[akka.tcp://flink@10.66.245.26:47722/user/jobmanager#1251121709].
2016-11-02 10:25:56,330 INFO  org.apache.flink.runtime.client.JobClientActor - Terminate JobClientActor.
2016-11-02 10:25:56,331 INFO  org.apache.flink.runtime.client.JobClientActor - Disconnect from JobManager null.
2016-11-02 10:25:56,333 ERROR org.apache.flink.client.CliFrontend - Error while running the command.
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Communication with JobManager failed: Lost connection to the JobManager.
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:405)
    at org.apache.flink.yarn.YarnClusterClient.submitJob(YarnClusterClient.java:204)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:378)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:68)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:585)
    at com.tgt.prz.streaming.recs.drivers.SessionRecs2$.main(SessionRecs2.scala:126)
    at com.tgt.prz.streaming.recs.drivers.SessionRecs2.main(SessionRecs2.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:320)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253)
    at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:997)
    at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:994)
    at org.apache.flink.runtime.security.SecurityUtils$1.run(SecurityUtils.java:56)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
    at org.apache.flink.runtime.security.SecurityUtils.runSecured(SecurityUtils.java:53)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:994)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Communication with JobManager failed: Lost connection to the JobManager.
    at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:137)
    at

Re: Flink Metrics - InfluxDB + Grafana | Help with InfluxDB query for Grafana to plot 'numRecordsIn' & 'numRecordsOut' for each operator/operation

2016-11-02 Thread Anchit Jatana
Hi Jamie,

Thanks for sharing your thoughts. I'll try and integrate with Graphite to
see if this gets resolved.

Regards,
Anchit



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Metrics-InfluxDB-Grafana-Help-with-query-influxDB-query-for-Grafana-to-plot-numRecordsIn-numRen-tp9775p9838.html


Re: Flink Metrics - InfluxDB + Grafana | Help with InfluxDB query for Grafana to plot 'numRecordsIn' & 'numRecordsOut' for each operator/operation

2016-11-01 Thread Anchit Jatana
I've set the metric reporting frequency to InfluxDB to 10s. In the screenshot,
I'm using a Grafana query interval of 1s. I've tried 10s and more too; the
graph shape changes a bit, but the incorrect negative values are still plotted
(it makes no difference).

Something to add: with 30 or fewer subtasks, the same query yields correct
results. With more than 30 subtasks (in my case 50) it plots junk negative and
positive values.

Regards,
Anchit



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Metrics-InfluxDB-Grafana-Help-with-query-influxDB-query-for-Grafana-to-plot-numRecordsIn-numRen-tp9775p9819.html


Re: Flink on YARN - Fault Tolerance | use case supported or not

2016-11-01 Thread Anchit Jatana
Yes, thanks Stephan.

Regards,
Anchit



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-on-YARN-Fault-Tolerance-use-case-supported-or-not-tp9776p9817.html


Re: Flink Metrics - InfluxDB + Grafana | Help with InfluxDB query for Grafana to plot 'numRecordsIn' & 'numRecordsOut' for each operator/operation

2016-11-01 Thread Anchit Jatana
Hi Jamie,

Thank you so much for your response. 

The below query:

SELECT derivative(sum("count"), 1s) FROM "numRecordsIn"
WHERE "task_name" = 'Sink: Unnamed' AND $timeFilter GROUP BY time(1s)

behaves the same as with the templating variable in the 'All' case, i.e. it
shows a plot of junk 'negative values'.

It shows accurate results/plots when an additional WHERE clause on
"subtask_index" is added to the query.

But without the "subtask_index" clause (which means across all subtask
indexes) it shows junk/incorrect values on the graph (both highly positive and
highly negative values, on the order of millions).
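
One variant I'm considering, though I haven't verified that it fixes the
aggregation across subtasks, is to keep the per-subtask grouping and use
non_negative_derivative so counter resets cannot produce negative spikes, then
let Grafana stack the resulting series:

SELECT non_negative_derivative(sum("count"), 10s) FROM "numRecordsIn"
WHERE "task_name" = 'Sink: Unnamed' AND $timeFilter
GROUP BY time(10s), "subtask_index"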

Images: (the Grafana screenshots attached to the original post are not reproduced here)

Regards,
Anchit



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Metrics-InfluxDB-Grafana-Help-with-query-influxDB-query-for-Grafana-to-plot-numRecordsIn-numRen-tp9775p9816.html


Flink on YARN - Fault Tolerance | use case supported or not

2016-10-28 Thread Anchit Jatana
Hi All,

I tried testing the fault tolerance of my running Flink application in a
different way (not sure if it is an appropriate way). I ran the Flink
application on YARN and, after a few checkpoints had completed, killed the
YARN application using:

yarn application -kill application_1476277440022_

I then tried restarting the application, providing the same path for the
checkpointing directory. The application started afresh and did not resume
from the last checkpointed state. I just wanted to make sure whether fault
tolerance in this use case is valid or not. If yes, what am I doing wrong?

I'm aware of the savepoint process: create a savepoint, stop the application,
and resume a new application from that savepoint. But I wished to check the
above use case, considering that the YARN application might get killed
accidentally or for some other reason; is this kind of fault tolerance
supported or not?
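
For reference, the savepoint flow I'm referring to is roughly the following
CLI sequence (job ID, savepoint path, jar and arguments are placeholders):

bin/flink savepoint <jobID>                                              # trigger a savepoint for the running job
bin/flink cancel <jobID>                                                 # stop the running job
bin/flink run -m yarn-cluster -s <savepointPath> <jarFile> [arguments]   # resume a new job from the savepoint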


Regards,
Anchit


Flink Metrics - InfluxDB + Grafana | Help with InfluxDB query for Grafana to plot 'numRecordsIn' & 'numRecordsOut' for each operator/operation

2016-10-28 Thread Anchit Jatana
Hi All,

I'm trying to plot Flink application metrics using Grafana backed by InfluxDB.
I need to plot/monitor 'numRecordsIn' & 'numRecordsOut' for each
operator/operation, and I'm finding it hard to build the InfluxDB query in
Grafana that produces this plot.

I am able to plot 'numRecordsIn' & 'numRecordsOut' for each subtask
(parallelism set to 50) of an operator, but not for the operator as a whole.

If somebody has knowledge of, or has successfully implemented, this kind of
plot in Grafana backed by InfluxDB, please share the process/query to achieve
it.

Below is the query I use to monitor 'numRecordsIn' & 'numRecordsOut' for each
subtask:

SELECT derivative(sum("count"), 10s) FROM "numRecordsOut"
WHERE "task_name" = 'Source: Reading from Kafka' AND "subtask_index" =~ /^$subtask$/ AND $timeFilter
GROUP BY time(10s), "task_name"

PS: $subtask is the templating variable that I'm using to allow multiple
subtask values. I have tried the 'All' option for this templating variable; it
gives me an incorrect plot showing negative values, while individually
selecting subtask values from the templating-variable dropdown yields correct
results.

Thank you!

Regards,
Anchit


Re: Session windows - Evaluation of addition of element + window expires/gets discarded after some set time of inactivity

2016-10-28 Thread Anchit Jatana
Hi Aljoscha,

I am using the custom trigger with the GlobalWindows window assigner. Do I
still need to override the clear() method and delete the processing-time timer
using triggerContext.deleteProcessingTimeTimer(prevTime)?
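
In case a concrete shape helps, the override I have in mind is the one below.
It assumes a Trigger API that actually has clear(window, ctx) (I haven't
checked which Flink version introduces it) and reuses the timeState descriptor
from the sessionTrigger implementation I posted earlier in this thread:

  override def clear(w: Window, triggerContext: TriggerContext): Unit = {
    // Remove the pending processing-time timer (if any) and reset the stored timestamp,
    // so nothing fires for a window whose state has already been cleaned up.
    val time_state = triggerContext.getPartitionedState(timeState)
    val time_set = time_state.value()
    if (time_set.isDefined) {
      triggerContext.deleteProcessingTimeTimer(time_set.get)
    }
    time_state.update(None)
  }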

Regards,
Anchit



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Session-windows-Evaluation-of-addition-of-element-window-expires-gets-discarded-after-some-set-time-y-tp9665p9774.html


RE: Session windows - Evaluation of addition of element + window expires/gets discarded after some set time of inactivity

2016-10-21 Thread Anchit Jatana
Hi Bart,

Thank you so much for sharing the approach. Looks like this solved my
problem. Here's what I have as an implementation for my use-case:

package org.apache.flink.quickstart

import org.apache.flink.api.common.state.{ReducingState, ReducingStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.scala.function.util.ScalaReduceFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.Window
import org.slf4j.LoggerFactory

class sessionTrigger[E](val sessionPauseHours: Long) extends Trigger[E, Window] {

  val timeState = new ValueStateDescriptor[Option[Long]]("sessionTimer", classOf[Option[Long]], None)

  override def onElement(t: E, l: Long, w: Window, triggerContext: TriggerContext): TriggerResult = {
    // remove old timer
    val time_state: ValueState[Option[Long]] = triggerContext.getPartitionedState(timeState)
    val time_set = time_state.value()
    if (time_set.isDefined) {
      triggerContext.deleteProcessingTimeTimer(time_set.get)
    }
    // set new time and continue
    val new_time = triggerContext.getCurrentProcessingTime + Time.seconds(sessionPauseHours).toMilliseconds()
    time_state.update(Some(new_time))
    triggerContext.registerProcessingTimeTimer(new_time)
    TriggerResult.FIRE
  }

  override def onProcessingTime(l: Long, w: Window, triggerContext: TriggerContext): TriggerResult = {
    TriggerResult.PURGE
  }

  override def onEventTime(l: Long, w: Window, triggerContext: TriggerContext): TriggerResult = {
    TriggerResult.CONTINUE
  }
}
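
For completeness, a sketch of how a trigger like this can be attached to a
keyed stream; the event type, key selector, pause value and the window
function below are assumptions for illustration:

import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows

// keyedEvents: KeyedStream[MyEvent, String] is assumed to exist upstream.
val sessions = keyedEvents
  .window(GlobalWindows.create())
  .trigger(new sessionTrigger[MyEvent](30 * 60))   // value ends up in Time.seconds(...), despite the "Hours" name
  .apply(new MySessionWindowFunction())            // placeholder WindowFunction that evaluates the session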

Regards,
Anchit



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Session-windows-Evaluation-of-addition-of-element-window-expires-gets-discarded-after-some-set-time-y-tp9665p9676.html


Re: Flink Kafka Consumer Behaviour

2016-10-12 Thread Anchit Jatana
Hi Janardhan/Stephan,

I just figured out what the issue is (talking about the Flink KafkaConnector08;
I don't know about the Flink KafkaConnector09).

The reason why

bin/kafka-consumer-groups.sh --zookeeper <zookeeper-connect> --describe --group <consumer-group>

is not showing any result is the absence of

/consumers/<consumer-group>/owners/<topic>

in ZooKeeper.

The Flink application is creating and updating
/consumers/<consumer-group>/offsets/<topic>/<partition> but is not creating the
"owners" znode.

If I manually create the znode using the following:

create /consumers/<consumer-group>/owners "firstchildren"

create /consumers/<consumer-group>/owners/<topic> null

it works fine: bin/kafka-consumer-groups.sh --zookeeper <zookeeper-connect>
--describe --group <consumer-group> starts pulling offset results for me.

I think this needs to be corrected in the application: check for and create
"/consumers/<consumer-group>/owners/<topic>" if it does not exist.

Regards,
Anchit



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Kafka-Consumer-Behaviour-tp8257p9499.html


Re: Flink Kafka connector08 not updating the offsets with the zookeeper

2016-10-12 Thread Anchit Jatana
Hi Robert,

Thanks for your response. I just figured out what the issue is. 

The reason why

bin/kafka-consumer-groups.sh --zookeeper <zookeeper-connect> --describe --group <consumer-group>

is not showing any result is the absence of

/consumers/<consumer-group>/owners/<topic>

in ZooKeeper.

The Flink application is creating and updating
/consumers/<consumer-group>/offsets/<topic>/<partition> but is not creating the
"owners" znode.

If I manually create the znode using the following:

create /consumers/<consumer-group>/owners "firstchildren"

create /consumers/<consumer-group>/owners/<topic> null

it works fine: bin/kafka-consumer-groups.sh --zookeeper <zookeeper-connect>
--describe --group <consumer-group> starts pulling offset results for me.

I think this needs to be corrected in the application: check for and create
"/consumers/<consumer-group>/owners/<topic>" if it does not exist.

Regards,
Anchit



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Kafka-connector08-not-updating-the-offsets-with-the-zookeeper-tp9469p9498.html


Flink Kafka connector08 not updating the offsets with the zookeeper

2016-10-11 Thread Anchit Jatana
Hi All,

I'm using the Flink Kafka connector08. I need to check/monitor the offsets of
my Flink application's Kafka consumer.

When running this:

bin/kafka-consumer-groups.sh --zookeeper <zookeeper-connect> --describe --group <consumer-group>

I get the message: "No topic available for consumer group provided." Why is
the consumer not updating the offsets with ZooKeeper?

PS: I have enabled checkpointing. Is there any configuration that I'm
missing or is this some sort of a bug?

Using Flink version 1.1.2
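
For context, the consumer is wired up roughly as below; the topic, group id
and connection strings are placeholders, not my real values:

import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(5000)   // checkpointing enabled, so offsets should be committed to ZooKeeper on checkpoints

val props = new Properties()
props.setProperty("zookeeper.connect", "zk-host:2181")     // placeholder
props.setProperty("bootstrap.servers", "kafka-host:9092")  // placeholder
props.setProperty("group.id", "my-consumer-group")         // placeholder

val stream = env.addSource(new FlinkKafkaConsumer08[String]("my-topic", new SimpleStringSchema(), props))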

Thank you

Regards,
Anchit


How to Broadcast a very large model object (used in iterative scoring in recommendation system) in Flink

2016-09-30 Thread Anchit Jatana
Hi All,

I'm building a recommendation-system streaming application, for which I need
to broadcast a very large model object (used in iterative scoring) among all
the task managers performing the operation in parallel for the operator.

I'm doing this operation in map1 of a CoMapFunction. Please suggest some way
to achieve the broadcasting of the large model variable (something similar to
what Spark offers with broadcast variables).

Thank you

Regards,
Anchit


Re: How to interact with a running flink application?

2016-09-29 Thread Anchit Jatana
Hi Ufuk,

Thanks for your help, I'm working on using the suggested approach to
address my use case.

Regards,
Anchit


On Wed, Sep 28, 2016 at 12:48 AM, Ufuk Celebi <u...@apache.org> wrote:

> Hey Anchit,
>
> the usual recommendation for this is to use a CoMap/CoFlatMap
> operator, where the second input are the lookup location changes. You
> can then use this input to update the location.
>
> Search for CoMap/CoFlatMap here:
> https://ci.apache.org/projects/flink/flink-docs-master/dev/datastream_api.html#datastream-transformations
>
> Best,
>
> Ufuk
>
> On Wed, Sep 28, 2016 at 9:37 AM, Anchit Jatana
> <development.anc...@gmail.com> wrote:
> > Hi All,
> >
> > Brief: I have a use case where I need to interact with a running flink
> > application.
> >
> > Detail:
> >
> > My Flink application has a Kafka source, an operator processing on the
> > content received from the Kafka stream(this operator is using a lookup
> from
> > an external source file to accomplish the processing of the Kafka
> content).
> > If the content of the file kept at the same source location changes, I
> need
> > to notify the operator to update its lookup content loaded in the memory
> and
> > continue its processing of Kafka content with the new loaded lookup
> content
> > without stopping the Flink application.
> >
> > Is there a way where I can "interact with the running Flink Application"
> > through some event or something to notify the application to make some
> > changes in its operation without stopping the application.
> >
> > Thank you!
> >
> > Regards,
> > Anchit
>
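
To make the CoFlatMap approach described above concrete for my case, the shape
I'm working towards is roughly the following; the types, the lookup
representation and how the control stream is produced are assumptions, not a
finished implementation:

import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
import org.apache.flink.util.Collector

// Input 1: records from Kafka; input 2: a full replacement of the lookup content.
class LookupEnrichment extends CoFlatMapFunction[String, Map[String, String], String] {

  // Lookup kept in memory per parallel instance; swapped whenever the control stream emits.
  private var lookup: Map[String, String] = Map.empty

  override def flatMap1(record: String, out: Collector[String]): Unit =
    out.collect(lookup.getOrElse(record, record))   // process the Kafka record against the current lookup

  override def flatMap2(newLookup: Map[String, String], out: Collector[String]): Unit =
    lookup = newLookup                               // control input: replace the lookup without stopping the job
}

// Usage sketch: kafkaStream.connect(lookupUpdates).flatMap(new LookupEnrichment())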


Flink-HBase connectivity through Kerberos keytab | Simple get/put sample implementation

2016-09-29 Thread Anchit Jatana
Hi All,

I'm trying to connect my Flink application to HBase for simple read/write
operations. I need to implement Flink-to-HBase connectivity through Kerberos
using a keytab.

Can somebody share (or link me to) a sample code/implementation of how to
achieve Flink-to-HBase connectivity through Kerberos using a keytab, for a
simple read/write (get/put) operation?
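
To be clear about the kind of snippet I'm after, this is roughly what I have
pieced together so far; the principal, keytab path, table and column names are
placeholders, and I haven't verified this from inside a Flink task:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.security.UserGroupInformation

val conf = HBaseConfiguration.create()
conf.set("hadoop.security.authentication", "kerberos")

// Log in from the keytab before touching HBase (principal and path are placeholders).
UserGroupInformation.setConfiguration(conf)
UserGroupInformation.loginUserFromKeytab("user@EXAMPLE.COM", "/path/to/user.keytab")

val connection = ConnectionFactory.createConnection(conf)
val table = connection.getTable(TableName.valueOf("my_table"))

// Simple put ...
val put = new Put(Bytes.toBytes("row-1"))
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col"), Bytes.toBytes("value"))
table.put(put)

// ... and simple get.
val result = table.get(new Get(Bytes.toBytes("row-1")))
val value = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("col")))

table.close()
connection.close()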

Thank you!

Regards,
Anchit


Create stream from multiple files using "env.readFile(format, input1, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, FilePathFilter.createDefaultFilter())" ?

2016-09-28 Thread Anchit Jatana
Hi All,

I have a use case where I need to create multiple source streams from multiple
files and monitor the files for any changes using
FileProcessingMode.PROCESS_CONTINUOUSLY.

The intention is to have a monitored stream for each of the multiple files,
something like this:

DataStream stream1 = env.readFile(format, input1, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, FilePathFilter.createDefaultFilter());

DataStream stream2 = env.readFile(format, input2, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, FilePathFilter.createDefaultFilter());

DataStream stream3 = env.readFile(format, input3, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, FilePathFilter.createDefaultFilter());

...

DataStream streamN = env.readFile(format, inputN, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, FilePathFilter.createDefaultFilter());
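
Written more compactly, the same pattern generated from a list of paths (this
assumes the Scala API mirrors the readFile signature used above; format, paths
and element type are the same placeholders):

val inputs = Seq(input1, input2, input3 /* ..., inputN */)

// One continuously monitored stream per file path.
val streams = inputs.map { path =>
  env.readFile(format, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, FilePathFilter.createDefaultFilter())
}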

Since this implementation doesn't work, can someone suggest how this can be
achieved?


PS: The main intention is to *monitor* all the files and stream the updated
content whenever any change is made to them.


Thank you!

Regards,

Anchit