Re: CoFlatMapFunction with more than two input streams

2018-08-15 Thread Averell
Thank you Xingcan.
Regarding that Either, I still see the need to do type casting / case-class
matching. Could you please take a look?
val transformed = dog
  .union(cat)
  .connect(transformer)
  .keyBy(r => r.name, r2 => r2.name)
  .process(new TransformerCoProcessFunction)
  .split(_ match {
    case Right(d) => List("dog")
    case Left(c)  => List("cat")
    case _        => List("")
  })

val transformed_dog = transformed.select("dog").map(_ match {
  case Right(d) => d
  case _        => NON_EXIST_DOG
})
val transformed_cat = transformed.select("cat").map(_ match {
  case Left(c) => c
  case _       => NON_EXIST_CAT
})
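One way to keep the casts out of the rest of the job is to wrap both sources into Either right at the source side, so every downstream operator only ever sees Either[Cat, Dog] and matches on Left/Right. A minimal Scala sketch, assuming hypothetical Cat and Dog case classes (not the actual job code):

import org.apache.flink.streaming.api.scala._

case class Cat(name: String)
case class Dog(name: String)

object EitherWrapping {
  // Wrap each source once; downstream functions can pattern match on Left/Right
  // without any instanceOf checks or casts to a common superclass.
  def wrap(cats: DataStream[Cat], dogs: DataStream[Dog]): DataStream[Either[Cat, Dog]] = {
    val wrappedCats: DataStream[Either[Cat, Dog]] = cats.map(c => Left(c): Either[Cat, Dog])
    val wrappedDogs: DataStream[Either[Cat, Dog]] = dogs.map(d => Right(d): Either[Cat, Dog])
    wrappedCats.union(wrappedDogs)
  }
}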
Thanks!
Averell




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: How to submit flink job on yarn by java code

2018-08-15 Thread Rong Rong
I don't think your exception / code was attached.

In general, this largely depends on your setup. Are you trying to set up a
long-running YARN session cluster, or are you trying to submit directly to a
YARN cluster? [1]
We have an open-source project [2] with a similar requirement of submitting
compiled jobs on YARN, where we directly extend Flink's
AbstractYarnClusterDescriptor.
Maybe it can serve as a reference for you.

Thanks,
Rong

[1]
https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/yarn_setup.html#start-a-long-running-flink-cluster-on-yarn
[2]
https://github.com/uber/AthenaX/blob/master/athenax-backend/src/main/java/com/uber/athenax/backend/server/yarn/AthenaXYarnClusterDescriptor.java

On Tue, Aug 14, 2018 at 7:58 PM spoon_lz <971066...@qq.com> wrote:

> My project is to automatically generate flink's code jar and then submit it
> to yarn cluster for execution and get the ApplicationId. I find that after
> execution, an error will be reported
>
>
>
> Then I searched for the error on Google and found that the reason for the
> error was that I did not introduce the Hadoop environment variable.
> But my jar is not submitted by calling the ./bin/flink script as it normally
> would be; instead I use CliFrontend.java. How can I solve this problem?
>
> My code looks like:
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Flink CLI does not return after submitting yarn job in detached mode

2018-08-15 Thread Marvin777
Hi, Madhav,


> ./flink-1.4.2/bin/flink run -m yarn-cluster -yd -yn 2 -yqu "default"
>  -ytm 2048 myjar.jar


Modify it to: ./flink-1.4.2/bin/flink run -m yarn-cluster -d -yn 2 -yqu
"default"  -ytm 2048 myjar.jar




madhav Kelkar wrote on Thursday, August 16, 2018 at 5:01 AM:

> Hi there,
>
> I am trying to run a single Flink job on YARN in detached mode. As per
> the docs for Flink 1.4.2, I am using -yd to do that.
>
> The problem I am having is that the flink bash script doesn't terminate
> and return until I press Ctrl+C. In detached mode, I would expect the
> Flink CLI to return as soon as the YARN job is submitted. Is there
> something I am missing? Here is the exact output I get -
>
>
>
> ./flink-1.4.2/bin/flink run -m yarn-cluster -yd -yn 2 -yqu "default"  -ytm
>> 2048 myjar.jar \
>> program arguments omitted
>>
>>
>> Using the result of 'hadoop classpath' to augment the Hadoop classpath:
>> /Users/makelkar/work/hadoop-2.7.3/etc/hadoop:/Users/makelkar/work/hadoop-2.7.3/share/hadoop/common/lib/*:/Users/makelkar/work/hadoop-2.7.3/share/hadoop/common/*:/Users/makelkar/work/hadoop-2.7.3/share/hadoop/hdfs:/Users/makelkar/work/hadoop-2.7.3/share/hadoop/hdfs/lib/*:/Users/makelkar/work/hadoop-2.7.3/share/hadoop/hdfs/*:/Users/makelkar/work/hadoop-2.7.3/share/hadoop/yarn/lib/*:/Users/makelkar/work/hadoop-2.7.3/share/hadoop/yarn/*:/Users/makelkar/work/hadoop-2.7.3/share/hadoop/mapreduce/lib/*:/Users/makelkar/work/hadoop-2.7.3/share/hadoop/mapreduce/*:/Users/makelkar/work/hadoop-2.7.3/contrib/capacity-scheduler/*.jar
>> 2018-08-15 14:39:36,873 INFO
>>  org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path
>> for the flink jar passed. Using the location of class
>> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
>> 2018-08-15 14:39:36,873 INFO
>>  org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path
>> for the flink jar passed. Using the location of class
>> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
>> 2018-08-15 14:39:36,921 INFO  org.apache.hadoop.yarn.client.RMProxy
>>   - Connecting to ResourceManager at /0.0.0.0:8032
>> 2018-08-15 14:39:37,226 INFO  org.apache.flink.yarn.YarnClusterDescriptor
>>   - Cluster specification:
>> ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=2048,
>> numberTaskManagers=2, slotsPerTaskManager=1}
>> 2018-08-15 14:39:37,651 WARN  org.apache.flink.yarn.YarnClusterDescriptor
>>   - The configuration directory
>> ('/Users/makelkar/work/flink/flink-1.4.2/conf') contains both LOG4J and
>> Logback configuration files. Please delete or rename one of them.
>> 2018-08-15 14:39:37,660 INFO  org.apache.flink.yarn.Utils
>>   - Copying from
>> file:/Users/makelkar/work/flink/flink-1.4.2/conf/logback.xml to
>> hdfs://localhost:9000/user/makelkar/.flink/application_1534188161088_0019/logback.xml
>>
>> 2018-08-15 14:39:37,986 INFO  org.apache.flink.yarn.Utils
>>   - Copying from
>> file:/Users/makelkar/work/flink/flink-1.4.2/lib/log4j-1.2.17.jar to
>> hdfs://localhost:9000/user/makelkar/.flink/application_1534188161088_0019/lib/log4j-1.2.17.jar
>> 2018-08-15 14:39:38,011 INFO  org.apache.flink.yarn.Utils
>>   - Copying from
>> file:/Users/makelkar/work/flink/flink-1.4.2/lib/flink-dist_2.11-1.4.2.jar
>> to
>> hdfs://localhost:9000/user/makelkar/.flink/application_1534188161088_0019/lib/flink-dist_2.11-1.4.2.jar
>> 2018-08-15 14:39:38,586 INFO  org.apache.flink.yarn.Utils
>>   - Copying from
>> file:/Users/makelkar/work/flink/flink-1.4.2/lib/flink-python_2.11-1.4.2.jar
>> to
>> hdfs://localhost:9000/user/makelkar/.flink/application_1534188161088_0019/lib/flink-python_2.11-1.4.2.jar
>> 2018-08-15 14:39:38,603 INFO  org.apache.flink.yarn.Utils
>>   - Copying from
>> file:/Users/makelkar/work/flink/flink-1.4.2/conf/log4j.properties to
>> hdfs://localhost:9000/user/makelkar/.flink/application_1534188161088_0019/log4j.properties
>>
>> 2018-08-15 14:39:39,002 INFO  org.apache.flink.yarn.Utils
>>   - Copying from
>> file:/Users/makelkar/work/flink/flink-1.4.2/lib/flink-dist_2.11-1.4.2.jar
>> to
>> hdfs://localhost:9000/user/makelkar/.flink/application_1534188161088_0019/flink-dist_2.11-1.4.2.jar
>> 2018-08-15 14:39:39,401 INFO  org.apache.flink.yarn.Utils
>>   - Copying from
>> /var/folders/b6/_t_6q0vs3glcggp_8rgyxxl4gn/T/application_1534188161088_0019-flink-conf.yaml8441703337078262150.tmp
>> to
>> hdfs://localhost:9000/user/makelkar/.flink/application_1534188161088_0019/application_1534188161088_0019-flink-conf.yaml8441703337078262150.tmp
>> 2018-08-15 14:39:39,836 INFO  org.apache.flink.yarn.YarnClusterDescriptor
>>   - Submitting application master
>> application_1534188161088_0019
>> 2018-08-15 14:39:39,858 INFO
>>  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted
>> application 

OneInputStreamOperatorTestHarness.snapshot doesn't include timers?

2018-08-15 Thread Ken Krugler
Hi all,

It looks to me like the OperatorSubtaskState returned from 
OneInputStreamOperatorTestHarness.snapshot fails to include any timers that had 
been registered via registerProcessingTimeTimer but had not yet fired when the 
snapshot was saved.

Is this a known limitation of OneInputStreamOperatorTestHarness?

If not, is there anything special I need to do when setting up the test harness 
to ensure that timers are saved?

Below is the unit test, which shows how the test harness is being set up and 
run.

The TimerFunction used in this test does seem to be doing the right thing, as 
using it in a simple job on a local Flink cluster works as expected when 
creating & then restarting from a savepoint.
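For context, here is a rough, hypothetical Scala sketch of a keyed function that re-registers a processing-time timer whenever one fires and records each firing; this is not the actual TimerFunction, only an illustration of the pattern the test exercises:

import java.{util => ju}

import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

class ReschedulingTimerFunction(firedTimers: ju.List[java.lang.Long])
    extends KeyedProcessFunction[Integer, Integer, Integer] {

  override def processElement(
      value: Integer,
      ctx: KeyedProcessFunction[Integer, Integer, Integer]#Context,
      out: Collector[Integer]): Unit = {
    // Schedule a timer one millisecond after the current processing time.
    ctx.timerService().registerProcessingTimeTimer(
      ctx.timerService().currentProcessingTime() + 1)
    out.collect(value)
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[Integer, Integer, Integer]#OnTimerContext,
      out: Collector[Integer]): Unit = {
    firedTimers.add(timestamp)
    // Keep a timer pending at all times, so a restored snapshot should still fire it.
    ctx.timerService().registerProcessingTimeTimer(timestamp + 1)
  }
}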

Thanks,

— Ken

==
TimerTest.java
==
package com.scaleunlimited.flinkcrawler.functions;

import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.scaleunlimited.flinkcrawler.tools.TimerTool;

public class TimerTest {
    public static final Logger LOGGER =
            LoggerFactory.getLogger(TimerTest.class);

    // Generic type parameters were stripped by the list archive; they are
    // assumed to be <Long>/<Integer> here, based on how the test uses them.
    private List<Long> _firedTimers = new ArrayList<>();

    @Before
    public void setUp() throws Exception {
    }

    @Test
    public void testTimerSaving() throws Throwable {

        // This operator doesn't really do much at all, but the first element
        // it processes will create a timer for (timestamp+1).
        // Whenever that timer fires, it will create another timer for
        // (timestamp+1).
        KeyedProcessOperator<Integer, Integer, Integer> operator =
                new KeyedProcessOperator<>(new TimerFunction(_firedTimers));

        // Create a test harness from scratch
        OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
                makeTestHarness(operator, null);

        // We begin at time zero
        testHarness.setProcessingTime(0);

        // Process some elements, which should also create a timer for time 1.
        int inputs[] = new int[] {1, 2, 3};
        for (int input : inputs) {
            testHarness.processElement(new StreamRecord<>(input));
        }

        // Force some time to pass, which should keep moving the timer ahead,
        // finally leaving it set for time 10.
        for (long i = 1; i < 10; i++) {
            testHarness.setProcessingTime(i);
        }

        // Save the state, which we assume should include the timer we set for
        // time 10.
        OperatorSubtaskState savedState =
                testHarness.snapshot(0L, testHarness.getProcessingTime());

        // Close the first test harness
        testHarness.close();

        // Create a new test harness using the saved state (which we assume
        // includes the timer for time 10).
        testHarness = makeTestHarness(operator, savedState);

        // Force more time to pass, which should keep moving the timer ahead.
        for (long i = 10; i < 20; i++) {
            testHarness.setProcessingTime(i);
        }

        // Close the second test harness and make sure all the timers we expect
        // actually fired.
        testHarness.close();
        for (long i = 1; i < 20; i++) {

            // TODO This expectation currently fails, since Timers don't
            // seem to be included in the snapshot, at least the one produced by
            // the test harness.
            assertTrue(_firedTimers.contains(i));
        }
    }

    private OneInputStreamOperatorTestHarness<Integer, Integer> makeTestHarness(
            KeyedProcessOperator<Integer, Integer, Integer> operator,
            OperatorSubtaskState savedState) throws Exception {
        OneInputStreamOperatorTestHarness<Integer, Integer> result;
        result = new KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer>(
                operator,
                new TimerTool.IdentityKeySelector<Integer>(),
                BasicTypeInfo.INT_TYPE_INFO);
        result.setup();
        result.open();
        if (savedState != null) {
            result.initializeState(savedState);
        }
        return result;
    }
}


==
TimerFunction.java
==
package 

Flink CLI does not return after submitting yarn job in detached mode

2018-08-15 Thread madhav Kelkar
Hi there,

I am trying to run a single Flink job on YARN in detached mode. As per
the docs for Flink 1.4.2, I am using -yd to do that.

The problem I am having is that the flink bash script doesn't terminate
and return until I press Ctrl+C. In detached mode, I would expect the
Flink CLI to return as soon as the YARN job is submitted. Is there
something I am missing? Here is the exact output I get -



./flink-1.4.2/bin/flink run -m yarn-cluster -yd -yn 2 -yqu "default"  -ytm
> 2048 myjar.jar \
> program arguments omitted
>
>
> Using the result of 'hadoop classpath' to augment the Hadoop classpath:
> /Users/makelkar/work/hadoop-2.7.3/etc/hadoop:/Users/makelkar/work/hadoop-2.7.3/share/hadoop/common/lib/*:/Users/makelkar/work/hadoop-2.7.3/share/hadoop/common/*:/Users/makelkar/work/hadoop-2.7.3/share/hadoop/hdfs:/Users/makelkar/work/hadoop-2.7.3/share/hadoop/hdfs/lib/*:/Users/makelkar/work/hadoop-2.7.3/share/hadoop/hdfs/*:/Users/makelkar/work/hadoop-2.7.3/share/hadoop/yarn/lib/*:/Users/makelkar/work/hadoop-2.7.3/share/hadoop/yarn/*:/Users/makelkar/work/hadoop-2.7.3/share/hadoop/mapreduce/lib/*:/Users/makelkar/work/hadoop-2.7.3/share/hadoop/mapreduce/*:/Users/makelkar/work/hadoop-2.7.3/contrib/capacity-scheduler/*.jar
> 2018-08-15 14:39:36,873 INFO
>  org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path
> for the flink jar passed. Using the location of class
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2018-08-15 14:39:36,873 INFO
>  org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path
> for the flink jar passed. Using the location of class
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2018-08-15 14:39:36,921 INFO  org.apache.hadoop.yarn.client.RMProxy
>   - Connecting to ResourceManager at /0.0.0.0:8032
> 2018-08-15 14:39:37,226 INFO  org.apache.flink.yarn.YarnClusterDescriptor
>   - Cluster specification:
> ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=2048,
> numberTaskManagers=2, slotsPerTaskManager=1}
> 2018-08-15 14:39:37,651 WARN  org.apache.flink.yarn.YarnClusterDescriptor
>   - The configuration directory
> ('/Users/makelkar/work/flink/flink-1.4.2/conf') contains both LOG4J and
> Logback configuration files. Please delete or rename one of them.
> 2018-08-15 14:39:37,660 INFO  org.apache.flink.yarn.Utils
>   - Copying from
> file:/Users/makelkar/work/flink/flink-1.4.2/conf/logback.xml to
> hdfs://localhost:9000/user/makelkar/.flink/application_1534188161088_0019/logback.xml
>
> 2018-08-15 14:39:37,986 INFO  org.apache.flink.yarn.Utils
>   - Copying from
> file:/Users/makelkar/work/flink/flink-1.4.2/lib/log4j-1.2.17.jar to
> hdfs://localhost:9000/user/makelkar/.flink/application_1534188161088_0019/lib/log4j-1.2.17.jar
> 2018-08-15 14:39:38,011 INFO  org.apache.flink.yarn.Utils
>   - Copying from
> file:/Users/makelkar/work/flink/flink-1.4.2/lib/flink-dist_2.11-1.4.2.jar
> to
> hdfs://localhost:9000/user/makelkar/.flink/application_1534188161088_0019/lib/flink-dist_2.11-1.4.2.jar
> 2018-08-15 14:39:38,586 INFO  org.apache.flink.yarn.Utils
>   - Copying from
> file:/Users/makelkar/work/flink/flink-1.4.2/lib/flink-python_2.11-1.4.2.jar
> to
> hdfs://localhost:9000/user/makelkar/.flink/application_1534188161088_0019/lib/flink-python_2.11-1.4.2.jar
> 2018-08-15 14:39:38,603 INFO  org.apache.flink.yarn.Utils
>   - Copying from
> file:/Users/makelkar/work/flink/flink-1.4.2/conf/log4j.properties to
> hdfs://localhost:9000/user/makelkar/.flink/application_1534188161088_0019/log4j.properties
>
> 2018-08-15 14:39:39,002 INFO  org.apache.flink.yarn.Utils
>   - Copying from
> file:/Users/makelkar/work/flink/flink-1.4.2/lib/flink-dist_2.11-1.4.2.jar
> to
> hdfs://localhost:9000/user/makelkar/.flink/application_1534188161088_0019/flink-dist_2.11-1.4.2.jar
> 2018-08-15 14:39:39,401 INFO  org.apache.flink.yarn.Utils
>   - Copying from
> /var/folders/b6/_t_6q0vs3glcggp_8rgyxxl4gn/T/application_1534188161088_0019-flink-conf.yaml8441703337078262150.tmp
> to
> hdfs://localhost:9000/user/makelkar/.flink/application_1534188161088_0019/application_1534188161088_0019-flink-conf.yaml8441703337078262150.tmp
> 2018-08-15 14:39:39,836 INFO  org.apache.flink.yarn.YarnClusterDescriptor
>   - Submitting application master
> application_1534188161088_0019
> 2018-08-15 14:39:39,858 INFO
>  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted
> application application_1534188161088_0019
> 2018-08-15 14:39:39,858 INFO  org.apache.flink.yarn.YarnClusterDescriptor
>   - Waiting for the cluster to be allocated
> 2018-08-15 14:39:39,859 INFO  org.apache.flink.yarn.YarnClusterDescriptor
>   - Deploying cluster, current state ACCEPTED
> 2018-08-15 14:39:47,733 INFO  org.apache.flink.yarn.YarnClusterDescriptor
>   

Re: docker, error NoResourceAvailableException..

2018-08-15 Thread Esteban Serrano
You can also, instead of defining 2 services (taskmanager and taskmanager1),
set the scale parameter on taskmanager to the number of desired slots.
Something like this:

taskmanager:
  image: "${FLINK_DOCKER_IMAGE:-flink:1.5.2}"
  scale: 2
  expose:
- "6121"
- "6122"
- "8081"


On Wed, Aug 15, 2018 at 1:53 PM shyla deshpande 
wrote:

> Thanks Dominik, I will try that.
>
> On Wed, Aug 15, 2018 at 3:10 AM, Dominik Wosiński 
> wrote:
>
>> Hey,
>> The problem is that your command does start the Job Manager container, but it
>> does not start the Task Manager. That is why you have 0 slots. Currently,
>> the default numberOfTaskSlots is set to the number of CPUs available on
>> the machine.
>>
>>
>> So, you can generally do 2 things:
>>
>>
>> 1) Start Job Manager and 2 Task Managers. If you have Docker Compose
>> available, you can paste this to your docker-compose.yml:
>>
>> services:
>>   jobmanager:
>>     image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
>>     expose:
>>       - "6123"
>>     ports:
>>       - "8081:8081"
>>     command: jobmanager
>>     environment:
>>       - JOB_MANAGER_RPC_ADDRESS=jobmanager
>>
>>   taskmanager:
>>     image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
>>     expose:
>>       - "6121"
>>       - "6122"
>>     depends_on:
>>       - jobmanager
>>     command: taskmanager
>>     links:
>>       - "jobmanager:jobmanager"
>>     environment:
>>       - JOB_MANAGER_RPC_ADDRESS=jobmanager
>>
>>   taskmanager1:
>>     image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
>>     expose:
>>       - "6190"
>>       - "6120"
>>     depends_on:
>>       - jobmanager
>>     command: taskmanager
>>     links:
>>       - "jobmanager:jobmanager"
>>     environment:
>>       - JOB_MANAGER_RPC_ADDRESS=jobmanager
>>
>>
>>
>> This will give you 1 Job Manager and 2 Task Managers with one task slot
>> each, so 2 task slots in total.
>>
>> 2) You can deploy 1 Job Manager and 1 Task Manager. Then you need to
>> modify flink-conf.yml and set the following setting:
>>
>> taskmanager.numberOfTaskSlots: 2
>>
>>
>> This will give you 2 Task Slots with only 1 Task Manager. But you will
>> need to somehow override config in the container, possibly using :
>> https://docs.docker.com/storage/volumes/
>>
>> Regards,
>> Dominik.
>>
>> From: shyla deshpande
>> Sent: Wednesday, 15 August 2018 07:23
>> To: user
>> Subject: docker, error NoResourceAvailableException..
>>
>>
>>
>> Hello all,
>>
>>
>>
>> Trying to use docker as a single node flink cluster.
>>
>>
>>
>> docker run --name flink_local -p 8081:8081 -t flink local
>>
>>
>>
>> I submitted a job to the cluster using the Web UI. The job failed. I see
>> this error message in the docker logs.
>>
>>
>>
>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>> Could not allocate all requires slots within timeout of 30 ms. Slots
>> required: 2, slots allocated: 0
>>
>>
>>
>> The Web UI, shows 0 taskmanagers and 0 task slots on the Flink dashboard.
>>
>> How do I start the docker with 2 Task slots?
>>
>>
>>
>> Appreciate any help.
>>
>>
>>
>> Thanks
>>
>>
>>
>
>


Re: docker, error NoResourceAvailableException..

2018-08-15 Thread shyla deshpande
Thanks Dominik, I will try that.

On Wed, Aug 15, 2018 at 3:10 AM, Dominik Wosiński  wrote:

> Hey,
> The problem is that your command does start the Job Manager container, but it
> does not start the Task Manager. That is why you have 0 slots. Currently,
> the default numberOfTaskSlots is set to the number of CPUs available on
> the machine.
>
>
> So, you can generally do 2 things:
>
>
> 1) Start Job Manager and 2 Task Managers. If you have Docker Compose
> available, you can paste this to your docker-compose.yml:
>
> services:
>   jobmanager:
>     image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
>     expose:
>       - "6123"
>     ports:
>       - "8081:8081"
>     command: jobmanager
>     environment:
>       - JOB_MANAGER_RPC_ADDRESS=jobmanager
>
>   taskmanager:
>     image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
>     expose:
>       - "6121"
>       - "6122"
>     depends_on:
>       - jobmanager
>     command: taskmanager
>     links:
>       - "jobmanager:jobmanager"
>     environment:
>       - JOB_MANAGER_RPC_ADDRESS=jobmanager
>
>   taskmanager1:
>     image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
>     expose:
>       - "6190"
>       - "6120"
>     depends_on:
>       - jobmanager
>     command: taskmanager
>     links:
>       - "jobmanager:jobmanager"
>     environment:
>       - JOB_MANAGER_RPC_ADDRESS=jobmanager
>
>
>
> This will give you 1 Job Manager and 2 Task Managers with one task slot
> each, so 2 task slots in total.
>
> 2) You can deploy 1 Job Manager and 1 Task Manager. Then you need to modify
> flink-conf.yml and set the following setting:
>
> taskmanager.numberOfTaskSlots: 2
>
>
> This will give you 2 Task Slots with only 1 Task Manager. But you will
> need to somehow override config in the container, possibly using :
> https://docs.docker.com/storage/volumes/
>
> Regards,
> Dominik.
>
> From: shyla deshpande
> Sent: Wednesday, 15 August 2018 07:23
> To: user
> Subject: docker, error NoResourceAvailableException..
>
>
>
> Hello all,
>
>
>
> Trying to use docker as a single node flink cluster.
>
>
>
> docker run --name flink_local -p 8081:8081 -t flink local
>
>
>
> I submitted a job to the cluster using the Web UI. The job failed. I see
> this error message in the docker logs.
>
>
>
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Could not allocate all requires slots within timeout of 30 ms. Slots
> required: 2, slots allocated: 0
>
>
>
> The Web UI, shows 0 taskmanagers and 0 task slots on the Flink dashboard.
>
> How do I start the docker with 2 Task slots?
>
>
>
> Appreciate any help.
>
>
>
> Thanks
>
>
>


Re: CoFlatMapFunction with more than two input streams

2018-08-15 Thread Xingcan Cui
Hi Averell,

With the CoProcessFunction, you could get access to the time-related services 
which may be useful when maintaining the elements in Stream_C and you could get 
rid of type casting with the Either class.
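As a rough illustration of the time-related services point (names and types below are hypothetical, not from the actual job): in a keyed CoProcessFunction, an element cached from Stream_C can be expired with a processing-time timer instead of being kept forever.

import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.util.Collector

class CacheWithTtl extends CoProcessFunction[String, String, String] {

  // Keyed state holding the latest Stream_C element for this key.
  private lazy val cached = getRuntimeContext.getState(
    new ValueStateDescriptor[String]("stream-c-element", classOf[String]))

  override def processElement1(
      value: String,
      ctx: CoProcessFunction[String, String, String]#Context,
      out: Collector[String]): Unit = {
    Option(cached.value()).foreach(c => out.collect(s"$value enriched with $c"))
  }

  override def processElement2(
      value: String,
      ctx: CoProcessFunction[String, String, String]#Context,
      out: Collector[String]): Unit = {
    cached.update(value)
    // Drop the cached element after one hour of processing time.
    ctx.timerService().registerProcessingTimeTimer(
      ctx.timerService().currentProcessingTime() + 60 * 60 * 1000)
  }

  override def onTimer(
      timestamp: Long,
      ctx: CoProcessFunction[String, String, String]#OnTimerContext,
      out: Collector[String]): Unit = {
    cached.clear()
  }
}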

Best,
Xingcan

> On Aug 15, 2018, at 3:27 PM, Averell  wrote:
> 
> Thank you Vino & Xingcan.
> @Vino: could you help explain more details on using DBMS? Would that be with
> using TableAPI, or you meant directly reading DBMS data inside the
> ProcessFunction?
> 
> @Xingcan: I don't know what are the benefits of using CoProcess over
> RichCoFlatMap in this case.
> Regarding using Either wrapper, as my understanding, I would need to use
> that both in my sources (stream_A and B) and in the
> CoProcess/CoFlatMapFunction. Then using a super class Animal would be more
> convenient, wouldn't it?
> 
> Thanks and regards,
> Averell
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Scala 2.12 Support

2018-08-15 Thread Aaron Levin
Hello!

I'm wondering if there is anywhere I can see Flink's roadmap for Scala 2.12
support. The last email I can find on the list for this was back in
January, and the FLINK-7811[0], the ticket asking for Scala 2.12 support,
hasn't been updated in a few months.

Recently Spark fixed the ClosureCleaner code to support Scala 2.12 [1], and
from what I can gather this was one of the main barriers to Flink
supporting Scala 2.12. Given this has been fixed, is there work in progress
to support Scala 2.12? Any updates on FLINK-7811?

Thanks for your help!

[0] https://issues.apache.org/jira/browse/FLINK-7811
[1] https://issues.apache.org/jira/browse/SPARK-14540

Best,

Aaron Levin


Re: watermark does not progress

2018-08-15 Thread Hequn Cheng
Hi John,

I guess the source data in your local run are different from the cluster's. And as
Fabian said, it is probably the case that some partitions don't carry data.
As a quick check, you can set the job parallelism to 1 and look at the result.
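A minimal, self-contained Scala sketch of that suggestion (the element values and job name are made up): force parallelism 1 in the local run so a single source subtask carries all records and the watermark can advance.

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._

object Parallelism1Check {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // One source subtask: no idle partitions holding the watermark at Long.MIN_VALUE.
    env.setParallelism(1)

    env
      .fromElements((1000L, "a"), (2000L, "a"), (3000L, "b"))
      .assignAscendingTimestamps(_._1)
      .keyBy(_._2)
      .reduce((a, b) => if (a._1 > b._1) a else b)
      .print()

    env.execute("parallelism-1-watermark-check")
  }
}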

Best, Hequn

On Wed, Aug 15, 2018 at 5:22 PM, John O  wrote:

> I did some more testing.
>
> Below is a pseudo version of my setup.
>
> kafkaconsumer->
> assignTimestampsAndWatermarks(BoundedOutOfOrdernessTimestampExtractor)->
> process(print1 ctx.timerService().currentWatermark()) ->
> keyBy(_.someProp) ->
> process(print2 ctx.timerService().currentWatermark()) ->
>
> I am manually sending monotonically increasing (event time) records to the
> Kafka topic.
>
> What I see is in print1 I see expected watermark
>
> But print2 is always Long.MIN
>
> It looks like keyBy wipes out the watermark.
>
>
>
> Now, if I run the exact same code on a flink cluster, print2 outputs
> expected watermark.
>
>
>
> Jo
>
>
>
> From: Fabian Hueske
> Sent: Wednesday, August 15, 2018 2:07 AM
> To: vino yang
> Cc: John O ; user
> Subject: Re: watermark does not progress
>
>
>
> Hi John,
>
>
>
> Watermarks cannot make progress if you have stream partitions that do not
> carry any data.
>
> What kind of source are you using?
>
>
>
> Best,
>
> Fabian
>
>
>
> 2018-08-15 4:25 GMT+02:00 vino yang :
>
> Hi Johe,
>
>
>
> In local mode, it should also work.
>
> When you debug, you can set a breakpoint in the getCurrentWatermark method
> to see if you can enter the method and if the behavior is what you expect.
>
> What is your source? If you post your code, it might be easier to locate.
>
> In addition, for positioning watermark, you can also refer to this
> email[1].
>
>
>
> [1]: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/Debugging-watermarks-td7065.html
>
>
>
> Thanks, vino.
>
>
>
John O wrote on Wednesday, August 15, 2018 at 9:44 AM:
>
> I am noticing that watermark does not progress as expected when running
> locally in IDE. It just stays at Long.MIN
>
>
>
> I am using EventTime processing and have tried both these time extractors.
>
> · assignAscendingTimestamps ...
>
> · 
> assignTimestampsAndWatermarks(BoundedOutOfOrdernessTimestampExtractor)
> ...
>
>
>
> Also, configured the environment as so
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>
>
>
> If I run the job on a flink cluster, I do see the watermark progress.
>
>
>
> Is watermarking not supported in local mode?
>
>
>
> Thanks
>
> Jo
>
>
>


Re: Stream collector serialization performance

2018-08-15 Thread Timo Walther

Hi Mingliang,

First of all, the POJO serializer is not very performant; Tuple or Row
are better. If you want to improve the performance of a collect()
between operators, you could also enable object reuse. You can read more
about this here [1] (section "Issue 2: Object Reuse"), but make sure
your implementation is correct, because an operator could modify the
objects of following operators.
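A minimal Scala sketch of the object-reuse suggestion (the case classes are hypothetical stand-ins for the POJOs in the question below; in the Scala API, case classes are handled by Flink's case-class serializer rather than the POJO serializer):

import org.apache.flink.streaming.api.scala._

// Hypothetical stand-ins for classes A and B from the question below.
case class B(f: String)
case class A(f1: String, f2: Seq[B], f3: Int)

object SerializationTuning {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Skip defensive copies between chained operators. Only safe if no operator
    // mutates or buffers records it has already emitted or received.
    env.getConfig.enableObjectReuse()

    env
      .fromElements(A("x", Seq(B("y")), 1))
      .map(a => a.copy(f3 = a.f3 + 1))
      .print()

    env.execute("serialization-tuning-sketch")
  }
}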


I hope this helps.

Regards,
Timo

[1] 
https://data-artisans.com/blog/curious-case-broken-benchmark-revisiting-apache-flink-vs-databricks-runtime



On 15.08.18 at 09:06, 祁明良 wrote:

Hi all,

I'm currently using the keyed process function, and I see serialization
happening when I collect the object / update the object to RocksDB. For me, the
performance of serialization seems to be the bottleneck.
By default, the POJO serializer is used, and the time cost of collect vs. update to
RocksDB is roughly 1:1. Then I switched to Kryo by setting
getConfig.enableForceKryo(). Now the time cost of the update to RocksDB decreases
significantly, to roughly 0.3, but the collect method does not seem to improve. Can
someone help explain this?

My object looks something like this:

class A {
    String f1   // 20 * String fields
    List<B> f2  // 20 * lists of another POJO object
    int f3      // 20 * int fields
}
class B {
    String f    // 5 * String fields
}

Best,
Mingliang

This communication may contain privileged or other confidential information of 
Red. If you have received it in error, please advise the sender by reply e-mail 
and immediately delete the message and any attachments without copying or 
disclosing the contents. Thank you.





Re: 1.5.1

2018-08-15 Thread Gary Yao
Hi Juho,

the main thread of the RPC endpoint should never be blocked. Blocking on
that
thread is considered an implementation error. Unfortunately, without logs it
is difficult to tell what the exact problem is. If you are able to reproduce
heartbeat timeouts on your test staging environment, can you send the full
logs of the ClusterEntryPoint (JobManager) and the ones of the failing
TaskManager? From the log extracts you sent, I cannot really draw any
conclusions.

Best,
Gary


On Wed, Aug 15, 2018 at 10:38 AM, Juho Autio  wrote:

> Thanks Gary..
>
> What could be blocking the RPC threads? Slow checkpointing?
>
> In production we're still using a self-built Flink package 1.5-SNAPSHOT,
> flink commit 8395508b0401353ed07375e22882e7581d46ac0e, and the jobs are
> stable.
>
> Now with 1.5.2 the same jobs are failing due to heartbeat timeouts every
> day. What changed between commit 8395508b0401353ed07375e22882e7581d46ac0e
> & release 1.5.2?
>
> Also, I just tried to run a slightly heavier job. It eventually had some
> heartbeat timeouts, and then this:
>
> 2018-08-15 01:49:58,156 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph
>   - Source: Kafka (topic1, topic2) -> Filter -> AppIdFilter([topic1,
> topic2]) -> XFilter -> EventMapFilter(AppFilters) (4/8)
> (da6e2ba425fb91316dd05e72e6518b24) switched from RUNNING to FAILED.
> org.apache.flink.util.FlinkException: The assigned slot
> container_1534167926397_0001_01_02_1 was removed.
>
> After that the job tried to restart according to Flink restart strategy
> but that kept failing with this error:
>
> 2018-08-15 02:00:22,000 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph
>   - Job X (19bd504d2480ccb2b44d84fb1ef8af68) switched from state
> RUNNING to FAILING.
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Could not allocate all requires slots within timeout of 30 ms. Slots
> required: 36, slots allocated: 12
>
> This was repeated until all restart attempts had been used (we've set it
> to 50), and then the job finally failed.
>
> I would like to know also how to prevent Flink from going into such bad
> state. At least it should exit immediately instead of retrying in such a
> situation. And why was "slot container removed"?
>
> On Tue, Aug 14, 2018 at 11:24 PM Gary Yao  wrote:
>
>> Hi Juho,
>>
>> It seems in your case the JobMaster did not receive a heartbeat from the
>> TaskManager in time [1]. Heartbeat requests and answers are sent over the
>> RPC
>> framework, and RPCs of one component (e.g., TaskManager, JobMaster, etc.)
>> are
>> dispatched by a single thread. Therefore, the reasons for heartbeats
>> timeouts
>> include:
>>
>> 1. The RPC threads of the TM or JM are blocked. In this case
>> heartbeat requests or answers cannot be dispatched.
>> 2. The scheduled task for sending the heartbeat requests [2] died.
>> 3. The network is flaky.
>>
>> If you are confident that the network is not the culprit, I would suggest
>> to
>> set the logging level to DEBUG, and look for periodic log messages (JM
>> and TM
>> logs) that are related to heartbeating. If the periodic log messages are
>> overdue, it is a hint that the main thread of the RPC endpoint is blocked
>> somewhere.
>>
>> Best,
>> Gary
>>
>> [1] https://github.com/apache/flink/blob/release-1.5.2/flink-
>> runtime/src/main/java/org/apache/flink/runtime/jobmaster
>> /JobMaster.java#L1611
>> [2] https://github.com/apache/flink/blob/913b0413882939c30da4ad4
>> df0cabc84dfe69ea0/flink-runtime/src/main/java/org/apache/flink/runtime/
>> heartbeat/HeartbeatManagerSenderImpl.java#L64
>>
>> On Mon, Aug 13, 2018 at 9:52 AM, Juho Autio  wrote:
>>
>>> I also have jobs failing on a daily basis with the error "Heartbeat of
>>> TaskManager with id  timed out". I'm using Flink 1.5.2.
>>>
>>> Could anyone suggest how to debug possible causes?
>>>
>>> I already set these in flink-conf.yaml, but I'm still getting failures:
>>> heartbeat.interval: 1
>>> heartbeat.timeout: 10
>>>
>>> Thanks.
>>>
>>> On Sun, Jul 22, 2018 at 2:20 PM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
 According to the UI it seems that "

 org.apache.flink.util.FlinkException: The assigned slot 
 208af709ef7be2d2dfc028ba3bbf4600_10 was removed.

 " was the cause of a pipe restart.

 As to the TM, it is an artifact of the new job allocation regime, which
 will exhaust all slots on a TM rather than distributing them equitably.
 TMs are selectively under more stress than in a pure round-robin distribution, I
 think. We may have to lower the slots on each TM to define a good upper
 bound. You are correct, 50s is a pretty generous value.

 On Sun, Jul 22, 2018 at 6:55 AM, Gary Yao 
 wrote:

> Hi,
>
> The first exception should be only logged on info level. It's expected
> to see
> this exception when a TaskManager unregisters from the ResourceManager.
>
> 

RE: docker, error NoResourceAvailableException..

2018-08-15 Thread Dominik Wosiński
Hey, 
The problem is that your command does start the Job Manager container, but it does
not start the Task Manager. That is why you have 0 slots. Currently, the
default numberOfTaskSlots is set to the number of CPUs available on the machine.

So, you can generally do 2 things:

1) Start a Job Manager and 2 Task Managers. If you have Docker Compose available,
you can paste this into your docker-compose.yml:
services:
  jobmanager:
    image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

  taskmanager:
    image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

  taskmanager1:
    image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
    expose:
      - "6190"
      - "6120"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

This will give you 1 Job Manager and 2 Task Managers with one task slot each,
so 2 task slots in total.

2) You can deploy 1 Job Manager and 1 Task Manager. Then you need to modify
flink-conf.yml and set the following setting:

taskmanager.numberOfTaskSlots: 2

This will give you 2 Task Slots with only 1 Task Manager. But you will need to 
somehow override config in the container, possibly using : 
https://docs.docker.com/storage/volumes/

Regards,
Dominik.
From: shyla deshpande
Sent: Wednesday, 15 August 2018 07:23
To: user
Subject: docker, error NoResourceAvailableException..

Hello all,

Trying to use docker as a single node flink cluster.

docker run --name flink_local -p 8081:8081 -t flink local

I submitted a job to the cluster using the Web UI. The job failed. I see this
error message in the docker logs.

org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
Could not allocate all requires slots within timeout of 30 ms. Slots 
required: 2, slots allocated: 0

The Web UI, shows 0 taskmanagers and 0 task slots on the Flink dashboard.
How do I start the docker with 2 Task slots?

Appreciate any help.

Thanks



RE: watermark does not progress

2018-08-15 Thread John O
I did some more testing.

Below is a pseudo version of my setup.

kafkaconsumer->
assignTimestampsAndWatermarks(BoundedOutOfOrdernessTimestampExtractor)->
process(print1 ctx.timerService().currentWatermark()) ->
keyBy(_.someProp) ->
process(print2 ctx.timerService().currentWatermark()) ->

I am manually sending monotonically increasing (event time) records to the Kafka
topic.

What I see is in print1 I see expected watermark
But print2 is always Long.MIN

It looks like keyBy wipes out the watermark.
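For reference, a minimal Scala sketch of the kind of extractor and watermark-printing function described above (the Event type, field names, and the 5-second bound are made up for illustration):

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

case class Event(someProp: String, eventTime: Long)

class EventTimestampExtractor
    extends BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(5)) {
  override def extractTimestamp(e: Event): Long = e.eventTime
}

// Used once before keyBy ("print1") and once after keyBy ("print2") in the pipeline above.
class PrintWatermark(tag: String) extends ProcessFunction[Event, Event] {
  override def processElement(
      e: Event,
      ctx: ProcessFunction[Event, Event]#Context,
      out: Collector[Event]): Unit = {
    // Stays at Long.MinValue as long as some input partition never carries data.
    println(s"$tag watermark = ${ctx.timerService().currentWatermark()}")
    out.collect(e)
  }
}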

Now, if I run the exact same code on a flink cluster, print2 outputs expected 
watermark.

Jo


From: Fabian Hueske 
Sent: Wednesday, August 15, 2018 2:07 AM
To: vino yang 
Cc: John O ; user 
Subject: Re: watermark does not progress

Hi John,

Watermarks cannot make progress if you have stream partitions that do not carry 
any data.
What kind of source are you using?

Best,
Fabian

2018-08-15 4:25 GMT+02:00 vino yang <yanghua1...@gmail.com>:
Hi Johe,

In local mode, it should also work.
When you debug, you can set a breakpoint in the getCurrentWatermark method to 
see if you can enter the method and if the behavior is what you expect.
What is your source? If you post your code, it might be easier to locate.
In addition, for positioning watermark, you can also refer to this email[1].

[1]: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Debugging-watermarks-td7065.html

Thanks, vino.

John O <son...@samsung.com> wrote on Wednesday, August 15, 2018 at 9:44 AM:
I am noticing that watermark does not progress as expected when running locally 
in IDE. It just stays at Long.MIN

I am using EventTime processing and have tried both these time extractors.

• assignAscendingTimestamps ...

• 
assignTimestampsAndWatermarks(BoundedOutOfOrdernessTimestampExtractor) ...

Also, configured the environment as so
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

If I run the job on a flink cluster, I do see the watermark progress.

Is watermarking not supported in local mode?

Thanks
Jo



Re: Limit on number of files to read for Dataset

2018-08-15 Thread Fabian Hueske
Hi Darshan,

This looks like a file system configuration issue to me.
Flink supports different file systems for S3 and there are also a few
tuning knobs.

Did you have a look at the docs for file system configuration [1]?

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/filesystems.html
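For the per-month union pattern described below, a minimal Scala sketch (the bucket name and prefixes are made up; the S3 connection-pool tuning itself is covered by the file system documentation linked above):

import org.apache.flink.api.scala._

object MonthlyUnion {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // One DataSet per monthly prefix, unioned into a single DataSet.
    val months = (1 to 12).map(m => f"s3://my-bucket/events/2017-$m%02d/")
    val all: DataSet[String] = months
      .map(path => env.readTextFile(path))
      .reduce(_ union _)

    println(all.count())
  }
}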

2018-08-14 20:45 GMT+02:00 Darshan Singh :

> Thanks for the details. I got it working. I have around 1 directory for
> each month and I am running for 12-15 months of data. So I created a dataset
> from each month and did a union.
>
> However, when I run it I get the HTTP timeout issue. I am reading more than
> 120K files in total across all of the months.
>
> I am using S3 and EMR to do this, and the Flink version is 1.4.2. When I run
> for 6 months this works fine.
>
> Below is part of error
>
> Caused by: java.io.IOException: Error opening the Input Split s3://.gz
> [0,-1]: Unable to execute HTTP request: Timeout waiting for connection from
> pool
> at org.apache.flink.api.common.io.FileInputFormat.open(
> FileInputFormat.java:705)
> at org.apache.flink.api.common.io.DelimitedInputFormat.open(
> DelimitedInputFormat.java:477)
> at org.apache.flink.api.common.io.DelimitedInputFormat.open(
> DelimitedInputFormat.java:48)
> at org.apache.flink.runtime.operators.DataSourceTask.
> invoke(DataSourceTask.java:145)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: 
> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.SdkClientException:
> Unable to execute HTTP request: Timeout waiting for connection from pool
> at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.
> AmazonHttpClient$RequestExecutor.handleRetryableException(
> AmazonHttpClient.java:1114)
> at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.
> AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1064)
> at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.
> AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
> at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.
> AmazonHttpClient$RequestExecutor.executeWithTimer(
> AmazonHttpClient.java:717)
> at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.
> AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
> at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.
> AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
> at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.
> AmazonHttpClient$RequestExecutionBuilderImpl.
> execute(AmazonHttpClient.java:649)
> at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.
> AmazonHttpClient.execute(AmazonHttpClient.java:513)
> at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.
> s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
> at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.
> s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
> at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.
> s3.AmazonS3Client.getObject(AmazonS3Client.java:1409)
> at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectCall.
> perform(GetObjectCall.java:22)
> at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectCall.
> perform(GetObjectCall.java:9)
> at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.
> GlobalS3Executor.execute(GlobalS3Executor.java:80)
> at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.
> invoke(AmazonS3LiteClient.java:176)
> at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.
> getObject(AmazonS3LiteClient.java:99)
> at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.
> retrievePair(Jets3tNativeFileSystemStore.java:452)
> at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.
> retrievePair(Jets3tNativeFileSystemStore.java:439)
> at sun.reflect.GeneratedMethodAccessor32.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(
> RetryInvocationHandler.java:409)
> at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.
> invokeMethod(RetryInvocationHandler.java:163)
> at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.
> invoke(RetryInvocationHandler.java:155)
> at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(
> RetryInvocationHandler.java:95)
> at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(
> RetryInvocationHandler.java:346)
> at com.sun.proxy.$Proxy28.retrievePair(Unknown Source)
> at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.open(
> S3NativeFileSystem.java:1213)
> at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:790)
> at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.open(
> EmrFileSystem.java:166)
> at 

Re: watermark does not progress

2018-08-15 Thread Fabian Hueske
Hi John,

Watermarks cannot make progress if you have stream partitions that do not
carry any data.
What kind of source are you using?

Best,
Fabian

2018-08-15 4:25 GMT+02:00 vino yang :

> Hi Johe,
>
> In local mode, it should also work.
> When you debug, you can set a breakpoint in the getCurrentWatermark method
> to see if you can enter the method and if the behavior is what you expect.
> What is your source? If you post your code, it might be easier to locate.
> In addition, for positioning watermark, you can also refer to this
> email[1].
>
> [1]: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/Debugging-watermarks-td7065.html
>
> Thanks, vino.
>
> John O wrote on Wednesday, August 15, 2018 at 9:44 AM:
>
>> I am noticing that watermark does not progress as expected when running
>> locally in IDE. It just stays at Long.MIN
>>
>>
>>
>> I am using EventTime processing and have tried both these time
>> extractors.
>>
>> · assignAscendingTimestamps ...
>>
>> · 
>> assignTimestampsAndWatermarks(BoundedOutOfOrdernessTimestampExtractor)
>> ...
>>
>>
>>
>> Also, configured the environment as so
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>
>>
>>
>> If I run the job on a flink cluster, I do see the watermark progress.
>>
>>
>>
>> Is watermarking not supported in local mode?
>>
>>
>>
>> Thanks
>>
>> Jo
>>
>


Re: 1.5.1

2018-08-15 Thread Juho Autio
Gary, I found another mail thread about similar issue:

http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Testing-on-Flink-1-5-tp19565p19647.html

Specifically I found this:

> we are observing Akka.ask.timeout error for few of our jobs (JM's
logs[2]), we tried to increase this parameter to 2 min from default 10 s
and still observe the same.

And your answer:

> Can you try setting:
>
> web.timeout: 12
>
> This will increase the timeout of the RPC call to 2 minutes.

Do you think that might fix my problem? Does Flink job execution internally
depend on the Flink REST API to answer quickly? Due to monitoring we're
making quite many calls to the Flink REST API (from outside the cluster),
which could be blocking other requests.

Looks like the default is just 1 according to
https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/config.html#web-timeout

Cheers,
Juho

On Wed, Aug 15, 2018 at 11:43 AM Juho Autio  wrote:

> Vishal, from which version did you upgrade to 1.5.1? Maybe from 1.5.0
> (release)? Knowing that might help narrowing down the source of this.
>
> On Wed, Aug 15, 2018 at 11:38 AM Juho Autio  wrote:
>
>> Thanks Gary..
>>
>> What could be blocking the RPC threads? Slow checkpointing?
>>
>> In production we're still using a self-built Flink package 1.5-SNAPSHOT,
>> flink commit 8395508b0401353ed07375e22882e7581d46ac0e, and the jobs are
>> stable.
>>
>> Now with 1.5.2 the same jobs are failing due to heartbeat timeouts every
>> day. What changed between commit 8395508b0401353ed07375e22882e7581d46ac0e &
>> release 1.5.2?
>>
>> Also, I just tried to run a slightly heavier job. It eventually had some
>> heartbeat timeouts, and then this:
>>
>> 2018-08-15 01:49:58,156 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph- Source:
>> Kafka (topic1, topic2) -> Filter -> AppIdFilter([topic1, topic2]) ->
>> XFilter -> EventMapFilter(AppFilters) (4/8)
>> (da6e2ba425fb91316dd05e72e6518b24) switched from RUNNING to FAILED.
>> org.apache.flink.util.FlinkException: The assigned slot
>> container_1534167926397_0001_01_02_1 was removed.
>>
>> After that the job tried to restart according to Flink restart strategy
>> but that kept failing with this error:
>>
>> 2018-08-15 02:00:22,000 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job X
>> (19bd504d2480ccb2b44d84fb1ef8af68) switched from state RUNNING to FAILING.
>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>> Could not allocate all requires slots within timeout of 30 ms. Slots
>> required: 36, slots allocated: 12
>>
>> This was repeated until all restart attempts had been used (we've set it
>> to 50), and then the job finally failed.
>>
>> I would like to know also how to prevent Flink from going into such bad
>> state. At least it should exit immediately instead of retrying in such a
>> situation. And why was "slot container removed"?
>>
>> On Tue, Aug 14, 2018 at 11:24 PM Gary Yao  wrote:
>>
>>> Hi Juho,
>>>
>>> It seems in your case the JobMaster did not receive a heartbeat from the
>>> TaskManager in time [1]. Heartbeat requests and answers are sent over
>>> the RPC
>>> framework, and RPCs of one component (e.g., TaskManager, JobMaster,
>>> etc.) are
>>> dispatched by a single thread. Therefore, the reasons for heartbeats
>>> timeouts
>>> include:
>>>
>>> 1. The RPC threads of the TM or JM are blocked. In this case
>>> heartbeat requests or answers cannot be dispatched.
>>> 2. The scheduled task for sending the heartbeat requests [2] died.
>>> 3. The network is flaky.
>>>
>>> If you are confident that the network is not the culprit, I would
>>> suggest to
>>> set the logging level to DEBUG, and look for periodic log messages (JM
>>> and TM
>>> logs) that are related to heartbeating. If the periodic log messages are
>>> overdue, it is a hint that the main thread of the RPC endpoint is blocked
>>> somewhere.
>>>
>>> Best,
>>> Gary
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/release-1.5.2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L1611
>>> [2]
>>> https://github.com/apache/flink/blob/913b0413882939c30da4ad4df0cabc84dfe69ea0/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java#L64
>>>
>>> On Mon, Aug 13, 2018 at 9:52 AM, Juho Autio 
>>> wrote:
>>>
 I also have jobs failing on a daily basis with the error "Heartbeat of
 TaskManager with id  timed out". I'm using Flink 1.5.2.

 Could anyone suggest how to debug possible causes?

 I already set these in flink-conf.yaml, but I'm still getting failures:
 heartbeat.interval: 1
 heartbeat.timeout: 10

 Thanks.

 On Sun, Jul 22, 2018 at 2:20 PM Vishal Santoshi <
 vishal.santo...@gmail.com> wrote:

> According to the UI it seems that "
>
> org.apache.flink.util.FlinkException: The assigned slot 
> 

Re: 1.5.1

2018-08-15 Thread Juho Autio
Vishal, from which version did you upgrade to 1.5.1? Maybe from 1.5.0
(release)? Knowing that might help narrowing down the source of this.

On Wed, Aug 15, 2018 at 11:38 AM Juho Autio  wrote:

> Thanks Gary..
>
> What could be blocking the RPC threads? Slow checkpointing?
>
> In production we're still using a self-built Flink package 1.5-SNAPSHOT,
> flink commit 8395508b0401353ed07375e22882e7581d46ac0e, and the jobs are
> stable.
>
> Now with 1.5.2 the same jobs are failing due to heartbeat timeouts every
> day. What changed between commit 8395508b0401353ed07375e22882e7581d46ac0e &
> release 1.5.2?
>
> Also, I just tried to run a slightly heavier job. It eventually had some
> heartbeat timeouts, and then this:
>
> 2018-08-15 01:49:58,156 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Source:
> Kafka (topic1, topic2) -> Filter -> AppIdFilter([topic1, topic2]) ->
> XFilter -> EventMapFilter(AppFilters) (4/8)
> (da6e2ba425fb91316dd05e72e6518b24) switched from RUNNING to FAILED.
> org.apache.flink.util.FlinkException: The assigned slot
> container_1534167926397_0001_01_02_1 was removed.
>
> After that the job tried to restart according to Flink restart strategy
> but that kept failing with this error:
>
> 2018-08-15 02:00:22,000 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job X
> (19bd504d2480ccb2b44d84fb1ef8af68) switched from state RUNNING to FAILING.
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Could not allocate all requires slots within timeout of 30 ms. Slots
> required: 36, slots allocated: 12
>
> This was repeated until all restart attempts had been used (we've set it
> to 50), and then the job finally failed.
>
> I would like to know also how to prevent Flink from going into such bad
> state. At least it should exit immediately instead of retrying in such a
> situation. And why was "slot container removed"?
>
> On Tue, Aug 14, 2018 at 11:24 PM Gary Yao  wrote:
>
>> Hi Juho,
>>
>> It seems in your case the JobMaster did not receive a heartbeat from the
>> TaskManager in time [1]. Heartbeat requests and answers are sent over the
>> RPC
>> framework, and RPCs of one component (e.g., TaskManager, JobMaster, etc.)
>> are
>> dispatched by a single thread. Therefore, the reasons for heartbeats
>> timeouts
>> include:
>>
>> 1. The RPC threads of the TM or JM are blocked. In this case
>> heartbeat requests or answers cannot be dispatched.
>> 2. The scheduled task for sending the heartbeat requests [2] died.
>> 3. The network is flaky.
>>
>> If you are confident that the network is not the culprit, I would suggest
>> to
>> set the logging level to DEBUG, and look for periodic log messages (JM
>> and TM
>> logs) that are related to heartbeating. If the periodic log messages are
>> overdue, it is a hint that the main thread of the RPC endpoint is blocked
>> somewhere.
>>
>> Best,
>> Gary
>>
>> [1]
>> https://github.com/apache/flink/blob/release-1.5.2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L1611
>> [2]
>> https://github.com/apache/flink/blob/913b0413882939c30da4ad4df0cabc84dfe69ea0/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java#L64
>>
>> On Mon, Aug 13, 2018 at 9:52 AM, Juho Autio  wrote:
>>
>>> I also have jobs failing on a daily basis with the error "Heartbeat of
>>> TaskManager with id  timed out". I'm using Flink 1.5.2.
>>>
>>> Could anyone suggest how to debug possible causes?
>>>
>>> I already set these in flink-conf.yaml, but I'm still getting failures:
>>> heartbeat.interval: 1
>>> heartbeat.timeout: 10
>>>
>>> Thanks.
>>>
>>> On Sun, Jul 22, 2018 at 2:20 PM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
 According to the UI it seems that "

 org.apache.flink.util.FlinkException: The assigned slot 
 208af709ef7be2d2dfc028ba3bbf4600_10 was removed.

 " was the cause of a pipe restart.

 As to the TM, it is an artifact of the new job allocation regime, which
 will exhaust all slots on a TM rather than distributing them equitably.
 TMs are selectively under more stress than in a pure round-robin distribution, I
 think. We may have to lower the slots on each TM to define a good upper
 bound. You are correct, 50s is a pretty generous value.

 On Sun, Jul 22, 2018 at 6:55 AM, Gary Yao 
 wrote:

> Hi,
>
> The first exception should be only logged on info level. It's expected
> to see
> this exception when a TaskManager unregisters from the ResourceManager.
>
> Heartbeats can be configured via heartbeat.interval and
> heartbeat.timeout [1].
> The default timeout is 50s, which should be a generous value. It is
> probably a
> good idea to find out why the heartbeats cannot be answered by the TM.
>
> Best,
> Gary
>
> [1]
> 

Re: 1.5.1

2018-08-15 Thread Juho Autio
Thanks Gary..

What could be blocking the RPC threads? Slow checkpointing?

In production we're still using a self-built Flink package 1.5-SNAPSHOT,
flink commit 8395508b0401353ed07375e22882e7581d46ac0e, and the jobs are
stable.

Now with 1.5.2 the same jobs are failing due to heartbeat timeouts every
day. What changed between commit 8395508b0401353ed07375e22882e7581d46ac0e &
release 1.5.2?

Also, I just tried to run a slightly heavier job. It eventually had some
heartbeat timeouts, and then this:

2018-08-15 01:49:58,156 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph- Source:
Kafka (topic1, topic2) -> Filter -> AppIdFilter([topic1, topic2]) ->
XFilter -> EventMapFilter(AppFilters) (4/8)
(da6e2ba425fb91316dd05e72e6518b24) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkException: The assigned slot
container_1534167926397_0001_01_02_1 was removed.

After that the job tried to restart according to Flink restart strategy but
that kept failing with this error:

2018-08-15 02:00:22,000 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph- Job X
(19bd504d2480ccb2b44d84fb1ef8af68) switched from state RUNNING to FAILING.
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
Could not allocate all requires slots within timeout of 30 ms. Slots
required: 36, slots allocated: 12

This was repeated until all restart attempts had been used (we've set it to
50), and then the job finally failed.

I would like to know also how to prevent Flink from going into such bad
state. At least it should exit immediately instead of retrying in such a
situation. And why was "slot container removed"?

On Tue, Aug 14, 2018 at 11:24 PM Gary Yao  wrote:

> Hi Juho,
>
> It seems in your case the JobMaster did not receive a heartbeat from the
> TaskManager in time [1]. Heartbeat requests and answers are sent over the
> RPC
> framework, and RPCs of one component (e.g., TaskManager, JobMaster, etc.)
> are
> dispatched by a single thread. Therefore, the reasons for heartbeats
> timeouts
> include:
>
> 1. The RPC threads of the TM or JM are blocked. In this case heartbeat
> requests or answers cannot be dispatched.
> 2. The scheduled task for sending the heartbeat requests [2] died.
> 3. The network is flaky.
>
> If you are confident that the network is not the culprit, I would suggest
> to
> set the logging level to DEBUG, and look for periodic log messages (JM and
> TM
> logs) that are related to heartbeating. If the periodic log messages are
> overdue, it is a hint that the main thread of the RPC endpoint is blocked
> somewhere.
>
> Best,
> Gary
>
> [1]
> https://github.com/apache/flink/blob/release-1.5.2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L1611
> [2]
> https://github.com/apache/flink/blob/913b0413882939c30da4ad4df0cabc84dfe69ea0/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerSenderImpl.java#L64
>
> On Mon, Aug 13, 2018 at 9:52 AM, Juho Autio  wrote:
>
>> I also have jobs failing on a daily basis with the error "Heartbeat of
>> TaskManager with id  timed out". I'm using Flink 1.5.2.
>>
>> Could anyone suggest how to debug possible causes?
>>
>> I already set these in flink-conf.yaml, but I'm still getting failures:
>> heartbeat.interval: 1
>> heartbeat.timeout: 10
>>
>> Thanks.
>>
>> On Sun, Jul 22, 2018 at 2:20 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> According to the UI it seems that "
>>>
>>> org.apache.flink.util.FlinkException: The assigned slot 
>>> 208af709ef7be2d2dfc028ba3bbf4600_10 was removed.
>>>
>>> " was the cause of a pipe restart.
>>>
>>> As to the TM, it is an artifact of the new job allocation regime, which
>>> will exhaust all slots on a TM rather than distributing them equitably.
>>> TMs are selectively under more stress than in a pure round-robin distribution, I
>>> think. We may have to lower the slots on each TM to define a good upper
>>> bound. You are correct, 50s is a pretty generous value.
>>>
>>> On Sun, Jul 22, 2018 at 6:55 AM, Gary Yao 
>>> wrote:
>>>
 Hi,

The first exception should only be logged at info level. It's expected
to see this exception when a TaskManager unregisters from the ResourceManager.

Heartbeats can be configured via heartbeat.interval and
heartbeat.timeout [1].
 The default timeout is 50s, which should be a generous value. It is
 probably a
 good idea to find out why the heartbeats cannot be answered by the TM.
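
For reference, a sketch of what these two entries look like in flink-conf.yaml
(values are in milliseconds; the ones shown are the documented defaults):

# how often heartbeat requests are sent
heartbeat.interval: 10000
# how long to wait for a heartbeat answer before considering the peer dead
heartbeat.timeout: 50000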

 Best,
 Gary

 [1]
 https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/config.html#heartbeat-manager


 On Sun, Jul 22, 2018 at 1:36 AM, Vishal Santoshi <
 vishal.santo...@gmail.com> wrote:

> Two issues we are seeing on 1.5.1 on a streaming pipeline:
>
> org.apache.flink.util.FlinkException: The assigned slot 
> 208af709ef7be2d2dfc028ba3bbf4600_10 was removed.
>
>
> and

Re: CoFlatMapFunction with more than two input streams

2018-08-15 Thread vino yang
Hi Averell,

What I mean is that if you store the stream_c data in an RDBMS, you can access
the RDBMS directly in the CoFlatMapFunction instead of using the Table API.
This is somewhat similar to a join between a stream and a dimension table.
Of course, the premise of this option is that the amount of data in stream_c
is not particularly large and is not updated frequently.
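
A rough sketch of what I mean, in Scala since you mentioned you use Scala (the
Dog type, the table name and the column names are only placeholders for
illustration):

import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

case class Dog(name: String)
case class EnrichedDog(name: String, mappedName: String)

class RdbmsLookupFlatMap(jdbcUrl: String)
    extends RichFlatMapFunction[Dog, EnrichedDog] {

  @transient private var conn: Connection = _
  @transient private var stmt: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    // one connection per parallel instance, opened when the task starts
    conn = DriverManager.getConnection(jdbcUrl)
    stmt = conn.prepareStatement(
      "SELECT mapped_name FROM name_mapping WHERE name = ?")
  }

  override def flatMap(dog: Dog, out: Collector[EnrichedDog]): Unit = {
    // look up the mapping for this element; drop the element if there is none
    stmt.setString(1, dog.name)
    val rs = stmt.executeQuery()
    if (rs.next()) {
      out.collect(EnrichedDog(dog.name, rs.getString(1)))
    }
    rs.close()
  }

  override def close(): Unit = {
    if (stmt != null) stmt.close()
    if (conn != null) conn.close()
  }
}

Since the mapping data no longer arrives as a stream in this variant, a plain
RichFlatMapFunction is enough; stream_a and stream_b can each go through their
own instance of it, and in practice you would add some caching of the lookups.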

Thanks, vino.

Averell wrote on Wed, Aug 15, 2018 at 3:27 PM:

> Thank you Vino & Xingcan.
> @Vino: could you explain in more detail how to use a DBMS? Would that be
> with the Table API, or did you mean directly reading DBMS data inside the
> ProcessFunction?
>
> @Xingcan: I don't know what the benefits of using CoProcess over
> RichCoFlatMap are in this case.
> Regarding the Either wrapper, as I understand it, I would need to use it
> both in my sources (stream_A and B) and in the
> CoProcess/CoFlatMapFunction. Then using a superclass Animal would be more
> convenient, wouldn't it?
>
> Thanks and regards,
> Averell
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: CoFlatMapFunction with more than two input streams

2018-08-15 Thread Averell
Thank you Vino & Xingcan.
@Vino: could you explain in more detail how to use a DBMS? Would that be with
the Table API, or did you mean directly reading DBMS data inside the
ProcessFunction?

@Xingcan: I don't know what the benefits of using CoProcess over
RichCoFlatMap are in this case.
Regarding the Either wrapper, as I understand it, I would need to use it
both in my sources (stream_A and B) and in the
CoProcess/CoFlatMapFunction. Then using a superclass Animal would be more
convenient, wouldn't it?

Thanks and regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


[ANNOUNCE] Weekly community update #33

2018-08-15 Thread Till Rohrmann
Dear community,

this is the weekly community update thread #33. Please post any news and
updates you want to share with the community to this thread.

# Flink 1.6.0 has been released

The community released Flink 1.6.0 [1]. Thanks to everyone who made this
release possible.

# Improving tutorials section

Fabian kicked off an effort to improve Flink's tutorial section in the
documentation [2]. If you have some input on how to improve this section,
then please join the discussion.

# Design of interval join APIs

Florian started a discussion on how to design the DataStream API for
interval joins. If you want to see an early preview or participate in the
discussion, join this thread [3].

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Apache-Flink-1-6-0-released-td23753.html
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Improve-Tutorials-section-of-documentation-td23755.html
[3]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Discuss-Outer-join-support-and-timestamp-assignment-for-IntervalJoin-td23788.html

Cheers,
Till


Stream collector serialization performance

2018-08-15 Thread 祁明良
Hi all,

I'm currently using a keyed process function, and I see that serialization
happens when I collect the object and when I update the object in RocksDB. For
me the performance of serialization seems to be the bottleneck.
By default the POJO serializer is used, and the time cost of collect vs. update
to RocksDB is roughly 1:1. I then switched to Kryo by setting
getConfig.enableForceKryo(). Now the time cost of the update to RocksDB drops
significantly, to roughly 0.3, but the collect path does not seem to improve.
Can someone help explain this?

My object looks somewhat like this:

class A {
    String f1;     // 20 String fields like this
    List<B> f2;    // 20 lists of another POJO, B, like this
    int f3;        // 20 int fields like this
}

class B {
    String f;      // 5 String fields like this
}
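
For reference, the Kryo switch boils down to the snippet below (a sketch in
Scala; the registerKryoType calls are not something I have in place, they are
only shown as a possible next step):

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment

// force Kryo instead of Flink's POJO serializer
env.getConfig.enableForceKryo()

// registering the concrete types lets Kryo write small integer tags
// instead of full class names when serializing A and B
env.getConfig.registerKryoType(classOf[A])
env.getConfig.registerKryoType(classOf[B])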

Best,
Mingliang



Re: CoFlatMapFunction with more than two input streams

2018-08-15 Thread Xingcan Cui
Hi Averell,

I am also in favor of option 2. Besides, you could use CoProcessFunction 
instead of CoFlatMapFunction and try to wrap elements of stream_A and stream_B 
using the `Either` class.
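
A rough, untested sketch of that wrapping in Scala (Dog, Cat, NameMapping and
NameCoProcessFunction are placeholders standing in for your actual classes):

import org.apache.flink.streaming.api.scala._

case class Dog(name: String)
case class Cat(name: String)
case class NameMapping(name: String, newName: String)

val dogs: DataStream[Dog] = ???
val cats: DataStream[Cat] = ???
val mappings: DataStream[NameMapping] = ???

// wrap both animal streams into one common Either element type, then union them
val animals: DataStream[Either[Cat, Dog]] =
  dogs.map(d => Right(d): Either[Cat, Dog])
    .union(cats.map(c => Left(c): Either[Cat, Dog]))

// connect with the mapping stream, keying both inputs by name
val transformed = animals
  .connect(mappings)
  .keyBy(
    (e: Either[Cat, Dog]) => e.fold(_.name, _.name),
    (m: NameMapping) => m.name)
  .process(new NameCoProcessFunction) // CoProcessFunction[Either[Cat, Dog], NameMapping, Either[Cat, Dog]]

This way only a single keyed state for the name mapping has to be maintained,
and the two animal types can be told apart inside the function (and after it)
by matching on Left/Right.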

Best,
Xingcan

> On Aug 15, 2018, at 2:24 PM, vino yang  wrote:
> 
> Hi Averell,
> 
> As far as these two solutions are concerned, I think you can only choose 
> option 2, because as you have stated, the current Flink DataStream API does 
> not support the replacement of one of the input stream types of 
> CoFlatMapFunction. Another choice:
> 
> 1. Split it into two separate jobs. But in comparison, I still think that 
> Option 2 is better.
> 2. Since you said that stream_c is slower and has fewer updates, if it is not 
> very large, you can store it in the RDBMS and then join it with stream_a and 
> stream_b respectively (using CoFlatMapFunction as well).
> 
> I think you should give priority to your option 2.
> 
> Thanks, vino.
> 
Averell <lvhu...@gmail.com> wrote on Wed, Aug 15, 2018 at 1:51 PM:
> Hi,
> 
> I have stream_A of type "Dog", which needs to be transformed using data from
> stream_C of type "Name_Mapping". As stream_C is a slow one (data is not
> being updated frequently), to do the transformation I connect two streams,
> do a keyBy, and then use a RichCoFlatMapFunction in which mapping data from
> stream_C is saved into a State (flatMap1 generates 1 output, while flatMap2
> is just to update State table, not generating any output).
> 
> Now I have another stream B of type "Cat", which also needs to be
> transformed using data from stream_C. After that transformation,
> transformed_B will go through a completely different pipeline from
> transformed_A.
> 
> I can see two approaches for this:
> 1. duplicate stream_C and the RichCoFlatMapFunction and apply on stream_B
> 2. create a new stream D of type "Animal", transform it with C, then split
> the result into two streams with split/select and case-class pattern
> matching.
>
> My question is: which option should I choose?
> With option 1, at least I need to maintain two State tables, let alone the
> cost of duplicating the stream (I am not sure how expensive this is in terms
> of resources), and the requirement to duplicate the CoFlatMapFunction (*).
> With option 2, there's additional cost coming from unioning,
> splitting/selecting, and type-casting at the final streams. 
> Is there any better option for me?
> 
> Thank you very much for your support.
> Regards,
> Averell
> 
> (*) I am using Scala, and I tried to create a RichCoFlatMapFunction of type
> [Animal, Name_Mapping] but it cannot be used for a stream of [Dog,
> Name_Mapping] or [Cat, Name_Mapping]. Thus I needed to duplicate the
> Function as well.
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ 
> 



Re: ClassCastException when writing to a Kafka stream with Flink + Python

2018-08-15 Thread Chesnay Schepler
As seen in the stacktrace, every sink added via add_sink is
wrapped in a PythonSinkFunction which internally converts things to
PyObjects; that's why the mapper had no effect.
Currently we don't differentiate between java/python sinks, contrary to
sources, where we have an explicit StreamExEnv#add_java_source method.

There are two ways to approach this issue:
* As alluded to in a previous mail, create a python wrapper around the
kafka consumer class.

* Extend the PythonDataStream class with a separate method for kafka.

Unfortunately I don't think we can solve this in a generic manner (i.e.
add_java_source) since the java types wouldn't fit at compile time.


On 15.08.2018 04:15, vino yang wrote:

Hi Joe,

ping Chesnay for you, please wait for the reply.

Thanks, vino.

Joe Malt <jm...@yelp.com> wrote on Wed, Aug 15, 2018 at 7:16 AM:


Hi,

I'm trying to write to a Kafka stream in a Flink job using the new
Python streaming API.

My program looks like this:

def main(factory):
    props = Properties()
    props.setProperty("bootstrap.servers", configs['kafkaBroker'])

    consumer = FlinkKafkaConsumer010([configs['kafkaReadTopic']], SimpleStringSchema(), props)
    producer = FlinkKafkaProducer010(configs['kafkaWriteTopic'], SimpleStringSchema(), props)

    env = factory.get_execution_environment()

    stream = env.add_java_source(consumer)

    stream.output()            # this works (prints to a .out file)
    stream.add_sink(producer)  # producing to this causes the exception

    env.execute()

I'm getting a ClassCastException when trying to output to the
FlinkKafkaProducer:

java.lang.ClassCastException: org.python.core.PyUnicode cannot be cast to 
java.lang.String
at 
org.apache.flink.api.common.serialization.SimpleStringSchema.serialize(SimpleStringSchema.java:36)
at 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper.serializeValue(KeyedSerializationSchemaWrapper.java:46)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invoke(FlinkKafkaProducer010.java:355)
at 
org.apache.flink.streaming.python.api.functions.PythonSinkFunction.invoke(PythonSinkFunction.java:48)
at 
org.apache.flink.streaming.python.api.functions.PythonSinkFunction.invoke(PythonSinkFunction.java:37)
at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)


It seems that the Python string isn't getting converted to a
java.lang.String, which should happen automatically in Jython.

I've tried adding a MapFunction that maps each input to
String(input), where String is the constructor for java.lang.String.
This made no difference; I get the same error.

Any ideas?

Thanks,

Joe Malt

Software Engineering Intern
Yelp





Re: CoFlatMapFunction with more than two input streams

2018-08-15 Thread vino yang
Hi Averell,

As far as these two solutions are concerned, I think you can only choose
option 2, because as you have stated, the current Flink DataStream API does
not support the replacement of one of the input stream types of
CoFlatMapFunction. Another choice:

1. Split it into two separate jobs. But in comparison, I still think that
Option 2 is better.
2. Since you said that stream_c is slower and has fewer updates, if it is
not very large, you can store it in the RDBMS and then join it with
stream_a and stream_b respectively (using CoFlatMapFunction as well).

I think you should give priority to your option 2.

Thanks, vino.

Averell wrote on Wed, Aug 15, 2018 at 1:51 PM:

> Hi,
>
> I have stream_A of type "Dog", which needs to be transformed using data
> from
> stream_C of type "Name_Mapping". As stream_C is a slow one (data is not
> being updated frequently), to do the transformation I connect two streams,
> do a keyBy, and then use a RichCoFlatMapFunction in which mapping data from
> stream_C is saved into a State (flatMap1 generates 1 output, while flatMap2
> is just to update State table, not generating any output).
>
> Now I have another stream B of type "Cat", which also needs to be
> transformed using data from stream_C. After that transformation,
> transformed_B will go through a completely different pipeline from
> transformed_A.
>
> I can see two approaches for this:
> 1. duplicate stream_C and the RichCoFlatMapFunction and apply on stream_B
> 2. create a new stream D of type "Animal", transform it with C, then split
> the result into two streams with split/select and case-class pattern
> matching.
>
> My question is: which option should I choose?
> With option 1, at least I need to maintain two State tables, let alone the
> cost of duplicating the stream (I am not sure how expensive this is in terms
> of resources), and the requirement to duplicate the CoFlatMapFunction (*).
> With option 2, there's additional cost coming from unioning,
> splitting/selecting, and type-casting at the final streams.
> Is there any better option for me?
>
> Thank you very much for your support.
> Regards,
> Averell
>
> (*) I am using Scala, and I tried to create a RichCoFlatMapFunction of type
> [Animal, Name_Mapping] but it cannot be used for a stream of [Dog,
> Name_Mapping] or [Cat, Name_Mapping]. Thus I needed to duplicate the
> Function as well.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>