Re: Savepoint memory overhead

2020-04-29 Thread Yun Tang
Hi Lasse, would you please give more details? 1. What is the memory configuration of your task manager, e.g. the memory size of the process and the managed memory? And how much does the memory increase once you meet the problem? 2. Did you use managed memory to control RocksDB? [1] 3. Why you give
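For reference, the managed-memory control referred to in [1] boils down to a pair of flink-conf.yaml keys; a minimal sketch (the size value is illustrative):

```yaml
# Have RocksDB allocate its block cache and write buffers from Flink's
# managed memory pool instead of growing unbounded off-heap.
state.backend.rocksdb.memory.managed: true
# Size of the managed memory pool per task manager (illustrative value).
taskmanager.memory.managed.size: 2gb
```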

Re: Reading from sockets using dataset api

2020-04-29 Thread Arvid Heise
Hi Kaan, not entirely sure I understand your solution. I gathered that you create a dataset of TCP addresses and then use flatMap to fetch and output the data? If so, then I think it's a good solution for batch processing (DataSet). It doesn't work in DataStream because it doesn't play well with

Re: Re: 1.11 snapshot: Name or service not knownname localhost and taskMgr not started

2020-04-29 Thread wangl...@geekplus.com.cn
localhost is available on my CentOS machine. I can start the cluster (including taskMgr) with start-cluster.sh when using the flink-1.10 release tgz package. Changing the slaves file entry from localhost to 127.0.0.1 can resolve the problem, but it requests the password of the host. Without changing the slaves file, I can fi

Re: Savepoint memory overhead

2020-04-29 Thread Lasse Nedergaard
We're using Flink 1.10 running on Mesos. Med venlig hilsen / Best regards Lasse Nedergaard > On 30 Apr 2020, at 04:53, Yun Tang wrote: > > Hi Lasse > > Which version of Flink did you use? Before Flink 1.10, there might exist a > memory problem when RocksDB executes a savepoint with write bat

Re: Configuring taskmanager.memory.task.off-heap.size in Flink 1.10

2020-04-29 Thread Xintong Song
That's pretty much it. I'm not very familiar with the implementation details of the two operators you mentioned though. The default value for framework off-heap memory is usually not strictly limited to the needed amount, so it's probably ok in some cases if the tasks/operators use a few direct memo

Re: Configuring taskmanager.memory.task.off-heap.size in Flink 1.10

2020-04-29 Thread Jiahui Jiang
Hey Xintong, thanks for the explanations. For the first part, can I confirm whether some of my understandings are correct here: For Akka direct memory, it's part of the framework.off-heap; we also use FlinkKafkaConsumers and FlinkKafkaProducers in our pipeline, because of the netty usage withi

Re: 1.11 snapshot: Name or service not knownname localhost and taskMgr not started

2020-04-29 Thread Xintong Song
Hi Lei, Could you check whether the hostname 'localhost' is available on your CentOS machine? This is usually defined in "/etc/hosts". You can also try to modify the slaves file, replacing 'localhost' with '127.0.0.1'. The path is: /conf/slaves Thank you~ Xintong Song On Thu, Apr 30, 2020 at
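The two checks suggested above correspond to entries like these (contents are illustrative):

```
# /etc/hosts -- 'localhost' must resolve:
127.0.0.1   localhost

# conf/slaves -- one task manager host per line; using the IP
# directly avoids the hostname lookup:
127.0.0.1
```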

Re: Flink Task Manager GC overhead limit exceeded

2020-04-29 Thread Xintong Song
Then I would suggest the following. - Check the task manager log to see if the '-D' properties are properly loaded. They should be located at the beginning of the log file. - You can also try to log into the pod and check the JVM launch command with "ps -ef | grep TaskManagerRunner". I suspect ther

Re: Configuring taskmanager.memory.task.off-heap.size in Flink 1.10

2020-04-29 Thread Xintong Song
Hi Jiahui, I'd like to clarify a bit more. > I think in our case, it was actually the network buffer size that was too large (we were seeing an Akka exception), which happened to be fixed by increasing task.off-heap.size since that just makes the direct memory larger. Please be aware that 'taskma

1.11 snapshot: Name or service not knownname localhost and taskMgr not started

2020-04-29 Thread wangl...@geekplus.com.cn
1. Clone the 1.11 snapshot repository. 2. Build it on Windows. 3. Scp the flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT dir to a CentOS server. 4. Run ./bin/start-cluster.sh on the CentOS server. There's a message: Starting cluster. Starting standalonesession daemon on host test_3.6. : N

Re: doing demultiplexing using Apache flink

2020-04-29 Thread dhurandar S
Thank you Alexander for the response. This is very helpful. Can I apply the same pattern to S3 as well, as in read from Kafka or Kinesis and write multiple files to S3 or multiple topics in Kinesis? regards, Rahul On Wed, Apr 29, 2020 at 2:32 PM Alexander Fedulov wrote: > Hi Dhurandar, > > it

Re: RocksDB default logging configuration

2020-04-29 Thread Yun Tang
Hi Bajaj, actually I don't totally understand your description, which conflicts with the Flink codebase. Please follow either one of the guides below to create RocksDBStateBackend: * Set the state backend on the environment programmatically, which has the highest priority over configuration in f
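For the configuration-file route, the relevant flink-conf.yaml keys look like this (a sketch; the checkpoint path is illustrative):

```yaml
state.backend: rocksdb
state.checkpoints.dir: hdfs:///flink/checkpoints
```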

Re: Configuring taskmanager.memory.task.off-heap.size in Flink 1.10

2020-04-29 Thread Jiahui Jiang
Hey Xintong, Steven, thanks for the replies! @Steven Wu thanks for the link! I didn't realize that for all the different direct memory configs, even though they can be configured separately, it's only the sum that will be used to set the JVM parameter. I think in our case, it wa

Re: Flink Task Manager GC overhead limit exceeded

2020-04-29 Thread Xintong Song
Hi Eleanore, I'd like to explain about 1 & 2. For 3, I have no idea either. > 1. I don't see the heap size from the UI for the task manager shown correctly. Despite the 'heap' in the key, 'taskmanager.heap.size' accounts for the total memory of a Flink task manager, rather than only the heap memory. A Flin

Re: Savepoint memory overhead

2020-04-29 Thread Yun Tang
Hi Lasse, which version of Flink did you use? Before Flink 1.10, there might exist a memory problem when RocksDB executes a savepoint with write batch [1]. [1] https://issues.apache.org/jira/browse/FLINK-12785 Best Yun Tang From: Lasse Nedergaard Sent: Wednesday, Ap

Re: [DISCUSS] Hierarchies in ConfigOption

2020-04-29 Thread Jingsong Li
Sorry for the mistake, I propose:

connector: 'filesystem'
path: '...'
format: 'json'
format.option:
  option1: '...'
  option2: '...'
  option3: '...'

And I think in most cases users just need to configure the 'format' key, so we should make it convenient for them. There is no big problem in making fo

Re: [DISCUSS] Hierarchies in ConfigOption

2020-04-29 Thread Kurt Young
IIUC FLIP-122 already delegates the responsibility for designing and parsing connector properties to connector developers. So frankly speaking, no matter which style we choose, there is no strong guarantee for either of these. So it's also possible that developers choose a totally different way

Re: [DISCUSS] Hierarchies in ConfigOption

2020-04-29 Thread Jingsong Li
Thanks Timo for starting the discussion. I am +1 for "format: 'json'". Take a look at Dawid's yaml case:

connector: 'filesystem'
path: '...'
format: 'json'
format:
  option1: '...'
  option2: '...'
  option3: '...'

Does this work? According to my understanding, the 'format' key is the attribute o

Re: [DISCUSS] Hierarchies in ConfigOption

2020-04-29 Thread Benchao Li
Thanks Timo for starting the discussion. Generally I like the idea of keeping the config aligned with a standard like json/yaml. From the user's perspective, I don't use table configs from a config file like yaml or json for now, and it's ok to change it to a yaml-like style. Actually we didn't know tha

Flink Task Manager GC overhead limit exceeded

2020-04-29 Thread Eleanore Jin
Hi All, currently I am running a Flink job cluster (v1.8.2) on Kubernetes with 4 pods, each pod with parallelism 4. The Flink job reads from a source topic with 96 partitions and does a per-element filter; the filtered value comes from a broadcast topic and it always uses the latest message as the

Re: doing demultiplexing using Apache flink

2020-04-29 Thread Alexander Fedulov
Hi Dhurandar, it is not supported out of the box; however, I think it is possible by doing the following: 1) Create a wrapper type containing the original message and the topic destination where it is supposed to be sent. You can enrich the messages with it in accordance with the configuration you've
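Step 1 above can be sketched in plain Java; the class name `RoutedMessage` and the map-based routing rule are illustrative, not from the thread:

```java
// Hypothetical sketch of the wrapper type: the original payload paired
// with the Kafka topic it should be routed to.
class RoutedMessage {
    final String targetTopic;
    final byte[] payload;

    RoutedMessage(String targetTopic, byte[] payload) {
        this.targetTopic = targetTopic;
        this.payload = payload;
    }

    // Enrichment step: pick the destination topic from user-supplied
    // configuration; unknown keys fall back to a default topic.
    static RoutedMessage enrich(String key, byte[] payload,
                                java.util.Map<String, String> topicByKey) {
        return new RoutedMessage(
            topicByKey.getOrDefault(key, "default-topic"), payload);
    }
}
```

A Kafka sink's serialization schema can then read `targetTopic` per record to decide which topic each message is written to.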

doing demultiplexing using Apache flink

2020-04-29 Thread dhurandar S
Hi, we have a use case where we have to demultiplex the incoming stream into multiple output streams. We read from 1 Kafka topic and as output we generate multiple Kafka topics. The logic for generating each new Kafka topic is different and not known beforehand. Users of the system keep adding n

Re: [DISCUSS] Hierarchies in ConfigOption

2020-04-29 Thread Dawid Wysakowicz
Hi all, I also wanted to share my opinion. When talking about the ConfigOption hierarchy we use for configuring a Flink cluster, I would be a strong advocate for keeping a yaml/hocon/json/...-compatible style. Those options are primarily read from a file and thus should at least try to follow common p

Re: Debug Slowness in Async Checkpointing

2020-04-29 Thread Piotr Nowojski
Hi, yes, for example [1]. Most of the points that you mentioned are already visible in the UI and/or via metrics; just take a look at the subtask checkpoint stats. > when barriers were instrumented at source from checkpoint coordinator That's the checkpoint trigger time. > when each downstream task

Does it make sense to use init containers for job upgrades in kubernetes

2020-04-29 Thread Barisa Obradovic
Hi, we are attempting to migrate our Flink cluster to K8s and are looking into options for how to automate job upgrades; wondering if anyone here has done it with an init container? Or if there is a simpler way? 0: So, let's assume we have a job manager with a few task managers running, in a stateful set;

Re: Late data acquisition

2020-04-29 Thread Jark Wu
Hi, currently Flink SQL doesn't support getting late data, but you can do that by bridging the SQL Table to a DataStream and using the DataStream's window functionality, which supports the `sideOutputLateData` method [1]. Best, Jark [1]: https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operator
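On the DataStream side, the `sideOutputLateData` pattern looks roughly like this (a hedged sketch assuming a keyed event-time stream `input`; `Event`, `Result`, and `MyAggregate` are placeholder names, not from the thread):

```java
// Tag identifying the side output that will receive late records.
final OutputTag<Event> lateTag = new OutputTag<Event>("late-data") {};

SingleOutputStreamOperator<Result> windowed = input
    .keyBy(e -> e.key)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .sideOutputLateData(lateTag)      // route late records to the side output
    .aggregate(new MyAggregate());    // normal windowed aggregation

// Late records can now be consumed as a regular stream.
DataStream<Event> lateData = windowed.getSideOutput(lateTag);
```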

Re: [DISCUSS] Hierarchies in ConfigOption

2020-04-29 Thread Flavio Pompermaier
Personally I don't have any preference here. Compliance with a standard YAML parser is probably more important. On Wed, Apr 29, 2020 at 5:10 PM Jark Wu wrote: > From a user's perspective, I prefer the shorter one "format=json", because it's more concise and straightforward. The "kind" is redundan

Re: Configuring taskmanager.memory.task.off-heap.size in Flink 1.10

2020-04-29 Thread Steven Wu
Jiahui, based on my reading of the doc, for a containerized environment it is probably better to set `taskmanager.memory.process.size` to the container memory limit. https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#taskmanager-memory-process-size Then I typically set `taskman
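In flink-conf.yaml, the suggestion above translates to something like the following (sizes are illustrative):

```yaml
# Match the container memory limit so the whole JVM stays within it.
taskmanager.memory.process.size: 4096m
# Reserve direct memory for tasks/operators that allocate it themselves.
taskmanager.memory.task.off-heap.size: 256m
```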

Re: [DISCUSS] Hierarchies in ConfigOption

2020-04-29 Thread Jark Wu
From a user's perspective, I prefer the shorter "format=json", because it's more concise and straightforward. The "kind" is redundant for users. Is there a real case that requires representing the configuration in JSON style? As far as I can see, I don't see such a requirement, and everything works f

Using logicalType in the Avro table format

2020-04-29 Thread Gyula Fóra
Hi All! We are trying to work with Avro-serialized data from Kafka using the Table API and the TIMESTAMP column type. According to the docs, we can use the long type with logicalType: timestamp-mi
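For reference, an Avro timestamp logical type is declared on a long field like this (standard Avro schema syntax; the record and field names are illustrative):

```json
{
  "type": "record",
  "name": "Example",
  "fields": [
    {"name": "event_time",
     "type": {"type": "long", "logicalType": "timestamp-millis"}}
  ]
}
```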

Re: Restore from save point but need to read from different Kafka topics

2020-04-29 Thread Robert Metzger
Hey, as far as I remember, the Kafka consumer stores the topic name + partition id + offset in state. If you modify the kafka topic on restore, the Kafka consumer would continue where it left off. If the topics don't exist anymore, it would fail. If you want Kafka to start from a different topic,
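For a consumer that is not restoring offsets from state, the start position is chosen explicitly in the DataStream API; a sketch using the pre-1.14 `FlinkKafkaConsumer` (the topic name and `props` are illustrative):

```java
FlinkKafkaConsumer<String> consumer =
    new FlinkKafkaConsumer<>("new-topic", new SimpleStringSchema(), props);
// Only takes effect when no offsets are restored from a savepoint:
consumer.setStartFromEarliest();
```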

Savepoint memory overhead

2020-04-29 Thread Lasse Nedergaard
Hi. I would like to know if there are any guidelines/recommendations for the memory overhead we need to account for when doing a savepoint to S3. We use the RocksDB state backend. We run our job on relatively small task managers and we can see we get memory problems if the state size for each task manag

Re: History Server Not Showing Any Jobs - File Not Found?

2020-04-29 Thread Chesnay Schepler
hmm... let's see if I can reproduce the issue locally. Are the archives from the same version the history server runs on? (Which I suppose would be 1.9.1?) Just for the sake of narrowing things down, it would also be interesting to check if it works with the archives residing in the local fil
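The local-filesystem check suggested above needs only two keys pointing at the same directory (the path is illustrative):

```yaml
# Where the JobManager writes archives of completed jobs:
jobmanager.archive.fs.dir: file:///tmp/flink-archives
# Where the history server polls for those archives:
historyserver.archive.fs.dir: file:///tmp/flink-archives
```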

Re: Reading from sockets using dataset api

2020-04-29 Thread Arvid Heise
Hi Kaan, seems like ZMQ is using TCP and not HTTP. So I guess the easiest way would be to use a ZMQ Java binding to access it [1]. But of course, it's much more complicated to write an iterator logic for that. Not sure how ZMQ signals the end of such a graph? Maybe it closes the socket and you ca

Fwd: StreamingFileSink to a S3 Bucket on a remote account using STS

2020-04-29 Thread Arvid Heise
Oh shoot, I replied privately. Let me forward the responses to the list including the solution. -- Forwarded message - From: orionemail Date: Thu, Apr 23, 2020 at 3:38 PM Subject: Re: StreamingFileSink to a S3 Bucket on a remote account using STS To: Arvid Heise Hi, Thanks for

Re: Re: RuntimeException: Could not instantiate generated class 'StreamExecCalc$23166'

2020-04-29 Thread Caizhi Weng
Hi izual, sorry for the late response. The problem is indeed that your table has too many columns; the generated code is too long for the Janino compiler to compile. Sadly, Flink currently does not have optimizations for this. As the community is quite busy with the feature freeze of version 1.11

Re: "Fill in" notification messages based on event time watermark

2020-04-29 Thread Piotr Nowojski
Hi Manas, Adding to the response from Timo, if you don’t have unit tests/integration tests, I would strongly recommend setting them up, as it makes debugging and testing easier. You can read how to do it for your functions and operators here [1] and here [2]. Piotrek [1] https://ci.apache.or

Re: flink couchbase sink

2020-04-29 Thread Robert Metzger
Hey, I'm not aware of any other couchbase connector. I opened a ticket at the project you've mentioned, asking for a version upgrade. Best, Robert On Thu, Apr 23, 2020 at 2:33 AM 令狐月弦 wrote: > Greetings, > > We have been using couchbase for large scale data storage. > I saw there is a flink s

Re: Running in LocalExecutionEnvironment in production

2020-04-29 Thread Robert Metzger
Hey Suraj, sorry for the late reply. In principle, there are no limitations from that mode, and it is fine to use it in production. Flink is extensively using the local execution for testing. In many cases, it runs exactly the same code as you would run on a fully distributed setup. There are som

Re: Distributed Incremental Streaming Graph Analytics: State Accessing/Message Passing Options

2020-04-29 Thread Robert Metzger
Hey, I would recommend using Stateful Functions for that use case. > how to access state that is stored in another node than the one doing the processing This is not possible in an efficient and nice way in Flink. There are hacks (using queryable state), but I would not recommend them. On Mon,

Re: StreamingFileSink to a S3 Bucket on a remote account using STS

2020-04-29 Thread Robert Metzger
Hey, sorry for the late response. Can you provide the full exception(s) including the stacktrace you are seeing? On Mon, Apr 20, 2020 at 3:39 PM orionemail wrote: > Hi, > > New to both AWS and Flink but currently have a need to write incoming data > into an S3 bucket managed via AWS Temporary credent