Let me add more details to make the problem clearer:

- The whole pipeline is executed twice, as far as I can tell. There are two
calls to ES to create the index; that is how I am verifying that the pipeline
is being run twice.
- The log file is converted to txt format only so it is easy to open in a
browser. From the log file I figured out that two calls are being made to ES.

The structure of the code is roughly as shown in the attached code.java (a rough sketch follows):
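
For anyone reading without the attachment: here is a rough sketch of the task's
structure, inferred from the stack traces in the logs below. The class and
method names come from those traces; the createIndex() helper and its contents
are placeholders, not the actual code.

import java.io.IOException;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptions;

public class NewUserESIndexMappingsCreateTask {

  public PipelineResult execute(PipelineOptions options, String esIndexName) throws IOException {
    // Create the ES index over the Elasticsearch REST client before the
    // pipeline runs. In the logs this is the PUT that succeeds on the first
    // attempt and returns 400 resource_already_exists_exception on the second.
    createIndex(esIndexName);

    // Build and run the Beam pipeline on the SparkRunner.
    Pipeline pipeline = Pipeline.create(options);
    // ... transforms that populate the index ...
    return pipeline.run();
  }

  // Placeholder: PUT https://<es-endpoint>/<index-name> with the
  // mappings/settings JSON logged as "requestBody" below
  // (via org.elasticsearch.client.RestClient).
  private void createIndex(String esIndexName) throws IOException {
  }
}

Both YARN attempts in the attached logs (appattempt_1581290593006_0004_000001
and _000002) appear to go through this same execute() path, which would explain
why the index-create PUT shows up twice.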

The way I run the pipeline is in the previous message, quoted below.

Let me know if anything about the problem is unclear.




On Sun, Feb 9, 2020 at 5:05 PM vivek chaurasiya <[email protected]> wrote:

> Hey team,
>
> My Beam pipeline seems to be executing twice. The business logic of the Beam
> pipeline is to create one Elasticsearch index. But since it is executed twice,
> the "spark-submit" command always fails, which breaks my automation.
>
> The logs are attached.
>
> I am running spark-submit on AWS-EMR like this:
>
> spark-submit --deploy-mode cluster \
>   --conf spark.executor.extraJavaOptions=-DCLOUD_PLATFORM=AWS \
>   --conf spark.driver.extraJavaOptions=-DCLOUD_PLATFORM=AWS \
>   --conf spark.yarn.am.waitTime=300s \
>   --conf spark.executor.extraClassPath=__app__.jar \
>   --driver-memory 8G --num-executors 5 --executor-memory 20G --executor-cores 6 \
>   --jars s3://vivek-tests/cloud-dataflow-1.0.jar \
>   --name new_user_index_mappings_create_dev \
>   --class com.noka.beam.common.pipeline.EMRSparkStartPipeline \
>   s3://vivek-tests/cloud-dataflow-1.0.jar \
>   --job=new-user-index-mappings-create --dateTime=2020-02-04T00:00:00 \
>   --isDev=True --incrementalExport=False
>
> Note: The code had been working as expected (i.e., one run of create-index)
> on AWS EMR 5.17, but we recently upgraded to AWS EMR 5.29.
>
> Does anyone know if something changed in the framework, or am I doing
> something wrong? Please help!
>
> Thanks
> Vivek
>

Container: container_1581290593006_0004_01_000001 on 
ip-172-31-10-196.ec2.internal_8041
=========================================================================================
LogType:stderr
Log Upload Time:Mon Feb 10 00:19:13 +0000 2020
LogLength:20509
Log Contents:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in 
[jar:file:/mnt/yarn/usercache/hadoop/filecache/13/__spark_libs__4137318382159335040.zip/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
20/02/10 00:18:59 INFO SignalUtils: Registered signal handler for TERM
20/02/10 00:18:59 INFO SignalUtils: Registered signal handler for HUP
20/02/10 00:18:59 INFO SignalUtils: Registered signal handler for INT
20/02/10 00:18:59 INFO SecurityManager: Changing view acls to: yarn,hadoop
20/02/10 00:18:59 INFO SecurityManager: Changing modify acls to: yarn,hadoop
20/02/10 00:18:59 INFO SecurityManager: Changing view acls groups to:
20/02/10 00:18:59 INFO SecurityManager: Changing modify acls groups to:
20/02/10 00:18:59 INFO SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users  with view permissions: Set(yarn, hadoop); 
groups with view permissions: Set(); users  with modify permissions: Set(yarn, 
hadoop); groups with modify permissions: Set()
20/02/10 00:19:00 INFO ApplicationMaster: Preparing Local resources
20/02/10 00:19:01 INFO ApplicationMaster: ApplicationAttemptId: 
appattempt_1581290593006_0004_000001
20/02/10 00:19:01 INFO ApplicationMaster: Starting the user application in a 
separate Thread
20/02/10 00:19:01 INFO ApplicationMaster: Waiting for spark context 
initialization...
20/02/10 00:19:02 INFO SparkRunner: PipelineOptions.filesToStage was not 
specified. Defaulting to files from the classpath: will stage 1 files. Enable 
logging at DEBUG level to see which files will be staged.
20/02/10 00:19:03 INFO NewUserESIndexMappingsCreateTask: esIndexName = 
new_users_index_20200204
20/02/10 00:19:03 INFO log: Logging to 
org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via org.mortbay.log.Slf4jLog
20/02/10 00:19:03 INFO log: requestBody = {
  "aliases": null,
  "mappings": {
    "_source": {
      "excludes": ["j"]
    },
    "properties": {
      "a1": {
        "type": "keyword"
      },
      "a30": {
        "type": "keyword"
      },
      "a": {
        "type": "keyword"
      },
      "c": {
        "type": "keyword"
      },
      "j": {
        "type": "text",
        "index_options": "docs",
        "analyzer": "list",
        "eager_global_ordinals": true
      },
      "f": {
        "type": "text",
        "index_options": "docs",
        "analyzer": "list"
      },
      "i": {
        "type": "text",
        "index_options": "docs",
        "analyzer": "list"
      },
      "cd": {
        "type": "keyword"
      },
      "cw": {
        "type": "keyword"
      },
      "f3": {
        "type": "keyword"
      },
      "f5": {
        "type": "keyword"
      },
      "lc": {
        "type": "keyword"
      },
      "mp": {
        "type": "text",
        "index_options": "docs",
        "analyzer": "list",
        "eager_global_ordinals": true
      },
      "s": {
        "type": "text",
        "index_options": "docs",
        "analyzer": "list"
      },
      "p": {
        "type": "text",
        "index_options": "docs",
        "analyzer": "list",
        "eager_global_ordinals": true
      },
      "is": {
        "type": "text",
        "index_options": "docs",
        "analyzer": "list"
      },
      "staticRank": {
        "type": "long",
        "index": false
      },
      "ts": {
        "type": "long"
      },
      "uid": {
        "type": "keyword"
      }
    }
  },
  "settings": {
    "index": {
      "refresh_interval": "-1",
      "number_of_shards": "8",
      "store": {
        "preload": ["*"],
        "type": "mmapfs"
      },
      "sort": {
        "field": "staticRank",
        "order": "desc"
      },
      "analysis": {
        "analyzer": {
          "list": {
            "type": "custom",
            "tokenizer": "whitespace"
          }
        }
      },
      "number_of_replicas": "4"
    }
  }
}

20/02/10 00:19:04 INFO log: 
{"acknowledged":true,"shards_acknowledged":true,"index":"new_users_index_20200204"}
20/02/10 00:19:04 INFO SparkRunner: Executing pipeline using the SparkRunner.
20/02/10 00:19:04 INFO SparkContextFactory: Creating a brand new Spark Context.
20/02/10 00:19:04 INFO SparkContext: Running Spark version 2.4.4
20/02/10 00:19:04 INFO SparkContext: Submitted application: 
EMRSparkStartPipeline
20/02/10 00:19:04 INFO SecurityManager: Changing view acls to: yarn,hadoop
20/02/10 00:19:04 INFO SecurityManager: Changing modify acls to: yarn,hadoop
20/02/10 00:19:04 INFO SecurityManager: Changing view acls groups to:
20/02/10 00:19:04 INFO SecurityManager: Changing modify acls groups to:
20/02/10 00:19:04 INFO SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users  with view permissions: Set(yarn, hadoop); 
groups with view permissions: Set(); users  with modify permissions: Set(yarn, 
hadoop); groups with modify permissions: Set()
20/02/10 00:19:05 INFO Utils: Successfully started service 'sparkDriver' on 
port 33571.
20/02/10 00:19:05 INFO SparkEnv: Registering MapOutputTracker
20/02/10 00:19:05 INFO SparkEnv: Registering BlockManagerMaster
20/02/10 00:19:05 INFO BlockManagerMasterEndpoint: Using 
org.apache.spark.storage.DefaultTopologyMapper for getting topology information
20/02/10 00:19:05 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
20/02/10 00:19:05 INFO DiskBlockManager: Created local directory at 
/mnt/yarn/usercache/hadoop/appcache/application_1581290593006_0004/blockmgr-9b72697c-d6ec-4bff-813e-138eb5f9f9b2
20/02/10 00:19:05 INFO MemoryStore: MemoryStore started with capacity 4.1 GB
20/02/10 00:19:05 INFO SparkEnv: Registering OutputCommitCoordinator
20/02/10 00:19:05 INFO JettyUtils: Adding filter 
org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter to /jobs, 
/jobs/json, /jobs/job, /jobs/job/json, /stages, /stages/json, /stages/stage, 
/stages/stage/json, /stages/pool, /stages/pool/json, /storage, /storage/json, 
/storage/rdd, /storage/rdd/json, /environment, /environment/json, /executors, 
/executors/json, /executors/threadDump, /executors/threadDump/json, /static, /, 
/api, /jobs/job/kill, /stages/stage/kill.
20/02/10 00:19:05 INFO Utils: Successfully started service 'SparkUI' on port 
41233.
20/02/10 00:19:05 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at 
http://ip-172-31-10-196.ec2.internal:41233
20/02/10 00:19:05 INFO SparkContext: Added JAR 
/mnt/yarn/usercache/hadoop/appcache/application_1581290593006_0004/container_1581290593006_0004_01_000001/__app__.jar
 at spark://ip-172-31-10-196.ec2.internal:33571/jars/__app__.jar with timestamp 
1581293945526
20/02/10 00:19:05 INFO YarnClusterScheduler: Created YarnClusterScheduler
20/02/10 00:19:05 INFO SchedulerExtensionServices: Starting Yarn extension 
services with app application_1581290593006_0004 and attemptId 
Some(appattempt_1581290593006_0004_000001)
20/02/10 00:19:05 INFO Utils: Using initial executors = 34, max of 
spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors 
and spark.executor.instances
20/02/10 00:19:05 INFO Utils: Successfully started service 
'org.apache.spark.network.netty.NettyBlockTransferService' on port 36811.
20/02/10 00:19:05 INFO NettyBlockTransferService: Server created on 
ip-172-31-10-196.ec2.internal:36811
20/02/10 00:19:05 INFO BlockManager: Using 
org.apache.spark.storage.RandomBlockReplicationPolicy for block replication 
policy
20/02/10 00:19:05 INFO BlockManagerMaster: Registering BlockManager 
BlockManagerId(driver, ip-172-31-10-196.ec2.internal, 36811, None)
20/02/10 00:19:05 INFO BlockManagerMasterEndpoint: Registering block manager 
ip-172-31-10-196.ec2.internal:36811 with 4.1 GB RAM, BlockManagerId(driver, 
ip-172-31-10-196.ec2.internal, 36811, None)
20/02/10 00:19:05 INFO BlockManagerMaster: Registered BlockManager 
BlockManagerId(driver, ip-172-31-10-196.ec2.internal, 36811, None)
20/02/10 00:19:05 INFO BlockManager: external shuffle service port = 7337
20/02/10 00:19:05 INFO BlockManager: Initialized BlockManager: 
BlockManagerId(driver, ip-172-31-10-196.ec2.internal, 36811, None)
20/02/10 00:19:05 INFO JettyUtils: Adding filter 
org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter to /metrics/json.
20/02/10 00:19:05 INFO EventLoggingListener: Logging events to 
hdfs:/var/log/spark/apps/application_1581290593006_0004_1
20/02/10 00:19:05 INFO Utils: Using initial executors = 34, max of 
spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors 
and spark.executor.instances
20/02/10 00:19:05 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to 
request executors before the AM has registered!
20/02/10 00:19:05 INFO RMProxy: Connecting to ResourceManager at 
ip-172-31-4-238.ec2.internal/172.31.4.238:8030
20/02/10 00:19:06 INFO YarnRMClient: Registering the ApplicationMaster
20/02/10 00:19:06 INFO ApplicationMaster:
===============================================================================
YARN executor launch context:
  env:
    CLASSPATH -> 
__app__.jar<CPS>{{PWD}}<CPS>{{PWD}}/__spark_conf__<CPS>{{PWD}}/__spark_libs__/*<CPS>$HADOOP_CONF_DIR<CPS>$HADOOP_COMMON_HOME/*<CPS>$HADOOP_COMMON_HOME/lib/*<CPS>$HADOOP_HDFS_HOME/*<CPS>$HADOOP_HDFS_HOME/lib/*<CPS>$HADOOP_MAPRED_HOME/*<CPS>$HADOOP_MAPRED_HOME/lib/*<CPS>$HADOOP_YARN_HOME/*<CPS>$HADOOP_YARN_HOME/lib/*<CPS>/usr/lib/hadoop-lzo/lib/*<CPS>/usr/share/aws/emr/emrfs/conf<CPS>/usr/share/aws/emr/emrfs/lib/*<CPS>/usr/share/aws/emr/emrfs/auxlib/*<CPS>/usr/share/aws/emr/lib/*<CPS>/usr/share/aws/emr/ddb/lib/emr-ddb-hadoop.jar<CPS>/usr/share/aws/emr/goodies/lib/emr-hadoop-goodies.jar<CPS>/usr/share/aws/emr/cloudwatch-sink/lib/*<CPS>/usr/share/aws/aws-java-sdk/*<CPS>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*<CPS>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*<CPS>/usr/lib/hadoop-lzo/lib/*<CPS>/usr/share/aws/emr/emrfs/conf<CPS>/usr/share/aws/emr/emrfs/lib/*<CPS>/usr/share/aws/emr/emrfs/auxlib/*<CPS>/usr/share/aws/emr/lib/*<CPS>/usr/share/aws/emr/ddb/lib/emr-ddb-hadoop.jar<CPS>/usr/share/aws/emr/goodies/lib/emr-hadoop-goodies.jar<CPS>/usr/share/aws/emr/cloudwatch-sink/lib/*<CPS>/usr/share/aws/aws-java-sdk/*<CPS>{{PWD}}/__spark_conf__/__hadoop_conf__
    SPARK_YARN_STAGING_DIR -> 
hdfs://ip-172-31-4-238.ec2.internal:8020/user/hadoop/.sparkStaging/application_1581290593006_0004
    SPARK_USER -> hadoop
    SPARK_PUBLIC_DNS -> ip-172-31-10-196.ec2.internal

  command:
    
LD_LIBRARY_PATH=\"/usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native:$LD_LIBRARY_PATH\"
 \
      {{JAVA_HOME}}/bin/java \
      -server \
      -Xmx20480m \
      '-DCLOUD_PLATFORM=AWS' \
      -Djava.io.tmpdir={{PWD}}/tmp \
      '-Dspark.driver.port=33571' \
      '-Dspark.history.ui.port=18080' \
      '-Dspark.ui.port=0' \
      -Dspark.yarn.app.container.log.dir=<LOG_DIR> \
      -XX:OnOutOfMemoryError='kill %p' \
      org.apache.spark.executor.CoarseGrainedExecutorBackend \
      --driver-url \
      spark://[email protected]:33571 \
      --executor-id \
      <executorId> \
      --hostname \
      <hostname> \
      --cores \
      6 \
      --app-id \
      application_1581290593006_0004 \
      --user-class-path \
      file:$PWD/__app__.jar \
      1><LOG_DIR>/stdout \
      2><LOG_DIR>/stderr

  resources:
    __app__.jar -> resource { scheme: "hdfs" host: 
"ip-172-31-4-238.ec2.internal" port: 8020 file: 
"/user/hadoop/.sparkStaging/application_1581290593006_0004/cloud-dataflow-1.0.jar"
 } size: 289132276 timestamp: 1581293936195 type: FILE visibility: PRIVATE
    __spark_libs__ -> resource { scheme: "hdfs" host: 
"ip-172-31-4-238.ec2.internal" port: 8020 file: 
"/user/hadoop/.sparkStaging/application_1581290593006_0004/__spark_libs__4137318382159335040.zip"
 } size: 230327364 timestamp: 1581293930975 type: ARCHIVE visibility: PRIVATE
    __spark_conf__ -> resource { scheme: "hdfs" host: 
"ip-172-31-4-238.ec2.internal" port: 8020 file: 
"/user/hadoop/.sparkStaging/application_1581290593006_0004/__spark_conf__.zip" 
} size: 237273 timestamp: 1581293936294 type: ARCHIVE visibility: PRIVATE

===============================================================================
20/02/10 00:19:06 INFO Utils: Using initial executors = 34, max of 
spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors 
and spark.executor.instances
20/02/10 00:19:06 INFO YarnSchedulerBackend$YarnSchedulerEndpoint: 
ApplicationMaster registered as 
NettyRpcEndpointRef(spark://[email protected]:33571)
20/02/10 00:19:06 INFO YarnAllocator: Will request 34 executor container(s), 
each with 6 core(s) and 24320 MB memory (including 3840 MB of overhead)
20/02/10 00:19:06 INFO YarnAllocator: Submitted 34 unlocalized container 
requests.
20/02/10 00:19:06 INFO ApplicationMaster: Started progress reporter thread with 
(heartbeat : 3000, initial allocation : 200) intervals
20/02/10 00:19:06 INFO YarnClusterSchedulerBackend: SchedulerBackend is ready 
for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
20/02/10 00:19:06 INFO YarnClusterScheduler: YarnClusterScheduler.postStartHook 
done
20/02/10 00:19:06 ERROR ApplicationMaster: User class threw exception: 
java.lang.NoSuchMethodError: 
com.fasterxml.jackson.databind.jsontype.TypeSerializer.typeId(Ljava/lang/Object;Lcom/fasterxml/jackson/core/JsonToken;)Lcom/fasterxml/jackson/core/type/WritableTypeId;
java.lang.NoSuchMethodError: 
com.fasterxml.jackson.databind.jsontype.TypeSerializer.typeId(Ljava/lang/Object;Lcom/fasterxml/jackson/core/JsonToken;)Lcom/fasterxml/jackson/core/type/WritableTypeId;
        at 
org.apache.beam.sdk.io.aws.options.AwsModule$AWSCredentialsProviderSerializer.serializeWithType(AwsModule.java:163)
        at 
org.apache.beam.sdk.io.aws.options.AwsModule$AWSCredentialsProviderSerializer.serializeWithType(AwsModule.java:134)
        at 
com.fasterxml.jackson.databind.ser.impl.TypeWrappedSerializer.serialize(TypeWrappedSerializer.java:32)
        at 
com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:130)
        at 
com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:3559)
        at 
com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:2927)
        at 
org.apache.beam.sdk.options.ProxyInvocationHandler$Serializer.ensureSerializable(ProxyInvocationHandler.java:721)
        at 
org.apache.beam.sdk.options.ProxyInvocationHandler$Serializer.serialize(ProxyInvocationHandler.java:647)
        at 
org.apache.beam.sdk.options.ProxyInvocationHandler$Serializer.serialize(ProxyInvocationHandler.java:635)
        at 
com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:130)
        at 
com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:3559)
        at 
com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:2927)
        at 
org.apache.beam.runners.core.construction.SerializablePipelineOptions.serializeToJson(SerializablePipelineOptions.java:67)
        at 
org.apache.beam.runners.core.construction.SerializablePipelineOptions.<init>(SerializablePipelineOptions.java:43)
        at 
org.apache.beam.runners.spark.translation.EvaluationContext.<init>(EvaluationContext.java:71)
        at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:215)
        at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:90)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:315)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:301)
        at 
com.nokachat.beam.search.displaynamesearch.pipeline.NewUserESIndexMappingsCreateTask.execute(NewUserESIndexMappingsCreateTask.java:45)
        at 
com.nokachat.beam.common.pipeline.DataflowJob.run(DataflowJob.java:61)
        at 
com.nokachat.beam.common.pipeline.EMRSparkStartPipeline.main(EMRSparkStartPipeline.java:96)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:684)
20/02/10 00:19:06 INFO ApplicationMaster: Final app status: FAILED, exitCode: 
15, (reason: User class threw exception: java.lang.NoSuchMethodError: 
com.fasterxml.jackson.databind.jsontype.TypeSerializer.typeId(Ljava/lang/Object;Lcom/fasterxml/jackson/core/JsonToken;)Lcom/fasterxml/jackson/core/type/WritableTypeId;
        at 
org.apache.beam.sdk.io.aws.options.AwsModule$AWSCredentialsProviderSerializer.serializeWithType(AwsModule.java:163)
        at 
org.apache.beam.sdk.io.aws.options.AwsModule$AWSCredentialsProviderSerializer.serializeWithType(AwsModule.java:134)
        at 
com.fasterxml.jackson.databind.ser.impl.TypeWrappedSerializer.serialize(TypeWrappedSerializer.java:32)
        at 
com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:130)
        at 
com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:3559)
        at 
com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:2927)
        at 
org.apache.beam.sdk.options.ProxyInvocationHandler$Serializer.ensureSerializable(ProxyInvocationHandler.java:721)
        at 
org.apache.beam.sdk.options.ProxyInvocationHandler$Serializer.serialize(ProxyInvocationHandler.java:647)
        at 
org.apache.beam.sdk.options.ProxyInvocationHandler$Serializer.serialize(ProxyInvocationHandler.java:635)
        at 
com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:130)
        at 
com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:3559)
        at 
com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:2927)
        at 
org.apache.beam.runners.core.construction.SerializablePipelineOptions.serializeToJson(SerializablePipelineOptions.java:67)
        at 
org.apache.beam.runners.core.construction.SerializablePipelineOptions.<init>(SerializablePipelineOptions.java:43)
        at 
org.apache.beam.runners.spark.translation.EvaluationContext.<init>(EvaluationContext.java:71)
        at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:215)
        at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:90)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:315)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:301)
        at 
com.nokachat.beam.search.displaynamesearch.pipeline.NewUserESIndexMappingsCreateTask.execute(NewUserESIndexMappingsCreateTask.java:45)
        at 
com.nokachat.beam.common.pipeline.DataflowJob.run(DataflowJob.java:61)
        at 
com.nokachat.beam.common.pipeline.EMRSparkStartPipeline.main(EMRSparkStartPipeline.java:96)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:684)
)
20/02/10 00:19:06 INFO SparkContext: Invoking stop() from shutdown hook
20/02/10 00:19:06 INFO SparkUI: Stopped Spark web UI at 
http://ip-172-31-10-196.ec2.internal:41233
20/02/10 00:19:06 INFO YarnAllocator: Driver requested a total number of 0 
executor(s).
20/02/10 00:19:06 INFO YarnClusterSchedulerBackend: Shutting down all executors
20/02/10 00:19:06 INFO YarnSchedulerBackend$YarnDriverEndpoint: Asking each 
executor to shut down
20/02/10 00:19:06 INFO SchedulerExtensionServices: Stopping 
SchedulerExtensionServices
(serviceOption=None,
 services=List(),
 started=false)
20/02/10 00:19:06 INFO MapOutputTrackerMasterEndpoint: 
MapOutputTrackerMasterEndpoint stopped!
20/02/10 00:19:06 INFO MemoryStore: MemoryStore cleared
20/02/10 00:19:06 INFO BlockManager: BlockManager stopped
20/02/10 00:19:06 INFO BlockManagerMaster: BlockManagerMaster stopped
20/02/10 00:19:06 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: 
OutputCommitCoordinator stopped!
20/02/10 00:19:06 INFO SparkContext: Successfully stopped SparkContext
20/02/10 00:19:06 INFO ShutdownHookManager: Shutdown hook called
20/02/10 00:19:06 INFO ShutdownHookManager: Deleting directory 
/mnt/yarn/usercache/hadoop/appcache/application_1581290593006_0004/spark-9e6f581b-d822-4347-9b69-6379cd2c3fe0
End of LogType:stderr

LogType:stdout
Log Upload Time:Mon Feb 10 00:19:13 +0000 2020
LogLength:0
Log Contents:
End of LogType:stdout



Container: container_1581290593006_0004_02_000001 on 
ip-172-31-2-76.ec2.internal_8041
=======================================================================================
LogType:stderr
Log Upload Time:Mon Feb 10 00:19:13 +0000 2020
LogLength:10158
Log Contents:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in 
[jar:file:/mnt/yarn/usercache/hadoop/filecache/13/__spark_libs__4137318382159335040.zip/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
20/02/10 00:19:08 INFO SignalUtils: Registered signal handler for TERM
20/02/10 00:19:08 INFO SignalUtils: Registered signal handler for HUP
20/02/10 00:19:08 INFO SignalUtils: Registered signal handler for INT
20/02/10 00:19:09 INFO SecurityManager: Changing view acls to: yarn,hadoop
20/02/10 00:19:09 INFO SecurityManager: Changing modify acls to: yarn,hadoop
20/02/10 00:19:09 INFO SecurityManager: Changing view acls groups to:
20/02/10 00:19:09 INFO SecurityManager: Changing modify acls groups to:
20/02/10 00:19:09 INFO SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users  with view permissions: Set(yarn, hadoop); 
groups with view permissions: Set(); users  with modify permissions: Set(yarn, 
hadoop); groups with modify permissions: Set()
20/02/10 00:19:09 INFO ApplicationMaster: Preparing Local resources
20/02/10 00:19:10 INFO ApplicationMaster: ApplicationAttemptId: 
appattempt_1581290593006_0004_000002
20/02/10 00:19:10 INFO ApplicationMaster: Starting the user application in a 
separate Thread
20/02/10 00:19:10 INFO ApplicationMaster: Waiting for spark context 
initialization...
20/02/10 00:19:11 INFO SparkRunner: PipelineOptions.filesToStage was not 
specified. Defaulting to files from the classpath: will stage 1 files. Enable 
logging at DEBUG level to see which files will be staged.
20/02/10 00:19:12 INFO NewUserESIndexMappingsCreateTask: esIndexName = 
new_users_index_20200204
20/02/10 00:19:12 INFO log: Logging to 
org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via org.mortbay.log.Slf4jLog
20/02/10 00:19:12 INFO log: requestBody = {
  "aliases": null,
  "mappings": {
    "_source": {
      "excludes": ["j"]
    },
    "properties": {
      "a1": {
        "type": "keyword"
      },
      "a30": {
        "type": "keyword"
      },
      "a": {
        "type": "keyword"
      },
      "c": {
        "type": "keyword"
      },
      "j": {
        "type": "text",
        "index_options": "docs",
        "analyzer": "list",
        "eager_global_ordinals": true
      },
      "f": {
        "type": "text",
        "index_options": "docs",
        "analyzer": "list"
      },
      "i": {
        "type": "text",
        "index_options": "docs",
        "analyzer": "list"
      },
      "cd": {
        "type": "keyword"
      },
      "cw": {
        "type": "keyword"
      },
      "f3": {
        "type": "keyword"
      },
      "f5": {
        "type": "keyword"
      },
      "lc": {
        "type": "keyword"
      },
      "mp": {
        "type": "text",
        "index_options": "docs",
        "analyzer": "list",
        "eager_global_ordinals": true
      },
      "s": {
        "type": "text",
        "index_options": "docs",
        "analyzer": "list"
      },
      "p": {
        "type": "text",
        "index_options": "docs",
        "analyzer": "list",
        "eager_global_ordinals": true
      },
      "is": {
        "type": "text",
        "index_options": "docs",
        "analyzer": "list"
      },
      "staticRank": {
        "type": "long",
        "index": false
      },
      "ts": {
        "type": "long"
      },
      "uid": {
        "type": "keyword"
      }
    }
  },
  "settings": {
    "index": {
      "refresh_interval": "-1",
      "number_of_shards": "8",
      "store": {
        "preload": ["*"],
        "type": "mmapfs"
      },
      "sort": {
        "field": "staticRank",
        "order": "desc"
      },
      "analysis": {
        "analyzer": {
          "list": {
            "type": "custom",
            "tokenizer": "whitespace"
          }
        }
      },
      "number_of_replicas": "4"
    }
  }
}

20/02/10 00:19:12 ERROR NewUserESIndexMappingsCreateTask: index creation failed 
with exception!
org.elasticsearch.client.ResponseException: PUT 
https://vpc-snap-search-friending-b-je4dr3xmx7mdofqcpernhto27a.us-east-1.es.amazonaws.com/new_users_index_20200204:
 HTTP/1.1 400 Bad Request
{"error":{"root_cause":[{"type":"resource_already_exists_exception","reason":"index
 [new_users_index_20200204/kjPfFcNcRS6aiJVsnFwm3A] already 
exists","index_uuid":"kjPfFcNcRS6aiJVsnFwm3A","index":"new_users_index_20200204"}],"type":"resource_already_exists_exception","reason":"index
 [new_users_index_20200204/kjPfFcNcRS6aiJVsnFwm3A] already 
exists","index_uuid":"kjPfFcNcRS6aiJVsnFwm3A","index":"new_users_index_20200204"},"status":400}
        at org.elasticsearch.client.RestClient$1.completed(RestClient.java:354)
        at org.elasticsearch.client.RestClient$1.completed(RestClient.java:343)
        at 
org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:122)
        at 
org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:177)
        at 
org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:436)
        at 
org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:326)
        at 
org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
        at 
org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
        at 
org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
        at 
org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:121)
        at 
org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
        at 
org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
        at 
org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
        at 
org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
        at 
org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
        at 
org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
        at java.lang.Thread.run(Thread.java:748)
20/02/10 00:19:12 ERROR ApplicationMaster: User class threw exception: 
java.lang.RuntimeException: Pipeline Finished with no result.
java.lang.RuntimeException: Pipeline Finished with no result.
        at 
com.nokachat.beam.common.pipeline.DataflowJob.run(DataflowJob.java:63)
        at 
com.nokachat.beam.common.pipeline.EMRSparkStartPipeline.main(EMRSparkStartPipeline.java:96)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:684)
20/02/10 00:19:12 INFO ApplicationMaster: Final app status: FAILED, exitCode: 
13, (reason: User class threw exception: java.lang.RuntimeException: Pipeline 
Finished with no result.
        at 
com.nokachat.beam.common.pipeline.DataflowJob.run(DataflowJob.java:63)
        at 
com.nokachat.beam.common.pipeline.EMRSparkStartPipeline.main(EMRSparkStartPipeline.java:96)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:684)
)
20/02/10 00:19:12 ERROR ApplicationMaster: Uncaught exception:
org.apache.spark.SparkException: Exception thrown in awaitResult:
        at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:226)
        at 
org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:468)
        at 
org.apache.spark.deploy.yarn.ApplicationMaster.org$apache$spark$deploy$yarn$ApplicationMaster$$runImpl(ApplicationMaster.scala:305)
        at 
org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply$mcV$sp(ApplicationMaster.scala:245)
        at 
org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply(ApplicationMaster.scala:245)
        at 
org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply(ApplicationMaster.scala:245)
        at 
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:779)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
        at 
org.apache.spark.deploy.yarn.ApplicationMaster.doAsUser(ApplicationMaster.scala:778)
        at 
org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:244)
        at 
org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:803)
        at 
org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
Caused by: java.lang.RuntimeException: Pipeline Finished with no result.
        at 
com.nokachat.beam.common.pipeline.DataflowJob.run(DataflowJob.java:63)
        at 
com.nokachat.beam.common.pipeline.EMRSparkStartPipeline.main(EMRSparkStartPipeline.java:96)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:684)
20/02/10 00:19:12 INFO ApplicationMaster: Deleting staging directory 
hdfs://ip-172-31-4-238.ec2.internal:8020/user/hadoop/.sparkStaging/application_1581290593006_0004
20/02/10 00:19:12 INFO ShutdownHookManager: Shutdown hook called
End of LogType:stderr

LogType:stdout
Log Upload Time:Mon Feb 10 00:19:13 +0000 2020
LogLength:0
Log Contents:
End of LogType:stdout

Attachment: code.java