Re: What is the right way to use the physical partitioning strategy in Data Streams?

2019-09-24 Thread Felipe Gutierrez
Yes, a discussion with people who know this better than I do would be very welcome.

Basically, I am trying to implement the issue FLINK-1725 [1], which was given
up on in March 2017. Stephan Ewen said that there are more issues to be fixed
before going ahead with this implementation, and I don't really know which ones they are.

[1] https://issues.apache.org/jira/browse/FLINK-1725

Felipe

*--*
*-- Felipe Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com
*


On Mon, Sep 23, 2019 at 3:47 PM Biao Liu  wrote:

> Wow, that's really cool! There is indeed a lot of work you have done. IMO
> it's somewhat beyond the scope of the user group.
>
> Just one small concern, I'm not sure I have fully understood your way of
> "tackle data skew by altering the way Flink partition keys using
> KeyedStream".
>
> From my understanding, key-groups are used for rescaling jobs, e.g. to
> support reusing state after changing the parallelism of an operator.
> I'm not sure whether you are going in the right direction or not. It seems that
> you are implementing something deeper than the user interface. The user interface
> is stable, while the implementation is not. Usually it's not recommended to
> build a feature on top of implementation details.
>
> If you have strong reasons to change the implementation, I would suggest
> to start a discussion in dev mailing list. Maybe it could be supported
> officially. What do you think?
>
> Thanks,
> Biao /'bɪ.aʊ/
>
>
>
> On Mon, 23 Sep 2019 at 20:54, Felipe Gutierrez <
> felipe.o.gutier...@gmail.com> wrote:
>
>>
>> I've implemented a combiner [1] in Flink by extending
>> OneInputStreamOperator. I call my operator using "transform".
>> It works well, and I guess it would be useful if I imported this operator into
>> DataStream.java. I just need to check further whether I have to touch other parts
>> of the source code.
>>
>> But now I want to tackle data skew by altering the way Flink partitions
>> keys using KeyedStream.
>>
>> [1]
>> https://felipeogutierrez.blogspot.com/2019/08/implementing-dynamic-combiner-mini.html
>>
>> *--*
>> *-- Felipe Gutierrez*
>>
>> *-- skype: felipe.o.gutierrez*
>> *--* *https://felipeogutierrez.blogspot.com
>> *
>>
>>
>> On Mon, Sep 23, 2019 at 2:37 PM Biao Liu  wrote:
>>
>>> Hi Felipe,
>>>
>>> If I understand correctly, you want to solve data skew caused by
>>> imbalanced key?
>>>
>>> There is a common strategy to solve this kind of problem,
>>> pre-aggregation. Like combiner of MapReduce.
>>> But sadly, AFAIK Flink does not support pre-aggregation currently. I'm
>>> afraid you have to implement it by yourself.
>>>
>>> For example, introducing a function caching some data (time or count
>>> based). This function should be before "keyby". And it's on a non-keyed
>>> stream. It does pre-aggregation just like what the aggregation after
>>> "keyby" does. In this way, the skewed keyed data would be reduced a lot.
>>>
>>> I also found a suggestion [1] from Fabian, although it's long time ago.
>>>
>>> Hope it helps.
>>>
>>> 1.
>>> https://stackoverflow.com/questions/47825565/apache-flink-how-can-i-compute-windows-with-local-pre-aggregation
>>>
>>> Thanks,
>>> Biao /'bɪ.aʊ/
>>>
>>>
>>>
>>> On Mon, 23 Sep 2019 at 19:51, Felipe Gutierrez <
>>> felipe.o.gutier...@gmail.com> wrote:
>>>
 thanks Biao,

 I see. To achieve what I want to do I need to work with KeyedStream. I
 downloaded the Flink source code to learn and to alter the KeyedStream to my
 needs. I am not sure, but it is a lot of work, because as far as I understood
 the key-groups have to be predictable [1], and altering this touches a lot
 of other parts of the source code.

 However, if I guarantee that the key-groups are predictable, I will
 be able to rebalance or rescale the keys to other worker nodes.

 [1]
 https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html
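
 For reference, my reading of [1] is that the mapping I have to keep
 predictable boils down to something like this (simplified sketch; Flink
 additionally applies murmur hashing on top of hashCode() internally):

 // key -> key group -> operator (subtask) index, as described in [1]
 public class KeyGroupSketch {

   static int keyGroupFor(Object key, int maxParallelism) {
     return Math.abs(key.hashCode() % maxParallelism);
   }

   static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
     return keyGroup * parallelism / maxParallelism;
   }

   public static void main(String[] args) {
     int maxParallelism = 128, parallelism = 4;
     int keyGroup = keyGroupFor("some-key", maxParallelism);
     System.out.println("key group " + keyGroup + " -> subtask "
         + operatorIndexFor(keyGroup, maxParallelism, parallelism));
   }
 }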

 Thanks,
 Felipe
 *--*
 *-- Felipe Gutierrez*

 *-- skype: felipe.o.gutierrez*
 *--* *https://felipeogutierrez.blogspot.com
 *


 On Mon, Sep 23, 2019 at 9:51 AM Biao Liu  wrote:

> Hi Felipe,
>
> Flink job graph is DAG based. It seems that you set an "edge property"
> (partitioner) several times.
> Flink does not support multiple partitioners on one edge. The later
> one overrides the earlier ones. That means the "keyBy" overrides the "rebalance"
> and "partitionByPartial".
>
> You could insert some nodes between these partitioners to satisfy your
> requirement. For example,
> `sourceDataStream.rebalance().map(...).keyBy(0).sum(1).print();`
>
> Thanks,
> Biao /'bɪ.aʊ/
>
>
>
> On Thu, 19 Sep 2019 at 16:49, Felipe Gutierrez <
> felipe.o.gutier...@gmail.com> wrote:
>
>> I am executing a data stream application which uses rebalance.
>> Basically I am counting words 

Re: Recommended approach to debug this

2019-09-24 Thread Debasish Ghosh
Well, I think I got the solution though I am not yet sure of the problem ..
The original code looked like this ..

Try {
  // from a parent class called Runner which runs a streamlet
  // run returns an abstraction which completes a Promise depending on
whether
  // the Job was successful or not
  val streamletExecution =
loadedStreamlet.streamlet.run(withPodRuntimeConfig)

  // the runner waits for the execution to complete
  // In normal circumstances it will run forever for streaming data source
unless
  // being stopped forcibly or any of the queries faces an exception
  Await.result(streamletExecution.completed, Duration.Inf)
} match { //..

and then the streamlet.run(..) in turn finally invoked the following ..

val env = StreamExecutionEnvironment.getExecutionEnvironment

// creates datastreams and read from / writes to Kafka
// I pasted the body of this earlier in the thread
buildExecutionGraph()

env.execute(..)

This DID NOT run and failed with the exception I reported earlier. But when
I change the code to move the run statement out of the Try block, things run
fine .. like this ..

// from a parent class called Runner which runs a streamlet
// run returns an abstraction which completes a Promise depending on whether
// the Job was successful or not
val streamletExecution = loadedStreamlet.streamlet.run(withPodRuntimeConfig)

Try {
  // the runner waits for the execution to complete
  // In normal circumstances it will run forever for streaming data source
unless
  // being stopped forcibly or any of the queries faces an exception
  Await.result(streamletExecution.completed, Duration.Inf)
} match { //..

Apparently the exception that I was facing earlier leaked
through the Flink engine, Try caught it, and it got logged. But moving
it out of Try now enables Flink to catch it again and follow the course that
it should. I am not sure if this is a cogent explanation, though, and I am looking
forward to a more accurate one from the experts. Note there is no
asynchrony or concurrency going on here - the Runner code may look a bit
over-engineered, but there is a context to this. The Runner code handles not
only Flink but other types of streaming engines as well, like Spark and Akka
Streams.

regards.


On Tue, Sep 24, 2019 at 10:17 AM Biao Liu  wrote:

> Hi Zili,
>
> Thanks for pointing that out.
> I didn't realize that it's a REST API based case. Debasish's case has been
> discussed not only in this thread...
>
> It's really hard to analyze the case without the full picture.
>
> I think the reason of why `ProgramAbortException` is not caught is that he
> did something outside `env.execute`. Like executing this piece of codes
> inside a Scala future.
>
> I guess the scenario is that he is submitting job through REST API. But in
> the main method, he wraps `env.execute` with Scala future, not executing it
> directly.
> The reason of env has been set to `StreamPlanEnvironment` is
> `JarHandlerUtils` retrieves job graph through it.
> And the `ProgramAbortException` is not thrown out, because the Scala
> future tackles this exception.
> So retrieving job graph fails due to an unrecognized exception (Boxed
> Error).
>
> Thanks,
> Biao /'bɪ.aʊ/
>
>
>
> On Tue, 24 Sep 2019 at 10:44, Zili Chen  wrote:
>
>> Hi Biao,
>>
>> The log below already infers that the job was submitted via REST API and
>> I don't think it matters.
>>
>> at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$
>> JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
>> at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$
>> getJobGraphAsync$6(JarRunHandler.java:142)
>>
>> What I don't understand is that Flink DOES catch the exception at the
>> point it is reported thrown...
>>
>> Best,
>> tison.
>>
>>
>> Biao Liu wrote on Tue, Sep 24, 2019 at 10:34 AM:
>>
>>>
>>> > We submit the code through Kubernetes Flink Operator which uses the
>>> REST API to submit the job to the Job Manager
>>>
>>> So you are submitting job through REST API, not Flink client? Could you
>>> explain more about this?
>>>
>>> Thanks,
>>> Biao /'bɪ.aʊ/
>>>
>>>
>>>
>>> On Tue, 24 Sep 2019 at 03:44, Debasish Ghosh 
>>> wrote:
>>>
 Hi Dian -

 We submit one job through the operator. We just use the following to
 complete a promise when the job completes ..

   Try {
 createLogic.executeStreamingQueries(ctx.env)
   }.fold(
 th ⇒ completionPromise.tryFailure(th),
 _ ⇒ completionPromise.trySuccess(Dun)
   )

 If we totally do away with the promise and future stuff then we don't
 get the boxed error - only the exception reported in Caused By.

 regards.

 On Mon, Sep 23, 2019 at 10:20 PM Dian Fu  wrote:

> Hi Debasish,
>
> In which case will the exception occur? Does it occur when you submit
> one job at a time or when multiple jobs are submitted at the same time? 
> I'm
> asking this because I noticed that you used 

HBaseTableSource for SQL query errors

2019-09-24 Thread ????????
I am using the HBaseTableSource class and my SQL query fails. The same HBase demo
code runs without errors outside Flink.
My Flink version is 1.8.1, and I use the Flink Table SQL API.

The Flink code is as follows:

  // environment configuration
  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
  String currentTableName = "table";

  Configuration conf = HBaseConfiguration.create();
  ...
  conf.set("hbase.zookeeper.quorum", quorum);

  HBaseTableSource hSrc = new HBaseTableSource(conf, "table");
  hSrc.addColumn("base", "rowkey", String.class);
  tEnv.registerTableSource(currentTableName, hSrc);

  Table res = tEnv.sqlQuery("select * from table");
  DataSet

Re: Recommended approach to debug this

2019-09-24 Thread Biao Liu
The key point of this case is in `PackagedProgram#callMainMethod`.
A `ProgramAbortException` is expected when executing the main method
here. The thrown `ProgramAbortException` is wrapped in an
`InvocationTargetException` by the Java reflection layer [1]. There is a piece
of code handling `InvocationTargetException`:

try {
  mainMethod.invoke(null, (Object) args);
}
catch (...
catch (InvocationTargetException e) {
  Throwable exceptionInMethod = e.getTargetException();
  if (exceptionInMethod instanceof Error) {
    throw (Error) exceptionInMethod;  // --> `ProgramAbortException` would be caught here, as expected
  } else if (exceptionInMethod instanceof ProgramParametrizationException) {
    throw (ProgramParametrizationException) exceptionInMethod;
  } else if (exceptionInMethod instanceof ProgramInvocationException) {
    throw (ProgramInvocationException) exceptionInMethod;
  } else {
    // --> if I'm right, the wrapping exception (Boxed Error or something else)
    //     changes the exception type, so it is caught here instead
    throw new ProgramInvocationException("The main method caused an error: "
        + exceptionInMethod.getMessage(), exceptionInMethod);
  }

The `ProgramInvocationException` is handled specially in
`OptimizerPlanEnvironment`.

try {
  prog.invokeInteractiveModeForExecution();
}
catch (ProgramInvocationException e) {
  throw e;  // --> the submission fails here in this case
}
catch (Throwable t) {
  // the invocation gets aborted with the preview plan
  if (optimizerPlan != null) {
    return optimizerPlan;  // --> normally it should end up here
  } else {
    throw new ProgramInvocationException("The program caused an error: ", t);
  } ...

1.
https://stackoverflow.com/questions/6020719/what-could-cause-java-lang-reflect-invocationtargetexception

Thanks,
Biao /'bɪ.aʊ/



On Tue, 24 Sep 2019 at 17:35, Debasish Ghosh 
wrote:

> Well, I think I got the solution though I am not yet sure of the problem
> .. The original code looked like this ..
>
> Try {
>   // from a parent class called Runner which runs a streamlet
>   // run returns an abstraction which completes a Promise depending on
> whether
>   // the Job was successful or not
>   val streamletExecution =
> loadedStreamlet.streamlet.run(withPodRuntimeConfig)
>
>   // the runner waits for the execution to complete
>   // In normal circumstances it will run forever for streaming data source
> unless
>   // being stopped forcibly or any of the queries faces an exception
>   Await.result(streamletExecution.completed, Duration.Inf)
> } match { //..
>
> and then the streamlet.run(..) in turn finally invoked the following ..
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
>
> // creates datastreams and read from / writes to Kafka
> // I pasted the body of this earlier in the thread
> buildExecutionGraph()
>
> env.execute(..)
>
> This DID NOT run and failed with the exception I reported earlier. But
> when I change the code to get the run statement out of the Try block,
> things run fine .. like this ..
>
> // from a parent class called Runner which runs a streamlet
> // run returns an abstraction which completes a Promise depending on
> whether
> // the Job was successful or not
> val streamletExecution =
> loadedStreamlet.streamlet.run(withPodRuntimeConfig)
>
> Try {
>   // the runner waits for the execution to complete
>   // In normal circumstances it will run forever for streaming data source
> unless
>   // being stopped forcibly or any of the queries faces an exception
>   Await.result(streamletExecution.completed, Duration.Inf)
> } match { //..
>
> Apparently it looks like the exception that I was facing earlier leaked
> through the Flink engine and Try caught it and it got logged. But removing
> it out of Try now enables Flink to catch it back and follow the course that
> it should. But I am not sure if this is a cogent explanation and looking
> forward to some more accurate one from the experts. Note there is no
> asynchrony of concurrency going on here - the Runner code may look a bit
> over-engineered but there is a context to this. The Runner code handles not
> only Flink but other types of streaming engines as well like Spark and Akka
> Streams.
>
> regards.
>
>
> On Tue, Sep 24, 2019 at 10:17 AM Biao Liu  wrote:
>
>> Hi Zili,
>>
>> Thanks for pointing that out.
>> I didn't realize that it's a REST API based case. Debasish's case has
>> been discussed not only in this thread...
>>
>> It's really hard to analyze the case without the full picture.
>>
>> I think the reason of why `ProgramAbortException` is not caught is that
>> he did something outside `env.execute`. Like executing this piece of codes
>> inside a Scala future.
>>
>> I guess the scenario is that he is submitting job through REST API. But
>> in the main method, he wraps `env.execute` with Scala future, not executing
>> it directly.
>> The reason of env has been set to `StreamPlanEnvironment` is
>> `JarHandlerUtils` retrieves job 

Re: How to use thin JAR instead of fat JAR when submitting Flink job?

2019-09-24 Thread Fabian Hueske
Hi,

To expand on Dian's answer.
You should not add Flink's core libraries (APIs, core, runtime, etc.) to
your fat JAR. However, connector dependencies (like Kafka, Cassandra, etc.)
should be added.

If all your jobs require the same dependencies, you can also add JAR files
to the ./lib folder of your job manager and task managers.
All JARs in the ./lib folder will be loaded into the system class loader.
Note that this means they are shared across all jobs running on a
task manager, which reduces the isolation of jobs.
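
For the first point, a minimal pom.xml sketch of what that typically looks like
(the artifact names below are only examples and depend on your Scala version
and connectors):

<!-- core Flink APIs: provided by the cluster, keep them out of the fat JAR -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

<!-- connectors are not part of the distribution, so keep the default (compile) scope -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>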

Best, Fabian

On Mon, Sep 23, 2019 at 11:37 AM, Dian Fu wrote:

> Hi Qi Kang,
>
> You don't need and also should not package the dependencies of Flink to
> the job jar. Only application specific dependencies are enough.
>
> Regards,
> Dian
>
> > On Sep 23, 2019, at 5:17 PM, Qi Kang wrote:
> >
> > Hi,
> >
> > According to the documentation of Flink, it seems that fat JAR is
> recommended when submitting a Flink job. However, the Flink dependencies
> (as well as other dependencies like Hadoop) are too big in size, thus
> producing a fat JAR which exceeds 100MB. Is there some way to separate the
> 'common' dependencies and app-specific dependencies and submit a thinner
> JAR? Many thanks.
> >
> >
> >
>
>


Re: Flink job manager doesn't remove stale checkmarks

2019-09-24 Thread Clay Teeter
Oh geez, checkmarks = checkpoints... sorry.

What I mean by stale "checkpoints" are checkpoints that should be reaped
by "state.checkpoints.num-retained: 3".

What is happening is that the directories
  - state.checkpoints.dir: file:///opt/ha/49/checkpoints
  - high-availability.storageDir: file:///opt/ha/49/ha
are growing with every checkpoint, and I'm running out of disk space.

On Tue, Sep 24, 2019 at 4:55 AM Biao Liu  wrote:

> Hi Clay,
>
> Sorry I don't get your point. I'm not sure what the "stale checkmarks"
> exactly means. The HA storage and checkpoint directory left after shutting
> down cluster?
>
> Thanks,
> Biao /'bɪ.aʊ/
>
>
>
> On Tue, 24 Sep 2019 at 03:12, Clay Teeter  wrote:
>
>> I'm trying to get my standalone cluster to remove stale checkmarks.
>>
>> The cluster is composed of a single job and task manager backed by
>> rocksdb with high availability.
>>
>> The configuration on both the job and task manager are:
>>
>> state.backend: rocksdb
>> state.checkpoints.dir: file:///opt/ha/49/checkpoints
>> state.backend.incremental: true
>> state.checkpoints.num-retained: 3
>> jobmanager.heap.size: 1024m
>> taskmanager.heap.size: 2048m
>> taskmanager.numberOfTaskSlots: 24
>> parallelism.default: 1
>> high-availability.jobmanager.port: 6123
>> high-availability.zookeeper.path.root: _49
>> high-availability: zookeeper
>> high-availability.storageDir: file:///opt/ha/49/ha
>> high-availability.zookeeper.quorum: **t:2181
>>
>> Both machines have access to /opt/ha/49 and /opt/ha/49/checkpoints via
>> NFS and are owned by the flink user.  Also, there are no errors that i can
>> find.
>>
>> Does anyone have any ideas that i could try?
>>
>>


Re: Question about reading ORC file in Flink

2019-09-24 Thread Fabian Hueske
Hi QiShu,

It might be that Flink's OrcInputFormat has a bug.
Can you open a Jira issue to report the problem?
In order to be able to fix this, we need as much information as possible.
It would be great if you could create a minimal example of an ORC file and
a program that reproduces the issue.
If that's not possible, we need the schema of an Orc file that cannot be
correctly read.

Thanks,
Fabian

On Mon, Sep 23, 2019 at 11:40 AM, ShuQi wrote:

> Hi Guys,
>
> The Flink version is 1.9.0. I use OrcTableSource to read an ORC file in HDFS
> and the job executes successfully, without any exception or error. But some
> fields (such as tagIndustry) are always null, although these fields are actually
> not null. I can read them correctly when reading the file directly. Below is my code:
>
> //main
> final ParameterTool params = ParameterTool.fromArgs(args);
>
> final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>
> env.getConfig().setGlobalJobParameters(params);
>
> Configuration config = new Configuration();
>
> OrcTableSource orcTableSource = OrcTableSource
>     .builder()
>     .path(params.get("input"))
>     .forOrcSchema(TypeDescription.fromString(TYPE_INFO))
>     .withConfiguration(config)
>     .build();
>
> DataSet<Row> dataSet = orcTableSource.getDataSet(env);
>
> DataSet<Tuple2<String, Integer>> counts =
>     dataSet.flatMap(new Tokenizer()).groupBy(0).sum(1);
>
> //read field
> public void flatMap(Row row, Collector<Tuple2<String, Integer>> out) {
>
>     String content = (String) row.getField(6);
>     String tagIndustry = (String) row.getField(35);
>
>     LOGGER.info("arity: " + row.getArity());
>     LOGGER.info("content: " + content);
>     LOGGER.info("tagIndustry: " + tagIndustry);
>     LOGGER.info("===");
>
>     if (Strings.isNullOrEmpty(content) || Strings.isNullOrEmpty(tagIndustry)
>             || !tagIndustry.contains("FD001")) {
>         return;
>     }
>     // normalize and split the line
>     String[] tokens = content.toLowerCase().split("\\W+");
>
>     // emit the pairs
>     for (String token : tokens) {
>         if (token.length() > 0) {
>             out.collect(new Tuple2<>(token, 1));
>         }
>     }
> }
>
> Thanks for your help!
>
> QiShu
>
>
>
>
>


Re: Kafka producer failed with InvalidTxnStateException when performing commit transaction

2019-09-24 Thread Tony Wei
Hi Becket,

I have read the Kafka source code and found that the error won't be propagated
to the client if the list of topic-partitions is empty [1], because the error is
bound to each topic-partition. If this list is empty, then the error won't be
packaged into the response body. As a result, the client never gets the error
message that would let it find the new coordinator.

Back to this problem: I think the original design of the Kafka client probably
did not intend to execute `enqueueNewPartitions` when no topic-partition has been
added. It might be a bug here, and we should first check whether the
`newPartitionsInTransaction` list is empty before executing the
`enqueueNewPartitions` function. Am I right?
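
To make the proposal concrete, the guard I have in mind is roughly the following
(a simplified, self-contained sketch; the real change would live in
FlinkKafkaInternalProducer#flushNewPartitions, which reaches these internals via
reflection):

import java.util.HashSet;
import java.util.Set;

// Only enqueue the AddPartitionsToTxn request when at least one partition was
// actually added, so an "empty" transaction never talks to a coordinator that
// may have moved.
public class FlushNewPartitionsGuard {

  private final Set<String> newPartitionsInTransaction = new HashSet<>();

  void flushNewPartitions() {
    if (newPartitionsInTransaction.isEmpty()) {
      return; // nothing to register, skip the round trip entirely
    }
    enqueueNewPartitions(); // the existing enqueue-and-wake-up logic
  }

  private void enqueueNewPartitions() {
    // placeholder for addPartitionsToTransactionHandler() + enqueueRequest()
  }
}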

If it can be confirmed as a bug, I would like to submit my patch to fix it.
Thanks for your help.

Best,
Tony Wei

[1]
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L2042

Tony Wei wrote on Fri, Sep 20, 2019 at 2:57 PM:

> Hi,
>
> I found that the source code [1] in kafka showed that it always check if
> `newPartitionsInTransaction`
> is empty before calling `enqueueRequest(addPartitionsToTransactionHandler())`,
> that is not
> applied to flink kafka producer code [2].
>
> I wrote a simple producer with the `flushNewPartitions` copied from flink
> kafka producer, and
> successfully reproduce this exception. Then, I modified the logic in
> `enqueueNewPartitions` to check
> if there is any `newPartitionsInTransaction` before make this request. And
> this would work well even
> if I restarted the broker who owned this transaction's coordinator, since
> the empty transaction won't
> make any request to server.
>
> The attachments are my simple producer code. Please help to verify what I
> thought is correct. Thanks.
>
> Best,
> Tony Wei
>
> [1]
> https://github.com/apache/kafka/blob/c0019e653891182d7a95464175c9b4ef63f8bae1/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L316
> [2]
> https://github.com/apache/flink/blob/09f96b339f4890d7a44ae92c915ea8c0f6f244cb/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java#L273
>
> Tony Wei wrote on Fri, Sep 20, 2019 at 11:56 AM:
>
>> Hi,
>>
>> Trying to dig out why `Error.NOT_COORDINATOR` happened in broker, I
>> opened
>> flink's log level to DEBUG for producer. And I found some logs from flink
>> side
>> regarding this error. Below is some log snippet.
>>
>> It seems that producer client didn't catch this error and retry to find
>> new coordinator.
>> This caused the transaction state is inconsistent between client side and
>> server side.
>> Would it be possible that the problem is caused
>> by FlinkKafkaInternalProducer using
>> java reflection to send `addPartitionsToTransactionHandler` request in
>> `FlinkKafkaInternalProducer#flushNewPartitions`? Is there any expert who
>> is familiar
>> with both kafka and flink's kafka connector could help me solve this?
>> Thanks very much.
>>
>> The attachment is my code to reproduce this problem.
>> The cluster's versions are the same as I mentioned in my first email.
>>
>> Best,
>> Tony Wei
>>
>> *flink taskmanager:*
>>
>>> 2019-09-20 02:32:45,927 INFO
>>>  
>>> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer
>>>  - Flushing new partitions
>>> 2019-09-20 02:32:45,927 DEBUG
>>> org.apache.kafka.clients.producer.internals.TransactionManager  - [Producer
>>> clientId=producer-29, transactionalId=map -> Sink:
>>> sink-2e588ce1c86a9d46e2e85186773ce4fd-3] Enqueuing transactional request
>>> (type=AddPartitionsToTxnRequest, transactionalId=map -> Sink:
>>> sink-2e588ce1c86a9d46e2e85186773ce4fd-3, producerId=1008, producerEpoch=1,
>>> partitions=[])
>>>
>> 2019-09-20 02:32:45,931 DEBUG
>>> org.apache.kafka.clients.producer.internals.Sender- [Producer
>>> clientId=producer-29, transactionalId=map -> Sink:
>>> sink-2e588ce1c86a9d46e2e85186773ce4fd-3] Sending transactional request
>>> (type=AddPartitionsToTxnRequest, transactionalId=map -> Sink:
>>> sink-2e588ce1c86a9d46e2e85186773ce4fd-3, producerId=1008, producerEpoch=1,
>>> partitions=[]) to node *kafka-broker-1:9092* (id: 1 rack: null)
>>> 2019-09-20 02:32:45,931 DEBUG org.apache.kafka.clients.NetworkClient
>>>- [Producer clientId=producer-29, transactionalId=map ->
>>> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3] Using older server API v0 to
>>> send ADD_PARTITIONS_TO_TXN {transactional_id=map -> Sink:
>>> sink-2e588ce1c86a9d46e2e85186773ce4fd-3,producer_id=1008,producer_epoch=1,topics=[]}
>>> with correlation id 12 to node 1
>>> 2019-09-20 02:32:45,937 DEBUG
>>> org.apache.kafka.clients.producer.internals.TransactionManager  - [Producer
>>> clientId=producer-29, transactionalId=map -> Sink:
>>> sink-2e588ce1c86a9d46e2e85186773ce4fd-3] Successfully added partitions []
>>> to transaction
>>
>>
>> *kafka-broker-1:*
>>
>>>  [2019-09-20 02:31:46,182] INFO [TransactionCoordinator id=1]
>>> Initialized transactionalId 

Re: Approach to match join streams to create unique streams.

2019-09-24 Thread Fabian Hueske
Hi,

AFAIK, Flink SQL Temporal table function joins are only supported as inner
equality joins.
An extension to left outer joins would be great, but is not on the
immediate roadmap AFAIK.

If you need the inverse, I'd recommend to implement the logic in a
DataStream program with a KeyedCoProcessFunction.
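
A rough sketch of that approach (String element types and the side-output tag are
placeholders; the real job would key both streams by the join column):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// stream1.keyBy(...).connect(stream2.keyBy(...)).process(new MatchUnmatchFunction())
// main output = matched ("safe") records, side output = unmatched records,
// retrieved via resultStream.getSideOutput(MatchUnmatchFunction.UNMATCHED).
public class MatchUnmatchFunction
        extends KeyedCoProcessFunction<String, String, String, String> {

    public static final OutputTag<String> UNMATCHED = new OutputTag<String>("unmatched") {};

    private transient ValueState<Boolean> keyKnown;

    @Override
    public void open(Configuration parameters) {
        keyKnown = getRuntimeContext().getState(
                new ValueStateDescriptor<>("key-known", Boolean.class));
    }

    @Override
    public void processElement1(String record, Context ctx, Collector<String> out)
            throws Exception {
        if (keyKnown.value() != null) {
            out.collect(record);            // key exists in stream2 -> matched
        } else {
            ctx.output(UNMATCHED, record);  // not (yet) known -> unmatched side output
        }
    }

    @Override
    public void processElement2(String update, Context ctx, Collector<String> out)
            throws Exception {
        keyKnown.update(true);              // remember that stream2 contains this key
    }
}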

Best, Fabian

On Mon, Sep 23, 2019 at 1:04 PM, srikanth flink <
flink.d...@gmail.com> wrote:

>  Hi there,
>
> I've two streams source Kafka. Stream1 is a continuous data and stream2 is
> a periodic update. Stream2 contains only one column.
>
> *Use case*: Every entry from stream1 should verify if the stream2 has any
> match.
> The matched and unmatched records should be separated into new unique
> streams. For example: column1, column10 from stream1 match/unmatch check on
> stream2 column to put to a new stream safeStream and unSafeStream
> respectively.
>
> *Implemented solution*: stream2 as temporal function to join over stream1
> which is a dynamic table.
>
>- Ran a time based query where stream1.column1 = stream2.column and
>stream1.column10 = stream2.column ; Working
>
>
>- Ran a time based query where stream1.column1 <> stream1.column and
>tream1.column10 <> stream1.column ; Not working.
>
> Would like to ask if there's a possibility that I could load the stream as
> a list so I could do a *contains*? OR any other approach?
>
> Help appreciated.
>
> Thanks
> Srikanth
>
>


Re: Is there a lifecycle listener that gets notified when a topology starts/stops on a task manager

2019-09-24 Thread Stephen Connolly
I have created https://issues.apache.org/jira/browse/FLINK-14184 as a
proposal to improve Flink in this specific area.
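
For reference, the workaround we use today from the Rich open()/close() hooks is
roughly a reference-counted holder along these lines (names are ours, and the
resource is just a stand-in for the 3rd-party singleton):

// First open() on a TaskManager initialises the library singleton, last close()
// tears it down so the user-code classloader can be unloaded. Every operator that
// needs the resource calls acquire() in open() and release() in close().
public final class SharedResourceHolder {

    private static int refCount = 0;
    private static AutoCloseable resource; // stand-in for the 3rd-party singleton

    public static synchronized void acquire() {
        if (refCount++ == 0) {
            resource = createResource();
        }
    }

    public static synchronized void release() throws Exception {
        if (--refCount == 0 && resource != null) {
            resource.close();
            resource = null;
        }
    }

    private static AutoCloseable createResource() {
        return () -> { }; // placeholder for the real initialisation
    }

    private SharedResourceHolder() {}
}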

On Tue, 24 Sep 2019 at 03:23, Zhu Zhu  wrote:

> Hi Stephen,
>
> I think disposing of static components in the closing stage of a task is
> required.
> This is because your code (operators/UDFs) is part of the task, i.e. it
> can only be executed while the task is not disposed.
>
> Thanks,
> Zhu Zhu
>
> Stephen Connolly wrote on Tue, Sep 24, 2019 at 2:13 AM:
>
>> Currently the best I can see is to make *everything* a Rich... and hook
>> into the open and close methods... but feels very ugly.
>>
>>
>>
>> On Mon 23 Sep 2019 at 15:45, Stephen Connolly <
>> stephen.alan.conno...@gmail.com> wrote:
>>
>>> We are using a 3rd party library that allocates some resources in one of
>>> our topologies.
>>>
>>> Is there a listener or something that gets notified when the topology
>>> starts / stops running in the Task Manager's JVM?
>>>
>>> The 3rd party library uses a singleton, so I need to initialize the
>>> singleton when the first task is started on the task manager and clear out
>>> the singleton when the last task is stopped in order to allow the topology
>>> classloader to be unloadable.
>>>
>>> I had thought it could all be done from the Topology's main method, but
>>> after much head-banging we were able to identify that *when run on a
>>> distributed cluster* the main method is not invoked to start the topology
>>> for each task manager.
>>>
>> --
>> Sent from my phone
>>
>


Challenges Deploying Flink With Savepoints On Kubernetes

2019-09-24 Thread Sean Hester
hi all--we've run into a gap (knowledge? design? tbd?) for our use cases
when deploying Flink jobs to start from savepoints using the job-cluster
mode in Kubernetes.

we're running ~15 different jobs, all in job-cluster mode, using a mix of
Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these are all
long-running streaming jobs, all essentially acting as microservices. we're
using Helm charts to configure all of our deployments.

we have a number of use cases where we want to restart jobs from a
savepoint to replay recent events, i.e. when we've enhanced the job logic
or fixed a bug. but after the deployment we want to have the job resume
its "long-running" behavior, where any unplanned restarts resume from the
latest checkpoint.

the issue we run into is that any obvious/standard/idiomatic Kubernetes
deployment includes the savepoint argument in the configuration. if the Job
Manager container(s) have an unplanned restart, when they come back up they
will start from the savepoint instead of resuming from the latest
checkpoint. everything is working as configured, but that's not exactly
what we want. we want the savepoint argument to be transient somehow (only
used during the initial deployment), but Kubernetes doesn't really support
the concept of transient configuration.

i can see a couple of potential solutions that either involve custom code
in the jobs or custom logic in the container (i.e. a custom entrypoint
script that records that the configured savepoint has already been used in
a file on a persistent volume or GCS, and potentially when/why/by which
deployment). but these seem like unexpected and hacky solutions. before we
head down that road i wanted to ask:

   - is this is already a solved problem that i've missed?
   - is this issue already on the community's radar?

thanks in advance!

-- 
*Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
 
*Altitude 2019 in San Francisco | Sept. 23 - 25*
It’s not just an IT conference, it’s “a complete learning and networking
experience”



Re: Challenges Deploying Flink With Savepoints On Kubernetes

2019-09-24 Thread Yuval Itzchakov
AFAIK there's currently nothing implemented to solve this problem, but
working on a possible fix can be implemented on top of
https://github.com/lyft/flinkk8soperator which already has a pretty fancy
state machine for rolling upgrades. I'd love to be involved as this is an
issue I've been thinking about as well.

Yuval

On Tue, Sep 24, 2019 at 5:02 PM Sean Hester 
wrote:

> hi all--we've run into a gap (knowledge? design? tbd?) for our use cases
> when deploying Flink jobs to start from savepoints using the job-cluster
> mode in Kubernetes.
>
> we're running a ~15 different jobs, all in job-cluster mode, using a mix
> of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these are
> all long-running streaming jobs, all essentially acting as microservices.
> we're using Helm charts to configure all of our deployments.
>
> we have a number of use cases where we want to restart jobs from a
> savepoint to replay recent events, i.e. when we've enhanced the job logic
> or fixed a bug. but after the deployment we want to have the job resume
> it's "long-running" behavior, where any unplanned restarts resume from the
> latest checkpoint.
>
> the issue we run into is that any obvious/standard/idiomatic Kubernetes
> deployment includes the savepoint argument in the configuration. if the Job
> Manager container(s) have an unplanned restart, when they come back up they
> will start from the savepoint instead of resuming from the latest
> checkpoint. everything is working as configured, but that's not exactly
> what we want. we want the savepoint argument to be transient somehow (only
> used during the initial deployment), but Kubernetes doesn't really support
> the concept of transient configuration.
>
> i can see a couple of potential solutions that either involve custom code
> in the jobs or custom logic in the container (i.e. a custom entrypoint
> script that records that the configured savepoint has already been used in
> a file on a persistent volume or GCS, and potentially when/why/by which
> deployment). but these seem like unexpected and hacky solutions. before we
> head down that road i wanted to ask:
>
>- is this is already a solved problem that i've missed?
>- is this issue already on the community's radar?
>
> thanks in advance!
>
> --
> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
>  
> *Altitude 2019 in San Francisco | Sept. 23 - 25*
> It’s not just an IT conference, it’s “a complete learning and networking
> experience”
> 
>
>

-- 
Best Regards,
Yuval Itzchakov.


Re: Recommended approach to debug this

2019-09-24 Thread Zili Chen
Actually there is an ongoing client API refactoring on this stuff [1], and
one of its main purposes is
to eliminate the hijacking of env.execute...

Best,
tison.

[1]
https://lists.apache.org/x/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E


Biao Liu wrote on Tue, Sep 24, 2019 at 7:12 PM:

> So I believe (I did't test it) the solution for this case is keeping the
> original exception thrown from `env.execute()` and throwing this exception
> out of main method.
> It's a bit tricky, maybe we could have a better design of this scenario.
>
> Thanks,
> Biao /'bɪ.aʊ/
>
>
>
> On Tue, 24 Sep 2019 at 18:55, Biao Liu  wrote:
>
>> The key point of this case is in `PackagedProgram#callMainMethod`.
>> The `ProgramAbortException` is expected when executing the main method
>> here. This `ProgramAbortException` thrown is wrapped with
>> `InvocationTargetException` by Java reflection layer [1]. There is a piece
>> of codes handling `InvocationTargetException`.
>>
>> try {
>>   mainMethod.invoke(null, (Object) args);
>> }
>> catch (...
>> catch (InvocationTargetException e) {
>>   Throwable exceptionInMethod = e.getTargetException();
>>   if (exceptionInMethod instanceof Error) {
>> throw (Error) exceptionInMethod;*-->* 
>> *`ProgramAbortException`
>> would be caught expectedly here.*
>>   } else if (exceptionInMethod instanceof
>> ProgramParametrizationException) {
>> throw (ProgramParametrizationException) exceptionInMethod;
>>   } else if (exceptionInMethod instanceof ProgramInvocationException) {
>> throw (ProgramInvocationException) exceptionInMethod;
>>   } else { *--> If I'm right, the wrapped exception (Boxed Error
>> or something else) change the exception, it is caught here*
>> throw new ProgramInvocationException("The main method caused an
>> error: " + exceptionInMethod.getMessage(), exceptionInMethod);
>>   }
>>
>> The `ProgramInvocationException` is handled specially in
>> `OptimizerPlanEnvironment`.
>>
>> try {
>>   prog.invokeInteractiveModeForExecution();
>> }
>> catch (ProgramInvocationException e) {
>>   throw e;   *--> The submission is failed here in this case*
>> }
>> catch (Throwable t) {
>>   // the invocation gets aborted with the preview plan
>>   if (optimizerPlan != null) {
>> return optimizerPlan;*--> Normally it should
>> be here*
>>   } else {
>> throw new ProgramInvocationException("The program caused an error: ",
>> t);
>>   } ...
>>
>> 1.
>> https://stackoverflow.com/questions/6020719/what-could-cause-java-lang-reflect-invocationtargetexception
>>
>> Thanks,
>> Biao /'bɪ.aʊ/
>>
>>
>>
>> On Tue, 24 Sep 2019 at 17:35, Debasish Ghosh 
>> wrote:
>>
>>> Well, I think I got the solution though I am not yet sure of the problem
>>> .. The original code looked like this ..
>>>
>>> Try {
>>>   // from a parent class called Runner which runs a streamlet
>>>   // run returns an abstraction which completes a Promise depending on
>>> whether
>>>   // the Job was successful or not
>>>   val streamletExecution =
>>> loadedStreamlet.streamlet.run(withPodRuntimeConfig)
>>>
>>>   // the runner waits for the execution to complete
>>>   // In normal circumstances it will run forever for streaming data
>>> source unless
>>>   // being stopped forcibly or any of the queries faces an exception
>>>   Await.result(streamletExecution.completed, Duration.Inf)
>>> } match { //..
>>>
>>> and then the streamlet.run(..) in turn finally invoked the following ..
>>>
>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>
>>> // creates datastreams and read from / writes to Kafka
>>> // I pasted the body of this earlier in the thread
>>> buildExecutionGraph()
>>>
>>> env.execute(..)
>>>
>>> This DID NOT run and failed with the exception I reported earlier. But
>>> when I change the code to get the run statement out of the Try block,
>>> things run fine .. like this ..
>>>
>>> // from a parent class called Runner which runs a streamlet
>>> // run returns an abstraction which completes a Promise depending on
>>> whether
>>> // the Job was successful or not
>>> val streamletExecution =
>>> loadedStreamlet.streamlet.run(withPodRuntimeConfig)
>>>
>>> Try {
>>>   // the runner waits for the execution to complete
>>>   // In normal circumstances it will run forever for streaming data
>>> source unless
>>>   // being stopped forcibly or any of the queries faces an exception
>>>   Await.result(streamletExecution.completed, Duration.Inf)
>>> } match { //..
>>>
>>> Apparently it looks like the exception that I was facing earlier leaked
>>> through the Flink engine and Try caught it and it got logged. But removing
>>> it out of Try now enables Flink to catch it back and follow the course that
>>> it should. But I am not sure if this is a cogent explanation and looking
>>> forward to some more accurate one from the experts. Note there is no
>>> asynchrony of concurrency going on here - the Runner code may look a bit
>>> 

RE: NoClassDefFoundError in failing-restarting job that uses url classloader

2019-09-24 Thread Subramanyam Ramanathan
Hi,

Thank you.
I think the takeaway for us is that we need to make sure that the threads are 
stopped in the close() method.
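
For anyone hitting the same thing, the pattern we are moving to looks roughly like
this (a sketch; the background work and the shutdown timeout are placeholders):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Any thread/executor started in open() must be stopped in close(); otherwise it can
// outlive the task, and once the user-code URLClassLoader is closed it will fail with
// NoClassDefFoundError on the next class it tries to load.
public class ThirdPartyMapper extends RichMapFunction<String, String> {

    private transient ExecutorService backgroundWork;

    @Override
    public void open(Configuration parameters) {
        backgroundWork = Executors.newSingleThreadExecutor();
        backgroundWork.submit(() -> { /* background work from the 3rd-party library */ });
    }

    @Override
    public String map(String value) {
        return value;
    }

    @Override
    public void close() throws Exception {
        if (backgroundWork != null) {
            backgroundWork.shutdownNow();                          // interrupt the worker thread
            backgroundWork.awaitTermination(10, TimeUnit.SECONDS); // wait before the classloader goes away
        }
    }
}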

With regard to FLINK-10455, I see that the fix versions say : 1.5.6, 1.7.0, 
1.7.3, 1.8.1, 1.9.0

However, I’m unable to find 1.7.3 in the downloads 
page(https://flink.apache.org/downloads.html). Is it yet to be released, or 
perhaps I am not looking in the right place ?
We’re currently using 1.7.2. Could you please let me know what is the minimal 
upgrade for me to consume the fix for FLINK-10455 ?

Thanks,
Subbu

From: Dian Fu [mailto:dian0511...@gmail.com]
Sent: Monday, September 23, 2019 1:54 PM
To: Subramanyam Ramanathan 
Cc: Zhu Zhu ; user@flink.apache.org
Subject: Re: NoClassDefFoundError in failing-restarting job that uses url 
classloader

Hi Subbu,

The issue you encountered is very similar to the issue which has been fixed in 
FLINK-10455 [1]. Could you check if that fix could solve your problem? The root 
cause for that issue is that the method close() has not closed all things. 
After the method "close()" is called, the classloader (URLClassloader) will be 
closed. If there is thread still running after "close()" method is called, it 
may access the classes in user provided jars. However, as the URLClassloader 
has already been closed, NoClassDefFoundError will be thrown.

Regards,
Dian

[1] https://issues.apache.org/jira/browse/FLINK-10455

On Sep 23, 2019, at 2:50 PM, Subramanyam Ramanathan
mailto:subramanyam.ramanat...@microfocus.com>>
wrote:

Hi,

I was able to simulate the issue again and understand the cause a little better.

The issue occurs when :
-One of the RichMapFunction transformations uses a third party library 
in the open() method that spawns a thread.
-The thread doesn’t get properly closed in the close() method.
-Once the job starts failing, we start seeing a NoClassDefFound error 
from that thread.

I understand that cleanup should be done in the close() method. However, just 
wanted to know, do we have some kind of a configuration setting  which would 
help us clean up such threads ?
I can attach the code if required.

Thanks,
Subbu

From: Zhu Zhu [mailto:reed...@gmail.com]
Sent: Friday, August 9, 2019 7:43 AM
To: Subramanyam Ramanathan 
mailto:subramanyam.ramanat...@microfocus.com>>
Cc: user@flink.apache.org
Subject: Re: NoClassDefFoundError in failing-restarting job that uses url 
classloader

Hi Subramanyam,

Could you share more information? including:
1. the URL pattern
2. the detailed exception and the log around it
3. the cluster the job is running on, e.g. standalone, yarn, k8s
4. it's session mode or per job mode

This information would be helpful to identify the failure cause.

Thanks,
Zhu Zhu











Subramanyam Ramanathan
mailto:subramanyam.ramanat...@microfocus.com>>
wrote on Fri, Aug 9, 2019 at 1:45 AM:

Hello,

I'm currently using flink 1.7.2.

I'm trying to run a job that's submitted programmatically using the 
ClusterClient API.
   public JobSubmissionResult run(PackagedProgram prog, int 
parallelism)


The job makes use of some jars which I add to the packaged program through the 
Packaged constructor, along with the Jar file.
   public PackagedProgram(File jarFile, List classpaths, String... args)
Normally, This works perfectly and the job runs fine.

However, if there's an error in the job, and the job goes into failing state 
and when it's continously  trying to restart the job for an hour or so, I 
notice a NoClassDefFoundError for some classes in the jars that I load using 
the URL class loader and the job never recovers after that, even if the root 
cause of the issue was fixed (I had a kafka source/sink in my job, and kafka 
was down temporarily, and was brought up after that).
The jar is still available at the path referenced by the url classloader and is 
not tampered with.

Could anyone please give me some pointers with regard to the reason why this 
could happen/what I could be missing here/how can I debug further ?

thanks
Subbu



Re: How do I create a temporal function using Flink Clinet SQL?

2019-09-24 Thread Fabian Hueske
Hi,

It's not possible to create a temporal table function from SQL, but you can
define it in the config.yaml of the SQL client as described in the
documentation [1].

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sqlClient.html#temporal-tables
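
Roughly, the relevant part of the SQL Client YAML looks like the following (the
table and field names are placeholders; please double-check the exact keys against
the linked 1.9 documentation):

tables:
  - name: RatesHistory          # the append-only history source
    type: source-table
    # ... connector/format/schema definition ...
  - name: Rates                 # the temporal table function derived from it
    type: temporal-table
    history-table: RatesHistory
    primary-key: r_currency
    time-attribute: r_proctime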

On Tue, Sep 24, 2019 at 4:49 PM, srikanth flink <
flink.d...@gmail.com> wrote:

> Hi,
>
> I'm running time based joins, dynamic table over temporal function.
> Is there a way I could create temporal table using flink SQL. And I'm
> using v1.9.
>
> Thanks
> Srikanth
>


RE: NoClassDefFoundError in failing-restarting job that uses url classloader

2019-09-24 Thread Subramanyam Ramanathan
Hi Zhu,

We also use FlinkKafkaProducer(011), hence I felt this fix would also be needed
for us.

I agree that the issue I had originally mentioned would not be fixed
by this, but I felt that I should be consuming this fix as well.

Thanks,
Subbu

From: Zhu Zhu [mailto:reed...@gmail.com]
Sent: Tuesday, September 24, 2019 6:13 PM
To: Subramanyam Ramanathan 
Cc: Dian Fu ; user@flink.apache.org
Subject: Re: NoClassDefFoundError in failing-restarting job that uses url 
classloader

Hi Subramanyam,

I think you do not need the fix in FLINK-10455 which is for Kafka only. It's 
just a similar issue as you met.
As you said, we need to make sure that the operator/UDF spawned threads are 
stopped in the close() method. In this way, we can avoid the thread to throw 
NoClassDefFoundError due to the class loader gets closed.

Thanks,
Zhu Zhu


Subramanyam Ramanathan
mailto:subramanyam.ramanat...@microfocus.com>>
wrote on Tue, Sep 24, 2019 at 8:07 PM:
Hi,

Thank you.
I think the takeaway for us is that we need to make sure that the threads are 
stopped in the close() method.

With regard to FLINK-10455, I see that the fix versions say : 1.5.6, 1.7.0, 
1.7.3, 1.8.1, 1.9.0

However, I’m unable to find 1.7.3 in the downloads 
page(https://flink.apache.org/downloads.html). Is it yet to be released, or 
perhaps I am not looking in the right place ?
We’re currently using 1.7.2. Could you please let me know what is the minimal 
upgrade for me to consume the fix for FLINK-10455 ?

Thanks,
Subbu

From: Dian Fu [mailto:dian0511...@gmail.com]
Sent: Monday, September 23, 2019 1:54 PM
To: Subramanyam Ramanathan 
mailto:subramanyam.ramanat...@microfocus.com>>
Cc: Zhu Zhu mailto:reed...@gmail.com>>; 
user@flink.apache.org
Subject: Re: NoClassDefFoundError in failing-restarting job that uses url 
classloader

Hi Subbu,

The issue you encountered is very similar to the issue which has been fixed in 
FLINK-10455 [1]. Could you check if that fix could solve your problem? The root 
cause for that issue is that the method close() has not closed all things. 
After the method "close()" is called, the classloader (URLClassloader) will be 
closed. If there is thread still running after "close()" method is called, it 
may access the classes in user provided jars. However, as the URLClassloader 
has already been closed, NoClassDefFoundError will be thrown.

Regards,
Dian

[1] https://issues.apache.org/jira/browse/FLINK-10455

On Sep 23, 2019, at 2:50 PM, Subramanyam Ramanathan
mailto:subramanyam.ramanat...@microfocus.com>>
wrote:

Hi,

I was able to simulate the issue again and understand the cause a little better.

The issue occurs when :
-One of the RichMapFunction transformations uses a third party library 
in the open() method that spawns a thread.
-The thread doesn’t get properly closed in the close() method.
-Once the job starts failing, we start seeing a NoClassDefFound error 
from that thread.

I understand that cleanup should be done in the close() method. However, just 
wanted to know, do we have some kind of a configuration setting  which would 
help us clean up such threads ?
I can attach the code if required.

Thanks,
Subbu

From: Zhu Zhu [mailto:reed...@gmail.com]
Sent: Friday, August 9, 2019 7:43 AM
To: Subramanyam Ramanathan 
mailto:subramanyam.ramanat...@microfocus.com>>
Cc: user@flink.apache.org
Subject: Re: NoClassDefFoundError in failing-restarting job that uses url 
classloader

Hi Subramanyam,

Could you share more information? including:
1. the URL pattern
2. the detailed exception and the log around it
3. the cluster the job is running on, e.g. standalone, yarn, k8s
4. it's session mode or per job mode

This information would be helpful to identify the failure cause.

Thanks,
Zhu Zhu











Subramanyam Ramanathan
mailto:subramanyam.ramanat...@microfocus.com>>
wrote on Fri, Aug 9, 2019 at 1:45 AM:

Hello,

I'm currently using flink 1.7.2.

I'm trying to run a job that's submitted programmatically using the 
ClusterClient API.
   public JobSubmissionResult run(PackagedProgram prog, int 
parallelism)


The job makes use of some jars which I add to the packaged program through the 
Packaged constructor, along with the Jar file.
   public PackagedProgram(File jarFile, List classpaths, String... args)
Normally, This works perfectly and the job runs fine.

However, if there's an error in the job, and the job goes into failing state 
and when it's continously  trying to restart the job for an hour or so, I 
notice a NoClassDefFoundError for some classes in the jars that I load using 
the URL class loader and the job never recovers after that, even if the root 
cause of the issue was fixed (I had a kafka source/sink in my job, and kafka 
was down temporarily, and was brought up after that).
The jar is still available at the path referenced by the url classloader and is 
not tampered with.

Could anyone please give me some 

Setting environment variables of the taskmanagers (yarn)

2019-09-24 Thread Richard Deurwaarder
Hello,

We have our flink job (1.8.0) running on our hadoop 2.7 cluster with yarn.
We would like to add the GCS connector to use GCS rather than HDFS.
Following the documentation of the GCS connector[1] we have to specify
which credentials we want to use and there are two ways of doing this:
  * Edit core-site.xml
  * Set an environment variable: GOOGLE_APPLICATION_CREDENTIALS

Because we're on a company shared hadoop cluster we do not want to change
the cluster wide core-site.xml.

This leaves me with two options:

1. Create a custom core-site.xml and use --yarnship to send it to all the
taskmanager containers. If I do this, to what value should I set
fs.hdfs.hadoopconf[2] in flink-conf ?
2. The second option would be to set an environment variable, however
because the taskmanagers are started via yarn I'm having trouble figuring
out how to make sure this environment variable is set for each yarn
container / taskmanager.

I would appreciate any help you can provide.

Thank you,

Richard

[1]
https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/gcs/INSTALL.md#configure-hadoop
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/config.html#hdfs


How do I create a temporal function using Flink Clinet SQL?

2019-09-24 Thread srikanth flink
Hi,

I'm running time based joins, dynamic table over temporal function.
Is there a way I could create temporal table using flink SQL. And I'm using
v1.9.

Thanks
Srikanth


Re: Recommended approach to debug this

2019-09-24 Thread Biao Liu
So I believe (I didn't test it) the solution for this case is keeping the
original exception thrown from `env.execute()` and throwing this exception
out of the main method.
It's a bit tricky; maybe we could have a better design for this scenario.

Thanks,
Biao /'bɪ.aʊ/



On Tue, 24 Sep 2019 at 18:55, Biao Liu  wrote:

> The key point of this case is in `PackagedProgram#callMainMethod`.
> The `ProgramAbortException` is expected when executing the main method
> here. This `ProgramAbortException` thrown is wrapped with
> `InvocationTargetException` by Java reflection layer [1]. There is a piece
> of codes handling `InvocationTargetException`.
>
> try {
>   mainMethod.invoke(null, (Object) args);
> }
> catch (...
> catch (InvocationTargetException e) {
>   Throwable exceptionInMethod = e.getTargetException();
>   if (exceptionInMethod instanceof Error) {
> throw (Error) exceptionInMethod;*-->* *`ProgramAbortException`
> would be caught expectedly here.*
>   } else if (exceptionInMethod instanceof ProgramParametrizationException)
> {
> throw (ProgramParametrizationException) exceptionInMethod;
>   } else if (exceptionInMethod instanceof ProgramInvocationException) {
> throw (ProgramInvocationException) exceptionInMethod;
>   } else { *--> If I'm right, the wrapped exception (Boxed Error
> or something else) change the exception, it is caught here*
> throw new ProgramInvocationException("The main method caused an error:
> " + exceptionInMethod.getMessage(), exceptionInMethod);
>   }
>
> The `ProgramInvocationException` is handled specially in
> `OptimizerPlanEnvironment`.
>
> try {
>   prog.invokeInteractiveModeForExecution();
> }
> catch (ProgramInvocationException e) {
>   throw e;   *--> The submission is failed here in this case*
> }
> catch (Throwable t) {
>   // the invocation gets aborted with the preview plan
>   if (optimizerPlan != null) {
> return optimizerPlan;*--> Normally it should
> be here*
>   } else {
> throw new ProgramInvocationException("The program caused an error: ",
> t);
>   } ...
>
> 1.
> https://stackoverflow.com/questions/6020719/what-could-cause-java-lang-reflect-invocationtargetexception
>
> Thanks,
> Biao /'bɪ.aʊ/
>
>
>
> On Tue, 24 Sep 2019 at 17:35, Debasish Ghosh 
> wrote:
>
>> Well, I think I got the solution though I am not yet sure of the problem
>> .. The original code looked like this ..
>>
>> Try {
>>   // from a parent class called Runner which runs a streamlet
>>   // run returns an abstraction which completes a Promise depending on
>> whether
>>   // the Job was successful or not
>>   val streamletExecution =
>> loadedStreamlet.streamlet.run(withPodRuntimeConfig)
>>
>>   // the runner waits for the execution to complete
>>   // In normal circumstances it will run forever for streaming data
>> source unless
>>   // being stopped forcibly or any of the queries faces an exception
>>   Await.result(streamletExecution.completed, Duration.Inf)
>> } match { //..
>>
>> and then the streamlet.run(..) in turn finally invoked the following ..
>>
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>
>> // creates datastreams and read from / writes to Kafka
>> // I pasted the body of this earlier in the thread
>> buildExecutionGraph()
>>
>> env.execute(..)
>>
>> This DID NOT run and failed with the exception I reported earlier. But
>> when I change the code to get the run statement out of the Try block,
>> things run fine .. like this ..
>>
>> // from a parent class called Runner which runs a streamlet
>> // run returns an abstraction which completes a Promise depending on
>> whether
>> // the Job was successful or not
>> val streamletExecution =
>> loadedStreamlet.streamlet.run(withPodRuntimeConfig)
>>
>> Try {
>>   // the runner waits for the execution to complete
>>   // In normal circumstances it will run forever for streaming data
>> source unless
>>   // being stopped forcibly or any of the queries faces an exception
>>   Await.result(streamletExecution.completed, Duration.Inf)
>> } match { //..
>>
>> Apparently it looks like the exception that I was facing earlier leaked
>> through the Flink engine and Try caught it and it got logged. But removing
>> it out of Try now enables Flink to catch it back and follow the course that
>> it should. But I am not sure if this is a cogent explanation and looking
>> forward to some more accurate one from the experts. Note there is no
>> asynchrony of concurrency going on here - the Runner code may look a bit
>> over-engineered but there is a context to this. The Runner code handles not
>> only Flink but other types of streaming engines as well like Spark and Akka
>> Streams.
>>
>> regards.
>>
>>
>> On Tue, Sep 24, 2019 at 10:17 AM Biao Liu  wrote:
>>
>>> Hi Zili,
>>>
>>> Thanks for pointing that out.
>>> I didn't realize that it's a REST API based case. Debasish's case has
>>> been discussed not only in this thread...
>>>
>>> It's really hard to analyze 

Re: HBaseTableSource for SQL query errors

2019-09-24 Thread ????????
While debugging the source code, the following exception was thrown in the
MiniCluster.executeJobBlocking method:

akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka://flink/user/dispatcher#997865675]] after [1 ms]. Sender[null]
sent message of type "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".

https://issues.apache.org/jira/browse/FLINK-8485
This may be a previously reported bug; how can we solve it?

 

------------------ Original message ------------------
From: "" <2463...@qq.com>
Sent: 2019-09-24 4:04
To: "user"

Re: Have trouble on running flink

2019-09-24 Thread Biao Liu
Hi Russell,

I don't think `BackendBuildingException` is the root cause. In your case, this
exception appears while the task is being cancelled.

Have you checked the log of the YARN node manager? There should be a container
exit code there. The container may even have been killed by the YARN node
manager.

BTW, I think we should discuss this in flink-user mailing list, not dev
mailing list. Will forward this mail there.

Thanks,
Biao /'bɪ.aʊ/



On Tue, 24 Sep 2019 at 19:19, Russell Bie  wrote:

> Hi Flink team,
>
> I am trying to submit flink job (version 1.8.2) with RocksDB backend to my
> own yarn cluster (hadoop version 2.6.0-cdh5.7.3), the job always failed
> after running for a few hours with the connection loss of some
> taskmanagers. Here<
> https://stackoverflow.com/questions/58046847/ioexception-when-taskmanager-restored-from-rocksdb-state-in-hdfs>
> is the question details on the stackoverflow. I am just wondering if you
> could provide some advice on this issue?
>
> Thanks,
> Russell
>
>


Re: Recommended approach to debug this

2019-09-24 Thread Biao Liu
Hi Zili,

Great to hear that!
Hope to see the new client soon!

Thanks,
Biao /'bɪ.aʊ/



On Tue, 24 Sep 2019 at 19:23, Zili Chen  wrote:

> Actually there is an ongoing client API refactoring on this stuff[1] and
> one of the main purpose is
> eliminating hijacking env.execute...
>
> Best,
> tison.
>
> [1]
> https://lists.apache.org/x/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E
>
>
> Biao Liu  于2019年9月24日周二 下午7:12写道:
>
>> So I believe (I didn't test it) the solution for this case is to keep the
>> original exception thrown from `env.execute()` and rethrow it out of the
>> main method.
>> It's a bit tricky; maybe we could have a better design for this scenario.
>>
>> Thanks,
>> Biao /'bɪ.aʊ/
>>
>>
>>
>> On Tue, 24 Sep 2019 at 18:55, Biao Liu  wrote:
>>
>>> The key point of this case is in `PackagedProgram#callMainMethod`.
>>> A `ProgramAbortException` is expected when the main method is executed
>>> here. The thrown `ProgramAbortException` is wrapped in an
>>> `InvocationTargetException` by the Java reflection layer [1]. There is a
>>> piece of code handling `InvocationTargetException`:
>>>
>>> try {
>>>   mainMethod.invoke(null, (Object) args);
>>> }
>>> catch (...
>>> catch (InvocationTargetException e) {
>>>   Throwable exceptionInMethod = e.getTargetException();
>>>   if (exceptionInMethod instanceof Error) {
>>>     // --> `ProgramAbortException` would be caught here, as expected.
>>>     throw (Error) exceptionInMethod;
>>>   } else if (exceptionInMethod instanceof ProgramParametrizationException) {
>>>     throw (ProgramParametrizationException) exceptionInMethod;
>>>   } else if (exceptionInMethod instanceof ProgramInvocationException) {
>>>     throw (ProgramInvocationException) exceptionInMethod;
>>>   } else {
>>>     // --> If I'm right, the wrapping (a boxed Error or something else)
>>>     // changes the exception type, so it is caught here instead.
>>>     throw new ProgramInvocationException("The main method caused an error: "
>>>         + exceptionInMethod.getMessage(), exceptionInMethod);
>>>   }
>>>
>>> The `ProgramInvocationException` is handled specially in
>>> `OptimizerPlanEnvironment`.
>>>
>>> try {
>>>   prog.invokeInteractiveModeForExecution();
>>> }
>>> catch (ProgramInvocationException e) {
>>>   // --> In this case the submission fails here.
>>>   throw e;
>>> }
>>> catch (Throwable t) {
>>>   // the invocation gets aborted with the preview plan
>>>   if (optimizerPlan != null) {
>>>     // --> Normally it should end up here.
>>>     return optimizerPlan;
>>>   } else {
>>>     throw new ProgramInvocationException("The program caused an error: ", t);
>>>   } ...
>>>
>>> 1.
>>> https://stackoverflow.com/questions/6020719/what-could-cause-java-lang-reflect-invocationtargetexception
>>>
>>> Thanks,
>>> Biao /'bɪ.aʊ/
>>>
>>>
>>>
>>> On Tue, 24 Sep 2019 at 17:35, Debasish Ghosh 
>>> wrote:
>>>
 Well, I think I got the solution though I am not yet sure of the
 problem .. The original code looked like this ..

 Try {
   // from a parent class called Runner which runs a streamlet
   // run returns an abstraction which completes a Promise depending on
 whether
   // the Job was successful or not
   val streamletExecution =
 loadedStreamlet.streamlet.run(withPodRuntimeConfig)

   // the runner waits for the execution to complete
   // In normal circumstances it will run forever for streaming data
 source unless
   // being stopped forcibly or any of the queries faces an exception
   Await.result(streamletExecution.completed, Duration.Inf)
 } match { //..

 and then the streamlet.run(..) in turn finally invoked the following ..

 val env = StreamExecutionEnvironment.getExecutionEnvironment

 // creates datastreams and read from / writes to Kafka
 // I pasted the body of this earlier in the thread
 buildExecutionGraph()

 env.execute(..)

 This DID NOT run and failed with the exception I reported earlier. But
 when I change the code to get the run statement out of the Try block,
 things run fine .. like this ..

 // from a parent class called Runner which runs a streamlet
 // run returns an abstraction which completes a Promise depending on
 whether
 // the Job was successful or not
 val streamletExecution =
 loadedStreamlet.streamlet.run(withPodRuntimeConfig)

 Try {
   // the runner waits for the execution to complete
   // In normal circumstances it will run forever for streaming data
 source unless
   // being stopped forcibly or any of the queries faces an exception
   Await.result(streamletExecution.completed, Duration.Inf)
 } match { //..

 Apparently it looks like the exception that I was facing earlier leaked
 through the Flink engine and Try caught it and it got logged. But removing
 it out of Try now enables Flink 

Re: NoClassDefFoundError in failing-restarting job that uses url classloader

2019-09-24 Thread Zhu Zhu
Hi Subramanyam,

I think you do not need the fix in FLINK-10455 which is for Kafka only.
It's just a similar issue as you met.
As you said, we need to make sure that the threads spawned by operators/UDFs
are stopped in the close() method. That way we avoid those threads throwing
NoClassDefFoundError after the classloader has been closed.

Thanks,
Zhu Zhu
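
A minimal, hedged sketch of the cleanup described above; the class, thread
name and timeout are made up and not taken from the reported job. Any thread
spawned in open() is stopped and joined in close(), so it cannot keep running
after the user-code classloader is closed and then fail with
NoClassDefFoundError.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class ThreadOwningMapper extends RichMapFunction<String, String> {

    private transient Thread worker;
    private volatile boolean running;

    @Override
    public void open(Configuration parameters) {
        running = true;
        worker = new Thread(() -> {
            while (running) {
                try {
                    // background work of the hypothetical third-party library
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }, "mapper-background-worker");
        worker.setDaemon(true);
        worker.start();
    }

    @Override
    public String map(String value) {
        return value;
    }

    @Override
    public void close() throws Exception {
        running = false;                // signal the worker to stop
        if (worker != null) {
            worker.interrupt();
            worker.join(10_000);        // wait so it cannot outlive the classloader
        }
    }
}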


Subramanyam Ramanathan 
于2019年9月24日周二 下午8:07写道:

> Hi,
>
>
>
> Thank you.
>
> I think the takeaway for us is that we need to make sure that the threads
> are stopped in the close() method.
>
>
>
> With regard to FLINK-10455, I see that the fix versions say : 1.5.6,
> 1.7.0, 1.7.3, 1.8.1, 1.9.0
>
>
>
> However, I’m unable to find 1.7.3 in the downloads page(
> https://flink.apache.org/downloads.html). Is it yet to be released, or
> perhaps I am not looking in the right place ?
>
> We’re currently using 1.7.2. Could you please let me know what is the
> minimal upgrade for me to consume the fix for FLINK-10455 ?
>
>
>
> Thanks,
>
> Subbu
>
>
>
> *From:* Dian Fu [mailto:dian0511...@gmail.com]
> *Sent:* Monday, September 23, 2019 1:54 PM
> *To:* Subramanyam Ramanathan 
> *Cc:* Zhu Zhu ; user@flink.apache.org
> *Subject:* Re: NoClassDefFoundError in failing-restarting job that uses
> url classloader
>
>
>
> Hi Subbu,
>
>
>
> The issue you encountered is very similar to the issue which has been
> fixed in FLINK-10455 [1]. Could you check if that fix could solve your
> problem? The root cause for that issue is that the method close() has not
> closed all things. After the method "close()" is called, the classloader
> (URLClassloader) will be closed. If there is thread still running after
> "close()" method is called, it may access the classes in user provided
> jars. However, as the URLClassloader has already been closed,
> NoClassDefFoundError will be thrown.
>
>
>
> Regards,
>
> Dian
>
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-10455
>
>
>
> 在 2019年9月23日,下午2:50,Subramanyam Ramanathan <
> subramanyam.ramanat...@microfocus.com> 写道:
>
>
>
> Hi,
>
>
>
> I was able to simulate the issue again and understand the cause a little
> better.
>
>
>
> The issue occurs when :
>
> -One of the RichMapFunction transformations uses a third party
> library in the open() method that spawns a thread.
>
> -The thread doesn’t get properly closed in the close() method.
>
> -Once the job starts failing, we start seeing a NoClassDefFound
> error from that thread.
>
>
>
> I understand that cleanup should be done in the close() method. However,
> just wanted to know, do we have some kind of a configuration setting  which
> would help us clean up such threads ?
>
> I can attach the code if required.
>
>
>
> Thanks,
>
> Subbu
>
>
>
> *From:* Zhu Zhu [mailto:reed...@gmail.com ]
> *Sent:* Friday, August 9, 2019 7:43 AM
> *To:* Subramanyam Ramanathan 
> *Cc:* user@flink.apache.org
> *Subject:* Re: NoClassDefFoundError in failing-restarting job that uses
> url classloader
>
>
>
> Hi Subramanyam,
>
>
>
> Could you share more information? including:
>
> 1. the URL pattern
>
> 2. the detailed exception and the log around it
>
> 3. the cluster the job is running on, e.g. standalone, yarn, k8s
>
> 4. it's session mode or per job mode
>
>
>
> This information would be helpful to identify the failure cause.
>
>
>
> Thanks,
>
> Zhu Zhu
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> Subramanyam Ramanathan  于2019年8月9
> 日周五 上午1:45写道:
>
>
>
> Hello,
>
>
>
> I'm currently using flink 1.7.2.
>
>
>
> I'm trying to run a job that's submitted programmatically using the
> ClusterClient API.
>
>public JobSubmissionResult run(PackagedProgram prog, int
> parallelism)
>
>
>
>
>
> The job makes use of some jars which I add to the packaged program through
> the Packaged constructor, along with the Jar file.
>
>public PackagedProgram(File jarFile, List<URL> classpaths, String...
> args)
>
> Normally, This works perfectly and the job runs fine.
>
>
>
> However, if there's an error in the job and the job goes into a failing
> state, continuously trying to restart for an hour or so, I notice a
> NoClassDefFoundError for some classes in the jars that I load using the URL
> class loader, and the job never recovers after that, even if the root cause
> of the issue was fixed (I had a Kafka source/sink in my job, and Kafka was
> down temporarily and was brought up after that).
>
> The jar is still available at the path referenced by the url classloader
> and is not tampered with.
>
>
>
> Could anyone please give me some pointers with regard to the reason why
> this could happen/what I could be missing here/how can I debug further ?
>
>
>
> thanks
>
> Subbu
>
>
>


Re: Approach to match join streams to create unique streams.

2019-09-24 Thread srikanth flink
Fabian,

Thanks, already implemented the left join.

Srikanth

On Tue, Sep 24, 2019 at 2:12 PM Fabian Hueske  wrote:

> Hi,
>
> AFAIK, Flink SQL Temporal table function joins are only supported as inner
> equality joins.
> An extension to left outer joins would be great, but is not on the
> immediate roadmap AFAIK.
>
> If you need the inverse, I'd recommend to implement the logic in a
> DataStream program with a KeyedCoProcessFunction.
>
> Best, Fabian
>
> Am Mo., 23. Sept. 2019 um 13:04 Uhr schrieb srikanth flink <
> flink.d...@gmail.com>:
>
>>  Hi there,
>>
>> I've two streams source Kafka. Stream1 is a continuous data and stream2
>> is a periodic update. Stream2 contains only one column.
>>
>> *Use case*: Every entry from stream1 should verify if the stream2 has
>> any match.
>> The matched and unmatched records should be separated into new unique
>> streams. For example: column1, column10 from stream1 match/unmatch check on
>> stream2 column to put to a new stream safeStream and unSafeStream
>> respectively.
>>
>> *Implemented solution*: stream2 as temporal function to join over
>> stream1 which is a dynamic table.
>>
>>- Ran a time-based query where stream1.column1 = stream2.column and
>>stream1.column10 = stream2.column ; working.
>>
>>
>>- Ran a time-based query where stream1.column1 <> stream2.column and
>>stream1.column10 <> stream2.column ; not working.
>>
>> I would like to ask whether it is possible to load the stream as a list so I
>> could do a *contains*, or is there any other approach?
>>
>> Help appreciated.
>>
>> Thanks
>> Srikanth
>>
>>
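
As a side note on the "load stream2 and do a contains" idea above: besides a
KeyedCoProcessFunction, Flink's broadcast state also fits this match/unmatch
split. Below is a hedged Java sketch, not code from this thread; Event, the
column names and the safe/unsafe naming are assumptions based on the
description above.

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class MatchSplit {

    // hypothetical record type with the two columns to check
    public static class Event {
        public String column1;
        public String column10;
    }

    static final OutputTag<Event> UNSAFE = new OutputTag<Event>("unsafe") {};
    static final MapStateDescriptor<String, Boolean> STREAM2_KEYS =
            new MapStateDescriptor<>("stream2-keys", Types.STRING, Types.BOOLEAN);

    public static SingleOutputStreamOperator<Event> split(DataStream<Event> stream1,
                                                          DataStream<String> stream2) {
        BroadcastStream<String> keys = stream2.broadcast(STREAM2_KEYS);
        return stream1
            .connect(keys)
            .process(new BroadcastProcessFunction<Event, String, Event>() {

                @Override
                public void processElement(Event e, ReadOnlyContext ctx,
                                           Collector<Event> out) throws Exception {
                    boolean matched = ctx.getBroadcastState(STREAM2_KEYS).contains(e.column1)
                            || ctx.getBroadcastState(STREAM2_KEYS).contains(e.column10);
                    if (matched) {
                        out.collect(e);            // "safe" stream (main output)
                    } else {
                        ctx.output(UNSAFE, e);     // "unsafe" stream (side output)
                    }
                }

                @Override
                public void processBroadcastElement(String key, Context ctx,
                                                    Collector<Event> out) throws Exception {
                    ctx.getBroadcastState(STREAM2_KEYS).put(key, true);
                }
            });
    }
}

The matched records come out of the returned main output and the unmatched
ones via getSideOutput(UNSAFE); removals from stream2 would need extra
handling on top of this.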


Re: Challenges Deploying Flink With Savepoints On Kubernetes

2019-09-24 Thread Hao Sun
We always make a savepoint before we shut down the job-cluster, so the
savepoint is always the latest. When we fix a bug or change the job graph,
it can resume well.
We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM,
uncaught exception, etc.

Maybe I do not understand your use case well, but I do not see a need to start
from a checkpoint after a bug fix.
From what I know, you can currently use a checkpoint as a savepoint as well.

Hao Sun
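
For reference, resuming from a retained (externalized) checkpoint uses the
same -s / --fromSavepoint argument as a savepoint; the path below is only an
illustration:

flink run -s hdfs:///flink/checkpoints/<job-id>/chk-45 my-job.jar

This assumes externalized checkpoints are retained (state.checkpoints.dir set
and RETAIN_ON_CANCELLATION configured).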


On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov  wrote:

> AFAIK there's currently nothing implemented to solve this problem, but
> working on a possible fix can be implemented on top of
> https://github.com/lyft/flinkk8soperator
>  which already
> has a pretty fancy state machine for rolling upgrades. I'd love to be
> involved as this is an issue I've been thinking about as well.
>
> Yuval
>
> On Tue, Sep 24, 2019 at 5:02 PM Sean Hester 
> wrote:
>
>> hi all--we've run into a gap (knowledge? design? tbd?) for our use cases
>> when deploying Flink jobs to start from savepoints using the job-cluster
>> mode in Kubernetes.
>>
>> we're running ~15 different jobs, all in job-cluster mode, using a mix
>> of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these are
>> all long-running streaming jobs, all essentially acting as microservices.
>> we're using Helm charts to configure all of our deployments.
>>
>> we have a number of use cases where we want to restart jobs from a
>> savepoint to replay recent events, i.e. when we've enhanced the job logic
>> or fixed a bug. but after the deployment we want to have the job resume
>> it's "long-running" behavior, where any unplanned restarts resume from the
>> latest checkpoint.
>>
>> the issue we run into is that any obvious/standard/idiomatic Kubernetes
>> deployment includes the savepoint argument in the configuration. if the Job
>> Manager container(s) have an unplanned restart, when they come back up they
>> will start from the savepoint instead of resuming from the latest
>> checkpoint. everything is working as configured, but that's not exactly
>> what we want. we want the savepoint argument to be transient somehow (only
>> used during the initial deployment), but Kubernetes doesn't really support
>> the concept of transient configuration.
>>
>> i can see a couple of potential solutions that either involve custom code
>> in the jobs or custom logic in the container (i.e. a custom entrypoint
>> script that records that the configured savepoint has already been used in
>> a file on a persistent volume or GCS, and potentially when/why/by which
>> deployment). but these seem like unexpected and hacky solutions. before we
>> head down that road i wanted to ask:
>>
>>- is this is already a solved problem that i've missed?
>>- is this issue already on the community's radar?
>>
>> thanks in advance!
>>
>> --
>> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
>> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
>> 
>> 
>> *Altitude 2019 in San Francisco | Sept. 23 - 25*
>> It’s not just an IT conference, it’s “a complete learning and networking
>> experience” 
>> 
>>
>>
>
> --
> Best Regards,
> Yuval Itzchakov.
>


Re: Setting environment variables of the taskmanagers (yarn)

2019-09-24 Thread Peter Huang
Hi Richard,

For the first question, I don't think you need to explicitly specify
fs.hdfs.hadoopconf, as each file in the ship folder is copied as a YARN
local resource for the containers. The configuration path is
overridden internally by Flink.

For the second question of setting TM environment variables, please use
these two configurations in your flink conf.

/**
 * Prefix for passing custom environment variables to Flink's master process.
 * For example for passing LD_LIBRARY_PATH as an env variable to the
AppMaster, set:
 * containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native"
 * in the flink-conf.yaml.
 */
public static final String CONTAINERIZED_MASTER_ENV_PREFIX =
"containerized.master.env.";

/**
 * Similar to the {@see CONTAINERIZED_MASTER_ENV_PREFIX}, this
configuration prefix allows
 * setting custom environment variables for the workers (TaskManagers).
 */
public static final String CONTAINERIZED_TASK_MANAGER_ENV_PREFIX =
"containerized.taskmanager.env.";



Best Regards

Peter Huang
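
For example, applied to the GOOGLE_APPLICATION_CREDENTIALS case from the
question, the two prefixes above would be used in flink-conf.yaml roughly like
this (the key-file path is an assumption, and the file itself still has to be
present on the containers, e.g. shipped via --yarnship):

containerized.master.env.GOOGLE_APPLICATION_CREDENTIALS: /path/to/gcs-key.json
containerized.taskmanager.env.GOOGLE_APPLICATION_CREDENTIALS: /path/to/gcs-key.json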




On Tue, Sep 24, 2019 at 8:02 AM Richard Deurwaarder  wrote:

> Hello,
>
> We have our flink job (1.8.0) running on our hadoop 2.7 cluster with yarn.
> We would like to add the GCS connector to use GCS rather than HDFS.
> Following the documentation of the GCS connector[1] we have to specify
> which credentials we want to use and there are two ways of doing this:
>   * Edit core-site.xml
>   * Set an environment variable: GOOGLE_APPLICATION_CREDENTIALS
>
> Because we're on a company shared hadoop cluster we do not want to change
> the cluster wide core-site.xml.
>
> This leaves me with two options:
>
> 1. Create a custom core-site.xml and use --yarnship to send it to all the
> taskmanager containers. If I do this, to what value should I set
> fs.hdfs.hadoopconf[2] in flink-conf ?
> 2. The second option would be to set an environment variable, however
> because the taskmanagers are started via yarn I'm having trouble figuring
> out how to make sure this environment variable is set for each yarn
> container / taskmanager.
>
> I would appreciate any help you can provide.
>
> Thank you,
>
> Richard
>
> [1]
> https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/gcs/INSTALL.md#configure-hadoop
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/config.html#hdfs
>


Re: Setting environment variables of the taskmanagers (yarn)

2019-09-24 Thread bupt_ljy
Hi Richard,
You can use dynamic properties to add your environmental variables.


Set jobmanager env:
e.g. -Dcontainerized.master.env.GOOGLE_APPLICATION_CREDENTIALS=xyz


Set taskmanager env:
e.g. -Dcontainerized.taskmanager.env.GOOGLE_APPLICATION_CREDENTIALS=xyz


Best Regards,
Jiayi Liao


 Original Message 
Sender: Richard Deurwaarder
Recipient: user
Date: Tuesday, Sep 24, 2019 23:01
Subject: Setting environment variables of the taskmanagers (yarn)


Hello,


We have our flink job (1.8.0) running on our hadoop 2.7 cluster with yarn. We 
would like to add the GCS connector to use GCS rather than HDFS. 
Following the documentation of the GCS connector[1] we have to specify which 
credentials we want to use and there are two ways of doing this:
  * Edit core-site.xml
  * Set an environment variable: GOOGLE_APPLICATION_CREDENTIALS


Because we're on a company shared hadoop cluster we do not want to change the 
cluster wide core-site.xml. 


This leaves me with two options:


1. Create a custom core-site.xml and use --yarnship to send it to all the
taskmanager containers. If I do this, to what value should I set
fs.hdfs.hadoopconf[2] in flink-conf ?
2. The second option would be to set an environment variable, however because 
the taskmanagers are started via yarn I'm having trouble figuring out how to 
make sure this environment variable is set for each yarn container / 
taskmanager.


I would appreciate any help you can provide.


Thank you,


Richard


[1] 
https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/gcs/INSTALL.md#configure-hadoop
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/config.html#hdfs

Re: NoClassDefFoundError in failing-restarting job that uses url classloader

2019-09-24 Thread Zhu Zhu
Hi Subramanyam,

I checked the commits.
There are 2 fixes in FLINK-10455, only release 1.8.1 and release 1.9.0
contain both of them.

Thanks,
Zhu Zhu

Subramanyam Ramanathan 
于2019年9月24日周二 下午11:02写道:

> Hi Zhu,
>
>
>
> We also use FlinkKafkaProducer(011), hence I felt this fix would also be
> needed for us.
>
>
>
> I agree that the fix for the issue I had originally mentioned would not be
> fixed by this, but I felt that I should be consuming this fix also.
>
>
>
> Thanks,
>
> Subbu
>
>
>
> *From:* Zhu Zhu [mailto:reed...@gmail.com]
> *Sent:* Tuesday, September 24, 2019 6:13 PM
> *To:* Subramanyam Ramanathan 
> *Cc:* Dian Fu ; user@flink.apache.org
> *Subject:* Re: NoClassDefFoundError in failing-restarting job that uses
> url classloader
>
>
>
> Hi Subramanyam,
>
>
>
> I think you do not need the fix in FLINK-10455 which is for Kafka only.
> It's just a similar issue as you met.
>
> As you said, we need to make sure that the operator/UDF spawned threads
> are stopped in the close() method. In this way, we can avoid the thread to
> throw NoClassDefFoundError due to the class loader gets closed.
>
>
>
> Thanks,
>
> Zhu Zhu
>
>
>
>
>
> Subramanyam Ramanathan  于2019年9月24
> 日周二 下午8:07写道:
>
> Hi,
>
>
>
> Thank you.
>
> I think the takeaway for us is that we need to make sure that the threads
> are stopped in the close() method.
>
>
>
> With regard to FLINK-10455, I see that the fix versions say : 1.5.6,
> 1.7.0, 1.7.3, 1.8.1, 1.9.0
>
>
>
> However, I’m unable to find 1.7.3 in the downloads page(
> https://flink.apache.org/downloads.html). Is it yet to be released, or
> perhaps I am not looking in the right place ?
>
> We’re currently using 1.7.2. Could you please let me know what is the
> minimal upgrade for me to consume the fix for FLINK-10455 ?
>
>
>
> Thanks,
>
> Subbu
>
>
>
> *From:* Dian Fu [mailto:dian0511...@gmail.com]
> *Sent:* Monday, September 23, 2019 1:54 PM
> *To:* Subramanyam Ramanathan 
> *Cc:* Zhu Zhu ; user@flink.apache.org
> *Subject:* Re: NoClassDefFoundError in failing-restarting job that uses
> url classloader
>
>
>
> Hi Subbu,
>
>
>
> The issue you encountered is very similar to the issue which has been
> fixed in FLINK-10455 [1]. Could you check if that fix could solve your
> problem? The root cause for that issue is that the method close() has not
> closed all things. After the method "close()" is called, the classloader
> (URLClassloader) will be closed. If there is thread still running after
> "close()" method is called, it may access the classes in user provided
> jars. However, as the URLClassloader has already been closed,
> NoClassDefFoundError will be thrown.
>
>
>
> Regards,
>
> Dian
>
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-10455
>
>
>
> 在 2019年9月23日,下午2:50,Subramanyam Ramanathan <
> subramanyam.ramanat...@microfocus.com> 写道:
>
>
>
> Hi,
>
>
>
> I was able to simulate the issue again and understand the cause a little
> better.
>
>
>
> The issue occurs when :
>
> -One of the RichMapFunction transformations uses a third party
> library in the open() method that spawns a thread.
>
> -The thread doesn’t get properly closed in the close() method.
>
> -Once the job starts failing, we start seeing a NoClassDefFound
> error from that thread.
>
>
>
> I understand that cleanup should be done in the close() method. However,
> just wanted to know, do we have some kind of a configuration setting  which
> would help us clean up such threads ?
>
> I can attach the code if required.
>
>
>
> Thanks,
>
> Subbu
>
>
>
> *From:* Zhu Zhu [mailto:reed...@gmail.com ]
> *Sent:* Friday, August 9, 2019 7:43 AM
> *To:* Subramanyam Ramanathan 
> *Cc:* user@flink.apache.org
> *Subject:* Re: NoClassDefFoundError in failing-restarting job that uses
> url classloader
>
>
>
> Hi Subramanyam,
>
>
>
> Could you share more information? including:
>
> 1. the URL pattern
>
> 2. the detailed exception and the log around it
>
> 3. the cluster the job is running on, e.g. standalone, yarn, k8s
>
> 4. it's session mode or per job mode
>
>
>
> This information would be helpful to identify the failure cause.
>
>
>
> Thanks,
>
> Zhu Zhu
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> Subramanyam Ramanathan  于2019年8月9
> 日周五 上午1:45写道:
>
>
>
> Hello,
>
>
>
> I'm currently using flink 1.7.2.
>
>
>
> I'm trying to run a job that's submitted programmatically using the
> ClusterClient API.
>
>public JobSubmissionResult run(PackagedProgram prog, int
> parallelism)
>
>
>
>
>
> The job makes use of some jars which I add to the packaged program through
> the Packaged constructor, along with the Jar file.
>
>public PackagedProgram(File jarFile, List classpaths, String...
> args)
>
> Normally, This works perfectly and the job runs fine.
>
>
>
> However, if there's an error in the job, and the job goes into failing
> state and when it's continously  trying to restart the job for an hour or
> so, I notice a NoClassDefFoundError for some classes in the jars that I
> load using the 

Re: Challenges Deploying Flink With Savepoints On Kubernetes

2019-09-24 Thread Yuval Itzchakov
Hi Hao,

I think he's exactly talking about the usecase where the JM/TM restart and
they come back up from the latest savepoint which might be stale by that
time.

On Tue, 24 Sep 2019, 19:24 Hao Sun,  wrote:

> We always make a savepoint before we shutdown the job-cluster. So the
> savepoint is always the latest. When we fix a bug or change the job graph,
> it can resume well.
> We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM,
> uncaught exception, etc.
>
> Maybe I do not understand your use case well, I do not see a need to start
> from checkpoint after a bug fix.
> From what I know, currently you can use checkpoint as a savepoint as well
>
> Hao Sun
>
>
> On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov  wrote:
>
>> AFAIK there's currently nothing implemented to solve this problem, but
>> working on a possible fix can be implemented on top of
>> https://github.com/lyft/flinkk8soperator which already has a pretty
>> fancy state machine for rolling upgrades. I'd love to be involved as this
>> is an issue I've been thinking about as well.
>>
>> Yuval
>>
>> On Tue, Sep 24, 2019 at 5:02 PM Sean Hester 
>> wrote:
>>
>>> hi all--we've run into a gap (knowledge? design? tbd?) for our use cases
>>> when deploying Flink jobs to start from savepoints using the job-cluster
>>> mode in Kubernetes.
>>>
>>> we're running a ~15 different jobs, all in job-cluster mode, using a mix
>>> of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these are
>>> all long-running streaming jobs, all essentially acting as microservices.
>>> we're using Helm charts to configure all of our deployments.
>>>
>>> we have a number of use cases where we want to restart jobs from a
>>> savepoint to replay recent events, i.e. when we've enhanced the job logic
>>> or fixed a bug. but after the deployment we want to have the job resume
>>> it's "long-running" behavior, where any unplanned restarts resume from the
>>> latest checkpoint.
>>>
>>> the issue we run into is that any obvious/standard/idiomatic Kubernetes
>>> deployment includes the savepoint argument in the configuration. if the Job
>>> Manager container(s) have an unplanned restart, when they come back up they
>>> will start from the savepoint instead of resuming from the latest
>>> checkpoint. everything is working as configured, but that's not exactly
>>> what we want. we want the savepoint argument to be transient somehow (only
>>> used during the initial deployment), but Kubernetes doesn't really support
>>> the concept of transient configuration.
>>>
>>> i can see a couple of potential solutions that either involve custom
>>> code in the jobs or custom logic in the container (i.e. a custom entrypoint
>>> script that records that the configured savepoint has already been used in
>>> a file on a persistent volume or GCS, and potentially when/why/by which
>>> deployment). but these seem like unexpected and hacky solutions. before we
>>> head down that road i wanted to ask:
>>>
>>>- is this is already a solved problem that i've missed?
>>>- is this issue already on the community's radar?
>>>
>>> thanks in advance!
>>>
>>> --
>>> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
>>> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
>>>  
>>> *Altitude 2019 in San Francisco | Sept. 23 - 25*
>>> It’s not just an IT conference, it’s “a complete learning and networking
>>> experience”
>>> 
>>>
>>>
>>
>> --
>> Best Regards,
>> Yuval Itzchakov.
>>
>


Re: [SURVEY] How many people are using customized RestartStrategy(s)

2019-09-24 Thread Steven Wu
Zhu Zhu,

Sorry, I was using different terminology. Yes, the Flink meter is what I was
talking about regarding "fullRestarts" for threshold-based alerting.

On Mon, Sep 23, 2019 at 7:46 PM Zhu Zhu  wrote:

> Steven,
>
> In my mind, Flink counter only stores its accumulated count and reports
> that value. Are you using an external counter directly?
> Maybe Flink Meter/MeterView is what you need? It stores the count and
> calculates the rate. And it will report its "count" as well as "rate" to
> external metric services.
>
> The counter "task_failures" only works if the individual failover strategy
> is enabled. However, it is not a public interface and is not suggested to
> use, as the fine grained recovery (region failover) now supersedes it.
> I've opened a ticket[1] to add a metric to show failovers that respects
> fine grained recovery.
>
> [1] https://issues.apache.org/jira/browse/FLINK-14164
>
> Thanks,
> Zhu Zhu
>
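
For reference, a hedged sketch of the Meter/MeterView usage mentioned above;
the operator, metric names and the 60-second window are made up for
illustration:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;

public class MeteredMapper extends RichMapFunction<String, String> {

    private transient Counter events;
    private transient Meter eventsPerSecond;

    @Override
    public void open(Configuration parameters) {
        events = getRuntimeContext().getMetricGroup().counter("events");
        // MeterView wraps the counter and reports both its count and its
        // per-second rate over the last 60 seconds
        eventsPerSecond = getRuntimeContext()
            .getMetricGroup()
            .meter("eventsPerSecond", new MeterView(events, 60));
    }

    @Override
    public String map(String value) {
        events.inc();   // the meter picks this up and computes the rate
        return value;
    }
}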
> Steven Wu  于2019年9月24日周二 上午6:41写道:
>
>>
>> When we set up an alert like "fullRestarts > 1" for some rolling window, we
>> want to use a counter. If it is a Gauge, "fullRestarts" will never go below 1
>> after the first full restart, so the alert condition will always be true after
>> the first job restart. If we can apply a derivative to the Gauge value, I guess
>> the alert can probably work. I can explore whether that is an option or not.
>>
>> Yeah. Understood that "fullRestart" won't increment when fine grained
>> recovery happened. I think "task_failures" counter already exists in Flink.
>>
>>
>>
>> On Sun, Sep 22, 2019 at 7:59 PM Zhu Zhu  wrote:
>>
>>> Steven,
>>>
>>> Thanks for the information. If we can determine this a common issue, we
>>> can solve it in Flink core.
>>> To get to that state, I have two questions which need your help:
>>> 1. Why is gauge not good for alerting? The metric "fullRestart" is a
>>> Gauge. Does the metric reporter you use report Counter and
>>> Gauge to external services in different ways? Or anything else can be
>>> different due to the metric type?
>>> 2. Is the "number of restarts" what you actually need, rather than
>>> the "fullRestart" count? If so, I believe we will have such a counter
>>> metric in 1.10, since the previous "fullRestart" metric value is not the
>>> number of restarts when fine-grained recovery (a feature added in 1.9.0) is
>>> enabled. "fullRestart" reveals how many times the entire job graph has been
>>> restarted. If fine-grained recovery is enabled, the graph would not be
>>> restarted when task failures happen and the "fullRestart" value will not
>>> increment in such cases.
>>>
>>> I'd appreciate if you can help with these questions and we can make
>>> better decisions for Flink.
>>>
>>> Thanks,
>>> Zhu Zhu
>>>
>>> Steven Wu  于2019年9月22日周日 上午3:31写道:
>>>
 Zhu Zhu,

 Flink fullRestart metric is a Gauge, which is not good for alerting on.
 We publish an equivalent Counter metric for alerting purpose.

 Thanks,
 Steven

 On Thu, Sep 19, 2019 at 7:45 PM Zhu Zhu  wrote:

> Thanks Steven for the feedback!
> Could you share more information about the metrics you add in you
> customized restart strategy?
>
> Thanks,
> Zhu Zhu
>
> Steven Wu  于2019年9月20日周五 上午7:11写道:
>
>> We do use config like "restart-strategy:
>> org.foobar.MyRestartStrategyFactoryFactory". Mainly to add additional
>> metrics than the Flink provided ones.
>>
>> On Thu, Sep 19, 2019 at 4:50 AM Zhu Zhu  wrote:
>>
>>> Thanks everyone for the input.
>>>
>>> The RestartStrategy customization is not recognized as a public
>>> interface as it is not explicitly documented.
>>> As it is not used from the feedbacks of this survey, I'll conclude
>>> that we do not need to support customized RestartStrategy for the new
>>> scheduler in Flink 1.10
>>>
>>> Other usages are still supported, including all the strategies and
>>> configuring ways described in
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/task_failure_recovery.html#restart-strategies
>>> .
>>>
>>> Feel free to share in this thread if you has any concern for it.
>>>
>>> Thanks,
>>> Zhu Zhu
>>>
>>> Zhu Zhu  于2019年9月12日周四 下午10:33写道:
>>>
 Thanks Oytun for the reply!

 Sorry for not have stated it clearly. When saying "customized
 RestartStrategy", we mean that users implement an
 *org.apache.flink.runtime.executiongraph.restart.RestartStrategy*
 by themselves and use it by configuring like "restart-strategy:
 org.foobar.MyRestartStrategyFactoryFactory".

 The usage of restart strategies you mentioned will keep working
 with the new scheduler.

 Thanks,
 Zhu Zhu

 Oytun Tez  于2019年9月12日周四 下午10:05写道:

> Hi Zhu,
>
> We are using custom restart strategy like this:
>
> 

Flink Temporal Tables Usage

2019-09-24 Thread Nishant Gupta
Hi Team,

I have a slight confusion about the usage of temporal tables.

Documentation [1] mentions that we need to use lookup tables like
HBaseTableSource, while documentation [2], which covers the SQL Client,
doesn't mention anything about them.

Do we need to define the same kind of lookup tables in the environment file
when using the SQL Client? Could you please share a working example or any
pointers?

Thanks
Nishant

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/temporal_tables.html#defining-temporal-table
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sqlClient.html#temporal-tables
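
Not an authoritative answer, but for what it is worth: the SQL Client does not
need an HBase-style lookup table for this; [2] describes declaring a temporal
table over a history table directly in the environment file. A rough, hedged
sketch with made-up table and field names (please check [2] for the exact
keys):

tables:
  - name: RatesHistory            # an ordinary append-only source table
    type: source-table
    # connector / format / schema definition omitted here
  - name: Rates                   # temporal table view over the history table
    type: temporal-table
    history-table: RatesHistory
    primary-key: r_currency
    time-attribute: r_proctime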


Re: Challenges Deploying Flink With Savepoints On Kubernetes

2019-09-24 Thread Hao Sun
I think I overlooked it. Good point. I am using Redis to save the path to
my savepoint, I might be able to set a TTL to avoid such issue.

Hao Sun


On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov  wrote:

> Hi Hao,
>
> I think he's exactly talking about the usecase where the JM/TM restart and
> they come back up from the latest savepoint which might be stale by that
> time.
>
> On Tue, 24 Sep 2019, 19:24 Hao Sun,  wrote:
>
>> We always make a savepoint before we shutdown the job-cluster. So the
>> savepoint is always the latest. When we fix a bug or change the job graph,
>> it can resume well.
>> We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM,
>> uncaught exception, etc.
>>
>> Maybe I do not understand your use case well, I do not see a need to
>> start from checkpoint after a bug fix.
>> From what I know, currently you can use checkpoint as a savepoint as well
>>
>> Hao Sun
>>
>>
>> On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov 
>> wrote:
>>
>>> AFAIK there's currently nothing implemented to solve this problem, but
>>> working on a possible fix can be implemented on top of
>>> https://github.com/lyft/flinkk8soperator
>>>  which
>>> already has a pretty fancy state machine for rolling upgrades. I'd love to
>>> be involved as this is an issue I've been thinking about as well.
>>>
>>> Yuval
>>>
>>> On Tue, Sep 24, 2019 at 5:02 PM Sean Hester 
>>> wrote:
>>>
 hi all--we've run into a gap (knowledge? design? tbd?) for our use
 cases when deploying Flink jobs to start from savepoints using the
 job-cluster mode in Kubernetes.

 we're running a ~15 different jobs, all in job-cluster mode, using a
 mix of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these
 are all long-running streaming jobs, all essentially acting as
 microservices. we're using Helm charts to configure all of our deployments.

 we have a number of use cases where we want to restart jobs from a
 savepoint to replay recent events, i.e. when we've enhanced the job logic
 or fixed a bug. but after the deployment we want to have the job resume
 it's "long-running" behavior, where any unplanned restarts resume from the
 latest checkpoint.

 the issue we run into is that any obvious/standard/idiomatic Kubernetes
 deployment includes the savepoint argument in the configuration. if the Job
 Manager container(s) have an unplanned restart, when they come back up they
 will start from the savepoint instead of resuming from the latest
 checkpoint. everything is working as configured, but that's not exactly
 what we want. we want the savepoint argument to be transient somehow (only
 used during the initial deployment), but Kubernetes doesn't really support
 the concept of transient configuration.

 i can see a couple of potential solutions that either involve custom
 code in the jobs or custom logic in the container (i.e. a custom entrypoint
 script that records that the configured savepoint has already been used in
 a file on a persistent volume or GCS, and potentially when/why/by which
 deployment). but these seem like unexpected and hacky solutions. before we
 head down that road i wanted to ask:

- is this is already a solved problem that i've missed?
- is this issue already on the community's radar?

 thanks in advance!

 --
 *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
 
 
 *Altitude 2019 in San Francisco | Sept. 23 - 25*
 It’s not just an IT conference, it’s “a complete learning and
 networking experience”
 


>>>
>>> --
>>> Best Regards,
>>> Yuval Itzchakov.
>>>
>>


Re: [SURVEY] How many people are using customized RestartStrategy(s)

2019-09-24 Thread Zhu Zhu
Hi Steven,

As a conclusion, since we will have a meter metric[1] for restarts,
customized restart strategy is not needed in your case.
Is that right?

[1] https://issues.apache.org/jira/browse/FLINK-14164

Thanks,
Zhu Zhu

Steven Wu  于2019年9月25日周三 上午2:30写道:

> Zhu Zhu,
>
> Sorry, I was using different terminology. yes, Flink meter is what I was
> talking about regarding "fullRestarts" for threshold based alerting.
>
> On Mon, Sep 23, 2019 at 7:46 PM Zhu Zhu  wrote:
>
>> Steven,
>>
>> In my mind, Flink counter only stores its accumulated count and reports
>> that value. Are you using an external counter directly?
>> Maybe Flink Meter/MeterView is what you need? It stores the count and
>> calculates the rate. And it will report its "count" as well as "rate" to
>> external metric services.
>>
>> The counter "task_failures" only works if the individual failover
>> strategy is enabled. However, it is not a public interface and is not
>> suggested to use, as the fine grained recovery (region failover) now
>> supersedes it.
>> I've opened a ticket[1] to add a metric to show failovers that respects
>> fine grained recovery.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-14164
>>
>> Thanks,
>> Zhu Zhu
>>
>> Steven Wu  于2019年9月24日周二 上午6:41写道:
>>
>>>
>>> When we setup alert like "fullRestarts > 1" for some rolling window, we
>>> want to use counter. if it is a Gauge, "fullRestarts" will never go below 1
>>> after a first full restart. So alert condition will always be true after
>>> first job restart. If we can apply a derivative to the Gauge value, I guess
>>> alert can probably work. I can explore if that is an option or not.
>>>
>>> Yeah. Understood that "fullRestart" won't increment when fine grained
>>> recovery happened. I think "task_failures" counter already exists in Flink.
>>>
>>>
>>>
>>> On Sun, Sep 22, 2019 at 7:59 PM Zhu Zhu  wrote:
>>>
 Steven,

 Thanks for the information. If we can determine this a common issue, we
 can solve it in Flink core.
 To get to that state, I have two questions which need your help:
 1. Why is gauge not good for alerting? The metric "fullRestart" is a
 Gauge. Does the metric reporter you use report Counter and
 Gauge to external services in different ways? Or anything else can be
 different due to the metric type?
 2. Is the "number of restarts" what you actually need, rather than
 the "fullRestart" count? If so, I believe we will have such a counter
 metric in 1.10, since the previous "fullRestart" metric value is not the
 number of restarts when grained recovery (feature added 1.9.0) is enabled.
 "fullRestart" reveals how many times entire job graph has been
 restarted. If grained recovery (feature added 1.9.0) is enabled, the graph
 would not be restarted when task failures happen and the "fullRestart"
 value will not increment in such cases.

 I'd appreciate if you can help with these questions and we can make
 better decisions for Flink.

 Thanks,
 Zhu Zhu

 Steven Wu  于2019年9月22日周日 上午3:31写道:

> Zhu Zhu,
>
> Flink fullRestart metric is a Gauge, which is not good for alerting
> on. We publish an equivalent Counter metric for alerting purpose.
>
> Thanks,
> Steven
>
> On Thu, Sep 19, 2019 at 7:45 PM Zhu Zhu  wrote:
>
>> Thanks Steven for the feedback!
>> Could you share more information about the metrics you add in you
>> customized restart strategy?
>>
>> Thanks,
>> Zhu Zhu
>>
>> Steven Wu  于2019年9月20日周五 上午7:11写道:
>>
>>> We do use config like "restart-strategy:
>>> org.foobar.MyRestartStrategyFactoryFactory". Mainly to add additional
>>> metrics than the Flink provided ones.
>>>
>>> On Thu, Sep 19, 2019 at 4:50 AM Zhu Zhu  wrote:
>>>
 Thanks everyone for the input.

 The RestartStrategy customization is not recognized as a public
 interface as it is not explicitly documented.
 As it is not used from the feedbacks of this survey, I'll conclude
 that we do not need to support customized RestartStrategy for the new
 scheduler in Flink 1.10

 Other usages are still supported, including all the strategies and
 configuring ways described in
 https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/task_failure_recovery.html#restart-strategies
 .

 Feel free to share in this thread if you has any concern for it.

 Thanks,
 Zhu Zhu

 Zhu Zhu  于2019年9月12日周四 下午10:33写道:

> Thanks Oytun for the reply!
>
> Sorry for not have stated it clearly. When saying "customized
> RestartStrategy", we mean that users implement an
> *org.apache.flink.runtime.executiongraph.restart.RestartStrategy*
> by themselves and use it by configuring like "restart-strategy:
> 

Re: NoClassDefFoundError in failing-restarting job that uses url classloader

2019-09-24 Thread Dian Fu
Hi Subramanyam,

1.7.3 is not released yet. You need to cherry-pick these fixes if you really
need them.

Regards,
Dian

> 在 2019年9月25日,上午12:08,Zhu Zhu  写道:
> 
> Hi Subramanyam, 
> 
> I checked the commits.
> There are 2 fixes in FLINK-10455, only release 1.8.1 and release 1.9.0 
> contain both of them.
> 
> Thanks,
> Zhu Zhu
> 
> Subramanyam Ramanathan  > 于2019年9月24日周二 下午11:02写道:
> Hi Zhu,
> 
>  
> 
> We also use FlinkKafkaProducer(011), hence I felt this fix would also be 
> needed for us.
> 
>  
> 
> I agree that the fix for the issue I had originally mentioned would not be 
> fixed by this, but I felt that I should be consuming this fix also.
> 
>  
> 
> Thanks,
> 
> Subbu
> 
>  
> 
> From: Zhu Zhu [mailto:reed...@gmail.com ] 
> Sent: Tuesday, September 24, 2019 6:13 PM
> To: Subramanyam Ramanathan  >
> Cc: Dian Fu mailto:dian0511...@gmail.com>>; 
> user@flink.apache.org 
> Subject: Re: NoClassDefFoundError in failing-restarting job that uses url 
> classloader
> 
>  
> 
> Hi Subramanyam,
> 
>  
> 
> I think you do not need the fix in FLINK-10455 which is for Kafka only. It's 
> just a similar issue as you met.
> 
> As you said, we need to make sure that the operator/UDF spawned threads are 
> stopped in the close() method. In this way, we can avoid the thread to throw 
> NoClassDefFoundError due to the class loader gets closed.
> 
>  
> 
> Thanks,
> 
> Zhu Zhu
> 
>  
> 
>  
> 
> Subramanyam Ramanathan  > 于2019年9月24日周二 下午8:07写道:
> 
> Hi,
> 
>  
> 
> Thank you.
> 
> I think the takeaway for us is that we need to make sure that the threads are 
> stopped in the close() method.
> 
>  
> 
> With regard to FLINK-10455, I see that the fix versions say : 1.5.6, 1.7.0, 
> 1.7.3, 1.8.1, 1.9.0
> 
>  
> 
> However, I’m unable to find 1.7.3 in the downloads 
> page(https://flink.apache.org/downloads.html 
> ). Is it yet to be released, or 
> perhaps I am not looking in the right place ?
> 
> We’re currently using 1.7.2. Could you please let me know what is the minimal 
> upgrade for me to consume the fix for FLINK-10455 ?
> 
>  
> 
> Thanks,
> 
> Subbu
> 
>  
> 
> From: Dian Fu [mailto:dian0511...@gmail.com ] 
> Sent: Monday, September 23, 2019 1:54 PM
> To: Subramanyam Ramanathan  >
> Cc: Zhu Zhu mailto:reed...@gmail.com>>; 
> user@flink.apache.org 
> Subject: Re: NoClassDefFoundError in failing-restarting job that uses url 
> classloader
> 
>  
> 
> Hi Subbu,
> 
>  
> 
> The issue you encountered is very similar to the issue which has been fixed 
> in FLINK-10455 [1]. Could you check if that fix could solve your problem? The 
> root cause for that issue is that the method close() has not closed all 
> things. After the method "close()" is called, the classloader 
> (URLClassloader) will be closed. If there is thread still running after 
> "close()" method is called, it may access the classes in user provided jars. 
> However, as the URLClassloader has already been closed, NoClassDefFoundError 
> will be thrown.
> 
>  
> 
> Regards,
> 
> Dian
> 
>  
> 
> [1] https://issues.apache.org/jira/browse/FLINK-10455 
> 
>  
> 
> 在 2019年9月23日,下午2:50,Subramanyam Ramanathan 
>  > 写道:
> 
>  
> 
> Hi,
> 
>  
> 
> I was able to simulate the issue again and understand the cause a little 
> better.
> 
>  
> 
> The issue occurs when :
> 
> -One of the RichMapFunction transformations uses a third party 
> library in the open() method that spawns a thread.
> 
> -The thread doesn’t get properly closed in the close() method.
> 
> -Once the job starts failing, we start seeing a NoClassDefFound error 
> from that thread.
> 
>  
> 
> I understand that cleanup should be done in the close() method. However, just 
> wanted to know, do we have some kind of a configuration setting  which would 
> help us clean up such threads ? 
> 
> I can attach the code if required.
> 
>  
> 
> Thanks,
> 
> Subbu
> 
>  
> 
> From: Zhu Zhu [mailto:reed...@gmail.com ] 
> Sent: Friday, August 9, 2019 7:43 AM
> To: Subramanyam Ramanathan  >
> Cc: user@flink.apache.org 
> Subject: Re: NoClassDefFoundError in failing-restarting job that uses url 
> classloader
> 
>  
> 
> Hi Subramanyam,
> 
>  
> 
> Could you share more information? including:
> 
> 1. the URL pattern
> 
> 2. the detailed exception and the log around it
> 
> 3. the cluster the job is running on, e.g. standalone, yarn, k8s
> 
> 4. it's session mode or per job mode
> 
>  
> 
> This information would be helpful to identify the failure cause.
> 
>  

Re: Question about reading ORC file in Flink

2019-09-24 Thread 163
Hi Fabian,

After debugging in local mode, I found that the Flink ORC connector itself has
no problem; some fields in our schema are in upper case, so those fields were
not matched.
The program that reads the ORC file directly uses the includeColumns method,
which matches columns with equalsIgnoreCase, so it can read those fields.

Thanks for your Help!

Qi Shu


> 在 2019年9月24日,下午4:36,Fabian Hueske  写道:
> 
> Hi QiShu,
> 
> It might be that Flink's OrcInputFormat has a bug.
> Can you open a Jira issue to report the problem? 
> In order to be able to fix this, we need as much information as possible.
> It would be great if you could create a minimal example of an ORC file and a 
> program that reproduces the issue.
> If that's not possible, we need the schema of an Orc file that cannot be 
> correctly read.
> 
> Thanks,
> Fabian
> 
> Am Mo., 23. Sept. 2019 um 11:40 Uhr schrieb ShuQi  >:
> Hi Guys,
> 
> The Flink version is 1.9.0. I use OrcTableSource to read an ORC file in HDFS
> and the job executes successfully, without any exception or error. But some
> fields (such as tagIndustry) are always null, although these fields actually
> have values; I can read them by reading the file directly. Below is my code:
> 
> //main
>  final ParameterTool params = ParameterTool.fromArgs(args);
> 
> final ExecutionEnvironment env = 
> ExecutionEnvironment.getExecutionEnvironment();
> 
> env.getConfig().setGlobalJobParameters(params);
> 
> Configuration config = new Configuration();
> 
> 
> OrcTableSource orcTableSource = OrcTableSource
> .builder()
> .path(params.get("input"))
> .forOrcSchema(TypeDescription.fromString(TYPE_INFO))
> .withConfiguration(config)
> .build();
> 
> DataSet<Row> dataSet = orcTableSource.getDataSet(env);
> 
> DataSet<Tuple2<String, Integer>> counts = dataSet.flatMap(new 
> Tokenizer()).groupBy(0).sum(1);
> 
> //read field
> public void flatMap(Row row, Collector<Tuple2<String, Integer>> out) {
> 
> String content = ((String) row.getField(6));
> String tagIndustry = ((String) row.getField(35));
> 
> LOGGER.info("arity: " + row.getArity());
> LOGGER.info("content: " + content);
> LOGGER.info("tagIndustry: " + tagIndustry);
> LOGGER.info("===");
> 
> if (Strings.isNullOrEmpty(content) || 
> Strings.isNullOrEmpty(tagIndustry) || !tagIndustry.contains("FD001")) {
> return;
> }
> // normalize and split the line
> String[] tokens = content.toLowerCase().split("\\W+");
> 
> // emit the pairs
> for (String token : tokens) {
> if (token.length() > 0) {
> out.collect(new Tuple2<>(token, 1));
> }
> }
> }
> 
> Thanks for your help!
> 
> QiShu
> 
> 
> 
>  



Re: Question about initializing a system cache

2019-09-24 Thread 高博
Hi,

Let me offer a few ideas. Our company works on connected vehicles, and the
programs we run in production all have to handle the scenarios you describe.

1. Is there a way to guarantee that, by the time data processing starts, the
base data is already cached and available in the stream processing (similar to
ValueState)? The operator's open() function does not seem to be able to write
into ValueState; it can only initialize state.
 The external cached data we need includes data fetched through API calls and
base data read from a database.
 For the API calls, we use Guava's asynchronous cache-refresh strategy.
 For the base data in the database, we do something like lazy loading: when
the first record arrives, we lock the stream and wait until all of the base
data has been read before the whole stream continues.


2. When the base data changes in the database, how can the processing stream
be notified in real time to update the cache? (A broadcast stream?)
 Refreshing the base data can be done in two phases.
 Phase one: start a scheduled refresh thread that periodically reads the
incremental changes from the database to refresh the cached data.

 Phase two: build it on canal + Kafka + broadcast. In short, use canal to
monitor database changes and push them to Kafka; your job consumes that Kafka
topic and, once it receives the base data, distributes it to the operators as
a broadcast stream.

Also, some people in the community do this by joining the stream table with a
dimension table, where the dimension table is the base data; loading and
refreshing the dimension table are implemented with an async operator. You can
search for the details.
This feels more in line with Flink's design. The approaches I mentioned above
are, in fact, how we handle this both in plain Java programs and in Flink.


haoxin...@163.com  wrote on Tue, Sep 24, 2019 at 10:45 AM:

>
> Hi everyone, I am new to Flink (version 1.8.1) and would like to ask a
> design question. In an IoT system we process device status data in real
> time, but the device metadata / base data is stored in a database such as
> MySQL, and this base data is needed for computation when the real-time
> stream arrives. For performance reasons we cannot query the database in
> real time, so we need to cache the data when the system starts, and only
> then start receiving or processing data. The data source is Kafka.
> Two questions:
>
> 1. Is there a way to guarantee that, by the time data processing starts,
> the base data is already cached and available in the stream processing
> (similar to ValueState)? The operator's open() function does not seem to be
> able to write into ValueState; it can only initialize state.
> 2. When the base data changes in the database, how can the processing
> stream be notified in real time to update the cache? (A broadcast stream?)
>
> Thanks.
>
>
>
> haoxin...@163.com
>
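
A hedged Java sketch of the Guava asynchronous cache-refresh idea mentioned
above; DeviceMeta, the refresh interval and the loader are made-up
placeholders (in a Flink job the cache would typically be created in a
RichFunction's open() method):

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class DeviceMetaCache {

    // hypothetical value type holding the device base data
    public static class DeviceMeta {
        public String model;
    }

    private final LoadingCache<String, DeviceMeta> cache =
        CacheBuilder.newBuilder()
            .maximumSize(100_000)
            // entries are reloaded in the background 10 minutes after a write
            .refreshAfterWrite(10, TimeUnit.MINUTES)
            .build(CacheLoader.asyncReloading(
                new CacheLoader<String, DeviceMeta>() {
                    @Override
                    public DeviceMeta load(String deviceId) {
                        return loadFromDatabase(deviceId);
                    }
                },
                Executors.newFixedThreadPool(2)));

    public DeviceMeta get(String deviceId) throws Exception {
        return cache.get(deviceId);
    }

    private DeviceMeta loadFromDatabase(String deviceId) {
        // placeholder for the actual MySQL lookup
        return new DeviceMeta();
    }
}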


Re: Re: Question about initializing a system cache

2019-09-24 Thread haoxin...@163.com
Many thanks, we are in the same industry.
We are indeed doing it in ways similar to what you described, but we have
always felt there should be a more Flink-native, elegant way to do it, such as
the dimension-table join. I had not looked into it closely before, thanks for
the reminder.



haoxin...@163.com
 
From: 高博
Sent: 2019-09-24 16:03
To: user-zh
Subject: Re: Question about initializing a system cache
Hi,

Let me offer a few ideas. Our company works on connected vehicles, and the
programs we run in production all have to handle the scenarios you describe.

1. Is there a way to guarantee that, by the time data processing starts, the
base data is already cached and available in the stream processing (similar to
ValueState)? The operator's open() function does not seem to be able to write
into ValueState; it can only initialize state.
 The external cached data we need includes data fetched through API calls and
base data read from a database.
 For the API calls, we use Guava's asynchronous cache-refresh strategy.
 For the base data in the database, we do something like lazy loading: when
the first record arrives, we lock the stream and wait until all of the base
data has been read before the whole stream continues.

2. When the base data changes in the database, how can the processing stream
be notified in real time to update the cache? (A broadcast stream?)
 Refreshing the base data can be done in two phases.
 Phase one: start a scheduled refresh thread that periodically reads the
incremental changes from the database to refresh the cached data.

 Phase two: build it on canal + Kafka + broadcast. In short, use canal to
monitor database changes and push them to Kafka; your job consumes that Kafka
topic and, once it receives the base data, distributes it to the operators as
a broadcast stream.

Also, some people in the community do this by joining the stream table with a
dimension table, where the dimension table is the base data; loading and
refreshing the dimension table are implemented with an async operator. You can
search for the details.
This feels more in line with Flink's design. The approaches I mentioned above
are, in fact, how we handle this both in plain Java programs and in Flink.


haoxin...@163.com  wrote on Tue, Sep 24, 2019 at 10:45 AM:

>
> Hi everyone, I am new to Flink (version 1.8.1) and would like to ask a
> design question. In an IoT system we process device status data in real
> time, but the device metadata / base data is stored in a database such as
> MySQL, and this base data is needed for computation when the real-time
> stream arrives. For performance reasons we cannot query the database in
> real time, so we need to cache the data when the system starts, and only
> then start receiving or processing data. The data source is Kafka.
> Two questions:
>
> 1. Is there a way to guarantee that, by the time data processing starts,
> the base data is already cached and available in the stream processing
> (similar to ValueState)? The operator's open() function does not seem to be
> able to write into ValueState; it can only initialize state.
> 2. When the base data changes in the database, how can the processing
> stream be notified in real time to update the cache? (A broadcast stream?)
>
> Thanks.
>
>
>
> haoxin...@163.com
>


Re: Flink ORC reading issue

2019-09-24 Thread 163
Terry Wang:

After debugging locally, I found that the Flink ORC connector itself is fine;
some fields in our own schema use mixed case, so they were not matched during
column matching. Thanks!

Qi Shu



> On Sep 24, 2019, at 11:01 AM, Terry Wang wrote:
> 
> Could you start a local program, set breakpoints, and check whether the
> logic that reads the data has a problem?
> Best,
> Terry Wang
> 
> 
> 
>> On Sep 23, 2019, at 5:11 PM, ShuQi wrote:
>> 
>> The Flink version is 1.9.0. We read ORC files with OrcTableSource and hit
>> a problem: the program finishes without any exception, but some fields
>> always come back as null even though they actually have values; reading
>> the file directly returns all fields.
>> 
>> 
>> Does anyone have any good suggestions? Thanks!
>> 
>