Hi all.
The invoke method of a sink seems to offer no way to do async I/O, e.g. by
returning a Future?
For example, the redis connector uses the jedis lib to execute redis
commands synchronously:
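For illustration, a hedged sketch of the kind of workaround I have in mind
(not the actual connector code; the class name is made up and writeToRedis()
stands in for the blocking jedis call): hand the write to a sink-owned thread
pool so invoke() returns immediately. Note that in-flight writes can be lost
on failure unless they are flushed on checkpoint.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AsyncishRedisSink extends RichSinkFunction<String> {

    private transient ExecutorService executor;

    @Override
    public void open(Configuration parameters) {
        // Pool size is an illustrative tuning knob.
        executor = Executors.newFixedThreadPool(4);
    }

    @Override
    public void invoke(String value) {
        // Hand the blocking call to the pool; invoke() returns immediately.
        executor.submit(() -> writeToRedis(value));
    }

    private void writeToRedis(String value) {
        // The synchronous jedis call would go here, e.g. jedis.set(...).
    }

    @Override
    public void close() throws Exception {
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
    }
}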
Running with these settings:
yarn-session.sh -n 400 -s 2 -tm 9200 -jm 5120
akka.ask.timeout: 60s
containerized.heap-cutoff-ratio: 0.15
taskmanager.memory.fraction: 0.7
taskmanager.memory.off-heap: false
taskmanager.memory.preallocate: true
env.getConfig().setExecutionMode(ExecutionMode.BATCH)
Seems like 3 possibilities:
1. Change the user flink runs as to the user with hdfs rights
2. hdfs chown the directory you're writing to (or hdfs chmod to open up
access)
3. I've seen where org.apache.hadoop.security.UserGroupInformation can be
used to do something like this:
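Roughly along these lines (a hedged sketch; the user name and path are
illustrative, not taken from any real setup):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

import java.security.PrivilegedExceptionAction;

// Run the HDFS calls as a user that has the required rights.
UserGroupInformation ugi = UserGroupInformation.createRemoteUser("hdfsuser");
ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
    FileSystem fs = FileSystem.get(new Configuration());
    fs.mkdirs(new Path("/user/hdfsuser/output")); // illustrative path
    return null;
});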
Hi Nishu,
the data loss might be caused by the fact that processing time triggers do
not fire when the program terminates.
So, if your program has records stored in a window and the program terminates
because the input was fully consumed, the window operator won't process the
remaining windows but
Hi Fabian,
The program runs until I manually stop it. The trigger is also firing as
expected, because I read the entire data after the trigger fires to see
what data is captured, and pass that data over to GroupByKey as input.
It's using a Global window, so I accumulate the entire data each time the
Hi,
Thanks for your inputs.
I am reading Kafka topics in Global windows and have defined some
ProcessingTime triggers. Hence there are no late records.
The program performs a join between multiple Kafka topics. The
transformation sequence is something like:
1. Read
Hi Garrett,
In the Web UI, when viewing a job under overview / subtasks, selecting the
checkbox "Aggregate task statistics by TaskManager" will reduce the number of
displayed rows (though in your case only by half).
The following documents profiling a Flink job with Java Flight Recorder:
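As a sketch of the JVM side (hedged: these flags assume Oracle JDK 8, where
JFR requires unlocking commercial features; the filename is illustrative),
the options can be set in flink-conf.yaml:

env.java.opts: "-XX:+UnlockCommercialFeatures -XX:+FlightRecorder -XX:StartFlightRecording=duration=120s,filename=/tmp/flink.jfr"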
I have the same question.
I am setting fs.hdfs.hadoopconf to the location of a Hadoop config. However,
when I start a job, I get an error message that it's trying to connect to
the HDFS directory as user "flink":
Caused by:
Nishu
You might consider a side output with metrics, at least after the window. I
would suggest having that to catch data skew or partition skew in all Flink
jobs and amend if needed.
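For example, a rough sketch (the Event type, getKey(), and names are
illustrative) using an OutputTag from a ProcessFunction:

// input: a DataStream<Event> after the window.
final OutputTag<String> metricsTag = new OutputTag<String>("window-metrics") {};

SingleOutputStreamOperator<Event> main = input
    .process(new ProcessFunction<Event, Event>() {
        @Override
        public void processElement(Event value, Context ctx, Collector<Event> out) {
            out.collect(value);
            // Emit the key to the side output so skew can be counted there.
            ctx.output(metricsTag, value.getKey());
        }
    });

DataStream<String> metrics = main.getSideOutput(metricsTag);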
Chen
On Thu, Dec 7, 2017 at 9:48 AM Fabian Hueske wrote:
> Is it possible that the data is
Ah, no direct memory buffer...
Can you try to disable off-heap memory?
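I.e., in flink-conf.yaml:

taskmanager.memory.off-heap: false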
2017-12-07 18:35 GMT+01:00 Garrett Barton :
> Stacktrace generates every time with the following settings (tried
> different memory fractions):
> yarn-session.sh -n 400 -s 2 -tm 9200 -jm 5120
>
Is it possible that the data is dropped due to being late, i.e., records
with timestamps behind the current watermark?
What kind of operations does your program consist of?
Best, Fabian
2017-12-07 10:20 GMT+01:00 Sendoh :
> I would recommend also printing the count of
Hi Hayden,
It would be nice if you could share a bit more detail about your use case and
the load that you expect to have,
as this would give us a better view of your needs.
As a general set of rules:
1) I would say that the bigger your cluster (in terms of resources, not
necessarily
Stacktrace generates every time with the following settings (tried
different memory fractions):
yarn-session.sh -n 400 -s 2 -tm 9200 -jm 5120
akka.ask.timeout: 60s
containerized.heap-cutoff-ratio: 0.15
taskmanager.memory.fraction: 0.7/0.3/0.1
taskmanager.memory.off-heap: true
Hi,
A ClassNotFoundException is not expected behavior.
Can you post the stacktrace of the exception?
We had a few issues in the past where Flink didn't use the correct
classloader.
So this would not be an unusual bug.
Thanks,
Fabian
2017-12-07 10:44 GMT+01:00 Tugdual Grall
We are currently voting on the third release candidate for 1.4.0.
Feel free to propose this feature on the dev mailing list [1], but I don't
think this will result in cancelling the vote.
If we identify a blocking issue for 1.4.0, it could be included as well.
But we're already a few weeks behind
Hi Miguel,
I just had another idea that you could try.
The problem seems to be that the plan space that the optimizer enumerates
grows exponentially as you add more iterations.
This happens when there are multiple valid execution strategies for a given
operator (mostly applied to Joins).
You
Let me check the details. Off the top of my head, I remember the job id
changes, but I might be wrong.
On Thu, Dec 7, 2017, 08:48 Fabian Hueske wrote:
> AFAIK, a job keeps its ID in case of a recovery.
> Did you observe something else?
>
> 2017-12-07 17:32 GMT+01:00 Hao Sun
Hi Tovi,
What you need is the TwoInputStreamOperatorTestHarness. This will allow you to
do something like:
TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> testHarness = // IN1, IN2, OUT: your element types
new TwoInputStreamOperatorTestHarness<>(myoperator);
testHarness.setup();
testHarness.open();
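You can then feed elements into each input and check what is emitted, roughly
like this (element values and timestamps are illustrative):

testHarness.processElement1(new StreamRecord<>("a", 1L));
testHarness.processElement2(new StreamRecord<>("b", 2L));
// Verify the expected behavior by inspecting testHarness.getOutput().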
Could that be put into the 1.4 release? This significantly eases tooling
around savepoints, and though a minor change, it is not intrusive and should
not cause a regression. Thank you though...
On Dec 7, 2017 3:51 AM, "Chesnay Schepler" wrote:
> I've finished the implementation. I
Hmm, the OOM sounds like a bug to me. Can you provide the stacktrace?
The managed memory should be divided among all possible consumers. In the
case of your simple job, this should just be the Sorter.
In fact, I'd try to reduce the fraction to give more memory to the JVM heap
(OOM means there was not
Hi group,
What is the best practice for testing CoFlatMap operator correctness?
We have two source functions, each emitting data to a stream, and a connect
between them, and I want to make sure that when a streamA element arrives
before a streamB element, a certain behavior happens.
How can I test this
AFAIK, a job keeps its ID in case of a recovery.
Did you observe something else?
2017-12-07 17:32 GMT+01:00 Hao Sun :
> I mean restarted during failure recovery
>
> On Thu, Dec 7, 2017 at 8:29 AM Fabian Hueske wrote:
>
>> What do you mean by rescheduled?
>>
I mean restarted during failure recovery
On Thu, Dec 7, 2017 at 8:29 AM Fabian Hueske wrote:
> What do you mean by rescheduled?
> Started from a savepoint or restarted during failure recovery?
>
>
> 2017-12-07 16:59 GMT+01:00 Hao Sun :
>
>> Anything I can
What do you mean by rescheduled?
Started from a savepoint or restarted during failure recovery?
2017-12-07 16:59 GMT+01:00 Hao Sun :
> Anything I can do for the job reschedule case? Thanks.
> Or is there a way to add job lifecycle hooks to trace it?
>
> On Mon, Dec 4, 2017 at
Hi Jörn,
thanks for the little example. Maybe Avro changed the behavior for
maps between the old version we used in 1.3 and the newest version in Flink
1.4.0.
I will investigate this and might open an issue for it.
Regards,
Timo
On 12/7/17 at 5:07 PM, Joern Kottmann wrote:
Hello Timo,
Hello Timo,
thanks for your quick response. I can't share the code of that pipeline here.
The Flink version I am using is this one:
http://people.apache.org/~aljoscha/flink-1.4.0-rc3/flink-1.4.0-src.tgz
I compiled it myself with mvn install -DskipTests (for some reason
the tests failed, can
Anything I can do for the job reschedule case? Thanks.
Or is there a way to add job lifecycle hooks to trace it?
On Mon, Dec 4, 2017 at 12:01 PM Hao Sun wrote:
> Thanks Fabian, there is one case that cannot be covered by the REST API.
> When a job is rescheduled to run, but the jobid
Thanks for the reply again,
I'm currently doing runs with:
yarn-session.sh -n 700 -s 2 -tm 9200 -jm 5120
akka.ask.timeout: 60s
containerized.heap-cutoff-ratio: 0.15
taskmanager.memory.fraction: 0.7
taskmanager.memory.off-heap: true
taskmanager.memory.preallocate: true
When I change the config
Can you also check the type of the keys in your map? Avro distinguishes
between the String and Utf8 classes. Maybe this is why your key cannot be
found.
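For example (a hedged illustration; "myMap" is a made-up field name, and
record is an Avro GenericRecord):

import org.apache.avro.util.Utf8;

Map<CharSequence, CharSequence> map =
    (Map<CharSequence, CharSequence>) record.get("myMap");
map.get("key");           // may return null: the stored keys are Utf8
map.get(new Utf8("key")); // finds the entry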
Regards,
Timo
On 12/7/17 at 3:54 PM, Timo Walther wrote:
Hi Jörn,
could you tell us a bit more about your job? Did you import the
flink-avro
Hi Jörn,
could you tell us a bit more about your job? Did you import the
flink-avro module? What does the Flink TypeInformation for your Avro type
look like when using println(ds.getType)? It sounds like a very weird error
if the toString() method shows the key. Can you reproduce the error in
your
Hello Fabian,
It really looks like an issue requiring attention.
Since I am launching the client application via Maven, I opted to change
the maximum memory setting with
export MAVEN_OPTS="-Xms256m -Xmx4g".
To give an example, for three (3) iterations it worked fine with around 4
GB of
Hello,
after having a version mismatch with Avro in Flink 1.3.2, I decided
to see how things work with Flink 1.4.0.
The pipeline I am building runs now, deployed standalone on YARN
with Flink 1.3.2 and putting it "FIRST" on the classpath (to use Avro
1.8.2 instead of a 1.7.x version).
The
Hi,
I'm looking for guidelines on a reference hardware architecture for a
small/medium Flink cluster - we'll be installing on in-house bare-metal
servers. I'm looking for guidance on:
1. Number and spec of CPUs
2. RAM
3. Disks
4. Network
5. Proximity of servers to each other
(Most
I hope it can be put in 1.4.1.
I have one concern about the REST API. We are running 1.3.1 on DC/OS, and if
we pass parameters as arguments and, in our code, validate these parameters
and throw an exception during startup if something is wrong, we see that all
uploaded jars disappear and all running jobs
Hello,
Thank you for all the advice. Sorry for my late response.
First, I made a little mistake.
I set `env.java.opts.taskmanager` to enable the GC log, and by accident this
cancelled the automatically set `UseG1GC` feature.
This means I watched the log of the Parallel GC.
When I enabled both the GC log and `UseG1GC`
In retrospect I'm quite frustrated we didn't get around to implementing
this for 1.4.
The least-effort implementation would have required copying one class,
and modifying ~10 lines.
Doesn't get any more low-hanging than that.
On 07.12.2017 12:51, Chesnay Schepler wrote:
I've finished the
I've finished the implementation. I created 2 new branches in my
repository, for 1.3.2 and 1.4.
1.3.2: https://github.com/zentol/flink/tree/release-1.3.2-custom
1.4: https://github.com/zentol/flink/tree/release-1.4-custom
Triggering savepoints works exactly like cancel-with-savepoint, just
Thank you Gary.
I know that theoretically there shouldn't be any performance issue.
I was curious to know if any other users have tried out docker-flink and
whether they have faced/reported any performance hit. I would want real
time processing for some of the events, and was looking at existing
Hi Jayant,
Running Flink in a Docker container should not have an impact on the
performance
in itself. Docker does not employ virtualization. To put it simply, Docker
containers are processes on the host operating system that are isolated
against
each other using kernel features. See [1] for a
Hi Max,
You are right that Queryable State is not designed to be used as a means for a
job to query its own state.
In fact, given that you do not know the jobId of your job from within the job
itself, I do not think you can use
queryable state in your scenario.
What you can do is to have a
That doesn't look like a bad configuration.
I have to correct myself regarding the size of the managed memory. The
fraction (70%) is applied to the free memory after the TM initialization.
This means that the memory for network buffers (and other data structures)
is subtracted before the managed
ok
On Thu, Dec 7, 2017 at 10:35 AM, romain.jln wrote:
> Hi all,
>
> I am experiencing some problems with a custom source that I have
> implemented. I am getting some ClassNotFoundException randomly during the
> execution of the job, even though the fat jar submitted to Flink
Hi all,
I am experiencing some problems with a custom source that I have
implemented. I am getting some ClassNotFoundException randomly during the
execution of the job, even though the fat jar submitted to Flink contains the
given classes.
After several hours of investigation, I think I have been
I would recommend also printing the count of the input and output of each
operator by using an Accumulator, e.g.:
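A rough sketch inside a RichMapFunction (class and accumulator names are
illustrative):

import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class CountingMap extends RichMapFunction<String, String> {

    private final IntCounter numRecords = new IntCounter();

    @Override
    public void open(Configuration parameters) {
        // Register the accumulator; its final value appears in the job
        // result and the web UI.
        getRuntimeContext().addAccumulator("num-records", numRecords);
    }

    @Override
    public String map(String value) {
        numRecords.add(1);
        return value;
    }
}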
Cheers,
Sendoh
Exactly! I initially thought this class was in the Table API. I was building a
custom table source and found I had to add the Kafka connector dependency to
read JSON-encoded data, although my table source doesn't need it.
Cheers,
Hung
Hi,
I am running a streaming pipeline (written in the Beam framework) with Flink.
The *operator sequence* is: read the JSON data, parse the JSON string to an
object, and group the objects based on a common key. I noticed that the
GroupByKey operator throws away some data in between and hence I don't get
all