Re: Rocksdb to filesystem state migration errors

2019-03-14 Thread Lakshmi Gururaja Rao
Thanks for pointing me to the JIRA, Congxian.

On Thu, Mar 14, 2019 at 6:14 PM Congxian Qiu  wrote:

> Hi Lakshmi
>
> Currently, we can’t switch between the rocksdb and filesystem backends using
> a savepoint; there is an issue to fix this [1].
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-11254
>
>
> Best,
> Congxian
>
>
> Lakshmi Gururaja Rao wrote on Fri, Mar 15, 2019 at 8:07 AM:
>
>> Hey all,
>>
>> I'm trying to do a state migration from rocksdb --> filesystem backend.
>> The approach I'm taking here is:
>> 1) Cancel the job with a savepoint while it's running on rocksdb
>> 2) Update the job/cluster with filesystem as the state backend
>> 3) Submit a job with the previous rocksdb savepoint
>>
>> From what I understand about savepoints, this should work out of the box?
>> However, it works in some cases but fails in others. Specifically, whenever
>> there's a job with user-managed state, e.g., a ProcessFunction with a
>> ValueState, it throws the following error:
>>
>> Caused by: java.lang.IllegalStateException: Unexpected key-group in restore.
>>  at 
>> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>>  at 
>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:418)
>>  at 
>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:315)
>>  at 
>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:95)
>>  at 
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
>>  at 
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
>>
>>
>>
>> The error specifically comes from a precondition check in
>> HeapKeyedStateBackend. On doing some debugging, I find that the value of
>> writtenKeyGroupIndex always evaluates to 0, thus failing the check.
>>
>> Has anyone run into this issue before?
>>
>> Thanks
>> Lakshmi
>>
>

-- 
*Lakshmi Gururaja Rao*
SWE
217.778.7218


Where does the logs in Flink GUI's Exception tab come from?

2019-03-14 Thread Averell
Hi everyone,

I am running Flink on an EMR YARN cluster, and when the job failed and
restarted, I could see some logs in the Exception tab of the Flink GUI.


I could not find this piece of log output on my cluster's hard disk - not in
the TM or JM logs.

Where can I find it?

Thanks.

Below is my logback.xml. I'm not sure whether it has anything to do with my
question.

<configuration>
    <appender name="file" class="ch.qos.logback.core.FileAppender">
        <file>${log.file}</file>
        <append>false</append>
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
        </encoder>
    </appender>
    <root level="INFO">
        <appender-ref ref="file"/>
    </root>
    <!-- (additional logger elements were stripped when this message was archived) -->
</configuration>

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: What should I take care if I enable object reuse

2019-03-14 Thread Kurt Young
Keep one thing in mind: if you want an element to remain valid after the
function call ends (maybe map(), flatMap(), depending on what you are using),
you should copy it.
Typical scenarios include:
1. Saving the element into some collection like an array, list, or map for
later use - copy it explicitly.
2. Passing the element into some async call - copy it.

Best,
Kurt


On Fri, Mar 15, 2019 at 8:45 AM yinhua.dai  wrote:

> Hi Elias,
>
> Thanks.
> Would it be good enough as long as we always use a different object when
> calling the Collector.collect() method in the operator?
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
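
To illustrate Kurt's first scenario, here is a minimal sketch (the Event POJO
is a made-up example type, not from the thread): with object reuse enabled,
buffering the incoming reference is unsafe, so the function stores a copy
instead.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

/** Made-up example type. */
class Event {
    public long id;
    public String payload;

    public Event() {}

    public Event(long id, String payload) {
        this.id = id;
        this.payload = payload;
    }
}

/** Buffers events and emits them in batches of 100. */
class BufferingFlatMap extends RichFlatMapFunction<Event, Event> {
    private final List<Event> buffer = new ArrayList<>();

    @Override
    public void flatMap(Event value, Collector<Event> out) {
        // With enableObjectReuse(), the runtime may mutate 'value' after this
        // call returns, so store a defensive copy rather than the reference.
        buffer.add(new Event(value.id, value.payload));
        if (buffer.size() >= 100) {
            for (Event e : buffer) {
                out.collect(e);
            }
            buffer.clear();
        }
    }
}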


Re: Set partition number of Flink DataSet

2019-03-14 Thread qi luo
Hi Ken,

That looks awesome! I’ve implemented something similar to your bucketing sink, 
but using multiple internal writers rather than multiple internal outputs.

Besides this, I’m also curious whether Flink can achieve this like Spark: allow
the user to specify the partition number in the partitionBy() method (so no
multiple output formats are needed). But this seems to require non-trivial
changes in Flink core.

Thanks,
Qi

> On Mar 15, 2019, at 2:36 AM, Ken Krugler  wrote:
> 
> Hi Qi,
> 
> See https://github.com/ScaleUnlimited/flink-utils/ for a rough but working
> version of a bucketing sink.
> 
> — Ken
> 
> 
>> On Mar 13, 2019, at 7:46 PM, qi luo wrote:
>> 
>> Hi Ken,
>> 
>> Agree. I will try partitionBy() to reduce the number of parallel sinks, and 
>> may also try sortPartition() so each sink could write files one by one. 
>> Looking forward to your solution. :)
>> 
>> Thanks,
>> Qi
>> 
>>> On Mar 14, 2019, at 2:54 AM, Ken Krugler wrote:
>>> 
>>> Hi Qi,
>>> 
>>>> On Mar 13, 2019, at 1:26 AM, qi luo wrote:
 
 Hi Ken,
 
 Do you mean that I can create a batch sink which writes to N files? 
>>> 
>>> Correct.
>>> 
 That sounds viable, but since our data size is huge (billions of records & 
 thousands of files), the performance may be unacceptable. 
>>> 
>>> The main issue with performance (actually memory usage) is how many 
>>> OutputFormats do you need to have open at the same time.
>>> 
>>> If you partition by the same key that’s used to define buckets, then the 
>>> max number is less, as each parallel instance of the sink only gets a 
>>> unique subset of all possible bucket values.
>>> 
>>> I’m actually dealing with something similar now, so I might have a solution 
>>> to share soon.
>>> 
>>> — Ken
>>> 
>>> 
 I will check Blink and give it a try anyway.
 
 Thank you,
 Qi
 
> On Mar 12, 2019, at 11:58 PM, Ken Krugler wrote:
> 
> Hi Qi,
> 
> If I understand what you’re trying to do, then this sounds like a 
> variation of a bucketing sink.
> 
> That typically uses a field value to create a directory path or a file 
> name (though the filename case is only viable when the field is also 
> what’s used to partition the data)
> 
> But I don’t believe Flink has built-in support for that, in batch mode
> (see BucketingSink for streaming).
> 
> Maybe Blink has added that? Hoping someone who knows that codebase can 
> chime in here.
> 
> Otherwise you’ll need to create a custom sink to implement the desired
> behavior - though abusing a MapPartitionFunction would be easiest, I think.
> 
> — Ken
> 
> 
> 
>> On Mar 12, 2019, at 2:28 AM, qi luo wrote:
>> 
>> Hi Ken,
>> 
>> Thanks for your reply. I may not have made myself clear: our problem is not
>> about reading but rather writing.
>> 
>> We need to write to N files based on key partitioning. We have to use 
>> setParallelism() to set the output partition/file number, but when the 
>> partition number is too large (~100K), the parallelism would be too 
>> high. Is there any other way to achieve this?
>> 
>> Thanks,
>> Qi
>> 
>>> On Mar 11, 2019, at 11:22 PM, Ken Krugler wrote:
>>> 
>>> Hi Qi,
>>> 
>>> I’m guessing you’re calling createInput() for each input file.
>>> 
>>> If so, then instead you want to do something like:
>>> 
>>> Job job = Job.getInstance();
>>> 
>>> for each file…
>>> FileInputFormat.addInputPath(job, new 
>>> org.apache.hadoop.fs.Path(file path));
>>> 
>>> env.createInput(HadoopInputs.createHadoopInput(…, job)
>>> 
>>> Flink/Hadoop will take care of parallelizing the reads from the files, 
>>> given the parallelism that you’re specifying.
>>> 
>>> — Ken
>>> 
>>> 
>>>> On Mar 11, 2019, at 5:42 AM, qi luo wrote:
 
 Hi,
 
 We’re trying to distribute batch input data to (N) HDFS files 
 partitioning by hash using DataSet API. What I’m doing is like:
 
 env.createInput(…)
   .partitionByHash(0)
   .setParallelism(N)
   .output(…)
 
 This works well for small number of files. But when we need to 
 distribute to 
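
To make the “abusing a MapPartitionFunction” idea quoted above concrete, here
is a rough sketch (my own illustration, not the flink-utils code; it writes
plain local files, assumes Tuple2<bucketId, payload> records, and skips
failure handling):

import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

import java.io.FileNotFoundException;
import java.io.PrintWriter;
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch: each parallel instance appends the records of its partition to one
 * local file per bucket value (f0 = bucket id, f1 = payload), and emits the
 * number of written records so the operator has a downstream result.
 */
public class BucketWritingMapPartition
        extends RichMapPartitionFunction<Tuple2<Integer, String>, Integer> {

    private final String baseDir;

    public BucketWritingMapPartition(String baseDir) {
        this.baseDir = baseDir;
    }

    @Override
    public void mapPartition(Iterable<Tuple2<Integer, String>> values,
                             Collector<Integer> out) {
        Map<Integer, PrintWriter> writers = new HashMap<>();
        int count = 0;
        for (Tuple2<Integer, String> value : values) {
            // Lazily open one writer per bucket value seen in this partition.
            PrintWriter writer = writers.computeIfAbsent(value.f0, bucket -> {
                try {
                    return new PrintWriter(baseDir + "/bucket-" + bucket);
                } catch (FileNotFoundException e) {
                    throw new RuntimeException(e);
                }
            });
            writer.println(value.f1);
            count++;
        }
        writers.values().forEach(PrintWriter::close);
        out.collect(count);
    }
}

Combined with partitionByHash(0) upstream, each parallel instance only sees
its own subset of bucket values, which is the memory argument made above.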

Re: Re: Re: Flink 1.7.2: Task Manager not able to connect to Job Manager

2019-03-14 Thread Kumar Bolar, Harshith
Hi Gary,

An update. I noticed the line “--host cluster” in the program arguments section
of the job manager logs. So, after commenting out the following section in
jobmanager.sh, the task manager is now able to connect to the job manager
without issues.

    if [ ! -z $HOST ]; then
        args+=("--host")
        args+=("${HOST}")
    fi


Task manager logs after commenting those lines:


2019-03-14 22:31:02,863 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  
- Starting RPC endpoint for 
org.apache.flink.runtime.taskexecutor.TaskExecutor at 
akka://flink/user/taskmanager_0 .
2019-03-14 22:31:02,875 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
2019-03-14 22:31:02,876 INFO  
org.apache.flink.runtime.taskexecutor.JobLeaderService- Start job 
leader service.
2019-03-14 22:31:02,877 INFO  org.apache.flink.runtime.filecache.FileCache  
- User file cache uses directory 
/tmp/flink-dist-cache-12d5905f-d694-46f6-9359-3a636188b008
2019-03-14 22:31:02,884 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor- Connecting to 
ResourceManager 
akka.tcp://fl...@flink0-1.flink1.us-east-1.high.ue1.non.aws.cloud.arity.com:28945/user/resourcemanager(8583b335fd08a30a89585b7af07e4213).
2019-03-14 22:31:03,109 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor- Resolved 
ResourceManager address, beginning registration
2019-03-14 22:31:03,110 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor- Registration at 
ResourceManager attempt 1 (timeout=100ms)
2019-03-14 22:31:03,228 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor- Registration at 
ResourceManager attempt 2 (timeout=200ms)
2019-03-14 22:31:03,266 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor- Successful 
registration at resource manager 
akka.tcp://fl...@flink0-1.flink1.us-east-1.abc.com:28945/user/resourcemanager 
under registration id 170ee6a00f80ee02ead0e88710093d77.


Thanks,
Harshith
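
For anyone hitting the same symptom: a plausible root cause (an assumption
based on these logs, not confirmed in this thread) is a start command carried
over from the 1.4 scripts, where the first positional argument of
jobmanager.sh was an execution mode:

# Flink <= 1.4 style; "cluster" was an execution mode:
bin/jobmanager.sh start cluster
# Flink 1.5+ interprets that positional argument as the host name,
# so the invocation should simply be:
bin/jobmanager.sh start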

From: Harshith Kumar Bolar 
Date: Friday, 15 March 2019 at 7:38 AM
To: Gary Yao 
Cc: user 
Subject: Re: [External] Re: Re: Flink 1.7.2: Task Manager not able to connect 
to Job Manager

Hi Gary,

Here are the full job manager and task manager logs. In the job manager logs, I 
see it says “starting StandaloneSessionClusterEntrypoint”, whereas in Flink 
1.4.2, it used to say “starting JobManager”. Is this correct?

Job manager logs: https://paste.ubuntu.com/p/DCVzsQdpHq/
Task Manager logs: https://paste.ubuntu.com/p/wbvYFZxdT8/

Thanks,
Harshith

From: Gary Yao 
Date: Thursday, 14 March 2019 at 10:11 PM
To: Harshith Kumar Bolar 
Cc: user 
Subject: [External] Re: Re: Flink 1.7.2: Task Manager not able to connect to 
Job Manager

Hi Harshith,

The truncated log is not enough. Can you share the complete logs? If that's
not possible, I'd like to see the beginning of the log files where the cluster
configuration is logged.

The TaskManager tries to connect to the leader that is advertised in
ZooKeeper. In your case the "cluster" hostname is advertised, which hints at a
problem in your Flink configuration.

Best,
Gary

On Thu, Mar 14, 2019 at 4:54 PM Kumar Bolar, Harshith wrote:
Hi Gary,

I’ve attached the relevant portions of the JM and TM logs.

Job Manager Logs:

2019-03-14 11:38:28,257 INFO  
org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager
  - State change: CONNECTED
2019-03-14 11:38:28,309 INFO  
org.apache.flink.runtime.webmonitor.WebMonitorUtils   - Determined 
location of main cluster component log file: 
/opt/flink-1.7.2/log/flink-root-standalonesession-4-flink0-1.flink1.us-east-1.log
2019-03-14 11:38:28,309 INFO  
org.apache.flink.runtime.webmonitor.WebMonitorUtils   - Determined 
location of main cluster component stdout file: 
/opt/flink-1.7.2/log/flink-root-standalonesession-4-flink0-1.flink1.us-east-1.out
2019-03-14 11:38:28,527 INFO  
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint- Rest endpoint 
listening at cluster:8080
2019-03-14 11:38:28,527 INFO  
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService  - 
Starting ZooKeeperLeaderElectionService 
ZooKeeperLeaderElectionService{leaderPath='/leader/rest_server_lock'}.
2019-03-14 11:38:28,574 INFO  
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint- Web frontend 
listening at 
http://cluster:8080.
2019-03-14 11:38:28,613 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  
- Starting RPC endpoint 

Re: Re: Re: Flink 1.7.2: Task Manager not able to connect to Job Manager

2019-03-14 Thread Kumar Bolar, Harshith
Hi Gary,

Here are the full job manager and task manager logs. In the job manager logs, I 
see it says “starting StandaloneSessionClusterEntrypoint”, whereas in Flink 
1.4.2, it used to say “starting JobManager”. Is this correct?

Job manager logs: https://paste.ubuntu.com/p/DCVzsQdpHq/
Task Manager logs: https://paste.ubuntu.com/p/wbvYFZxdT8/

Thanks,
Harshith

From: Gary Yao 
Date: Thursday, 14 March 2019 at 10:11 PM
To: Harshith Kumar Bolar 
Cc: user 
Subject: [External] Re: Re: Flink 1.7.2: Task Manager not able to connect to 
Job Manager

Hi Harshith,

The truncated log is not enough. Can you share the complete logs? If that's
not possible, I'd like to see the beginning of the log files where the cluster
configuration is logged.

The TaskManager tries to connect to the leader that is advertised in
ZooKeeper. In your case the "cluster" hostname is advertised, which hints at a
problem in your Flink configuration.

Best,
Gary

On Thu, Mar 14, 2019 at 4:54 PM Kumar Bolar, Harshith wrote:
Hi Gary,

I’ve attached the relevant portions of the JM and TM logs.

Job Manager Logs:

2019-03-14 11:38:28,257 INFO  
org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager
  - State change: CONNECTED
2019-03-14 11:38:28,309 INFO  
org.apache.flink.runtime.webmonitor.WebMonitorUtils   - Determined 
location of main cluster component log file: 
/opt/flink-1.7.2/log/flink-root-standalonesession-4-flink0-1.flink1.us-east-1.log
2019-03-14 11:38:28,309 INFO  
org.apache.flink.runtime.webmonitor.WebMonitorUtils   - Determined 
location of main cluster component stdout file: 
/opt/flink-1.7.2/log/flink-root-standalonesession-4-flink0-1.flink1.us-east-1.out
2019-03-14 11:38:28,527 INFO  
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint- Rest endpoint 
listening at cluster:8080
2019-03-14 11:38:28,527 INFO  
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService  - 
Starting ZooKeeperLeaderElectionService 
ZooKeeperLeaderElectionService{leaderPath='/leader/rest_server_lock'}.
2019-03-14 11:38:28,574 INFO  
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint- Web frontend 
listening at 
http://cluster:8080.
2019-03-14 11:38:28,613 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  
- Starting RPC endpoint for 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager at 
akka://flink/user/resourcemanager .
2019-03-14 11:38:28,674 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  
- Starting RPC endpoint for 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher at 
akka://flink/user/dispatcher .
2019-03-14 11:38:28,691 INFO  
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService  - 
Starting ZooKeeperLeaderElectionService 
ZooKeeperLeaderElectionService{leaderPath='/leader/resource_manager_lock'}.
2019-03-14 11:38:28,694 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
2019-03-14 11:38:28,698 INFO  
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService  - 
Starting ZooKeeperLeaderElectionService 
ZooKeeperLeaderElectionService{leaderPath='/leader/dispatcher_lock'}.
2019-03-14 11:38:28,700 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Starting ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
2019-03-14 11:38:28,818 WARN  akka.remote.ReliableDeliverySupervisor
- Association with remote system [akka.tcp://flink@cluster:22671] 
has failed, address is now gated for [50] ms. Reason: [Association failed with 
[akka.tcp://flink@cluster:22671]] Caused by: [cluster]
2019-03-14 11:39:09,010 INFO  
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint- 
http://cluster:8080
 was granted leadership with 
leaderSessionID=bbe408fc-ef93-4328-abeb-85323db7aef7
2019-03-14 11:39:09,010 INFO  
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - 
ResourceManager akka.tcp://flink@cluster:31794/user/resourcemanager was granted 
leadership with fencing token ae4c0d30d0d65a0c41565360667e48fb
2019-03-14 11:39:09,011 INFO  
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Starting 
the SlotManager.
2019-03-14 11:39:09,012 INFO  
org.apache.flink.runtime.dispatcher.Standalon

Re: S3 parquet sink - failed with S3 connection exception

2019-03-14 Thread Averell
Hi Kostas and everyone,

I tried changing setFailOnCheckpointingErrors from true to false, and got
the following trace in the Flink GUI when the checkpoint/upload failed. Not
sure whether it is of any help in identifying the issue.
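
For reference, a minimal sketch of where that flag is set (Flink 1.7
DataStream API; the checkpoint interval is an arbitrary example value):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TolerantCheckpointing {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // example interval: one minute
        // Keep the job running when a checkpoint fails,
        // instead of failing the task (the default).
        env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
        // ... define the actual pipeline and call env.execute() here ...
    }
}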

BTW, could you please tell me where to find the log file that the Flink GUI's
Exception tab is reading from?

Thanks and regards,
Averell

java.lang.ArrayIndexOutOfBoundsException: 122626
at
org.apache.parquet.column.values.dictionary.DictionaryValuesWriter$PlainLongDictionaryValuesWriter.fallBackDictionaryEncodedData(DictionaryValuesWriter.java:397)
at
org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.fallBackAllValuesTo(DictionaryValuesWriter.java:130)
at
org.apache.parquet.column.values.fallback.FallbackValuesWriter.fallBack(FallbackValuesWriter.java:153)
at
org.apache.parquet.column.values.fallback.FallbackValuesWriter.checkFallback(FallbackValuesWriter.java:147)
at
org.apache.parquet.column.values.fallback.FallbackValuesWriter.writeLong(FallbackValuesWriter.java:181)
at
org.apache.parquet.column.impl.ColumnWriterV1.write(ColumnWriterV1.java:228)
at
org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.addLong(MessageColumnIO.java:449)
at
org.apache.parquet.avro.AvroWriteSupport.writeValueWithoutConversion(AvroWriteSupport.java:327)
at
org.apache.parquet.avro.AvroWriteSupport.writeValue(AvroWriteSupport.java:278)
at
org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:191)
at
org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:165)
at
org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:299)
at
org.apache.flink.formats.parquet.ParquetBulkWriter.addElement(ParquetBulkWriter.java:52)
at
org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.write(BulkPartWriter.java:50)
at
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:214)
at
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:268)
at
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:370)
at
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)






--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Rocksdb to filesystem state migration errors

2019-03-14 Thread Congxian Qiu
Hi Lakshmi

Currently, we can’t switch between the rocksdb and filesystem backends using
a savepoint; there is an issue to fix this [1].


[1] https://issues.apache.org/jira/browse/FLINK-11254


Best,
Congxian


Lakshmi Gururaja Rao wrote on Fri, Mar 15, 2019 at 8:07 AM:

> Hey all,
>
> I'm trying to do a state migration from rocksdb --> filesystem backend.
> The approach I'm taking here is:
> 1) Cancel the job with a savepoint while it's running on rocksdb
> 2) Update the job/cluster with filesystem as the state backend
> 3) Submit a job with the previous rocksdb savepoint
>
> From what I understand about savepoints, this should work out of the box?
> However, it works in some cases but fails in others. Specifically, whenever
> there's a job with user-managed state, e.g., a ProcessFunction with a
> ValueState, it throws the following error:
>
> Caused by: java.lang.IllegalStateException: Unexpected key-group in restore.
>   at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>   at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:418)
>   at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:315)
>   at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:95)
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
>
>
>
> The error specifically comes from a precondition check in
> HeapKeyedStateBackend. On doing some debugging, I find that the value of
> writtenKeyGroupIndex always evaluates to 0, thus failing the check.
>
> Has anyone run into this issue before?
>
> Thanks
> Lakshmi
>


Re: What should I take care if I enable object reuse

2019-03-14 Thread yinhua.dai
Hi Elias,

Thanks.
Would it be good enough as long as we always use a different object when
calling the Collector.collect() method in the operator?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Rocksdb to filesystem state migration errors

2019-03-14 Thread Lakshmi Gururaja Rao
Hey all,

I'm trying to do a state migration from rocksdb --> filesystem backend. The
approach I'm taking here is:
1) Cancel the job with a savepoint while it's running on rocksdb
2) Update the job/cluster with filesystem as the state backend
3) Submit a job with the previous rocksdb savepoint
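
For step 2, a minimal sketch of the backend switch, assuming the backend is
set in code rather than in flink-conf.yaml (the checkpoint URI is a
placeholder):

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BackendSwitch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        // Before: env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints"));
        // After the switch (placeholder URI); the job is then resubmitted
        // with the rocksdb savepoint via: flink run -s <savepoint-path> ...
        env.setStateBackend(new FsStateBackend("hdfs:///checkpoints"));
        // ... job definition unchanged ...
    }
}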

From what I understand about savepoints, this should work out of the box?
However, it works in some cases but fails in others. Specifically, whenever
there's a job with user-managed state, e.g., a ProcessFunction with a
ValueState, it throws the following error:

Caused by: java.lang.IllegalStateException: Unexpected key-group in restore.
at 
org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:418)
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:315)
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:95)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)



The error specifically comes from a precondition check in
HeapKeyedStateBackend. On doing some debugging, I find that the value of
writtenKeyGroupIndex always evaluates to 0, thus failing the check.

Has anyone run into this issue before?

Thanks
Lakshmi


Re: What should I take care if I enable object reuse

2019-03-14 Thread Elias Levy
Avoid mutating the object if you keep a reference to it within an operator.

On Wed, Mar 13, 2019 at 11:51 PM yinhua.dai  wrote:

> Hi Community,
>
> I saw from the documentation that we need to be careful about enabling the
> object reuse feature.
> So which parts should I check to avoid any issues? Can anyone help to
> summarize?
> Thank you.
>
> //
> *enableObjectReuse() / disableObjectReuse()* By default, objects are not
> reused in Flink. Enabling the object reuse mode will instruct the runtime
> to
> reuse user objects for better performance. Keep in mind that this can lead
> to bugs when the user-code function of an operation is not aware of this
> behavior.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Set partition number of Flink DataSet

2019-03-14 Thread Ken Krugler
Hi Qi,

See https://github.com/ScaleUnlimited/flink-utils/ for a rough but working
version of a bucketing sink.

— Ken


> On Mar 13, 2019, at 7:46 PM, qi luo  wrote:
> 
> Hi Ken,
> 
> Agree. I will try partitionBy() to reduce the number of parallel sinks, and 
> may also try sortPartition() so each sink could write files one by one. 
> Looking forward to your solution. :)
> 
> Thanks,
> Qi
> 
>> On Mar 14, 2019, at 2:54 AM, Ken Krugler wrote:
>> 
>> Hi Qi,
>> 
>>> On Mar 13, 2019, at 1:26 AM, qi luo wrote:
>>> 
>>> Hi Ken,
>>> 
>>> Do you mean that I can create a batch sink which writes to N files? 
>> 
>> Correct.
>> 
>>> That sounds viable, but since our data size is huge (billions of records & 
>>> thousands of files), the performance may be unacceptable. 
>> 
>> The main issue with performance (actually memory usage) is how many 
>> OutputFormats do you need to have open at the same time.
>> 
>> If you partition by the same key that’s used to define buckets, then the max 
>> number is less, as each parallel instance of the sink only gets a unique 
>> subset of all possible bucket values.
>> 
>> I’m actually dealing with something similar now, so I might have a solution 
>> to share soon.
>> 
>> — Ken
>> 
>> 
>>> I will check Blink and give it a try anyway.
>>> 
>>> Thank you,
>>> Qi
>>> 
>>>> On Mar 12, 2019, at 11:58 PM, Ken Krugler wrote:
 
 Hi Qi,
 
 If I understand what you’re trying to do, then this sounds like a 
 variation of a bucketing sink.
 
 That typically uses a field value to create a directory path or a file 
 name (though the filename case is only viable when the field is also 
 what’s used to partition the data)
 
 But I don’t believe Flink has built-in support for that, in batch mode
 (see BucketingSink for streaming).
 
 Maybe Blink has added that? Hoping someone who knows that codebase can 
 chime in here.
 
 Otherwise you’ll need to create a custom sink to implement the desired
 behavior - though abusing a MapPartitionFunction would be easiest, I think.
 
 — Ken
 
 
 
> On Mar 12, 2019, at 2:28 AM, qi luo wrote:
> 
> Hi Ken,
> 
> Thanks for your reply. I may not have made myself clear: our problem is not
> about reading but rather writing.
> 
> We need to write to N files based on key partitioning. We have to use 
> setParallelism() to set the output partition/file number, but when the 
> partition number is too large (~100K), the parallelism would be too high. 
> Is there any other way to achieve this?
> 
> Thanks,
> Qi
> 
>> On Mar 11, 2019, at 11:22 PM, Ken Krugler wrote:
>> 
>> Hi Qi,
>> 
>> I’m guessing you’re calling createInput() for each input file.
>> 
>> If so, then instead you want to do something like:
>> 
>>  Job job = Job.getInstance();
>> 
>>  for each file…
>>  FileInputFormat.addInputPath(job, new 
>> org.apache.hadoop.fs.Path(file path));
>> 
>>  env.createInput(HadoopInputs.createHadoopInput(…, job)
>> 
>> Flink/Hadoop will take care of parallelizing the reads from the files, 
>> given the parallelism that you’re specifying.
>> 
>> — Ken
>> 
>> 
>>> On Mar 11, 2019, at 5:42 AM, qi luo wrote:
>>> 
>>> Hi,
>>> 
>>> We’re trying to distribute batch input data to (N) HDFS files 
>>> partitioning by hash using DataSet API. What I’m doing is like:
>>> 
>>> env.createInput(…)
>>>   .partitionByHash(0)
>>>   .setParallelism(N)
>>>   .output(…)
>>> 
>>> This works well for small number of files. But when we need to 
>>> distribute to large number of files (say 100K), the parallelism becomes 
>>> too large and we could not afford that many TMs.
>>> 
>>> In spark we can write something like ‘rdd.partitionBy(N)’ and control 
>>> the parallelism separately (using dynamic allocation). Is there 
>>> anything similar in Flink or other way we can achieve similar result? 
>>> Thank you!
>>> 
>>> Qi
>> 
>> --
>> Ken Krugler
>> +1 530-210-6378
>> http://www.scaleunlimited.com 
>> Custom big data solutions & training
>> Flink, Solr, Hadoop, Cascading & Cassandra
>> 
> 
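
Expanding Ken's snippet quoted above into a compilable sketch (my own
fleshing-out; it assumes the flink-hadoop-compatibility dependency on the
classpath and takes the input paths from the command line):

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class MultiFileInput {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Register every input file on one Hadoop Job,
        // instead of calling createInput() once per file.
        Job job = Job.getInstance();
        for (String path : args) {
            FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(path));
        }

        env.createInput(HadoopInputs.createHadoopInput(
                new TextInputFormat(), LongWritable.class, Text.class, job))
            .print();
    }
}

Flink then parallelizes the reads across all registered paths with the
configured parallelism, rather than creating one source per file.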
 
 

Re: Technical consulting resources

2019-03-14 Thread Oytun Tez
Here is a better link from a service buyer perspective: https://touk.pl/

---
Oytun Tez

*M O T A W O R D*
The World's Fastest Human Translation Platform.
oy...@motaword.com — www.motaword.com


On Thu, Mar 14, 2019 at 1:02 PM Oytun Tez  wrote:

> Hi Ron,
>
> I've been experimenting with Nussknacker, a Flink application, which is
> built by TouK: https://touk.pl/esp/
>
> It looks like they have experience with large deployments (telecom), so
> they may be helpful. I am CC'ing 2 engineers from their team.
>
>
>
> ---
> Oytun Tez
>
> *M O T A W O R D*
> The World's Fastest Human Translation Platform.
> oy...@motaword.com — www.motaword.com
>
>
> On Thu, Mar 14, 2019 at 12:48 PM Ron Crocker 
> wrote:
>
>> I have a team that I’m working with that’s looking for some Flink
>> consulting. They’re running on an older version (1.4) and are looking for
>> help with upgrading to a later version (and even a path for staying
>> current). There are some complications of our deployment model that make
>> this harder than it needs to be, so being comfortable with the deployment
>> details of a Flink cluster is helpful.
>>
>> This team is in Portland Oregon but you can likely work with us remotely
>> as well.
>>
>> You can contact me directly or share your recommendations with this list.
>>
>> Thanks in advance
>>
>> Ron
>> —
>> Ron Crocker
>> Distinguished Engineer & Architect
>> ( ( •)) New Relic
>> rcroc...@newrelic.com
>>
>>


Re: Technical consulting resources

2019-03-14 Thread Oytun Tez
Hi Ron,

I've been experimenting with Nussknacker, a Flink application, which is
built by TouK: https://touk.pl/esp/

It looks like they have experience with large deployments (telecom), so
they may be helpful. I am CC'ing 2 engineers from their team.



---
Oytun Tez

*M O T A W O R D*
The World's Fastest Human Translation Platform.
oy...@motaword.com — www.motaword.com


On Thu, Mar 14, 2019 at 12:48 PM Ron Crocker  wrote:

> I have a team that I’m working with that’s looking for some Flink
> consulting. They’re running on an older version (1.4) and are looking for
> help with upgrading to a later version (and even a path for staying
> current). There are some complications of our deployment model that make
> this harder than it needs to be, so being comfortable with the deployment
> details of a Flink cluster is helpful.
>
> This team is in Portland Oregon but you can likely work with us remotely
> as well.
>
> You can contact me directly or share your recommendations with this list.
>
> Thanks in advance
>
> Ron
> —
> Ron Crocker
> Distinguished Engineer & Architect
> ( ( •)) New Relic
> rcroc...@newrelic.com
>
>


Technical consulting resources

2019-03-14 Thread Ron Crocker
I have a team that I’m working with that’s looking for some Flink consulting. 
They’re running on an older version (1.4) and are looking for help with 
upgrading to a later version (and even a path for staying current). There are 
some complications of our deployment model that make this harder than it needs 
to be, so being comfortable with the deployment details of a Flink cluster is 
helpful.

This team is in Portland Oregon but you can likely work with us remotely as 
well.

You can contact me directly or share your recommendations with this list.

Thanks in advance

Ron
—
Ron Crocker
Distinguished Engineer & Architect
( ( •)) New Relic
rcroc...@newrelic.com



Re: Re: Flink 1.7.2: Task Manager not able to connect to Job Manager

2019-03-14 Thread Gary Yao
Hi Harshith,

The truncated log is not enough. Can you share the complete logs? If that's
not possible, I'd like to see the beginning of the log files where the
cluster
configuration is logged.

The TaskManager tries to connect to the leader that is advertised in
ZooKeeper. In your case the "cluster" hostname is advertised, which hints at a
problem in your Flink configuration.

Best,
Gary

On Thu, Mar 14, 2019 at 4:54 PM Kumar Bolar, Harshith 
wrote:

> Hi Gary,
>
>
>
> I’ve attached the relevant portions of the JM and TM logs.
>
>
>
> *Job Manager Logs:*
>
> 2019-03-14 11:38:28,257 INFO
> org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager
> - State change: CONNECTED
> 2019-03-14 11:38:28,309 INFO
> org.apache.flink.runtime.webmonitor.WebMonitorUtils   - Determined
> location of main cluster component log file:
> /opt/flink-1.7.2/log/flink-root-standalonesession-4-flink0-1.flink1.us-east-1.log
> 2019-03-14 11:38:28,309 INFO
> org.apache.flink.runtime.webmonitor.WebMonitorUtils   - Determined
> location of main cluster component stdout file:
> /opt/flink-1.7.2/log/flink-root-standalonesession-4-flink0-1.flink1.us-east-1.out
> 2019-03-14 11:38:28,527 INFO
> org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint- Rest
> endpoint listening at cluster:8080
> 2019-03-14 11:38:28,527 INFO
> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService  -
> Starting ZooKeeperLeaderElectionService
> ZooKeeperLeaderElectionService{leaderPath='/leader/rest_server_lock'}.
> 2019-03-14 11:38:28,574 INFO
> org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint- Web
> frontend listening at http://cluster:8080.
> 2019-03-14 11:38:28,613 INFO
> org.apache.flink.runtime.rpc.akka.AkkaRpcService  - Starting
> RPC endpoint for
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager at
> akka://flink/user/resourcemanager .
> 2019-03-14 11:38:28,674 INFO
> org.apache.flink.runtime.rpc.akka.AkkaRpcService  - Starting
> RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher
> at akka://flink/user/dispatcher .
> 2019-03-14 11:38:28,691 INFO
> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService  -
> Starting ZooKeeperLeaderElectionService
> ZooKeeperLeaderElectionService{leaderPath='/leader/resource_manager_lock'}.
> 2019-03-14 11:38:28,694 INFO
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  -
> Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
> 2019-03-14 11:38:28,698 INFO
> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService  -
> Starting ZooKeeperLeaderElectionService
> ZooKeeperLeaderElectionService{leaderPath='/leader/dispatcher_lock'}.
> 2019-03-14 11:38:28,700 INFO
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  -
> Starting ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
> 2019-03-14 11:38:28,818 WARN
> akka.remote.ReliableDeliverySupervisor- Association
> with remote system [akka.tcp://flink@cluster:22671] has failed, address
> is now gated for [50] ms. Reason: [Association failed with
> [akka.tcp://flink@cluster:22671]] Caused by: [cluster]
> 2019-03-14 11:39:09,010 INFO
> org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint-
> http://cluster:8080 was granted leadership with
> leaderSessionID=bbe408fc-ef93-4328-abeb-85323db7aef7
> 2019-03-14 11:39:09,010 INFO
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
> ResourceManager akka.tcp://flink@cluster:31794/user/resourcemanager was
> granted leadership with fencing token ae4c0d30d0d65a0c41565360667e48fb
> 2019-03-14 11:39:09,011 INFO
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  -
> Starting the SlotManager.
> 2019-03-14 11:39:09,012 INFO
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Dispatcher
> akka.tcp://flink@cluster:31794/user/dispatcher was granted leadership
> with fencing token c852ada2-5fd4-4ff8-80ab-c2cdd85a75d9
> 2019-03-14 11:39:09,017 INFO
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Recovering
> all persisted jobs.
>
> *Task Manager Logs:*
>
> 2019-03-14 11:42:35,790 INFO
> org.apache.flink.runtime.io.disk.iomanager.IOManager  - I/O manager
> uses directory /tmp/flink-io-a7bc246d-bae4-489f-9c9c-f6a25d3c4b8f for spill
> files.
> 2019-03-14 11:42:35,820 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration  - Messages
> have a max timeout of 1 ms
> 2019-03-14 11:42:35,839 INFO
> org.apache.flink.runtime.rpc.akka.AkkaRpcService  - Starting
> RPC endpoint for org.apache.flink.runtime.taskexecutor.TaskExecutor at
> akka://flink/user/taskmanager_0 .
> 2019-03-14 11:42:35,853 INFO
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  -
> Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
> 2019-03-14 11:42:35,854 I

Re: Re: Flink 1.7.2: Task Manager not able to connect to Job Manager

2019-03-14 Thread Kumar Bolar, Harshith
Hi Gary,

I’ve attached the relevant portions of the JM and TM logs.

Job Manager Logs:

2019-03-14 11:38:28,257 INFO  
org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager
  - State change: CONNECTED
2019-03-14 11:38:28,309 INFO  
org.apache.flink.runtime.webmonitor.WebMonitorUtils   - Determined 
location of main cluster component log file: 
/opt/flink-1.7.2/log/flink-root-standalonesession-4-flink0-1.flink1.us-east-1.log
2019-03-14 11:38:28,309 INFO  
org.apache.flink.runtime.webmonitor.WebMonitorUtils   - Determined 
location of main cluster component stdout file: 
/opt/flink-1.7.2/log/flink-root-standalonesession-4-flink0-1.flink1.us-east-1.out
2019-03-14 11:38:28,527 INFO  
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint- Rest endpoint 
listening at cluster:8080
2019-03-14 11:38:28,527 INFO  
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService  - 
Starting ZooKeeperLeaderElectionService 
ZooKeeperLeaderElectionService{leaderPath='/leader/rest_server_lock'}.
2019-03-14 11:38:28,574 INFO  
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint- Web frontend 
listening at http://cluster:8080.
2019-03-14 11:38:28,613 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  
- Starting RPC endpoint for 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager at 
akka://flink/user/resourcemanager .
2019-03-14 11:38:28,674 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  
- Starting RPC endpoint for 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher at 
akka://flink/user/dispatcher .
2019-03-14 11:38:28,691 INFO  
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService  - 
Starting ZooKeeperLeaderElectionService 
ZooKeeperLeaderElectionService{leaderPath='/leader/resource_manager_lock'}.
2019-03-14 11:38:28,694 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
2019-03-14 11:38:28,698 INFO  
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService  - 
Starting ZooKeeperLeaderElectionService 
ZooKeeperLeaderElectionService{leaderPath='/leader/dispatcher_lock'}.
2019-03-14 11:38:28,700 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Starting ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
2019-03-14 11:38:28,818 WARN  akka.remote.ReliableDeliverySupervisor
- Association with remote system [akka.tcp://flink@cluster:22671] 
has failed, address is now gated for [50] ms. Reason: [Association failed with 
[akka.tcp://flink@cluster:22671]] Caused by: [cluster]
2019-03-14 11:39:09,010 INFO  
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint- 
http://cluster:8080 was granted leadership with 
leaderSessionID=bbe408fc-ef93-4328-abeb-85323db7aef7
2019-03-14 11:39:09,010 INFO  
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - 
ResourceManager akka.tcp://flink@cluster:31794/user/resourcemanager was granted 
leadership with fencing token ae4c0d30d0d65a0c41565360667e48fb
2019-03-14 11:39:09,011 INFO  
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Starting 
the SlotManager.
2019-03-14 11:39:09,012 INFO  
org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Dispatcher 
akka.tcp://flink@cluster:31794/user/dispatcher was granted leadership with 
fencing token c852ada2-5fd4-4ff8-80ab-c2cdd85a75d9
2019-03-14 11:39:09,017 INFO  
org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Recovering all 
persisted jobs.

Task Manager Logs:

2019-03-14 11:42:35,790 INFO  
org.apache.flink.runtime.io.disk.iomanager.IOManager  - I/O manager 
uses directory /tmp/flink-io-a7bc246d-bae4-489f-9c9c-f6a25d3c4b8f for spill 
files.
2019-03-14 11:42:35,820 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration  - Messages have 
a max timeout of 1 ms
2019-03-14 11:42:35,839 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  
- Starting RPC endpoint for 
org.apache.flink.runtime.taskexecutor.TaskExecutor at 
akka://flink/user/taskmanager_0 .
2019-03-14 11:42:35,853 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
2019-03-14 11:42:35,854 INFO  
org.apache.flink.runtime.taskexecutor.JobLeaderService- Start job 
leader service.
2019-03-14 11:42:35,855 INFO  org.apache.flink.runtime.filecache.FileCache  
- User file cache uses directory 
/tmp/flink-dist-cache-a7f67948-ab57-4cd9-b2a6-0361b53ecd26
2019-03-14 11:42:35,871 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor- Connecting to 
ResourceManager 
akka.tcp://flink@cluster:31794/user/resourcemanager(ae4c0d30d0d65a0c41565360667e48fb).
2019-03-14 11:42:35,963 WARN  akka.remote.ReliableDeliverySuperv

Re: Flink 1.7.2: Task Manager not able to connect to Job Manager

2019-03-14 Thread Gary Yao
Hi Harshith,

Can you share JM and TM logs?

Best,
Gary

On Thu, Mar 14, 2019 at 3:42 PM Kumar Bolar, Harshith 
wrote:

> Hi all,
>
>
>
> I'm trying to upgrade our Flink cluster from 1.4.2 to 1.7.2
>
>
>
> When I bring up the cluster, the task managers refuse to connect to the
> job managers with the following error.
>
>
>
> 2019-03-14 10:34:41,551 WARN
> akka.remote.ReliableDeliverySupervisor
>
> - Association with remote system [akka.tcp://flink@cluster:22671]
> has failed, address is now gated for [50] ms. Reason: [Association failed
> with [akka.tcp://flink@cluster:22671]] Caused by: [cluster: Name or
> service not known]
>
>
>
> Now, this works correctly if I add the following line into
> the /etc/hosts file.
>
>
>
> x.x.x.x job-manager-address.com cluster
>
>
>
> Why is Flink 1.7.2 connecting to JM using cluster in the address? Flink
> 1.4.2 used to have the job manager's address instead of the word cluster.
>
>
>
> Thanks,
>
> Harshith
>
>
>


Flink 1.7.2: Task Manager not able to connect to Job Manager

2019-03-14 Thread Kumar Bolar, Harshith
Hi all,

I'm trying to upgrade our Flink cluster from 1.4.2 to 1.7.2

When I bring up the cluster, the task managers refuse to connect to the job 
managers with the following error.

2019-03-14 10:34:41,551 WARN  akka.remote.ReliableDeliverySupervisor
- Association with remote system [akka.tcp://flink@cluster:22671] has 
failed, address is now gated for [50] ms. Reason: [Association failed with 
[akka.tcp://flink@cluster:22671]] Caused by: [cluster: Name or service not 
known]

Now, this works correctly if I add the following line into the /etc/hosts file.

x.x.x.x job-manager-address.com cluster

Why is Flink 1.7.2 connecting to JM using cluster in the address? Flink 1.4.2 
used to have the job manager's address instead of the word cluster.

Thanks,
Harshith



Re: [DISCUSS] Create a Flink ecosystem website

2019-03-14 Thread Becket Qin
Thanks for writing up the specifications.

Regarding the website source code, Austin found a website[1] whose frontend
code[2] is available publicly. It lacks some support (e.g. login), but it is
still a good starting point. One thing is that I did not find a License
statement for that source code. I'll reach out to the author to see if they
have any concern over our usage.

Apache Infra has not replied to my email regarding some details about the
VM. I'll open an infra Jira ticket tomorrow if there is still no response.

Thanks,

Jiangjie (Becket) Qin

[1] https://emberobserver.com/
[2] https://github.com/emberobserver/client



On Thu, Mar 14, 2019 at 1:35 AM Robert Metzger  wrote:

> @Bowen: I agree. Confluent Hub looks nicer, but it is on their company
> website. I guess the likelihood that they give out code from their company
> website is fairly low.
> @Niels: Beam's page is similar to our Ecosystem page, which we'll
> reactivate as part of this PR:
> https://github.com/apache/flink-web/pull/187
>
> Spark-packages.org did not respond to my request.
> I will propose a short specification in Becket's initial document.
>
>
> On Mon, Mar 11, 2019 at 11:38 AM Niels Basjes  wrote:
>
>> Hi,
>>
>> The Beam project has something in this area that is simply a page
>> within their documentation website:
>> https://beam.apache.org/documentation/sdks/java-thirdparty/
>>
>> Niels Basjes
>>
>> On Fri, Mar 8, 2019 at 11:39 PM Bowen Li  wrote:
>> >
>> > Confluent Hub for Kafka is another good example of this kind. I
>> > personally like it over the Spark site. It may be worth checking it
>> > out with the Kafka folks.
>> >
>> > On Thu, Mar 7, 2019 at 6:06 AM Becket Qin  wrote:
>> >>
>> >> Absolutely! Thanks for the pointer. I'll submit a PR to update the
>> >> ecosystem page and the navigation.
>> >>
>> >> Thanks,
>> >>
>> >> Jiangjie (Becket) Qin
>> >>
>> >> On Thu, Mar 7, 2019 at 8:47 PM Robert Metzger 
>> wrote:
>> >>
>> >> > Okay. I will reach out to spark-packages.org and see if they are
>> >> > willing to share.
>> >> >
>> >> > Do you want to raise a PR to update the ecosystem page (maybe sync
>> >> > with the "Software Projects" listed here:
>> >> > https://cwiki.apache.org/confluence/display/FLINK/Powered+by+Flink)
>> >> > and link it in the navigation?
>> >> >
>> >> > Best,
>> >> > Robert
>> >> >
>> >> >
>> >> > On Thu, Mar 7, 2019 at 10:13 AM Becket Qin 
>> wrote:
>> >> >
>> >> >> Hi Robert,
>> >> >>
>> >> >> I think it is at least worth checking whether the spark-packages.org
>> >> >> owners are willing to share. Thanks for volunteering to write the
>> >> >> requirement descriptions! In any case, that will be very helpful.
>> >> >>
>> >> >> Since a static page has almost no cost, and we will need it to
>> redirect
>> >> >> to the dynamic site anyways, how about we first do that while
>> working on
>> >> >> the dynamic website?
>> >> >>
>> >> >> Thanks,
>> >> >>
>> >> >> Jiangjie (Becket) Qin
>> >> >>
>> >> >> On Thu, Mar 7, 2019 at 4:59 AM Ufuk Celebi  wrote:
>> >> >>
>> >>> I like Shaoxuan's idea to keep this a static site first. We could
>> >>> then iterate on this and make it a dynamic thing. Of course, if we
>> >>> have the resources in the community to quickly start with a dynamic
>> >>> site, I'm not opposed.
>> >> >>>
>> >> >>> – Ufuk
>> >> >>>
>> >> >>> On Wed, Mar 6, 2019 at 2:31 PM Robert Metzger > >
>> >> >>> wrote:
>> >> >>> >
>> >> >>> > Awesome! Thanks a lot for looking into this Becket! The VMs
>> hosted by
>> >> >>> Infra
>> >> >>> > look suitable.
>> >> >>> >
>> >> >>> > @Shaoxuan: There is actually already a static page. It used to be
>> >> >>> linked,
>> >> >>> > but has been removed from the navigation bar for some reason.
>> This is
>> >> >>> the
>> >> >>> > page: https://flink.apache.org/ecosystem.html
>> >> >>> > We could update the page and add it back to the navigation bar
>> for the
>> >> >>> > coming weeks. What do you think?
>> >> >>> >
>> >> >>> > I would actually like to push for a dynamic page right away.
>> >> >>> >
>> >> >>> > I know it's kind of a bold move, but how do you feel about
>> sending the
>> >> >>> > owners of spark-packages.org a short note, if they are
>> interested in
>> >> >>> > sharing the source? We could maintain the code together in a
>> public
>> >> >>> repo.
>> >> >>> > If they are not interested in sharing, or we decide not to ask
>> in the
>> >> >>> first
>> >> >>> > place, I'm happy to write down a short description of the
>> requirements,
>> >> >>> > maybe some mockups. We could then see if we find somebody here
>> in the
>> >> >>> > community who's willing to implement it.
>> >> >>> > Given the number of people who are eager to contribute, I
>> believe we
>> >> >>> will
>> >> >>> > be able to find somebody pretty soon.
>> >> >>> >
>> >> >>> >
>> >> >>> > On Wed, Mar 6, 2019 at 3:49 AM Becket Qin 
>> >> >>> wrote:
>> >> >>> >
>> >> >>> > > Forgot to provide the link...
>> >> >>> > >
>> >> >>> > > [1] https://www.apache.org/dev/services.html#blo

RE: ProgramInvocationException when trying to submit a job by running a jar using Monitoring REST API

2019-03-14 Thread Papadopoulos, Konstantinos
It seems that the Flink cluster does not retrieve the program arguments
correctly. For reference, I sent the following request:
Method Type: POST
URL: 
http://dbtpa05p.ch3.dev.i.com:9171/v1/jars/321febd8-a5e8-4255-858b-c221b49aef18_aa-lmx-etl-1.0.0.1-SNAPSHOT.jar/run
Body: {"programArgs" : 
"--job=mediaSpent,--initialScopeId=b494c35d-4c37-4338-8d23-0fc947bef690,--integratedScopeId=91769bd8-df4d-436c-b8d0-2e23ce862859,--projectId=333,--log.path=../log"}
Content-Type: application/json
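
One thing worth checking (an assumption based on the symptom, not something
confirmed here): the REST API splits the programArgs string on whitespace, so
separating the arguments with commas passes them to the main method as a
single argument. A space-separated body would look like:

Body: {"programArgs" : 
"--job=mediaSpent --initialScopeId=b494c35d-4c37-4338-8d23-0fc947bef690 --integratedScopeId=91769bd8-df4d-436c-b8d0-2e23ce862859 --projectId=333 --log.path=../log"}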


From: Chesnay Schepler 
Sent: Thursday, 14 March 2019 2:24 PM
To: Papadopoulos, Konstantinos ; 
user@flink.apache.org
Subject: Re: ProgramInvocationException when trying to submit a job by running 
a jar using Monitoring REST API

Please enable debug logging, re-submit the job, check the JobManager logs and 
report back the logged exception.

On 14.03.2019 13:16, Papadopoulos, Konstantinos wrote:
Hi all,

As part of our projects, I am experimenting with the Flink Monitoring REST API 
and, especially, its capabilities for uploading and running jar files.
When I try to submit one of our jobs by running a jar previously uploaded 
via '/jars/upload', I get a 500 Internal Server Error response with the 
following body:
{ "errors": [ "org.apache.flink.client.program.ProgramInvocationException: The 
main method caused an error." ] }
On the other hand, when I try the same with the 'Word Count' batch 
example, execution succeeds.
It must be pointed out that I tried to execute the respective POST request both 
via 'curl' and Postman, but both failed with the same error.

Does anyone have an idea why this may happen?

Thanks in advance,
Konstantinos




Re: ProgramInvocationException when trying to submit a job by running a jar using Monitoring REST API

2019-03-14 Thread Chesnay Schepler
Please enable debug logging, re-submit the job, check the JobManager 
logs and report back the logged exception.


On 14.03.2019 13:16, Papadopoulos, Konstantinos wrote:


Hi all,

As part of our projects, I am experimenting with the Flink Monitoring REST API 
and, especially, its capabilities for uploading and running jar files.


When I try to submit one of our jobs by running a jar previously 
uploaded via '/jars/upload', I get a 500 Internal Server Error 
response with the following body:


{ "errors": [ 
"org.apache.flink.client.program.ProgramInvocationException: The main 
method caused an error." ] }


On the other hand, when I try the same with the ‘Word Count’ 
batch example, execution succeeds.


It must be pointed out that I tried to execute the respective POST 
request both via ‘curl’ and Postman, but both failed with the same error.


Does anyone have an idea why this may happen?

Thanks in advance,

Konstantinos





ProgramInvocationException when trying to submit a job by running a jar using Monitoring REST API

2019-03-14 Thread Papadopoulos, Konstantinos
Hi all,

As part of our projects, I am experimenting with the Flink Monitoring REST API 
and, especially, its capabilities for uploading and running jar files.
When I try to submit one of our jobs by running a jar previously uploaded 
via '/jars/upload', I get a 500 Internal Server Error response with the 
following body:
{ "errors": [ "org.apache.flink.client.program.ProgramInvocationException: The 
main method caused an error." ] }
On the other hand, when I try the same with the 'Word Count' batch 
example, execution succeeds.
It must be pointed out that I tried to execute the respective POST request both 
via 'curl' and Postman, but both failed with the same error.

Does anyone have an idea why this may happen?

Thanks in advance,
Konstantinos


Re: How to join stream and dimension data in Flink?

2019-03-14 Thread Piotr Nowojski
Hi Henry,

1. Also take a look at the regular joins limitations:

> However, this operation has an important implication: it requires to keep 
> both sides of the join input in Flink’s state forever. Thus, the resource 
> usage will grow
> indefinitely as well, if one or both input tables are continuously growing.

4. Our current grammar for temporal table joins is a stop-gap solution 
that is ANSI SQL compliant. Unfortunately the SQL standard lags behind 
streaming requirements and we are working on addressing this issue. [1]

5. It will be executed in a similar fashion to how you would expect a regular 
hash join to be executed - the “WHERE” join condition will be pushed into the 
temporal table join operator.

6. I don’t think that Flink supports the syntax suggested by Hequn. Currently 
outer joins are not supported with temporal tables.

Piotrek

[1] https://issues.apache.org/jira/browse/CALCITE-1917 
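
For readers following along, a sketch of how a temporal table function such as
the Rates table used in this thread is registered (Flink 1.7 Table API; the
source table and field names are assumptions):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;

public class TemporalTableSetup {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // Assumed: a registered table "RatesHistory"
        // with fields (currency, rate, proctime).
        Table ratesHistory = tEnv.scan("RatesHistory");

        // Version the history by proctime, keyed on currency.
        TemporalTableFunction rates =
            ratesHistory.createTemporalTableFunction("proctime", "currency");
        tEnv.registerFunction("Rates", rates);
        // Usable afterwards as: LATERAL TABLE (Rates(o.proctime)) AS r
    }
}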



> On 14 Mar 2019, at 03:31, Hequn Cheng  wrote:
> 
> Hi Henry,
> 
> These are good questions! 
> I would rather not add the temporal and lateral prefixes in front of the
> join. The temporal table is a concept orthogonal to join. We should say join
> a temporal table or join a lateral table.
> 1. You can of course use a stream-stream join. Introducing the temporal table
> not only makes the query simpler but also improves performance. More
> detail can be found in [1].
> 2. Both joins are based on the concept of a temporal table, i.e., a table
> joins a temporal table.
> 3. Yes, actually the join in Flink uses a lateral table & TemporalTableFunction
> to implement a temporal table. A temporal table is a versioned table and a
> lateral table is a table that keeps references to the previous table. If you do
> not want to use time versions, you don't need the temporal table.
> 4. It is a kind of join. The join keyword can be omitted if it is an inner 
> join. The grammar will not be changed in the near future. I haven't heard
> any news about changing it.
> 5. Yes, it will be optimized. 
> 6. If you want to left join a temporal table. You can write sql like:
> 
> SELECT
>   o.amout, o.currency, r.rate, o.amount * r.rate
> FROM
>   Orders AS o
>   LEFT JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
>   ON r.currency = o.currency
> 
> CC @Piotr Nowojski - would be great to have
> your opinions here.
> 
> Best,
> Hequn
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
>  
> 
> 
> 
> On Wed, Mar 13, 2019 at 1:59 PM 徐涛 wrote:
> Hi Hequn,
>   Thanks a lot for your answer! That is very helpful for me.
>   I still have some questions about stream and dimension data join and 
> temporal table join:  
>   1. I found that the temporal table join is still a one-stream-driven join. I
> do not know why the dimension data join has to be done by a one-stream-driven
> join - why can it not be done by a two-stream join (traditional stream-stream
> join)?
>   I try to give an answer myself: a two-stream join is based on
> materializing the data of both streams in state, but due to
> state retention, the dimension data may be lost. I guess this is one reason -
> am I correct?
>   2. Is Blink's stream and dimension data join based on the temporal table
> join?
>   3. I think a lateral table join can also do a dimension join if I do not
> want to use time versioning. How do I choose between a temporal table join and
> a lateral table join?
>   4. I found that the temporal table join in Flink uses a “LATERAL TABLE”
> grammar rather than “JOIN”. It is OK, but not easier to use than “JOIN”; will
> the community modify the grammar in future releases?
>   5. In the following temporal table join statement, will joining the Orders
> table with Rates produce too much data before the where clause takes effect?
> Will it be optimized?
> SELECT
>   o.amount * r.rate AS amount
> FROM
>   Orders AS o,
>   LATERAL TABLE (Rates(o.rowtime)) AS r
> WHERE r.currency = o.currency 
>   6. How to use temporal table join to do left join?
> 
> 
> Best
> Henry
> 
>> On Mar 13, 2019, at 12:02 AM, Hequn Cheng wrote:
>> 
>> Hi Henry,
>> 
>> Yes, you are correct. Basically, there are two ways to join a
>> Temporal Table. One is provided in Flink and the other is provided in Blink,
>> which has been pushed as a branch[1] in the Flink repo.
>> 
>> - Join a Temporal Table in Flink[2][3][4]
>> As the document said: it is a join with a temporal table joins an 
>> append-only table (left input/probe side) with a temporal table (right 
>> input/build side), i.e., a table that ch

Re: flink-io FileNotFoundException

2019-03-14 Thread Chesnay Schepler

Is there some periodic cleanup job running on your system for /tmp?

On 11.03.2019 10:38, Alexander Smirnov wrote:

Hi everybody,

I am using Flink 1.4.2 and periodically my job goes down with the 
following exception in the logs. Relaunching the job does not help, only 
restarting the whole cluster does.


Is there a JIRA issue for that? Will upgrading to 1.5 help?

java.io.FileNotFoundException: 
/tmp/flink-io-20a15b29-1838-4de0-b383-165b1c49655c/c41ceb40a4eca0d0b739e0d1e2db45b9cc22d160ef25769993084a90e6a79b78.0.buffer 
(No such file or directory)

at java.io.RandomAccessFile.open0(Native Method)
at java.io.RandomAccessFile.open(RandomAccessFile.java:316)
at java.io.RandomAccessFile.<init>(RandomAccessFile.java:243)
at 
org.apache.flink.streaming.runtime.io.BufferSpiller.createSpillingChannel(BufferSpiller.java:259)
at 
org.apache.flink.streaming.runtime.io.BufferSpiller.<init>(BufferSpiller.java:120)
at 
org.apache.flink.streaming.runtime.io.BarrierBuffer.<init>(BarrierBuffer.java:149)
at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.<init>(StreamTwoInputProcessor.java:147)
at 
org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.init(TwoInputStreamTask.java:79)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:235)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)

Thank you,
Alex





How to sorted by flink sql?

2019-03-14 Thread 刘 文
Hi all,
   The results are not sorted.

package com.opensourceteams.module.bigdata.flink.example.sql.test

import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object Run {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment

    val dataSet = env.fromElements(
      ("a", 15, "male"), ("b", 45, "female"), ("d", 25, "male"), ("c", 35, "female"))

    val tableEnv = TableEnvironment.getTableEnvironment(env)

    tableEnv.registerDataSet("user1", dataSet, 'name, 'age, 'sex)

    /**
      * default order of the input:
      *   a,15,male
      *   b,45,female
      *   d,25,male
      *   c,35,female
      */

    /**
      * sort by age - results are not sorted:
      *   a,15,male
      *   b,45,female
      *   c,35,female
      *   d,25,male
      */

    tableEnv.sqlQuery("select name, age, sex FROM user1 order by age desc")
      .toDataSet[Row] // convert the Table back to a DataSet before first()/print()
      .first(100)
      .print()
  }
}
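
One thing worth checking (an assumption, not verified against this program):
with the default parallelism greater than one, the sorted partitions may be
collected in arbitrary order, so the global ORDER BY is not visible in the
printed output. Forcing a single parallel instance makes it observable:

// assumption: restores the visible global order for this small test
env.setParallelism(1)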



Best,
thinktothings




Re: Custom Partitioner and Graph Algorithms

2019-03-14 Thread MBilal
I have added a working code example to the stackoverflow question that is 
representative of what I am using. The github repo can be found here: 
https://github.com/MBtech/graphtest
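
For context, a minimal sketch of the kind of partitioner being discussed (my
own illustration of the described setup, not the code from the linked repo):
every record is forced into partition 0, so downstream stages start out
maximally skewed.

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class SkewedPartitionerExample {

    /** Sends every record to partition 0, regardless of key. */
    public static class SinglePartitioner implements Partitioner<Long> {
        @Override
        public int partition(Long key, int numPartitions) {
            return 0;
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<Long, Long>> edges = env.fromElements(
                Tuple2.of(1L, 2L), Tuple2.of(2L, 3L), Tuple2.of(3L, 1L));
        // Partition on the source-vertex field (f0);
        // all records land in one subtask.
        edges.partitionCustom(new SinglePartitioner(), 0)
             .print();
    }
}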

On 2019/03/13 09:46:52, MBilal  wrote: 
> Hi,
> 
> I am observing a behaviour in the task statistics that I don't fully 
> understand. 
> Essentially I have created a partitioner that assigns all the edges to a 
> single partition.
> I see imbalance (in terms of records sent/received) in the task statistics of 
> different instances of the same operator for the second and third stages. 
> But from the fourth stage onwards, all operator instances are executing pretty 
> much the same number of records. I would have expected that the imbalance 
> would exist in those stages as well. 
> 
> Details of my code and the task statistics are in this stackoverflow question:
> https://stackoverflow.com/questions/55138553/behaviour-of-custom-partitioner-in-apache-flink
> 
> Thanks. 
> 
> - Bilal
> 
>