Re: Classloading issues after changing to 1.4

2018-04-14 Thread Ken Krugler
When we transitioned from 1.3 to 1.4, we ran into some class loader issues.

Though we weren’t using any sophisticated class loader helicopter stunts :)

Specifically…

1. Re-worked our pom.xml to set up shading to better mirror what the 1.4 
example pom was doing.

2. Enabled child-first classloading (see the config sketch after this list)

3. Ensured “hadoop classpath” command returned nothing (or failed), to avoid 
loading Hadoop jars before our jars (even with #2 above)
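
For #2, a minimal sketch of the relevant flink-conf.yaml entry (if I remember
correctly, child-first is already the default in 1.4, but setting it
explicitly documents the intent):

    # flink-conf.yaml: resolve user-code classes against the job jar first,
    # falling back to the framework classpath only for classes not found there.
    classloader.resolve-order: child-first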

— Ken




> On Apr 13, 2018, at 2:28 AM, eSKa  wrote:
> 
> Hello,
> I still have a problem after upgrading from Flink 1.3.1 to 1.4.2.
> Our scenario looks like this:
> We have a container running on top of YARN. The machine that starts it has
> Flink installed and also loads some classpath libraries (e.g. Hadoop) into
> the container.
> There is a separate REST service that receives requests to run an export job
> - it uses YarnClusterClient and submits a PackagedProgram. The jars with the
> actual Flink jobs are located in the lib/ directory of the service. Flink is
> not installed on the machine where the Spring service is deployed.
> For version 1.3 we also had some libraries loaded into the container so that
> they wouldn't have to be loaded dynamically every time. If I understand it
> correctly, with the child-first strategy that should no longer be needed,
> right?
> 
> Now our actual class loading problems started to come up. After restarting
> the REST service, the first job submission works fine, but subsequent ones
> complain about the versions of the loaded classes. We found out that
> PackagedProgram creates a new classLoader on every instantiation, so we
> overrode that behaviour with a static map holding one classloader per
> jarFileName:
> 
> 
>    private static final Map<String, ClassLoader> classLoaders =
>            Maps.newHashMap();
>    ...
>    (constructor) {
>        ...
>        classLoaders.computeIfAbsent(jarFile.getName(),
>                s -> getUserClassLoaderChildFirst(getAllLibraries(),
>                        classPaths, getClass().getClassLoader()));
>
>        userCodeClassLoader = classLoaders.get(jarFile.getName());
>        this.mainClass = loadMainClass(entryPointClassName,
>                userCodeClassLoader);
>    }
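>
> One caveat we are still checking ourselves: if the REST service can submit
> jobs from several threads, a plain static HashMap is not safe to mutate
> concurrently. A minimal sketch of the thread-safe variant we are considering
> (plain URLClassLoader stands in here for the child-first loader above; the
> helper names are just placeholders, not Flink API):
>
>    import java.net.URL;
>    import java.net.URLClassLoader;
>    import java.util.concurrent.ConcurrentHashMap;
>    import java.util.concurrent.ConcurrentMap;
>
>    /** Caches one user-code classloader per jar file name. */
>    final class ClassLoaderCache {
>
>        // computeIfAbsent on a ConcurrentHashMap is atomic, so concurrent
>        // submissions for the same jar end up sharing a single classloader.
>        private static final ConcurrentMap<String, ClassLoader> CACHE =
>                new ConcurrentHashMap<>();
>
>        static ClassLoader forJar(String jarName, URL[] jarUrls, ClassLoader parent) {
>            return CACHE.computeIfAbsent(jarName,
>                    name -> new URLClassLoader(jarUrls, parent));
>        }
>
>        private ClassLoaderCache() {}
>    }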
> 
> I don't know if that is a good direction, but it seems to solve the issue
> for now. We are just not sure about the stability of this solution - we are
> still testing it in our internal environment, but for now I'm afraid to
> proceed to production.
> Can you give us any other things we could try out to deal with class loading?
> 
> 
> Also, PackagedProgram is still using the parent-first strategy; in
> JobWithJars you have this method:
>
>    public static ClassLoader buildUserCodeClassLoader(List<URL> jars,
>            List<URL> classpaths, ClassLoader parent) {
>        URL[] urls = new URL[jars.size() + classpaths.size()];
>        for (int i = 0; i < jars.size(); i++) {
>            urls[i] = jars.get(i);
>        }
>        for (int i = 0; i < classpaths.size(); i++) {
>            urls[i + jars.size()] = classpaths.get(i);
>        }
>        return FlinkUserCodeClassLoaders.parentFirst(urls, parent);
>    }
>
> 
> Is it correct that this still points to the parent-first loader?
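>
> From reading the code, I would expect the child-first counterpart to look
> roughly like this (just a sketch on my side - it assumes the 1.4-era
> FlinkUserCodeClassLoaders.childFirst(urls, parent, alwaysParentFirstPatterns)
> signature, where the patterns are package prefixes that must always be
> loaded from the parent, e.g. "org.apache.flink."):
>
>    public static ClassLoader buildUserCodeClassLoaderChildFirst(
>            List<URL> jars, List<URL> classpaths, ClassLoader parent,
>            String[] alwaysParentFirstPatterns) {
>        URL[] urls = new URL[jars.size() + classpaths.size()];
>        for (int i = 0; i < jars.size(); i++) {
>            urls[i] = jars.get(i);
>        }
>        for (int i = 0; i < classpaths.size(); i++) {
>            urls[i + jars.size()] = classpaths.get(i);
>        }
>        // Same URL layout as above; only the resolve order changes.
>        return FlinkUserCodeClassLoaders.childFirst(urls, parent,
>                alwaysParentFirstPatterns);
>    }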
> 
> In some of the issues I found on the mailing list, you suggest setting the
> container up as parent-first to solve this. We would like to find a proper
> solution on the supported child-first path rather than rely on a workaround.
> 
> 
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


http://about.me/kkrugler
+1 530-210-6378



Re: Complexity of Flink

2018-04-14 Thread Jörn Franke
That is the complexity of the source code; it is easy to obtain - just fork it
on GitHub and send it through SonarQube or the Codacy cloud. I am not sure
whether the Flink project already does this. For my open source libraries
(hadoopcryptoledger and hadoopoffice), which also provide Flink modules, I do this.
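
A minimal sketch of what that could look like locally (it assumes a SonarQube
instance running on localhost:9000 and the standard sonar-maven-plugin; cloc is
a separate line-counting tool that covers the simpler size metrics):

    # clone the sources
    git clone https://github.com/apache/flink.git && cd flink

    # quick size statistics: number of files and lines of code per language
    cloc .

    # full static analysis, pushed to a local SonarQube server
    # (building all of Flink first is heavy - expect a long run)
    mvn clean verify sonar:sonar -Dsonar.host.url=http://localhost:9000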

> On 14. Apr 2018, at 14:17, Esa Heikkinen  wrote:
> 
> Yes, you are right.
> 
> But what if I only focus on the statistical complexity of the Flink sources?
> E.g. the number of libraries, functions/classes/methods, the number and size
> of source files, and so on?
> How easy is it to get this information?
> 
> Best, Esa
> 
> -Original Message-
> From: Jörn Franke  
> Sent: Saturday, April 14, 2018 1:43 PM
> To: Esa Heikkinen 
> Cc: user@flink.apache.org
> Subject: Re: Complexity of Flink
> 
> I think this always depends. I found Flink cleaner than other Big Data
> platforms, and with some experience it is rather easy to deploy.
> 
> However, how do you measure complexity? How do you plan to cater for other
> components (e.g. deploying in the cloud, deploying locally in a Hadoop
> cluster, etc.)? Then how do you take into account the experience of the team
> leader and the people deploying it, issues with unqualified external service
> providers, contracts, etc.?
> 
> Those are the variables that you need to define and then validate (via a
> case study and/or survey).
> 
>> On 14. Apr 2018, at 12:24, Esa Heikkinen  wrote:
>> 
>> 
>> Hi
>> 
>> I am writing a scientific article that is related to the deployment of Flink.
>> 
>> I would be very interested to know how to measure the complexity of the
>> Flink platform or framework.
>> 
>> Does anyone know of good articles about that?
>> 
>> I think it is not always so simple to deploy and use.
>> 
>> Best, Esa
>> 
>> 


RE: Complexity of Flink

2018-04-14 Thread Esa Heikkinen
Yes, you are right.

But what if I only focus on the statistical complexity of the Flink sources?
E.g. the number of libraries, functions/classes/methods, the number and size
of source files, and so on?
How easy is it to get this information?

Best, Esa

-Original Message-
From: Jörn Franke  
Sent: Saturday, April 14, 2018 1:43 PM
To: Esa Heikkinen 
Cc: user@flink.apache.org
Subject: Re: Complexity of Flink

I think this always depends. I found Flink cleaner than other Big Data
platforms, and with some experience it is rather easy to deploy.

However, how do you measure complexity? How do you plan to cater for other
components (e.g. deploying in the cloud, deploying locally in a Hadoop
cluster, etc.)? Then how do you take into account the experience of the team
leader and the people deploying it, issues with unqualified external service
providers, contracts, etc.?

Those are the variables that you need to define and then validate (via a
case study and/or survey).

> On 14. Apr 2018, at 12:24, Esa Heikkinen  wrote:
> 
> 
> Hi
> 
> I am writing a scientific article that is related to the deployment of Flink.
> 
> I would be very interested to know how to measure the complexity of the
> Flink platform or framework.
> 
> Does anyone know of good articles about that?
> 
> I think it is not always so simple to deploy and use.
> 
> Best, Esa
> 
> 


Re: Complexity of Flink

2018-04-14 Thread Jörn Franke
I think this always depends. I found Flink cleaner than other Big Data
platforms, and with some experience it is rather easy to deploy.

However, how do you measure complexity? How do you plan to cater for other
components (e.g. deploying in the cloud, deploying locally in a Hadoop
cluster, etc.)? Then how do you take into account the experience of the team
leader and the people deploying it, issues with unqualified external service
providers, contracts, etc.?

Those are the variables that you need to define and then validate (via a
case study and/or survey).

> On 14. Apr 2018, at 12:24, Esa Heikkinen  wrote:
> 
> 
> Hi
> 
> I am writing a scientific article that is related to the deployment of Flink.
> 
> I would be very interested to know how to measure the complexity of the
> Flink platform or framework.
> 
> Does anyone know of good articles about that?
> 
> I think it is not always so simple to deploy and use.
> 
> Best, Esa
> 
> 


Complexity of Flink

2018-04-14 Thread Esa Heikkinen


Hi

I am writing a scientific article that is related to the deployment of Flink.

I would be very interested to know how to measure the complexity of the
Flink platform or framework.

Does anyone know of good articles about that?

I think it is not always so simple to deploy and use.

Best, Esa




Re: Json KAFKA producer

2018-04-14 Thread Fabian Hueske
Hi,

SerializationSchema is a public interface that you can implement.
It has a single method to turn an object into a byte array.

I would suggest implementing your own SerializationSchema.
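
A minimal sketch (untested; it keeps the shaded Jackson ObjectNode from your
consumer and writes its JSON text as UTF-8 bytes - note that depending on the
Flink version the interface lives in org.apache.flink.api.common.serialization
or org.apache.flink.streaming.util.serialization):

    import java.nio.charset.StandardCharsets;
    import org.apache.flink.api.common.serialization.SerializationSchema;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;

    // Serializes an ObjectNode as the UTF-8 bytes of its JSON representation.
    public class JsonNodeSerializationSchema implements SerializationSchema<ObjectNode> {
        private static final long serialVersionUID = 1L;

        @Override
        public byte[] serialize(ObjectNode element) {
            return element.toString().getBytes(StandardCharsets.UTF_8);
        }
    }

You would then pass an instance of it as the third constructor argument of
FlinkKafkaProducer011, in place of the XXX placeholder in your snippet.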

Best, Fabian

2018-04-11 15:56 GMT+02:00 Luigi Sgaglione :

> Hi,
>
> I'm trying to create a Flink example with a Kafka consumer and producer
> using the JSON data format. In particular, I'm able to consume and process
> JSON data published on a Kafka topic, but not to publish the results.
>
> The problem is that I don't know which serialization schema should be used
> to publish an ObjectNode (Jackson).
>
>
> This is an excerpt of my test code
>
> import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
>
> FlinkKafkaConsumer011<ObjectNode> myConsumer = new
>         FlinkKafkaConsumer011<>("test2", new JSONDeserializationSchema(),
>                 properties);
> myConsumer.setStartFromEarliest();
>
> DataStreamSource<ObjectNode> stream = env.addSource(myConsumer);
> SingleOutputStreamOperator<ObjectNode> out1 = stream.filter(new
>         FilterFunction<ObjectNode>() {
>     private static final long serialVersionUID = 1L;
>
>     @Override
>     public boolean filter(ObjectNode arg0) throws Exception {
>         String temp = arg0.get("value").asText();
>         return !temp.equals("1");
>     }
> });
> FlinkKafkaProducer011<ObjectNode> producer = new
>         FlinkKafkaProducer011<>("192.168.112.128:9092", "flinkOut", *XXX*);
> out1.addSink(producer);
>
> Can you help me to understand how I can publish an ObjectNode?
>
> Thanks
>
>
>