Re: [Flink 1.7.0] initial failures with starting high-parallelism job without checkpoint/savepoint

2019-01-24 Thread Steven Wu
Hi Andrey,

Weird that I didn't see your reply in my email inbox. My colleague happened
to see it in apache archive :)

nope, we didn't experience it with 1.4 (previous version)

Yes, we did use HA setup.

high-availability: zookeeper
high-availability.zookeeper.quorum: ...
high-availability.zookeeper.path.root: ...
high-availability.zookeeper.path.latch: /leaderlatch
high-availability.zookeeper.path.leader: /leader
high-availability.zookeeper.path.jobgraphs: /jobgraphs
high-availability.zookeeper.path.checkpoints: /checkpoints
recovery.zookeeper.path.checkpoint-counter: /checkpoint-counter
high-availability.storageDir: ...


My colleague (Mark Cho) will provide some additional observations.

Thanks,
Steven


Hi Steven,

Did you not experience this problem with the previous Flink release (you
marked the topic with 1.7)?

Do you use HA setup?

Without HA setup, the blob data, which belongs to the job, will be
distributed from job master node to all task executors.
Depending on the size of the blob data (jars, user serialised classes etc),
it might overwhelm job master node and network connections.
It can subsequently slow down UI, heart-beating and initialisation of task
executors and produced partitions because task executors contend for the
blob data. When the job is restored, the blob data might not need to be fetched
because it is already available.

With HA setup, you can configure high-availability.storageDir in DFS and
DFS will serve the blob data.

Otherwise, could you provide the JM log for further investigation?

Best,
Andrey

On Wed, Jan 23, 2019 at 10:06 PM Steven Wu  wrote:

> When we start a high-parallelism (1,600) job without any
> checkpoint/savepoint, the job struggled to be deployed. After a few
> restarts, it eventually got deployed and was running fine after the initial
> struggle. jobmanager was very busy. Web UI was very slow. I saw these two
> exceptions/failures during the initial failures.
>
> I don't seem to see this issue when starting the same job from an external
> checkpoint, or at least only very rarely.
>
> Anyone else experienced similar issue?
>
> Thanks,
> Steven
>
> Exception #1
>
> java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id
> fe55bf158e89cf555be6582e577b9621 timed out.
>
> at
> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1624)
>
> at
> org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl$HeartbeatMonitor.run(HeartbeatManagerImpl.java:339)
>
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>
> at
> org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154)
>
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
> Exception #2
>
> org.apache.flink.runtime.io.network.partition.PartitionNotFoundException:
> Partition 3cdc03de22920b0dc1ef3d82e06b7d75@7fda7294a52a6da2d831d832223d3627
> not found.
>
> at
> org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:77)
>
> at
> org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.requestSubpartition(LocalInputChannel.java:111)
>
> at
> org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel$1.run(LocalInputChannel.java:155)
>
> at java.util.TimerThread.mainLoop(Timer.java:555)
>
> at java.util.TimerThread.run(Timer.java:505)
>
>


Re: [DISCUSS] Towards a leaner flink-dist

2019-01-24 Thread Jark Wu
+1 for the leaner distribution and improving the "Download" page.

On Fri, 25 Jan 2019 at 01:54, Bowen Li  wrote:

> +1 for leaner distribution and a better 'download' webpage.
>
> +1 for a full distribution if we can automate it besides supporting the
> leaner one. If we support both, I'd imagine release managers should be able
> to package two distributions with a single change of parameter instead of
> manually packaging the full distribution. How to achieve that needs to be
> evaluated and discussed; it could probably be something like 'mvn clean install
> -Dfull/-Dlean', I'm not sure yet.
>
>
> On Wed, Jan 23, 2019 at 10:11 AM Thomas Weise  wrote:
>
>> +1 for trimming the size by default and offering the fat distribution as
>> alternative download
>>
>>
>> On Wed, Jan 23, 2019 at 8:35 AM Till Rohrmann 
>> wrote:
>>
>>> Ufuk's proposal (having a lean default release and a user convenience
>>> tarball) sounds good to me. That way advanced users won't be bothered by
>>> an
>>> unnecessarily large release and new users can benefit from having many
>>> useful extensions bundled in one tarball.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, Jan 23, 2019 at 3:42 PM Ufuk Celebi  wrote:
>>>
>>> > On Wed, Jan 23, 2019 at 11:01 AM Timo Walther 
>>> wrote:
>>> > > I think what is more important than a big dist bundle is a helpful
>>> > > "Downloads" page where users can easily find available filesystems,
>>> > > connectors, metric reporters. Not everyone checks Maven central for
>>> > > available JAR files. I just saw that we added an "Optional components"
>>> > > section recently [1], we just need to make it more prominent. This is
>>> > > also done for the SQL connectors and formats [2].
>>> >
>>> > +1 I fully agree with the importance of the Downloads page. We
>>> > definitely need to make any optional dependencies that users need to
>>> > download easy to find.
>>> >
>>>
>>


Re: [DISCUSS] Towards a leaner flink-dist

2019-01-24 Thread jincheng sun
Hi Chesnay,

Thank you for the proposal. I like it very much.

+1 for the leaner distribution.

Regarding improving the "Download" page, I think we can add the connector
download links in the "Optional components" section which @Timo Walther
mentioned above.


Regards,
Jincheng

Chesnay Schepler wrote on Fri, Jan 18, 2019 at 5:59 PM:

> Hello,
>
> the binary distribution that we release by now contains quite a lot of
> optional components, including various filesystems, metric reporters and
> libraries. Most users will only use a fraction of these, and as such
> pretty much only increase the size of flink-dist.
>
> With Flink growing more and more in scope I don't believe it to be
> feasible to ship everything we have with every distribution, and instead
> suggest more of a "pick-what-you-need" model, where flink-dist is rather
> lean and additional components are downloaded separately and added by
> the user.
>
> This would primarily affect the /opt directory, but could also be
> extended to cover flink-dist. For example, the yarn and mesos code could
> be spliced out into separate jars that could be added to lib manually.
>
> Let me know what you think.
>
> Regards,
>
> Chesnay
>
>


Re: TimeZone shift problem in Flink SQL

2019-01-24 Thread Bowen Li
Hi,

Did you consider the time zone in the conversion in your UDF?
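
For reference, a minimal sketch of how the formatting UDF could pin the conversion
to one explicit time zone (class name and zone below are illustrative only, not
your actual implementation):

```
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;
import org.apache.flink.table.functions.ScalarFunction;

// Sketch only: format with an explicit time zone so the epoch value is not shifted
// by whatever default zone the JVM happens to use. The parsing UDF (unix_timestamp)
// should pin the same zone before parsing.
public class FromUnixtime extends ScalarFunction {
    public String eval(long epochMillis, String format) {
        SimpleDateFormat sdf = new SimpleDateFormat(format);
        sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
        return sdf.format(new Date(epochMillis));
    }
}
```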


On Tue, Jan 22, 2019 at 5:29 AM 徐涛  wrote:

> Hi Experts,
> I have the following two UDFs:
> unix_timestamp: transforms from String to Timestamp, with the
> arguments (value: String, format: String), returns Timestamp
> from_unixtime: transforms from Timestamp to String, with the
> arguments (ts: Long, format: String), returns String
>
>
> select
>  number,
>  ts,
>  from_unixtime(unix_timestamp(LAST_UPDATE_TIME, 'EEE MMM dd
> HH:mm:Ss z '),'-MM-dd')  as dt
>   from
>  test;
>
> When the LAST_UPDATE_TIME value is "Tue Jan 22 21:03:12 CST 2019",
> unix_timestamp returns a Timestamp with value 1548162182001.
> But when from_unixtime is invoked, the timestamp with
> value 1548190982001 is passed in; there is an 8-hour shift between them.
> May I know why there is an 8-hour shift between them, and how can I
> get the timestamp that was originally returned by the first UDF without
> changing the code?
>   Thanks very much.
>
> Best
> Henry
>


Re: [Flink 1.6] How to get current total number of processed events

2019-01-24 Thread Congxian Qiu
Hi Nhan,
Do you want the total number for the current parallel instance or for the whole
operator? If you want the total number for the current parallel instance, would
operator state [1] satisfy your use case?

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/state.html#operator-state
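
If it helps, here is a minimal sketch of counting processed events per parallel
instance with operator state (the String element type and the pass-through map
are assumptions for illustration):

```
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

// Sketch only: each parallel subtask keeps its own running count of processed
// elements as operator state, so the count survives checkpoints and restores.
public class CountingMapper implements MapFunction<String, String>, CheckpointedFunction {

    private transient ListState<Long> checkpointedCount;
    private long count;

    @Override
    public String map(String value) {
        count++; // events processed by this subtask so far
        return value;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedCount.clear();
        checkpointedCount.add(count);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedCount = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("processed-count", Long.class));
        for (Long c : checkpointedCount.get()) {
            count += c; // sums redistributed counts after a restore/rescale
        }
    }
}
```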

Kien Truong wrote on Thu, Jan 24, 2019 at 7:45 PM:

> Hi Nhan,
>
> You can store the max/min value using the value states of a
> KeyedProcessFunction,
>
> or in the global state of a ProcessWindowFunction.
>
>
> On processing each item, compare its value to the current max/min and
> update the stored value as needed.
>
>
> Regards,
>
> Kien
>
>
> On 1/24/2019 12:37 AM, Thanh-Nhan Vo wrote:
>
> Hi Kien Truong,
>
>
> Thank you for your answer. I have another question, please !
> If I count the number of messages processed for a given key j (denoted
> c_j), is there a way to retrieve max{c_j}, min{c_j}?
>
> Thanks
>
>
>
> *From:* Kien Truong [mailto:duckientru...@gmail.com]
> *Sent:* Wednesday, January 23, 2019 16:04
> *To:* user@flink.apache.org
> *Subject:* Re: [Flink 1.6] How to get current total number of processed
> events
>
>
>
> Hi Nhan,
>
> Logically, the total number of processed events before an event cannot be
> accurately calculated unless event processing is synchronized.
>
> This is not scalable, so naturally I don't think Flink supports it.
>
> Although, I suppose you can get an approximate count by using a non-keyed
> TumblingWindow, count the item inside the window, then use that value in
> the next window.
>
>
>
> Regards,
>
> Kien
>
>
>
> On 1/21/2019 9:34 PM, Thanh-Nhan Vo wrote:
>
> Hello all,
>
> I have a question, please !
> I’m using Flink 1.6 to process our data in streaming mode.
> I wonder if at a given event, there is a way to get the current total
> number of processed events (before this event).
>
> If possible, I want to get this total number of processed events as a
> value state in a KeyedStream.
> It means that for a given key in the KeyedStream, I want to retrieve not only
> the total number of processed events for this key but also the total number
> of processed events for all keys.
>
> Is there a way to do this in Flink 1.6, please?
>
> Best regards,
> Nhan
>
>
>
>

-- 
Best,
Congxian


Re: `env.java.opts` not persisting after job canceled or failed and then restarted

2019-01-24 Thread Guowei Ma
This may be caused by the fact that a JVM process can only load a given .so once.
So a tricky way is to rename it.

Sent from my iPhone

> On Jan 25, 2019, at 7:12 AM, Aaron Levin wrote:
> 
> Hi Ufuk,
> 
> Update: I've pinned down the issue. It's multiple classloaders loading 
> `libhadoop.so`:
> 
> ```
> failed to load native hadoop with error: java.lang.UnsatisfiedLinkError: 
> Native Library /usr/lib/libhadoop.so already loaded in another classloader
> ```
> 
> I'm not quite sure what the solution is. Ideally flink would destroy a 
> classloader when a job is canceled, but perhaps there's a jvm limitation 
> there? Putting the libraries into `/usr/lib` or `/lib` does not work (as 
> suggested by Chesnay in the ticket) as I get the same error. I might see if I 
> can put a jar with `org.apache.hadoop.common.io.compress` in 
> `/flink/install/lib` and then remove it from my jar. It's not an ideal 
> solution but I can't think of anything else.
> 
> Best,
> 
> Aaron Levin
> 
>> On Thu, Jan 24, 2019 at 12:18 PM Aaron Levin  wrote:
>> Hi Ufuk,
>> 
>> I'm starting to believe the bug is much deeper than the originally reported 
>> error because putting the libraries in `/usr/lib` or `/lib` does not work. 
>> This morning I dug into why putting `libhadoop.so` into `/usr/lib` didn't 
>> work, despite that being in the `java.library.path` at the call site of the 
>> error. I wrote a small program to test the loading of native libraries, and 
>> it was able to successfully load `libhadoop.so`. I'm very perplexed. Could 
>> this be related to the way flink shades hadoop stuff? 
>> 
>> Here is my program and its output:
>> 
>> ```
>> $ cat LibTest.scala
>> package com.redacted.flink
>> 
>> object LibTest {
>>   def main(args: Array[String]): Unit = {
>>     val library = args(0)
>> 
>>     System.out.println(s"java.library.path=${System.getProperty("java.library.path")}")
>>     System.out.println(s"Attempting to load $library")
>>     System.out.flush()
>>     System.loadLibrary(library)
>>     System.out.println(s"Successfully loaded ")
>>     System.out.flush()
>>   }
>> }
>> ```
>> 
>> I then tried running that on one of the task managers with `hadoop` as an 
>> argument:
>> 
>> ```
>> $ java -jar lib_test_deploy.jar hadoop
>> java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
>> Attempting to load hadoop
>> Exception in thread "main" java.lang.UnsatisfiedLinkError: no hadoop in 
>> java.library.path
>>  at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
>>  at java.lang.Runtime.loadLibrary0(Runtime.java:870)
>>  at java.lang.System.loadLibrary(System.java:1122)
>>  at com.stripe.flink.LibTest$.main(LibTest.scala:11)
>>  at com.stripe.flink.LibTest.main(LibTest.scala)
>> ```
>> 
>> I then copied the native libraries into `/usr/lib/` and ran it again:
>> 
>> ```
>> $ sudo cp /usr/local/hadoop/lib/native/libhadoop.so /usr/lib/
>> $ java -jar lib_test_deploy.jar hadoop
>> java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
>> Attempting to load hadoop
>> Successfully loaded
>> ```
>> 
>> Any ideas? 
>> 
>>> On Wed, Jan 23, 2019 at 7:13 PM Aaron Levin  wrote:
>>> Hi Ufuk,
>>> 
>>> One more update: I tried copying all the hadoop native `.so` files (mainly 
>>> `libhadoop.so`) into `/lib` and am I still experiencing the issue I 
>>> reported. I also tried naively adding the `.so` files to the jar with the 
>>> flink application and am still experiencing the issue I reported (however, 
>>> I'm going to investigate this further as I might not have done it 
>>> correctly).
>>> 
>>> Best,
>>> 
>>> Aaron Levin
>>> 
 On Wed, Jan 23, 2019 at 3:18 PM Aaron Levin  wrote:
 Hi Ufuk,
 
 Two updates:
 
 1. As suggested in the ticket, I naively copied the every `.so` in 
 `hadoop-3.0.0/lib/native/` into `/lib/` and this did not seem to help. My 
 knowledge of how shared libs get picked up is hazy, so I'm not sure if 
 blindly copying them like that should work. I did check what 
 `System.getProperty("java.library.path")` returns at the call-site and 
 it's: 
 java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
 2. The exception I see comes from 
 `hadoop.util.NativeCodeLoader.buildSupportsSnappy` (stack-trace below). 
 This uses `System.loadLibrary("hadoop")`.
 
 [2019-01-23 19:52:33.081216] java.lang.UnsatisfiedLinkError: 
 org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z
 [2019-01-23 19:52:33.081376]  at 
 org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy(Native Method)
 [2019-01-23 19:52:33.081406]  at 
 org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:63)
 [2019-01-23 19:52:33.081429]  at 
 

Re: Trouble migrating state from 1.6.3 to 1.7.1

2019-01-24 Thread Tzu-Li (Gordon) Tai
Hi!

We've double checked the code, and the only plausible cause of this is that
you may be using flink-avro 1.6.x with Flink 1.7.x.
Could you double check that all Flink dependencies, including flink-avro,
are 1.7.1?
You can verify this by doing `mvn dependency:tree` on your job, and check
that flink-avro 1.6.x isn't in there.

A more detailed explanation of why we suspect this:
In Flink 1.7.x, the job will only fail if a previous Java-serialized
serializer, that couldn't be deserialized in the restore, was attempted to
be used.
In flink-avro 1.7.x, we've made sure that the previous serialized
AvroSerializer instance (which is expected to no longer be deserializable
in 1.7.1) is never accessed. This isn't the case for flink-avro 1.6.x,
which still attempts to access the serialized AvroSerializer instance.

Please update us on your verifications here. And thanks for the effort!

Cheers,
Gordon

On Wed, Jan 23, 2019 at 8:41 PM pwestermann 
wrote:

> Thanks Gordon,
>
> I get the same exception in the JM logs and that looks like it's causing
> the
> job failure.
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Is there a way to get all flink built-in SQL functions

2019-01-24 Thread Hequn Cheng
Hi yinhua,

As Chesnay suggested, the documentation is a good way. You can find descriptions and
an example for each UDF there.
If you only want to get a list of names, you can also take a look at the
Flink code (i.e., BasicOperatorTable.builtInSqlOperators).

Hope this helps.
Best, Hequn

On Thu, Jan 24, 2019 at 4:34 PM Chesnay Schepler  wrote:

> Beyond the documentation, I don't believe there to be a mechanism for listing
> all built-in functions.
>
> On 23.01.2019 04:30, yinhua.dai wrote:
>
> I would like to put this list into our self-service Flink SQL web UI.
> Thanks.
>
>
>
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>
>


Re: Change Flink checkpoint configuration at runtime

2019-01-24 Thread Chesnay Schepler

You cannot change the checkpointing configuration at runtime.

You should be able to resume the job from the last checkpoint.

On 22.01.2019 19:39, knur wrote:

I'm running a streaming job that uses the following config:

 checkpointInterval = 5 mins
 minPauseBetweenCheckpoints = 2 mins
 checkpointTimeout = 1 minute
 maxConcurrentCheckpoints = 1
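
(In the DataStream API, that corresponds roughly to the following sketch; the
values are just the ones listed above:)

```
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5 * 60 * 1000);                                 // checkpointInterval = 5 min
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2 * 60 * 1000); // minPauseBetweenCheckpoints = 2 min
        env.getCheckpointConfig().setCheckpointTimeout(60 * 1000);              // checkpointTimeout = 1 min
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // RETAIN_ON_CANCELLATION keeps the externalized checkpoint when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    }
}
```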

This is using incremental, async checkpoints with the RocksDb backend. So
far around 2K checkpoints have been triggered, but I just noticed that after
the first ~1K the checkpoints have been failing with:

 Checkpoint 1560 of job 9054d277265950c07ab90cf7ba0641d0 expired before
completing.

Now I'm in a very interesting position: I want to trigger a `savepoint` or a
`cancel -s`, but both of those commands will fail because they are coupled
to the checkpoint mechanism. i.e. both commands fail precisely because the
checkpoints are timing out.

Hence my question... is there a way to change the configuration of the
checkpoints at runtime? It seems like there is no such thing, but there also seems to be no
good reason why it couldn't be implemented (we already allow modifying the
parallelism of a job which looks like a harder problem to solve).

Assuming there is no way to do this... how should I try to save my job? I do
have enabled the `RETAIN_ON_CANCELLATION` policy.

Should I be able to resume the job from the last checkpoint using the
--savepoint flag?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Back pressure within a operator chain

2019-01-24 Thread Chesnay Schepler
The behavior should be identical regardless of whether they are chained
or not.
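
For reference, if you still want to compare with the source and sink in separate
chains, this is roughly how the chain could be broken (a sketch; the source,
transformation and sink names are placeholders for your actual job):

```
// Sketch only: placeholder source/transformation/sink names, not the actual job.
env.addSource(kafkaSource)
   .map(new MyTransformation())
   .startNewChain()        // start a new chain here, separating it from the source
   .addSink(hdfsSink)
   .disableChaining();     // keep the sink out of any chain
// or globally: env.disableOperatorChaining();
```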


On 23.01.2019 09:11, Paul Lam wrote:

Hi,

I would like to know if back pressure applies to operators in the same 
operator chain?


The background is that I have a simple streaming job that consumes
data from Kafka, does some transformation and writes to HDFS (all the
operators are chained together), and if the number of Kafka partitions is much
greater than the job parallelism (like 40:1), an OOM happens. The root
cause should be the Kafka consumer pulling too much data. So I'm wondering
if I should separate the source and sink to make the back pressure
mechanism work.


Best,
Paul Lam





Re: Is there a way to get all flink built-in SQL functions

2019-01-24 Thread Chesnay Schepler
Beyond the documentation, I don't believe there to be a mechanism for listing
all built-in functions.


On 23.01.2019 04:30, yinhua.dai wrote:

I would like to put this list to the our self service flink SQL web UI.
Thanks.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Flink CEP : Doesn't generate output

2019-01-24 Thread Chesnay Schepler
Can you provide us with a self-contained reproducing example (preferably as
elementary as possible)?


On 22.01.2019 18:58, dhanuka ranasinghe wrote:

Hi All,

I have used Flink CEP to filter some events and generate some alerts
based on certain conditions. But unfortunately it doesn't print any
result. I have attached the source code herewith; could you please help me
with this?





package org.monitoring.stream.analytics;

import java.io.IOException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.shaded.org.apache.commons.lang3.StringUtils;
import org.monitoring.stream.analytics.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.MultiMap;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.windowing.time.Time;


public class FlinkCEP {
    private final static Logger LOGGER = LoggerFactory.getLogger(FlinkCEP.class);

    public static void main(String[] args) throws Exception {

        String query = FileHandler.readInputStream(FileHandler.getResourceAsStream("query.sql"));

        if (query == null) {
            LOGGER.error("*  Can't read resources ");
        } else {
            LOGGER.info(" " + query + " =");
        }
        Properties props = FileHandler.loadResourceProperties("application.properties");
        Properties kConsumer = FileHandler.loadResourceProperties("consumer.properties");
        Properties kProducer = FileHandler.loadResourceProperties("producer.properties");
        String hzConfig = FileHandler.readInputStream(FileHandler.getResourceAsStream("hazelcast-client.xml"));
        String schemaContent = FileHandler.readInputStream(FileHandler.getResourceAsStream("IRIC-schema.json"));

        props.setProperty("auto.offset.reset", "latest");
        props.setProperty("flink.starting-position", "latest");
        Map<String, String> tempMap = new HashMap<>();
        for (final String name : props.stringPropertyNames())
            tempMap.put(name, props.getProperty(name));
        final ParameterTool params = ParameterTool.fromMap(tempMap);
        String jobName = props.getProperty(ApplicationConfig.JOB_NAME);

        LOGGER.info("%%% Desktop Responsibility Start %%");

        LOGGER.info("$$$ Hz instance name " + props.toString());

        HazelcastInstance hzInst = HazelcastUtils.getClient(hzConfig, "");

        LOGGER.info("== schema " + schemaContent);

        MultiMap<String, String> distributedMap = hzInst.getMultiMap("masterDataSynch");

        distributedMap.put(jobName, query);

        LOGGER.info("%% Desktop Responsibility End %");

        Collection<String> queries = distributedMap.get(jobName);
        Set<String> rules = new HashSet<>(queries);
        LOGGER.info("== query" + query);
        rules.add(query);
        hzInst.getLifecycleService().shutdown();
        final String sourceTable = "dataTable";

        String paral = props.getProperty(ApplicationConfig.FLINK_PARALLEL_TASK);
        String noOfOROperatorsValue = props.getProperty(ApplicationConfig.FLINK_NUMBER_OF_OR_OPERATORS);

        int noOfOROperators = 50;
        if (StringUtils.isNoneBlank(noOfOROperatorsValue)) {
            noOfOROperators = Integer.parseInt(noOfOROperatorsValue);
        }
        List<List<String>> subQueries = chunk(new ArrayList<>(rules), noOfOROperators);

        // define a schema

        // setup streaming environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 1));

        env.enableCheckpointing(30); // 300 seconds
        env.getConfig().setGlobalJobParameters(params);
        // env.getConfig().enableObjectReuse();
        env.getConfig().setUseSnapshotCompression(true);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);

Re: No resource available error while testing HA

2019-01-24 Thread Gary Yao
Hi Averell,

> Then I have another question: when the JM cannot start/connect to the JM on
> .88, why didn't it try on .82 where resources are still available?

When you are deploying on YARN, the TM container placement is decided by the
YARN scheduler and not by Flink. Without seeing the complete logs, it is
difficult to tell what happened. If you need help with debugging, please
enable YARN's log aggregation and attach the output of:

yarn logs -applicationId 

Do I understand it correctly that your problem was solved by changing the
zookeeper connection string?

Best,
Gary

On Wed, Jan 23, 2019 at 12:44 PM Averell  wrote:

> Hi Gary,
>
> Thanks for your support.
>
> I use flink 1.7.0. I will try to test without that -n.
> Below are the JM log (on server .82) and TM log (on server .88). I'm
> sorry that I missed that TM log before asking; I thought it would
> not be relevant. I just fixed the issue with the connection to ZooKeeper and the
> problem was solved.
>
> Then I have another question: when JM cannot start/connect to the JM on
> .88,
> why didn't it try on .82 where resources are still available?
>
> Thanks and regards,
> Averell
>
> Here is the JM log (from /mnt/var/log/hadoop-yarn/.../jobmanager.log on
> .82)
> (it seems irrelevant. Even the earlier message regarding
> NoResourceAvailable
> was there in GUI, but not found in the jobmanager.log file):
>
> 2019-01-23 04:15:01.869 [main] WARN
> org.apache.flink.configuration.Configuration  - Config uses deprecated
> configuration key 'web.port' instead of proper key 'rest.port'
> 2019-01-23 04:15:03.483 [main] WARN
> org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint  - Upload
> directory
> /tmp/flink-web-08279f45-0244-4c5c-bc9b-299ac59b4068/flink-web-upload does
> not exist, or has been deleted externally. Previously uploaded files are no
> longer available.
>
> And here is the TM log:
> 2019-01-23 11:07:07.479 [main] ERROR
> o.a.flink.shaded.curator.org.apache.curator.ConnectionState  - Connection
> timed out for connection string (localhost:2181) and timeout (15000) /
> elapsed (56538)
>
> org.apache.flink.shaded.curator.org.apache.curator.CuratorConnectionLossException:
> KeeperErrorCode = ConnectionLoss
> at
>
> org.apache.flink.shaded.curator.org.apache.curator.ConnectionState.checkTimeouts(ConnectionState.java:225)
> at
>
> org.apache.flink.shaded.curator.org.apache.curator.ConnectionState.getZooKeeper(ConnectionState.java:94)
> at
>
> org.apache.flink.shaded.curator.org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:117)
> at
>
> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.NamespaceImpl$1.call(NamespaceImpl.java:90)
> at
>
> org.apache.flink.shaded.curator.org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:109)
> at
>
> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.NamespaceImpl.fixForNamespace(NamespaceImpl.java:83)
> at
>
> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.fixForNamespace(CuratorFrameworkImpl.java:594)
> at
>
> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.ExistsBuilderImpl.forPath(ExistsBuilderImpl.java:158)
> at
>
> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.ExistsBuilderImpl.forPath(ExistsBuilderImpl.java:32)
> at
>
> org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.cache.NodeCache.reset(NodeCache.java:242)
> at
>
> org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.cache.NodeCache.start(NodeCache.java:175)
> at
>
> org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.cache.NodeCache.start(NodeCache.java:154)
> at
>
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService.start(ZooKeeperLeaderRetrievalService.java:107)
> at
>
> org.apache.flink.runtime.taskexecutor.TaskExecutor.start(TaskExecutor.java:277)
> at
>
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.start(TaskManagerRunner.java:168)
> at
>
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManager(TaskManagerRunner.java:332)
> at
>
> org.apache.flink.yarn.YarnTaskExecutorRunner.lambda$run$0(YarnTaskExecutorRunner.java:142)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
> at
>
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at
>
> org.apache.flink.yarn.YarnTaskExecutorRunner.run(YarnTaskExecutorRunner.java:141)
> at
>
> org.apache.flink.yarn.YarnTaskExecutorRunner.main(YarnTaskExecutorRunner.java:75)
> 2019-01-23 11:07:08.224 [main-SendThread(localhost:2181)] WARN
> 

Re: Trouble migrating state from 1.6.3 to 1.7.1

2019-01-24 Thread pwestermann
I ran `mvn dependency:tree` and only see 1.7.1 dependencies for Flink:

[INFO] com.inin.analytics:analytics-flink:jar:0.0.1-SNAPSHOT
[INFO] +- org.apache.flink:flink-streaming-java_2.11:jar:1.7.1:provided
[INFO] |  +- org.apache.flink:flink-runtime_2.11:jar:1.7.1:provided
[INFO] |  |  +-
org.apache.flink:flink-queryable-state-client-java_2.11:jar:1.7.1:provided
[INFO] |  |  +- org.apache.flink:flink-hadoop-fs:jar:1.7.1:provided
[INFO] |  |  +- commons-io:commons-io:jar:2.4:compile
[INFO] |  |  +-
org.apache.flink:flink-shaded-netty:jar:4.1.24.Final-5.0:provided
[INFO] |  |  +- org.apache.flink:flink-shaded-asm:jar:5.0.4-5.0:provided
[INFO] |  |  +- org.apache.flink:flink-shaded-jackson:jar:2.7.9-5.0:provided
[INFO] |  |  +- org.javassist:javassist:jar:3.19.0-GA:provided
[INFO] |  |  +- org.scala-lang:scala-library:jar:2.11.12:compile
[INFO] |  |  +- com.typesafe.akka:akka-actor_2.11:jar:2.4.20:provided
[INFO] |  |  |  +- com.typesafe:config:jar:1.3.0:provided
[INFO] |  |  |  \-
org.scala-lang.modules:scala-java8-compat_2.11:jar:0.7.0:provided
[INFO] |  |  +- com.typesafe.akka:akka-stream_2.11:jar:2.4.20:provided
[INFO] |  |  |  \- com.typesafe:ssl-config-core_2.11:jar:0.2.1:provided
[INFO] |  |  | \-
org.scala-lang.modules:scala-parser-combinators_2.11:jar:1.0.4:provided
[INFO] |  |  +- com.typesafe.akka:akka-protobuf_2.11:jar:2.4.20:provided
[INFO] |  |  +- com.typesafe.akka:akka-slf4j_2.11:jar:2.4.20:provided
[INFO] |  |  +- org.clapper:grizzled-slf4j_2.11:jar:1.3.2:provided
[INFO] |  |  +- com.github.scopt:scopt_2.11:jar:3.5.0:provided
[INFO] |  |  +- org.xerial.snappy:snappy-java:jar:1.1.4:compile
[INFO] |  |  \- com.twitter:chill_2.11:jar:0.7.6:provided
[INFO] |  | \- com.twitter:chill-java:jar:0.7.6:provided
[INFO] |  +- org.apache.flink:flink-shaded-guava:jar:18.0-5.0:provided
[INFO] |  +- org.apache.commons:commons-math3:jar:3.5:compile
[INFO] |  \- org.apache.flink:force-shading:jar:1.7.1:compile
[INFO] +- org.apache.flink:flink-clients_2.11:jar:1.7.1:provided
[INFO] |  +- org.apache.flink:flink-core:jar:1.7.1:provided
[INFO] |  |  +- org.apache.flink:flink-annotations:jar:1.7.1:provided
[INFO] |  |  +- org.apache.flink:flink-metrics-core:jar:1.7.1:provided
[INFO] |  |  +- com.esotericsoftware.kryo:kryo:jar:2.24.0:provided
[INFO] |  |  |  +- com.esotericsoftware.minlog:minlog:jar:1.2:provided
[INFO] |  |  |  \- org.objenesis:objenesis:jar:2.1:provided
[INFO] |  |  +- commons-collections:commons-collections:jar:3.2.2:provided
[INFO] |  |  \- org.apache.commons:commons-compress:jar:1.4.1:compile
[INFO] |  +- org.apache.flink:flink-optimizer_2.11:jar:1.7.1:provided
[INFO] |  +- org.apache.flink:flink-java:jar:1.7.1:provided
[INFO] |  \- commons-cli:commons-cli:jar:1.3.1:provided
[INFO] +- org.apache.flink:flink-avro:jar:1.7.1:compile
[INFO] |  \- org.apache.avro:avro:jar:1.8.2:compile
[INFO] | +- org.codehaus.jackson:jackson-core-asl:jar:1.9.13:compile
[INFO] | +- org.codehaus.jackson:jackson-mapper-asl:jar:1.9.13:compile
[INFO] | +- com.thoughtworks.paranamer:paranamer:jar:2.7:compile
[INFO] | \- org.tukaani:xz:jar:1.5:compile
[INFO] +-
org.apache.flink:flink-statebackend-rocksdb_2.11:jar:1.7.1:provided
[INFO] |  \- org.rocksdb:rocksdbjni:jar:5.7.5:provided
[INFO] +- org.apache.flink:flink-connector-kafka-0.11_2.11:jar:1.7.1:compile
[INFO] |  +-
org.apache.flink:flink-connector-kafka-0.10_2.11:jar:1.7.1:compile
[INFO] |  |  \-
org.apache.flink:flink-connector-kafka-0.9_2.11:jar:1.7.1:compile
[INFO] |  | \-
org.apache.flink:flink-connector-kafka-base_2.11:jar:1.7.1:compile
[INFO] |  \- org.apache.kafka:kafka-clients:jar:0.11.0.2:compile
[INFO] | \- net.jpountz.lz4:lz4:jar:1.3.0:compile
[INFO] +- org.apache.flink:flink-s3-fs-presto:jar:1.7.1:provided

I also tried this again with debug logging enabled but didn't see any more
messages that would explain the failure.
To me, the error message
(org.apache.flink.formats.avro.typeutils.AvroSerializer; local class
incompatible: stream classdesc serialVersionUID = 1, local class
serialVersionUID = 2) looks like this is caused by only having the 1.7.1
AvroSerializer class (serialVersionUID = 2) in the classpath but the
savepoint requires the old one (serialVersionUID = 1).



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: [Flink 1.6] How to get current total number of processed events

2019-01-24 Thread Kien Truong

Hi Nhan,

You can store the max/min value using the value states of a 
KeyedProcessFunction,


or in the global state of a ProcessWindowFunction.


On processing each item, compare its value to the current max/min and 
update the stored value as needed.
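
A minimal sketch of the first approach (the String key and Long value types are
assumptions for illustration, not your actual types):

```
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Sketch only: per key, keep the running min/max of the observed values in keyed
// ValueState and emit the updated pair.
public class MinMaxTracker extends KeyedProcessFunction<String, Long, String> {

    private transient ValueState<Long> max;
    private transient ValueState<Long> min;

    @Override
    public void open(Configuration parameters) {
        max = getRuntimeContext().getState(new ValueStateDescriptor<>("max", Long.class));
        min = getRuntimeContext().getState(new ValueStateDescriptor<>("min", Long.class));
    }

    @Override
    public void processElement(Long value, Context ctx, Collector<String> out) throws Exception {
        if (max.value() == null || value > max.value()) {
            max.update(value);
        }
        if (min.value() == null || value < min.value()) {
            min.update(value);
        }
        out.collect("min=" + min.value() + ", max=" + max.value());
    }
}
```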



Regards,

Kien


On 1/24/2019 12:37 AM, Thanh-Nhan Vo wrote:


Hi Kien Truong,


Thank you for your answer. I have another question, please !
If I count the number of messages processed for a given key j (denoted 
c_j), is there a way to retrieve max{c_j}, min{c_j}?


Thanks

*From:* Kien Truong [mailto:duckientru...@gmail.com]
*Sent:* Wednesday, January 23, 2019 16:04
*To:* user@flink.apache.org
*Subject:* Re: [Flink 1.6] How to get current total number of processed 
events


Hi Nhan,

Logically, the total number of processed events before an event cannot 
be accurately calculated unless event processing is synchronized.


This is not scalable, so naturally I don't think Flink supports it.

Although, I suppose you can get an approximate count by using a 
non-keyed TumblingWindow, count the item inside the window, then use 
that value in the next window.


Regards,

Kien

On 1/21/2019 9:34 PM, Thanh-Nhan Vo wrote:

Hello all,

I have a question, please !
I’m using Flink 1.6 to process our data in streaming mode.
I wonder if at a given event, there is a way to get the current
total number of processed events (before this event).

If possible, I want to get this total number of processed events
as a value state in a KeyedStream.
It means that for a given key in the KeyedStream, I want to retrieve not
only the total number of processed events for this key but also
the total number of processed events for all keys.

Is there a way to do this in Flink 1.6, please?

Best regards,
Nhan



Re: Use case for The Broadcast State Pattern and efficient database access

2019-01-24 Thread Andrey Zagrebin
Hi Marke,

Q1: From your description of the problem, the "Broadcast State Pattern" seems
to be the suitable choice if you want to keep the same state on all parallel
instances which process stream[1] and update/store that state the same way on
each instance using each element of stream[2].
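
A rough sketch of that wiring (sketch only; the String element types and the
"some-key" state key are placeholders for your actual classes):

```
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

// Sketch only: stream2 (rare updates) is broadcast to every parallel instance
// processing stream1 (high volume).
// DataStream<String> stream1 = ... (Kafka source); DataStream<String> stream2 = ... (RabbitMQ source);

final MapStateDescriptor<String, String> descriptor =
        new MapStateDescriptor<>("control-state", Types.STRING, Types.STRING);

BroadcastStream<String> broadcast = stream2.broadcast(descriptor);

DataStream<String> result = stream1
        .connect(broadcast)
        .process(new BroadcastProcessFunction<String, String, String>() {

            @Override
            public void processElement(String value, ReadOnlyContext ctx, Collector<String> out)
                    throws Exception {
                // read-only access to the broadcast state while handling the main stream
                String setting = ctx.getBroadcastState(descriptor).get("some-key");
                out.collect(value + " / " + setting);
            }

            @Override
            public void processBroadcastElement(String update, Context ctx, Collector<String> out)
                    throws Exception {
                // every parallel instance applies the same update
                ctx.getBroadcastState(descriptor).put("some-key", update);
            }
        });
```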

Q2: Apart from simple synchronous queries to the database upon getting each
element of stream[2], you might benefit from using async I/O (1). E.g. you
could put it before broadcasting stream[2] and broadcast the database response.
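
A small sketch of that idea (sketch only; "queryDatabase" stands for your actual,
ideally non-blocking, client call):

```
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

// Sketch only: enrich each (rare) element of stream[2] with one asynchronous database
// lookup before it is broadcast, so the lookup happens once instead of once per
// parallel instance.
public class DbEnricher extends RichAsyncFunction<String, String> {

    @Override
    public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
        CompletableFuture
                .supplyAsync(() -> queryDatabase(key))
                .thenAccept(result -> resultFuture.complete(Collections.singleton(result)));
    }

    private String queryDatabase(String key) {
        return key; // placeholder for the real lookup
    }
}

// Usage (sketch):
// DataStream<String> enriched =
//         AsyncDataStream.unorderedWait(stream2, new DbEnricher(), 5, TimeUnit.SECONDS);
// ...and then broadcast `enriched` instead of the raw stream[2].
```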

Best,
Andrey

(1)
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html

On Thu, Jan 24, 2019 at 1:10 PM Marke Builder 
wrote:

> Hi,
>
> I have a question regarding the "Broadcast State Pattern".
> My job consumes two streams (Kafka, RabbitMQ). On one of the streams, a
> lot of data arrives continuously [1]. On the other, very little and rarely [2]. I'm
> using the Broadcast State Pattern, because stream[2] updates data
> which is required for stream[1].
>
> Q1: Is the Broadcast State Pattern the right way?
>
> As I mentioned above, stream[2] provides data and also "says" to read additional
> data from a database.
>
> Q2: What is the best (most efficient) way to query a database from
> the processElement(...) function?
>
> Many Thanks!
> Marke
>


Re: [Flink 1.7.0] initial failures with starting high-parallelism job without checkpoint/savepoint

2019-01-24 Thread Andrey Zagrebin
Hi Steven,

Did you not experience this problem with the previous Flink release (you
marked the topic with 1.7)?

Do you use HA setup?

Without HA setup, the blob data, which belongs to the job, will be
distributed from job master node to all task executors.
Depending on the size of the blob data (jars, user serialised classes etc),
it might overwhelm job master node and network connections.
It can subsequently slow down UI, heart-beating and initialisation of task
executors and produced partitions because task executors contend for the
blob data. When the job is restored, the blob data might not need to be fetched
because it is already available.

With HA setup, you can configure high-availability.storageDir in DFS and
DFS will serve the blob data.

Otherwise, could you provide the JM log for further investigation?

Best,
Andrey

On Thu, Jan 24, 2019 at 7:06 AM Steven Wu  wrote:

> When we start a high-parallelism (1,600) job without any
> checkpoint/savepoint, the job struggled to be deployed. After a few
> restarts, it eventually got deployed and was running fine after the initial
> struggle. jobmanager was very busy. Web UI was very slow. I saw these two
> exceptions/failures during the initial failures.
>
> I don't seem to see this issue when starting the same job from an external
> checkpoint, or at least only very rarely.
>
> Anyone else experienced similar issue?
>
> Thanks,
> Steven
>
> Exception #1
>
> java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id
> fe55bf158e89cf555be6582e577b9621 timed out.
>
> at
> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1624)
>
> at
> org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl$HeartbeatMonitor.run(HeartbeatManagerImpl.java:339)
>
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>
> at
> org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154)
>
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
> Exception #2
>
> org.apache.flink.runtime.io.network.partition.PartitionNotFoundException:
> Partition 3cdc03de22920b0dc1ef3d82e06b7d75@7fda7294a52a6da2d831d832223d3627
> not found.
>
> at
> org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:77)
>
> at
> org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.requestSubpartition(LocalInputChannel.java:111)
>
> at
> org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel$1.run(LocalInputChannel.java:155)
>
> at java.util.TimerThread.mainLoop(Timer.java:555)
>
> at java.util.TimerThread.run(Timer.java:505)
>
>


Re: `env.java.opts` not persisting after job canceled or failed and then restarted

2019-01-24 Thread Aaron Levin
Hi Ufuk,

I'm starting to believe the bug is much deeper than the originally reported
error because putting the libraries in `/usr/lib` or `/lib` does not work.
This morning I dug into why putting `libhadoop.so` into `/usr/lib` didn't
work, despite that being in the `java.library.path` at the call site of the
error. I wrote a small program to test the loading of native libraries, and
it was able to successfully load `libhadoop.so`. I'm very perplexed. Could
this be related to the way flink shades hadoop stuff?

Here is my program and its output:

```
$ cat LibTest.scala
package com.redacted.flink

object LibTest {
  def main(args: Array[String]): Unit = {
    val library = args(0)

    System.out.println(s"java.library.path=${System.getProperty("java.library.path")}")
    System.out.println(s"Attempting to load $library")
    System.out.flush()
    System.loadLibrary(library)
    System.out.println(s"Successfully loaded ")
    System.out.flush()
  }
}
```

I then tried running that on one of the task managers with `hadoop` as an
argument:

```
$ java -jar lib_test_deploy.jar hadoop
java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
Attempting to load hadoop
Exception in thread "main" java.lang.UnsatisfiedLinkError: no hadoop in
java.library.path
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
at java.lang.Runtime.loadLibrary0(Runtime.java:870)
at java.lang.System.loadLibrary(System.java:1122)
at com.stripe.flink.LibTest$.main(LibTest.scala:11)
at com.stripe.flink.LibTest.main(LibTest.scala)
```

I then copied the native libraries into `/usr/lib/` and ran it again:

```
$ sudo cp /usr/local/hadoop/lib/native/libhadoop.so /usr/lib/
$ java -jar lib_test_deploy.jar hadoop
java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
Attempting to load hadoop
Successfully loaded
```

Any ideas?

On Wed, Jan 23, 2019 at 7:13 PM Aaron Levin  wrote:

> Hi Ufuk,
>
> One more update: I tried copying all the hadoop native `.so` files (mainly
> `libhadoop.so`) into `/lib` and am I still experiencing the issue I
> reported. I also tried naively adding the `.so` files to the jar with the
> flink application and am still experiencing the issue I reported (however,
> I'm going to investigate this further as I might not have done it
> correctly).
>
> Best,
>
> Aaron Levin
>
> On Wed, Jan 23, 2019 at 3:18 PM Aaron Levin  wrote:
>
>> Hi Ufuk,
>>
>> Two updates:
>>
>> 1. As suggested in the ticket, I naively copied the every `.so` in
>> `hadoop-3.0.0/lib/native/` into `/lib/` and this did not seem to help. My
>> knowledge of how shared libs get picked up is hazy, so I'm not sure if
>> blindly copying them like that should work. I did check what
>> `System.getProperty("java.library.path")` returns at the call-site and
>> it's: 
>> java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
>> 2. The exception I see comes from
>> `hadoop.util.NativeCodeLoader.buildSupportsSnappy` (stack-trace below).
>> This uses `System.loadLibrary("hadoop")`.
>>
>> [2019-01-23 19:52:33.081216] java.lang.UnsatisfiedLinkError:
>> org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z
>> [2019-01-23 19:52:33.081376]  at
>> org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy(Native Method)
>> [2019-01-23 19:52:33.081406]  at
>> org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:63)
>> [2019-01-23 19:52:33.081429]  at
>> org.apache.hadoop.io.compress.SnappyCodec.getDecompressorType(SnappyCodec.java:195)
>> [2019-01-23 19:52:33.081457]  at
>> org.apache.hadoop.io.compress.CodecPool.getDecompressor(CodecPool.java:181)
>> [2019-01-23 19:52:33.081494]  at
>> org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:2037)
>> [2019-01-23 19:52:33.081517]  at
>> org.apache.hadoop.io.SequenceFile$Reader.initialize(SequenceFile.java:1923)
>> [2019-01-23 19:52:33.081549]  at
>> org.apache.hadoop.io.SequenceFile$Reader.(SequenceFile.java:1872)
>> ... (redacted) ...
>> [2019-01-23 19:52:33.081728]  at
>> scala.collection.immutable.List.foreach(List.scala:392)
>> ... (redacted) ...
>> [2019-01-23 19:52:33.081832]  at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
>> [2019-01-23 19:52:33.081854]  at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>> [2019-01-23 19:52:33.081882]  at
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
>> [2019-01-23 19:52:33.081904]  at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>> [2019-01-23 19:52:33.081946]  at
>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>> [2019-01-23 19:52:33.081967]  at java.lang.Thread.run(Thread.java:748)

Re: [DISCUSS] Towards a leaner flink-dist

2019-01-24 Thread Bowen Li
+1 for leaner distribution and a better 'download' webpage.

+1 for a full distribution if we can automate it besides supporting the
leaner one. If we support both, I'd imagine release managers should be able
to package two distributions with a single change of parameter instead of
manually packaging the full distribution. How to achieve that needs to be
evaluated and discussed; it could probably be something like 'mvn clean install
-Dfull/-Dlean', I'm not sure yet.


On Wed, Jan 23, 2019 at 10:11 AM Thomas Weise  wrote:

> +1 for trimming the size by default and offering the fat distribution as
> alternative download
>
>
> On Wed, Jan 23, 2019 at 8:35 AM Till Rohrmann 
> wrote:
>
>> Ufuk's proposal (having a lean default release and a user convenience
>> tarball) sounds good to me. That way advanced users won't be bothered by
>> an
>> unnecessarily large release and new users can benefit from having many
>> useful extensions bundled in one tarball.
>>
>> Cheers,
>> Till
>>
>> On Wed, Jan 23, 2019 at 3:42 PM Ufuk Celebi  wrote:
>>
>> > On Wed, Jan 23, 2019 at 11:01 AM Timo Walther 
>> wrote:
>> > > I think what is more important than a big dist bundle is a helpful
>> > > "Downloads" page where users can easily find available filesystems,
>> > > connectors, metric reporters. Not everyone checks Maven central for
>> > > available JAR files. I just saw that we added an "Optional components"
>> > > section recently [1], we just need to make it more prominent. This is
>> > > also done for the SQL connectors and formats [2].
>> >
>> > +1 I fully agree with the importance of the Downloads page. We
>> > definitely need to make any optional dependencies that users need to
>> > download easy to find.
>> >
>>
>


Re: Dump snapshot of big table in real time using StreamingFileSink

2019-01-24 Thread knur
Bump? 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Adding flink udf support to linkedin's portable udf framework transport

2019-01-24 Thread Arup Malakar
Hi Flink Users,

Came across the project transport from linkedin:
https://github.com/linkedin/transport I think the project has great
potential, as it allows sharing UDF implementations across various
compute engines (Hive/Spark/Presto). Any thoughts on adding support for
Flink UDFs to transport, or is anyone already working on it?

-- 
Arup Malakar