I have a native R model and am running structured streaming with it. Data
comes from Kafka and is passed to dapply, where my model does the prediction,
and the result is written to a sink.

Problem: my model requires the caret package. Inside the dapply function, the
caret package is loaded again for every streaming batch, which adds a delay of
about 2 s.


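# Read the "source" topic from Kafka as a streaming DataFrame.
kafka <- read.stream("kafka", subscribe = "source",
                     kafka.bootstrap.servers = "10.117.172.48:9092",
                     topic = "source")
lines <- select(kafka, cast(kafka$value, "string"))
schema <- schema(lines)
library(caret)  # loaded once on the driver

# Time the package load inside the function that dapply runs on each batch.
df4 <- dapply(lines, function(x) {
  print(system.time(library(caret)))
  x
}, schema)

# loc is the checkpoint location (its definition is not shown here).
q2 <- write.stream(df4, "kafka", checkpointLocation = loc, topic = "sink",
                   kafka.bootstrap.servers = "10.117.172.48:9092")
awaitTermination(q2)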

For the above code, the output for every new batch is:
18/03/23 11:08:10 INFO BufferedStreamThread: Loading required package: lattice
18/03/23 11:08:10 INFO BufferedStreamThread:
18/03/23 11:08:10 INFO BufferedStreamThread: Attaching package: ‘lattice’
18/03/23 11:08:10 INFO BufferedStreamThread:
18/03/23 11:08:10 INFO BufferedStreamThread: The following object is masked from ‘package:SparkR’:
18/03/23 11:08:10 INFO BufferedStreamThread:
18/03/23 11:08:10 INFO BufferedStreamThread:     histogram
18/03/23 11:08:10 INFO BufferedStreamThread:
18/03/23 11:08:10 INFO BufferedStreamThread: Loading required package: ggplot2
18/03/23 11:08:12 INFO BufferedStreamThread:    user  system elapsed
18/03/23 11:08:12 INFO BufferedStreamThread:   1.937   0.062   1.999
18/03/23 11:08:12 INFO RRunner: Times: boot = 0.009 s, init = 0.017 s, broadcast = 0.001 s, read-input = 0.001 s, compute = 2.064 s, write-output = 0.001 s, total = 2.093 s

The rest of the log file is attached.

Ideally, the package should not be loaded again for each batch. I think the R
environment is being created and destroyed with each batch. Is there a
solution to this, or am I missing something?
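
As a sanity check on that guess, here is a minimal sketch in plain R with no
Spark involved. It uses the base package stats4 as a stand-in for caret, which
is only an assumption so that the snippet runs anywhere. Within a single R
process the second library() call should return almost immediately because the
package is already attached, so a load of about 2 s on every batch would be
consistent with a fresh R process handling each task.

```r
# Stand-in for caret: stats4 ships with base R (an assumption for illustration).
first  <- system.time(library(stats4))   # actual load and attach
second <- system.time(library(stats4))   # already attached, so nothing to load

# Elapsed wall-clock seconds for each call.
print(first[["elapsed"]])
print(second[["elapsed"]])

# TRUE if the package is attached on this process's search path.
attached <- "package:stats4" %in% search()
print(attached)
```

If the timing printed inside dapply never drops the way the second call does
here, the package state is probably not surviving between batches.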


Thanks,

Deepansh
Java ref type org.apache.spark.sql.SparkSession id 1 
Re-using existing Spark Context. Call sparkR.session.stop() or restart R to 
create a new Spark Context
Warning message:
'sparkR.init' is deprecated.
Use 'sparkR.session' instead.
See help("Deprecated") 
Loading required package: lattice

Attaching package: ‘lattice’

The following object is masked from ‘package:SparkR’:

    histogram

Loading required package: ggplot2
18/03/23 11:07:58 INFO StreamExecution: Starting [id = 
1605966f-51e3-4df9-b284-7535e37b6d44, runId = 
ab605bec-3d57-4a61-a45b-5174292ab2ec]. Use /tmp/temporary-%20Gunp5pUYnV to 
store the query checkpoint.
18/03/23 11:07:58 INFO ConsumerConfig: ConsumerConfig values: 
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
        bootstrap.servers = [10.117.172.48:9092]
        check.crcs = true
        client.id = 
        connections.max.idle.ms = 540000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = 
spark-kafka-source-a46c9dc7-36fc-40d3-a5c3-28d566108ab3--149645343-driver-0
        heartbeat.interval.ms = 3000
        interceptor.classes = null
        key.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 1
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class 
org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.ms = 50
        request.timeout.ms = 305000
        retry.backoff.ms = 100
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer

18/03/23 11:07:58 INFO ConsumerConfig: ConsumerConfig values: 
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
        bootstrap.servers = [10.117.172.48:9092]
        check.crcs = true
        client.id = consumer-1
        connections.max.idle.ms = 540000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = 
spark-kafka-source-a46c9dc7-36fc-40d3-a5c3-28d566108ab3--149645343-driver-0
        heartbeat.interval.ms = 3000
        interceptor.classes = null
        key.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 1
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class 
org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.ms = 50
        request.timeout.ms = 305000
        retry.backoff.ms = 100
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer

18/03/23 11:07:59 INFO AppInfoParser: Kafka version : 0.10.2.0
18/03/23 11:07:59 INFO AppInfoParser: Kafka commitId : 576d93a8dc0cf421
18/03/23 11:07:59 INFO SessionState: Created local directory: 
/tmp/fa519f67-cbad-4691-a260-744dd9c9a432_resources
18/03/23 11:07:59 INFO SessionState: Created HDFS directory: 
/tmp/hive/d0g00m3/fa519f67-cbad-4691-a260-744dd9c9a432
18/03/23 11:07:59 INFO SessionState: Created local directory: 
/tmp/d0g00m3/fa519f67-cbad-4691-a260-744dd9c9a432
18/03/23 11:07:59 INFO SessionState: Created HDFS directory: 
/tmp/hive/d0g00m3/fa519f67-cbad-4691-a260-744dd9c9a432/_tmp_space.db
18/03/23 11:07:59 INFO HiveClientImpl: Warehouse location for Hive client 
(version 1.2.1) is file:/u/users/d0g00m3/spark-warehouse/
18/03/23 11:07:59 INFO StreamExecution: Starting new streaming query.
18/03/23 11:07:59 INFO AbstractCoordinator: Discovered coordinator 
10.117.172.48:9092 (id: 2147483647 rack: null) for group 
spark-kafka-source-a46c9dc7-36fc-40d3-a5c3-28d566108ab3--149645343-driver-0.
18/03/23 11:07:59 INFO ConsumerCoordinator: Revoking previously assigned 
partitions [] for group 
spark-kafka-source-a46c9dc7-36fc-40d3-a5c3-28d566108ab3--149645343-driver-0
18/03/23 11:07:59 INFO AbstractCoordinator: (Re-)joining group 
spark-kafka-source-a46c9dc7-36fc-40d3-a5c3-28d566108ab3--149645343-driver-0
18/03/23 11:07:59 INFO AbstractCoordinator: Successfully joined group 
spark-kafka-source-a46c9dc7-36fc-40d3-a5c3-28d566108ab3--149645343-driver-0 
with generation 1
18/03/23 11:07:59 INFO ConsumerCoordinator: Setting newly assigned partitions 
[source-0] for group 
spark-kafka-source-a46c9dc7-36fc-40d3-a5c3-28d566108ab3--149645343-driver-0
18/03/23 11:07:59 INFO KafkaSource: Initial offsets: {"source":{"0":4647070}}
18/03/23 11:07:59 INFO StreamExecution: Committed offsets for batch 0. Metadata 
OffsetSeqMetadata(0,1521803279883,Map(spark.sql.shuffle.partitions -> 200))
18/03/23 11:07:59 INFO KafkaSource: GetBatch called with start = None, end = 
{"source":{"0":4647070}}
18/03/23 11:07:59 INFO KafkaSource: Partitions added: Map()
18/03/23 11:08:00 INFO KafkaSource: GetBatch generating RDD of offset range: 
KafkaSourceRDDOffsetRange(source-0,4647070,4647070,None)
18/03/23 11:08:00 INFO CodeGenerator: Code generated in 222.605008 ms
18/03/23 11:08:00 INFO CodeGenerator: Code generated in 12.444683 ms
18/03/23 11:08:00 INFO SparkContext: Starting job: start at 
NativeMethodAccessorImpl.java:0
18/03/23 11:08:00 INFO DAGScheduler: Got job 0 (start at 
NativeMethodAccessorImpl.java:0) with 1 output partitions
18/03/23 11:08:00 INFO DAGScheduler: Final stage: ResultStage 0 (start at 
NativeMethodAccessorImpl.java:0)
18/03/23 11:08:00 INFO DAGScheduler: Parents of final stage: List()
18/03/23 11:08:00 INFO DAGScheduler: Missing parents: List()
18/03/23 11:08:00 INFO DAGScheduler: Submitting ResultStage 0 
(MapPartitionsRDD[6] at start at NativeMethodAccessorImpl.java:0), which has no 
missing parents
18/03/23 11:08:00 INFO MemoryStore: Block broadcast_0 stored as values in 
memory (estimated size 16.9 KB, free 366.3 MB)
18/03/23 11:08:00 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in 
memory (estimated size 7.8 KB, free 366.3 MB)
18/03/23 11:08:00 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 
10.246.79.20:46249 (size: 7.8 KB, free: 366.3 MB)
18/03/23 11:08:00 INFO SparkContext: Created broadcast 0 from broadcast at 
DAGScheduler.scala:1006
18/03/23 11:08:00 INFO DAGScheduler: Submitting 1 missing tasks from 
ResultStage 0 (MapPartitionsRDD[6] at start at NativeMethodAccessorImpl.java:0) 
(first 15 tasks are for partitions Vector(0))
18/03/23 11:08:00 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
18/03/23 11:08:00 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 
localhost, executor driver, partition 0, PROCESS_LOCAL, 5001 bytes)
18/03/23 11:08:00 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
18/03/23 11:08:00 INFO Executor: Fetching file:/u/users/d0g00m3/testlib.R with 
timestamp 1521803266773
18/03/23 11:08:00 INFO Utils: /u/users/d0g00m3/testlib.R has been previously 
copied to 
/tmp/spark-e2d1b965-a3dc-4847-82ce-b09b8f7e2b76/userFiles-f7e87ef1-3a54-4f3f-8ee2-fa10278d0227/testlib.R
18/03/23 11:08:00 INFO Executor: Fetching 
spark://10.246.79.20:57945/jars/org.apache.kafka_kafka-clients-0.10.2.0.jar 
with timestamp 1521803266547
18/03/23 11:08:00 INFO TransportClientFactory: Successfully created connection 
to /10.246.79.20:57945 after 10 ms (0 ms spent in bootstraps)
18/03/23 11:08:00 INFO Utils: Fetching 
spark://10.246.79.20:57945/jars/org.apache.kafka_kafka-clients-0.10.2.0.jar to 
/tmp/spark-e2d1b965-a3dc-4847-82ce-b09b8f7e2b76/userFiles-f7e87ef1-3a54-4f3f-8ee2-fa10278d0227/fetchFileTemp2480019594959116903.tmp
18/03/23 11:08:00 INFO Executor: Adding 
file:/tmp/spark-e2d1b965-a3dc-4847-82ce-b09b8f7e2b76/userFiles-f7e87ef1-3a54-4f3f-8ee2-fa10278d0227/org.apache.kafka_kafka-clients-0.10.2.0.jar
 to class loader
18/03/23 11:08:00 INFO Executor: Fetching 
spark://10.246.79.20:57945/jars/org.apache.spark_spark-sql-kafka-0-10_2.11-2.2.0.jar
 with timestamp 1521803266546
18/03/23 11:08:00 INFO Utils: Fetching 
spark://10.246.79.20:57945/jars/org.apache.spark_spark-sql-kafka-0-10_2.11-2.2.0.jar
 to 
/tmp/spark-e2d1b965-a3dc-4847-82ce-b09b8f7e2b76/userFiles-f7e87ef1-3a54-4f3f-8ee2-fa10278d0227/fetchFileTemp8966057625455886086.tmp
18/03/23 11:08:00 INFO Executor: Adding 
file:/tmp/spark-e2d1b965-a3dc-4847-82ce-b09b8f7e2b76/userFiles-f7e87ef1-3a54-4f3f-8ee2-fa10278d0227/org.apache.spark_spark-sql-kafka-0-10_2.11-2.2.0.jar
 to class loader
18/03/23 11:08:01 INFO ConsumerConfig: ConsumerConfig values: 
        auto.commit.interval.ms = 5000
        auto.offset.reset = none
        bootstrap.servers = [10.117.172.48:9092]
        check.crcs = true
        client.id = 
        connections.max.idle.ms = 540000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = 
spark-kafka-source-a46c9dc7-36fc-40d3-a5c3-28d566108ab3--149645343-executor
        heartbeat.interval.ms = 3000
        interceptor.classes = null
        key.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class 
org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.ms = 50
        request.timeout.ms = 305000
        retry.backoff.ms = 100
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer

18/03/23 11:08:01 INFO ConsumerConfig: ConsumerConfig values: 
        auto.commit.interval.ms = 5000
        auto.offset.reset = none
        bootstrap.servers = [10.117.172.48:9092]
        check.crcs = true
        client.id = consumer-2
        connections.max.idle.ms = 540000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = 
spark-kafka-source-a46c9dc7-36fc-40d3-a5c3-28d566108ab3--149645343-executor
        heartbeat.interval.ms = 3000
        interceptor.classes = null
        key.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class 
org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.ms = 50
        request.timeout.ms = 305000
        retry.backoff.ms = 100
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer

18/03/23 11:08:01 INFO AppInfoParser: Kafka version : 0.10.2.0
18/03/23 11:08:01 INFO AppInfoParser: Kafka commitId : 576d93a8dc0cf421
18/03/23 11:08:01 INFO KafkaSourceRDD: Beginning offset 4647070 is the same as 
ending offset skipping source 0
18/03/23 11:08:01 INFO CodeGenerator: Code generated in 24.246571 ms
18/03/23 11:08:01 INFO CodeGenerator: Code generated in 14.032234 ms
18/03/23 11:08:01 INFO RRunner: Times: boot = 0.718 s, init = 0.009 s, 
broadcast = 0.000 s, read-input = 0.000 s, compute = 0.000 s, write-output = 
0.000 s, total = 0.727 s
18/03/23 11:08:01 INFO CodeGenerator: Code generated in 13.304316 ms
18/03/23 11:08:01 INFO ProducerConfig: ProducerConfig values: 
        acks = 1
        batch.size = 16384
        block.on.buffer.full = false
        bootstrap.servers = [10.117.172.48:9092]
        buffer.memory = 33554432
        client.id = 
        compression.type = none
        connections.max.idle.ms = 540000
        interceptor.classes = null
        key.serializer = class 
org.apache.kafka.common.serialization.ByteArraySerializer
        linger.ms = 0
        max.block.ms = 60000
        max.in.flight.requests.per.connection = 5
        max.request.size = 1048576
        metadata.fetch.timeout.ms = 60000
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.sample.window.ms = 30000
        partitioner.class = class 
org.apache.kafka.clients.producer.internals.DefaultPartitioner
        receive.buffer.bytes = 32768
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retries = 0
        retry.backoff.ms = 100
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        timeout.ms = 30000
        value.serializer = class 
org.apache.kafka.common.serialization.ByteArraySerializer

18/03/23 11:08:01 INFO AppInfoParser: Kafka version : 0.10.2.0
18/03/23 11:08:01 INFO AppInfoParser: Kafka commitId : 576d93a8dc0cf421
18/03/23 11:08:01 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1154 
bytes result sent to driver
18/03/23 11:08:01 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) 
in 1096 ms on localhost (executor driver) (1/1)
18/03/23 11:08:01 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have 
all completed, from pool 
18/03/23 11:08:01 INFO DAGScheduler: ResultStage 0 (start at 
NativeMethodAccessorImpl.java:0) finished in 1.128 s
18/03/23 11:08:01 INFO DAGScheduler: Job 0 finished: start at 
NativeMethodAccessorImpl.java:0, took 1.335817 s
18/03/23 11:08:01 INFO StreamExecution: Streaming query made progress: {
  "id" : "1605966f-51e3-4df9-b284-7535e37b6d44",
  "runId" : "ab605bec-3d57-4a61-a45b-5174292ab2ec",
  "name" : null,
  "timestamp" : "2018-03-23T11:07:59.157Z",
  "numInputRows" : 0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "addBatch" : 1754,
    "getBatch" : 115,
    "getOffset" : 715,
    "queryPlanning" : 135,
    "triggerExecution" : 2782,
    "walCommit" : 43
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[source]]",
    "startOffset" : null,
    "endOffset" : {
      "source" : {
        "0" : 4647070
      }
    },
    "numInputRows" : 0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "KafkaSink"
  }
}
18/03/23 11:08:02 INFO StreamExecution: Streaming query made progress: {
  "id" : "1605966f-51e3-4df9-b284-7535e37b6d44",
  "runId" : "ab605bec-3d57-4a61-a45b-5174292ab2ec",
  "name" : null,
  "timestamp" : "2018-03-23T11:08:02.197Z",
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "getOffset" : 15,
    "triggerExecution" : 16
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[source]]",
    "startOffset" : {
      "source" : {
        "0" : 4647070
      }
    },
    "endOffset" : {
      "source" : {
        "0" : 4647070
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "KafkaSink"
  }
}
18/03/23 11:08:04 INFO StreamExecution: Committed offsets for batch 1. Metadata 
OffsetSeqMetadata(0,1521803284620,Map(spark.sql.shuffle.partitions -> 200))
18/03/23 11:08:04 INFO KafkaSource: GetBatch called with start = 
Some({"source":{"0":4647070}}), end = {"source":{"0":4647071}}
18/03/23 11:08:04 INFO KafkaSource: Partitions added: Map()
18/03/23 11:08:04 INFO KafkaSource: GetBatch generating RDD of offset range: 
KafkaSourceRDDOffsetRange(source-0,4647070,4647071,None)
18/03/23 11:08:04 INFO SparkContext: Starting job: start at 
NativeMethodAccessorImpl.java:0
18/03/23 11:08:04 INFO DAGScheduler: Got job 1 (start at 
NativeMethodAccessorImpl.java:0) with 1 output partitions
18/03/23 11:08:04 INFO DAGScheduler: Final stage: ResultStage 1 (start at 
NativeMethodAccessorImpl.java:0)
18/03/23 11:08:04 INFO DAGScheduler: Parents of final stage: List()
18/03/23 11:08:04 INFO DAGScheduler: Missing parents: List()
18/03/23 11:08:04 INFO DAGScheduler: Submitting ResultStage 1 
(MapPartitionsRDD[13] at start at NativeMethodAccessorImpl.java:0), which has 
no missing parents
18/03/23 11:08:04 INFO MemoryStore: Block broadcast_1 stored as values in 
memory (estimated size 16.9 KB, free 366.3 MB)
18/03/23 11:08:04 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in 
memory (estimated size 7.8 KB, free 366.3 MB)
18/03/23 11:08:04 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 
10.246.79.20:46249 (size: 7.8 KB, free: 366.3 MB)
18/03/23 11:08:04 INFO SparkContext: Created broadcast 1 from broadcast at 
DAGScheduler.scala:1006
18/03/23 11:08:04 INFO DAGScheduler: Submitting 1 missing tasks from 
ResultStage 1 (MapPartitionsRDD[13] at start at 
NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))
18/03/23 11:08:04 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
18/03/23 11:08:04 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, 
localhost, executor driver, partition 0, PROCESS_LOCAL, 5001 bytes)
18/03/23 11:08:04 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
18/03/23 11:08:04 WARN CachedKafkaConsumer: CachedKafkaConsumer is not running 
in UninterruptibleThread. It may hang when CachedKafkaConsumer's methods are 
interrupted because of KAFKA-1894
18/03/23 11:08:04 INFO BufferedStreamThread: Loading required package: lattice
18/03/23 11:08:04 INFO BufferedStreamThread: 
18/03/23 11:08:04 INFO BufferedStreamThread: Attaching package: ‘lattice’
18/03/23 11:08:04 INFO BufferedStreamThread: 
18/03/23 11:08:04 INFO BufferedStreamThread: The following object is masked 
from ‘package:SparkR’:
18/03/23 11:08:04 INFO BufferedStreamThread: 
18/03/23 11:08:04 INFO BufferedStreamThread:     histogram
18/03/23 11:08:04 INFO BufferedStreamThread: 
18/03/23 11:08:04 INFO BufferedStreamThread: Loading required package: ggplot2
18/03/23 11:08:07 INFO BufferedStreamThread:    user  system elapsed 
18/03/23 11:08:07 INFO BufferedStreamThread:   2.038   0.074   2.114 
18/03/23 11:08:07 INFO RRunner: Times: boot = 0.011 s, init = 0.088 s, 
broadcast = 0.000 s, read-input = 0.001 s, compute = 2.176 s, write-output = 
0.001 s, total = 2.277 s
18/03/23 11:08:07 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1154 
bytes result sent to driver
18/03/23 11:08:07 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) 
in 2380 ms on localhost (executor driver) (1/1)
18/03/23 11:08:07 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have 
all completed, from pool 
18/03/23 11:08:07 INFO DAGScheduler: ResultStage 1 (start at 
NativeMethodAccessorImpl.java:0) finished in 2.381 s
18/03/23 11:08:07 INFO DAGScheduler: Job 1 finished: start at 
NativeMethodAccessorImpl.java:0, took 2.395044 s
18/03/23 11:08:07 INFO StreamExecution: Streaming query made progress: {
  "id" : "1605966f-51e3-4df9-b284-7535e37b6d44",
  "runId" : "ab605bec-3d57-4a61-a45b-5174292ab2ec",
  "name" : null,
  "timestamp" : "2018-03-23T11:08:04.604Z",
  "numInputRows" : 1,
  "inputRowsPerSecond" : 37.03703703703704,
  "processedRowsPerSecond" : 0.3947887879984209,
  "durationMs" : {
    "addBatch" : 2422,
    "getBatch" : 9,
    "getOffset" : 15,
    "queryPlanning" : 8,
    "triggerExecution" : 2533,
    "walCommit" : 75
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[source]]",
    "startOffset" : {
      "source" : {
        "0" : 4647070
      }
    },
    "endOffset" : {
      "source" : {
        "0" : 4647071
      }
    },
    "numInputRows" : 1,
    "inputRowsPerSecond" : 37.03703703703704,
    "processedRowsPerSecond" : 0.3947887879984209
  } ],
  "sink" : {
    "description" : "KafkaSink"
  }
}
18/03/23 11:08:07 INFO StreamExecution: Streaming query made progress: {
  "id" : "1605966f-51e3-4df9-b284-7535e37b6d44",
  "runId" : "ab605bec-3d57-4a61-a45b-5174292ab2ec",
  "name" : null,
  "timestamp" : "2018-03-23T11:08:07.183Z",
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "getOffset" : 17,
    "triggerExecution" : 17
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[source]]",
    "startOffset" : {
      "source" : {
        "0" : 4647071
      }
    },
    "endOffset" : {
      "source" : {
        "0" : 4647071
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "KafkaSink"
  }
}
18/03/23 11:08:10 INFO StreamExecution: Committed offsets for batch 2. Metadata 
OffsetSeqMetadata(0,1521803290685,Map(spark.sql.shuffle.partitions -> 200))
18/03/23 11:08:10 INFO KafkaSource: GetBatch called with start = 
Some({"source":{"0":4647071}}), end = {"source":{"0":4647072}}
18/03/23 11:08:10 INFO KafkaSource: Partitions added: Map()
18/03/23 11:08:10 INFO KafkaSource: GetBatch generating RDD of offset range: 
KafkaSourceRDDOffsetRange(source-0,4647071,4647072,None)
18/03/23 11:08:10 INFO SparkContext: Starting job: start at 
NativeMethodAccessorImpl.java:0
18/03/23 11:08:10 INFO DAGScheduler: Got job 2 (start at 
NativeMethodAccessorImpl.java:0) with 1 output partitions
18/03/23 11:08:10 INFO DAGScheduler: Final stage: ResultStage 2 (start at 
NativeMethodAccessorImpl.java:0)
18/03/23 11:08:10 INFO DAGScheduler: Parents of final stage: List()
18/03/23 11:08:10 INFO DAGScheduler: Missing parents: List()
18/03/23 11:08:10 INFO DAGScheduler: Submitting ResultStage 2 
(MapPartitionsRDD[20] at start at NativeMethodAccessorImpl.java:0), which has 
no missing parents
18/03/23 11:08:10 INFO MemoryStore: Block broadcast_2 stored as values in 
memory (estimated size 16.9 KB, free 366.2 MB)
18/03/23 11:08:10 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in 
memory (estimated size 7.8 KB, free 366.2 MB)
18/03/23 11:08:10 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 
10.246.79.20:46249 (size: 7.8 KB, free: 366.3 MB)
18/03/23 11:08:10 INFO SparkContext: Created broadcast 2 from broadcast at 
DAGScheduler.scala:1006
18/03/23 11:08:10 INFO DAGScheduler: Submitting 1 missing tasks from 
ResultStage 2 (MapPartitionsRDD[20] at start at 
NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))
18/03/23 11:08:10 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
18/03/23 11:08:10 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 2, 
localhost, executor driver, partition 0, PROCESS_LOCAL, 5001 bytes)
18/03/23 11:08:10 INFO Executor: Running task 0.0 in stage 2.0 (TID 2)
18/03/23 11:08:10 WARN CachedKafkaConsumer: CachedKafkaConsumer is not running 
in UninterruptibleThread. It may hang when CachedKafkaConsumer's methods are 
interrupted because of KAFKA-1894
18/03/23 11:08:10 INFO BufferedStreamThread: Loading required package: lattice
18/03/23 11:08:10 INFO BufferedStreamThread: 
18/03/23 11:08:10 INFO BufferedStreamThread: Attaching package: ‘lattice’
18/03/23 11:08:10 INFO BufferedStreamThread: 
18/03/23 11:08:10 INFO BufferedStreamThread: The following object is masked 
from ‘package:SparkR’:
18/03/23 11:08:10 INFO BufferedStreamThread: 
18/03/23 11:08:10 INFO BufferedStreamThread:     histogram
18/03/23 11:08:10 INFO BufferedStreamThread: 
18/03/23 11:08:10 INFO BufferedStreamThread: Loading required package: ggplot2
18/03/23 11:08:12 INFO BufferedStreamThread:    user  system elapsed 
18/03/23 11:08:12 INFO BufferedStreamThread:   1.937   0.062   1.999 
18/03/23 11:08:12 INFO RRunner: Times: boot = 0.009 s, init = 0.017 s, 
broadcast = 0.001 s, read-input = 0.001 s, compute = 2.064 s, write-output = 
0.001 s, total = 2.093 s
18/03/23 11:08:12 INFO Executor: Finished task 0.0 in stage 2.0 (TID 2). 1154 
bytes result sent to driver
18/03/23 11:08:12 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 2) 
in 2122 ms on localhost (executor driver) (1/1)
18/03/23 11:08:12 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have 
all completed, from pool 
18/03/23 11:08:12 INFO DAGScheduler: ResultStage 2 (start at 
NativeMethodAccessorImpl.java:0) finished in 2.122 s
18/03/23 11:08:12 INFO DAGScheduler: Job 2 finished: start at 
NativeMethodAccessorImpl.java:0, took 2.137442 s
18/03/23 11:08:12 INFO StreamExecution: Streaming query made progress: {
  "id" : "1605966f-51e3-4df9-b284-7535e37b6d44",
  "runId" : "ab605bec-3d57-4a61-a45b-5174292ab2ec",
  "name" : null,
  "timestamp" : "2018-03-23T11:08:10.660Z",
  "numInputRows" : 1,
  "inputRowsPerSecond" : 27.027027027027028,
  "processedRowsPerSecond" : 0.4452359750667854,
  "durationMs" : {
    "addBatch" : 2170,
    "getBatch" : 9,
    "getOffset" : 24,
    "queryPlanning" : 9,
    "triggerExecution" : 2246,
    "walCommit" : 29
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[source]]",
    "startOffset" : {
      "source" : {
        "0" : 4647071
      }
    },
    "endOffset" : {
      "source" : {
        "0" : 4647072
      }
    },
    "numInputRows" : 1,
    "inputRowsPerSecond" : 27.027027027027028,
    "processedRowsPerSecond" : 0.4452359750667854
  } ],
  "sink" : {
    "description" : "KafkaSink"
  }
}
18/03/23 11:08:12 INFO StreamExecution: Streaming query made progress: {
  "id" : "1605966f-51e3-4df9-b284-7535e37b6d44",
  "runId" : "ab605bec-3d57-4a61-a45b-5174292ab2ec",
  "name" : null,
  "timestamp" : "2018-03-23T11:08:12.942Z",
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "getOffset" : 12,
    "triggerExecution" : 12
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[source]]",
    "startOffset" : {
      "source" : {
        "0" : 4647072
      }
    },
    "endOffset" : {
      "source" : {
        "0" : 4647072
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "KafkaSink"
  }
}
18/03/23 11:08:14 INFO StreamExecution: Committed offsets for batch 3. Metadata 
OffsetSeqMetadata(0,1521803294496,Map(spark.sql.shuffle.partitions -> 200))
18/03/23 11:08:14 INFO KafkaSource: GetBatch called with start = 
Some({"source":{"0":4647072}}), end = {"source":{"0":4647073}}
18/03/23 11:08:14 INFO KafkaSource: Partitions added: Map()
18/03/23 11:08:14 INFO BlockManagerInfo: Removed broadcast_2_piece0 on 
10.246.79.20:46249 in memory (size: 7.8 KB, free: 366.3 MB)
18/03/23 11:08:14 INFO KafkaSource: GetBatch generating RDD of offset range: 
KafkaSourceRDDOffsetRange(source-0,4647072,4647073,None)
18/03/23 11:08:14 INFO BlockManagerInfo: Removed broadcast_0_piece0 on 
10.246.79.20:46249 in memory (size: 7.8 KB, free: 366.3 MB)
18/03/23 11:08:14 INFO BlockManagerInfo: Removed broadcast_1_piece0 on 
10.246.79.20:46249 in memory (size: 7.8 KB, free: 366.3 MB)
18/03/23 11:08:14 INFO SparkContext: Starting job: start at 
NativeMethodAccessorImpl.java:0
18/03/23 11:08:14 INFO DAGScheduler: Got job 3 (start at 
NativeMethodAccessorImpl.java:0) with 1 output partitions
18/03/23 11:08:14 INFO DAGScheduler: Final stage: ResultStage 3 (start at 
NativeMethodAccessorImpl.java:0)
18/03/23 11:08:14 INFO DAGScheduler: Parents of final stage: List()
18/03/23 11:08:14 INFO DAGScheduler: Missing parents: List()
18/03/23 11:08:14 INFO DAGScheduler: Submitting ResultStage 3 
(MapPartitionsRDD[27] at start at NativeMethodAccessorImpl.java:0), which has 
no missing parents
18/03/23 11:08:14 INFO MemoryStore: Block broadcast_3 stored as values in 
memory (estimated size 16.9 KB, free 366.3 MB)
18/03/23 11:08:14 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in 
memory (estimated size 7.8 KB, free 366.3 MB)
18/03/23 11:08:14 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 
10.246.79.20:46249 (size: 7.8 KB, free: 366.3 MB)
18/03/23 11:08:14 INFO SparkContext: Created broadcast 3 from broadcast at 
DAGScheduler.scala:1006
18/03/23 11:08:14 INFO DAGScheduler: Submitting 1 missing tasks from 
ResultStage 3 (MapPartitionsRDD[27] at start at 
NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))
18/03/23 11:08:14 INFO TaskSchedulerImpl: Adding task set 3.0 with 1 tasks
18/03/23 11:08:14 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID 3, 
localhost, executor driver, partition 0, PROCESS_LOCAL, 5001 bytes)
18/03/23 11:08:14 INFO Executor: Running task 0.0 in stage 3.0 (TID 3)
18/03/23 11:08:14 WARN CachedKafkaConsumer: CachedKafkaConsumer is not running 
in UninterruptibleThread. It may hang when CachedKafkaConsumer's methods are 
interrupted because of KAFKA-1894
18/03/23 11:08:14 INFO BufferedStreamThread: Loading required package: lattice
18/03/23 11:08:14 INFO BufferedStreamThread: 
18/03/23 11:08:14 INFO BufferedStreamThread: Attaching package: ‘lattice’
18/03/23 11:08:14 INFO BufferedStreamThread: 
18/03/23 11:08:14 INFO BufferedStreamThread: The following object is masked 
from ‘package:SparkR’:
18/03/23 11:08:14 INFO BufferedStreamThread: 
18/03/23 11:08:14 INFO BufferedStreamThread:     histogram
18/03/23 11:08:14 INFO BufferedStreamThread: 
18/03/23 11:08:14 INFO BufferedStreamThread: Loading required package: ggplot2
18/03/23 11:08:16 INFO BufferedStreamThread:    user  system elapsed 
18/03/23 11:08:16 INFO BufferedStreamThread:   1.916   0.068   1.987 
18/03/23 11:08:16 INFO RRunner: Times: boot = 0.009 s, init = 0.019 s, 
broadcast = 0.001 s, read-input = 0.001 s, compute = 2.054 s, write-output = 
0.001 s, total = 2.085 s
18/03/23 11:08:16 INFO Executor: Finished task 0.0 in stage 3.0 (TID 3). 1154 
bytes result sent to driver
18/03/23 11:08:16 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID 3) 
in 2109 ms on localhost (executor driver) (1/1)
18/03/23 11:08:16 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have 
all completed, from pool 
18/03/23 11:08:16 INFO DAGScheduler: ResultStage 3 (start at 
NativeMethodAccessorImpl.java:0) finished in 2.110 s
18/03/23 11:08:16 INFO DAGScheduler: Job 3 finished: start at 
NativeMethodAccessorImpl.java:0, took 2.123274 s
18/03/23 11:08:16 INFO StreamExecution: Streaming query made progress: {
  "id" : "1605966f-51e3-4df9-b284-7535e37b6d44",
  "runId" : "ab605bec-3d57-4a61-a45b-5174292ab2ec",
  "name" : null,
  "timestamp" : "2018-03-23T11:08:14.485Z",
  "numInputRows" : 1,
  "inputRowsPerSecond" : 41.666666666666664,
  "processedRowsPerSecond" : 0.44622936189201245,
  "durationMs" : {
    "addBatch" : 2149,
    "getBatch" : 13,
    "getOffset" : 11,
    "queryPlanning" : 9,
    "triggerExecution" : 2241,
    "walCommit" : 54
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[source]]",
    "startOffset" : {
      "source" : {
        "0" : 4647072
      }
    },
    "endOffset" : {
      "source" : {
        "0" : 4647073
      }
    },
    "numInputRows" : 1,
    "inputRowsPerSecond" : 41.666666666666664,
    "processedRowsPerSecond" : 0.44622936189201245
  } ],
  "sink" : {
    "description" : "KafkaSink"
  }
}
18/03/23 11:08:16 INFO StreamExecution: Streaming query made progress: {
  "id" : "1605966f-51e3-4df9-b284-7535e37b6d44",
  "runId" : "ab605bec-3d57-4a61-a45b-5174292ab2ec",
  "name" : null,
  "timestamp" : "2018-03-23T11:08:16.758Z",
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "getOffset" : 16,
    "triggerExecution" : 16
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[source]]",
    "startOffset" : {
      "source" : {
        "0" : 4647073
      }
    },
    "endOffset" : {
      "source" : {
        "0" : 4647073
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "KafkaSink"
  }
}
18/03/23 11:08:19 INFO StreamExecution: Committed offsets for batch 4. Metadata 
OffsetSeqMetadata(0,1521803299499,Map(spark.sql.shuffle.partitions -> 200))
18/03/23 11:08:19 INFO KafkaSource: GetBatch called with start = 
Some({"source":{"0":4647073}}), end = {"source":{"0":4647074}}
18/03/23 11:08:19 INFO KafkaSource: Partitions added: Map()
18/03/23 11:08:19 INFO KafkaSource: GetBatch generating RDD of offset range: 
KafkaSourceRDDOffsetRange(source-0,4647073,4647074,None)
18/03/23 11:08:19 INFO SparkContext: Starting job: start at 
NativeMethodAccessorImpl.java:0
18/03/23 11:08:19 INFO DAGScheduler: Got job 4 (start at 
NativeMethodAccessorImpl.java:0) with 1 output partitions
18/03/23 11:08:19 INFO DAGScheduler: Final stage: ResultStage 4 (start at 
NativeMethodAccessorImpl.java:0)
18/03/23 11:08:19 INFO DAGScheduler: Parents of final stage: List()
18/03/23 11:08:19 INFO DAGScheduler: Missing parents: List()
18/03/23 11:08:19 INFO DAGScheduler: Submitting ResultStage 4 
(MapPartitionsRDD[34] at start at NativeMethodAccessorImpl.java:0), which has 
no missing parents
18/03/23 11:08:19 INFO MemoryStore: Block broadcast_4 stored as values in 
memory (estimated size 16.9 KB, free 366.3 MB)
18/03/23 11:08:19 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in 
memory (estimated size 7.8 KB, free 366.3 MB)
18/03/23 11:08:19 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on 
10.246.79.20:46249 (size: 7.8 KB, free: 366.3 MB)
18/03/23 11:08:19 INFO SparkContext: Created broadcast 4 from broadcast at 
DAGScheduler.scala:1006
18/03/23 11:08:19 INFO DAGScheduler: Submitting 1 missing tasks from 
ResultStage 4 (MapPartitionsRDD[34] at start at 
NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))
18/03/23 11:08:19 INFO TaskSchedulerImpl: Adding task set 4.0 with 1 tasks
18/03/23 11:08:19 INFO TaskSetManager: Starting task 0.0 in stage 4.0 (TID 4, 
localhost, executor driver, partition 0, PROCESS_LOCAL, 5001 bytes)
18/03/23 11:08:19 INFO Executor: Running task 0.0 in stage 4.0 (TID 4)
18/03/23 11:08:19 WARN CachedKafkaConsumer: CachedKafkaConsumer is not running 
in UninterruptibleThread. It may hang when CachedKafkaConsumer's methods are 
interrupted because of KAFKA-1894
18/03/23 11:08:19 INFO BufferedStreamThread: Loading required package: lattice
18/03/23 11:08:19 INFO BufferedStreamThread: 
18/03/23 11:08:19 INFO BufferedStreamThread: Attaching package: ‘lattice’
18/03/23 11:08:19 INFO BufferedStreamThread: 
18/03/23 11:08:19 INFO BufferedStreamThread: The following object is masked 
from ‘package:SparkR’:
18/03/23 11:08:19 INFO BufferedStreamThread: 
18/03/23 11:08:19 INFO BufferedStreamThread:     histogram
18/03/23 11:08:19 INFO BufferedStreamThread: 
18/03/23 11:08:19 INFO BufferedStreamThread: Loading required package: ggplot2
18/03/23 11:08:21 INFO BufferedStreamThread:    user  system elapsed 
18/03/23 11:08:21 INFO BufferedStreamThread:   1.931   0.071   2.002 
18/03/23 11:08:21 INFO RRunner: Times: boot = 0.009 s, init = 0.017 s, 
broadcast = 0.000 s, read-input = 0.001 s, compute = 2.069 s, write-output = 
0.002 s, total = 2.098 s
18/03/23 11:08:21 INFO Executor: Finished task 0.0 in stage 4.0 (TID 4). 1154 
bytes result sent to driver
18/03/23 11:08:21 INFO TaskSetManager: Finished task 0.0 in stage 4.0 (TID 4) 
in 2122 ms on localhost (executor driver) (1/1)
18/03/23 11:08:21 INFO TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have 
all completed, from pool 
18/03/23 11:08:21 INFO DAGScheduler: ResultStage 4 (start at 
NativeMethodAccessorImpl.java:0) finished in 2.124 s
18/03/23 11:08:21 INFO DAGScheduler: Job 4 finished: start at 
NativeMethodAccessorImpl.java:0, took 2.135279 s
18/03/23 11:08:21 INFO StreamExecution: Streaming query made progress: {
  "id" : "1605966f-51e3-4df9-b284-7535e37b6d44",
  "runId" : "ab605bec-3d57-4a61-a45b-5174292ab2ec",
  "name" : null,
  "timestamp" : "2018-03-23T11:08:19.488Z",
  "numInputRows" : 1,
  "inputRowsPerSecond" : 40.0,
  "processedRowsPerSecond" : 0.45004500450045004,
  "durationMs" : {
    "addBatch" : 2160,
    "getBatch" : 8,
    "getOffset" : 11,
    "queryPlanning" : 8,
    "triggerExecution" : 2222,
    "walCommit" : 32
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[source]]",
    "startOffset" : {
      "source" : {
        "0" : 4647073
      }
    },
    "endOffset" : {
      "source" : {
        "0" : 4647074
      }
    },
    "numInputRows" : 1,
    "inputRowsPerSecond" : 40.0,
    "processedRowsPerSecond" : 0.45004500450045004
  } ],
  "sink" : {
    "description" : "KafkaSink"
  }
}
18/03/23 11:08:21 INFO StreamExecution: Streaming query made progress: {
  "id" : "1605966f-51e3-4df9-b284-7535e37b6d44",
  "runId" : "ab605bec-3d57-4a61-a45b-5174292ab2ec",
  "name" : null,
  "timestamp" : "2018-03-23T11:08:21.747Z",
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "getOffset" : 12,
    "triggerExecution" : 12
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[source]]",
    "startOffset" : {
      "source" : {
        "0" : 4647074
      }
    },
    "endOffset" : {
      "source" : {
        "0" : 4647074
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "KafkaSink"
  }
}
^C
Execution halted