Re: Fire and Purge with Idle State

2018-10-12 Thread Hequn Cheng
Hi shkob1,

Currently, the idle state retention time is only used by unbounded
operators in SQL and the Table API. These include non-windowed GROUP BY,
non-windowed joins, unbounded OVER aggregations, etc. The retention time
affects neither SQL/Table API window operators nor DataStream operators.
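
To illustrate what idle state retention means for a non-windowed aggregation,
here is a minimal plain-Java model. It is not Flink code: the class
KeyedCounts and its single retentionMillis setting are made up for
illustration. Per-key state is dropped once the key has not been updated for
longer than the retention time, so a later record for that key starts from
scratch.

```java
import java.util.HashMap;
import java.util.Map;

class IdleStateSketch {
    /** Hypothetical per-key counter whose state expires after a period of inactivity. */
    static final class KeyedCounts {
        private final long retentionMillis;
        private final Map<String, Long> counts = new HashMap<>();
        private final Map<String, Long> lastUpdate = new HashMap<>();

        KeyedCounts(long retentionMillis) {
            this.retentionMillis = retentionMillis;
        }

        /** Adds one record for the key at the given time and returns the running count. */
        long add(String key, long nowMillis) {
            expire(nowMillis);
            lastUpdate.put(key, nowMillis);
            return counts.merge(key, 1L, Long::sum);
        }

        /** Drops the state of every key that has been idle longer than the retention time. */
        private void expire(long nowMillis) {
            lastUpdate.entrySet().removeIf(e -> {
                boolean idle = nowMillis - e.getValue() > retentionMillis;
                if (idle) {
                    counts.remove(e.getKey());
                }
                return idle;
            });
        }
    }

    /** Two updates close together share state; a third one after a long gap does not. */
    static long[] demo() {
        KeyedCounts counts = new KeyedCounts(1000L);
        long first = counts.add("s1", 0L);
        long second = counts.add("s1", 500L);
        long afterIdle = counts.add("s1", 2000L); // 1500 ms idle, state was dropped
        return new long[] {first, second, afterIdle};
    }

    public static void main(String[] args) {
        for (long value : demo()) {
            System.out.println(value);
        }
    }
}
```

A window operator, by contrast, cleans up its own state when the window ends,
which is why the retention time plays no role there.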

Best, Hequn

On Sat, Oct 13, 2018 at 2:40 AM shkob1  wrote:

> Hey
>
> Say I'm aggregating an event stream by sessionId in SQL and emitting the
> results once the session is "over". I guess I should be using Fire and
> Purge, since I don't expect to need the session data once it is over. How
> should I treat the idle state retention time: is it needed at all if I'm
> using purge? Will it become relevant only if a session is both never-ending
> AND never has more records?
>
> Thanks!
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Custom Trigger + SQL Pattern

2018-10-12 Thread Hequn Cheng
Hi shkob1,

> while one is time(session inactivity) the other is based on a specific
> event marked as a "last" event.

How about using a session window and a UDTF [1] to solve the problem? The
session window may output multiple `last` elements; however, we can use a UDTF
to split them into single ones. Thus, we can use SQL for the whole job.

Best, Hequn.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/udfs.html#table-functions

On Sat, Oct 13, 2018 at 2:28 AM shkob1  wrote:

> Hey!
>
> I have a use case in which im grouping a stream by session id - so far
> pretty standard, note that i need to do it through SQL and not by the table
> api.
> In my use case i have 2 trigger conditions though - while one is time
> (session inactivity) the other is based on a specific event marked as a
> "last" event.
> AFAIK SQL does not support custom triggers - so what i end up doing is
> doing
> group by in the SQL - then converting the result to a stream along with a
> boolean field that marks whether at least one of the events was the end
> event - then adding my custom trigger on top of it.
> It looks something like this:
>
>  Table result = tableEnv.sqlQuery("select atLeastOneTrue(lastEvent),
> sessionId, count(*) FROM source Group By sessionId");
> tableEnv.toRetractStream(result, Row.class, streamQueryConfig)
> .filter(tuple -> tuple.f0)
> .map(...)
> .returns(...)
> .keyBy("sessionId")
> .window(EventTimeSessionWindows.withGap(Time.hours(4)))
> .trigger(new SessionEndedByTimeOrEndTrigger())
> .process(...take last element from the group by result..)
>
> This seems like a weird workaround, doesn't it? My window is basically over
> the SQL result rather than the source stream. Ideally i would keyby the
> sessionId before running the SQL but then a) would I need to register a
> table per key? b) would i be able to use the custom trigger per window?
>
> basically i want to group by session id and have a window for every session
> that supports both time and custom trigger. Assuming i need to use SQL
> (reason is the query is dynamically loaded), is there a better solution for
> it?


Re: Questions in sink exactly once implementation

2018-10-12 Thread Hequn Cheng
Hi Henry,

Yes, achieving exactly-once with atomic commits is heavy for MySQL. However, you
don't have to buffer data if you choose option 2. If the result data is
idempotent, you can simply overwrite old records with new ones, which also
achieves exactly-once.
There is a document about End-to-End Exactly-Once Processing in Apache
Flink[1], which may be helpful for you.
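
As a rough illustration of option 2 with idempotent results, here is a
plain-Java sketch. It is not a Flink sink; KeyedStore, Result, and
runWithReplay are hypothetical names. The assumption is that every result has
a deterministic key and that the store overwrites by key, so replaying the
records since the last checkpoint leaves the same final rows. With MySQL the
overwrite would typically be an INSERT ... ON DUPLICATE KEY UPDATE statement;
here a map stands in for the table.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class IdempotentUpsertSketch {
    /** In-memory stand-in for a table with a primary key. */
    static final class KeyedStore {
        private final Map<String, Long> rows = new HashMap<>();

        /** Overwrites the row for the key, so writing the same record twice is harmless. */
        void upsert(String key, long value) {
            rows.put(key, value);
        }

        Map<String, Long> snapshot() {
            return new HashMap<>(rows);
        }
    }

    /** One result record: a deterministic key plus the value computed for it. */
    static final class Result {
        final String key;
        final long value;

        Result(String key, long value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Writes a batch eagerly, then writes it again as a recovery replay would. */
    static Map<String, Long> runWithReplay() {
        KeyedStore store = new KeyedStore();
        List<Result> sinceLastCheckpoint = Arrays.asList(
            new Result("session-1", 3L),
            new Result("session-2", 7L));

        // First attempt: records are written as they arrive, then the job fails.
        for (Result r : sinceLastCheckpoint) {
            store.upsert(r.key, r.value);
        }
        // After recovery the same records are produced and written again.
        for (Result r : sinceLastCheckpoint) {
            store.upsert(r.key, r.value);
        }
        return store.snapshot();
    }

    public static void main(String[] args) {
        System.out.println(runWithReplay());
    }
}
```

Readers of the table may still see intermediate values until the replay has
caught up; the guarantee concerns the final state only.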

Best, Hequn

[1]
https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html



On Fri, Oct 12, 2018 at 5:21 PM 徐涛  wrote:

> Hi
> I am reading the book “Introduction to Apache Flink”, and in the
> book there mentions two ways to achieve sink exactly once:
> 1. The first way is to buffer all output at the sink and commit
> this atomically when the sink receives a checkpoint record.
> 2. The second way is to eagerly write data to the output, keeping
> in mind that some of this data might be “dirty” and replayed after a
> failure. If there is a failure, then we need to roll back the output, thus
> overwriting the dirty data and effectively deleting dirty data that has
> already been written to the output.
>
> I read the code of Elasticsearch sink, and find there is a
> flushOnCheckpoint option, if set to true, the change will accumulate until
> checkpoint is made. I guess it will guarantee at-least-once delivery,
> because although it use batch flush, but the flush is not a atomic action,
> so it can not guarantee exactly-once delivery.
>
> My question is :
> 1. As many sinks do not support transaction, at this case I have
> to choose 2 to achieve exactly once. At this case, I have to buffer all the
> records between checkpoints and delete them, it is a bit heavy action.
> 2. I guess mysql sink should support exactly once delivery,
> because it supports transaction, but at this case I have to execute batch
> according to the number of actions between checkpoints but not a specific
> number, 100 for example. When there are a lot of items between checkpoints,
> it is a heavy action either.
>
> Best
> Henry


Re: Taskmanager times out continuously for registration with Jobmanager

2018-10-12 Thread Abdul Qadeer
We were able to fix it by passing the IP address instead of the hostname as
the actor system listen address when starting the taskmanager:

def runTaskManager(
    taskManagerHostname: String,
    resourceID: ResourceID,
    actorSystemPort: Int,
    configuration: Configuration,
    highAvailabilityServices: HighAvailabilityServices)
  : Unit = {


The following log message at jobmanager gave some clue:

{"timeMillis":1539297842333,"thread":"jobmanager-future-thread-2","level":"DEBUG","loggerName":"org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher","message":"Could
not retrieve 
QueryServiceGateway.","thrown":{"commonElementCount":0,"localizedMessage":"akka.actor.ActorNotFound:
Actor not found for:
ActorSelection[Anchor(akka.tcp://flink@taskmgr-6b59f97748-fmgwn:8070/),
Path(/user/MetricQueryService_5261ccab66b86b53a4edd64f26c1f282)]"...

...


We figured there is some problem with hostname resolution after the
actor is quarantined. Would you know why this happens? Is it a cache
problem in Flink, or in the Akka code the JobManager is using?


On Fri, Oct 12, 2018 at 1:05 AM Till Rohrmann  wrote:

> It is hard to tell without all logs but it could easily be a K8s setup
> problem. Also problematic is that you are running a Flink version which is
> no longer actively supported. Try at least to use the latest bug fix
> release for 1.4.
>
> Cheers,
> Till
>
> On Fri, Oct 12, 2018, 09:43 Abdul Qadeer  wrote:
>
>> Hi Till,
>>
>> A few more data points:
>>
>> In a rerun of the same versions with a fresh deployment, I see
>> log.debug(s"RegisterTaskManager: $msg") in the JobManager log; however, the
>> AcknowledgeRegistration/AlreadyRegistered messages are never sent. I
>> have taken a tcpdump for the taskmanager which doesn't recover and compared
>> it with another taskmanager which recovers after restart (i.e. receives
>> the AcknowledgeRegistration message).
>>
>> Restarting the docker container of bad taskmanager doesn't work. The only
>> workaround right now is to delete the kubernetes pod holding the bad
>> taskmanager container. Does it have to do something with the akka address
>> the jobmanager stores for a taskmanager? The only variable I see between
>> restarting container vs pod is the change in the akka address.
>>
>> Also, the infinite retries for registration start after the taskmanager
>> container restarts with Jobmanager actor system quarantined:
>>
>> {"timeMillis":1539282282329,"thread":"flink-akka.actor.default-dispatcher-3","level":"ERROR","loggerName":"org.apache.flink.runtime.taskmanager.TaskManager","message":"The
>> actor system akka.tcp://flink@taskmgr-6b59f97748-fmgwn:8070 has
>> quarantined the remote actor system akka.tcp://flink@192.168.83.52:6123.
>> Shutting the actor system down to be able to reestablish a
>> connection!","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":49,"threadPriority":5}
>>
>>
>> A manual restart by docker restart or killing the JVM doesn't reproduce
>> this problem.
>>
>> On Thu, Oct 11, 2018 at 11:15 AM Abdul Qadeer 
>> wrote:
>>
>>> Hi Till,
>>>
>>> I didn't try with newer versions as it is not possible to update the
>>> Flink version atm.
>>> If you could give any pointers for debugging that would be great.
>>>
>>> On Thu, Oct 11, 2018 at 2:44 AM Till Rohrmann 
>>> wrote:
>>>
>>>> Hi Abdul,
>>>>
>>>> have you tried whether this problem also occurs with newer Flink
>>>> versions (1.5.4 or 1.6.1)?
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Thu, Oct 11, 2018 at 9:24 AM Dawid Wysakowicz <
>>>> dwysakow...@apache.org> wrote:
>>>>
>>>>> Hi Abdul,
>>>>>
>>>>> I've added Till and Gary to cc, who might be able to help you.
>>>>>
>>>>> Best,
>>>>>
>>>>> Dawid
>>>>>
>>>>> On 11/10/18 03:05, Abdul Qadeer wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> We are facing an issue in standalone HA mode in Flink 1.4.0 where
>>>>> Taskmanager restarts and is not able to register with the Jobmanager. It
>>>>> times out awaiting AcknowledgeRegistration/AlreadyRegistered
>>>>> message from Jobmanager Actor and keeps sending RegisterTaskManager
>>>>> message.
>>>>> The logs at Jobmanager don't show anything about registration
>>>>> failure/request. It doesn't print log.debug(s"RegisterTaskManager: $msg")
>>>>> (from JobManager.scala) either. The network connection
>>>>> between taskmanager and jobmanager seems fine; tcpdump shows message sent
>>>>> to jobmanager and TCP ACK received from jobmanager. Note that the
>>>>> communication is happening between docker containers.
>>>>>
>>>>> Following are the logs from Taskmanager:
>>>>>
>>>>> {"timeMillis":1539189572438,"thread":"flink-akka.actor.default-dispatcher-2","level":"INFO","loggerName":"org.apache.flink.runtime.taskmanager.TaskManager","message":"Trying
>>>>> to register at JobManager akka.tcp://
>>>>> flink@192.168.83.51:6123/user/jobmanager (attempt 1400, timeout:
>>>>> 3
>>>>> milliseconds)","endOfBatc

Re: Not all files are processed? Stream source with ContinuousFileMonitoringFunction

2018-10-12 Thread Fabian Hueske
Hi,

Which file system are you reading from? If you are reading from S3, this
might be caused by S3's eventual consistency property.
Have a look at FLINK-9940 [1] for a more detailed discussion.
There is also an open PR [2] that you could use to patch the source
operator.

Best, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-9940
[2] https://github.com/apache/flink/pull/6613

On Fri, Oct 12, 2018 at 20:41, Juan Miguel Cejuela <
jua...@tagtog.net> wrote:

> Dear flinksters,
>
>
> I'm using the class `ContinuousFileMonitoringFunction` as a source to
> monitor a folder for new incoming files. I have the problem that not all
> the files that are sent to the folder get processed / triggered by the
> function. Specific details of my workflow are that I send up to 1k files
> per minute, and that I consume the stream as an `AsyncDataStream`.
>
> I myself raised an unrelated issue with the
> `ContinuousFileMonitoringFunction` class some time ago (
> https://issues.apache.org/jira/browse/FLINK-8046): if two or more files
> shared the very same timestamp, only the first one (non-deterministically
> chosen) would be processed. However, I patched the file myself to fix that
> problem by using a LinkedHashMap to remember which files had been really
> processed before or not. My patch is working fine as far as I can tell.
>
> The problem seems to be rather that some files (when many are sent at once
> to the same folder) do not even get triggered/activated/registered by the
> class.
>
>
> Am I properly explaining my problem?
>
>
> Any hints to solve this challenge would be greatly appreciated ! ❤ THANK
> YOU
>
> --
> Juanmi, CEO and co-founder @ 🍃tagtog.net
>
> Follow tagtog updates on 🐦 Twitter: @tagtog_net
> 
>
>


RE: org.apache.flink.runtime.rpc.exceptions.FencingTokenException:

2018-10-12 Thread Samir Tusharbhai Chauhan
Hi Till,

Can you tell me when I would receive the error message below?

2018-10-13 03:02:01,337 ERROR 
org.apache.flink.runtime.rest.handler.taskmanager.TaskManagersHandler  - Could 
not retrieve the redirect address.
java.util.concurrent.CompletionException: 
org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token 
not set: Ignoring message LocalFencedMessage(8b79d4540b45b3e622748b813d3a464b, 
LocalRpcInvocation(requestRestAddress(Time))) sent to 
akka.tcp://flink@127.0.0.1:50010/user/dispatcher because the fencing token is 
null.

Warm Regards,
Samir Chauhan

From: Till Rohrmann [mailto:trohrm...@apache.org]
Sent: Sunday, October 07, 2018 1:24 AM
To: Samir Tusharbhai Chauhan 
Cc: user 
Subject: Re: org.apache.flink.runtime.rpc.exceptions.FencingTokenException:

Hi Samir,

1. In your setup (not running on top of Yarn or Mesos) you need to set the 
jobmanager.rpc.address such that the JM process knows where to bind to. The 
other components use ZooKeeper to find out the addresses. The other properties 
should not be needed.
3. You can take a look at the ZooKeeper leader latch node. Alternatively, you 
can take a look at the address to which you are redirected when accessing the 
web UI.
4. 
https://ci.apache.org/projects/flink/flink-docs-master/ops/security-ssl.html

Cheers,
Till

On Sat, Oct 6, 2018 at 5:57 PM Samir Tusharbhai Chauhan
<samir.tusharbhai.chau...@prudential.com.sg> wrote:
Hi Till,

Thanks for identifying the issue. My cluster is up and running now.

I have a few queries. Could you answer them?


  1.  Do I need to set the properties below in my cluster?

jobmanager.rpc.address
rest.address
rest.bind-address
jobmanager.web.address

  2.  Is there anything I should take care of while setting it up?
  3.  How do I know which job manager is active?
  4.  How do I secure my cluster?

Samir Chauhan

From: Till Rohrmann [mailto:trohrm...@apache.org]
Sent: Friday, October 05, 2018 11:09 PM
To: Samir Tusharbhai Chauhan <samir.tusharbhai.chau...@prudential.com.sg>
Cc: user <user@flink.apache.org>
Subject: Re: org.apache.flink.runtime.rpc.exceptions.FencingTokenException:

Hi Samir,

could you share the logs of the two JMs and the log where you saw the 
FencingTokenException with us?

It looks to me as if the TM had an outdated fencing token (an outdated leader 
session id) with which it contacted the ResourceManager. This can happen and 
the TM should try to reconnect to the RM once it learns about the new leader 
session id via ZooKeeper. You could, for example check in ZooKeeper that it 
contains the valid leader information.
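
A minimal plain-Java sketch of the fencing idea described above, under a much
simplified model. The class FencedEndpoint and its methods are hypothetical
and are not Flink's RPC classes. The receiver only accepts messages that
carry the current leader session id, so a sender with a stale id is ignored
until it picks up the new id (in a real setup, from ZooKeeper).

```java
import java.util.UUID;

class FencingTokenSketch {
    /** Hypothetical receiver that ignores messages carrying an outdated leader session id. */
    static final class FencedEndpoint {
        private UUID expectedToken;

        FencedEndpoint(UUID initialToken) {
            this.expectedToken = initialToken;
        }

        /** Called when a new leader is elected; every older token becomes invalid. */
        void grantLeadership(UUID newToken) {
            this.expectedToken = newToken;
        }

        /** Returns true if the message was accepted, false if it was ignored. */
        boolean handle(UUID senderToken, String message) {
            return expectedToken.equals(senderToken);
        }
    }

    /** A sender with a stale token is rejected, then accepted after refreshing it. */
    static boolean[] demo() {
        UUID oldLeaderId = new UUID(0L, 1L);
        UUID newLeaderId = new UUID(0L, 2L);

        FencedEndpoint resourceManager = new FencedEndpoint(oldLeaderId);
        UUID cachedBySender = oldLeaderId;

        // Leadership changes, but the sender has not noticed yet.
        resourceManager.grantLeadership(newLeaderId);
        boolean staleAttempt = resourceManager.handle(cachedBySender, "registerTaskExecutor");

        // The sender learns the new leader session id and retries.
        cachedBySender = newLeaderId;
        boolean retry = resourceManager.handle(cachedBySender, "registerTaskExecutor");

        return new boolean[] {staleAttempt, retry};
    }

    public static void main(String[] args) {
        boolean[] result = demo();
        System.out.println(result[0] + " " + result[1]);
    }
}
```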

Cheers,
Till

On Fri, Oct 5, 2018 at 9:58 AM Samir Tusharbhai Chauhan
<samir.tusharbhai.chau...@prudential.com.sg> wrote:

Hi,



I am having an issue setting up a Flink cluster. I have 2 nodes for the Job
Manager and 2 nodes for the Task Manager.



My configuration file looks like this.



jobmanager.rpc.port: 6123
jobmanager.heap.size: 2048m
taskmanager.heap.size: 2048m
taskmanager.numberOfTaskSlots: 64
parallelism.default: 1
rest.port: 8081
high-availability.jobmanager.port: 50010
high-availability: zookeeper
high-availability.storageDir: file:///sharedflink/state_dir/ha/
high-availability.zookeeper.quorum: host1:2181,host2:2181,host3:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /flick_ns

state.backend: rocksdb
state.checkpoints.dir: file:///sharedflink/state_dir/backend
state.savepoints.dir: file:///sharedflink/state_dir/savepoint
state.backend.incremental: false
state.backend.rocksdb.timer-service.factory: rocksdb
state.backend.local-recovery: false



But when I start services, I get this error message.



java.util.concurrent.CompletionException:
org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token
mismatch: Ignoring message
RemoteFencedMessage(b00185a18ea3da17ebe39ac411a84f3a,
RemoteRpcInvocation(registerTaskExecutor(String, ResourceID, int,
HardwareDescription, Time))) because the fencing token
b00185a18ea3da17ebe39ac411a84f3a did not match the expected fencing token
bce1729df0a2ab8a7ea0426ba9994482.
at
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)





But when I run JM and TM in single box, it is working fine.



Please help to resolve this issue ASAP 

Not all files are processed? Stream source with ContinuousFileMonitoringFunction

2018-10-12 Thread Juan Miguel Cejuela
Dear flinksters,


I'm using the class `ContinuousFileMonitoringFunction` as a source to
monitor a folder for new incoming files. I have the problem that not all
the files that are sent to the folder get processed / triggered by the
function. Specific details of my workflow are that I send up to 1k files
per minute, and that I consume the stream as an `AsyncDataStream`.

I myself raised an unrelated issue with the
`ContinuousFileMonitoringFunction` class some time ago (
https://issues.apache.org/jira/browse/FLINK-8046): if two or more files
shared the very same timestamp, only the first one (non-deterministically
chosen) would be processed. However, I patched the file myself to fix that
problem by using a LinkedHashMap to remember which files had been really
processed before or not. My patch is working fine as far as I can tell.

The problem seems to be rather that some files (when many are sent at once
to the same folder) do not even get triggered/activated/registered by the
class.


Am I properly explaining my problem?


Any hints to solve this challenge would be greatly appreciated ! ❤ THANK YOU

-- 
Juanmi, CEO and co-founder @ 🍃tagtog.net

Follow tagtog updates on 🐦 Twitter: @tagtog_net



Fire and Purge with Idle State

2018-10-12 Thread shkob1
Hey

Say I'm aggregating an event stream by sessionId in SQL and emitting the
results once the session is "over". I guess I should be using Fire and Purge,
since I don't expect to need the session data once it is over. How should I
treat the idle state retention time: is it needed at all if I'm using purge?
Will it become relevant only if a session is both never-ending AND never has
more records?

Thanks!





Custom Trigger + SQL Pattern

2018-10-12 Thread shkob1
Hey!

I have a use case in which im grouping a stream by session id - so far
pretty standard, note that i need to do it through SQL and not by the table
api.
In my use case i have 2 trigger conditions though - while one is time
(session inactivity) the other is based on a specific event marked as a
"last" event.
AFAIK SQL does not support custom triggers - so what i end up doing is doing
group by in the SQL - then converting the result to a stream along with a
boolean field that marks whether at least one of the events was the end
event - then adding my custom trigger on top of it. 
It looks something like this:

Table result = tableEnv.sqlQuery(
    "select atLeastOneTrue(lastEvent), sessionId, count(*) FROM source Group By sessionId");
tableEnv.toRetractStream(result, Row.class, streamQueryConfig)
    .filter(tuple -> tuple.f0)
    .map(...)
    .returns(...)
    .keyBy("sessionId")
    .window(EventTimeSessionWindows.withGap(Time.hours(4)))
    .trigger(new SessionEndedByTimeOrEndTrigger())
    .process(...take last element from the group by result..)

This seems like a weird workaround, doesn't it? My window is basically over
the SQL result rather than the source stream. Ideally i would keyby the
sessionId before running the SQL but then a) would I need to register a
table per key? b) would i be able to use the custom trigger per window?

basically i want to group by session id and have a window for every session
that supports both time and custom trigger. Assuming i need to use SQL
(reason is the query is dynamically loaded), is there a better solution for
it?


Re: Making calls to external API wit Data Streams

2018-10-12 Thread Dominik Wosiński
Hey,

It seems that you have written an async function that takes a String and
returns a String, but when you call it you expect the result to be a tuple
(String, String). That's where the mismatch occurs; the function itself is
OK :)
If you change DataStream[(String, String)] to DataStream[String], it should
work smoothly.
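
To make the type relationship concrete, here is a minimal plain-Java model.
The AsyncFn interface and the applyAsync helper are hypothetical stand-ins,
not Flink's AsyncFunction or AsyncDataStream, and fakeWeatherLookup simulates
the external call. A function declared from String to String yields Strings,
not tuples. The sketch also completes the result from a callback instead of
blocking on the response, which is usually what you want in an async
operator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

class AsyncTypeSketch {
    /** Hypothetical async function: IN goes in, a collection of OUT comes back via callback. */
    interface AsyncFn<IN, OUT> {
        void asyncInvoke(IN input, Consumer<Collection<OUT>> resultCallback);
    }

    /** Hypothetical async operator: the result element type is OUT, never (IN, OUT). */
    static <IN, OUT> List<OUT> applyAsync(List<IN> inputs, AsyncFn<IN, OUT> fn) {
        List<CompletableFuture<Collection<OUT>>> pending = new ArrayList<>();
        for (IN input : inputs) {
            CompletableFuture<Collection<OUT>> future = new CompletableFuture<>();
            fn.asyncInvoke(input, future::complete); // returns immediately
            pending.add(future);
        }
        List<OUT> results = new ArrayList<>();
        for (CompletableFuture<Collection<OUT>> future : pending) {
            results.addAll(future.join()); // collected in input order for this demo
        }
        return results;
    }

    /** Simulated external call; a real job would issue an HTTP request here. */
    static CompletableFuture<String> fakeWeatherLookup(String city) {
        return CompletableFuture.supplyAsync(() -> "weather-for-" + city);
    }

    static List<String> demo() {
        // String in, String out: the callback fires when the lookup finishes.
        AsyncFn<String, String> request = (city, callback) ->
            fakeWeatherLookup(city)
                .thenAccept(body -> callback.accept(Collections.singleton(body)));
        return applyAsync(Arrays.asList("Berlin", "Stuttgart"), request);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

If you do need the input next to the response, declare the function from
String to a pair type and emit both values in the callback.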

Best Regards,
Dom.

On Fri, 12 Oct 2018 at 16:26, Krishna Kalyan wrote:

> Thanks for the quick reply Dom,
>
> I am using flink 1.6.1.
>
> [image: image.png]
>
> Error: Type Mismatch expected AsyncFunction actual AsyncWeatherAPIRequest
>
>
>
> On Fri, 12 Oct 2018 at 16:21, Dominik Wosiński  wrote:
>
>> Hey,
>> What is the exact issue that you are facing and the Flink version that
>> you are using ??
>>
>>
>> Best Regards,
>> Dom.
>>
>> On Fri, 12 Oct 2018 at 16:11, Krishna Kalyan wrote:
>>
>>> Hello All,
>>>
>>> I need some help making async API calls. I have tried the following code
>>> below.
>>>
>>> class AsyncWeatherAPIRequest extends AsyncFunction[String, String] {
>>>   override def asyncInvoke(input: String, resultFuture:
>>> ResultFuture[String]): Unit = {
>>> val query = url("")
>>> val response = Http.default(query OK as.String)
>>> resultFuture.complete(Collections.singleton(response()))
>>>   }
>>> }
>>>
>>> The code below leads to a compilation issue while calling the
>>> AsyncDataStream api.
>>>
>>> val resultStream: DataStream[(String, String)] =
>>>   AsyncDataStream.unorderedWait(userData, new
>>> AsyncWeatherAPIRequest(), 1000, TimeUnit.MILLISECONDS, 1)
>>>
>>> I would really appreciate some examples in scala to make an external API
>>> call with datastreams.
>>>
>>> Regards,
>>> Krishna
>>>
>>>
>>>
>>> Standorte in Stuttgart und Berlin  · Zoi
>>> TechCon GmbH · Quellenstr. 7 · 70376 Stuttgart · Geschäftsführer: Benjamin
>>> Hermann, Dr. Daniel Heubach. Amtsgericht Stuttgart HRB 759619,
>>> Gerichtsstand Stuttgart. Die genannten Angaben werden automatisch
>>> hinzugefügt und lassen keine Rückschlüsse auf den Rechtscharakter der
>>> E-Mail zu. This message (including any attachments) contains confidential
>>> information intended for a specific individual and purpose, and is
>>> protected by law. If you are not the intended recipient, you should delete
>>> this message. Any disclosure, copying, or distribution of this message, or
>>> the taking of any action based on it, is strictly prohibited.
>>>
>>>
>
> --
>
> Krishna Kalyan
>
> M +49 151 44159906
>
>


Re: getRuntimeContext(): The runtime context has not been initialized.

2018-10-12 Thread Ahmad Hassan
Any help/pointers on this please ?

Thanks.

On Thu, 11 Oct 2018 at 10:33, Ahmad Hassan  wrote:

> Hi All,
>
> Thanks for the replies. Here is the code snippet of what we want to
> achieve:
>
> We have sliding windows of 24hrs with 5 minutes apart.
>
> inStream
>  .filter(Objects::nonNull)
>  .keyBy("tenant")
>  .window(SlidingProcessingTimeWindows.of(Time.minutes(1440),
> Time.minutes(5)))
>  .fold(new DefaultVector(), new CalculationFold(), new
> MetricCalculationApply());
>
> public class CalculationFold implements FoldFunction<Event, DefaultVector>
> {
>     private transient MapState<String, DefaultProductMetricVector> products;
>     private transient MapStateDescriptor<String, DefaultProductMetricVector> descr;
>
>     @Override
>     public DefaultVector fold(DefaultVector stats, Event event) throws Exception
>     {
>         if (products.contains(event.getProductId()))
>         {
>             DefaultProductMetricVector product = products.get(event.getProductId());
>             product.updatePrice(event.getPrice());
>             products.put(event.getProductId(), product);
>         }
>         else
>         {
>             DefaultProductMetricVector product = new DefaultProductMetricVector();
>             product.updatePrice(event.getPrice());
>             products.put(event.getProductId(), product);
>         }
>         return stats;
>     }
>
>     // FoldFunction does not allow the open method or this.getRuntimeContext()
>     //public void open(Configuration parameters) throws Exception
>     //{
>     //    descr = new MapStateDescriptor<>("product", String.class,
>     //        DefaultProductMetricVector.class);
>     //    products = this.getRuntimeContext().getMapState(descr);
>     //}
> }
>
>
> We expect millions of unique products in a 24-hour window, which is the
> reason we want to store the state of each product's
> DefaultProductMetricVector instance in RocksDB. Otherwise, my understanding
> is that if I instantiate a Java HashMap of products within the DefaultVector
> fold accumulator, then for each incoming event the full set of products will
> be deserialised and stored on the heap, which will eventually cause the heap
> to overflow.
>
> Please can you tell us how to solve this problem.
>
> Thanks.
>
> Best Regards,
>
>
> On Wed, 10 Oct 2018 at 10:21, Fabian Hueske  wrote:
>
>> Yes, it would be good to post your code.
>> Are you using a FoldFunction in a window (if yes, what window) or as a
>> running aggregate?
>>
>> In general, collecting state in a FoldFunction is usually not something
>> that you should do. Did you consider using an AggregateFunction?
>>
>> Fabian
>>
>> On Wed, Oct 10, 2018 at 11:08, Chesnay Schepler <
>> ches...@apache.org> wrote:
>>
>>> In which method are you calling getRuntimeContext()? This method can
>>> only be used after open() has been called.
>>>
>>> On 09.10.2018 17:09, Ahmad Hassan wrote:
>>>
>>> Hi,
>>>
>>> We want to use MapState inside a fold function to keep the map of all
>>> products that we see in a 24-hour window, so that the huge state is stored
>>> in RocksDB rather than overflowing the heap. However, I don't seem to be
>>> able to initialise MapState within a FoldFunction or any class that extends
>>> RichMapFunction:
>>>
>>> private transient MapStateDescriptor<String, String> descr = new
>>> MapStateDescriptor<>("mymap", String.class, String.class);
>>> this.getRuntimeContext().getMapState(descr);
>>>
>>> I get error
>>>
>>> java.lang.IllegalStateException: The runtime context has not been
>>> initialized.
>>> at
>>> org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53)
>>>
>>>
>>> Any clues how to get the runtime context please?
>>>
>>> Thanks.
>>>
>>> Best regards
>>>
>>>
>>>


Re: Making calls to external API wit Data Streams

2018-10-12 Thread Krishna Kalyan
Thanks for the quick reply Dom,

I am using flink 1.6.1.

[image: image.png]

Error: Type Mismatch expected AsyncFunction actual AsyncWeatherAPIRequest



On Fri, 12 Oct 2018 at 16:21, Dominik Wosiński  wrote:

> Hey,
> What is the exact issue that you are facing and the Flink version that you
> are using ??
>
>
> Best Regards,
> Dom.
>
> On Fri, 12 Oct 2018 at 16:11, Krishna Kalyan wrote:
>
>> Hello All,
>>
>> I need some help making async API calls. I have tried the following code
>> below.
>>
>> class AsyncWeatherAPIRequest extends AsyncFunction[String, String] {
>>   override def asyncInvoke(input: String, resultFuture:
>> ResultFuture[String]): Unit = {
>> val query = url("")
>> val response = Http.default(query OK as.String)
>> resultFuture.complete(Collections.singleton(response()))
>>   }
>> }
>>
>> The code below leads to a compilation issue while calling the
>> AsyncDataStream api.
>>
>> val resultStream: DataStream[(String, String)] =
>>   AsyncDataStream.unorderedWait(userData, new
>> AsyncWeatherAPIRequest(), 1000, TimeUnit.MILLISECONDS, 1)
>>
>> I would really appreciate some examples in scala to make an external API
>> call with datastreams.
>>
>> Regards,
>> Krishna
>>
>>
>>
>>

-- 

Krishna Kalyan

M +49 151 44159906





Re: Making calls to external API wit Data Streams

2018-10-12 Thread Dominik Wosiński
Hey,
What is the exact issue that you are facing and the Flink version that you
are using ??


Best Regards,
Dom.

On Fri, 12 Oct 2018 at 16:11, Krishna Kalyan wrote:

> Hello All,
>
> I need some help making async API calls. I have tried the following code
> below.
>
> class AsyncWeatherAPIRequest extends AsyncFunction[String, String] {
>   override def asyncInvoke(input: String, resultFuture:
> ResultFuture[String]): Unit = {
> val query = url("")
> val response = Http.default(query OK as.String)
> resultFuture.complete(Collections.singleton(response()))
>   }
> }
>
> The code below leads to a compilation issue while calling the
> AsyncDataStream api.
>
> val resultStream: DataStream[(String, String)] =
>   AsyncDataStream.unorderedWait(userData, new
> AsyncWeatherAPIRequest(), 1000, TimeUnit.MILLISECONDS, 1)
>
> I would really appreciate some examples in scala to make an external API
> call with datastreams.
>
> Regards,
> Krishna
>
>
>
>


Making calls to external API wit Data Streams

2018-10-12 Thread Krishna Kalyan
Hello All,

I need some help making async API calls. I have tried the following code
below.

class AsyncWeatherAPIRequest extends AsyncFunction[String, String] {
  override def asyncInvoke(input: String, resultFuture: ResultFuture[String]): Unit = {
    val query = url("")
    val response = Http.default(query OK as.String)
    resultFuture.complete(Collections.singleton(response()))
  }
}

The code below leads to a compilation issue while calling the
AsyncDataStream api.

val resultStream: DataStream[(String, String)] =
  AsyncDataStream.unorderedWait(userData, new AsyncWeatherAPIRequest(), 1000, TimeUnit.MILLISECONDS, 1)

I would really appreciate some examples in scala to make an external API
call with datastreams.

Regards,
Krishna
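
Two things stand out in the snippet above and may be related to the problem. First, the function is declared as AsyncFunction[String, String], while the result is annotated as DataStream[(String, String)]; the two types need to agree. Second, calling response() appears to wait for the HTTP result inside asyncInvoke, which would defeat the purpose of async I/O. The following is a minimal plain-Java sketch of the non-blocking pattern only. It does not use Flink or the HTTP library from the question: fetchWeatherAsync is a hypothetical stand-in for a non-blocking client call, and the Consumer plays the role that the result future plays in asyncInvoke.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

final class AsyncCallSketch {

    // Hypothetical stand-in for a non-blocking HTTP client call.
    static CompletableFuture<String> fetchWeatherAsync(String city) {
        return CompletableFuture.supplyAsync(() -> "weather-for-" + city);
    }

    // Same shape as asyncInvoke(input, resultFuture): return immediately and
    // hand the result to the callback once the request has completed.
    static void asyncInvoke(String input, Consumer<Collection<String>> resultCallback) {
        fetchWeatherAsync(input).whenComplete((body, error) -> {
            if (error != null) {
                // Assumption for this sketch: a failed call yields no output.
                resultCallback.accept(Collections.emptyList());
            } else {
                resultCallback.accept(Collections.singleton(body));
            }
        });
    }

    public static void main(String[] args) {
        CompletableFuture<Collection<String>> done = new CompletableFuture<>();
        asyncInvoke("Stuttgart", done::complete);
        // Blocking here is only for the demo; the operator itself never waits.
        System.out.println(done.join());
    }
}
```

The point of the design is that the calling thread never waits for the external service; only the callback touches the result.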





Re: How do I initialize the window state on first run?

2018-10-12 Thread bupt_ljy
Yes, that’s an option, but it would be very complicated because of our storage
and business requirements.
Now I’m trying to write a handler like the “KvStateHandler” so that I can
access (read/write) the state from my client.


Original Message
Sender: Congxian Qiu <qcx978132...@gmail.com>
Recipient: bupt_ljy <bupt_...@163.com>
Cc: yanghua1127 <yanghua1...@gmail.com>; user <u...@flink.apache.org>
Date:Friday, Oct 12, 2018 20:14
Subject:Re: How do I initialize the window state on first run?


IIUC, we can't initialize state on the first run. Maybe you could store the
aggregated data somewhere other than Flink's state, then use Flink to
aggregate the data in real time.


On Fri, Oct 12, 2018 at 3:33 PM, bupt_ljy <bupt_...@163.com> wrote:

Hi vino,
  My Flink program aggregates the data of a whole day. Assume we start
this program at 6:00 am; the default state in the window should then be the
aggregated result from 0:00 am to 6:00 am.


Original Message
Sender: vino yang <yanghua1...@gmail.com>
Recipient: bupt_ljy <bupt_...@163.com>
Cc: user <u...@flink.apache.org>
Date:Friday, Oct 12, 2018 15:13
Subject:Re: How do I initialize the window state on first run?


Hi Jiayi,


If you don't mind, may I ask what kind of use case you have that requires
this?



Thanks, vino.


On Fri, Oct 12, 2018 at 1:59 PM, bupt_ljy <bupt_...@163.com> wrote:

Hi,
 I’m going to run a new Flink program with some initialized window states.
 As far as I can see, there is no official way to do this, right? I’ve tried
the bravo project, but it doesn’t support FsStateBackend, and adding a new
StateBackend to it would take too much work.
 Any good ideas about this?






Best, Jiayi Liao
 




-- 

Blog:http://www.klion26.com
GTalk:qcx978132955
Follow your heart

Re: How do I initialize the window state on first run?

2018-10-12 Thread Congxian Qiu
IIUC, we can't initialize state on the first run. Maybe you could store the
aggregated data somewhere other than Flink's state, then use Flink to
aggregate the data in real time.

On Fri, Oct 12, 2018 at 3:33 PM, bupt_ljy wrote:

> Hi vino,
>
> My Flink program aggregates the data of a whole day. Assume we start
> this program at 6:00 am; the default state in the window should then be
> the aggregated result from 0:00 am to 6:00 am.
>
>  Original Message
> *Sender:* vino yang
> *Recipient:* bupt_ljy
> *Cc:* user
> *Date:* Friday, Oct 12, 2018 15:13
> *Subject:* Re: How do I initialize the window state on first run?
>
> Hi Jiayi,
>
> If you don't mind, may I ask what kind of use case you have that requires
> this?
>
> Thanks, vino.
>
> On Fri, Oct 12, 2018 at 1:59 PM, bupt_ljy wrote:
>
>> Hi,
>>
>>I’m going to run a new Flink program with some initialized window
>> states.
>>
>> As far as I can see, there is no official way to do this, right? I’ve
>> tried the bravo project, but it doesn’t support FsStateBackend, and adding
>> a new StateBackend to it would take too much work.
>>
>>Any good ideas about this?
>>
>>
>>
>>
>> Best, Jiayi Liao
>>
>>
>>
>

-- 
Blog:http://www.klion26.com
GTalk:qcx978132955
Follow your heart


Re: Are savepoints / checkpoints co-ordinated?

2018-10-12 Thread Congxian Qiu
AFAIK, "Cancel Job with Savepoint" stops the checkpoint scheduler, triggers
a savepoint, and then cancels your job. There will be no more checkpoints.
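
One possible explanation for the duplicates described in this thread does not need an extra checkpoint at all. Under at-least-once delivery, anything the sink writes after the savepoint's snapshot point but before the job is actually cancelled is written again when the job restarts from the savepoint. The following toy model in plain Java illustrates only that effect; it is not Flink or Kafka code, and the class, method, and parameter names are made up for the illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class AtLeastOnceReplaySketch {

    // input:      records in source order
    // snapshotAt: number of records covered by the savepoint (restored offset)
    // cancelAt:   number of records the sink had written when the job stopped
    static List<String> run(List<String> input, int snapshotAt, int cancelAt) {
        List<String> topic = new ArrayList<>();
        // First run: the sink writes eagerly until the job is cancelled.
        for (int i = 0; i < cancelAt; i++) {
            topic.add(input.get(i));
        }
        // Restart: the source resumes from the offset stored in the savepoint,
        // so records between snapshotAt and cancelAt are written a second time.
        for (int i = snapshotAt; i < input.size(); i++) {
            topic.add(input.get(i));
        }
        return topic;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("a", "b", "c", "d"), 2, 3));
    }
}
```

In this model the overlap is exactly the set of records written between the snapshot and the cancellation, which would match duplicates that cover only the last few events before the savepoint.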

Written on Fri, Oct 12, 2018 at 1:30 AM:

> Hi,
>
>
>
> I had a couple questions about savepoints / checkpoints
>
>
>
> When I issue "Cancel Job with Savepoint", how is that instruction
> co-ordinated with check points? Am I certain the savepoint will be the last
> operation (i.e. no more check points)?
>
>
>
> I have a kafka src>operation>kafka sink task in flink. And it looks like
> on restart from the savepoint there are duplicates written to the sink
> topic in kafka. The dupes overlap with the last few events prior to save
> point, and I am trying to work out what could have happened.
>
> My FlinkKafkaProducer011  is set to Semantic.AT_LEAST_ONCE, but
> env.enableCheckpointing(parameters.getInt("checkpoint.interval"),
> CheckpointingMode.EXACTLY_ONCE).
>
> I thought at least once still implies flushes to kafka still only occur
> with a checkpoint.
>
>
>
> One  theory is a further checkpoint occurred after/ during the savepoint -
> which would have flushed events to kafka that are not in my savepoint.
>
>
>
> Any pointers to schoolboy errors I may have made would be appreciated.
>
>
>
> -
>
> Also  am I right in thinking if I have managed state with rocksdb back end
> that is using 1G on disk, but substantially less keyed state in memory, a
> savepoint needs to save the full 1G to complete?
>
>
>
> Regards
>
> Anand
>


-- 
Blog:http://www.klion26.com
GTalk:qcx978132955
Follow your heart


Re: Getting NoMethod found error while running job on flink 1.6.1

2018-10-12 Thread Chesnay Schepler
The cause cannot be that flink-metrics-core is missing from the classpath, as
in that case you'd get a ClassNotFoundException (or NoClassDefFoundError).


This is a version conflict, caused either by your fat jar bundling an
older version of flink-metrics-core together with a newer version of the
Kafka connector, or by the cluster still running an older version even
though you upgraded your application correctly.


Please check your dependencies and build setup.
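
To narrow down which jar a conflicting class is actually loaded from, a small check like the following can help. This is a sketch using only JDK reflection calls; the class name WhichJar and the way it is invoked are assumptions for illustration, not part of Flink.

```java
import java.net.URL;
import java.security.CodeSource;

final class WhichJar {

    // Returns the jar or directory a class was loaded from, or a short
    // description when the JVM does not expose one (e.g. JDK classes).
    static String locationOf(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        if (source == null) {
            return "(no code source, probably a JDK class)";
        }
        URL location = source.getLocation();
        return location == null ? "(unknown location)" : location.toString();
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // Pass a fully qualified class name, for example the one from the
        // stack trace: org.apache.flink.metrics.MetricGroup
        String name = args.length > 0 ? args[0] : "java.lang.String";
        System.out.println(name + " loaded from " + locationOf(Class.forName(name)));
    }
}
```

Running it with the same classpath as the job, or logging the same expression from inside the job, shows whether MetricGroup comes from the fat jar or from the cluster's lib directory.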

On 11.10.2018 08:05, Chandu Kempaiah wrote:
Flink is running as a standalone cluster in High Availability mode. My
application jar is a fat jar which has all the necessary dependencies
included.


I will check once again and verify by adding flink-metrics-core to
the classpath.


Thanks
Chandu

On Wed, Oct 10, 2018 at 8:38 PM vino yang wrote:


Hi Chandu,

What mode does your Flink run in?
In addition, can you check if the flink-metrics-core is included
in the classpath of the Flink runtime environment?

Thanks, vino.

On Thu, Oct 11, 2018 at 9:51 AM, Chandu Kempaiah <chandu.kempa...@reflektion.com> wrote:


Hello,

I have a job that reads messages from Kafka, processes them
and writes back to Kafka. This job works fine on Flink 1.3.2.
I upgraded the cluster to 1.6.1 but now see the error below. Has
anyone faced a similar issue?

I have updated all the dependencies to use

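<flink.version>1.6.1</flink.version>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.10_${scala.version}</artifactId>
  <version>${flink.version}</version>
</dependency>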



java.lang.NoSuchMethodError: org.apache.flink.metrics.MetricGroup.addGroup(Ljava/lang/String;Ljava/lang/String;)Lorg/apache/flink/metrics/MetricGroup;
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.registerOffsetMetrics(AbstractFetcher.java:622)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.<init>(AbstractFetcher.java:200)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.<init>(Kafka09Fetcher.java:91)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.<init>(Kafka010Fetcher.java:64)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010.createFetcher(FlinkKafkaConsumer010.java:209)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:647)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)

Thanks

Chandu





Re: Flink leaves a lot RocksDB sst files in tmp directory

2018-10-12 Thread Stefan Richter
Hi,

Can you show us what is inside one of the instance directories?
Furthermore, your TM logs show multiple OutOfMemoryErrors, so that
might also be a problem. Also, how was the job moved? If a TM is killed, it
cannot clean up, of course. That is why the data goes to the tmp dir, so that
the OS can eventually take care of it. In container environments this dir
should always be cleaned anyway.

Best,
Stefan

> On 11. Oct 2018, at 10:15, Sayat Satybaldiyev  wrote:
> 
> Thank you Piotr for the reply! We didn't run this job on the previous version 
> of Flink. Unfortunately, I don't have a log file from the JM, only TM logs. 
> 
> https://drive.google.com/file/d/14QSVeS4c0EETT6ibK3m_TMgdLUwD6H1m/view?usp=sharing
>  
> 
> 
> On Wed, Oct 10, 2018 at 10:08 AM Piotr Nowojski wrote:
> Hi,
> 
> Did this happen in older Flink versions? Could you post under what 
> circumstances the job was moved to a new TM (full job manager logs and 
> task manager logs would be helpful)? I suspect that those leftover files 
> might have something to do with local recovery.
> 
> Piotrek 
> 
>> On 9 Oct 2018, at 15:28, Sayat Satybaldiyev wrote:
>> 
>> After digging more into the log, I think it's more likely a bug. I grepped
>> the log by job id and found that, under normal circumstances, the TM is
>> supposed to delete flink-io files. For some reason, it doesn't delete the
>> files that were listed above.
>> 
>> 2018-10-08 22:10:25,865 INFO  
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 
>> Deleting existing instance base directory 
>> /tmp/flink-io-5eb5cae3-b194-40b2-820e-01f8f39b5bf6/job_a5b223c7aee89845f9aed24012e46b7e_op_StreamSink_92266bd138cd7d51ac7a63beeb86d5f5__1_1__uuid_bf69685b-78d3-431c-88be-b3f26db05566.
>> 2018-10-08 22:10:25,867 INFO  
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 
>> Deleting existing instance base directory 
>> /tmp/flink-io-5eb5cae3-b194-40b2-820e-01f8f39b5bf6/job_a5b223c7aee89845f9aed24012e46b7e_op_StreamSink_14630a50145935222dbee3f1bcfdc2a6__1_1__uuid_47cd6e95-144a-4c52-a905-52966a5e9381.
>> 2018-10-08 22:10:25,874 INFO  
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 
>> Deleting existing instance base directory 
>> /tmp/flink-io-5eb5cae3-b194-40b2-820e-01f8f39b5bf6/job_a5b223c7aee89845f9aed24012e46b7e_op_StreamSink_7185aa35d035b12c70cf490077378540__1_1__uuid_7c539a96-a247-4299-b1a0-01df713c3c34.
>> 2018-10-08 22:17:38,680 INFO  
>> org.apache.flink.runtime.taskexecutor.TaskExecutor- Close 
>> JobManager connection for job a5b223c7aee89845f9aed24012e46b7e.
>> org.apache.flink.util.FlinkException: JobManager responsible for 
>> a5b223c7aee89845f9aed24012e46b7e lost the leadership.
>> org.apache.flink.util.FlinkException: JobManager responsible for 
>> a5b223c7aee89845f9aed24012e46b7e lost the leadership.
>> 2018-10-08 22:17:38,686 INFO  
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 
>> Deleting existing instance base directory 
>> /tmp/flink-io-5eb5cae3-b194-40b2-820e-01f8f39b5bf6/job_a5b223c7aee89845f9aed24012e46b7e_op_StreamSink_7185aa35d035b12c70cf490077378540__1_1__uuid_2e88c56a-2fc2-41f2-a1b9-3b0594f660fb.
>> org.apache.flink.util.FlinkException: JobManager responsible for 
>> a5b223c7aee89845f9aed24012e46b7e lost the leadership.
>> 2018-10-08 22:17:38,691 INFO  
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 
>> Deleting existing instance base directory 
>> /tmp/flink-io-5eb5cae3-b194-40b2-820e-01f8f39b5bf6/job_a5b223c7aee89845f9aed24012e46b7e_op_StreamSink_92266bd138cd7d51ac7a63beeb86d5f5__1_1__uuid_b44aecb7-ba16-4aa4-b709-31dae7f58de9.
>> org.apache.flink.util.FlinkException: JobManager responsible for 
>> a5b223c7aee89845f9aed24012e46b7e lost the leadership.
>> org.apache.flink.util.FlinkException: JobManager responsible for 
>> a5b223c7aee89845f9aed24012e46b7e lost the leadership.
>> org.apache.flink.util.FlinkException: JobManager responsible for 
>> a5b223c7aee89845f9aed24012e46b7e lost the leadership.
>> 
>> 
>> On Tue, Oct 9, 2018 at 2:33 PM Sayat Satybaldiyev wrote:
>> Dear all,
>> 
>> While running Flink 1.6.1 with RocksDB as a backend and HDFS as the
>> checkpoint FS, I've noticed that after a job has moved to a different host
>> it leaves quite a large amount of state in the temp folder (1.2TB in total).
>> The files are not used, as the TM is not running a job on the current host.
>> 
>> The job a5b223c7aee89845f9aed24012e46b7e had been running on the host but 
>> then it was moved to a different TM. I'm wondering whether this is intended
>> behavior or a possible bug.
>> 
>> I've attached a screenshot of the files that are left and not used by a job.
> 



Questions in sink exactly once implementation

2018-10-12 Thread 徐涛
Hi,
I am reading the book “Introduction to Apache Flink”, which mentions
two ways to achieve exactly-once at the sink:
1. The first way is to buffer all output at the sink and commit it
atomically when the sink receives a checkpoint record.
2. The second way is to eagerly write data to the output, keeping in
mind that some of this data might be “dirty” and replayed after a failure. If
there is a failure, then we need to roll back the output, thus overwriting the
dirty data and effectively deleting dirty data that has already been written to
the output.
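
The first approach can be sketched as follows. This is a toy model in plain Java under stated assumptions, not Flink's sink API: the class and method names are hypothetical, the "committed" list stands in for the external system, and the commit is assumed to be atomic.

```java
import java.util.ArrayList;
import java.util.List;

final class BufferingSinkSketch {
    // Records received since the last checkpoint; not yet visible downstream.
    private final List<String> pending = new ArrayList<>();
    // Stand-in for the external system; only committed records are visible.
    private final List<String> committed = new ArrayList<>();

    void write(String record) {
        pending.add(record);
    }

    // Called when the checkpoint record reaches the sink: publish the buffer
    // in one step (assumed atomic in this sketch).
    void onCheckpoint() {
        committed.addAll(pending);
        pending.clear();
    }

    // Called after a failure: drop uncommitted output. The source replays the
    // same records from the last checkpoint, so nothing is lost or duplicated.
    void onRestore() {
        pending.clear();
    }

    List<String> committed() {
        return new ArrayList<>(committed);
    }

    public static void main(String[] args) {
        BufferingSinkSketch sink = new BufferingSinkSketch();
        sink.write("a");
        sink.onCheckpoint();
        sink.write("b");   // written after the checkpoint, then a failure
        sink.onRestore();
        sink.write("b");   // replayed by the source
        sink.onCheckpoint();
        System.out.println(sink.committed());
    }
}
```

The cost of this approach is latency and memory: nothing becomes visible until the next checkpoint, and the buffer grows with the number of records between checkpoints.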

I read the code of the Elasticsearch sink and found that there is a
flushOnCheckpoint option; if it is set to true, changes accumulate until a
checkpoint is made. I guess this guarantees at-least-once delivery: although
it flushes in batches, the flush is not an atomic action, so it cannot
guarantee exactly-once delivery.

My questions are:
1. Many sinks do not support transactions, so in that case I have to
choose option 2 to achieve exactly-once. I then have to buffer all the
records between checkpoints and delete them, which is a rather heavy action.
2. I guess a MySQL sink should support exactly-once delivery because it
supports transactions, but in that case the batch size is determined by the
number of actions between checkpoints rather than by a fixed number (100, for
example). When there are a lot of items between checkpoints, this is a heavy
action as well.

Best
Henry

Re: [BucketingSink] notify on moving into pending/ final state

2018-10-12 Thread Kostas Kloudas
Hi Rinat,

I have commented on your PR and on the JIRA.
Let me know what you think.

Cheers,
Kostas

> On Oct 11, 2018, at 4:45 PM, Dawid Wysakowicz  wrote:
> 
> Hi Rinat, 
> I haven't checked your PR but we introduced a new connector in flink 1.6 
> called StreamingFileSink that is supposed to replace BucketingSink long term. 
> I think it might solve a few problems of yours. Have you checked it by chance?
> 
> Best,
> Dawid
> 
> On Thu, 11 Oct 2018, 14:10 Rinat wrote:
> Hi Piotr, during the migration to the latest Flink version, we’ve decided to 
> try to contribute this functionality to the master branch.
> 
> PR is available here https://github.com/apache/flink/pull/6824 
>  
> More details about hooking the state changes in BucketingSink are available 
> in https://issues.apache.org/jira/browse/FLINK-9592 
>  
> 
> Thx !
> 
>> On 14 Jun 2018, at 23:29, Rinat wrote:
>> 
>> Hi Piotr, I’ve created an issue:
>> https://issues.apache.org/jira/browse/FLINK-9592 
>> 
>> 
>> The third proposal looks great, may I try to contribute this issue ?
>> 
>>> On 14 Jun 2018, at 12:29, Piotr Nowojski wrote:
>>> 
>>> Hi,
>>> 
>>> Couple of things:
>>> 
>>> 1. Please create a Jira ticket with this proposal, so we can move 
>>> discussion from user mailing list.
>>> 
>>> I haven’t thought it through, so take my comments with a grain of salt, 
>>> however:
>>> 
>>> 2. If we were to go with such a callback, I would prefer to have one 
>>> BucketStateChangeCallback, with methods like `onInProgressToPending(…)`, 
>>> `onPendingToFinal`, `onPendingToCancelled(…)`, etc., as opposed to having one 
>>> interface passed three or four times for different purposes.
>>> 
>>> 3. Another thing that I had in mind is that BucketingSink could be rewritten 
>>> to extend TwoPhaseCommitSinkFunction. In that case, with 
>>> 
>>> public class BucketingSink2 extends TwoPhaseCommitSinkFunction
>>> 
>>> users could add their own hooks by overriding the following methods:
>>> 
>>> BucketingSink2#beginTransaction, BucketingSink2#preCommit, 
>>> BucketingSink2#commit, BucketingSink2#abort. For example:
>>> 
>>> public class MyBucketingSink extends BucketingSink2 {
>>>   @Override
>>>   protected void commit(??? txn) {
>>>     super.commit(txn);
>>>     // My hook on moving file from pending to commit state
>>>   }
>>> }
>>> 
>>> Alternatively, we could implement support for the aforementioned callbacks in 
>>> TwoPhaseCommitSinkFunction and provide this feature to 
>>> Kafka/Pravega/BucketingSink at once.
>>> 
>>> Piotrek
>> 
>> Sincerely yours,
>> Rinat Sharipov
>> Software Engineer at 1DMP CORE Team
>> 
>> email: r.shari...@cleverdata.ru 
>> mobile: +7 (925) 416-37-26
>> 
>> CleverDATA
>> make your data clever
>> 
> 
> Sincerely yours,
> Rinat Sharipov
> Software Engineer at 1DMP CORE Team
> 
> email: r.shari...@cleverdata.ru 
> mobile: +7 (925) 416-37-26
> 
> CleverDATA
> make your data clever
> 



Re: Taskmanager times out continuously for registration with Jobmanager

2018-10-12 Thread Till Rohrmann
It is hard to tell without all logs but it could easily be a K8s setup
problem. Also problematic is that you are running a Flink version which is
no longer actively supported. Try at least to use the latest bug fix
release for 1.4.

Cheers,
Till

On Fri, Oct 12, 2018, 09:43 Abdul Qadeer  wrote:

> Hi Till,
>
> A few more data points:
>
> In a rerun of the same versions with a fresh deployment, I see
> log.debug(s"RegisterTaskManager: $msg") in the JobManager; however, the
> AcknowledgeRegistration/AlreadyRegistered messages are never sent. I
> have taken a tcpdump for the taskmanager which doesn't recover and compared
> it with another taskmanager which recovers after restart (i.e. receives
> the AcknowledgeRegistration message).
>
> Restarting the docker container of bad taskmanager doesn't work. The only
> workaround right now is to delete the kubernetes pod holding the bad
> taskmanager container. Does it have to do something with the akka address
> the jobmanager stores for a taskmanager? The only variable I see between
> restarting container vs pod is the change in the akka address.
>
> Also, the infinite retries for registration start after the taskmanager
> container restarts with Jobmanager actor system quarantined:
>
> {"timeMillis":1539282282329,"thread":"flink-akka.actor.default-dispatcher-3","level":"ERROR","loggerName":"org.apache.flink.runtime.taskmanager.TaskManager","message":"The
> actor system akka.tcp://flink@taskmgr-6b59f97748-fmgwn:8070 has
> quarantined the remote actor system akka.tcp://flink@192.168.83.52:6123.
> Shutting the actor system down to be able to reestablish a
> connection!","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":49,"threadPriority":5}
>
>
> A manual restart by docker restart or killing the JVM doesn't reproduce
> this problem.
>
> On Thu, Oct 11, 2018 at 11:15 AM Abdul Qadeer 
> wrote:
>
>> Hi Till,
>>
>> I didn't try with newer versions as it is not possible to update the
>> Flink version atm.
>> If you could give any pointers for debugging that would be great.
>>
>> On Thu, Oct 11, 2018 at 2:44 AM Till Rohrmann 
>> wrote:
>>
>>> Hi Abdul,
>>>
>>> have you tried whether this problem also occurs with newer Flink
>>> versions (1.5.4 or 1.6.1)?
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Oct 11, 2018 at 9:24 AM Dawid Wysakowicz 
>>> wrote:
>>>
 Hi Abdul,

 I've added Till and Gary to cc, who might be able to help you.

 Best,

 Dawid

 On 11/10/18 03:05, Abdul Qadeer wrote:

 Hi,


 We are facing an issue in standalone HA mode in Flink 1.4.0 where the
 Taskmanager restarts and is not able to register with the Jobmanager. It
 times out awaiting the AcknowledgeRegistration/AlreadyRegistered message
 from the Jobmanager Actor and keeps sending the RegisterTaskManager message.
 The logs at the Jobmanager don’t show anything about a registration
 failure/request. It doesn’t print log.debug(s"RegisterTaskManager: $msg")
 (from JobManager.scala) either. The network connection
 between taskmanager and jobmanager seems fine; tcpdump shows the message sent
 to the jobmanager and a TCP ACK received from the jobmanager. Note that the
 communication is happening between docker containers.


 Following are the logs from Taskmanager:



 {"timeMillis":1539189572438,"thread":"flink-akka.actor.default-dispatcher-2","level":"INFO","loggerName":"org.apache.flink.runtime.taskmanager.TaskManager","message":"Trying
 to register at JobManager akka.tcp://
 flink@192.168.83.51:6123/user/jobmanager (attempt 1400, timeout: 3
 milliseconds)","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":48,"threadPriority":5}

 {"timeMillis":1539189580229,"thread":"Curator-Framework-0-SendThread(zookeeper.maglev-system.svc.cluster.local:2181)","level":"DEBUG","loggerName":"org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn","message":"Got
 ping response for sessionid: 0x1260ea5002d after
 0ms","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":101,"threadPriority":5}

 {"timeMillis":1539189600247,"thread":"Curator-Framework-0-SendThread(zookeeper.maglev-system.svc.cluster.local:2181)","level":"DEBUG","loggerName":"org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn","message":"Got
 ping response for sessionid: 0x1260ea5002d after
 0ms","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":101,"threadPriority":5}

 {"timeMillis":1539189602458,"thread":"flink-akka.actor.default-dispatcher-2","level":"INFO","loggerName":"org.apache.flink.runtime.taskmanager.TaskManager","message":"Trying
 to register at JobManager akka.tcp://
 flink@192.168.83.51:6123/user/jobmanager (attempt 1401, timeout: 3
 milliseconds)","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId"

Re: When does Trigger.clear() get called?

2018-10-12 Thread Fabian Hueske
Hi Andrew,

The PURGE action of a window removes the window state (i.e., the collected
events or computed aggregate), but the window metadata, including the
Trigger, remains.
The Trigger.clear() method is called when the window is completely discarded
(i.e., including all metadata). This happens when the time (wall-clock time
for processing-time windows or the watermark for event-time windows) exceeds
the window's end timestamp.
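
The distinction can be illustrated with a toy model in plain Java. It is not Flink's WindowOperator: the class and method names are hypothetical, and the cleanup condition simply mirrors the description above (time exceeds the window's end timestamp).

```java
import java.util.ArrayList;
import java.util.List;

final class WindowLifecycleSketch {
    private final long windowEnd;
    private final List<String> contents = new ArrayList<>();
    private int clearCalls = 0;      // how often the trigger's clear() ran
    private boolean discarded = false;

    WindowLifecycleSketch(long windowEnd) {
        this.windowEnd = windowEnd;
    }

    void add(String event) {
        if (!discarded) {
            contents.add(event);
        }
    }

    // PURGE: drop the collected contents only; the trigger stays registered.
    void purge() {
        contents.clear();
    }

    // Time (processing time or watermark) advances. Once it exceeds the
    // window end, the window is discarded and clear() runs exactly once.
    void advanceTime(long now) {
        if (!discarded && now > windowEnd) {
            contents.clear();
            clearCalls++;
            discarded = true;
        }
    }

    int size() {
        return contents.size();
    }

    int clearCalls() {
        return clearCalls;
    }

    public static void main(String[] args) {
        WindowLifecycleSketch window = new WindowLifecycleSketch(100L);
        window.add("event");
        window.purge();
        System.out.println("clear() calls after PURGE: " + window.clearCalls());
        window.advanceTime(101L);
        System.out.println("clear() calls after window end: " + window.clearCalls());
    }
}
```

In this model a breakpoint in clear() would not be hit on PURGE, only once time passes the end of the window.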

Best, Fabian

On Fri, Oct 12, 2018 at 05:25, Hequn Cheng wrote:

> Hi Andrew,
>
> Do you use CountWindow? You could switch to TimeWindow to test this.
> I'm not very familiar with windows, but I checked the code and found that
> clear() is called only when the timer fires, i.e., at the end of the
> time window.
> Hope this helps.
>
> Best, Hequn
>
> On Fri, Oct 12, 2018 at 6:23 AM Andrew Danks  wrote:
>
>> Hello,
>>
>> I see that the clear() function is implemented for various types of
>> Triggers in the Flink API. For example:
>>
>> https://github.com/apache/flink/blob/release-1.3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java#L87
>>
>> I am working on a custom Trigger for my application and have implemented
>> clear() in a similar way.
>>
>> However, having put a breakpoint in this function it doesn’t seem to get
>> called when I expect. The source code says that is called "when a window is
>> purged”[1] but when my Trigger emits a PURGE this function never seems to
>> get called. I am on Flink 1.3.
>>
>> Hoping someone can shed more light on the purpose of clear() and how/when
>> it gets called
>>
>> Thanks!
>> Andrew
>>
>>
>> [1]
>> https://github.com/apache/flink/blob/release-1.3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java#L111
>>
>>


Re: Taskmanager times out continuously for registration with Jobmanager

2018-10-12 Thread Abdul Qadeer
Hi Till,

A few more data points:

In a rerun of the same versions with a fresh deployment, I see
log.debug(s"RegisterTaskManager: $msg") in the JobManager; however, the
AcknowledgeRegistration/AlreadyRegistered messages are never sent. I have
taken a tcpdump for the taskmanager which doesn't recover and compared it
with another taskmanager which recovers after restart (i.e. receives
the AcknowledgeRegistration message).

Restarting the docker container of bad taskmanager doesn't work. The only
workaround right now is to delete the kubernetes pod holding the bad
taskmanager container. Does it have to do something with the akka address
the jobmanager stores for a taskmanager? The only variable I see between
restarting container vs pod is the change in the akka address.

Also, the infinite retries for registration start after the taskmanager
container restarts with Jobmanager actor system quarantined:

{"timeMillis":1539282282329,"thread":"flink-akka.actor.default-dispatcher-3","level":"ERROR","loggerName":"org.apache.flink.runtime.taskmanager.TaskManager","message":"The
actor system akka.tcp://flink@taskmgr-6b59f97748-fmgwn:8070 has quarantined
the remote actor system akka.tcp://flink@192.168.83.52:6123. Shutting the
actor system down to be able to reestablish a
connection!","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":49,"threadPriority":5}


A manual restart by docker restart or killing the JVM doesn't reproduce
this problem.

On Thu, Oct 11, 2018 at 11:15 AM Abdul Qadeer  wrote:

> Hi Till,
>
> I didn't try with newer versions as it is not possible to update the Flink
> version atm.
> If you could give any pointers for debugging that would be great.
>
> On Thu, Oct 11, 2018 at 2:44 AM Till Rohrmann 
> wrote:
>
>> Hi Abdul,
>>
>> have you tried whether this problem also occurs with newer Flink versions
>> (1.5.4 or 1.6.1)?
>>
>> Cheers,
>> Till
>>
>> On Thu, Oct 11, 2018 at 9:24 AM Dawid Wysakowicz 
>> wrote:
>>
>>> Hi Abdul,
>>>
>>> I've added Till and Gary to cc, who might be able to help you.
>>>
>>> Best,
>>>
>>> Dawid
>>>
>>> On 11/10/18 03:05, Abdul Qadeer wrote:
>>>
>>> Hi,
>>>
>>>
>>> We are facing an issue in standalone HA mode in Flink 1.4.0 where the
>>> Taskmanager restarts and is not able to register with the Jobmanager. It
>>> times out awaiting the AcknowledgeRegistration/AlreadyRegistered message
>>> from the Jobmanager Actor and keeps sending the RegisterTaskManager message.
>>> The logs at the Jobmanager don’t show anything about a registration
>>> failure/request. It doesn’t print log.debug(s"RegisterTaskManager: $msg")
>>> (from JobManager.scala) either. The network connection between
>>> taskmanager and jobmanager seems fine; tcpdump shows the message sent to
>>> the jobmanager and a TCP ACK received from the jobmanager. Note that the
>>> communication is happening between docker containers.
>>>
>>>
>>> Following are the logs from Taskmanager:
>>>
>>>
>>>
>>> {"timeMillis":1539189572438,"thread":"flink-akka.actor.default-dispatcher-2","level":"INFO","loggerName":"org.apache.flink.runtime.taskmanager.TaskManager","message":"Trying
>>> to register at JobManager akka.tcp://
>>> flink@192.168.83.51:6123/user/jobmanager (attempt 1400, timeout: 3
>>> milliseconds)","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":48,"threadPriority":5}
>>>
>>> {"timeMillis":1539189580229,"thread":"Curator-Framework-0-SendThread(zookeeper.maglev-system.svc.cluster.local:2181)","level":"DEBUG","loggerName":"org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn","message":"Got
>>> ping response for sessionid: 0x1260ea5002d after
>>> 0ms","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":101,"threadPriority":5}
>>>
>>> {"timeMillis":1539189600247,"thread":"Curator-Framework-0-SendThread(zookeeper.maglev-system.svc.cluster.local:2181)","level":"DEBUG","loggerName":"org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn","message":"Got
>>> ping response for sessionid: 0x1260ea5002d after
>>> 0ms","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":101,"threadPriority":5}
>>>
>>> {"timeMillis":1539189602458,"thread":"flink-akka.actor.default-dispatcher-2","level":"INFO","loggerName":"org.apache.flink.runtime.taskmanager.TaskManager","message":"Trying
>>> to register at JobManager akka.tcp://
>>> flink@192.168.83.51:6123/user/jobmanager (attempt 1401, timeout: 3
>>> milliseconds)","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":48,"threadPriority":5}
>>>
>>> {"timeMillis":1539189620251,"thread":"Curator-Framework-0-SendThread(zookeeper.maglev-system.svc.cluster.local:2181)","level":"DEBUG","loggerName":"org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn","message":"Got
>>> ping response for sessionid: 0x1260ea5002d after
>>> 0ms","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":101,"threadP

Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

2018-10-12 Thread Jörn Franke
Thank you, very nice. I fully agree with that. 

> On 11.10.2018 at 19:31, Zhang, Xuefu wrote:
> 
> Hi Jörn,
> 
> Thanks for your feedback. Yes, I think Hive on Flink makes sense and in fact 
> it is one of the two approaches that I named at the beginning of the thread. 
> As also pointed out there, this isn't mutually exclusive with the work we 
> proposed inside Flink; the two target different user groups and use 
> cases. Further, what we proposed to do in Flink should be a good showcase 
> that demonstrates Flink's capabilities in batch processing and convinces the 
> Hive community of the worth of a new engine. As you might know, the idea 
> encountered some doubt and resistance. Nevertheless, we do have a solid plan 
> for Hive on Flink, which we will execute once Flink SQL is in good shape.
> 
> I also agree with you that Flink SQL shouldn't be closely coupled with Hive. 
> While we mentioned Hive in many of the proposed items, most of them are 
> coupled only in concepts and functionality rather than code or libraries. We 
> are taking advantage of the connector framework in Flink. The only thing 
> that might be an exception is support for Hive built-in UDFs, which we may 
> not make work out of the box in order to avoid the coupling. We could, for 
> example, require users to bring in the Hive library and register the UDFs 
> themselves. This is subject to further discussion.
> 
> #11 is about Flink runtime enhancements that are meant to make task failures 
> more tolerable (so that the job doesn't have to start from the beginning in 
> case of task failures) and to make task scheduling more resource-efficient. 
> Flink's current design in those two aspects leans more toward stream 
> processing, which may not be good enough for batch processing. We will 
> provide a more detailed design when we get to them.
> 
> Please let me know if you have further thoughts or feedback.
> 
> Thanks,
> Xuefu
> 
> 
> --
> Sender:Jörn Franke 
> Sent at:2018 Oct 11 (Thu) 13:54
> Recipient:Xuefu 
> Cc:vino yang ; Fabian Hueske ; dev 
> ; user 
> Subject:Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem
> 
> Would it maybe make sense to provide Flink as an engine on Hive 
> ("flink-on-Hive")? E.g., to address 4, 5, 6, 8, 9, 10. This could be more 
> loosely coupled than integrating Hive into all possible Flink core modules 
> and thus introducing a very tight dependency on Hive in the core.
> 1, 2, 3 could be achieved via a connector based on the Flink Table API.
> This is just a proposal to start this endeavour as independent projects (Hive 
> engine, connector) to avoid too tight a coupling with Flink. Maybe in a more 
> distant future, if the Hive integration is heavily demanded, one could then 
> integrate it more tightly if needed. 
> 
> What is meant by 11?
> On 11.10.2018 at 05:01, Zhang, Xuefu wrote:
> 
> Hi Fabian/Vino,
> 
> Thank you very much for your encouragement and inquiry. Sorry that I didn't see
> Fabian's email until I read Vino's response just now. (Somehow Fabian's went
> to the spam folder.)
> 
> My proposal contains long-term and short-term goals. Nevertheless, the
> effort will focus on the following areas, including Fabian's list:
> 
> 1. Hive metastore connectivity - This covers both read and write access, which
> means Flink can make full use of Hive's metastore as its catalog (at least
> for batch, but this can be extended to streaming as well).
> 2. Metadata compatibility - Objects (databases, tables, partitions, etc.)
> created by Hive can be understood by Flink and vice versa.
> 3. Data compatibility - Similar to #2, data produced by Hive can be consumed
> by Flink and vice versa.
> 4. Support Hive UDFs - For all of Hive's native UDFs, Flink either provides its
> own implementation or makes Hive's implementation work in Flink. Further, for
> user-created UDFs in Hive, Flink SQL should provide a mechanism allowing users
> to import them into Flink without any code change required.
> 5. Data types - Flink SQL should support all data types that are available
> in Hive.
> 6. SQL Language - Flink SQL should support the SQL standard (such as SQL:2003)
> with extensions to support Hive's syntax and language features, around DDL,
> DML, and SELECT queries.
> 7. SQL CLI - This is currently under development in Flink, but more effort is
> needed.
> 8. Server - Provide a server that's compatible with Hive's HiveServer2 in its
> thrift APIs, such that HiveServer2 users can reuse their existing client
> (such as beeline) but connect to Flink's thrift server instead.
> 9. JDBC/ODBC drivers - Flink may provide its own JDBC/ODBC drivers for other
> applications to use to connect to its thrift server.
> 10. Support other user customizations in Hive, such as Hive SerDes, storage
> handlers, etc.
> 11. Better task failure tolerance and task scheduling in the Flink runtime.
> 
> As you can see, achieving all of this requires significant effort across
> all layers in Flink. Howev

Re: How do I initialize the window state on first run?

2018-10-12 Thread bupt_ljy
Hi vino,
  My Flink program aggregates the data of a whole day. Assuming we start
this program at 6:00 am, the initial state in the window should be the
aggregated result from 0:00 am to 6:00 am.
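
A minimal sketch of the semantics described above, written in plain Python rather than against Flink APIs: the accumulator for the day window starts from a pre-aggregated partial result (for example, one computed elsewhere for 0:00 to 6:00) instead of from zero, and events arriving after startup are folded into it. The names `DayAccumulator`, `seed`, `add`, and `aggregate_day` are hypothetical and only illustrate the desired behaviour; they are not Flink classes, and the count/sum aggregate is an assumption.

```python
from dataclasses import dataclass


@dataclass
class DayAccumulator:
    """Running aggregate for one day window (hypothetical, not a Flink class)."""
    count: int = 0
    total: float = 0.0

    @classmethod
    def seed(cls, count: int, total: float) -> "DayAccumulator":
        # Start from a result computed elsewhere, e.g. a batch job over 0:00-6:00.
        return cls(count=count, total=total)

    def add(self, value: float) -> None:
        # Fold in one event that arrived after the program started.
        self.count += 1
        self.total += value


def aggregate_day(seed_count: int, seed_total: float, live_values) -> DayAccumulator:
    # Seed the window state, then apply the live events on top of it.
    acc = DayAccumulator.seed(seed_count, seed_total)
    for value in live_values:
        acc.add(value)
    return acc
```

With a seed of 3 events totalling 30.0 and two live events of 5.0 and 15.0, the accumulator ends at 5 events totalling 50.0, which is the result the program would have produced had it been running since midnight.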


Original Message
Sender: vino yang <yanghua1...@gmail.com>
Recipient: bupt_ljy <bupt_...@163.com>
Cc: user <u...@flink.apache.org>
Date: Friday, Oct 12, 2018 15:13
Subject: Re: How do I initialize the window state on first run?


Hi Jiayi,

If you don't mind, could you describe the situation in which you need this?

Thanks, vino.


bupt_ljy <bupt_...@163.com> wrote on Fri, Oct 12, 2018 at 1:59 PM:

Hi,
 I'm going to run a new Flink program with some initialized window states.
 I can't find an official way to do this; is that right? I've tried the bravo
project, but it doesn't support FsStateBackend, and adding a new StateBackend
to it would take too much work.
 Any good ideas about this?

Best,
Jiayi Liao

Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

2018-10-12 Thread Taher Koitawala
Sounds smashing; I think the initial integration will help 60% or so of Flink
SQL users, and a lot of other use cases will emerge once we solve the first one.

Thanks,
Taher Koitawala




On Fri 12 Oct, 2018, 10:13 AM Zhang, Xuefu,  wrote:

> Hi Taher,
>
> Thank you for your input. I think you emphasized two important points:
>
> 1. Hive metastore could be used for storing Flink metadata
> 2. There are some usability issues around Flink SQL configuration
>
> I think we all agree on #1. #2 may well be true, and the usability should
> be improved. However, I'm afraid that this is orthogonal to Hive
> integration, and the proposed solution might be just one of the possible
> solutions. On the surface, the extensions you proposed seem to go beyond
> the syntax and semantics of the SQL language in general.
>
> I don't disagree on the value of your proposal. I guess it's better to
> solve #1 first and leave #2 for follow-up discussions. How does this sound
> to you?
>
> Thanks,
> Xuefu
>
> --
> Sender:Taher Koitawala 
> Sent at:2018 Oct 12 (Fri) 10:06
> Recipient:Xuefu 
> Cc:Rong Rong ; Timo Walther ;
> dev ; jornfranke ; vino yang <
> yanghua1...@gmail.com>; Fabian Hueske ; user <
> user@flink.apache.org>
> Subject:Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem
>
> One other thought along the same lines was to use Hive tables to store Kafka
> information to process streaming tables. Something like
>
> create table streaming_table (
> bootstrapServers string,
> topic string, keySerialiser string, valueSerialiser string);
>
> insert into streaming_table values ("10.17.1.1:9092,10.17.2.2:9092,10.17.3.3:9092",
> "KafkaTopicName", "SimpleStringSchema", "SimpleStringSchema");
>
> create table processingtable (
> -- enter fields here which match the Kafka record schema
> );
>
> Now we make a custom clause called something like "using".
>
> The way we use this is:
>
> using streaming_table as configuration select count(*) from
> processingtable as streaming;
>
>
> This way users can now pass Flink SQL info easily and get rid of the Flink
> SQL configuration file altogether. This is simple and easy to understand,
> and I think most users would follow this.
>
> Thanks,
> Taher Koitawala
>
> On Fri 12 Oct, 2018, 7:24 AM Taher Koitawala, 
> wrote:
> I think integrating Flink with Hive would be an amazing option, and getting
> Flink's SQL up to pace would be amazing too.
>
> The current Flink SQL syntax to prepare and process a table is too verbose;
> users need to manually retype table definitions, and that's a pain. Hive
> metastore integration should be done, since many users are okay defining
> their table schemas in Hive, as it is easy to maintain, change or even migrate.
>
> Also, we could simplify choosing between batch and stream with something
> like a "process as" clause.
>
> select count(*) from flink_mailing_list process as stream;
>
> select count(*) from flink_mailing_list process as batch;
>
> This way we could completely get rid of Flink SQL configuration files.
>
> Thanks,
> Taher Koitawala
>
> On Fri 12 Oct, 2018, 2:35 AM Zhang, Xuefu, 
> wrote:
> Hi Rong,
>
> Thanks for your feedback. Some of my earlier comments might have addressed
> some of your points, so here I'd like to cover some specifics.
>
> 1. Yes, I expect that table stats stored in Hive will be used in Flink
> plan optimization, but it's not part of the compatibility concern (yet).
> 2. Both implementing Hive UDFs in Flink natively and making Hive UDFs work
> in Flink are considered.
> 3. I am aware of FLIP-24, but here the proposal is to make the remote server
> compatible with HiveServer2. They are not mutually exclusive either.
> 4. The JDBC/ODBC driver in question is for the remote server that Flink
> provides. It's usually the service owner who provides drivers for their
> services. We weren't talking about JDBC/ODBC drivers for external DB systems.
>
> Let me know if you have further questions.
>
> Thanks,
> Xuefu
>
> --
> Sender:Rong Rong 
> Sent at:2018 Oct 12 (Fri) 01:52
> Recipient:Timo Walther 
> Cc:dev ; jornfranke ; Xuefu <
> xuef...@alibaba-inc.com>; vino yang ; Fabian
> Hueske ; user 
> Subject:Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem
>
> Hi Xuefu,
>
> Thanks for putting together the overview. I would like to add some more on
> top of Timo's comments.
> 1,2. I agree with Timo that proper catalog support should also address
> the metadata compatibility issues. I was actually wondering if you are
> referring to something like utilizing table stats for plan optimization?
> 4. If the key is to have users integrate Hive UDFs without code changes to
> Flink UDFs, it shouldn't be a problem, as Timo mentioned. Is your concern
> mostly about the support of Hive UDFs that should be implemented in
> Flink-table natively?
> 7,8. Correct me if I am wrong, but I feel like some of the related
> components might

Re: How do I initialize the window state on first run?

2018-10-12 Thread vino yang
Hi Jiayi,

If you don't mind, could you describe the situation in which you need this?

Thanks, vino.

bupt_ljy wrote on Fri, Oct 12, 2018 at 1:59 PM:

> Hi,
>
>    I'm going to run a new Flink program with some initialized window
> states.
>
>    I can't find an official way to do this; is that right? I've tried the
> bravo project, but it doesn't support FsStateBackend, and adding a new
> StateBackend to it would take too much work.
>
>    Any good ideas about this?
>
> Best,
> Jiayi Liao
>