Re: Testing Flink Jobs

2021-01-12 Thread KristoffSC
Hi, 
That helped; however, there is a problem with JobStatus. Please refer to [1].

In my case JobStatus is already RUNNING, but not all tasks are running.
Any idea how to get the task status from MiniCluster?

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Issue-with-job-status-td36068.html#none



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Testing Flink Jobs

2021-01-11 Thread KristoffSC
Hi,
I would like to write a few tests that check the message flow in my
Flink pipeline.
I would like to base my tests on [1].

My StreamJob class, which has the main method, has all sinks and sources
pluggable. The implementations are also based on [1].

In all examples available online I can see that env.execute() is called in
the actual test method, which starts the deployment of a job.

However, in my case the deployment of the job takes a significant amount of
time. This is because we need to load some "special" libraries that should
not be mocked for tests. That is why we would like to do it only once, and
hence deploy the job on a MiniCluster only once.

My StreamJob.main method contains all the pipeline setup plus the call to
env.execute().


However, when I do that, for example when I start my job in a ClassRule or
BeforeClass method, I noticed that the tests hang. The JUnit thread is
actually waiting on env.execute(), which in my case never returns. The
underlying MiniCluster, however, is working fine.


Questions:
1. What would be the preferred way to set up my tests when I would like to
deploy my StreamJob only once?
2. How can I check that the cluster used in my tests is ready and the job
deployment is finished?
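
One way to keep the JUnit thread from blocking inside env.execute() is to run
the blocking call on a background thread and wait for a readiness signal. The
sketch below uses only the JDK. BackgroundJobRunner, signalReady and the idea
of a test sink calling signalReady are assumptions made for illustration, not
Flink API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical test helper, not part of Flink.
final class BackgroundJobRunner {
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "job-runner");
        t.setDaemon(true); // a never-ending job must not keep the test JVM alive
        return t;
    });
    private final CountDownLatch ready = new CountDownLatch(1);
    private Future<?> job;

    // blockingJob stands in for a call such as StreamJob.main(args).
    void start(Runnable blockingJob) {
        job = executor.submit(blockingJob);
    }

    // To be called from the job side (for example by a test sink) once it is up.
    void signalReady() {
        ready.countDown();
    }

    // Returns true if signalReady() was called within the timeout.
    boolean awaitReady(long timeout, TimeUnit unit) {
        try {
            return ready.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    void stop() {
        if (job != null) {
            job.cancel(true); // interrupts the job thread
        }
        executor.shutdownNow();
    }
}
```

In a test class, start(...) would go into the BeforeClass method, followed by
awaitReady(...), and stop() into the AfterClass method.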

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#testing-flink-jobs







Re: Flink Logging on EMR

2020-12-29 Thread KristoffSC
Hi Mars,
Were you able to solve this problem?

I'm facing the exact same issue. I don't see logs from my operators in the
TaskManager output (taskmanager.out file) on EMR, although the logs are
printed when running locally from the IDE.





Re: Exception: This method must be called from inside the mailbox thread

2020-11-24 Thread KristoffSC
Hi Arvid,
Thank you for your answer.

And what if a) blocked the task's thread?
Let's say I'm OK with making the entire task thread wait on this third-party
lib.

In that case, would I be safe from this exception even though I would not
use AsyncIO?





Exception: This method must be called from inside the mailbox thread

2020-11-24 Thread KristoffSC
Hi,
I faced an issue on Flink 1.11. So far it has happened only once and I cannot
reproduce it. However, I think something is lurking there...

I cannot post the full stack trace and user code, but I will try to describe
the problem.

The setup has no resource groups and only the one operator chain restriction
mentioned below.

chained task #1 - AsyncOperator with orderedWait calling a 3rd party system
forwards to
chained task #2 - with:
a) ProcessFunction A calling a multi-threaded library. In this ProcessFunction
we do
CompletableFuture.allOf(..userCode..).thenAccept(v -> collector.collect(message))
b) ProcessFunction B (no multi-threaded operations)
c) AsyncOperator with orderedWait calling a 3rd party system
d) ProcessFunction

Between task #1 and #2 there is a .startNewChain() to separate those two
tasks.
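
One detail of a) that may be relevant: a callback registered with thenAccept
on a future that is not yet complete runs on whichever thread completes that
future, not on the thread that registered it. The JDK-only sketch below shows
this. CallbackThreadDemo and the thread name "library-thread" are made up for
illustration, and whether this is what happened in the job above is only an
assumption.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

final class CallbackThreadDemo {
    // Returns the name of the thread that ran the thenAccept callback.
    static String callbackThreadName() {
        CompletableFuture<String> libraryResult = new CompletableFuture<>();
        AtomicReference<String> seen = new AtomicReference<>();

        // Registered on the calling thread while the future is still incomplete.
        CompletableFuture<Void> done =
            libraryResult.thenAccept(v -> seen.set(Thread.currentThread().getName()));

        // A library thread completes the future later; the callback runs there.
        Thread worker = new Thread(() -> libraryResult.complete("payload"), "library-thread");
        worker.start();

        done.join(); // wait until the callback has finished
        return seen.get();
    }
}
```

If the callback body were collector.collect(message), that call would be made
from the library thread rather than from the task's own thread.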

During load tests we got:
Caused by: java.lang.IllegalStateException: Illegal thread detected. This
method must be called from inside the mailbox thread!

The question is: what does it actually mean and when can it happen?

The "full" stack trace, from which I had to remove user code:

org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:734) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:787) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:740) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
user---Code---calls
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:787) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:740) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
user---Code---calls
    at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
user---Code---calls
at 

Re: Using managed keyed state with AsynIo

2020-08-14 Thread KristoffSC
Thanks Arvid,
I like your suggestions. In my case I wanted to use the state value to
decide whether I should make the async call to the external system. The
result of this call would then be the state input. So having this:

Process1 (calculate value or take it from state) -> AsyncCall to external
system to persist/validate the value -> Process2 (feedback loop via
messaging queue to Process1).

Apart from the fact that Process1 would have to consume two streams, which is
OK, I would actually have a delay. I wanted to avoid unnecessary calls to the
external system by keeping the cached/validated value in state.

And this could be done without the delay if I could use state in async
operators.


I'm thinking about building my own semi-async operator. My idea is that I
would have a normal KeyedProcessFunction that wraps a list of
SingleThreadExecutors.

In the processElement method I will use the key to calculate the index into
that list, to make sure that messages for the same key go to the same
executor. I do want to keep the message order.

I will submit a task like
executor.submit(() -> {
    MyResult result = rservice.process(message, mapState.get(key));
    mapState.put(key, result);
    out.collect(newMessage);
});



Big questions:
1. In my solution out.collect(newMessage); will be called from several threads
(each with a different message). Is it thread safe?
2. Is using MapState in a multi-threaded environment, like I would have here,
thread safe?
Alternatively I can have an associated list of MapStates, one for each
SingleThreadExecutor, so each will be used by only one thread.

With this setup I will not block my pipeline and I will be able to use
state. I agree that the size of the SingleThreadExecutors list will be a
limiting factor.
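
The key-to-executor routing itself can be sketched with the JDK alone.
KeyedExecutors, indexFor and submit are hypothetical names. The sketch only
shows that tasks for the same key land on the same single-threaded executor
and therefore run in submission order. It does not touch Flink state or the
collector, so it says nothing about the two questions above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical helper: routes work for the same key to the same executor.
final class KeyedExecutors {
    private final List<ExecutorService> executors = new ArrayList<>();

    KeyedExecutors(int size) {
        for (int i = 0; i < size; i++) {
            executors.add(Executors.newSingleThreadExecutor());
        }
    }

    // floorMod keeps the index non-negative even for negative hash codes.
    int indexFor(Object key) {
        return Math.floorMod(key.hashCode(), executors.size());
    }

    // Tasks submitted for one key run one after another, in submission order.
    <T> CompletableFuture<T> submit(Object key, Supplier<T> task) {
        return CompletableFuture.supplyAsync(task, executors.get(indexFor(key)));
    }

    void shutdown() {
        executors.forEach(ExecutorService::shutdown);
    }
}
```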


Is this setup possible with Flink?


By the way, I will use RocksDBStateBackend.








Re: Using managed keyed state with AsynIo

2020-08-13 Thread KristoffSC
Hi Arvid,
thank you for the response.
Yeah, I tried to run my job shortly after posting my message and I got "State
is not supported in rich async function" ;)

I came up with a solution that would solve my initial problem, the
concurrent/async processing of messages with the same key, but
unfortunately state is not supported there.

Thank you for the suggestion:
source -> keyby -> map (retrieve state) -> async IO (use state) -> map
(update state)

However, I'm a little bit surprised. I thought that state on a keyed stream
cannot be shared between operators, and here you are suggesting doing that.
Is it possible then?


While I'm at it, I have an additional question. Is there any difference from
Flink's perspective between these two approaches:

MyProcessFunction pf = new MyProcessFunction(); MyProcessFunction is a
stateless object, but it uses Flink keyed state.

Setup 1:

source -> keyBy(key) -> process(pf) -> map() -> process(pf) -> sink

Setup 2:
source -> keyBy(key) -> process(new MyProcessFunction()) -> map() ->
process(new MyProcessFunction()) -> sink
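
As far as I understand, user functions are shipped to the tasks in serialized
form, so even in Setup 1 each operator would end up with its own deserialized
copy of pf. The JDK-only sketch below shows that effect with plain Java
serialization. SerializedCopies and its nested MyFunction are stand-ins made
up for illustration, not Flink classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class SerializedCopies {
    // Stand-in for a user function; not a Flink class.
    static final class MyFunction implements Serializable {
        private static final long serialVersionUID = 1L;
        int calls;
    }

    // Round-trips an object through Java serialization and returns the copy.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T copy(T original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Two copies made from the same pf are distinct objects, and changing a field on
one of them is not visible on the other or on pf.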





Re: What async database library does the asyncio code example use?

2020-08-12 Thread KristoffSC
Hi,
I believe the example in [1], where you see DatabaseClient, is just a hint
that whatever library you use (DB, REST based or anything else) should be
asynchronous, or at least should not block the operator. The client itself
does not have to be non-blocking as long as it runs on its own thread pool
and returns a future, or something similar, on which you can register
resultFuture.complete(...).

I actually wrote my own small library that calls resultFuture.complete(...)
from each library thread.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/asyncio.html





Re: How to use FsBackBackend without getting deprecation warning

2020-08-10 Thread KristoffSC
Hi,
I had the same problem.

Try this:
env.setStateBackend((StateBackend) new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));

Yeah... not the cleanest way. I guess the API is not that clean after all.





Using managed keyed state with AsynIo

2020-08-10 Thread KristoffSC
Hi guys,
I'm using Flink 1.9.2.

I have a question about a use case where I would like to use Flink's managed
keyed state with Async IO [1].


Let's take as a baseline the example below, taken from [1], and let's assume
that we are executing it on a keyed stream.

final Future<String> result = client.query(key);

CompletableFuture.supplyAsync(new Supplier<String>() {

    @Override
    public String get() {
        try {
            return result.get();
        } catch (InterruptedException | ExecutionException e) {
            // Normally handled explicitly.
            return null;
        }
    }
}).thenAccept( (String dbResult) -> {
    resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
});


Imagine that instead of passing key to client.query(..) we pass some value
taken from Flink's managed keyed state. Later the supplier's get method will
return a value that should be stored in that state. In other words, we use
previous results as inputs for the next computations.

Is this achievable with Flink's Async IO? I can have many pending requests on
client.query, which can finish in random order. AsyncDataStream.orderedWait
will not help here, since it only affects how Flink "releases" the messages
from its internal queue for async operators.


What is more, this scenario can result in multiple concurrent writes/reads
to/from Flink's managed state for the same key. Is this thread safe?


[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html





Re: Flink Pojo Serialization for Map Values

2020-07-16 Thread KristoffSC
Theo,
thank you for the clarification and code examples.
I actually suspected that this is because of Java type erasure.

The thing that bothers me, though, is that Flink was falling back to Kryo
silently in my case, without any information in the logs. We actually
found it just by luck.








Re: Flink Pojo Serialization for Map Values

2020-07-15 Thread KristoffSC
Hi,
Any ideas about that one?





Flink Pojo Serialization for Map Values

2020-07-13 Thread KristoffSC
Hi,
I would like to ask about the Flink POJO serialization described in [1].

I have a case where my custom event source produces events described by
this POJO:

public class DataPoint
{
    public long timestamp;
    public double value;
    public BadPojo badPojo = new BadPojo();

    public DataPoint() {}

}

Where BadPojo class is something like this:
public class BadPojo {

    private final String fieldA = "X";
}

So this is a case where Flink, using the default configuration, should fall
back to Kryo, and it does.
In the logs I can see entries:
org.apache.flink.api.java.typeutils.TypeExtractor - class
org.home.streaming.events.BadPojo does not contain a getter for field fieldA

So this is the expected result.

However, when I change the DataPoint class to use:
public Map badPojo = new HashMap<>();

instead of a direct BadPojo field, I no longer see logs complaining about the
BadPojo class.

In this case DataPoint class looks like this:
public class DataPoint
{
    public long timestamp;
    public double value;
    public Map badPojo = new HashMap<>();

    public DataPoint() {}

}

My questions:
1. What actually happens here?
2. Which serializer is used by Flink?
3. How should Maps be handled in a POJO definition to get the best
serialization performance (assuming that I do need access to that map)?
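
For reference, plain Java reflection can show how much type information a
field declaration carries for a map. The JDK-only sketch below compares a raw
Map field with a parameterized one. FieldTypeInfo, Holder and the
Map<String, BadPojo> variant are assumptions made for illustration, and the
sketch makes no claim about what Flink's TypeExtractor does with this
information.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;

final class FieldTypeInfo {
    static final class BadPojo {
        private final String fieldA = "X";
    }

    // Hypothetical variants of the map field, for comparison only.
    static final class Holder {
        public Map rawMap = new HashMap<>();
        public Map<String, BadPojo> typedMap = new HashMap<>();
    }

    // Describes the declared type of a public Holder field as reflection reports it.
    static String describe(String fieldName) {
        try {
            Type type = Holder.class.getField(fieldName).getGenericType();
            if (type instanceof ParameterizedType) {
                Type[] args = ((ParameterizedType) type).getActualTypeArguments();
                return "parameterized:" + args.length;
            }
            return "raw";
        } catch (NoSuchFieldException e) {
            throw new IllegalArgumentException(e);
        }
    }
}
```

For the raw field, reflection only reports Map. For the parameterized field,
the key and value types are available from the declaration.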

Thanks,
Krzysztof


[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#pojos





Re: Session Window with Custom Trigger

2020-06-24 Thread KristoffSC
I think I've figured it out.

I switched to GlobalWindow with my custom trigger. My trigger combines
processing-time trigger logic and onElement trigger logic. Only one of them
should fire within the scope of a particular window.

I managed to do this by returning FIRE_AND_PURGE and clearing all timers and
state whenever I'm closing the window.

In my case I don't have "late events" that should be added to a previously
ended window, so it simplifies the job.
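
The "only one of the two paths may fire" rule can be modelled in plain Java,
without Flink's Trigger API. In the sketch below BufferSession, onElement,
onTimer and fireAndPurge are hypothetical names. The deadline is set once when
the first message of a session arrives, and firing clears both the buffer and
the deadline, so the other path finds nothing left to fire.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java model of the buffering rules; it does not use Flink's Trigger API.
final class BufferSession<T> {
    private final long maxBufferMillis;
    private final Predicate<List<T>> businessCondition;
    private final List<T> buffer = new ArrayList<>();
    private long deadline = -1; // -1 means no open session and no pending timer

    BufferSession(long maxBufferMillis, Predicate<List<T>> businessCondition) {
        this.maxBufferMillis = maxBufferMillis;
        this.businessCondition = businessCondition;
    }

    // Called per message; returns the flushed messages, or an empty list.
    List<T> onElement(T message, long now) {
        if (buffer.isEmpty()) {
            deadline = now + maxBufferMillis; // set once; later messages do not extend it
        }
        buffer.add(message);
        return businessCondition.test(buffer) ? fireAndPurge() : new ArrayList<>();
    }

    // Called when time advances; fires only if a session is open and expired.
    List<T> onTimer(long now) {
        return (deadline >= 0 && now >= deadline) ? fireAndPurge() : new ArrayList<>();
    }

    private List<T> fireAndPurge() {
        List<T> out = new ArrayList<>(buffer);
        buffer.clear();
        deadline = -1; // plays the role of deleting the timer
        return out;
    }
}
```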

Thanks :)





Use CheckpointedFunction interface on Custom Window Trigger

2020-06-24 Thread KristoffSC
Hi all,
is it possible for a custom window Trigger (extending the Trigger class) to
also implement CheckpointedFunction?

In my custom Trigger I have complicated logic executed in the
Trigger::onElement method.
Currently I'm using triggerctx.getPartitionedState for all reads and
writes for data management.

However, I was wondering if it is possible to implement this interface, keep
state in the Trigger's local Java variables and persist it into the
StateBackend only on snapshot.

I'm using the RocksDB state backend and Flink 1.9.2.





Re: Session Window with Custom Trigger

2020-06-23 Thread KristoffSC
It seems that I'm clearing the timers in the right way, but there is a new
timer created by the WindowOperator::registerCleanupTimer method. This one is
called from WindowOperator::processElement at the end of both if/else
branches.

How can I mitigate this? I don't want to have any "late firings" for my
windows. The window should be fully closed after the Trigger::onElement
method fires, or after Trigger::onProcessingTime if onElement did not
produce a FIRE result.







Re: Session Window with Custom Trigger

2020-06-23 Thread KristoffSC
Hi Marco Villalobos-2,
unfortunately I don't think a tumbling window will work in my case.

The reasons:
1. A window must start only when there is a new event and the previous window
is closed. A new tumbling window is created right after the previous one is
purged. In my case I have to use a session window, since session windows do
not overlap and do not have a fixed start and end time, in contrast to
tumbling windows and sliding windows [1].

2. My logic requires closing the window earlier, before the window maxTime,
hence the custom trigger.


The issue I'm having, though, is that my trigger fires the window twice:
the first time from the onElement method and the second time from the
onProcessingTime method.




[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows





Re: Session Window with Custom Trigger

2020-06-23 Thread KristoffSC
One addition:
in the clear method of my custom trigger I do call
ctx.deleteProcessingTimeTimer(window.maxTimestamp());





Session Window with Custom Trigger

2020-06-23 Thread KristoffSC
Hi all,
I'm using Flink 1.9.2 and I would like to ask about my use case and the
approach I've taken to meet it.

The use case:
I have a keyed stream where I have to buffer messages with this logic:
1. Buffering should start only when a message arrives.
2. The max buffer time should not be longer than 3 seconds.
3. Each new message should NOT prolong the buffer time.
4. If a particular business condition is met, buffering should stop and
all messages should be let through for further processing.

The business logic in point 4 takes into consideration data from
previously buffered messages in this buffer session.

My setup for this is:
1. A keyed stream with ProcessingTimeSessionWindow (I don't need event time
for this).
2. A custom trigger.

The custom trigger:
1. Keeps some data in its state under an AggregatingStateDescriptor, allowing
me to override the "merge" method from the Trigger class.

2. In the onElement method, on the first call I execute
ctx.registerEventTimeTimer(window.maxTimestamp());
Additionally, in this method I added the business logic, which returns
TriggerResult.FIRE or TriggerResult.CONTINUE.

3. The onProcessingTime method returns TriggerResult.FIRE.

4. All other methods return TriggerResult.CONTINUE.



As a result, I can observe that my window is fired twice: once from the
onElement method, where the business condition is met, and a second time
from the onProcessingTime method.

What is the best way to prevent this?

Regards,
Krzysztof





Re: Support for Flink in EMR 6.0

2020-05-04 Thread KristoffSC
Actually, it seems there is already an ongoing discussion about installing
Flink 1.10 on EMR:

http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/upgrade-flink-from-1-9-1-to-1-10-0-on-EMR-td34114.html





Re: Processing Message after emitting to Sink

2020-04-15 Thread KristoffSC
My point was that, as far as I know, sinks are "terminating" operators that
end the stream, like .collect in the Java 8 Stream API. They don't emit
elements further, and I cannot link them like this:

source - process - sink - process - sink

A sink function produces a DataStreamSink, which is used for emitting elements
from a streaming topology.
It is not a SingleOutputStreamOperator or DataStream that I can use as input
for the next operator.





Re: Processing Message after emitting to Sink

2020-04-15 Thread KristoffSC
Thank you very much for your answer.

I have a question regarding your first paragraph:
" it requires that a sink participates in the pipeline. So it is not located
as a "leaf" operator but location somewhere in the middle."

Isn't a sink a terminating operator? As far as I know, sinks cannot be in
the middle of a stream chain.


I would appreciate other comments as well.

Thanks,
Krzysztof 





Processing Message after emitting to Sink

2020-04-14 Thread KristoffSC
Hi all,
I have a special use case that I'm not sure how to fulfill.

The use case is:
I have my main business processing pipeline that has an MQ source,
processFunction1, processFunction2 and an MQ sink.

ProcessFunction1, apart from processing the main business message, is also
emitting some side effects using side outputs. Those side outputs are sent
to SideOutputMqSink, which sends them to the queue.

The requirement is that ProcessFunction1 must not send the main business
message on to processFunction2 until the side output from processFunction1
has been sent to the queue via SideOutputMqSink.

In general I don't have to use side outputs, although I do some extra
processing on them before sending them to the sink, so having a side output
stream is nice to have. Nevertheless, the key requirement is that we should
wait with further processing until the side output is sent to the queue.

I could achieve it by having my processFunction1 call MQ directly in its
processElement method before sending out the main message, although I
don't like that idea.
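
A minimal, JDK-only sketch of that "call MQ directly first" variant follows.
SideEffectFirst, sideSender and mainOutput are hypothetical names. sideSender
stands in for an asynchronous MQ client send that completes once the message
is acknowledged, and mainOutput for something like out::collect. Note that
join() blocks the calling thread until the send completes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-in for the ordering rule; no Flink or MQ classes are used.
final class SideEffectFirst<IN, SIDE> {
    private final Function<SIDE, CompletableFuture<Void>> sideSender;
    private final Consumer<IN> mainOutput;

    SideEffectFirst(Function<SIDE, CompletableFuture<Void>> sideSender,
                    Consumer<IN> mainOutput) {
        this.sideSender = sideSender;
        this.mainOutput = mainOutput;
    }

    // Sends the side effect, waits for the send to complete, then emits the main message.
    void process(IN mainMessage, SIDE sideEffect) {
        sideSender.apply(sideEffect).join(); // throws CompletionException if the send failed
        mainOutput.accept(mainMessage);
    }
}
```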

I was wondering whether there is a way to have a sink function that would
also be a FlatMap function?

The best solution would be to process the two streams (main and side
effect) in some nice way but with some barrier, so that the main pipeline
waits until the side output is sent.
Both streams can be keyed.






Re: State size Vs keys number perfromance

2020-04-08 Thread KristoffSC
Thanks Congxian Qiu,
I'm aware of your second point. In ValueState I will keep a String or a very
simple POJO, without any collections inside.

I didn't get your third point, could you clarify it please?
"disk read/write is somewhat about the whole state size"

Actually, what I will keep in ValueState is what would otherwise be kept in
a single MapState entry. Depending on which key I choose, my state can be
"broader", where I will use MapState, or very narrow, so that I can use
ValueState that keeps only one entry.

This is the essence of my question: what are the trade-offs here?






State size Vs keys number perfromance

2020-04-07 Thread KristoffSC
Hi,
I would like to ask which has the bigger memory footprint and which could be
more efficient:
fewer keys with bigger keyed state vs. many keys with smaller keyed state.

For this use case I'm using the RocksDB StateBackend and the state TTL is,
well... infinite. So I'm keeping the state forever in Flink.

The use case:
I have a stream of messages that I have to process in some custom way.
I can take one of two approaches:

1. Use a keyBy that will give me some number of distinct keys, but for each
key the state size will be significant. It will be MapState in this case.
This keyBy will still give me the ability to spread operations across
operator instances.

2. In the second approach I can use a different keyBy, where I would have a
huge number of distinct keys, but the state for each key will be very small
and it will be a ValueState in this case.

To sum up:
a "reasonable" number of keys with very big keyed state each VS a huge number
of keys with very small state each.

What are the pros and cons of both?






Re: Creating singleton objects per task manager

2020-04-07 Thread KristoffSC
Hi Seth,
I would like to piggyback on this question :)

You wrote:
"I would strongly encourage you to create one instance of your object per
ProcessFunction, inside of open. That would be one instance per slot which
is not equal to the parallelism of your operator."

Especially the second part "That would be one instance per slot which is not
equal to the parallelism of your operator"

In my understanding the number of processFunction instances is equal to the
parallelism of this operator. Parallel instances are not deployed in
the same task slot, therefore if you create objects in the open() method
you will have as many objects as there are processFunction instances, which
in my understanding is equal to the parallelism of this operator.

Thanks,





Storing Operator state in RocksDb during runtime - plans

2020-04-05 Thread KristoffSC
Hi,
according to [1], operator state and broadcast state (which is a "special"
type of operator state) are not stored in RocksDB during runtime when
RocksDB is chosen as the state backend.

Are there any plans to change this?

I'm working on a case where I will have quite a large control stream.


[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html





Re: Questions regarding Key Managed state

2020-04-03 Thread KristoffSC
Thank you for your answers. 

I have one more question.
Is the keyed managed state for a keyed stream per key or per operator?

For example, I have a keyed stream that is processed by MyProcessFunction
with parallelism = 3. So I have three instances of MyProcessFunction. The
process function has a keyed managed state field (Value/List/Key state).

I will have quite a big number of distinct key values in my stream.
Given this, will the state be per key value, per function instance, or
maybe per key group?





Flink long Running Jon on AWS EMR/Amazon Kinesis Data Analytics

2020-04-02 Thread KristoffSC
Hi,
I'm interested in using a managed Flink service.
Does anyone have experience with hosting long-running (generally 24/7)
Flink jobs on AWS EMR?

I'm interested in whether it is stable enough to host a long-running,
state-size-intensive job.
With EMR I have all the config and the HA ZooKeeper part handled by AWS, with
the ability to customize Flink config files. As far as I know, only the state
backend is predefined as RocksDB.

On the other hand there is Amazon Kinesis Data Analytics, where there is not
much to change on the underlying Flink cluster. Actually I'm not sure what I
can and cannot change when using Flink on Amazon Kinesis Data Analytics. But
the question is the same: is it suitable for long-running (24/7),
state-size-heavy jobs?


Regards, 
Krzysztof





Questions regarding Key Managed state

2020-04-02 Thread KristoffSC
Hi,
I have a few questions regarding Flink's state.

Let's say we have:

Case 1.
stream.keyBy(...).process(myProcessFunction).setParallelism(3)

MyProcessFunction uses managed state (MapState, ListState etc.). I'm using
state checkpoints.

Flink will distribute events across 3 instances of myProcessFunction
according to the keyBy function.
When the job is restarted with the same parallelism, state is recovered
from the last checkpoint and traffic is distributed across the process
function instances in the same manner.

What will happen, though, if I increase the parallelism to 4?
The traffic will be distributed across 4 instances now, so a key that was
originally going to operator 3 can now go to operator 4. What will happen
to the managed state that was originally built for this key? Will it be
accessible from the new operator instance now? From [1], where I can read
"Flink is able to automatically redistribute state when the parallelism is
changed, and also do better memory management", I will assume YES.
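
As far as I understand, keyed state is redistributed in units of key groups: a
key always maps to the same key group, and each key group is assigned to one
parallel instance based on the current parallelism. The sketch below is a
simplified model of that mapping. KeyGroups is a made-up class, plain
hashCode() is used where Flink additionally applies a murmur hash, and 128 is
used as the max parallelism only as an example value.

```java
final class KeyGroups {
    // Key group of a key; Flink additionally scrambles hashCode() with a murmur hash.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Index (0-based) of the parallel operator instance that owns a key group.
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }
}
```

With max parallelism 128, key group 100 belongs to instance index 2 at
parallelism 3 (100 * 3 / 128 = 2) and to instance index 3 at parallelism 4
(100 * 4 / 128 = 3), so in this model the state for that key group moves to
the new instance together with the traffic.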

Case 2:
Let's assume I have two operators, each of which uses managed state. From
the documentation I can read that managed state can be used only on
a keyed stream. Does this mean that I will have to key my stream twice (or
more) if I want to use managed state in all of my operators? What if the
actual keyBy function is the same for the whole pipeline? Each keyBy hurts
performance, right?


Case 3:
Is there a possibility to use managed state on a non-keyed stream? For example
I have a process function that has a "map of key value mappings". This map
can be delivered/built using the broadcast state pattern and can be quite big.
Sounds like a good place to use MapState, but the stream is not keyed.
How can I approach this?

Let's assume for all cases that I'm using the RocksDB state backend.

Thanks,


[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html









Re: Reinterpreting a pre-partitioned data stream as keyed stream

2020-04-02 Thread KristoffSC
Hi,
sorry for the long wait.

Answering your questions:
1 - yes
2 - thx
3 - right, understood
4 - well, in general I want to understand how this works, to be able to
modify my job in the future, for example by extracting CPU-heavy operators
into separate tasks. Actually, in my job some of my operators are chained and
some of them are not, depending on the logic they are executing.






Re: Testing RichAsyncFunction with TestHarness

2020-04-02 Thread KristoffSC
Thanks,
I would suggest adding my "tutorial" about using the test harness for
async operators to the documentation. Or maybe build something based on this
use case that could be helpful for others in the future :)

Thanks, 
Krzysztof





Re: Testing RichAsyncFunction with TestHarness

2020-03-29 Thread KristoffSC
Hi :) I have finally figured it out :)

On top of the changes from my last email,
in my test method I had to wrap "testHarness.processElement" in a
synchronized block, like this:

@Test
public void foo() throws Exception {
    synchronized (this.testHarness.getCheckpointLock()) {
        testHarness.processElement(MyMessage.builder().build(), 1L);
    }
}

That worked. 

I think that this could be added to official documentation in [1]. 


[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html





Re: Testing RichAsyncFunction with TestHarness

2020-03-29 Thread KristoffSC
Hi, 
another update on this one. 
I managed to make the workaround a little bit cleaner. 

The test setup I have now is like this:

// Build the mock environment first: the serializer needs its ExecutionConfig.
MockEnvironment environment = MockEnvironment.builder().build();
ExecutionConfig executionConfig = environment.getExecutionConfig();

// "edgesInOrder": a Java-serialized empty list of stream edges.
ByteArrayOutputStream streamEdgesBytes = new ByteArrayOutputStream();
ObjectOutputStream oosStreamEdges = new ObjectOutputStream(streamEdgesBytes);
oosStreamEdges.writeObject(Collections.emptyList());
oosStreamEdges.flush();

// "typeSerializer_in_1": the Java-serialized serializer for the input type.
KryoSerializer<MyMessage> kryoSerializer =
    new KryoSerializer<>(MyMessage.class, executionConfig);
ByteArrayOutputStream kryoSerializerBytes = new ByteArrayOutputStream();
ObjectOutputStream oosKryoSerializer = new ObjectOutputStream(kryoSerializerBytes);
oosKryoSerializer.writeObject(kryoSerializer);
oosKryoSerializer.flush();

Configuration configuration = new Configuration();
configuration.setBytes("edgesInOrder", streamEdgesBytes.toByteArray());
configuration.setBytes("typeSerializer_in_1", kryoSerializerBytes.toByteArray());
environment.getTaskConfiguration().addAll(configuration);

this.testHarness = new OneInputStreamOperatorTestHarness<>(
    new AsyncWaitOperator<>(processFunction, 2000, 1, OutputMode.UNORDERED),
    environment);

With this setup, this.testHarness.open(); works.
However, there is another problem.
When calling:
testHarness.processElement(myMessage, 1L);
it throws another exception:

java.lang.AssertionError
    at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.addAsyncBufferEntry(AsyncWaitOperator.java:400)
    at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.processElement(AsyncWaitOperator.java:228)
    at org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.processElement(OneInputStreamOperatorTestHarness.java:112)
    at org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.processElement(OneInputStreamOperatorTestHarness.java:107)
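
The byte arrays put under "edgesInOrder" and "typeSerializer_in_1" in the setup
above are plain Java serialization output. A minimal sketch of that round trip,
using only the JDK, could look like the following. ConfigBytes and its two
methods are hypothetical helper names introduced for illustration; they are not
part of Flink.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

/** Hypothetical helper (not a Flink class): Java-serializes values to byte[] and back. */
final class ConfigBytes {

    /** Serializes a value the same way the test setup does with ObjectOutputStream. */
    static byte[] toBytes(Object value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        // try-with-resources closes (and therefore flushes) the stream before we read the bytes.
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads back a value that was written by toBytes. */
    static Object fromBytes(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

With such a helper, the empty edge list would be produced as
ConfigBytes.toBytes(Collections.emptyList()) and passed to
configuration.setBytes("edgesInOrder", ...).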






Re: Testing RichAsyncFunction with TestHarness

2020-03-28 Thread KristoffSC
I think I got this to work, although with a "nasty" workaround.

While debugging I found that the configuration for this test harness operator
was missing two entries:
"edgesInOrder"
"typeSerializer_in_1"

I added conditional breakpoints to the InstantiationUtil.readObjectFromConfig
method for those two keys and ran my "real" Flink job from IntelliJ.

I saw that for "edgesInOrder" an empty array of StreamEdge objects was added,
and for "typeSerializer_in_1" an instance of the PojoSerializer class.

I took the byte[] for those two and simply added those two arrays to my
test harness setup under the appropriate keys, like this:

Configuration configuration = new Configuration();
configuration.setBytes("edgesInOrder", emptyEdgesListBytes);
configuration.setBytes("typeSerializer_in_1", pojoSerializerBytes);

MockEnvironment environment = MockEnvironment.builder().build();
environment.getTaskConfiguration().addAll(configuration);

Then I used this mock environment to initialize the
OneInputStreamOperatorTestHarness for the AsyncWaitOperator.

That seems to work, but it's still a workaround.








Re: Testing RichAsyncFunction with TestHarness

2020-03-27 Thread KristoffSC
I've debugged it a little bit and found that it fails in the
InstantiationUtil.readObjectFromConfig method when we execute
byte[] bytes = config.getBytes(key, (byte[])null);  This returns null.

The key that it is looking for is "edgesInOrder". In the config map, there
are only two entries though:
"checkpointing -> {Boolean@6347} true" and "operatorID ->
{byte[16]@6351} "






Testing RichAsyncFunction with TestHarness

2020-03-27 Thread KristoffSC
Hi,
I'm trying to test my RichAsyncFunction implementation with
OneInputStreamOperatorTestHarness based on [1]. I'm using Flink 1.9.2.

My test setup is:
this.processFunction = new MyRichAsyncFunction();
this.testHarness = new OneInputStreamOperatorTestHarness<>(
    new AsyncWaitOperator<>(processFunction, 2000, 1, OutputMode.ORDERED));

this.testHarness.open();

I get the exception below when calling this.testHarness.open();

java.lang.NullPointerException
    at java.base/java.util.Objects.requireNonNull(Objects.java:221)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.<init>(StreamElementSerializer.java:64)
    at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.setup(AsyncWaitOperator.java:142)
    at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:287)
    at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:275)
    at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:393)
    at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:300)
    at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeEmptyState(AbstractStreamOperatorTestHarness.java:308)
    at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.open(AbstractStreamOperatorTestHarness.java:483)


I would appreciate help with this one.

Additionally, even though I added all the necessary dependencies defined in [1],
I cannot see the ProcessFunctionTestHarnesses class.

Thanks.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#unit-testing-stateful-or-timely-udfs--custom-operators





"Legacy Source Thread" line in logs

2020-03-27 Thread KristoffSC
Hi all,
When I run Flink from the IDE I can see this prefix in the logs:
"Legacy Source Thread"

When running the same job as a Job Cluster on Docker, this prefix is not present.
What does this prefix mean?
Btw, I'm using [1] as the ActiveMQ connector.

Thanks.

[1]
https://github.com/apache/bahir-flink/tree/master/flink-connector-activemq







Re: java.time.LocalDateTime in POJO type

2020-03-05 Thread KristoffSC
Thanks,
do you have an example of how I could use it?

Basically, I have a POJO class that has a LocalDateTime field in it.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: java.time.LocalDateTime in POJO type

2020-03-02 Thread KristoffSC
Hi Tzu-Li,
I think you misunderstood Oskar's question. 
The question was whether there are any plans to support Java's
LocalDateTime in Flink's "native" de/serialization mechanism. As we can read
in [1], for basic types, Flink supports all Java primitives and their boxed
form, plus void, String, Date, BigDecimal, and BigInteger.

So we have Java's Date; the question is, will there be support for
LocalDateTime?

Thanks,
Krzysztof

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#flinks-typeinformation-class





Flink on AWS - ActiveMQ connector

2020-03-02 Thread KristoffSC
Hi all,
In the AWS documentation [1] we can see that AWS provides a set of connectors
for Flink. I need to use the ActiveMQ one provided by [2]. Currently
I'm using a Docker-based standalone Job Cluster and not the AWS one.

What is the story with the connectors provided by AWS? Will I be able to use my
connector in AWS? I assume that its jar will be part of the job jar.


[1] https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/#connectors-in-apache-bahir





Re: How JobManager and TaskManager find each other?

2020-03-01 Thread KristoffSC
Thanks for the clarification about NAT.

Moving the NAT issue aside for a moment:

Is the process of sending the "task deployment descriptor" that you mentioned in
"Feb 26, 2020; 4:18pm", and especially the process of notifying a TaskManager
about the IPs of the TaskManagers participating in a job, described somewhere?
I'm familiar with [1] and [2], but they contain no information about sending the
IP information of TaskManagers.


Another question is how this all adds up for a Kubernetes Job Session Cluster
deployment, where nodes will be deployed across many physical machines inside
the Kubernetes cluster, if I'm using Kubernetes as described in [3].

The final question is: do I have to modify jobmanager.rpc.address and the
flink/conf/slaves file when running a Docker Job Cluster on Kubernetes? The
default values are localhost.
Or will just following [3] be fine?

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/concepts/runtime.html
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/internals/job_scheduling.html
[3]
https://github.com/apache/flink/tree/release-1.10/flink-container/kubernetes





Re: How can I programmatically set RocksDBStateBackend?

2020-02-27 Thread KristoffSC
Hi,
I had the same case, but with FsStateBackend.

Cast it to the StateBackend type:
env.setStateBackend((StateBackend) new FsStateBackend("some/Path"));


I think this is an inconsistency in the API.





Re: How JobManager and TaskManager find each other?

2020-02-27 Thread KristoffSC
/So do you mean the ip address changes during running or the taskmanager
failed and relaunched with a same hostname, but the ip address is
different?/

Well, that also, but actually I was thinking about running Flink on PaaS
platforms where a process can be re-spawned at runtime on a different
machine under a different IP.

Supposedly OpenShift does this.





Re: How JobManager and TaskManager find each other?

2020-02-26 Thread KristoffSC
Thank you very much,
what if a node's IP changes? Does it also support DNS names, or "raw" IP
addresses only?
I'm thinking about cloud deployments where the actual service/process can be
rescheduled to a different box but there is a name resolution mechanism.

Also, what if there is NAT between the Task Manager and the Job Manager?





Re: How JobManager and TaskManager find each other?

2020-02-26 Thread KristoffSC
Thanks all for the answers,

One more question though. In [1] we can see that task managers talk
to each other, sending data streams. How does each task manager know the
address of the other task managers?


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/concepts/runtime.html#job-managers-task-managers-clients





Re: Running Flink Cluster on PaaS

2020-02-21 Thread KristoffSC
Thank you Yang Wang,

Regarding [1] and a sentence from that doc.
"This page describes deploying a standalone Flink session"

I always wonder what you mean by "Standalone Flink session" or
"Standalone Cluster", which can be found here [2].

I'm using Docker with the Job Cluster approach, and I know that there are also
Session Cluster Docker images. I understand the differences, but I'm not
sure what you are referring to with those two terms from [1] and [2].

Thanks,
Krzysztof

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/kubernetes.html#cluster_setup.html
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/jobmanager_high_availability.html







Running Flink Cluster on PaaS

2020-02-20 Thread KristoffSC
Hi all,
are there any obstacles to running a Flink cluster on a PaaS like OpenShift,
for example?
Where, for example, a task manager could be reassigned to a different physical
box?
Especially when Flink will be in the form of a Docker Job Cluster.

Regards,
Krzysztof





How JobManager and TaskManager find each other?

2020-02-20 Thread KristoffSC
Hi all,
I was wondering how the JobManager and TaskManager find each other.
Do they use multicast for this?

Can it be configured to use domain names instead of IPs?
What do I have to do to have two Flink clusters in the same IP network?
How should I start a task manager in order to tell it to connect to cluster
B and not A?

Regards,
Krzysztof





Re: Side Outputs from RichAsyncFunction

2020-02-19 Thread KristoffSC
Hi,
any thoughts about this one?

Regards,
Krzysztof





Side Outputs from RichAsyncFunction

2020-02-18 Thread KristoffSC
Hi all,
Is there a way to emit a side output from a RichAsyncFunction operator, like it
is possible with ProcessFunctions via ctx.output(outputTag, value)? At first
glance I don't see a way to do it.

In my use case a RichAsyncFunction is used to call REST services and I would
like to handle REST error codes and exceptions by emitting special events as
a side output.

Thanks,
Krzysztof





Re: [ANNOUNCE] Apache Flink 1.10.0 released

2020-02-12 Thread KristoffSC
Hi all,
I have a small question regarding 1.10

Correct me if I'm wrong, but 1.10 should support Java 11, right?

If so, then I noticed that the Docker images [1] referenced in [2] are still
based on openjdk8, not Java 11.

What is the reason for that?

P.S.
Congrats on releasing 1.10 ;)

[1]
https://github.com/apache/flink/blob/release-1.10/flink-container/docker/Dockerfile
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/docker.html





Re: Flink HA for Job Cluster

2020-02-10 Thread KristoffSC
Thank you both for the answers.

I just want to get this right.

I can achieve HA for the Job Cluster Docker config by having the ZooKeeper quorum
configured as mentioned in [1] (with S3 and ZooKeeper), right?

I assume I have to modify the default Job Cluster config to match the [1] setup.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/jobmanager_high_availability.html





Flink Minimal requirements

2020-02-10 Thread KristoffSC
Hi all,
this may be a slightly strange question, but are there any minimal
machine requirements (memory size, CPU, etc.) and non-functional requirements
(number of nodes, network ports, etc.) for Flink?

I know it all boils down to what my deployed job will be, but let's
put this aside for a moment and focus on the bare minimum for
Flink itself.

Probably we can say that Flink requires a minimum of 2 nodes, right?
What about the minimal memory needed for the Flink runtime? How many threads
does Flink's runtime use?

Any thoughts about this one?

Thanks,
Krzysztof





Re: Flink HA for Job Cluster

2020-02-09 Thread KristoffSC
Thank you both for the answers.

I just want to get this right.

I can achieve HA for the Job Cluster Docker config by having the ZooKeeper quorum
configured as mentioned in [1] (with S3 and ZooKeeper), right?

I assume I have to modify the default Job Cluster config to match the [1] setup.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/jobmanager_high_availability.html







Flink HA for Job Cluster

2020-02-07 Thread KristoffSC
Hi, 
In [1] we can find the setup for Standalone and YARN clusters to achieve
Job Manager HA.

Is Standalone Cluster High Availability with ZooKeeper the same approach
as for the Docker Job Cluster approach with Kubernetes?

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/jobmanager_high_availability.html

Thanks,
Krzysztof





SSL configuration - default behaviour

2020-02-07 Thread KristoffSC
Hi,
In documentation [1] we can read that

All internal connections are SSL authenticated and encrypted. The
connections use mutual authentication, meaning both server and client side
of each connection need to present the certificate to each other. The
certificate acts effectively as a shared secret.

But is this a default behavior? Are internal connections encrypted by
default?

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/security-ssl.html





Re: ActiveMQ connector

2020-01-30 Thread KristoffSC
Hi Piotr,
I'm not sure about:
"Note that if you want your state (your HashMap) to be actually
checkpointed, it must be either already defined as Flink manage’d state
(like `ListState` in the example [1]), or you must copy content of your
`HashMap` to Flink managed state during `snapshotState` call."

From [1] we can read
"Each parallel instance of the Kafka consumer maintains a map of topic
partitions and offsets as its Operator State."

Oskar was asking about ActiveMQ and not Kafka, but I guess the rule applies
here also. The ActiveMQ connector he is using is this one [2].

His question actually boils down to one thing regarding this class [3]:
is using a HashMap and not a ConcurrentHashMap for
unacknowledgedMessages in [3] thread safe?
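
To make the question concrete, here is a minimal JDK-only sketch of the general
rule: a plain HashMap is safe across two threads only if every access is guarded
by the same lock. UnackedMessages is a hypothetical stand-in written for
illustration, not the Bahir AMQSource, and the lock object merely plays the role
of the checkpoint lock mentioned in the quoted reply below. Whether the real
connector takes that lock on both paths is exactly what would need to be checked.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a source tracking unacknowledged messages (not the Bahir AMQSource). */
final class UnackedMessages {
    // Plays the role of the checkpoint lock shared by both threads (assumption).
    private final Object lock;
    // A plain HashMap is only safe here because every access below holds the lock.
    private final Map<String, String> unacknowledged = new HashMap<>();

    UnackedMessages(Object lock) {
        this.lock = lock;
    }

    /** Called by the thread that receives messages, like a source's run() loop. */
    void received(String id, String payload) {
        synchronized (lock) {
            unacknowledged.put(id, payload);
        }
    }

    /** Called by the thread that confirms a checkpoint, like an acknowledge callback. */
    void acknowledge(List<String> ids) {
        synchronized (lock) {
            for (String id : ids) {
                unacknowledged.remove(id);
            }
        }
    }

    int pending() {
        synchronized (lock) {
            return unacknowledged.size();
        }
    }

    /** Adds n new messages on one thread while another thread acknowledges n older ones. */
    static int demo(int n) {
        UnackedMessages state = new UnackedMessages(new Object());
        List<String> oldIds = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            oldIds.add("old-" + i);
            state.received("old-" + i, "payload");
        }
        Thread runThread = new Thread(() -> {
            for (int i = 0; i < n; i++) {
                state.received("new-" + i, "payload");
            }
        });
        Thread ackThread = new Thread(() -> {
            for (String id : oldIds) {
                state.acknowledge(Collections.singletonList(id));
            }
        });
        runThread.start();
        ackThread.start();
        try {
            runThread.join();
            ackThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        // The n "old" ids have been acknowledged; the n "new" ones are still pending.
        return state.pending();
    }
}
```

If either method skipped the synchronized block, the concurrent put and remove
calls could corrupt the HashMap, which is the race Oskar describes.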

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html
[2] https://bahir.apache.org/docs/flink/current/flink-streaming-activemq/
[3]
https://github.com/apache/bahir-flink/blob/master/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java
Piotr Nowojski-3 wrote
> Hi,
> 
> Regarding your last question, sorry I don’t know about ActiveMQ
> connectors.
> 
> I’m not sure exactly how you are implementing your SourceFunction.
> Generally speaking `run()` method is executed in one thread, and other
> operations like checkpointing, timers (if any) are executed from another
> thread. In order to synchronise between those, user is expected to acquire
> checkpoint lock in the `run()` method as it’s documented [1].
> 
> Note that if you want your state (your HashMap) to be actually
> checkpointed, it must be either already defined as Flink manage’d state
> (like `ListState` in the example [1]), or you must copy content of your
> `HashMap` to Flink managed state during `snapshotState` call.
> 
> Note 2, also keep in mind we are in the process of reimplementing source
> interfaces [2] and probably Flink 1.11 will offer a new and better API for
> that (SourceReader instead of SourceFunction). 
> 
> Piotrek
> 
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html
>  
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> 
>> On 29 Jan 2020, at 13:08, OskarM pentiak@ wrote:
>> 
>> Hi all,
>> 
>> I am using Flink with Bahir's Apache ActiveMQ connector. However it's quite
>> dated and poses many limitations, most notably the source supports only
>> ByteMessages, does not support parallelism and has a bug that is only fixed
>> in a snapshot version.
>> 
>> So I started implementing my own SourceFunction (still with parallelism of
>> only 1) based on AMQSource.
>> I want it to support Flink's checkpointing and make it work with ActiveMQ
>> acks.
>> AMQSource uses ordinary HashMap to store Messages to be acked in the broker
>> and this is where my question arises.
>> 
>> Is the HashMap safe to use here?
>> 
>> Please correct me if I'm wrong, but my understanding is that /run/ method is
>> executed in one thread and /acknowledgeIDs/ in another so there is a
>> possibility of thread race (even if we assume all the message ids are
>> unique).
>> 
>> Also, do you know of any ActiveMQ specific (or JMS in general), more
>> up-to-date connectors I could use which do not have the issues mentioned
>> above?
>> 
>> Thanks,
>> Oskar
>> 
>> 
>> 
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/







Re: Reinterpreting a pre-partitioned data stream as keyed stream

2020-01-30 Thread KristoffSC
Hi,
thank you for the answer. 

I think I understand. 

In my use case I have to keep the order of events for each key, but I don't
have to process keys in the same order that I received them. At one point of
my pipeline I'm also using a SessionWindow.

My Flink environment has operator chaining enabled. I would say that some
of my operators can be chained.

My pipeline is (each point is an operator after Flink's operator chaining
mechanism):
1. ActiveMQ connector + mapper, all with parallelism 1 (btw I'm using an
org.apache.bahir connector for ActiveMQ which does not support parallelism
bigger than 1)
2. Enrichment, where I'm using AsyncDataStream.unorderedWait with parallelism
5.
3. Event split based on some criteria (not keyBy) that dispatches my stream
into two "sub streams"
4. Both substreams are keyed
4a. Substream "A" has a session window applied - parallelism 6.
4b. Substream "B" has no windowing, no aggregation, but has business logic
for which the order of events matters - parallelism 6.
5. Sink for both streams.


If I understand you and documentation correctly, Redistributing will forward
messages keeping the order for a key, but events between keys can be
delivered in a different order. 
"So in this example, the ordering within each key is preserved, but the
parallelism does introduce non-determinism regarding the order in which the
aggregated results for different keys arrive at the sink."
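
The per-key ordering property quoted above can be illustrated with a small
JDK-only sketch. It routes events to partitions by key hash and keeps arrival
order inside each partition. This is a simplification made for illustration:
Flink's actual assignment goes through key groups rather than a plain modulo,
and KeyedRouting with its methods is a hypothetical name, not a Flink API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of hash partitioning (not Flink's real key-group logic). */
final class KeyedRouting {

    /** Picks a partition from the key's hash; the same key always maps to the same partition. */
    static int partitionFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    /** Routes events of the form "key:payload" into per-partition lists, preserving arrival order. */
    static List<List<String>> route(List<String> events, int parallelism) {
        List<List<String>> partitions = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            partitions.add(new ArrayList<>());
        }
        for (String event : events) {
            String key = event.substring(0, event.indexOf(':'));
            partitions.get(partitionFor(key, parallelism)).add(event);
        }
        return partitions;
    }
}
```

Because all events of one key land in the same partition in arrival order, the
order within a key survives, while nothing is guaranteed about the relative
order of events that sit in different partitions.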

Then I could use a keyBy at the beginning of the pipeline, just after point 1.
But to use the window in point 4a and my process function in 4b I need to have a
KeyedStream. I'm using a KeyedProcessFunction there. What are my options
here?


P.S.
Regarding operator chaining, I'm aware that there is an API that allows
me to control which operators should be chained together and which not, even
if they have the same parallelism level.

Thanks,
Krzysztof





Reinterpreting a pre-partitioned data stream as keyed stream

2020-01-28 Thread KristoffSC
Hi all,
we have a use case where the order of received events matters and it should be
kept across the pipeline.

Our pipeline would be parallelized. We can key the stream just after the Source
operator, but in order to keep the ordering across the following operators we
would still have to keep the stream keyed.

Obviously we could key again and again, but this would cause some performance
penalty.
We were thinking about using DataStreamUtils.reinterpretAsKeyedStream
instead.

Since this is experimental functionality, I would like to ask whether there is
someone in the community who is using this feature. Do we know about any
open issues regarding this feature?

Thanks,
Krzysztof










Re: Running Flink on java 11

2020-01-10 Thread KristoffSC
Hi, 
Yangze Guo, Chesnay Schepler thank you very much for your answers.

I have actually a funny setup.
So I have a Flink Job module, generated from Flink's maven archetype.
This module has all operators and Flink environment config and execution.
This module is compiled by maven with "maven.compiler.target" set to 1.8

However I'm using a 3rd party library that was compiled with java 11.
In order to build my main Job module I have to use JDK 11, however I still
have "maven.compiler.target" set to 1.8 there.

As a result, I have a Flink job jar, that has classes from Java 8 and 11.
Running javap -verbose proves it. All classes from Flink Job module are in
Java 8.
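
What javap -verbose reports here is the class-file major version stored in the
header of every .class file: 52 corresponds to Java 8 and 55 to Java 11. As a
minimal sketch, the same check can be done with the JDK alone. ClassFileVersion
is a hypothetical helper written for illustration and is not part of Flink or
Maven.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper: reads the class-file version from the bytes of a compiled class. */
final class ClassFileVersion {

    /** Returns the major version from a class file header (52 = Java 8, 55 = Java 11). */
    static int majorVersion(byte[] classBytes) {
        // ByteBuffer is big-endian by default, which matches the class-file format.
        ByteBuffer header = ByteBuffer.wrap(classBytes);
        if (header.getInt() != 0xCAFEBABE) {
            throw new IllegalArgumentException("Not a class file: bad magic number");
        }
        header.getShort();                   // minor version, not needed here
        return header.getShort() & 0xFFFF;   // major version as an unsigned 16-bit value
    }

    /** Maps a major version to the Java release number, valid for Java 5 and later. */
    static int javaRelease(int majorVersion) {
        return majorVersion - 44;
    }
}
```

For a class extracted from the job jar, the bytes could be loaded with
Files.readAllBytes(Paths.get("StreamJob.class")) and passed to majorVersion; the
path is only an example.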

I can build a Flink Job Cluster image that is based on [1]. However, I had to
change the base image from openjdk:8-jre-alpine to
adoptopenjdk/openjdk11:jre-11.0.5_10-alpine and remove the installation of
libc6-compat.

After rebuilding the Docker image, the Job Cluster started and processed messages.

On the original openjdk:8-jre-alpine it was unable to start due to issues with
loading classes from my 3rd party library (Unsupported major.minor version
exception).
So this seems to work.


However, if I change "maven.compiler.target" to Java 11 in my Flink Job
module, then Flink is unable to run the job and gives me this exception:

job-cluster_1  | Caused by: java.lang.UnsupportedOperationException
job-cluster_1  |    at org.apache.flink.shaded.asm6.org.objectweb.asm.ClassVisitor.visitNestHostExperimental(ClassVisitor.java:158)
job-cluster_1  |    at org.apache.flink.shaded.asm6.org.objectweb.asm.ClassReader.accept(ClassReader.java:541)
job-cluster_1  |    at org.apache.flink.shaded.asm6.org.objectweb.asm.ClassReader.accept(ClassReader.java:391)
job-cluster_1  |    at org.apache.flink.api.java.ClosureCleaner.cleanThis0(ClosureCleaner.java:187)
job-cluster_1  |    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
job-cluster_1  |    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
job-cluster_1  |    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
job-cluster_1  |    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
job-cluster_1  |    at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
job-cluster_1  |    at org.apache.flink.streaming.api.datastream.DataStream.process(DataStream.java:668)
job-cluster_1  |    at org.apache.flink.streaming.api.datastream.DataStream.process(DataStream.java:645)
job-cluster_1  |    at com.epam.monity.job.StreamJob.doTheJob(StreamJob.java:140)
job-cluster_1  |    at com.epam.monity.job.StreamJob.main(StreamJob.java:46)
job-cluster_1  |    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
job-cluster_1  |    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
job-cluster_1  |    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
job-cluster_1  |    at java.base/java.lang.reflect.Method.invoke(Unknown Source)
job-cluster_1  |    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
job-cluster_1  |    ... 13 more



Long story short, it seems that for now the Job module has to be compiled to 1.8
with JDK 11 if Java 11 libraries are used.

[1]
https://github.com/apache/flink/blob/release-1.9/flink-container/docker/README.md







Re: Flink Job claster scalability

2020-01-10 Thread KristoffSC
Hi Zhu Zhu,
well, in my last test I did not change the job config, so I did not change
the parallelism level of any operator and I did not change the policy regarding
slot sharing (it stays at the default). Operator chaining is set to true
without any extra actions like "start new chain", "disable chain", etc.

What I assume, however, is that Flink will try to find the most efficient way to
use the available resources during job submission.

In the first case, where I had only 6 task managers (which matches the max
parallelism of my JobVertex), Flink reused some task slots. Adding extra task
slots was not effective, for the reason described by David. This is
understandable.

However, I was assuming that if I submit my job on a cluster that has more
than 6 task managers, Flink will not share task slots by default. That did
not happen. Flink deployed the job in the same way regardless of the extra
resources.


So the conclusion is that simply resubmitting the job will not work in this
case, and actually I can't have any certainty that it will, since in my case
Flink still reuses task slots.

If this were the production case, I would have to do a test job
submission on a testing env and potentially change the job: not the config,
but adding slot sharing groups etc.
So in production I would not be able to react fast; I
would have to deploy a new version of my app/job, which could be problematic.






Running Flink on java 11

2020-01-09 Thread KristoffSC
Hi guys,
we have a requirement in our project to use Java 11, and we would
really like to use Flink because it seems to match our needs perfectly.

We were testing it on Java 1.8 and all looks fine.
We tried to run it on Java 11 and it also looks fine, at least for now.

We were also running this as a Job Cluster, and since those images [1] are
based on openjdk:8-jre-alpine we switched to java 13-jdk-alpine. The cluster
started and submitted the job. All seemed fine.

The job and the 3rd party library that this job is using were compiled with Java
11.
I was looking for any posts related to Java 11 issues and I've found this
one [2].
We are also aware of the ongoing FLINK-10725 [3], but this is assigned to Flink
version 1.10.

With all of this in mind, I would like to ask a few questions:

1. Is there a release date planned for 1.10?
2. Are you aware of any issues regarding running Flink on Java 11?
3. If my job code does not use any language features from Java 11, would Flink
handle it when running on Java 11? Or are there some internal functionalities
that would not work on Java 11 (things that use Unsafe or
reflection)?

Thanks,
Krzysztof


[1]
https://github.com/apache/flink/blob/release-1.9/flink-container/docker/README.md
[2]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/UnsupportedOperationException-from-org-apache-flink-shaded-asm6-org-objectweb-asm-ClassVisitor-visit1-td28571.html
[3] https://issues.apache.org/jira/browse/FLINK-10725





Re: UnsupportedOperationException from org.apache.flink.shaded.asm6.org.objectweb.asm.ClassVisitor.visitNestHostExperimental using Java 11

2020-01-09 Thread KristoffSC
Hi,
are there any plans to support Java 11?

Thanks,
Krzysztof



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Job Cluster vs Session Cluster deploying and configuration

2020-01-09 Thread KristoffSC
Hi all,
I'm researching Docker/k8s deployment possibilities for Flink 1.9.1.

I have read/watched [1][2][3][4].

Currently we think that we will try to go with the Job Cluster approach, although
we would like to know what the community trend is here. We would rather
not deploy more than one job per Flink cluster.

Anyway, I was wondering about a few things:

1. How can I change the number of task slots per task manager for a Job and
Session Cluster? In my case I'm running Docker on VirtualBox, where I have 4
CPUs assigned to this machine. However, each task manager is spawned with
only one task slot for the Job Cluster. With the Session Cluster, on the
same machine, each task manager is spawned with 4 task slots.

In both cases Flink's UI shows that each task manager has 4 CPUs.


2. How can I resubmit a job if I'm using a Job Cluster? I'm referring to this use
case [5]. You may say that I have to start the job again but with different
arguments. What is the procedure for this? I'm using checkpoints, btw.

Should I kill all task manager containers and rerun them with different
parameters?

3. How can I resubmit a job using a Session Cluster?

4. How can I provide a log config for a Job/Session Cluster?
I have a case where I changed the log level and log format in log4j.properties,
and this works fine in the local (IDE) environment. However, when I build
the fat jar and run a Job Cluster based on this jar, it seems that my log4j
properties are not passed to the cluster. I see the original format and
the original (INFO) level.

Thanks,


[1] https://youtu.be/w721NI-mtAA
[2] https://youtu.be/WeHuTRwicSw
[3]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/docker.html
[4]
https://github.com/apache/flink/blob/release-1.9/flink-container/docker/README.md
[5]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Job-claster-scalability-td32027.html





Re: Flink Job claster scalability

2020-01-09 Thread KristoffSC
Thank you David and Zhu Zhu,
this helps a lot.

I have follow-up questions though.

Given this
/"Instead the Job must be stopped via a savepoint and restarted with a new
parallelism"/

and the slot sharing [1] feature, I got the impression that if I started my
cluster with more than 6 task slots, Flink would try to deploy tasks across
all of them, using all available resources during job submission.

I ran two tests with my original job.
1. I started a Job Cluster with 7 task slots (7 task managers, since in this
case 1 task manager has one task slot).
2. I started a Session Cluster with 28 task slots in total. In this case I
had 7 task managers with 4 task slots each.

For case 1, I used the "FLINK_JOB" variable as stated in [2]. For case 2, I
submitted my job from the UI after Flink became operational.


In both cases it used only 6 task slots, so it was still reusing task
slots. I had expected it to use as many of the available resources as it
could.

What do you think about this?


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html#task-slots-and-resources
[2]
https://github.com/apache/flink/blob/release-1.9/flink-container/docker/README.md










Re: Session Window with dynamic gap

2020-01-08 Thread KristoffSC
Hi Aljoscha,
Thanks for the response.

This sounds OK to me. It's as if the message carries additional information
that can "tell" operators how to handle this message. Maybe we could use
this approach also for different use cases.

I will try this approach, thanks.






Flink Job claster scalability

2020-01-08 Thread KristoffSC
Hi all,
I must say I'm very impressed by Flink and what it can do.

I was trying to play around with Flink operator parallelism and scalability
and I have a few questions regarding this subject.

My setup is:
1. Flink 1.9.1
2. Docker Job Cluster, where each Task manager has only one task slot. I'm
following [1]
3. env setup:
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000, 1000));
env.setParallelism(1);
env.setMaxParallelism(128);
env.enableCheckpointing(10 * 60 * 1000);

Please mind that I am using operator chaining here. 

My pipeline setup:

 


As you can see, I have 7 operators (a few of them were actually chained, and
this is OK) with different parallelism levels. This gives me 23 tasks in
total.


I've noticed that with "one task manager = one task slot" approach I have to
have 6 task slots/task managers to be able to start this pipeline.


 

If the number of task slots is lower than 6, the job is scheduled but not
started.

With 6 task slots everything works fine, and I must say that I'm very
impressed with the way Flink balanced data between task slots. Data was
distributed very evenly between operator instances/tasks.

In this setup (7 operators, 23 tasks and 6 task slots), some task slots have
to be reused by more than one operator. While inspecting the UI I found
examples of such operators. This is what I was expecting, though.

However I was surprised a little bit after I added one additional task
manager (hence one new task slot)


 

After adding new resources, Flink did not rebalance/redistribute the
graph, so this host was sitting there doing nothing. Even after putting
some load on the cluster, this node was still not used.

 
*After doing this exercise I have few questions:*

1. It seems that the number of task slots must be equal to or greater than
the highest parallelism used in the pipeline. In my case it was 6. When I
changed the parallelism of one of the operators to 7, I had to have 7 task
slots (task managers in my setup) to be able to even start the job.
Is this the case?

2. What can I do to use the extra node that was spawned while the job was
running?
In other words, if I see that one of my nodes has too much load, what can I
do? Please mind that I'm using a keyBy/hashing function in my pipeline and
in my tests I had around 5000 unique keys.

I've tried to use the REST API to call "rescale" but I got this response:
/302{"errors":["Rescaling is temporarily disabled. See FLINK-12312."]}/
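
For question 1, the arithmetic can be sketched in plain Java. This is only
an illustration, assuming that all operators stay in Flink's default slot
sharing group, in which case one slot can hold one subtask of every operator
and the job needs as many slots as its highest operator parallelism. The
operator names and parallelism values are hypothetical (the pipeline picture
is not reproduced here); they were picked only so that they add up to 23
subtasks with a maximum of 6.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class SlotEstimate {

    // Slots needed when every operator shares the default slot sharing group:
    // the highest parallelism among all operators.
    static int requiredSlots(Map<String, Integer> parallelismByOperator) {
        int max = 0;
        for (int parallelism : parallelismByOperator.values()) {
            max = Math.max(max, parallelism);
        }
        return max;
    }

    // Total number of parallel subtasks across all operators.
    static int totalSubtasks(Map<String, Integer> parallelismByOperator) {
        int sum = 0;
        for (int parallelism : parallelismByOperator.values()) {
            sum += parallelism;
        }
        return sum;
    }

    public static void main(String[] args) {
        // Hypothetical operators and parallelism values, for illustration only.
        Map<String, Integer> operators = new LinkedHashMap<>();
        operators.put("source", 1);
        operators.put("map", 6);
        operators.put("enrich", 6);
        operators.put("window", 4);
        operators.put("aggregate", 3);
        operators.put("filter", 2);
        operators.put("sink", 1);

        System.out.println("subtasks: " + totalSubtasks(operators));
        System.out.println("slots needed: " + requiredSlots(operators));
    }
}
```

Under this assumption an additional seventh slot stays idle, because the
slot count required is driven by the highest parallelism and not by the
number of slots available.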

Thanks.

[1]
https://github.com/apache/flink/blob/release-1.9/flink-container/docker/README.md





Re: Late outputs for Session Window

2020-01-08 Thread KristoffSC
Hi, 
thank you for your SO comment [1]. You are right. Sorry, I misunderstood
the "late message" concept.
In fact I was never sending "late events" that would match a window that
had just ended.

Thank you for your comments and clarification. 


[1]
https://stackoverflow.com/questions/59570445/late-outputs-missing-for-flinks-session-window/59642942#59642942





Re: Late outputs for Session Window

2020-01-03 Thread KristoffSC
After following the suggestion from SO,
I made a few changes, so now I'm using event time.
Watermarks are progressing; I've checked them in Flink's metrics. The
window operator is triggered, but I still don't see any late outputs for
it.


StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000, 1000));
env.setParallelism(1);
env.disableOperatorChaining();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(1000);


DataStream<RawMessage> rawBusinessTransaction = env
        .addSource(new FlinkKafkaConsumer<>("business",
                new JSONKeyValueDeserializationSchema(false), properties))
        .map(new KafkaTransactionObjectMapOperator())
        .assignTimestampsAndWatermarks(
                new AssignerWithPeriodicWatermarks<RawMessage>() {

                    @Nullable
                    @Override
                    public Watermark getCurrentWatermark() {
                        return new Watermark(System.currentTimeMillis());
                    }

                    @Override
                    public long extractTimestamp(RawMessage element,
                            long previousElementTimestamp) {
                        return element.messageCreationTime;
                    }
                })
        .name("Kafka Transaction Raw Data Source.");

messageStream
        .keyBy(tradeKeySelector)
        .window(EventTimeSessionWindows.withDynamicGap(
                new TradeAggregationGapExtractor()))
        .sideOutputLateData(lateTradeMessages)
        .process(new CumulativeTransactionOperator())
        .name("Aggregate Transaction Builder");








Re: Session Window with dynamic gap

2020-01-02 Thread KristoffSC
Ok, 
I did some more tests and yes, it seems that there is no way to use Flink's
state in a class that implements SessionWindowTimeGapExtractor.

Even if I implement this interface on a class that is an operator,
whenever the extract method is called it does not have any access to
Flink's state. Even calling getRuntimeContext() from it throws an exception.

Are there any plans to add support for Flink state in
SessionWindowTimeGapExtractor?





Late outputs for Session Window

2020-01-02 Thread KristoffSC
Hi all,
In my pipeline setup I cannot see side outputs for Session Window (Flink
1.9.1)

What I have is:


messageStream
        .keyBy(tradeKeySelector)
        .window(ProcessingTimeSessionWindows.withDynamicGap(
                new TradeAggregationGapExtractor()))
        .sideOutputLateData(lateTradeMessages)
        .process(new CumulativeTransactionOperator())
        .name("Aggregate Transaction Builder");

TradeAggregationGapExtractor implements SessionWindowTimeGapExtractor and
returns 5 seconds.

Further I have:

messageStream.getSideOutput(lateTradeMessages)
        .keyBy(tradeKeySelector)
        .process(new KeyedProcessFunction() {
            @Override
            public void processElement(EnrichedMessage value, Context ctx,
                    Collector out) throws Exception {
                System.out.println("Process Late messages For Aggregation");
                out.collect(new Transaction());
            }
        })
        .name("Process Late messages For Aggregation");


The problem is that I never see "Process Late messages For Aggregation" when
I'm sending messages with the same key.

When the session window passes and I "immediately" send a new message for
the same key, it triggers a new session window without going into the late
side output.

Not sure what I'm doing wrong here.

What I would like to achieve here is to catch "late events" and try to
reprocess them against the state that was built for the "on time" events of
this window or, if that is impossible, report late events to a special sink.

I will appreciate any help.
However, it seems I do not have any late events.
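
For reference, a rough sketch of when an element counts as "late". As far
as I understand Flink's window operator, an element is only sent to the late
side output when the window it would belong to has already expired relative
to the current watermark, and this check applies to event-time windows only.
The class and parameter names below are hypothetical, and the sketch ignores
session window merging; it models a single new session window
[timestamp, timestamp + gap) for the incoming element.

```java
final class LateElementCheck {

    // Returns true when the element's own session window has already expired.
    // With processing time there is no watermark to be late against, so the
    // result is always false in that case.
    static boolean isLate(long elementTimestamp,
                          long gapMillis,
                          long allowedLatenessMillis,
                          long currentWatermark,
                          boolean eventTime) {
        if (!eventTime) {
            return false;
        }
        // Last timestamp that still belongs to the window.
        long windowMaxTimestamp = elementTimestamp + gapMillis - 1;
        return windowMaxTimestamp + allowedLatenessMillis <= currentWatermark;
    }

    public static void main(String[] args) {
        // Element at t=1000 with a 5 s gap while the watermark is at t=10000.
        System.out.println(isLate(1_000L, 5_000L, 0L, 10_000L, true));
        // The same element with processing-time windows.
        System.out.println(isLate(1_000L, 5_000L, 0L, 10_000L, false));
    }
}
```

Under these assumptions a message sent "immediately" after a
processing-time session window has closed simply opens a new session and is
never treated as late.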









Re: Session Window with dynamic gap

2020-01-02 Thread KristoffSC
So I was trying to have something like this:

PipelineConfigOperator pipelineConfigOperator =
        new PipelineConfigOperator();

messageStream
        .connect(pipelineConfigStream)
        .process(*pipelineConfigOperator*)
        .keyBy(tradeKeySelector)
        .window(ProcessingTimeSessionWindows.withDynamicGap(
                *pipelineConfigOperator*))
        .process(new CumulativeTransactionOperator())
        .name("Aggregate Transaction Builder");

where:

PipelineConfigOperator extends BroadcastProcessFunction implements
SessionWindowTimeGapExtractor


BroadcastStream pipelineConfigStream = configRulesStream
.broadcast(pipelineConfigStateDescriptor);

MapStateDescriptor pipelineConfigStateDescriptor = new
MapStateDescriptor<>(
"PipelineConfigBroadcastState",
Types.STRING,
TypeInformation.of(new TypeHint() {
}));

SingleOutputStreamOperator configRulesStream = env.addSource(new
FlinkKafkaConsumer<>("pipeline-config",
new SimpleStringSchema(), properties))
.name("Pipeline config stream");


PipelineConfigOperator keeps the config in broadcast state and a copy of it
in a local, transient HashMap.
Whenever processBroadcastElement is called, the broadcast state and the
HashMap are updated.

The problem is that when the "extract" method is called, the HashMap is null
even though it was initialized in the open method.

I have implemented the broadcast state pattern in standard operators as
presented in the documentation, so I'm familiar with the concept. I assumed
I could reuse it here. The bottom line is that I do not want to fetch state
every time, only after a config update.
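
One possible explanation for the null map, sketched in plain Java: user
functions are shipped to the cluster in serialized form, so (as far as I
understand) the instance handed to process() and the instance handed to
withDynamicGap() end up as two separate deserialized copies, and open() is
only called on the operator's copy. The class below is a hypothetical
stand-in, not the real PipelineConfigOperator; it only shows that a
transient field initialized on one copy stays null on the other.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

final class TransientCopyDemo {

    // Hypothetical stand-in for a function object with a transient cache.
    static final class GapConfig implements Serializable {
        private static final long serialVersionUID = 1L;
        transient Map<String, Long> cache;

        // Mimics an open() lifecycle call that initializes the cache.
        void open() {
            cache = new HashMap<>();
        }
    }

    // Serializes and deserializes a value, returning an independent copy.
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        GapConfig shared = new GapConfig();
        GapConfig operatorCopy = roundTrip(shared);  // copy used as the operator
        GapConfig extractorCopy = roundTrip(shared); // copy used as the extractor
        operatorCopy.open();
        System.out.println("operator cache set: " + (operatorCopy.cache != null));
        System.out.println("extractor cache set: " + (extractorCopy.cache != null));
    }
}
```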






Re: Session Window with dynamic gap

2020-01-02 Thread KristoffSC
Thank you for the answer,

the thing is that I would not like to call an external system for each
window. Rather, I would like to keep the gap size in Flink's state, which I
will be able to change from an external system, for example by handling a
configUpdate message from Kafka.

So if SessionWindowTimeGapExtractor should not be implemented on operators,
then I understand that the SessionWindowTimeGapExtractor implementation
will fetch the config value from Flink's state (value/map) according to the
state descriptor.

However, I would not want to fetch data from the state on each call, because
it will not change that often. It is a similar case to the broadcast state
pattern, but for SessionWindowTimeGapExtractor.





Session Window with dynamic gap

2020-01-02 Thread KristoffSC
Hi all,
I'm exploring Flink for our new project.

Currently I'm playing with Session Windows with dynamic Gap. In short, I
would like to be able to change the value of the gap on demand, for example
on config update. 

So I have this code:


messageStream
        .keyBy(tradeKeySelector)
        .window(ProcessingTimeSessionWindows.withDynamicGap(
                new SessionWindowTimeGapExtractor<EnrichedMessage>() {
                    @Override
                    public long extract(EnrichedMessage element) {
                        // Try to dynamically change the gap here
                        // (milliseconds).
                        return 5000;
                    }
                }))
        .process(new CumulativeTransactionOperator())
        .name("Aggregate Transaction Builder");

I would assume something like the "broadcast pattern" here, although that is
related to operators and we are interested in
SessionWindowTimeGapExtractor here.

Probably we will keep the gap size in Flink state; not sure if it has to
be keyed state or "operator state". Updates will come from an external
system.

So I guess what I need here is actually an operator that implements the
SessionWindowTimeGapExtractor interface. An instance of this operator will
keep/update the state based on config updates and return the gap size like
a SessionWindowTimeGapExtractor.

Would this be a valid approach for this use case? Is there any other way to
have such a config in Flink state?









Re: Deprecated SplitStream class - what should be use instead.

2019-12-20 Thread KristoffSC
Hi Kostas,
Thank you for the answer and clarification. 

If Side-outputs are treated in the same way and there is no significant
performance penalty then it seems that they are ok for my use case.

I can accept the name mismatch ;)

Regards,
Krzysztof





Re: Deprecated SplitStream class - what should be use instead.

2019-12-19 Thread KristoffSC
Kostas, thank you for your response,

Well although the Side Outputs would do the job, I was just surprised that
those are the replacements for stream splitting.

The thing is, and this might be only a subjective opinion, that I
would assume that Side Outputs should be used only to produce something
aside from the main processing function, like control messages or some
leftovers.

In my case, I wanted to simply split the stream into two new streams based
on some condition.
With side outputs I will have to "treat" the second stream as something
additional to the main processing result.

Like it is written in the docs: 
"*In addition* to the main stream that results from DataStream
operations(...)"

or
"The type of data in the result streams does not have to match the type of
data in the *main *stream and the types of the different side outputs can
also differ. "


In my case I don't have any "addition" to my main stream, and actually both
split streams are equally important :)

So by writing that side outputs are not good for my use case I meant that
they do not fit conceptually, at least in my opinion.

Regards,
Krzysztof






Deprecated SplitStream class - what should be use instead.

2019-12-19 Thread KristoffSC
Hi,
I've noticed that the SplitStream class is marked as deprecated, although
the split method of DataStream is not.
Also, no alternative is proposed for it in the SplitStream doc.

In my use case I will have a stream of events that I have to split into two
separate streams based on some function. Events with a field that meets some
condition should go to the first stream, while all others should go to a
different stream.

Later both streams should be processed in a different manner.

I was planning to use the approach presented here:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/

SplitStream<Integer> split = someDataStream.split(
        new OutputSelector<Integer>() {
            @Override
            public Iterable<String> select(Integer value) {
                List<String> output = new ArrayList<String>();
                if (value % 2 == 0) {
                    output.add("even");
                } else {
                    output.add("odd");
                }
                return output;
            }
        });

But it turns out that SplitStream is deprecated.
Also I've found a similar question on SO:
https://stackoverflow.com/questions/53588554/apache-flink-using-filter-or-split-to-split-a-stream


I don't think filter and side outputs are a good choice here.

I will be thankful for any suggestion.
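
To make the intent concrete, here is the even/odd split from the snippet
above modeled with plain Java collections instead of Flink streams. It is
only a sketch of the desired semantics (every event lands in exactly one of
two equally important outputs); the class and method names are hypothetical
and no Flink API is involved.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

final class SplitByPredicate {

    // Routes every event to exactly one of two outputs:
    // key true  -> events that satisfy the condition,
    // key false -> all remaining events.
    static <T> Map<Boolean, List<T>> split(List<T> events, Predicate<T> condition) {
        return events.stream().collect(Collectors.partitioningBy(condition));
    }

    public static void main(String[] args) {
        Map<Boolean, List<Integer>> parts =
                split(Arrays.asList(1, 2, 3, 4, 5), value -> value % 2 == 0);
        System.out.println("even: " + parts.get(true));
        System.out.println("odd: " + parts.get(false));
    }
}
```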






Re: Keyed stream, parallelism, load balancing and ensuring that the same key go to the same Task Manager and task slot

2019-12-19 Thread KristoffSC
Hi :)
Any thoughts about this? 





Re: Deploying Operator on concrete Task Manager

2019-12-18 Thread KristoffSC
Hi,
thanks for the reply.

Just to clarify, I will have to have *a new Flink Cluster* (Job Manager and
Task Manager) running in the secure zone, which will run the
AsyncEnrich job, right?





Deploying Operator on concrete Task Manager

2019-12-18 Thread KristoffSC
Hi,
I have a question regarding job/operator deployment on Task Managers.

If I understand correctly, my job will be split into individual tasks,
which will be "deployed and executed" on particular task slot/s of a Task
Manager (depending on the parallelism level, of course).

Let's imagine I have a job that has:

1. Kafka Source with map to RawEvent
2. Enrichment that has AsyncCall + map to EnrichedEvent
3. Keyed stream with other Map/Process functions, maybe with some windowing.
4. Sink

I have a grid that has 1 Job Manager and 3 Task Manager nodes.
For security reasons, one Task Manager should be in a special network zone
(red zone).

Step 2 of my job (Enrichment that has AsyncCall + map to EnrichedEvent)
should be executed on the particular node that is located in the secured
zone.

No other operations should be placed on this node.
Is there a way to configure this?


As an alternative, I can see that we could have a separate job that just
has a source, enrich and sink, and this job would be "deployed" on this
secured Task Manager. Or maybe we should have a separate Flink cluster for
this?






Re: Thread access and broadcast state initialization in BroadcastProcessFunction

2019-12-17 Thread KristoffSC
Thank you for your reply Timo.

Regarding point 2, I'm sorry for the delay. I reran my test and everything
seems to be in order. The open method was called first. I guess it was a
false alarm. Sorry for that.

Regards,
Krzysztof





Keyed stream, parallelism, load balancing and ensuring that the same key go to the same Task Manager and task slot

2019-12-17 Thread KristoffSC
Hi community, 
I'm trying to build a PoC pipeline for my project and I have a few questions
regarding load balancing between task managers and ensuring that keyed
stream events for the same key will go to the same Task Manager (hence the
same task slot).

Let's assume that we have 3 task managers with 3 task slots each, which
gives us 9 task slots in total.
The source is a Kafka topic with N partitions. Events are "linked" with each
other by a transactionId (long) field, so they can be keyed by this field.
Events for a particular transactionId can be spread across many partitions
(we don't have control over this).

The pipeline is:
1. Kafka Source -> produces RawEvents (map operator).
2. Enrichment with an AsyncFunction (simple DB/cache call) produces
EnrichedEvents with a map operator.
3. Key EnrichedEvents by tradeId, buffer events for some time, sort them by
sequenceNumber (Window aggregation) and emit a new event based on those.
N sorted EnrichedEvents produce one TransactionEvent for this
transactionId.
4. Sink TransactionEvents

Requirements:
1. Have high task slot utilization (low number of idle/un-addressed task
slots).
2. EnrichedEvents for the same transactionId should go to the same task slot
(hence the same Task Manager).

Questions:
How can this be achieved?
How should the parallelism value for each operator be set?

Note:
Probably I can already key the original RawEvents on transactionId.
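
Regarding requirement 2, a simplified model of how a keyed stream routes
events may help. As far as I understand it, keyBy hashes the key into a
"key group" in the range [0, maxParallelism) and each parallel subtask owns
a contiguous range of key groups, so the same key always reaches the same
subtask no matter which Kafka partition the event came from. The sketch
below assumes exactly that scheme; the bit-mixing function is a stand-in
and not the hash Flink actually uses, and the class name and the values
128 and 9 are chosen for illustration only.

```java
final class KeyRouting {

    // Stand-in for the hash applied on top of key.hashCode().
    static int mix(int h) {
        h ^= (h >>> 16);
        h *= 0x85ebca6b;
        h ^= (h >>> 13);
        h *= 0xc2b2ae35;
        h ^= (h >>> 16);
        return h;
    }

    // Maps a key to a key group in [0, maxParallelism).
    static int keyGroup(Object key, int maxParallelism) {
        return Math.floorMod(mix(key.hashCode()), maxParallelism);
    }

    // Maps a key group to the index of the parallel subtask that owns it.
    static int subtaskIndex(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // illustrative value
        int parallelism = 9;      // one subtask per task slot in the example
        long transactionId = 42L;
        int group = keyGroup(transactionId, maxParallelism);
        int subtask = subtaskIndex(group, maxParallelism, parallelism);
        System.out.println("key group " + group + " -> subtask " + subtask);
    }
}
```

Because the mapping depends only on the key, maxParallelism and the
operator's parallelism, it is deterministic for a running job; the number of
distinct keys mainly affects how evenly the load spreads across subtasks.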

Thanks,
Krzysztof





Re: Thread access and broadcast state initialization in BroadcastProcessFunction

2019-12-11 Thread KristoffSC
Hi Vino,
Thank you for your response and provided links.

Just to clarify, and a small follow-up:

1. Methods will be called by only one thread, right?

2. The links you provided tackle the case where we get a "fast stream"
element before we have received a broadcast stream element. In my case we
had the broadcast element first, before we got any "fast stream" element.
Because the open method was not called (I've observed that it is called
only before the first processElement call, so before processing the first
"fast stream" element), we don't have the state descriptor, which would be
initialized in the open method. So we actually cannot "store/process" this
broadcast element in our broadcast state.


@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    processingRulesDesc = new MapStateDescriptor<>(
            "RulesBroadcastState",
            Types.VOID,
            TypeInformation.of(new TypeHint<ProcessingRule>() {
            }));
}

In this case, bcState will be null since the open method was not yet called.

public void processBroadcastElement(ProcessingRule rule, Context ctx,
        Collector out) throws Exception {
    // store the new pattern by updating the broadcast state
    BroadcastState<Void, ProcessingRule> bcState =
            ctx.getBroadcastState(processingRulesDesc);
    bcState.put(null, rule);
}








Processing Events by custom rules kept in Broadcast State

2019-12-10 Thread KristoffSC
Hi,
I think this is a very basic use case for the Broadcast State Pattern,
but I would like to know what the best approaches to this problem are.

I have an operator that extends BroadcastProcessFunction. The
broadcast element is a JSON message sent via Kafka. It
describes processing rules as a key/value mapping, like so: ruleName -
ruleValue (both strings).

In the processElement method I'm delegating to my custom RuleEngineService.
It is a class that holds the "rule engine" logic and accepts the received
event and a "set of processing rules" in some form.

Which would be the best approach?
1. Keep the original JSON string in broadcast state. Whenever a new set
of rules is streamed via Kafka, parse the JSON in the
processBroadcastElement method, map it to some RuleParams abstraction and
keep that as a transient field in my BroadcastProcessFunction operator.
Save the JSON in broadcast state. Pass RuleParams to the rule engine
service.

2. Same as 1, but instead of keeping the raw JSON string in broadcast state,
keep the already parsed JSON object, something like ObjectNode from the
KafkaConnector lib.

3. Keep each pair of ruleName - ruleValue (both strings) separately in
broadcast state. In the processBroadcastElement method parse the received
JSON and update the state. In the processElement method take all rules,
build a RuleParams object (basically a map) and pass it to the rule engine.

4. Parse the JSON in the processBroadcastElement method, map it to the
RuleParams abstraction, keeping the rules in a HashMap, and keep this
RuleParams in broadcast state.

5. any other...
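
The common idea behind options 1 and 4 (do the expensive work once per rule
update, not once per event) can be sketched in plain Java. Everything here
is hypothetical: RuleCache and its methods are illustration names, the
HashMap named state stands in for Flink's broadcast state, and the rules are
assumed to arrive already parsed into a ruleName -> ruleValue map.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class RuleCache {

    // Stand-in for the broadcast state (the durable copy of the rules).
    private final Map<String, String> state = new HashMap<>();

    // Read-only snapshot handed to the rule engine; rebuilt only on updates.
    private Map<String, String> snapshot = Collections.emptyMap();

    // Called once per rule update (the processBroadcastElement side).
    void onRulesUpdate(Map<String, String> parsedRules) {
        state.clear();
        state.putAll(parsedRules);
        snapshot = Collections.unmodifiableMap(new HashMap<>(state));
    }

    // Called once per event (the processElement side); no parsing, no copying.
    Map<String, String> currentRules() {
        return snapshot;
    }

    public static void main(String[] args) {
        RuleCache cache = new RuleCache();
        Map<String, String> update = new HashMap<>();
        update.put("maxAmount", "1000");
        cache.onRulesUpdate(update);
        System.out.println(cache.currentRules());
    }
}
```

One caveat with any transient, derived copy: it presumably has to be rebuilt
from the stored state after a restart or restore, since only the state
itself would be checkpointed.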







Thread access and broadcast state initialization in BroadcastProcessFunction

2019-12-10 Thread KristoffSC
Hi,
I was playing around with BroadcastProcessFunction and I've observed a
specific behavior.

My setup:

MapStateDescriptor ruleStateDescriptor = new MapStateDescriptor<>(
        "RulesBroadcastState",
        Types.VOID,
        TypeInformation.of(new TypeHint() {
        }));

BroadcastStream processingRulesBroadcastStream = processingRulesStream
        .broadcast(ruleStateDescriptor);


SingleOutputStreamOperator evaluatedTrades = enrichedTransactionStream
        .connect(processingRulesBroadcastStream)
        .process(new DriveEngineRuleOperator())
        .name("Drive Rule Evaluation");

Where DriveEngineRuleOperator extends BroadcastProcessFunction and
implements open, processElement and processBroadcastElement methods.

I was following Flink's tutorials about broadcast state pattern and my
"open" method looks like this:

@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    processingRulesDesc = new MapStateDescriptor<>(
            "RulesBroadcastState",
            Types.VOID,
            TypeInformation.of(new TypeHint() {
            }));
}


I've noticed that all methods are called by the same thread. Will that
always be the case, or could those methods be called by different threads?

The second thing I've noticed is that the "open" method was executed only
before the first "fast stream" element was received (before execution of
the processElement method). That means that if I received the control stream
element first (the broadcast stream element), then the open method would
not be called and I would not initialize my processing rule descriptor; I
would lose the event.

What are the good practices in this case?





Order events by filed that does not represent time

2019-12-10 Thread KristoffSC
Hi,
Is it possible to use a field that does not represent a timestamp to order
events in Flink's pipeline?

In other words, I will receive a stream of events that will have a sequence
number (gaps are possible).
Can I maintain the order of those events based on this field, the same as I
would for a field representing time?
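
As a sketch of the underlying idea, independent of Flink: buffer the events
and release them ordered by sequence number, which works with gaps because
only the relative order matters. The names below are hypothetical. In a
Flink job such a buffer would presumably live in keyed state and be flushed
by a timer or a window, which this plain-Java example does not attempt to
show.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

final class SequenceBuffer {

    // Hypothetical event carrying a sequence number instead of a timestamp.
    static final class Event {
        final long sequenceNumber;
        final String payload;

        Event(long sequenceNumber, String payload) {
            this.sequenceNumber = sequenceNumber;
            this.payload = payload;
        }
    }

    // Min-heap ordered by sequence number.
    private final PriorityQueue<Event> buffer = new PriorityQueue<>(
            Comparator.comparingLong((Event e) -> e.sequenceNumber));

    void add(Event event) {
        buffer.add(event);
    }

    // Empties the buffer, returning events in ascending sequence order.
    List<Event> drainInOrder() {
        List<Event> ordered = new ArrayList<>();
        while (!buffer.isEmpty()) {
            ordered.add(buffer.poll());
        }
        return ordered;
    }

    public static void main(String[] args) {
        SequenceBuffer sequenceBuffer = new SequenceBuffer();
        sequenceBuffer.add(new Event(5, "c"));
        sequenceBuffer.add(new Event(2, "a"));
        sequenceBuffer.add(new Event(9, "d"));
        for (Event event : sequenceBuffer.drainInOrder()) {
            System.out.println(event.sequenceNumber + " " + event.payload);
        }
    }
}
```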

Regards,
Krzysztof





Re: Basic question about flink programms

2019-12-10 Thread KristoffSC
Hi Arvid Heise-3,
Thanks for your answer. I took this approach. 


I did not want to start a new thread since I wanted to avoid "subject
duplication" :)

Regards,
Krzysztof





Re: Basic question about flink programms

2019-12-06 Thread KristoffSC
Hi,
I'm having the same problem now. What is your approach now, after gaining
some experience?

Also, do you use Spring DI to set up/initialize your jobs/process functions?


