Hi Josh, do you maybe want to open an issue for that and contribute your fix?
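Just to sketch the idea for the issue (a standalone illustration only, not a patch against FsStateBackend, and the class and method names below are made up): the existence check would only run on the client for schemes whose FileSystem implementations are actually on the client classpath, and be skipped for schemes like s3 that are only resolvable once the job runs on the cluster.

    import java.net.URI;
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    // Hypothetical helper sketching the bypass: only verify the checkpoint
    // directory on the client for schemes the client can resolve itself.
    public class CheckpointPathPolicy {

        // Assumption: these schemes have FileSystem implementations on the client classpath.
        private static final Set<String> CLIENT_RESOLVABLE =
                new HashSet<>(Arrays.asList("file", "hdfs"));

        static boolean verifyOnClient(URI checkpointUri) {
            String scheme = checkpointUri.getScheme();
            return scheme != null && CLIENT_RESOLVABLE.contains(scheme.toLowerCase());
        }

        public static void main(String[] args) {
            // s3 is only resolvable on YARN/EMR, so the client-side check would be skipped.
            System.out.println(verifyOnClient(URI.create("s3://my-bucket/checkpoints")));   // false
            System.out.println(verifyOnClient(URI.create("hdfs:///flink/checkpoints")));    // true
        }
    }

(Whether it's better to skip the check per scheme or to drop the eager FileSystem initialisation entirely is something we can discuss on the issue.)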
Cheers,
Aljoscha

On Fri, 17 Jun 2016 at 17:49 Josh <jof...@gmail.com> wrote:

> Hi Aljoscha,
>
> Thanks! It looks like you're right. I've run it with the FsStateBackend and everything works fine.
>
> I've also got it working with RocksDBStateBackend now, by rebuilding Flink master with:
> - the verify step in FsStateBackend skipped for URIs with s3 schemes.
> - the initialisation of the filesystem in the constructor commented out (not sure why this is initialised in the constructor, since it seems to get initialised later anyway)
>
> Josh
>
> On Fri, Jun 17, 2016 at 2:53 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>
>> Hi,
>> I think the problem with the missing class com.amazon.ws.emr.hadoop.fs.EmrFileSystem is not specific to RocksDB. The exception is thrown in the FsStateBackend, which is internally used by the RocksDB backend to snapshot non-partitioned state. The problem is that the FsStateBackend tries to verify in its constructor that the checkpoint path exists. The constructor is invoked in the client program, which does not run in the YARN context where the jars holding the EMR FileSystem classes are available. That is most likely what causes the exception.
>>
>> Just to verify, could you run it with the FsStateBackend and see whether you get the same exception? If yes, then we need to remove the verify step in the FsStateBackend, or at least provide a way to bypass it.
>>
>> Cheers,
>> Aljoscha
>>
>> On Fri, 17 Jun 2016 at 15:40 Josh <jof...@gmail.com> wrote:
>>
>>> I found that I can still write to s3 using my Flink build of 1.1-SNAPSHOT, for example if I run the word count example:
>>> ./bin/flink run ./examples/batch/WordCount.jar --input hdfs:///tmp/LICENSE --output s3://permutive-flink/wordcount-result.txt
>>>
>>> This works fine - it's just the RocksDBStateBackend that errors on the s3 URI. I'm wondering if it could be an issue with RocksDBStateBackend?
>>>
>>> On Fri, Jun 17, 2016 at 12:09 PM, Josh <jof...@gmail.com> wrote:
>>>
>>>> Hi Gordon/Fabian,
>>>>
>>>> Thanks for helping with this! Downgrading the Maven version I was using to build Flink appears to have fixed that problem - I was using Maven 3.3.3 before and have downgraded to 3.2.5.
>>>>
>>>> Just for reference, I printed the loaded class at runtime and found that when I was using Flink built with Maven 3.3.3, it was pulling in:
>>>> jar:file:/opt/flink/flink-1.1-SNAPSHOT/lib/flink-dist_2.11-1.1-SNAPSHOT.jar!/org/apache/http/params/HttpConnectionParams.class
>>>> But after building with the older Maven version, it pulled in the class from my jar:
>>>> jar:file:/tmp/my-assembly-1.0.jar!/org/apache/http/params/HttpConnectionParams.class
>>>>
>>>> Unfortunately, now that that problem is fixed, I've got a different classpath issue.
>>>> It started with:
>>>>
>>>> java.lang.RuntimeException: java.lang.ClassNotFoundException: Class com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
>>>>     at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2227)
>>>>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopWrapperClassNameForFileSystem(HadoopFileSystem.java:460)
>>>>     at org.apache.flink.core.fs.FileSystem.getHadoopWrapperClassNameForFileSystem(FileSystem.java:352)
>>>>     at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:280)
>>>>     at org.apache.flink.runtime.state.filesystem.FsStateBackend.validateAndNormalizeUri(FsStateBackend.java:383)
>>>>     at org.apache.flink.runtime.state.filesystem.FsStateBackend.<init>(FsStateBackend.java:175)
>>>>     at org.apache.flink.runtime.state.filesystem.FsStateBackend.<init>(FsStateBackend.java:144)
>>>>     at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.<init>(RocksDBStateBackend.java:205)
>>>>
>>>> This is strange because I used an s3:// checkpoint directory when running Flink 1.0.3 on EMR and it worked fine. (According to https://ci.apache.org/projects/flink/flink-docs-master/setup/aws.html#provide-s3-filesystem-dependency, no configuration should be needed to use S3 when running on EMR.)
>>>>
>>>> Anyway, I tried executing /etc/hadoop/conf/hadoop-env.sh before running my job, as this sets up the HADOOP_CLASSPATH env var. The exception then changed to:
>>>> java.lang.NoClassDefFoundError: org/apache/hadoop/fs/common/Abortable
>>>>
>>>> I found that this class is related to a jar called s3-dist-cp, so then I tried copying that jar to Flink's lib directory from /usr/share/aws/emr/s3-dist-cp/lib/*
>>>>
>>>> And now I'm back to another Kinesis connector classpath error:
>>>>
>>>> java.lang.NoClassDefFoundError: org/apache/http/conn/ssl/SSLSocketFactory
>>>>     at com.amazonaws.AmazonWebServiceClient.<init>(AmazonWebServiceClient.java:136)
>>>>     at com.amazonaws.AmazonWebServiceClient.<init>(AmazonWebServiceClient.java:120)
>>>>     at com.amazonaws.services.kinesis.AmazonKinesisClient.<init>(AmazonKinesisClient.java:157)
>>>>     at com.amazonaws.services.kinesis.AmazonKinesisClient.<init>(AmazonKinesisClient.java:137)
>>>>     at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.<init>(KinesisProxy.java:76)
>>>>     at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.<init>(FlinkKinesisConsumer.java:166)
>>>>     at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.<init>(FlinkKinesisConsumer.java:140)
>>>>     at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.<init>(FlinkKinesisConsumer.java:123)
>>>>
>>>> I guess this is related to me adding a bunch of extra stuff to the classpath in an attempt to solve the EmrFileSystem error. Any ideas what caused that error in the first place?
>>>>
>>>> By the way, I built Flink with:
>>>> mvn clean install -Pinclude-kinesis,vendor-repos -DskipTests -Dhadoop.version=2.7.1
>>>>
>>>> Josh
>>>>
>>>> On Fri, Jun 17, 2016 at 9:56 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>
>>>>> Hi Josh,
>>>>>
>>>>> I assume that you built the SNAPSHOT version yourself. I had similar version conflicts for Apache HttpCore with Flink SNAPSHOT versions on EMR. The problem is caused by a change in behavior in Maven 3.3 and later versions. Due to these changes, the dependency shading is not working correctly.
>>>>> That's why we use Maven 3.2 to build the Flink release artifacts.
>>>>>
>>>>> Can you check whether you used Maven 3.3 and try to downgrade to 3.2 if that was the case?
>>>>>
>>>>> Cheers, Fabian
>>>>>
>>>>> 2016-06-17 8:12 GMT+02:00 Tai Gordon <tzuli...@gmail.com>:
>>>>>
>>>>>> Hi Josh,
>>>>>>
>>>>>> I'm looking into the problem. It seems like the connector is somehow using older versions of httpclient.
>>>>>> Can you print the loaded classpath at runtime, and check the path & version of the loaded httpclient / httpcore dependency?
>>>>>> i.e. `classOf[HttpConnectionParams].getResource("HttpConnectionParams.class").toString`
>>>>>>
>>>>>> Also, on which commit was your kinesis connector built?
>>>>>>
>>>>>> Regards,
>>>>>> Gordon
>>>>>>
>>>>>> On June 17, 2016 at 1:08:37 AM, Josh (jof...@gmail.com) wrote:
>>>>>>
>>>>>> Hey,
>>>>>>
>>>>>> I've been running the Kinesis connector successfully now for a couple of weeks, on a Flink cluster running Flink 1.0.3 on EMR 2.7.1/YARN.
>>>>>>
>>>>>> Today I've been trying to get it working on a cluster running the current Flink master (1.1-SNAPSHOT) but am running into a classpath issue when starting the job. This only happens when running on EMR/YARN (it's fine when running 1.1-SNAPSHOT locally, and when running 1.0.3 on EMR).
>>>>>>
>>>>>> ----
>>>>>> The program finished with the following exception:
>>>>>>
>>>>>> java.lang.NoSuchMethodError: org.apache.http.params.HttpConnectionParams.setSoKeepalive(Lorg/apache/http/params/HttpParams;Z)V
>>>>>>     at com.amazonaws.http.HttpClientFactory.createHttpClient(HttpClientFactory.java:96)
>>>>>>     at com.amazonaws.http.AmazonHttpClient.<init>(AmazonHttpClient.java:187)
>>>>>>     at com.amazonaws.AmazonWebServiceClient.<init>(AmazonWebServiceClient.java:136)
>>>>>>     at com.amazonaws.AmazonWebServiceClient.<init>(AmazonWebServiceClient.java:120)
>>>>>>     at com.amazonaws.services.kinesis.AmazonKinesisClient.<init>(AmazonKinesisClient.java:157)
>>>>>>     at com.amazonaws.services.kinesis.AmazonKinesisClient.<init>(AmazonKinesisClient.java:137)
>>>>>>     at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.<init>(KinesisProxy.java:76)
>>>>>>     at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.<init>(FlinkKinesisConsumer.java:166)
>>>>>>     at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.<init>(FlinkKinesisConsumer.java:140)
>>>>>>     at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.<init>(FlinkKinesisConsumer.java:123)
>>>>>> ---
>>>>>>
>>>>>> Any ideas what's going on?
>>>>>>
>>>>>> The job I'm deploying has httpclient 4.3.6 and httpcore 4.3.3, which I believe are the libraries that contain the HttpConnectionParams class.
>>>>>>
>>>>>> Thanks,
>>>>>> Josh
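PS: For reference, the check Gordon suggests above can also be done from plain Java rather than the Scala one-liner (a minimal sketch; the class name here is arbitrary). The printed URL shows which jar HttpConnectionParams is actually loaded from, e.g. the flink-dist jar versus the user assembly jar:

    import org.apache.http.params.HttpConnectionParams;

    // Minimal sketch: print which jar the HttpConnectionParams class was loaded from.
    // With the Maven 3.3 shading problem the URL pointed at flink-dist_2.11-1.1-SNAPSHOT.jar;
    // after rebuilding with Maven 3.2 it pointed at the user assembly jar instead.
    public class HttpCoreOrigin {
        public static void main(String[] args) {
            System.out.println(
                    HttpConnectionParams.class.getResource("HttpConnectionParams.class"));
        }
    }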