Aggregation using event timestamp rather than clock window

2018-01-11 Thread Rohan Thimmappa
Hi All,


I have the following requirement:

1. I have an Avro JSON message containing {eventid, usage, starttime, endtime}.
2. I am reading this from a Kafka source.
3. If a record spans an hour boundary, split the record by rounding off to hourly boundaries.
4. My objective is to a) read the message and b) aggregate the usage within each hour.
5. Send the aggregated data to another Kafka topic.

I don't want to aggregate based on the clock window. If I see the next hour in endtime,
I would like to close the window and send the aggregated usage down to the Kafka sink topic.


e.g., input data:
4.55 - 5.00
5.00 - 5.25
5.25 - 5.55
5.55 - 6.25

after split:
4.55 - 5.00 - expect a record to go out with this
5.00 - 5.25
5.25 - 5.55
5.55 - 6.00 - expect a record to go out with this
6.00 - 6.25




1. I have set the event time characteristic:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

2. val hourlyAggregate: SingleOutputStreamOperator[Tuple2[String, Report]] = stream
  .flatMap(new SplitFlatMap()) // checks for an overlapping hour; if so, creates split records at the hourly boundary
  .assignTimestampsAndWatermarks(new ReportTimestampExtractor)
  .keyBy(0)
  .window(TumblingEventTimeWindows.of(Time.seconds(intervalsecond.toLong)))
  .reduce(new Counter()) // aggregates the usage collected within the window

3. Here is the implementation of the timestamp extractor:

class ReportTimestampExtractor extends AssignerWithPeriodicWatermarks[Tuple2[String, EMMReport]] with Serializable {
  override def extractTimestamp(e: Tuple2[String, Report], prevElementTimestamp: Long) = {
    e.f1.getEndTime
  }

  override def getCurrentWatermark(): Watermark = {
    new Watermark(System.currentTimeMillis - 36000) // respect delay for 1 hour
  }
}
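One thing worth noting here: the watermark above is derived from the wall clock, and 36000 ms is 36 seconds rather than one hour, so the windows advance with processing time regardless of the end times carried by the records. A minimal sketch of a data-driven alternative, written in Java with the Report type and its getEndTime() accessor assumed, uses punctuated watermarks so that the watermark (and therefore the hourly window) advances as soon as a record with a later end time arrives:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Sketch only: "Report" and getEndTime() stand in for the types used above.
public class ReportPunctuatedExtractor
        implements AssignerWithPunctuatedWatermarks<Tuple2<String, Report>> {

    private static final long ALLOWED_LATENESS_MS = 0L; // increase if records arrive out of order

    @Override
    public long extractTimestamp(Tuple2<String, Report> element, long previousElementTimestamp) {
        return element.f1.getEndTime();
    }

    @Override
    public Watermark checkAndGetNextWatermark(Tuple2<String, Report> lastElement, long extractedTimestamp) {
        // advance the watermark from the event time, not from System.currentTimeMillis()
        return new Watermark(extractedTimestamp - ALLOWED_LATENESS_MS);
    }
}

With such an assigner, the assignTimestampsAndWatermarks(...) call above stays the same, and the tumbling event-time window can fire once a record from the next hour pushes the watermark past the hour boundary.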


I see the aggregation is generated only on the clock window, rather than
when the window sees the next hour in the record.

Is there anything I am missing? By definition, with event time set, the
windows should respect the message time rather than the clock.




-- 
Thanks
Rohan


How can I count the elements in a DataStream?

2018-01-11 Thread 邓俊华
Hi,
How can I count the elements in a DataStream? I don't want to use keyBy().
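One possible approach, sketched in Java and not taken from this thread: if a per-subtask running count is enough, a pass-through RichMapFunction with a metrics Counter counts elements without any keyBy(). The class and metric names below are made up.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

// Counts every element that passes through the operator; no keying required.
public class CountingMap<T> extends RichMapFunction<T, T> {

    private transient Counter counter;

    @Override
    public void open(Configuration parameters) {
        // the count is reported per parallel subtask via the metrics system
        this.counter = getRuntimeContext().getMetricGroup().counter("elementCount");
    }

    @Override
    public T map(T value) {
        counter.inc();
        return value;
    }
}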




Can we recover a job using the latest checkpointed state instead of a savepoint, and how?

2018-01-11 Thread hzyuemeng1






I found that using a savepoint to recover a job sometimes fails. Can we use the latest checkpointed state to recover a failed job instead, and if so, how? Thank you.






Sent from NetEase Mail Master
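For context (this is an assumption about the setup, not from the thread): with Flink 1.4, checkpoints can be retained ("externalized") so that a failed or cancelled job can be resumed from the latest one, using the same -s flag that is used for savepoints. A minimal sketch:

import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Retain checkpoints on cancellation/failure so the latest one can be used to
// restart the job, e.g. `bin/flink run -s <checkpoint-path> ...`.
public class RetainedCheckpointsSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // checkpoint every 60 seconds
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // ... build the streaming job here and call env.execute()
    }
}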






Re: Parallelizing a tumbling group window

2018-01-11 Thread Colin Williams
Thanks for the reply. Unfortunately that project was unexpectedly cancelled,
though for other reasons. I was happy to work on it and hopefully gained some
insight. I have another, unrelated question today about Elasticsearch sinks,
and will ask it in a separate thread.

On Fri, Jan 5, 2018 at 2:52 AM, Fabian Hueske  wrote:

> Hi Colin,
>
> There are two things that come to my mind:
>
> 1) You mentioned "suspect jobs are grouping by a field of constant
> values". Does that mean that the grouping key is always constant? Flink
> parallelizes the window computation per key, i.e., there is one thread per
> key. Although it would be possible to perform pre-aggregations, this is not
> done yet. There is an effort to add support for this to the DataStream API
> [1]. The Table API will hopefully leverage this once it has been added to
> the DataStream API.
> 2) Another reason for backpressure can be non-aligned watermarks, i.e.,
> the watermarks of different partitions diverge too much from each other. In
> this case, windows cannot be finalized because everything is aligned to the
> lowest watermark.
>
> Hope this helps to clarify things.
>
> Best, Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK-7561
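For illustration, a minimal sketch of the two parallelism knobs discussed in this thread: the environment default, which the Table/SQL window operators pick up, versus per-operator setParallelism(). The sequence source and the numbers are placeholders for the real Kafka source and SQL query.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4); // default parallelism, used by the Table/SQL window operators

        DataStream<Long> numbers = env
                .generateSequence(0, 1_000_000)   // stand-in for the Kafka source
                .setParallelism(1);               // sources and sinks can keep their own parallelism

        numbers
                .map(new MapFunction<Long, Long>() {
                    @Override
                    public Long map(Long value) {
                        return value * 2;
                    }
                })
                .setParallelism(2)                // pin a single operator explicitly
                .print()
                .setParallelism(1);

        env.execute("parallelism sketch");
    }
}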
>
> 2017-12-30 0:11 GMT+01:00 Colin Williams  >:
>
>> Hi Timo and flink-user,
>>
>>
>> It's been a few weeks and we've made some changes to the application
>> mentioned on this email. we've also updated for flink 1.4 . We are using
>> the SQL / Table API with a tumbling window and user defined agg to generate
>> a SQL query string like:
>>
>>
>> SELECT measurement, `tag_AppId`(tagset), UDFAGG(`field_Adds`(fieldset)),
>> TUMBLE_START(rowtime, INTERVAL '10' MINUTE) FROM LINES GROUP BY
>> measurement, `tag_AppId`(tagset), TUMBLE(rowtime, INTERVAL '10' MINUTE).
>>
>>
>>
>> I've experimented with parallelism of the operators and setting the
>> environments parallelism as suggested. I've been setting parallelism values
>> of 2 or 4 to all operators except the consumer and sink.
>>
>>
>> For some jobs with large kafka source topics, under load we experience
>> back pressure and see some lag. But when trying to address via parallelism:
>> so far I've only seen very degraded performance from the increased
>> parallelism settings.
>>
>>
>> Furthermore, the suspect jobs are grouping by a field of constant values.
>> Then these jobs usually have 40,000 or so grouped records enter the
>> aggregator for each minute window.
>>
>>
>>
>> I would think that the tumbling windows would allow the job to process
>> each window in another task slot, parallelizing each window. But maybe
>> that's not happening?
>>
>>
>>
>> Can you help us to understand why parallelizing the job only has a
>> degraded impact on performance and what I can do to change this?
>>
>>
>>
>>
>> Happy New Year!
>>
>>
>>
>> Colin Williams
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> On Mon, Dec 11, 2017 at 4:15 AM, Timo Walther  wrote:
>>
>>> Hi Colin,
>>>
>>> unfortunately, selecting the parallelism for parts of a SQL query is not
>>> supported yet. By default, tumbling window operators use the default
>>> parallelism of the environment. Simple project and select operations have
>>> the same parallelism as the inputs they are applied on.
>>>
>>> I think the easiest solution so far is to explicitly set the parallelism
>>> of operators that are not part of the Table API and use the environment's
>>> parallelism to scale the SQL query.
>>>
>>> I hope that helps.
>>>
>>> Regards,
>>> Timo
>>>
>>>
>>> Am 12/9/17 um 3:06 AM schrieb Colin Williams:
>>>
>>> Hello,
>>>
>>> I've inherited some flink application code.
>>>
>>> We're currently creating a table using a Tumbling SQL query similar to
>>> the first example in
>>>
>>>  https://ci.apache.org/projects/flink/flink-docs-release-1.3
>>> /dev/table/sql.html#group-windows
>>>
>>> Where each generated SQL query looks something like
>>>
>>> SELECT measurement, `tag_AppId`(tagset), P99(`field_Adds`(fieldset)),
>>> TUMBLE_START(rowtime, INTERVAL '10' MINUTE) FROM LINES GROUP BY
>>> measurement, `tag_AppId`(tagset), TUMBLE(rowtime, INTERVAL '10' MINUTE)
>>>
>>> We are also using a UDFAGG function in some of the queries which I think
>>> might be cleaned up and optimized a bit (using scala types and possibly not
>>> well implemented)
>>>
>>> We then turn the result table back into a datastream using
>>> toAppendStream, and eventually add a derivative stream to a sink. We've
>>> configured TimeCharacteristic to event-time processing.
>>>
>>> In some streaming scenarios everything is working fine with a
>>> parallelism of 1, but in others it appears that we can't keep up with the
>>> event source.
>>>
>>> Then we are investigating how to enable parallelism specifically on the
>>> SQL table query or aggregator.
>>>
>>> Can anyone suggest a good way to go about this? It wasn't clear from the
>>> documentation.
>>>
>>> Best,
>>>
>>> Colin Williams
>>>
>>>
>>>
>>>
>>>
>>
>


Re: Kinesis Connectors - With Temporary Credentials

2018-01-11 Thread Tzu-Li (Gordon) Tai
Ah, I see. Temporary Credentials are delegated through the AWS Security Token 
Service through the AssumeRole API.
Sorry, I wasn't aware of the Temporary Credentials feature before.

Seems like we should add support for the 
STSAssumeRoleSessionCredentialsProvider [1]. And yes, your observation is 
correct: I think this would be a matter of extending the AWSUtil class.

I’ve filed a JIRA for the issue: FLINK-8417 [2]. Would you like to contribute 
this feature? That would be of great help and I think it’ll be a useful 
addition. If yes, feel free to ping me for any questions you may have.

Cheers,
Gordon

[1] 
https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/STSAssumeRoleSessionCredentialsProvider.html
[2] https://issues.apache.org/jira/browse/FLINK-8417
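For reference, a rough sketch of the AWS SDK building block referenced in [1]; this is not something the connector does yet (that is what FLINK-8417 is about), and the role ARN and session name below are placeholders:

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;

// The provider assumes the given role via STS and transparently refreshes the
// temporary credentials; wiring it into the Kinesis connector would mean
// returning such a provider from AWSUtil based on new configuration keys.
public class AssumeRoleCredentialsSketch {
    public static AWSCredentialsProvider assumeRole(String roleArn, String sessionName) {
        return new STSAssumeRoleSessionCredentialsProvider.Builder(roleArn, sessionName)
                .build();
    }
}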

On 12 January 2018 at 7:46:10 AM, sreenath kodedala (veda...@me.com) wrote:

No, they are not but we can definitely look into that.

If no, is there a workaround to implement or customize AWS Utils?

Thank you

On Jan 11, 2018, at 6:41 PM, Tzu-Li (Gordon) Tai  wrote:

Hi Sree,

Are Temporary Credentials automatically shipped with AWS EC2 instances when 
delegated to the role?
If yes, you should be able to just configure the properties so that the Kinesis 
consumer automatically fetches credentials from the AWS instance.
To do that, simply do not provide the Access Key and Secret Key explicitly in 
the properties, and it will use the above default behaviour.

Apparently, the Kinesis connector docs [1] do not explain this preferred 
default behavior well enough. I'll file a JIRA to improve that.

Cheers,
Gordon

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kinesis.html
On 12 January 2018 at 7:25:58 AM, sreenath kodedala (veda...@me.com) wrote:


>  
> Hi, 
>  
> According to my understanding, Kinesis Connector requires Access Key and 
> Secret Key to connect. 
>  
> Is it possible or any work around to use Temporary Credentials from AWS to 
> use in Kinesis Connector? 
> We have scenario where we are trying to access cross-account Stream and we 
> are assuming the role. So, in this scenario we get temporary credentials with 
> a token which will expire every hour.  
>  
> Thank you 
> -Sree



Re: Kinesis Connectors - With Temporary Credentials

2018-01-11 Thread sreenath kodedala
No, they are not but we can definitely look into that.

If no, is there a workaround to implement or customize AWS Utils?

Thank you

> On Jan 11, 2018, at 6:41 PM, Tzu-Li (Gordon) Tai  wrote:
> 
> Hi Sree,
> 
> Are Temporary Credentials automatically shipped with AWS EC2 instances when 
> delegated to the role?
> If yes, you should be able to just configure the properties so that the 
> Kinesis consumer automatically fetches credentials from the AWS instance.
> To do that, simply do not provide the Access Key and Secret Key explicitly in 
> the properties, and it will use the above default behaviour.
> 
> Apparently, the Kinesis connector docs [1] do not explain this preferred 
> default behavior well enough. I'll file a JIRA to improve that.
> 
> Cheers,
> Gordon
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kinesis.html
>  
> 
> On 12 January 2018 at 7:25:58 AM, sreenath kodedala (veda...@me.com 
> ) wrote:
> 
>> 
>> >  
>> > Hi, 
>> >  
>> > According to my understanding, Kinesis Connector requires Access Key and 
>> > Secret Key to connect. 
>> >  
>> > Is it possible or any work around to use Temporary Credentials from AWS to 
>> > use in Kinesis Connector? 
>> > We have scenario where we are trying to access cross-account Stream and we 
>> > are assuming the role. So, in this scenario we get temporary credentials 
>> > with a token which will expire every hour.  
>> >  
>> > Thank you 
>> > -Sree



Re: Kinesis Connectors - With Temporary Credentials

2018-01-11 Thread Tzu-Li (Gordon) Tai
Hi Sree,

Are Temporary Credentials automatically shipped with AWS EC2 instances when 
delegated to the role?
If yes, you should be able to just configure the properties so that the Kinesis 
consumer automatically fetches credentials from the AWS instance.
To do that, simply do not provide the Access Key and Secret Key explicitly in 
the properties, and it will use the above default behaviour.

Apparently, the Kinesis connector docs [1] do not explain this preferred 
default behavior well enough. I'll file a JIRA to improve that.

Cheers,
Gordon

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kinesis.html
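A minimal sketch of that setup, assuming Flink 1.4 and placeholder stream and region names: no access key or secret key is put into the properties, so the connector falls back to the default AWS credential chain (for example the EC2 instance profile role).

import java.util.Properties;

import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KinesisAutoCredentialsSketch {
    public static FlinkKinesisConsumer<String> build() {
        Properties config = new Properties();
        config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); // placeholder region
        // no explicit credentials here: the connector uses the default credential provider chain
        return new FlinkKinesisConsumer<>("my-stream", new SimpleStringSchema(), config);
    }
}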
On 12 January 2018 at 7:25:58 AM, sreenath kodedala (veda...@me.com) wrote:


>  
> Hi,  
>  
> According to my understanding, Kinesis Connector requires Access Key and 
> Secret Key to connect.  
>  
> Is it possible or any work around to use Temporary Credentials from AWS to 
> use in Kinesis Connector?  
> We have scenario where we are trying to access cross-account Stream and we 
> are assuming the role. So, in this scenario we get temporary credentials with 
> a token which will expire every hour.  
>  
> Thank you  
> -Sree  



CEP issue in 1.3.2. Does 1.4 fix this ?

2018-01-11 Thread Vishal Santoshi
When checkpointing is turned on, a simple CEP loop pattern

 private Pattern<SimpleBinaryEvent, ?> alertPattern =
        Pattern.<SimpleBinaryEvent>begin("start").where(checkStatusOn)
                .followedBy("middle").where(checkStatusOn).times(2)
                .next("end").where(checkStatusOn).within(Time.minutes(5));

I see failures.

SimpleBinaryEvent is:

public class SimpleBinaryEvent implements Serializable {

    private int id;
    private int sequence;
    private boolean status;
    private long time;

    public SimpleBinaryEvent(int id, int sequence, boolean status, long time) {
        this.id = id;
        this.sequence = sequence;
        this.status = status;
        this.time = time;
    }

    public int getId() {
        return id;
    }

    public int getSequence() {
        return sequence;
    }

    public boolean isStatus() {
        return status;
    }

    public long getTime() {
        return time;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        SimpleBinaryEvent that = (SimpleBinaryEvent) o;

        if (getId() != that.getId()) return false;
        if (isStatus() != that.isStatus()) return false;
        if (getSequence() != that.getSequence()) return false;
        return getTime() == that.getTime();
    }

    @Override
    public int hashCode() {
        // return Objects.hash(getId(), isStatus(), getSequence(), getTime());
        int result = getId();
        result = 31 * result + (isStatus() ? 1 : 0);
        result = 31 * result + getSequence();
        result = 31 * result + (int) (getTime() ^ (getTime() >>> 32));
        return result;
    }

    @Override
    public String toString() {
        return "SimpleBinaryEvent{" +
                "id='" + id + '\'' +
                ", status=" + status +
                ", sequence=" + sequence +
                ", time=" + time +
                '}';
    }
}

failure cause:

Caused by: java.lang.Exception: Could not materialize checkpoint 2 for
operator KeyedCEPPatternOperator -> Map (1/1).
... 6 more
Caused by: java.util.concurrent.ExecutionException:
java.lang.IllegalStateException: Could not find id for entry:
SharedBufferEntry(ValueTimeWrapper((1,SimpleBinaryEvent{id='1',
status=true, sequence=95, time=150550338}), 150550338, 0),

I am sure I have equals() and hashCode() implemented the way they should
be. I have also tried Objects.hash. In other instances I have had a
circular reference (and thus a StackOverflowError) in SharedBuffer.toString(),
which again points to issues with references (equality and so on).
Without checkpointing turned on it works as expected. I am running on a
local cluster. Is CEP production ready?

I am using Flink 1.3.2.


Re: class loader issues when closing streams

2018-01-11 Thread Jared Stehler
Here’s a more complete view of the task manager log from the start of this 
occurrence:

2018-01-11 14:50:08.286 [heartbeat-filter -> input-trace-filter -> 
filter-inactive-ds -> filter-duplicates 
(ip-10-80-54-205.us-west-2.compute.internal:2181)] INFO  
c.i.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Unable to read 
additional data from server sessionid 0x3c2d8a7603de, likely server has 
closed socket, closing socket connection and attempting reconnect
2018-01-11 14:50:08.388 [heartbeat-filter -> input-trace-filter -> 
filter-inactive-ds -> filter-duplicates (5/10)-EventThread] INFO  
c.i.f.s.c.o.a.curator.framework.state.ConnectionStateManager  - State change: 
SUSPENDED
2018-01-11 14:50:08.412 [flink-akka.actor.default-dispatcher-16] INFO  
o.a.flink.mesos.runtime.clusterframework.MesosTaskManager  - TaskManager 
akka://flink/user/taskmanager disconnects from JobManager 
akka.tcp://flink@10.80.54.126:31024/user/jobmanager: JobManager requested 
disconnect: JobManager is no longer the leader
2018-01-11 14:50:08.412 [flink-akka.actor.default-dispatcher-16] INFO  
o.a.flink.mesos.runtime.clusterframework.MesosTaskManager  - Cancelling all 
computations and discarding all cached data.
2018-01-11 14:50:08.450 [flink-akka.actor.default-dispatcher-16] INFO  
org.apache.flink.runtime.taskmanager.Task  - Attempting to fail task externally 
Sink: ES (5/10) (1a2951add18548188742e85d98da271f).
2018-01-11 14:50:08.452 [flink-akka.actor.default-dispatcher-16] INFO  
org.apache.flink.runtime.taskmanager.Task  - Sink: ES (5/10) 
(1a2951add18548188742e85d98da271f) switched from RUNNING to FAILED.
java.lang.Exception: TaskManager akka://flink/user/taskmanager disconnects from 
JobManager akka.tcp://flink@10.80.54.126:31024/user/jobmanager: JobManager 
requested disconnect: JobManager is no longer the leader
at 
org.apache.flink.runtime.taskmanager.TaskManager.handleJobManagerDisconnect(TaskManager.scala:1073)
at 
org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:314)
at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at 
org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at 
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at 
org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:121)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2018-01-11 14:50:08.486 [flink-akka.actor.default-dispatcher-16] INFO  
org.apache.flink.runtime.taskmanager.Task  - Triggering cancellation of task 
code Sink: ES (5/10) (1a2951add18548188742e85d98da271f).
2018-01-11 14:50:08.506 [flink-akka.actor.default-dispatcher-16] INFO  
org.apache.flink.runtime.taskmanager.Task  - Attempting to fail task externally 
Sink: Kafka (5/10) (71bebe47ce524c0d535845b1e4d9c595).
2018-01-11 14:50:08.506 [flink-akka.actor.default-dispatcher-16] INFO  
org.apache.flink.runtime.taskmanager.Task  - Sink: Kafka (5/10) 
(71bebe47ce524c0d535845b1e4d9c595) switched from RUNNING to FAILED.

...

2018-01-11 14:50:08.550 [Sink: Kafka (5/10)] INFO  
org.apache.kafka.clients.producer.KafkaProducer  - Closing the Kafka producer 
with timeoutMillis = 9223372036854775807 ms.
2018-01-11 14:50:08.553 [Sink: Kafka (5/10)] ERROR 
org.apache.kafka.clients.producer.KafkaProducer  - Interrupted while joining 
ioThread
java.lang.InterruptedException: null
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1260)
at 
org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1031)
at 
org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1010)
at 
org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:989)
at 

Re: Two issues when deploying Flink on DC/OS

2018-01-11 Thread Gary Yao
Hi Dongwon,

I am not familiar with the deployment on DC/OS. However, Eron Wright and
Jörg
Schad (cc'd), who have worked on the Mesos integration, might be able to
help
you.

Best,
Gary

On Tue, Jan 9, 2018 at 10:29 AM, Dongwon Kim  wrote:

> Hi,
>
> I've launched JobManager and TaskManager on DC/OS successfully.
> Now I have two new issues:
>
> 1) All TaskManagers are scheduled on a single node.
> - Is it intended to maximize data locality and minimize network
> communication cost?
> - Is there an option in Flink to adjust the behavior of JobManager when it
> considers multiple resource offers from different Mesos agents?
> - I want to schedule TaskManager processes on different GPU servers so
> that each TaskManger process can use its own GPU cards exclusively.
> - Below is a part of JobManager log that is occurring while JobManager is
> negotiating resources with the Mesos master:
>
> 2018-01-09 07:34:54,872 INFO  
> org.apache.flink.mesos.runtime.clusterframework.MesosJobManager  - JobManager 
> akka.tcp://flink@dnn-g08-233:18026/user/jobmanager was granted leadership 
> with leader session ID Some(----).
> 2018-01-09 07:34:55,889 INFO  
> org.apache.flink.mesos.scheduler.ConnectionMonitor- Connecting to 
> Mesos...
> 2018-01-09 07:34:55,962 INFO  
> org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager  - 
> Trying to associate with JobManager leader 
> akka.tcp://flink@dnn-g08-233:18026/user/jobmanager
> 2018-01-09 07:34:55,977 INFO  
> org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager  - 
> Resource Manager associating with leading JobManager 
> Actor[akka://flink/user/jobmanager#-1481183359] - leader session 
> ----
> 2018-01-09 07:34:56,479 INFO  
> org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager  - 
> Scheduling Mesos task taskmanager-1 with (10240.0 MB, 8.0 cpus).
> 2018-01-09 07:34:56,481 INFO  
> org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager  - 
> Scheduling Mesos task taskmanager-2 with (10240.0 MB, 8.0 cpus).
> 2018-01-09 07:34:56,481 INFO  
> org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager  - 
> Scheduling Mesos task taskmanager-3 with (10240.0 MB, 8.0 cpus).
> 2018-01-09 07:34:56,481 INFO  
> org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager  - 
> Scheduling Mesos task taskmanager-4 with (10240.0 MB, 8.0 cpus).
> 2018-01-09 07:34:56,481 INFO  
> org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager  - 
> Scheduling Mesos task taskmanager-5 with (10240.0 MB, 8.0 cpus).
> 2018-01-09 07:34:56,483 INFO  
> org.apache.flink.mesos.scheduler.LaunchCoordinator- Now gathering 
> offers for at least 5 task(s).
> 2018-01-09 07:34:56,484 INFO  
> org.apache.flink.mesos.scheduler.ConnectionMonitor- Connected to 
> Mesos as framework ID 59b85b42-a4a2-4632-9578-9e480585ecdc-0004.
> 2018-01-09 07:34:56,690 INFO  
> org.apache.flink.mesos.scheduler.LaunchCoordinator- Received 
> offer(s) of 606170.0 MB, 234.2 cpus:
> 2018-01-09 07:34:56,692 INFO  
> org.apache.flink.mesos.scheduler.LaunchCoordinator-   
> 59b85b42-a4a2-4632-9578-9e480585ecdc-O2174 from 50.1.100.233 of 86.0 MB, 
> 45.9 cpus for [*]
> 2018-01-09 07:34:56,692 INFO  
> org.apache.flink.mesos.scheduler.LaunchCoordinator-   
> 59b85b42-a4a2-4632-9578-9e480585ecdc-O2175 from 50.1.100.235 of 123506.0 MB, 
> 47.3 cpus for [*]
> 2018-01-09 07:34:56,692 INFO  
> org.apache.flink.mesos.scheduler.LaunchCoordinator-   
> 59b85b42-a4a2-4632-9578-9e480585ecdc-O2176 from 50.1.100.234 of 124530.0 MB, 
> 46.6 cpus for [*]
> 2018-01-09 07:34:56,692 INFO  
> org.apache.flink.mesos.scheduler.LaunchCoordinator-   
> 59b85b42-a4a2-4632-9578-9e480585ecdc-O2177 from 50.1.100.231 of 123474.0 MB, 
> 47.2 cpus for [*]
> 2018-01-09 07:34:56,693 INFO  
> org.apache.flink.mesos.scheduler.LaunchCoordinator-   
> 59b85b42-a4a2-4632-9578-9e480585ecdc-O2178 from 50.1.100.232 of 123474.0 MB, 
> 47.2 cpus for [*]
> 2018-01-09 07:34:57,711 INFO  
> org.apache.flink.mesos.scheduler.LaunchCoordinator- Processing 5 
> task(s) against 5 new offer(s) plus outstanding offers.
> 2018-01-09 07:34:57,726 INFO  
> org.apache.flink.mesos.scheduler.LaunchCoordinator- Resources 
> considered: (note: expired offers not deducted from below)
> 2018-01-09 07:34:57,727 INFO  
> org.apache.flink.mesos.scheduler.LaunchCoordinator-   
> 50.1.100.234 has 124530.0 MB, 46.6 cpus
> 2018-01-09 07:34:57,728 INFO  
> org.apache.flink.mesos.scheduler.LaunchCoordinator-   
> 50.1.100.235 has 123506.0 MB, 47.3 cpus
> 2018-01-09 07:34:57,728 INFO  
> org.apache.flink.mesos.scheduler.LaunchCoordinator-   
> 50.1.100.232 has 123474.0 MB, 47.2 cpus
> 2018-01-09 

Re: class loader issues when closing streams

2018-01-11 Thread Jared Stehler
As another data point, here's an excerpt from a stack dump for the task manager:

"heartbeat-filter -> input-trace-filter -> filter-inactive-ds -> 
filter-duplicates (5/10)-EventThread" #94 daemon prio=5 os_prio=0 
tid=0x7f48c04d4800 nid=
0x68ef waiting on condition [0x7f48470eb000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0xcd6121c0> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at 
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at 
com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:501)

   Locked ownable synchronizers:
- None

"heartbeat-filter -> input-trace-filter -> filter-inactive-ds -> 
filter-duplicates (ip-10-80-52-23.us-west-2.compute.internal:2181)" #93 daemon 
prio=5 os_prio=0 tid=0x7f48c04e1800 nid=0x68ee waiting on condition 
[0x7f48471ec000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at 
com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1051)

   Locked ownable synchronizers:
- None

"Sink: ES (5/10)-EventThread" #68 daemon prio=5 os_prio=0 
tid=0x7f48c80a4800 nid=0x6829 waiting on condition [0x7f4851e08000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0xcc4f7950> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at 
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at 
com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:501)

   Locked ownable synchronizers:
- None

"Sink: ES (ip-10-80-53-99.us-west-2.compute.internal:2181)" #67 daemon prio=5 
os_prio=0 tid=0x7f48c80aa000 nid=0x6827 runnable [0x7f4851f09000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0xcc4f7c18> (a sun.nio.ch.Util$3)
- locked <0xcc4f7c08> (a java.util.Collections$UnmodifiableSet)
- locked <0xcc4f7bc0> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at 
com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:349)
at 
com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)

   Locked ownable synchronizers:
- None

--
Jared Stehler
Chief Architect - Intellify Learning
o: 617.701.6330 x703



> On Jan 11, 2018, at 10:07 AM, Jared Stehler 
>  wrote:
> 
> I’m seeing sporadic issues where it appears that curator (or other) user 
> threads are left running after a stream shutdown, and then the user class 
> loader goes away and I get spammed with ClassNotFoundExceptions… I’m 
> wondering if this might have something to do with perhaps the UserClassLoader 
> being shut down before close is invoked on all operators?
> 
> Here’s a stack trace I see from an attempt at closing an elastic search sink:
> 
> java.lang.ClassNotFoundException: 
> com.intellify.flink.shaded.curator.org.apache.curator.utils.CloseableUtils
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at 
> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at 
> com.intellify.flink.shaded.curator.org.apache.curator.ConnectionState.close(ConnectionState.java:119)
> at 
> com.intellify.flink.shaded.curator.org.apache.curator.CuratorZookeeperClient.close(CuratorZookeeperClient.java:227)
> at 
> com.intellify.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.close(CuratorFrameworkImpl.java:381)
> at 
> com.intellify.config.ManagedCuratorFramework.reallyClose(ManagedCuratorFramework.java:48)
> at 
> 

class loader issues when closing streams

2018-01-11 Thread Jared Stehler
I’m seeing sporadic issues where it appears that curator (or other) user 
threads are left running after a stream shutdown, and then the user class 
loader goes away and I get spammed with ClassNotFoundExceptions… I’m wondering 
if this might have something to do with perhaps the UserClassLoader being shut 
down before close is invoked on all operators?

Here’s a stack trace I see from an attempt at closing an elastic search sink:

java.lang.ClassNotFoundException: 
com.intellify.flink.shaded.curator.org.apache.curator.utils.CloseableUtils
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at 
com.intellify.flink.shaded.curator.org.apache.curator.ConnectionState.close(ConnectionState.java:119)
at 
com.intellify.flink.shaded.curator.org.apache.curator.CuratorZookeeperClient.close(CuratorZookeeperClient.java:227)
at 
com.intellify.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.close(CuratorFrameworkImpl.java:381)
at 
com.intellify.config.ManagedCuratorFramework.reallyClose(ManagedCuratorFramework.java:48)
at 
com.intellify.config.ArchaiusInitializer.close(ArchaiusInitializer.java:75)
at org.apache.commons.io.IOUtils.closeQuietly(IOUtils.java:303)
at 
com.intellify.flink.shared.config.SharedIntellifyConfigProvider.close(SharedIntellifyConfigProvider.java:56)
at 
com.intellify.flink.shared.config.SerializableLiveProperty.close(SerializableLiveProperty.java:68)
at 
com.intellify.flink.shared.elasticsearch.LiveResolvingEs1ApiCallBridge.cleanup(LiveResolvingEs1ApiCallBridge.java:105)
at 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.close(ElasticsearchSinkBase.java:323)
at com.intellify.flink.shared.tracer.TracingSink.close(TracingSink.java:50)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:446)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:351)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
I’m using a curator connection for archaius, and closing it in the call 
bridge’s cleanup method. I’m ensuring that I’m not reaching up into the parent 
class loader by shading curator and zookeeper. 

I also see the following on repeat in my task manager log:

2018-01-11 14:53:13.313 [heartbeat-filter -> input-trace-filter -> 
filter-inactive-ds -> filter-duplicates 
(ip-10-80-53-99.us-west-2.compute.internal:2181)] WARN  
c.i.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session 
0x3c2d8a7603de for server 
ip-10-80-53-99.us-west-2.compute.internal/10.80.53.99:2181, unexpected error, 
closing socket connection and attempting reconnect
java.lang.NoClassDefFoundError: 
com/intellify/flink/shaded/zookeeper/org/apache/zookeeper/proto/SetWatches
at 
com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.primeConnection(ClientCnxn.java:926)
at 
com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:363)
at 
com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)


Does anyone have any insight into what might be happening here? Does this seem 
like I’m not closing a thread properly, or something else entirely?


--
Jared Stehler
Chief Architect - Intellify Learning
o: 617.701.6330 x703





CaseClassTypeInfo fails to deserialize in flink 1.4 with Parent First Class Loading

2018-01-11 Thread Seth Wiesman
This is less of a question and more of a PSA.

It looks like there is a binary-incompatible change in the Scala standard 
library class `scala.collection.immutable.::` between point releases of 
Scala 2.11. CaseClassTypeInfo generated by the type information macro will 
fail to deserialize in user code with parent-first class loading if the 
application is not compiled with 2.11.12. The following will work with 
child-first class loading but fail with parent-first.


case class CustomClass(a: Int, b: Float)

class CustomMapFunction[T >: Null : TypeInformation] extends 
MapFunction[String, T] {
  override def map(value: String) = {
val typeInfo = implicitly[TypeInformation[T]]

// custom deserialization here
null
  }
}


env
  .fromCollection(Iterator[String](""))
  .map(new CustomMapFunction[CustomClass])
  .print()




Seth Wiesman | Software Engineer, Data

4 World Trade Center, 46th Floor, New York, NY 10007





Re: user Digest 11 Jan 2018 11:24:06 -0000 Issue 2610

2018-01-11 Thread Boris Lublinsky
Hi Timo,
"You don't need to specify the type in .flatMap() explicitly. It will be
automatically extracted using the generic signature of DataDataConverter."
It does not; that is the reason why I had to add it there.

> Regarding your error. Make sure that you don't mix up the API classes. If you 
> want to use the Java API you should not use 
> "org.apache.flink.streaming.api.scala.DataStream" but the Java one.
I rewrote the class in Java. That's why I am so confused.



Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com
https://www.lightbend.com/

> 
> From: Timo Walther 
> Subject: Re: Java types
> Date: January 11, 2018 at 3:07:08 AM CST
> To: user@flink.apache.org
> 
> 
> Hi Boris,
> 
> each API is designed language-specifically, so they might not always be the same. 
> Scala has better type extraction features and lets you write code very 
> precisely. Java sometimes requires more code to achieve the same.
> 
> You don't need to specify the type in .flatMap() explicitly. It will be 
> automatically extracted using the generic signature of DataDataConverter.
> 
> Regarding your error. Make sure that you don't mix up the API classes. If you 
> want to use the Java API you should not use 
> "org.apache.flink.streaming.api.scala.DataStream" but the Java one.
> 
> Regards,
> Timo
> 
> 
> 
> Am 1/11/18 um 5:13 AM schrieb Boris Lublinsky:
>> More questions
>> In Scala my DataProcessor is defined as
>> class DataProcessorKeyed extends CoProcessFunction[WineRecord, ModelToServe, 
>> Double] with CheckpointedFunction {
>> And it is used as follows
>> val models = modelsStream.map(ModelToServe.fromByteArray(_))
>>   .flatMap(BadDataHandler[ModelToServe])
>>   .keyBy(_.dataType)
>> val data = dataStream.map(DataRecord.fromByteArray(_))
>>   .flatMap(BadDataHandler[WineRecord])
>>   .keyBy(_.dataType)
>> 
>> // Merge streams
>> data
>>   .connect(models)
>>   .process(DataProcessorKeyed())
>> When I am doing the same thing in Java
>> public class DataProcessorKeyed extends CoProcessFunction implements CheckpointedFunction {
>> Which I am using as follows
>> // Read data from streams
>> DataStream<Tuple2<String, ModelToServe>> models = modelsStream
>>     .flatMap(new ModelDataConverter(), new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(ModelToServe.class)))
>>     .keyBy(0);
>> DataStream<Tuple2<String, Winerecord.WineRecord>> data = dataStream
>>     .flatMap(new DataDataConverter(), new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(Winerecord.WineRecord.class)))
>>     .keyBy(0);
>> 
>> // Merge streams
>> data
>> .connect(models)
>> .process(new DataProcessorKeyed());
>> I am getting an error
>> 
>> Error:(68, 17) java: no suitable method found for keyBy(int)
>> method 
>> org.apache.flink.streaming.api.scala.DataStream.keyBy(scala.collection.Seq)
>>  is not applicable
>>   (argument mismatch; int cannot be converted to 
>> scala.collection.Seq)
>> method 
>> org.apache.flink.streaming.api.scala.DataStream.keyBy(scala.Function1,K>,org.apache.flink.api.common.typeinfo.TypeInformation)
>>  is not applicable
>>   (cannot infer type-variable(s) K
>> (actual and formal argument lists differ in length))
>> So it assumes key/value pairs for the coprocessor
>> 
>> Why is there such a difference between the APIs?
>> 
>> Boris Lublinsky
>> FDP Architect
>> boris.lublin...@lightbend.com 
>> https://www.lightbend.com/ 
>>> On Jan 10, 2018, at 6:20 PM, Boris Lublinsky >> > wrote:
>>> 
>>> I am trying to convert Scala code (which works fine) to Java.
>>> The Scala code is:
>>> // create a Kafka consumers
>>> // Data
>>> val dataConsumer = new FlinkKafkaConsumer010[Array[Byte]](
>>>   DATA_TOPIC,
>>>   new ByteArraySchema,
>>>   dataKafkaProps
>>> )
>>> 
>>> // Model
>>> val modelConsumer = new FlinkKafkaConsumer010[Array[Byte]](
>>>   MODELS_TOPIC,
>>>   new ByteArraySchema,
>>>   modelKafkaProps
>>> )
>>> 
>>> // Create input data streams
>>> val modelsStream = env.addSource(modelConsumer)
>>> val dataStream = env.addSource(dataConsumer)
>>> 
>>> // Read data from streams
>>> val models = modelsStream.map(ModelToServe.fromByteArray(_))
>>>   .flatMap(BadDataHandler[ModelToServe])
>>>   .keyBy(_.dataType)
>>> val data = dataStream.map(DataRecord.fromByteArray(_))
>>>   .flatMap(BadDataHandler[WineRecord])
>>>   .keyBy(_.dataType)
>>> Now I am trying to re write it to Java and fighting with the requirement of 
>>> providing types, where they should be obvious
>>> 
>>> // create a Kafka consumers
>>> // Data
>>> FlinkKafkaConsumer010 dataConsumer = new FlinkKafkaConsumer010<>(
>>> 

Re: hadoop-free hdfs config

2018-01-11 Thread Till Rohrmann
Thanks for trying it out and letting us know.

Cheers,
Till

On Thu, Jan 11, 2018 at 9:56 AM, Oleksandr Baliev  wrote:

> Hi Till,
>
> thanks for your reply and clarification! With RocksDBStateBackend btw the
> same story, looks like a wrapper over FsStateBackend:
>
> 01/11/2018 09:27:22 Job execution switched to status FAILING.
> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not
> find a file system implementation for scheme 'hdfs'. The scheme is not
> directly supported by Flink and no Hadoop file system to support this
> scheme could be loaded.
> at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(
> FileSystem.java:405)
> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
> at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
> *at
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.(FsCheckpointStreamFactory.java:99)*
> *at
> org.apache.flink.runtime.state.filesystem.FsStateBackend.createStreamFactory(FsStateBackend.java:277)*
> *at
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createStreamFactory(RocksDBStateBackend.java:273)*
> * at
> org.apache.flink.streaming.runtime.tasks.StreamTask.createCheckpointStreamFactory(StreamTask.java:787)*
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.
> initializeState(AbstractStreamOperator.java:247)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> initializeOperators(StreamTask.java:694)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> initializeState(StreamTask.java:682)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:253)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: 
> *Hadoop
> is not in the classpath/dependencies.*
> at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(
> UnsupportedSchemeFactory.java:64)
> at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(
> FileSystem.java:401)
>
>
> Then I also changed url for fs state backend to file:// which is ok, but
> then I have the same issue in BucketingSink:
>
> java.lang.RuntimeException: Error while creating FileSystem when
> initializing the state of the BucketingSink.
> at org.apache.flink.streaming.connectors.fs.bucketing.
> BucketingSink.initializeState(BucketingSink.java:358)
> initializeState(...)
> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.
> tryRestoreFunction(StreamingFunctionUtils.java:178)
> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.
> restoreFunctionState(StreamingFunctionUtils.java:160)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> initializeState(AbstractUdfStreamOperator.java:96)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.
> initializeState(AbstractStreamOperator.java:259)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> initializeOperators(StreamTask.java:694)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> initializeState(StreamTask.java:682)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:253)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> Could not find a file system implementation for scheme 'hdfs'. The scheme
> is not directly supported by Flink and no Hadoop file system to support
> this scheme could be loaded.
> *at
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:405)*
> *at
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1154)*
> at org.apache.flink.streaming.connectors.fs.bucketing.
> BucketingSink.initFileSystem(BucketingSink.java:411)
> at org.apache.flink.streaming.connectors.fs.bucketing.
> BucketingSink.initializeState(BucketingSink.java:355)
> ... 10 more
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: 
> *Hadoop
> is not in the classpath/dependencies.*
> at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(
> UnsupportedSchemeFactory.java:64)
> at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(
> FileSystem.java:401)
> ... 13 more
>
>
> For the tests I was using the clean "Without bundled Hadoop" Flink binaries and
> didn't change anything in the configs.
>
> Currently we have to persist checkpoints on "hdfs" so we will use some
> flink-shaded-hadoop2-uber*.jar anyway, thanks.
>
> Best,
> Sasha
>
> 2018-01-10 10:47 GMT+01:00 Till Rohrmann :
>
>> Hi Sasha,
>>
>> you're right that if you want to access HDFS from the user code only it
>> should be possible to use the Hadoop free Flink version and bundle the
>> Hadoop dependencies with your user code. However, if you want to use
>> Flink's 

Re: Trigger not firing when using BoundedOutOfOrdernessTimestampExtractor

2018-01-11 Thread Fabian Hueske
Another thing to point out is that watermarks are usually data-driven,
i.e., they depend on the timestamps of the events and not on the clock of
the machine.
Otherwise, you might observe a lot of late data, i.e., events with
timestamps smaller than the last watermark.

If you assign timestamps and watermarks based on the clock of the machine,
you might also use ingestion time instead of event time.
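For illustration, a data-driven variant of the assigner from the quoted code below, assuming the event's epoch-millis timestamp is carried in the first tuple field (an assumption, not from the thread); the watermark then follows the data instead of the wall clock:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

// The timestamp comes from the event itself, so the watermark only advances as
// data arrives and windows fire relative to the event timestamps.
public class EventTimeExtractor<T> extends BoundedOutOfOrdernessTimestampExtractor<Tuple2<Long, T>> {

    public EventTimeExtractor() {
        super(Time.milliseconds(100)); // allowed out-of-orderness
    }

    @Override
    public long extractTimestamp(Tuple2<Long, T> element) {
        return element.f0; // the event's own timestamp, not System.currentTimeMillis()
    }
}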

2018-01-11 11:49 GMT+01:00 Jayant Ameta :

> Thanks Gary,
> I was only trying with a fixed set of events, so the Watermark was not
> advancing, like you said.
>
>
> Jayant Ameta
>
> On Thu, Jan 11, 2018 at 3:36 PM, Gary Yao  wrote:
>
>> Hi Jayant,
>>
>> The difference is that the Watermarks from
>> BoundedOutOfOrdernessTimestampExtractor are based on the greatest
>> timestamp of
>> all previous events. That is, if you do not receive new events, the
>> Watermark
>> will not advance. In contrast, your custom implementation of
>> AssignerWithPeriodicWatermarks always advances the Watermark based on the
>> wall
>> clock.
>>
>> Maybe this will already help you to debug your application. If not, it
>> would be
>> great to see a minimal working example.
>>
>> Best,
>> Gary
>>
>> On Wed, Jan 10, 2018 at 4:46 PM, Jayant Ameta 
>> wrote:
>>
>>> Hi,
>>> When using a BoundedOutOfOrdernessTimestampExtractor, the trigger is
>>> not firing. However, the trigger fires when using custom timestamp
>>> extractor with similar watermark.
>>>
>>> Sample code below:
>>> 1.Assigner as anonymous class which works fine
>>>
>>> AssignerWithPeriodicWatermarks> assigner = new 
>>> AssignerWithPeriodicWatermarks>() {
>>>
>>> @Override
>>> public long extractTimestamp(Tuple2 element, long 
>>> previousElementTimestamp) {
>>> return System.currentTimeMillis();
>>> }
>>>
>>> @Override
>>> public final Watermark getCurrentWatermark() {
>>> // this guarantees that the watermark never goes backwards.
>>> return new Watermark(System.currentTimeMillis()-100);
>>> }
>>> };
>>>
>>>
>>> 2.BoundedOutOfOrdernessTimestampExtractor assigner which doesn't work
>>>
>>> AssignerWithPeriodicWatermarks> assigner = new 
>>> BoundedOutOfOrdernessTimestampExtractor>> T>>(Time.milliseconds(100)) {
>>>
>>> @Override
>>> public long extractTimestamp(Tuple2 element) {
>>> return System.currentTimeMillis();
>>> }
>>> };
>>>
>>>
>>> Do you see any difference in the approaches?
>>>
>>> - Jayant
>>>
>>
>>
>


Re: Trigger not firing when using BoundedOutOfOrdernessTimestampExtractor

2018-01-11 Thread Gary Yao
Hi Jayant,

The difference is that the Watermarks from
BoundedOutOfOrdernessTimestampExtractor are based on the greatest timestamp
of
all previous events. That is, if you do not receive new events, the
Watermark
will not advance. In contrast, your custom implementation of
AssignerWithPeriodicWatermarks always advances the Watermark based on the
wall
clock.

Maybe this will already help you to debug your application. If not, it
would be
great to see a minimal working example.

Best,
Gary

On Wed, Jan 10, 2018 at 4:46 PM, Jayant Ameta  wrote:

> Hi,
> When using a BoundedOutOfOrdernessTimestampExtractor, the trigger is not
> firing. However, the trigger fires when using custom timestamp extractor
> with similar watermark.
>
> Sample code below:
> 1.Assigner as anonymous class which works fine
>
> AssignerWithPeriodicWatermarks> assigner = new 
> AssignerWithPeriodicWatermarks>() {
>
> @Override
> public long extractTimestamp(Tuple2 element, long 
> previousElementTimestamp) {
> return System.currentTimeMillis();
> }
>
> @Override
> public final Watermark getCurrentWatermark() {
> // this guarantees that the watermark never goes backwards.
> return new Watermark(System.currentTimeMillis()-100);
> }
> };
>
>
> 2.BoundedOutOfOrdernessTimestampExtractor assigner which doesn't work
>
> AssignerWithPeriodicWatermarks> assigner = new 
> BoundedOutOfOrdernessTimestampExtractor T>>(Time.milliseconds(100)) {
>
> @Override
> public long extractTimestamp(Tuple2 element) {
> return System.currentTimeMillis();
> }
> };
>
>
> Do you see any difference in the approaches?
>
> - Jayant
>


Re: Java types

2018-01-11 Thread Timo Walther

Hi Boris,

each API is designed language-specifically, so they might not always be the
same. Scala has better type extraction features and lets you write code
very precisely. Java sometimes requires more code to achieve the same.


You don't need to specify the type in .flatMap() explicitly. It will be 
automatically extracted using the generic signature of DataDataConverter.


Regarding your error. Make sure that you don't mix up the API classes. 
If you want to use the Java API you should not use 
"org.apache.flink.streaming.api.scala.DataStream" but the Java one.


Regards,
Timo
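A self-contained sketch of that point, with placeholder data: using the Java org.apache.flink.streaming.api.datastream.DataStream, both keyBy(int) and keyBy(KeySelector) are available, which is exactly what the Scala DataStream's signatures do not offer to Java code.

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JavaKeyBySketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Long>> records =
                env.fromElements(Tuple2.of("wine", 1L), Tuple2.of("model", 2L));

        // key by tuple index ...
        KeyedStream<Tuple2<String, Long>, Tuple> byIndex = records.keyBy(0);

        // ... or with an explicit KeySelector
        KeyedStream<Tuple2<String, Long>, String> bySelector =
                records.keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> value) {
                        return value.f0;
                    }
                });

        bySelector.print();
        env.execute("java keyBy sketch");
    }
}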



Am 1/11/18 um 5:13 AM schrieb Boris Lublinsky:

More questions
In Scala my DataProcessor is defined as
class DataProcessorKeyed extends CoProcessFunction[WineRecord, ModelToServe, Double] with CheckpointedFunction {
And it is used as follows
val models = modelsStream.map(ModelToServe.fromByteArray(_))
   .flatMap(BadDataHandler[ModelToServe])
   .keyBy(_.dataType)
val data = dataStream.map(DataRecord.fromByteArray(_))
   .flatMap(BadDataHandler[WineRecord])
   .keyBy(_.dataType)

// Merge streams
data
   .connect(models)
   .process(DataProcessorKeyed())
When I am doing the same thing in Java
public class DataProcessorKeyed extends CoProcessFunction implements CheckpointedFunction {
Which I am using as follows
// Read data from streams
DataStream<Tuple2<String, ModelToServe>> models = modelsStream
    .flatMap(new ModelDataConverter(), new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(ModelToServe.class)))
    .keyBy(0);
DataStream<Tuple2<String, Winerecord.WineRecord>> data = dataStream
    .flatMap(new DataDataConverter(), new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(Winerecord.WineRecord.class)))
    .keyBy(0);

// Merge streams
data
    .connect(models)
    .process(new DataProcessorKeyed());
I am getting an error

Error:(68, 17) java: no suitable method found for keyBy(int)
    method 
org.apache.flink.streaming.api.scala.DataStream.keyBy(scala.collection.Seq) 
is not applicable
      (argument mismatch; int cannot be converted to 
scala.collection.Seq)
    method 
org.apache.flink.streaming.api.scala.DataStream.keyBy(scala.Function1,K>,org.apache.flink.api.common.typeinfo.TypeInformation) 
is not applicable

      (cannot infer type-variable(s) K
        (actual and formal argument lists differ in length))
So it assumes key/value pairs for the coprocessor

Why is there such a difference between the APIs?

Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com 
https://www.lightbend.com/

On Jan 10, 2018, at 6:20 PM, Boris Lublinsky 
> wrote:


I am trying to convert Scala code (which works fine) to Java.
The Scala code is:
// create Kafka consumers
// Data
val dataConsumer = new FlinkKafkaConsumer010[Array[Byte]](
  DATA_TOPIC, new ByteArraySchema, dataKafkaProps
)

// Model
val modelConsumer = new FlinkKafkaConsumer010[Array[Byte]](
  MODELS_TOPIC, new ByteArraySchema, modelKafkaProps
)

// Create input data streams
val modelsStream = env.addSource(modelConsumer)
val dataStream = env.addSource(dataConsumer)

// Read data from streams
val models = modelsStream.map(ModelToServe.fromByteArray(_))
  .flatMap(BadDataHandler[ModelToServe])
  .keyBy(_.dataType)
val data = dataStream.map(DataRecord.fromByteArray(_))
  .flatMap(BadDataHandler[WineRecord])
  .keyBy(_.dataType)
Now I am trying to rewrite it in Java and am fighting with the
requirement of providing types where they should be obvious.


// create Kafka consumers
// Data
FlinkKafkaConsumer010<byte[]> dataConsumer = new FlinkKafkaConsumer010<>(
  ModelServingConfiguration.DATA_TOPIC, new ByteArraySchema(), dataKafkaProps);
// Model
FlinkKafkaConsumer010<byte[]> modelConsumer = new FlinkKafkaConsumer010<>(
  ModelServingConfiguration.MODELS_TOPIC, new ByteArraySchema(), modelKafkaProps);

// Create input data streams
DataStream<byte[]> modelsStream =
  env.addSource(modelConsumer, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
DataStream<byte[]> dataStream =
  env.addSource(dataConsumer, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);

// Read data from streams
DataStream<Tuple2<String, ModelToServe>> models = modelsStream
  .flatMap(new ModelConverter(), new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(ModelToServe.class)));

Am I missing something similar to `import org.apache.flink.api.scala._` in Java?

Now, if this is the only way, does this seem right?

Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com 
https://www.lightbend.com/







Re: hadoop-free hdfs config

2018-01-11 Thread Oleksandr Baliev
Hi Till,

thanks for your reply and clarification! With RocksDBStateBackend btw the
same story, looks like a wrapper over FsStateBackend:

01/11/2018 09:27:22 Job execution switched to status FAILING.
org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not
find a file system implementation for scheme 'hdfs'. The scheme is not
directly supported by Flink and no Hadoop file system to support this
scheme could be loaded.
at
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:405)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
*at
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.(FsCheckpointStreamFactory.java:99)*
*at
org.apache.flink.runtime.state.filesystem.FsStateBackend.createStreamFactory(FsStateBackend.java:277)*
*at
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createStreamFactory(RocksDBStateBackend.java:273)*
* at
org.apache.flink.streaming.runtime.tasks.StreamTask.createCheckpointStreamFactory(StreamTask.java:787)*
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
*Hadoop
is not in the classpath/dependencies.*
at
org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
at
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)


Then I also changed url for fs state backend to file:// which is ok, but
then I have the same issue in BucketingSink:

java.lang.RuntimeException: Error while creating FileSystem when
initializing the state of the BucketingSink.
at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
initializeState(...)
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
Could not find a file system implementation for scheme 'hdfs'. The scheme
is not directly supported by Flink and no Hadoop file system to support
this scheme could be loaded.
*at
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:405)*
*at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1154)*
at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
... 10 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
*Hadoop
is not in the classpath/dependencies.*
at
org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
at
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
... 13 more


For the tests I was using the clean "Without bundled Hadoop" Flink binaries and
didn't change anything in the configs.

Currently we have to persist checkpoints on "hdfs" so we will use some
flink-shaded-hadoop2-uber*.jar anyway, thanks.

Best,
Sasha

2018-01-10 10:47 GMT+01:00 Till Rohrmann :

> Hi Sasha,
>
> you're right that if you want to access HDFS from the user code only it
> should be possible to use the Hadoop free Flink version and bundle the
> Hadoop dependencies with your user code. However, if you want to use
> Flink's file system state backend as you did, then you have to start the
> Flink cluster with the Hadoop dependency in its classpath. The reason is
> that the FsStateBackend is part of the Flink distribution and will be
> loaded using the system class loader.
>
> One thing you could try out is to use the RocksDB state backend instead.
> Since the RocksDBStateBackend 

Re: Anyone got Flink working in EMR with KinesisConnector

2018-01-11 Thread Fabian Hueske
Great, thanks for reporting back!

2018-01-10 22:46 GMT+01:00 xiatao123 :

> got the issue fixed after applying patch from
> https://github.com/apache/flink/pull/4150
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>