Re: Lot of data generated in out file

2018-04-15 Thread Ashish Attarde
Thanks Gordon for your reply.

The out file mistry got resolved. Someone accidently, modified the POJO
code on server that I work on to, and had put in println.

Thank you for the information. I am experimenting with windowing to
understand better and fit in my use case.

Thanks
-Ashish

On Sun, Apr 15, 2018 at 10:09 PM, Tzu-Li (Gordon) Tai 
wrote:

> Hi Ashish,
>
> I don't really see why there are outputs in the out file for the program
> you
> provided. Perhaps others could chime in here ..
>
> As for your second question regarding window outputs:
> Yes, subsequent window operators should definitely be doable in Flink.
> This is just a matter of multiple transformations in your pipeline.
> The only restriction right now, is that after a window operation, the
> stream
> is no longer a KeyedStream, so you would need to "re-key" the stream before
> applying the second windowed transformation.
>
> Cheers,
> Gordon
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>



-- 

Thanks
-Ashish Attarde


Re: Lot of data generated in out file

2018-04-15 Thread Tzu-Li (Gordon) Tai
Hi Ashish,

I don't really see why there are outputs in the out file for the program you
provided. Perhaps others could chime in here ..

As for your second question regarding window outputs:
Yes, subsequent window operators should definitely be doable in Flink.
This is just a matter of multiple transformations in your pipeline.
The only restriction right now, is that after a window operation, the stream
is no longer a KeyedStream, so you would need to "re-key" the stream before
applying the second windowed transformation.

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Watermark and multiple streams

2018-04-15 Thread Tzu-Li (Gordon) Tai
Hi,

How are your registering your event time timers on processElement?
If you are continuously registering them, and watermarks are correctly
generated upstream, then the onTimer method should be invoked properly.

For your 1-to-many case, I would assume that whenever a new key arrives
(that previously has not seen events with the same key from other streams),
an event time timer is registered to be fired after a certain amount of time
which you allow to wait for other matching join records.

Does this help?

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Simulating Time-based and Count-based Custom Windows with ProcessFunction

2018-04-15 Thread Tzu-Li (Gordon) Tai
Hi Max!

Before we jump into the custom ProcessFunction approach:
Have you also checked out using the RocksDB state backend, and whether or not 
it is suitable for your use case?
For state that would not fit into memory, that is usually the to-go state 
backend to use.

If you’re sure a custom ProcessFunction is still the way to go, then some 
answers to your questions:

1 -- How do I assign timestamps into my data tuples? What type of 
time...process, event or ingestion? 

This should be done via the `.assignTimestampsAndWatermarks` method on the 
output of an operator prior to the process function.
The timestamps assigned using this method is for event time processing.
Timestamps for processing and ingestion time processing are determined by the 
system.

2 -- How to simulate count-based windows? In this case, what would be the 
best artificial timestamps to append? Just increasing integers? 
I’m not sure of your actual use case here, but if you want to implement 
count-based windows, then timers should not need to be part of the 
implementation. On each processElement, you should fire results of a window’s 
state if the window’s element count has reached the target count.

- Gordon

On 13 April 2018 at 5:36:02 PM, m@xi (makisnt...@gmail.com) wrote:

Hello Flinkers!  

Around here and there one may find some post for sliding windows in Flink. I  
have read that default sliding windows of Flink, the system maintains each  
window separately in memory, which in my case is prohibitive.  

Therefore, I want to implement my own sliding windows through  
ProcessFunction() via onTimer() function. Specifically, let's assume that  
the data does not contain any timestamps. So, if anyone can help providing  
concrete answers or even *code skeletons* on the following bullets, it is  
more than welcome :  

1 -- How do I assign timestamps into my data tuples? What type of  
time...process, event or ingestion?  

2 -- How to simulate count-based windows? In this case, what would be the  
best artificial timestamps to append? Just increasing integers?  

Thanks in advance.  

Best,  
Max  




--  
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ 
 


Re: Trying to understand KafkaConsumer_records_lag_max

2018-04-15 Thread Tzu-Li (Gordon) Tai
Hi Julio,

I'm not really sure, but do you think it is possible that there could be
some hard data retention setting for your Kafka topics in the staging
environment?
As in, at some point in time and maybe periodically, all data in the Kafka
topics are dropped and therefore the consumers effectively jump directly
back to the head again.

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Tiemrs and restore

2018-04-15 Thread Tzu-Li (Gordon) Tai
Hi Alberto,

Looking at the code, I think the current behavior is that all timers (both 
processing time and event time) are re-registered on restore, and therefore 
should be triggered automatically.
So, for processing time timers, on restore all timers that were supposed to be 
fired while the job was down should fire automatically; for event time timers, 
they will be triggered once the watermark passes their timestamps.

Also looped in Aljoscha on this, in case I misunderstood anything.

Cheers,
Gordon

On 16 April 2018 at 1:20:00 AM, Alberto Mancini (ab.manc...@gmail.com) wrote:

Hello,
according to this stackoverflow response 
https://stackoverflow.com/questions/36306136/will-apache-flink-restore-trigger-timers-after-failure
IIUC we should expect that after a restore the timers will be not executed 
until a new timer is scheduled. 
I wonder if this is still true and if there is any chance of forcing the 
restart of the timer task.

Thank you. 

Regards,
   Alberto. 

User-defined aggregation function and parallelism

2018-04-15 Thread 杨力
I am running flink SQL in streaming mode and implemented a UDAGG, which is
used in keyed HOP windows. But I found that the throughput decreases
dramatically when the function is used. Does UDAGG run in parallell? Or
does it run only in one thread?

Regards,
Bill


Re: data enrichment with SQL use case

2018-04-15 Thread Ken Krugler
If the SQL data is all (or mostly all) needed to join against the data from 
Kafka, then I might try a regular join.

Otherwise it sounds like you want to use an AsyncFunction to do ad hoc queries 
(in parallel) against your SQL DB.

https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/asyncio.html
 


— Ken


> On Apr 15, 2018, at 12:15 PM, miki haiat  wrote:
> 
> Hi,
> 
> I have a case of meta data enrichment and im wondering if my approach is the 
> correct way .
> input stream from kafka. 
> MD in msSQL .
> map to new pojo 
> I need to extract  a key from the kafka stream   and use it to select some 
> values from the sql table  .
> 
> SO i thought  to use  the table SQL api in order to select the table MD 
> then convert the kafka stream to table and join the data by  the stream key .
> 
> At the end i need to map the joined data to a new POJO and send it to 
> elesticserch .
> 
> Any suggestions or different ways to solve this use case ?
> 
> thanks,
> Miki  
> 
> 
> 

--
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr



Re: Scaling down Graphite metrics

2018-04-15 Thread Chesnay Schepler

Hello,

you can configure the rate at which metrics are reported by setting 
"metrics.reporter..interval" as described in the reporter 
documentation 
.


At this time there is no way to disable specific metrics.
You can however extend the reporter that you are using and override 
"MetricReporter#notifyOfAddedMetric(Metric, String, MetricGroup)" to 
ignore metrics that you aren't interested in.


On 13.04.2018 18:52, ashish pok wrote:

All,

We love Flinks OOTB metrics but boy there is a ton :) Any way to scale 
them down (frequency and metric itself)?


Flink apps are becoming huge source of data right now.

Thanks,

-- Ashish





Re: Old Flink jobs restarting on Job Manager failover

2018-04-15 Thread Gary Yao
Hi Steve,

What is the Flink version you are using?

Jobs are recovered from metadata stored in ZooKeeper. The behavior you
describe
indicates that the submitted job graph is not deleted from ZooKeeper. By
default, the jobs that should be running/recovered are stored in znode:

  /flink/default/jobgraphs

Can you check if the job id is still present in ZK after the cancelation? If
that is the case then there should be relevant warnings or errors in the
jobmanager log that should help debugging why the deletion failed.

Best,
Gary

On Thu, Apr 12, 2018 at 6:01 PM,  wrote:

> Hi all,
>
>
>
> We have a Flink environment using zookeeper to manage the cluster. The
> high availability option is set up with the high-availability.storageDir
> parameter set to a shared directory on NAS; this is available to all nodes.
>
>
>
> When zookeeper fails over to the standby JobManager during a cluster
> change, we see old jobs that have long been cancelled being restarted
> automatically by Flink. It seems like the standby JobManager is
> reconnecting with old configuration and old job details.
>
>
>
> I can’t see anything in the log that gives any indication why this old job
> is restarting. I have noticed that the blob.storage.directory is set to a
> local directory.
>
>
>
> Are there any other settings in Flink that might cause a Job Manager to
> restart against an old local state rather than the latest shared state?
>
>
>
> Thanks,
>
>
>
> Steve
>
>
>
>
>
>
>
>
> *Stephen Hesketh Reporting Shared Services, NatWest Markets*
>
> 250 Bishopsgate, London EC2M 4AA
> 
>
> Office:
> 
> +44
> 
> (0)20 7678 1482 (internal 381482) | Mobile: +44 (0)7968 039848
>
>
> **
> NatWest Markets is a marketing name of The Royal Bank of Scotland plc.
> This communication and any attachments are confidential and intended
> solely for the addressee. If you are not the intended recipient please
> advise us immediately and delete it. Unless specifically stated in the
> message or otherwise indicated, you may not duplicate, redistribute or
> forward this message and any attachments are not intended for distribution
> to, or use by any person or entity in any jurisdiction or country where
> such distribution or use would be contrary to local law or regulation. The
> Royal Bank Of Scotland plc or any affiliated entity ("RBS") accepts no
> responsibility for any changes made to this message after it was sent.
> Unless otherwise specifically indicated, the contents of this
> communication and its attachments are for information purposes only and
> should not be regarded as an offer or solicitation to buy or sell a product
> or service, confirmation of any transaction, a valuation, indicative price
> or an official statement. This communication has been prepared by the RBS
> trading desk, which may have a position or interest in the products or
> services mentioned that is inconsistent with any views expressed in this
> message. In evaluating the information contained in this message, you
> should know that it could have been previously provided to other clients
> and/or internal RBS personnel, who could have already acted on it.
> RBS cannot provide absolute assurances that all electronic communications
> (sent or received) are secure, error free, not corrupted, incomplete or
> virus free and/or that they will not be lost, mis-delivered, destroyed,
> delayed or intercepted/decrypted by others. Therefore RBS disclaims all
> liability with regards to electronic communications (and the contents
> therein) if they are corrupted, lost destroyed, delayed, incomplete,
> mis-delivered, intercepted, decrypted or otherwise misappropriated by
> others.
> Any electronic communication that is conducted within or through RBS
> systems will be subject to being archived, monitored and produced to
> regulators and in litigation in accordance with RBS's policy and local
> laws, rules and regulations. Unless expressly prohibited by local law,
> electronic communications may be archived in countries other than the
> country in which you are located, and may be treated in accordance with the
> laws and regulations of the country of each individual included in the
> entire chain.
> Copyright 2014 The Royal Bank of Scotland plc. All rights reserved. See
> http://www.natwestmarkets.com/legal/s-t-discl.html for further risk
> disclosure.
> **
>


data enrichment with SQL use case

2018-04-15 Thread miki haiat
Hi,

I have a case of meta data enrichment and im wondering if my approach is
the correct way .

   1. input stream from kafka.
   2. MD in msSQL .
   3. map to new pojo

I need to extract  a key from the kafka stream   and use it to select some
values from the sql table  .

SO i thought  to use  the table SQL api in order to select the table MD
then convert the kafka stream to table and join the data by  the stream key
.

At the end i need to map the joined data to a new POJO and send it to
elesticserch .

Any suggestions or different ways to solve this use case ?

thanks,
Miki


Re: Issue in Flink/Zookeeper authentication via Kerberos

2018-04-15 Thread Eron Wright
I believe that the solution here is to ensure that the znodes created by
Flink have an ACL that allows access only to the original creator.   For
example, if a given Flink job has a Kerberos identity of "us...@example.com",
it should set the znode ACL appropriately to disallow access to any client
that doesn't successfully authenticate as that user.  This may be
accomplished with the following Flink configuration setting:

high-availability.zookeeper.client.acl: creator

Some code links:
-
https://github.com/apache/flink/blob/release-1.4.2/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java#L171
-
https://github.com/apache/flink/blob/release-1.4.2/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java#L93

Hope this helps!
Eron

On Sun, Apr 15, 2018 at 2:16 AM, Sahu, Sarthak 1. (Nokia - IN/Bangalore) <
sarthak.1.s...@nokia.com> wrote:

> Glad to get the reply. With wrong Kerberos information I am expecting an
> ‘access denied’.
>
>
>
> As per flink log, it clear states that authentication failed due to
> Kerberos wrong information and trying to connect with zookeeper with
> unauthorised mode if zookeeper allows.
>
> And then it connected successfully!
>
>
>
> Do I missing any configuration in flink/zookeeper side.
>
> Expecting you suggestion here.
>
>
>
> Regards
>
> Sarthak Sahu
>
>
>
> *From:* Eron Wright [mailto:eronwri...@gmail.com]
> *Sent:* Tuesday, April 3, 2018 3:07 AM
> *To:* Sahu, Sarthak 1. (Nokia - IN/Bangalore) 
> *Cc:* suez1...@gmail.com; Timo Walther 
>
> *Subject:* Re: Issue in Flink/Zookeeper authentication via Kerberos
>
>
>
> Hello, I'm happy to help.  Could you elaborate on the issue that you see?
> Are you saying that you expect to get 'access denied' but Zookeeper is
> allowing the connection anyway?
>
>
>
> My first thought is, maybe ZK allows unauthenticated connections but
> relies on the authorization layer to deny access to nodes based on the
> ACL.   FLink has a configuration setting to set the 'owner' of the znode.
>
>
>
> -Eron
>
>
>
> On Mon, Apr 2, 2018 at 1:50 AM, Sahu, Sarthak 1. (Nokia - IN/Bangalore) <
> sarthak.1.s...@nokia.com> wrote:
>
> Hi Eron/Shuyi
>
>
>
> Could you please help me on this below issue.
>
>
>
> Regards
>
> Sarthak Sahu
>
>
>
> *From:* Timo Walther [mailto:twal...@apache.org]
> *Sent:* Monday, March 26, 2018 3:05 PM
> *To:* user@flink.apache.org
> *Cc:* eronwri...@gmail.com; suez1...@gmail.com
> *Subject:* Re: Issue in Flink/Zookeeper authentication via Kerberos
>
>
>
> Hi Sarthak,
>
> I'm not a Kerberos expert but maybe Eron or Shuyi are more familiar with
> the details?
>
> Would be great if somebody could help.
>
> Thanks,
> Timo
>
> Am 22.03.18 um 10:16 schrieb Sahu, Sarthak 1. (Nokia - IN/Bangalore):
>
> Hi Folks,
>
>
>
>   *Environment Setup:*
>
>1. I have configured KDC 5 server.
>2. Configured Kerberos in zookeeper-3.4.10 wherein I can able to
>connect ZooKeeper Server/Client via Kerberos authentication.
>3. Now flink-1.4.0 has configured for Kerberos authentication as per
>below instruction.
>
> ·   https://ci.apache.org/projects/flink/flink-docs-
> release-1.4/ops/config.html#kerberos-based-security
>
> ·   https://ci.apache.org/projects/flink/flink-docs-
> release-1.4/ops/config.html#kerberos-based-security-1
>
>   *Success Scenario:*
>
>1. All Kerberos configuration parameter is correct and flink/zookeeper
>able to connect trough TGT.
>
>  *Problem:*
>
>1. Even if wrong Kerberos credentials given, flink able to connect
>ZooKeeper.
>
>
>
> Please find the taskmanager/jobmanger logs and flink config file for both
> scenario attached.
>
>
>
> Hoping for quick resolution.
>
>
>
> Regards
>
> Sarthak Sahu
>
>
>
>
>
>
>


Tiemrs and restore

2018-04-15 Thread Alberto Mancini
Hello,
according to this stackoverflow response
https://stackoverflow.com/questions/36306136/will-apache-flink-restore-trigger-timers-after-failure
IIUC we should expect that after a restore the timers will be not executed
until a new timer is scheduled.
I wonder if this is still true and if there is any chance of forcing the
restart of the timer task.

Thank you.

Regards,
   Alberto.


Re: Unsure how to further debug - operator threads stuck on java.lang.Thread.State: WAITING

2018-04-15 Thread Chesnay Schepler

Hello,

Thread #1-3 are waiting for input, Thread #4 is waiting for the job to 
finish.


To further debug this I would look into what the preceding operators are 
doing, whether they are blocked on something or are emitting records 
(which you can check in the UI/metrics).


On 15.04.2018 18:40, Miguel Coimbra wrote:

​Hello,

I am running into a situation where the Flink threads responsible for 
my operator execution are all stuck on WAITING mode.

Before anything else, this is my machine's spec:

Linux 4.4.88 #1 SMP x86_64 Intel(R) Xeon(R) CPU E7- 4830  @ 2.13GHz 
GenuineIntel GNU/Linux

256 GB RAM

I am running in local mode on a machine with a considerable amount of 
memory, so perhaps that may be triggering some execution edge-case?


Moving on, this is my Java:

openjdk version "1.8.0_151"
OpenJDK Runtime Environment (IcedTea 3.6.0) (Slackware)
OpenJDK 64-Bit Server VM (build 25.151-b12, mixed mode)

Getting back to the problem: I am currently using Flink 1.5-SNAPSHOT 
with LocalEnvironment on this large-memory machine, with parallelism 
set to one:


Configuration conf = new Configuration();
LocalEnvironment lenv = (LocalEnvironment) 
ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

ExecutionEnvironment env = lenv;
env.getConfig().enableSysoutLogging().enableClosureCleaner().enableObjectReuse();
env.setParallelism(1);

This initializes the execution environment for a series of sequential 
jobs (any data dependency between jobs is flushed to disk on job /i 
/and read back from disk into a DataSet in job /i + 1/).
To reiterate, I am not launching a Flink cluster, I am just executing 
in local mode from a code base compiled with Maven.


I have tested this program via mvn exec:exec with different values of 
memory (from -Xmx2m to -Xmx12m, from 20GB to 120GB) and the 
result is always the same: the process' memory fills up completely and 
then the process' CPU usage drops to 0%.
This is strange because if it was lack of memory, I would expect an 
OutOfMemoryError.


I have debugged with IntelliJ IDEA and obtained thread dumps from 
different executions, and realized quite a few operator threads are 
stuck on java.lang.Thread.State: WAITING.


There are four major threads that I find to be in this waiting state.
The thread dumps I obtained show me where the wait calls originated:

*Number 1:

*"CHAIN Join (Join at selectEdges(GraphUtils.java:328)) -> Combine 
(Distinct at selectEdges(GraphUtils.java:330)) (1/1)@9158" prio=5 
tid=0xd93 nid=NA waiting

  java.lang.Thread.State: WAITING
  at java.lang.Object.wait(Object.java:-1)
  at java.lang.Object.wait(Object.java:502)
  at org.apache.flink.runtime.io 
.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
  at org.apache.flink.runtime.io 
.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
  at org.apache.flink.runtime.io 
.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
  at org.apache.flink.runtime.io 
.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
  at 
org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
  at 
org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
  at 
org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
  at 
org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
  at 
org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
  at 
org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
  at 
org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
  at 
org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
  at 
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)

  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
  at java.lang.Thread.run(Thread.java:748)


*Number 2:*

"Join (Join at summaryGraph(SummaryGraphBuilder.java:92)) (1/1)@9153" 
prio=5 tid=0xd8e nid=NA waiting

  java.lang.Thread.State: WAITING
  at java.lang.Object.wait(Object.java:-1)
  at java.lang.Object.wait(Object.java:502)
  at org.apache.flink.runtime.io 
.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
  at org.apache.flink.runtime.io 
.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
  at org.apache.flink.runtime.io 

Unsure how to further debug - operator threads stuck on java.lang.Thread.State: WAITING

2018-04-15 Thread Miguel Coimbra
​Hello,

I am running into a situation where the Flink threads responsible for my
operator execution are all stuck on WAITING mode.
Before anything else, this is my machine's spec:

Linux 4.4.88 #1 SMP x86_64 Intel(R) Xeon(R) CPU E7- 4830  @ 2.13GHz
GenuineIntel GNU/Linux
256 GB RAM

I am running in local mode on a machine with a considerable amount of
memory, so perhaps that may be triggering some execution edge-case?

Moving on, this is my Java:

openjdk version "1.8.0_151"
OpenJDK Runtime Environment (IcedTea 3.6.0) (Slackware)
OpenJDK 64-Bit Server VM (build 25.151-b12, mixed mode)

Getting back to the problem: I am currently using Flink 1.5-SNAPSHOT
with LocalEnvironment
on this large-memory machine, with parallelism set to one:

Configuration conf = new Configuration();
LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.
createLocalEnvironmentWithWebUI(conf);
ExecutionEnvironment env = lenv;
env.getConfig().enableSysoutLogging().enableClosureCleaner().
enableObjectReuse();
env.setParallelism(1);

This initializes the execution environment for a series of sequential jobs
(any data dependency between jobs is flushed to disk on job *i *and read
back from disk into a DataSet in job *i + 1*).
To reiterate, I am not launching a Flink cluster, I am just executing in
local mode from a code base compiled with Maven.

I have tested this program via mvn exec:exec with different values of
memory (from -Xmx2m to -Xmx12m, from 20GB to 120GB) and the result
is always the same: the process' memory fills up completely and then the
process' CPU usage drops to 0%.
This is strange because if it was lack of memory, I would expect an
OutOfMemoryError.

I have debugged with IntelliJ IDEA and obtained thread dumps from different
executions, and realized quite a few operator threads are stuck on
java.lang.Thread.State:
WAITING.

There are four major threads that I find to be in this waiting state.
The thread dumps I obtained show me where the wait calls originated:



*Number 1:*"CHAIN Join (Join at selectEdges(GraphUtils.java:328)) ->
Combine (Distinct at selectEdges(GraphUtils.java:330)) (1/1)@9158" prio=5
tid=0xd93 nid=NA waiting
  java.lang.Thread.State: WAITING
  at java.lang.Object.wait(Object.java:-1)
  at java.lang.Object.wait(Object.java:502)
  at org.apache.flink.runtime.io.network.partition.consumer.
SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
  at org.apache.flink.runtime.io.network.partition.consumer.
SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
  at org.apache.flink.runtime.io.network.api.reader.
AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
  at org.apache.flink.runtime.io.network.api.reader.
MutableRecordReader.next(MutableRecordReader.java:47)
  at org.apache.flink.runtime.operators.util.ReaderIterator.
next(ReaderIterator.java:59)
  at org.apache.flink.runtime.operators.util.metrics.
CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
  at org.apache.flink.runtime.operators.hash.MutableHashTable$
ProbeIterator.next(MutableHashTable.java:1929)
  at org.apache.flink.runtime.operators.hash.MutableHashTable.
processProbeIter(MutableHashTable.java:505)
  at org.apache.flink.runtime.operators.hash.
MutableHashTable.nextRecord(MutableHashTable.java:666)
  at org.apache.flink.runtime.operators.hash.
ReusingBuildSecondHashJoinIterator.callWithNextKey(
ReusingBuildSecondHashJoinIterator.java:122)
  at org.apache.flink.runtime.operators.JoinDriver.run(
JoinDriver.java:221)
  at org.apache.flink.runtime.operators.BatchTask.run(
BatchTask.java:503)
  at org.apache.flink.runtime.operators.BatchTask.invoke(
BatchTask.java:368)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
  at java.lang.Thread.run(Thread.java:748)


*Number 2:*

"Join (Join at summaryGraph(SummaryGraphBuilder.java:92)) (1/1)@9153"
prio=5 tid=0xd8e nid=NA waiting
  java.lang.Thread.State: WAITING
  at java.lang.Object.wait(Object.java:-1)
  at java.lang.Object.wait(Object.java:502)
  at org.apache.flink.runtime.io.network.partition.consumer.
SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
  at org.apache.flink.runtime.io.network.partition.consumer.
SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
  at org.apache.flink.runtime.io.network.api.reader.
AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
  at org.apache.flink.runtime.io.network.api.reader.
MutableRecordReader.next(MutableRecordReader.java:47)
  at org.apache.flink.runtime.operators.util.ReaderIterator.
next(ReaderIterator.java:59)
  at org.apache.flink.runtime.operators.util.metrics.
CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
  at org.apache.flink.runtime.operators.hash.MutableHashTable$
ProbeIterator.next(MutableHashTable.java:1929)
  at org.apache.flink.runtime.operators.hash.MutableHashTable.