Re: Split one dataset into multiple

2018-11-05 Thread vino yang
Hi madan,

I think you need to hash-partition your records. Flink supports hash
partitioning of data via the keyBy operator. If the values of your tag
field are enumerable, you can also use split/select to achieve your
purpose.
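
For illustration, here is a minimal sketch of the split/select approach. The
Record POJO, its entityType tag field and the fromElements source are made-up
stand-ins for your actual element type and iterator source:

import java.util.Collections;

import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SplitByTag {

    // Hypothetical element type: each record carries an entity tag.
    public static class Record {
        public String entityType; // "DEPARTMENT", "EMPLOYEE" or "ADDRESS"
        public String payload;

        public Record() {}

        public Record(String entityType, String payload) {
            this.entityType = entityType;
            this.payload = payload;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the custom iterator source.
        DataStream<Record> input = env.fromElements(
                new Record("DEPARTMENT", "d1"),
                new Record("EMPLOYEE", "e1"),
                new Record("ADDRESS", "a1"));

        // Route every record to a named output based on its tag field.
        SplitStream<Record> split = input.split(new OutputSelector<Record>() {
            @Override
            public Iterable<String> select(Record r) {
                return Collections.singletonList(r.entityType);
            }
        });

        DataStream<Record> departments = split.select("DEPARTMENT");
        DataStream<Record> employees   = split.select("EMPLOYEE");
        DataStream<Record> addresses   = split.select("ADDRESS");

        // Apply the per-entity operations here (aggregations, joins, ...).
        departments.print();
        employees.print();
        addresses.print();

        env.execute("split-by-tag");
    }
}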

Thanks, vino.

madan  于2018年11月5日周一 下午6:37写道:

> Hi,
>
> I have a custom iterator which gives data of multiple entities. For
> example, the iterator gives data of Department, Employee and Address. A
> record's entity type is identified by a field value. And I need to apply a
> different set of operations on each dataset. E.g., Department data may have
> aggregations, while Employee and Address data are simply joined together
> after some filtering.
>
> If I have different datasets for each entity type the job is easy. So I am
> trying to split incoming data to different datasets. What is the best
> possible way to achieve this ?
>
> *Iterator can be read only once.
>
>
> --
> Thank you,
> Madan.
>


Never gets into ProcessWindowFunction.process()

2018-11-05 Thread Vijay Balakrishnan
Hi,
Running in IntelliJ IDE on a Mac with 4 vProcessors.
The code compiles fine, but it never gets into Window5SecProcessing's
process(). I am able to get data from the Kinesis consumer and it is
deserialized properly when I debug the code. It gets into the
Window5SecProcessing.open() method for initialization.

Not sure if I am failing because no slots are available?
In main():
// trimmed a lot of code
FlinkKinesisConsumer<Monitoring> kinesisConsumer =
        getMonitoringFlinkKinesisConsumer(local, kinesisTopicRead, region, ..., ...);

DataStream<Monitoring> kinesisStream = env
        .addSource(kinesisConsumer)
        .uid(jobName + "KinesisSource");

KeyedStream<Monitoring, Tuple3<String, String, String>> enrichedComponentInstanceStream1Key =
        kinesisStream.keyBy(new KeySelector<Monitoring, Tuple3<String, String, String>>() {
            public Tuple3<String, String, String> getKey(Monitoring mon) throws Exception {
                return new Tuple3<>(mon.getComponent(), mon.getInstance(), mon.getOperation());
            }
        });

WindowedStream<Monitoring, Tuple3<String, String, String>, TimeWindow> enrichedComponentInstanceStream1Win =
        enrichedComponentInstanceStream1Key.timeWindow(org.apache.flink.streaming.api.windowing.time.Time.seconds(5));

DataStream<MonitoringGrouping> enrichedComponentInstanceStream1 = enrichedComponentInstanceStream1Win
        .process(new Window5SecProcessing(gameId, FIVE_SECONDS, COMPONENT_INSTANCE_OPERATION))
        .uid("Component Instance Operation Key Monitoring " + FIVE_SECONDS);

enrichedComponentInstanceStream1.addSink(new SinkFunction<MonitoringGrouping>() {
    @Override
    public void invoke(MonitoringGrouping mg, Context context) throws Exception {
        // TODO call ES
        logger.debug("In enrichedComponentInstanceStream1 Sink received mg:{}", mg);
    }
});
Window processing class:

private static class Window5SecProcessing extends
        ProcessWindowFunction<Monitoring, MonitoringGrouping, Tuple3<String, String, String>, TimeWindow> {

    private transient Histogram fiveSecHist;
    private transient Histogram fiveMinHist;
    private transient org.apache.flink.metrics.Histogram fiveSecHistogram;
    private transient org.apache.flink.metrics.Histogram fiveMinHistogram;
    private transient ValueState<Long> total5SecCountState;
    private transient ValueStateDescriptor<Long> total5SecCountValueStateDescriptor;

    public Window5SecProcessing(String gameId, String interval, String keyType) {
        ...
    }

    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        logger.debug("Window5SecProcessing - Entered open - parameters:{}", parameters); // gets here
        com.codahale.metrics.Histogram fiveSecHist =
                new com.codahale.metrics.Histogram(new SlidingTimeWindowReservoir(5, TimeUnit.SECONDS));
        this.fiveSecHistogram = new DropwizardHistogramWrapper(fiveSecHist);
        total5SecCountValueStateDescriptor =
                new ValueStateDescriptor<>("total5SecCount", Long.class, 0L);
        total5SecCountState = getRuntimeContext().getState(total5SecCountValueStateDescriptor);
    }

    ..

    public void process(Tuple3<String, String, String> currentKey1, Context ctx,
            Iterable<Monitoring> input, Collector<MonitoringGrouping> out) throws Exception {
        logger.debug("Window5SecProcessing - Entered process"); // never gets here
        Tuple3<String, String, String> currentKey = (Tuple3<String, String, String>) currentKey1;
        ...
    }
}
At one point in the logs, I seem to see that there are no slots available.
Is that the problem, and if so, how can I fix it so I can test locally on my
Mac?
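
A minimal sketch (not from my actual code), assuming the issue really is the
number of local slots, of how the embedded local environment could be given
more slots:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Assumption: the embedded mini cluster needs more slots than the default
// for this job graph when running inside the IDE.
Configuration conf = new Configuration();
conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 4);
StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironment(4, conf);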
Log:
flink-akka.actor.default-dispatcher-71 DEBUG
org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Slot Pool
Status:
status: connected to
akka://flink/user/resourcemanager_466813ab-9e2c-4c88-9623-b783ebfd00cc
registered TaskManagers: [52fbcef4-6961-4b1a-96b9-bbf8dfd905ed]
*available slots: []*
allocated slots: [[AllocatedSlot
AllocationID{e13f284707cafef978a3c59f27e7f3f3} @
52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost (dataPort=-1) - 0]]
pending requests: []
sharing groups: {
 5a0ae59368145d715b3cc0d39ba6c05a 
{
groupId=5a0ae59368145d715b3cc0d39ba6c05a
unresolved={}
resolved={52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost
(dataPort=-1)=[MultiTaskSlot{requestId=SlotRequestId{a5fd4a1b7478661f62350df3bea3695f},
allocatedRequestId=SlotRequestId{c99b7aea635f1792416d239a9b135584},
groupId=null, physicalSlot=AllocatedSlot
AllocationID{e13f284707cafef978a3c59f27e7f3f3} @
52fbcef4-6961-4b1a-96b9-bbf8dfd905ed @ localhost (dataPort=-1) - 0,
children=[SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{a3176498368d1123639f3ee94a9798b6},
allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}),
request=SlotRequestId{a3176498368d1123639f3ee94a9798b6},
group=8587a27f4c92252839400ce17054b261},
SingleTaskSlot{logicalSlot=(requestId=SlotRequestId{7b1ed3f0c53a4fe353e241216df1c0d9},
allocationId=AllocationID{e13f284707cafef978a3c59f27e7f3f3}),
request=SlotRequestId{7b1ed3f0c53a4fe35

Understanding checkpoint behavior

2018-11-05 Thread PranjalChauhan
Hi,

I am a new Flink user, currently using Flink 1.2.1. I am trying to understand
how checkpoints actually work while the window operator is processing events.

My pipeline has the following flow, where each operator's parallelism is 1:
source -> flatmap -> tumbling window -> sink
In this pipeline, I configured the window to be evaluated every 1 hour
(3600 seconds) and the checkpoint interval was 5 minutes. The checkpoint
timeout was set to 1 hour as I wanted the checkpoints to complete.
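
For reference, a rough sketch of the configuration described above (the
flatmap, key selector, window function and sink classes are placeholders, not
the actual job):

// Sketch only: checkpoint every 5 minutes, 1 hour checkpoint timeout,
// 1 hour tumbling windows.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5 * 60 * 1000);
env.getCheckpointConfig().setCheckpointTimeout(60 * 60 * 1000);

env.addSource(new MySource())
        .flatMap(new MyFlatMap())
        .keyBy(new MyKeySelector())
        .timeWindow(Time.hours(1))
        .apply(new MyWindowFunction())   // makes the HTTPS calls, so it can be slow
        .addSink(new MySink());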

In my window function, the job makes HTTPS calls to another service, so the
window function may take some time to evaluate/process all events.

Please refer to the following image. In this case, the window was triggered at
23:00:00. Checkpoint 12 was triggered soon after that, and I noticed that
checkpoint 12 took a long time to complete (compared to other checkpoints
taken when the window function is not processing events).

 

The following images show the checkpoint 12 details of the window & sink operators.

 

 

I see that the time spent on the checkpoint was actually just 5 ms & 8 ms
(checkpoint sync duration) for the window & sink operators. However, the
end-to-end duration of the checkpoint was 11m 12s for both the window & sink
operators.

Is this expected behavior? If yes, do you have any suggestions to reduce the
end-to-end checkpoint duration?

Please let me know if any more information is needed.

Thanks.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink Kafka to BucketingSink to S3 - S3Exception

2018-11-05 Thread Ravi Bhushan Ratnakar
Hi there,

some questions:

   1. Is this using Flink 1.6.2 with dependencies (flink-s3-fs-hadoop,
   flink-statebackend-rocksdb, hadoop-common, hadoop-aws, hadoop-hdfs,
   hadoop-common) ? If so, could you please share your dependency versioning?

[Ravi]- I am using AWS EMR 5.18, which supports Flink 1.6.0. On EMR, all the
Hadoop- and AWS-related dependencies were already available, so I explicitly
removed them from the transitive dependencies of Flink modules like
flink-s3-fs-hadoop.
I was also facing an issue using flink-s3-fs-hadoop on EMR (a class conflict
due to the shaded AWS API), so I unpacked the jar and included the required
classes in the project.

   2. Does this use a kafka source with high flink parallelism (~400) for
   all kafka partitions and does it run continuously for several days?

[Ravi]- I am using a Kinesis source with parallelism 640. So far, we have not
been able to run for more than 20 hours.


   3. Could you please share your checkpoint interval configuration, batch
   file size, batch rollover interval configuration, and sink prefix (s3:// ,
   s3a://)

[Ravi]- After doing a couple of rounds of testing, we realized that
checkpoints would require more resources depending on the outage duration,
and we faced several checkpoint-related issues. So we finally decided not to
use checkpoints, as our state size for 5-minute windows is around 40 GB.
Checkpoint interval: 10 sec, sink prefix: s3a://, batch file size: 256 MB,
rollover inactivity: 60 secs.
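
For reference, a sketch of how those sink settings map onto a stock
BucketingSink (the bucket path and element type are illustrative, not our
actual code):

// Illustrative only: 256 MB batch size, 60 s inactivity rollover, s3a:// prefix.
BucketingSink<String> sink = new BucketingSink<>("s3a://my-bucket/output");
sink.setBatchSize(256L * 1024 * 1024);        // roll part files at 256 MB
sink.setInactiveBucketThreshold(60 * 1000L);  // roll buckets inactive for 60 seconds
stream.addSink(sink);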

Thank you

On Mon 5 Nov, 2018, 17:24 Addison Higham wrote:

> Hi there,
>
> This is going to be a bit of a long post, but I think there has been a lot
> of confusion around S3, so I am going to go over everything I know in hopes
> that helps.
>
> As mentioned by Rafi, The BucketingSink does not work for file systems
> like S3, as the bucketing sink makes some assumptions that are incorrect
> for eventually consistent file systems as well as for file systems that
> don't have certain atomic operations, which leads to inconsistency (see
> https://issues.apache.org/jira/browse/FLINK-6306). This has been poorly
> documented in the docs, so I think a lot of people have tried to use s3
> only to face issues. There is a plan for moving forward however.
>
> However, that plan does NOT include "fixing" the BucketingSink. Instead, a
> new API - the StreamingFileSink - is the replacement for BucketingSink; it
> was first introduced in Flink 1.6 and is planned to (eventually) fix the
> problem. The first release of StreamingFileSink in the 1.6 branch didn't
> support S3. This was originally seen as a bug that would be fixed in Flink
> 1.6.2, however, once all the work was done to add support for S3, it seems
> it was decided not to backport the fix (see this thread:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/FLINK-9752-s3-recoverable-writer-not-actually-fixed-in-1-6-2-td24925.html).
> This means that flink 1.6.2 does NOT fix the S3 issue, but the fix will be
> included in 1.7, which is currently in feature freeze and will hopefully
> have an RC in the next couple of weeks.
>
> But yes, if you need S3 support ASAP, you are in a bit of a pickle. My
> team is in that situation, so these are the options as we saw them:
>
> 0. Wait for flink 1.7
> 1. Run and build your own flink from master or flink-1.7 branches which
> has support for S3 and StreamingFileSink
> 2. Write our own custom sink for s3 (probably with some caveats)
> 3. Backport the changes into flink 1.6
>
> We really didn't want to wait for 1.7, as that would make our delivery
> timeline not great. We didn't love the idea of running a fun unreleased
> version of flink in production either. As we looked into writing something
> ourselves, it became clear
> pretty quick that we could fairly easily get an output sink to a file that
> would be at-least-once delivery to a file, but exactly-once delivery would
> be significantly more difficult. That is actually okay for our use case,
> but we decided we would rather not have to
> revisit this later on and change all the code and then run a one-off job
> to remove dupes. Instead, we decided to backport the changes into 1.6
> branch. Luckily, we already build our own flink, so we had that tooling
> already. The backport took a few hours (it was fairly complicated to get
> all the changes), but we seem to have gotten everything
> working. The backport is here:
> https://github.com/instructure/flink/tree/s3_recover_backport. Our plan
> is to use that backport until 1.7 is stable, then we can upgrade without
> (hopefully) having to change any code. We still recognize there is a
> possibility for bugs in the backport, but
> for us that is mitigated by the fact that we are okay with at-least-once
> and if all else fails, we have a period of transition where we have this
> data being written in another location we can fall back to.
>
> So yeah, to reiterate, no out-of-the-box S3 stuff works ATM, but that
> should hopefully be fixed *soon*. If you can wait, that is the easiest, if
> you can't, building ei

Re: Flink Kafka to BucketingSink to S3 - S3Exception

2018-11-05 Thread Addison Higham
Hi there,

This is going to be a bit of a long post, but I think there has been a lot
of confusion around S3, so I am going to go over everything I know in hopes
that helps.

As mentioned by Rafi, The BucketingSink does not work for file systems like
S3, as the bucketing sink makes some assumptions that are incorrect for
eventually consistent file systems as well as for file systems that don't
have certain atomic operations, which leads to inconsistency (see
https://issues.apache.org/jira/browse/FLINK-6306). This has been poorly
documented in the docs, so I think a lot of people have tried to use s3
only to face issues. There is a plan for moving forward however.

However, that plan does NOT include "fixing" the BucketingSink. Instead, a
new API - the StreamingFileSink - is the replacement for BucketingSink; it
was first introduced in Flink 1.6 and is planned to (eventually) fix the
problem. The first release of StreamingFileSink in the 1.6 branch didn't
support S3. This was originally seen as a bug that would be fixed in Flink
1.6.2, however, once all the work was done to add support for S3, it seems
it was decided not to backport the fix (see this thread:
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/FLINK-9752-s3-recoverable-writer-not-actually-fixed-in-1-6-2-td24925.html).
This means that flink 1.6.2 does NOT fix the S3 issue, but the fix will be
included in 1.7, which is currently in feature freeze and will hopefully
have an RC in the next couple of weeks.
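
For anyone who hasn't looked at it yet, wiring up the StreamingFileSink looks
roughly like this (a minimal sketch; the bucket path and element type are
illustrative):

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

// Minimal row-format sketch; path and element type are illustrative.
StreamingFileSink<String> sink = StreamingFileSink
        .forRowFormat(new Path("s3a://my-bucket/output"), new SimpleStringEncoder<String>("UTF-8"))
        .build();
stream.addSink(sink);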

But yes, if you need S3 support ASAP, you are in a bit of a pickle. My team
is in that situation, so these are the options as we saw them:

0. Wait for flink 1.7
1. Run and build your own flink from master or flink-1.7 branches which has
support for S3 and StreamingFileSink
2. Write our own custom sink for s3 (probably with some caveats)
3. Backport the changes into flink 1.6

We really didn't want to wait for 1.7, as that would make our delivery
timeline not great. We didn't love the idea of running a fun unreleased
version of flink in production either. As we looked into writing something
ourselves, it became clear
pretty quick that we could fairly easily get an output sink to a file that
would be at-least-once delivery to a file, but exactly-once delivery would
be significantly more difficult. That is actually okay for our use case,
but we decided we would rather not have to
revisit this later on and change all the code and then run a one-off job to
remove dupes. Instead, we decided to backport the changes into 1.6 branch.
Luckily, we already build our own flink, so we had that tooling already.
The backport took a few hours (it was fairly complicated to get all the
changes), but we seem to have gotten everything
working. The backport is here:
https://github.com/instructure/flink/tree/s3_recover_backport. Our plan is
to use that backport until 1.7 is stable, then we can upgrade without
(hopefully) having to change any code. We still recognize there is a
possibility for bugs in the backport, but
for us that is mitigated by the fact that we are okay with at-least-once
and if all else fails, we have a period of transition where we have this
data being written in another location we can fall back to.

So yeah, to reiterate, no out-of-the-box S3 stuff works ATM, but that
should hopefully be fixed *soon*. If you can wait, that is the easiest, if
you can't, building either your own custom sink or your own flink with the
backport isn't a terrible option.

Hope that helps!

Addison




On Sun, Nov 4, 2018 at 3:09 AM Flink Developer 
wrote:

> Hi Ravi, some questions:
>
>1. Is this using Flink 1.6.2 with dependencies (flink-s3-fs-hadoop,
>flink-statebackend-rocksdb, hadoop-common, hadoop-aws, hadoop-hdfs,
>hadoop-common) ? If so, could you please share your dependency versioning?
>2. Does this use a kafka source with high flink parallelism (~400) for
>all kafka partitions and does it run continuously for several days?
>3. Could you please share your checkpoint interval configuration,
>batch file size, batch rollover interval configuration, and sink prefix
>(s3:// ,  s3a://)
>
> Thank you
> ‐‐‐ Original Message ‐‐‐
> On Saturday, November 3, 2018 7:18 AM, Ravi Bhushan Ratnakar <
> ravibhushanratna...@gmail.com> wrote:
>
> I have done little changes in BucketingSink and implemented as new
> CustomBucketingSink to use in my project which works fine with s3 and s3a
> protocol.  This implementation doesn't require xml file configuration,
> rather than it uses configuration provided using flink configuration object
> by calling setConfig method of BucketingSink.
>
> On Sat 3 Nov, 2018, 09:24 Flink Developer  wrote:
>
>> It seems the issue also appears when using
>> *Flink version 1.6.2 . *
>> ‐‐‐ Original Message ‐‐‐
>> On Tuesday, October 30, 2018 10:26 PM, Flink Developer <
>> developer...@protonmail.com> wrote:
>>
>> Hi, thanks for the info Rafi, that seems to be related.  I hope *Flink
>> version 1.6.2* fix

Re: Non deterministic result with Table API SQL

2018-11-05 Thread Fabian Hueske
Thanks Flavio for reporting the error and helping to debug it.
A job to reproduce the error is very valuable :-)

Best, Fabian

On Mon, 5 Nov 2018 at 14:38, Flavio Pompermaier <
pomperma...@okkam.it> wrote:

> Here is the JIRA ticket and, attached to it, the Flink (Java) job to
> reproduce the error: https://issues.apache.org/jira/browse/FLINK-10795
>
> On Wed, Oct 31, 2018 at 4:46 PM Timo Walther  wrote:
>
>> As far as I know STDDEV_POP is translated into basic aggregate functions
>> (SUM/AVG/COUNT). But if this error is reproducible in a little test
>> case, we should definitely track this in JIRA.
>>
>>
>> On 31.10.18 at 16:43, Flavio Pompermaier wrote:
>> > Adding more rows to the dataset leads to a deterministic error. My
>> > tests say that the problem arises when adding STDDEV_POP to the
>> > query.
>> > Do you think it could be possible that there's a concurrency problem
>> > in its implementation?
>> >
>>
>>
>
>


Re: Flink 1.6 job cluster mode, job_id is always 00000000000000000000000000000000

2018-11-05 Thread Hao Sun
Thanks all.

On Mon, Nov 5, 2018 at 2:05 AM Ufuk Celebi  wrote:

> On Sun, Nov 4, 2018 at 10:34 PM Hao Sun  wrote:
> > Thanks, that also works. To avoid the same issue with ZooKeeper, I assume
> > I have to do the same trick?
>
> Yes, exactly. The following configuration [1] entry takes care of this:
>
> high-availability.cluster-id: application-1
>
> This will result in ZooKeeper entries as follows:
> /flink/application-1/[...].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/jobmanager_high_availability.html#config-file-flink-confyaml
> 
>


Re: Non deterministic result with Table API SQL

2018-11-05 Thread Flavio Pompermaier
Here is the JIRA ticket and, attached to it, the Flink (Java) job to
reproduce the error: https://issues.apache.org/jira/browse/FLINK-10795

On Wed, Oct 31, 2018 at 4:46 PM Timo Walther  wrote:

> As far as I know STDDEV_POP is translated into basic aggregate functions
> (SUM/AVG/COUNT). But if this error is reproducible in a little test
> case, we should definitely track this in JIRA.
>
>
> On 31.10.18 at 16:43, Flavio Pompermaier wrote:
> > Adding more rows to the dataset leads to a deterministic error. My
> > tests say that the problem arises when adding STDDEV_POP to the
> > query.
> > Do you think it could be possible that there's a concurrency problem
> > in its implementation?
> >
>
>


Re: Questions about Savepoints

2018-11-05 Thread Yun Tang
Hi Ning

You have asked several questions; I'll try to answer some of them:

- In my job, it takes over 30 minutes to take a savepoint of over 100GB
  on 3 TMs. Most time spent after the alignment. I assume it was
  serialization and uploading to S3. However, when I resume a new job
  from the savepoint, it only takes seconds to recover. It seems too
  fast to me. I've tried resuming from the savepoint with a different
  parallelism. It was also very fast. Is this expected?
The checkpoint alignment would increase the overall duration of the checkpoint.
And I think the 'seconds to recover' you mention here may not be accurate: when
you see the task RUNNING in the web UI, this is just the beginning. A running
task does not mean the state backend has been recovered; the RocksDB state
backend only starts recovering after you see logs like the one below:

INFO  org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 
Initializing RocksDB keyed state backend.


- Is there any log messages on the JM or the TMs indicating when a job
  or operator restored state from a savepoint? It'll be very helpful to know if 
state is restored especially when the
  "--allowNonRestoredState" flag is set.
If allowNonRestoredState is set and not all operator state can be mapped to
the running job, you will see logs like this in the JM:

Skipped checkpoint state for operator xxx


 - If a checkpoint was successfully taken after a savepoint, will
  resuming a job from the savepoint try to leverage the checkpoint?

Definitely no. If you want to leverage that checkpoint, you can simply resume
from the checkpoint itself [1].
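
(In case it helps, a minimal sketch of how checkpoints can be retained so that
they remain resumable after a cancellation; `env` stands for the job's
StreamExecutionEnvironment, and this is not taken from your job:)

// Keep completed checkpoints after cancellation so they can be used
// with `flink run -s <checkpoint-path>` when resubmitting.
CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);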


 - The job uses Kafka as the source, when I resume it from savepoint,
  when will the job start consuming from Kafka again? Does it wait until
  all operators have finished restoring state or does it start as soon
  as the source operator finishes restoring? I assume it waits for all
  because that's the only way to guarantee transactionality.
As I said above, a task transitioning to RUNNING does not mean it has finished
restoring state; that is when it actually starts to restore state. The source
operator would start to consume data once it has reached run() in
StreamTask.java [2], and the checkpoint mechanism would guarantee the
transactionality [3].

- When cancelling a job with a savepoint, is there any way to prevent the
  job from cancelling if the savepoint fails? Otherwise, it sounds too
  dangerous to use this operation.
You can check the guide: "The job will only be cancelled if the savepoint
succeeds." [4]


[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint
[2] 
https://github.com/apache/flink/blob/ddcdfa5b8e89a7fb9bfe065bae376ff8571abf85/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L300
[3] 
https://ci.apache.org/projects/flink/flink-docs-release-1.6/internals/stream_checkpointing.html#checkpointing
[4] 
https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/cli.html#cancel-with-a-savepoint

Best
Yun Tang

From: Ning Shi 
Sent: Monday, November 5, 2018 8:28
To: user@flink.apache.org
Subject: Questions about Savepoints

I have the following questions regarding savepoint recovery.

- In my job, it takes over 30 minutes to take a savepoint of over 100GB
  on 3 TMs. Most time spent after the alignment. I assume it was
  serialization and uploading to S3. However, when I resume a new job
  from the savepoint, it only takes seconds to recover. It seems too
  fast to me. I've tried resuming from the savepoint with a different
  parallelism. It was also very fast. Is this expected?

- Is there any log messages on the JM or the TMs indicating when a job
  or operator restored state from a savepoint? It'll be very helpful to
  know if state is restored especially when the
  "--allowNonRestoredState" flag is set.

- If a checkpoint was successfully taken after a savepoint, will
  resuming a job from the savepoint try to leverage the checkpoint?

- The job uses Kafka as the source, when I resume it from savepoint,
  when will the job start consuming from Kafka again? Does it wait until
  all operators have finished restoring state or does it start as soon
  as the source operator finishes restoring? I assume it waits for all
  because that's the only way to guarantee transactionality.

- When cancelling a job with a savepoint, is there anyway to prevent the
  job from cancelling if the savepoint fails? Otherwise, it sounds too
  dangerous to use this operation.

Thanks,

--
Ning


Split one dataset into multiple

2018-11-05 Thread madan
Hi,

I have a custom iterator which gives data of multiple entities. For
example, the iterator gives data of Department, Employee and Address. A
record's entity type is identified by a field value. And I need to apply a
different set of operations on each dataset. E.g., Department data may have
aggregations, while Employee and Address data are simply joined together
after some filtering.

If I have different datasets for each entity type the job is easy. So I am
trying to split incoming data to different datasets. What is the best
possible way to achieve this ?

*Iterator can be read only once.


-- 
Thank you,
Madan.


Re: Flink 1.6 job cluster mode, job_id is always 00000000000000000000000000000000

2018-11-05 Thread Ufuk Celebi
On Sun, Nov 4, 2018 at 10:34 PM Hao Sun  wrote:
> Thanks, that also works. To avoid the same issue with ZooKeeper, I assume I
> have to do the same trick?

Yes, exactly. The following configuration [1] entry takes care of this:

high-availability.cluster-id: application-1

This will result in ZooKeeper entries as follows: /flink/application-1/[...].

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/jobmanager_high_availability.html#config-file-flink-confyaml


Always trigger calculation of a tumble window in Flink SQL

2018-11-05 Thread yinhua.dai
We have a requirement to always trigger a calculation on a timer basis, e.g.
every 1 minute.

*If records come into Flink during the time window, then calculate in the
normal way, i.e. aggregate each record and emit getResult() at the end of
the time window.*

*If no records come into Flink during the time window, then send the last
calculated result.*

I know that Flink will not trigger the calculation in the second case (when
no records come into the system during the time window). Is there a solution
for me in Flink SQL?
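
For context, a sketch of the kind of tumbling-window aggregation referred to
above (table and column names are made up; `tableEnv` is the
StreamTableEnvironment and `events` has a rowtime attribute `rowtime`):

// Hypothetical 1-minute tumbling-window aggregation in Flink SQL.
Table result = tableEnv.sqlQuery(
    "SELECT userId, SUM(amount) AS total, " +
    "       TUMBLE_END(rowtime, INTERVAL '1' MINUTE) AS windowEnd " +
    "FROM events " +
    "GROUP BY userId, TUMBLE(rowtime, INTERVAL '1' MINUTE)");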



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


"org.apache.flink.client.program.ProgramInvocationException: Unknown I/O error while extracting contained jar files

2018-11-05 Thread wangziyu
Hi,
I use the monitoring REST API “/jars/{jars}/run” to test my environment, and
this exception happened.
I did exactly this:
1. I used “/jars/upload” to upload my jar.
2. I wanted to test my jar.
That is all. How can I solve this exception?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/