Hi all,

I just found the reason why my application shut down immediately: I
didn't call the `.waitUntilFinish()` method after running the pipeline,
so `main` returned as soon as `pipeline.run()` had started the job.
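
For anyone who finds this thread later: in the program quoted below, the
fix amounts to replacing `pipeline.run();` with
`pipeline.run().waitUntilFinish();`. The sketch below is only an analogy in
plain Java, not Beam or Spark code. It assumes a hypothetical `JobHandle`
class and `run` helper (neither exists in Beam) to show why a driver that
starts background work and then returns from `main` can exit before any
work is done, and how blocking on the handle avoids that.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class WaitUntilFinishSketch {

    /** Hypothetical stand-in for the handle returned by an asynchronous run(). */
    public static final class JobHandle {
        private final Thread worker;
        private final AtomicInteger processed;

        JobHandle(Thread worker, AtomicInteger processed) {
            this.worker = worker;
            this.processed = processed;
        }

        /** Blocks the caller until the background worker has finished. */
        public int waitUntilFinish() {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
            return processed.get();
        }
    }

    /** Starts the work on a daemon thread and returns immediately. */
    public static JobHandle run(int records) {
        AtomicInteger processed = new AtomicInteger();
        Thread worker = new Thread(() -> {
            for (int i = 0; i < records; i++) {
                processed.incrementAndGet();
            }
        });
        // Daemon threads do not keep the JVM alive once main() returns.
        worker.setDaemon(true);
        worker.start();
        return new JobHandle(worker, processed);
    }

    public static void main(String[] args) {
        JobHandle job = run(5);
        // Without this call, main() could return while the worker is still running.
        int processed = job.waitUntilFinish();
        System.out.println("processed " + processed + " records");
    }
}
```

How a real runner schedules its threads may differ; the point is only that
the driver has to block on the result for a long-running job to stay up.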

Sorry for bothering you all with such a stupid mistake! Thanks a lot!
Have a nice week!

Many thanks,
Minreng

On Mon, Sep 21, 2020 at 2:07 AM Minreng Wu <[email protected]> wrote:

> Hi Contributors,
>
> I have a very simple pipeline that just reads data from KafkaIO, then
> prints the parsed data to the console. Below is the main function of my
> program:
>
>> public static void main(String[] args) {
>>     // create pipeline
>>     PipelineOptions pipelineOption = PipelineOptionsFactory.fromArgs(args)
>>             .withoutStrictParsing()
>>             .as(PipelineOptions.class);
>>
>>     Pipeline pipeline = Pipeline.create(pipelineOption);
>>
>>     // define input schema
>>     Schema inputSchema = Schema.builder()
>>             .addStringField("camera")
>>             .addDateTimeField("event_time")
>>             .addInt32Field("car")
>>             .addInt32Field("person")
>>             .build();
>>
>>     // generate stream source
>>     PCollection<Row> rows = pipeline
>>             .apply("read kafka", KafkaIO.<String, String>read()
>>                     .withBootstrapServers("127.0.0.1:9092")
>>                     .withTopic("beamKafkaTest")
>>                     .withKeyDeserializer(StringDeserializer.class)
>>                     .withValueDeserializer(StringDeserializer.class)
>>                     .withReadCommitted()
>>                     .commitOffsetsInFinalize()
>>                     .withConsumerConfigUpdates(ImmutableMap.of("group.id", "client-1"))
>>                     .withoutMetadata()
>>             )
>>             // parse JSON
>>             .apply("parse JSON", ParDo.of(new DoFn<KV<String, String>, Row>() {
>>                 @ProcessElement
>>                 public void processElement(ProcessContext c) {
>>                     String jsonData = c.element().getValue();
>>
>>                     // parse json
>>                     JSONObject jsonObject = JSON.parseObject(jsonData);
>>
>>                     // build row
>>                     List<Object> list = new ArrayList<>();
>>                     list.add(jsonObject.get("camera"));
>>                     list.add(dtf.parseDateTime((String) jsonObject.get("event_time")));
>>                     list.add(jsonObject.get("car"));
>>                     list.add(jsonObject.get("person"));
>>                     Row row = Row.withSchema(inputSchema)
>>                             .addValues(list)
>>                             .build();
>>
>>                     System.out.println(row);
>>
>>                     // emit row
>>                     c.output(row);
>>                 }
>>             }))
>>             // set input schema
>>             .setRowSchema(inputSchema);
>>
>>     // define output schema
>>     Schema outputSchema = Schema.builder()
>>             .addStringField("camera")
>>             .addDateTimeField("event_time")
>>             .addInt32Field("car")
>>             .addInt32Field("person")
>>             .build();
>>
>>     // print results
>>     rows
>>             .apply(
>>                     "log_result",
>>                     MapElements.via(
>>                             new SimpleFunction<Row, Row>() {
>>                                 @Override
>>                                 public Row apply(Row input) {
>>                                     // expect output:
>>                                     // RESULT: [row, 5.0]
>>                                     System.out.println("RESULT: " + input.getValues());
>>                                     return input;
>>                                 }
>>                             }))
>>             .setRowSchema(
>>                     outputSchema
>>             );
>>
>>     // run
>>     pipeline.run();
>> }
>
> This code works well with the direct runner and the Flink runner. But
> with the Spark runner, it shuts down immediately after being submitted
> to the Spark cluster, without any error messages. I expected it to keep
> running and consuming messages until we stop it, but it behaves like a
> one-pass program. I also tested some of the official examples
> (WordCount, WindowedWordCount) that use bounded sources (TextIO), and
> they all worked well with the Spark runner.
>
> The project structure I used is generated from the maven archetype
> provided on this page
> <https://beam.apache.org/get-started/quickstart-java/#get-the-wordcount-code>.
> The logs of my submitted spark application are attached at the bottom of
> this email. Since it printed the consumer configurations, I guess it had
> already initialized the Kafka consumer, but failed in the following steps.
>
> So my question is: *Why does my KafkaIO program shut down immediately
> when running with the Spark runner, but work well with the direct runner
> and the Flink runner? Is there any existing example of using KafkaIO with
> the Spark runner?*
>
> Really appreciate your help and advice! Stay safe and happy!
>
> Many thanks,
> Minreng Wu
>
>
> Top of Log
>>
>> Spark Executor Command: 
>> "/Library/Java/JavaVirtualMachines/jdk1.8.0_211.jdk/Contents/Home/bin/java" 
>> "-cp" 
>> "/Users/wumrwds/Development/spark-2.4.5-bin-hadoop2.7/conf/:/Users/wumrwds/Development/spark-2.4.5-bin-hadoop2.7/jars/*"
>>  "-Xmx1024M" "-Dspark.driver.port=53118" 
>> "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" 
>> "spark://[email protected]:53118" "--executor-id" "0" 
>> "--hostname" "10.0.10.123" "--cores" "12" "--app-id" 
>> "app-20200921011535-0003" "--worker-url" "spark://[email protected]:53084"
>> ========================================
>>
>> 20/09/21 01:15:32 WARN Utils: Your hostname, Minrengs-MacBook-Pro.local 
>> resolves to a loopback address: 127.0.0.1; using 10.0.10.123 instead (on 
>> interface en0)
>> 20/09/21 01:15:32 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to 
>> another address
>> 20/09/21 01:15:33 WARN NativeCodeLoader: Unable to load native-hadoop 
>> library for your platform... using builtin-java classes where applicable
>> log4j:WARN No appenders could be found for logger 
>> (org.apache.beam.sdk.options.PipelineOptionsFactory).
>> log4j:WARN Please initialize the log4j system properly.
>> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for 
>> more info.
>> Using Spark's default log4j profile: 
>> org/apache/spark/log4j-defaults.properties
>> 20/09/21 01:15:35 WARN Checkpoint: Checkpoint directory 
>> /tmp/beamsqlapp2-wumrwds-0921061534-60408bcd/spark-checkpoint does not exist
>> 20/09/21 01:15:35 INFO SparkRunnerStreamingContextFactory: Creating a new 
>> Spark Streaming Context
>> 20/09/21 01:15:35 INFO SparkRunnerStreamingContextFactory: Setting Spark 
>> streaming batchDuration to 500 msec
>> 20/09/21 01:15:35 INFO SparkContextFactory: Creating a brand new Spark 
>> Context.
>> 20/09/21 01:15:35 INFO SparkContext: Running Spark version 2.4.5
>> 20/09/21 01:15:35 INFO SparkContext: Submitted application: BeamSqlApp2
>> 20/09/21 01:15:35 INFO SecurityManager: Changing view acls to: wumrwds
>> 20/09/21 01:15:35 INFO SecurityManager: Changing modify acls to: wumrwds
>> 20/09/21 01:15:35 INFO SecurityManager: Changing view acls groups to:
>> 20/09/21 01:15:35 INFO SecurityManager: Changing modify acls groups to:
>> 20/09/21 01:15:35 INFO SecurityManager: SecurityManager: authentication 
>> disabled; ui acls disabled; users  with view permissions: Set(wumrwds); 
>> groups with view permissions: Set(); users  with modify permissions: 
>> Set(wumrwds); groups with modify permissions: Set()
>> 20/09/21 01:15:35 INFO Utils: Successfully started service 'sparkDriver' on 
>> port 53118.
>> 20/09/21 01:15:35 INFO SparkEnv: Registering MapOutputTracker
>> 20/09/21 01:15:35 INFO SparkEnv: Registering BlockManagerMaster
>> 20/09/21 01:15:35 INFO BlockManagerMasterEndpoint: Using 
>> org.apache.spark.storage.DefaultTopologyMapper for getting topology 
>> information
>> 20/09/21 01:15:35 INFO BlockManagerMasterEndpoint: 
>> BlockManagerMasterEndpoint up
>> 20/09/21 01:15:35 INFO DiskBlockManager: Created local directory at 
>> /private/var/folders/kl/98ycp0vj19z082xtn76h07r00000gn/T/blockmgr-e26bb6e9-1c3a-45ee-b176-abb7ca03ae4f
>> 20/09/21 01:15:35 INFO MemoryStore: MemoryStore started with capacity 366.3 
>> MB
>> 20/09/21 01:15:35 INFO SparkEnv: Registering OutputCommitCoordinator
>> 20/09/21 01:15:35 INFO Utils: Successfully started service 'SparkUI' on port 
>> 4040.
>> 20/09/21 01:15:35 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at 
>> http://10.0.10.123:4040
>> 20/09/21 01:15:35 INFO SparkContext: Added JAR 
>> file:/Users/wumrwds/Git/play/beam-play/word-count-beam/target/word-count-beam-bundled-0.1.jar
>>  at spark://10.0.10.123:53118/jars/word-count-beam-bundled-0.1.jar with 
>> timestamp 1600668935676
>> 20/09/21 01:15:35 INFO StandaloneAppClient$ClientEndpoint: Connecting to 
>> master spark://Minrengs-MacBook-Pro.local:7077...
>> 20/09/21 01:15:35 INFO TransportClientFactory: Successfully created 
>> connection to Minrengs-MacBook-Pro.local/127.0.0.1:7077 after 26 ms (0 ms 
>> spent in bootstraps)
>> 20/09/21 01:15:35 INFO StandaloneSchedulerBackend: Connected to Spark 
>> cluster with app ID app-20200921011535-0003
>> 20/09/21 01:15:35 INFO StandaloneAppClient$ClientEndpoint: Executor added: 
>> app-20200921011535-0003/0 on worker-20200921011236-10.0.10.123-53084 
>> (10.0.10.123:53084) with 12 core(s)
>> 20/09/21 01:15:35 INFO StandaloneSchedulerBackend: Granted executor ID 
>> app-20200921011535-0003/0 on hostPort 10.0.10.123:53084 with 12 core(s), 
>> 1024.0 MB RAM
>> 20/09/21 01:15:35 INFO Utils: Successfully started service 
>> 'org.apache.spark.network.netty.NettyBlockTransferService' on port 53120.
>> 20/09/21 01:15:35 INFO NettyBlockTransferService: Server created on 
>> 10.0.10.123:53120
>> 20/09/21 01:15:35 INFO BlockManager: Using 
>> org.apache.spark.storage.RandomBlockReplicationPolicy for block replication 
>> policy
>> 20/09/21 01:15:35 INFO BlockManagerMaster: Registering BlockManager 
>> BlockManagerId(driver, 10.0.10.123, 53120, None)
>> 20/09/21 01:15:35 INFO StandaloneAppClient$ClientEndpoint: Executor updated: 
>> app-20200921011535-0003/0 is now RUNNING
>> 20/09/21 01:15:35 INFO BlockManagerMasterEndpoint: Registering block manager 
>> 10.0.10.123:53120 with 366.3 MB RAM, BlockManagerId(driver, 10.0.10.123, 
>> 53120, None)
>> 20/09/21 01:15:35 INFO BlockManagerMaster: Registered BlockManager 
>> BlockManagerId(driver, 10.0.10.123, 53120, None)
>> 20/09/21 01:15:35 INFO BlockManager: Initialized BlockManager: 
>> BlockManagerId(driver, 10.0.10.123, 53120, None)
>> 20/09/21 01:15:35 INFO StandaloneSchedulerBackend: SchedulerBackend is ready 
>> for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
>> 20/09/21 01:15:35 WARN Checkpoint$CheckpointDir: The specified checkpoint 
>> dir /tmp/beamsqlapp2-wumrwds-0921061534-60408bcd does not match a reliable 
>> filesystem so in case of failures this job may not recover properly or even 
>> at all.
>> 20/09/21 01:15:35 INFO Checkpoint$CheckpointDir: Checkpoint dir set to: 
>> /tmp/beamsqlapp2-wumrwds-0921061534-60408bcd
>> 20/09/21 01:15:36 INFO MetricsAccumulator: No metrics checkpoint found.
>> 20/09/21 01:15:36 INFO MetricsAccumulator: Instantiated metrics accumulator: 
>> MetricQueryResults()
>> 20/09/21 01:15:36 WARN Checkpoint$CheckpointDir: The specified checkpoint 
>> dir /tmp/beamsqlapp2-wumrwds-0921061534-60408bcd does not match a reliable 
>> filesystem so in case of failures this job may not recover properly or even 
>> at all.
>> 20/09/21 01:15:36 INFO Checkpoint$CheckpointDir: Checkpoint dir set to: 
>> /tmp/beamsqlapp2-wumrwds-0921061534-60408bcd
>> 20/09/21 01:15:36 INFO AggregatorsAccumulator: No accumulator checkpoint 
>> found.
>> 20/09/21 01:15:36 INFO AggregatorsAccumulator: Instantiated aggregators 
>> accumulator:
>> 20/09/21 01:15:36 INFO SparkRunner$Evaluator: Evaluating 
>> Read(KafkaUnboundedSource)
>> 20/09/21 01:15:36 INFO PIDRateEstimator: Created PIDRateEstimator with 
>> proportional = 1.0, integral = 0.2, derivative = 0.0, min rate = 100.0
>> 20/09/21 01:15:36 INFO SourceDStream: Read duration set to: PT0.200S
>> 20/09/21 01:15:36 INFO SourceDStream: Max records per batch has not been 
>> limited by neither configuration nor the rate controller, and will remain 
>> unlimited for the current batch (9223372036854775807).
>> 20/09/21 01:15:36 INFO ConsumerConfig: ConsumerConfig values:
>>      allow.auto.create.topics = true
>>      auto.commit.interval.ms = 5000
>>      auto.offset.reset = latest
>>      bootstrap.servers = [10.0.10.123:9092]
>>      check.crcs = true
>>      client.dns.lookup = default
>>      client.id =
>>      client.rack =
>>      connections.max.idle.ms = 540000
>>      default.api.timeout.ms = 60000
>>      enable.auto.commit = false
>>      exclude.internal.topics = true
>>      fetch.max.bytes = 52428800
>>      fetch.max.wait.ms = 500
>>      fetch.min.bytes = 1
>>      group.id = client-1
>>      group.instance.id = null
>>      heartbeat.interval.ms = 3000
>>      interceptor.classes = []
>>      internal.leave.group.on.close = true
>>      isolation.level = read_committed
>>      key.deserializer = class 
>> org.apache.kafka.common.serialization.ByteArrayDeserializer
>>      max.partition.fetch.bytes = 1048576
>>      max.poll.interval.ms = 300000
>>      max.poll.records = 500
>>      metadata.max.age.ms = 300000
>>      metric.reporters = []
>>      metrics.num.samples = 2
>>      metrics.recording.level = INFO
>>      metrics.sample.window.ms = 30000
>>      partition.assignment.strategy = [class 
>> org.apache.kafka.clients.consumer.RangeAssignor]
>>      receive.buffer.bytes = 524288
>>      reconnect.backoff.max.ms = 1000
>>      reconnect.backoff.ms = 50
>>      request.timeout.ms = 30000
>>      retry.backoff.ms = 100
>>      sasl.client.callback.handler.class = null
>>      sasl.jaas.config = null
>>      sasl.kerberos.kinit.cmd = /usr/bin/kinit
>>      sasl.kerberos.min.time.before.relogin = 60000
>>      sasl.kerberos.service.name = null
>>      sasl.kerberos.ticket.renew.jitter = 0.05
>>      sasl.kerberos.ticket.renew.window.factor = 0.8
>>      sasl.login.callback.handler.class = null
>>      sasl.login.class = null
>>      sasl.login.refresh.buffer.seconds = 300
>>      sasl.login.refresh.min.period.seconds = 60
>>      sasl.login.refresh.window.factor = 0.8
>>      sasl.login.refresh.window.jitter = 0.05
>>      sasl.mechanism = GSSAPI
>>      security.protocol = PLAINTEXT
>>      security.providers = null
>>      send.buffer.bytes = 131072
>>      session.timeout.ms = 10000
>>      ssl.cipher.suites = null
>>      ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>>      ssl.endpoint.identification.algorithm = https
>>      ssl.key.password = null
>>      ssl.keymanager.algorithm = SunX509
>>      ssl.keystore.location = null
>>      ssl.keystore.password = null
>>      ssl.keystore.type = JKS
>>      ssl.protocol = TLS
>>      ssl.provider = null
>>      ssl.secure.random.implementation = null
>>      ssl.trustmanager.algorithm = PKIX
>>      ssl.truststore.location = null
>>      ssl.truststore.password = null
>>      ssl.truststore.type = JKS
>>      value.deserializer = class 
>> org.apache.kafka.common.serialization.ByteArrayDeserializer
>>
>> 20/09/21 01:15:36 INFO AppInfoParser: Kafka version: 2.4.1
>> 20/09/21 01:15:36 INFO AppInfoParser: Kafka commitId: c57222ae8cd7866b
>> 20/09/21 01:15:36 INFO AppInfoParser: Kafka startTimeMs: 1600668936638
>> 20/09/21 01:15:37 INFO Metadata: [Consumer clientId=consumer-client-1-1, 
>> groupId=client-1] Cluster ID: jYuebrB3RQKaA55bNA1Vsw
>> 20/09/21 01:15:37 INFO KafkaUnboundedSource: Partitions assigned to split 0 
>> (total 1): beamKafkaTest-0
>> 20/09/21 01:15:37 INFO SparkRunner$Evaluator: Evaluating 
>> org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1@63718b93
>> 20/09/21 01:15:37 INFO SparkRunner$Evaluator: Evaluating 
>> org.apache.beam.examples.BeamSqlApp2$1@4930539b
>> 20/09/21 01:15:37 INFO SparkRunner$Evaluator: Evaluating 
>> org.apache.beam.sdk.transforms.MapElements$1@37b72ea
>> 20/09/21 01:15:37 WARN SparkContext: Spark is not running in local mode, 
>> therefore the checkpoint directory must not be on the local filesystem. 
>> Directory 
>> 'file:/tmp/beamsqlapp2-wumrwds-0921061534-60408bcd/spark-checkpoint' appears 
>> to be on the local filesystem.
>> 20/09/21 01:15:37 INFO SparkRunner: Starting streaming pipeline execution.
>> 20/09/21 01:15:37 INFO SourceDStream: Duration for remembering RDDs set to 
>> 10000 ms for org.apache.beam.runners.spark.io.SourceDStream@49db1a95
>> 20/09/21 01:15:37 INFO SourceDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO SourceDStream: Storage level = Serialized 1x 
>> Replicated
>> 20/09/21 01:15:37 INFO SourceDStream: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO SourceDStream: Remember interval = 10000 ms
>> 20/09/21 01:15:37 INFO SourceDStream: Initialized and validated 
>> org.apache.beam.runners.spark.io.SourceDStream@49db1a95
>> 20/09/21 01:15:37 INFO InternalMapWithStateDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO InternalMapWithStateDStream: Storage level = Memory 
>> Deserialized 1x Replicated
>> 20/09/21 01:15:37 INFO InternalMapWithStateDStream: Checkpoint interval = 
>> 5000 ms
>> 20/09/21 01:15:37 INFO InternalMapWithStateDStream: Remember interval = 
>> 10000 ms
>> 20/09/21 01:15:37 INFO InternalMapWithStateDStream: Initialized and 
>> validated 
>> org.apache.spark.streaming.dstream.InternalMapWithStateDStream@5b0151fb
>> 20/09/21 01:15:37 INFO MapWithStateDStreamImpl: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO MapWithStateDStreamImpl: Storage level = Serialized 
>> 1x Replicated
>> 20/09/21 01:15:37 INFO MapWithStateDStreamImpl: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO MapWithStateDStreamImpl: Remember interval = 500 ms
>> 20/09/21 01:15:37 INFO MapWithStateDStreamImpl: Initialized and validated 
>> org.apache.spark.streaming.dstream.MapWithStateDStreamImpl@6ff117df
>> 20/09/21 01:15:37 INFO MappedDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO MappedDStream: Storage level = Serialized 1x 
>> Replicated
>> 20/09/21 01:15:37 INFO MappedDStream: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO MappedDStream: Remember interval = 500 ms
>> 20/09/21 01:15:37 INFO MappedDStream: Initialized and validated 
>> org.apache.spark.streaming.dstream.MappedDStream@6275db71
>> 20/09/21 01:15:37 INFO SparkUnboundedSource$ReadReportDStream: Slide time = 
>> 500 ms
>> 20/09/21 01:15:37 INFO SparkUnboundedSource$ReadReportDStream: Storage level 
>> = Serialized 1x Replicated
>> 20/09/21 01:15:37 INFO SparkUnboundedSource$ReadReportDStream: Checkpoint 
>> interval = null
>> 20/09/21 01:15:37 INFO SparkUnboundedSource$ReadReportDStream: Remember 
>> interval = 500 ms
>> 20/09/21 01:15:37 INFO SparkUnboundedSource$ReadReportDStream: Initialized 
>> and validated 
>> org.apache.beam.runners.spark.io.SparkUnboundedSource$ReadReportDStream@3ef0d3c5
>> 20/09/21 01:15:37 INFO SourceDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO SourceDStream: Storage level = Serialized 1x 
>> Replicated
>> 20/09/21 01:15:37 INFO SourceDStream: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO SourceDStream: Remember interval = 10000 ms
>> 20/09/21 01:15:37 INFO SourceDStream: Initialized and validated 
>> org.apache.beam.runners.spark.io.SourceDStream@49db1a95
>> 20/09/21 01:15:37 INFO InternalMapWithStateDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO InternalMapWithStateDStream: Storage level = Memory 
>> Deserialized 1x Replicated
>> 20/09/21 01:15:37 INFO InternalMapWithStateDStream: Checkpoint interval = 
>> 5000 ms
>> 20/09/21 01:15:37 INFO InternalMapWithStateDStream: Remember interval = 
>> 10000 ms
>> 20/09/21 01:15:37 INFO InternalMapWithStateDStream: Initialized and 
>> validated 
>> org.apache.spark.streaming.dstream.InternalMapWithStateDStream@5b0151fb
>> 20/09/21 01:15:37 INFO MapWithStateDStreamImpl: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO MapWithStateDStreamImpl: Storage level = Serialized 
>> 1x Replicated
>> 20/09/21 01:15:37 INFO MapWithStateDStreamImpl: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO MapWithStateDStreamImpl: Remember interval = 500 ms
>> 20/09/21 01:15:37 INFO MapWithStateDStreamImpl: Initialized and validated 
>> org.apache.spark.streaming.dstream.MapWithStateDStreamImpl@6ff117df
>> 20/09/21 01:15:37 INFO FlatMappedDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO FlatMappedDStream: Storage level = Serialized 1x 
>> Replicated
>> 20/09/21 01:15:37 INFO FlatMappedDStream: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO FlatMappedDStream: Remember interval = 500 ms
>> 20/09/21 01:15:37 INFO FlatMappedDStream: Initialized and validated 
>> org.apache.spark.streaming.dstream.FlatMappedDStream@4c293f9c
>> 20/09/21 01:15:37 INFO MappedDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO MappedDStream: Storage level = Serialized 1x 
>> Replicated
>> 20/09/21 01:15:37 INFO MappedDStream: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO MappedDStream: Remember interval = 500 ms
>> 20/09/21 01:15:37 INFO MappedDStream: Initialized and validated 
>> org.apache.spark.streaming.dstream.MappedDStream@3430988a
>> 20/09/21 01:15:37 INFO TransformedDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO TransformedDStream: Storage level = Serialized 1x 
>> Replicated
>> 20/09/21 01:15:37 INFO TransformedDStream: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO TransformedDStream: Remember interval = 500 ms
>> 20/09/21 01:15:37 INFO TransformedDStream: Initialized and validated 
>> org.apache.spark.streaming.dstream.TransformedDStream@17ab8c32
>> 20/09/21 01:15:37 INFO FilteredDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO FilteredDStream: Storage level = Serialized 1x 
>> Replicated
>> 20/09/21 01:15:37 INFO FilteredDStream: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO FilteredDStream: Remember interval = 500 ms
>> 20/09/21 01:15:37 INFO FilteredDStream: Initialized and validated 
>> org.apache.spark.streaming.dstream.FilteredDStream@14660807
>> 20/09/21 01:15:37 INFO MappedDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO MappedDStream: Storage level = Serialized 1x 
>> Replicated
>> 20/09/21 01:15:37 INFO MappedDStream: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO MappedDStream: Remember interval = 500 ms
>> 20/09/21 01:15:37 INFO MappedDStream: Initialized and validated 
>> org.apache.spark.streaming.dstream.MappedDStream@8f826ef
>> 20/09/21 01:15:37 INFO TransformedDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO TransformedDStream: Storage level = Serialized 1x 
>> Replicated
>> 20/09/21 01:15:37 INFO TransformedDStream: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO TransformedDStream: Remember interval = 500 ms
>> 20/09/21 01:15:37 INFO TransformedDStream: Initialized and validated 
>> org.apache.spark.streaming.dstream.TransformedDStream@7e734051
>> 20/09/21 01:15:37 INFO FilteredDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO FilteredDStream: Storage level = Serialized 1x 
>> Replicated
>> 20/09/21 01:15:37 INFO FilteredDStream: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO FilteredDStream: Remember interval = 500 ms
>> 20/09/21 01:15:37 INFO FilteredDStream: Initialized and validated 
>> org.apache.spark.streaming.dstream.FilteredDStream@1be77a7
>> 20/09/21 01:15:37 INFO MappedDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO MappedDStream: Storage level = Serialized 1x 
>> Replicated
>> 20/09/21 01:15:37 INFO MappedDStream: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO MappedDStream: Remember interval = 500 ms
>> 20/09/21 01:15:37 INFO MappedDStream: Initialized and validated 
>> org.apache.spark.streaming.dstream.MappedDStream@25bae116
>> 20/09/21 01:15:37 INFO TransformedDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO TransformedDStream: Storage level = Serialized 1x 
>> Replicated
>> 20/09/21 01:15:37 INFO TransformedDStream: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO TransformedDStream: Remember interval = 500 ms
>> 20/09/21 01:15:37 INFO TransformedDStream: Initialized and validated 
>> org.apache.spark.streaming.dstream.TransformedDStream@6e41c8a5
>> 20/09/21 01:15:37 INFO FilteredDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO FilteredDStream: Storage level = Serialized 1x 
>> Replicated
>> 20/09/21 01:15:37 INFO FilteredDStream: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO FilteredDStream: Remember interval = 500 ms
>> 20/09/21 01:15:37 INFO FilteredDStream: Initialized and validated 
>> org.apache.spark.streaming.dstream.FilteredDStream@362e3875
>> 20/09/21 01:15:37 INFO MappedDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO MappedDStream: Storage level = Serialized 1x 
>> Replicated
>> 20/09/21 01:15:37 INFO MappedDStream: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO MappedDStream: Remember interval = 500 ms
>> 20/09/21 01:15:37 INFO MappedDStream: Initialized and validated 
>> org.apache.spark.streaming.dstream.MappedDStream@22aaf27c
>> 20/09/21 01:15:37 INFO ForEachDStream: Slide time = 500 ms
>> 20/09/21 01:15:37 INFO ForEachDStream: Storage level = Serialized 1x 
>> Replicated
>> 20/09/21 01:15:37 INFO ForEachDStream: Checkpoint interval = null
>> 20/09/21 01:15:37 INFO ForEachDStream: Remember interval = 500 ms
>> 20/09/21 01:15:37 INFO ForEachDStream: Initialized and validated 
>> org.apache.spark.streaming.dstream.ForEachDStream@3bd7e62b
>> 20/09/21 01:15:37 INFO RecurringTimer: Started timer for JobGenerator at 
>> time 1600668938000
>> 20/09/21 01:15:37 INFO JobGenerator: Started JobGenerator at 1600668938000 ms
>> 20/09/21 01:15:37 INFO JobScheduler: Started JobScheduler
>> 20/09/21 01:15:37 INFO StreamingContext: StreamingContext started
>> 20/09/21 01:15:37 INFO StreamingContext: Invoking stop(stopGracefully=false) 
>> from shutdown hook
>> 20/09/21 01:15:37 INFO BatchedWriteAheadLog: BatchedWriteAheadLog shutting 
>> down at time: 1600668937717.
>> 20/09/21 01:15:37 WARN BatchedWriteAheadLog: BatchedWriteAheadLog Writer 
>> queue interrupted.
>> 20/09/21 01:15:37 INFO BatchedWriteAheadLog: BatchedWriteAheadLog Writer 
>> thread exiting.
>> 20/09/21 01:15:37 INFO FileBasedWriteAheadLog_ReceivedBlockTracker: Stopped 
>> write ahead log manager
>> 20/09/21 01:15:37 INFO ReceiverTracker: ReceiverTracker stopped
>> 20/09/21 01:15:37 INFO JobGenerator: Stopping JobGenerator immediately
>> 20/09/21 01:15:37 INFO RecurringTimer: Stopped timer for JobGenerator after 
>> time -1
>> 20/09/21 01:15:37 INFO CheckpointWriter: CheckpointWriter executor 
>> terminated? true, waited for 0 ms.
>> 20/09/21 01:15:37 INFO JobGenerator: Stopped JobGenerator
>> 20/09/21 01:15:37 INFO JobScheduler: Stopped JobScheduler
>> 20/09/21 01:15:37 INFO StreamingContext: StreamingContext stopped 
>> successfully
>> 20/09/21 01:15:37 INFO SparkContext: Invoking stop() from shutdown hook
>> 20/09/21 01:15:37 INFO SparkUI: Stopped Spark web UI at 
>> http://10.0.10.123:4040
>> 20/09/21 01:15:37 INFO StandaloneSchedulerBackend: Shutting down all 
>> executors
>> 20/09/21 01:15:37 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Asking 
>> each executor to shut down
>> 20/09/21 01:15:37 INFO MapOutputTrackerMasterEndpoint: 
>> MapOutputTrackerMasterEndpoint stopped!
>> 20/09/21 01:15:37 INFO MemoryStore: MemoryStore cleared
>> 20/09/21 01:15:37 INFO BlockManager: BlockManager stopped
>> 20/09/21 01:15:37 INFO BlockManagerMaster: BlockManagerMaster stopped
>> 20/09/21 01:15:37 INFO 
>> OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: 
>> OutputCommitCoordinator stopped!
>> 20/09/21 01:15:37 INFO SparkContext: Successfully stopped SparkContext
>> 20/09/21 01:15:37 INFO ShutdownHookManager: Shutdown hook called
>> 20/09/21 01:15:37 INFO ShutdownHookManager: Deleting directory 
>> /private/var/folders/kl/98ycp0vj19z082xtn76h07r00000gn/T/spark-fac2b91e-8bfc-4855-b0f5-bbb206bb678b
>> 20/09/21 01:15:37 INFO ShutdownHookManager: Deleting directory 
>> /private/var/folders/kl/98ycp0vj19z082xtn76h07r00000gn/T/spark-e3447ff1-ca22-4574-9864-834fac63eea3
>>
>>
