[ https://issues.apache.org/jira/browse/SPARK-23288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yuriy Bondaruk updated SPARK-23288:
-----------------------------------
    Component/s: SQL

> Incorrect number of written records in structured streaming
> -----------------------------------------------------------
>
>                 Key: SPARK-23288
>                 URL: https://issues.apache.org/jira/browse/SPARK-23288
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL, Structured Streaming
>    Affects Versions: 2.2.0
>            Reporter: Yuriy Bondaruk
>            Priority: Major
>              Labels: metrics
>
> I'm using SparkListener.onTaskEnd() to capture input and output metrics, but
> the number of written records
> ('taskEnd.taskMetrics().outputMetrics().recordsWritten()') is incorrect. Here
> is my stream construction, followed by a sketch of the listener:
>  
> {code:java}
> // Read CSV records as a stream, convert each one, and write the results as Parquet.
> StreamingQuery writeStream = session
>         .readStream()
>         .schema(RecordSchema.fromClass(TestRecord.class))
>         .option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
>         .option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
>         .csv(inputFolder.getRoot().toPath().toString())
>         .as(Encoders.bean(TestRecord.class))
>         .flatMap(
>             ((FlatMapFunction<TestRecord, TestVendingRecord>) (u) -> {
>                 List<TestVendingRecord> resultIterable = new ArrayList<>();
>                 try {
>                     TestVendingRecord result = transformer.convert(u);
>                     resultIterable.add(result);
>                 } catch (Throwable t) {
>                     // A record that fails conversion is dropped here and
>                     // never reaches the sink.
>                     System.err.println("Ooops");
>                     t.printStackTrace();
>                 }
>                 return resultIterable.iterator();
>             }),
>             Encoders.bean(TestVendingRecord.class))
>         .writeStream()
>         .outputMode(OutputMode.Append())
>         .format("parquet")
>         .option("path", outputFolder.getRoot().toPath().toString())
>         .option("checkpointLocation", checkpointFolder.getRoot().toPath().toString())
>         .start();
> writeStream.processAllAvailable();
> writeStream.stop();
> {code}
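>
> For reference, the metrics in the output below are captured with a listener
> along these lines (a minimal sketch, not my exact TestMain class; System.out
> stands in for the logger):
>
> {code:java}
> import org.apache.spark.scheduler.AccumulableInfo;
> import org.apache.spark.scheduler.SparkListener;
> import org.apache.spark.scheduler.SparkListenerTaskEnd;
> import scala.collection.JavaConverters;
>
> session.sparkContext().addSparkListener(new SparkListener() {
>     @Override
>     public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
>         System.out.println("-----------status--> " + taskEnd.taskInfo().status());
>         if (taskEnd.taskMetrics() != null) {
>             System.out.println("-----------recordsWritten--> "
>                     + taskEnd.taskMetrics().outputMetrics().recordsWritten());
>             System.out.println("-----------recordsRead-----> "
>                     + taskEnd.taskMetrics().inputMetrics().recordsRead());
>         }
>         // accumulables() is a Scala Seq; convert it before iterating from Java.
>         for (AccumulableInfo acc : JavaConverters
>                 .seqAsJavaListConverter(taskEnd.taskInfo().accumulables()).asJava()) {
>             System.out.println("name = "
>                     + (acc.name().isDefined() ? acc.name().get() : "?"));
>             System.out.println("value =  "
>                     + (acc.value().isDefined() ? acc.value().get() : "?"));
>         }
>     }
> });
> {code}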
> I tested it with one good input record and one bad one (throwing an exception
> in transformer.convert(u); a hypothetical sketch of the converter follows the
> metrics), and it produces the following metrics:
>  
> {code:java}
> (TestMain.java:onTaskEnd(73)) - -----------status--> SUCCESS
> (TestMain.java:onTaskEnd(75)) - -----------recordsWritten--> 0
> (TestMain.java:onTaskEnd(76)) - -----------recordsRead-----> 2
> (TestMain.java:onTaskEnd(83)) - taskEnd.taskInfo().accumulables():
> (TestMain.java:onTaskEnd(84)) - name = duration total (min, med, max)
> (TestMain.java:onTaskEnd(85)) - value =  323
> (TestMain.java:onTaskEnd(84)) - name = number of output rows
> (TestMain.java:onTaskEnd(85)) - value =  2
> (TestMain.java:onTaskEnd(84)) - name = duration total (min, med, max)
> (TestMain.java:onTaskEnd(85)) - value =  364
> (TestMain.java:onTaskEnd(84)) - name = internal.metrics.input.recordsRead
> (TestMain.java:onTaskEnd(85)) - value =  2
> (TestMain.java:onTaskEnd(84)) - name = internal.metrics.input.bytesRead
> (TestMain.java:onTaskEnd(85)) - value =  157
> (TestMain.java:onTaskEnd(84)) - name = internal.metrics.resultSerializationTime
> (TestMain.java:onTaskEnd(85)) - value =  3
> (TestMain.java:onTaskEnd(84)) - name = internal.metrics.resultSize
> (TestMain.java:onTaskEnd(85)) - value =  2396
> (TestMain.java:onTaskEnd(84)) - name = internal.metrics.executorCpuTime
> (TestMain.java:onTaskEnd(85)) - value =  633807000
> (TestMain.java:onTaskEnd(84)) - name = internal.metrics.executorRunTime
> (TestMain.java:onTaskEnd(85)) - value =  683
> (TestMain.java:onTaskEnd(84)) - name = internal.metrics.executorDeserializeCpuTime
> (TestMain.java:onTaskEnd(85)) - value =  55662000
> (TestMain.java:onTaskEnd(84)) - name = internal.metrics.executorDeserializeTime
> (TestMain.java:onTaskEnd(85)) - value =  58
> (TestMain.java:onTaskEnd(89)) - input records 2
> Streaming query made progress: {
>   "id" : "1231f9cb-b2e8-4d10-804d-73d7826c1cb5",
>   "runId" : "bd23b60c-93f9-4e17-b3bc-55403edce4e7",
>   "name" : null,
>   "timestamp" : "2018-01-26T14:44:05.362Z",
>   "numInputRows" : 2,
>   "processedRowsPerSecond" : 0.8163265306122448,
>   "durationMs" : {
>     "addBatch" : 1994,
>     "getBatch" : 126,
>     "getOffset" : 52,
>     "queryPlanning" : 220,
>     "triggerExecution" : 2450,
>     "walCommit" : 41
>   },
>   "stateOperators" : [ ],
>   "sources" : [ {
>     "description" : "FileStreamSource[file:/var/folders/4w/zks_kfls2s3glmrj3f725p7hllyb5_/T/junit3661035412295337071]",
>     "startOffset" : null,
>     "endOffset" : {
>       "logOffset" : 0
>     },
>     "numInputRows" : 2,
>     "processedRowsPerSecond" : 0.8163265306122448
>   } ],
>   "sink" : {
>     "description" : "FileSink[/var/folders/4w/zks_kfls2s3glmrj3f725p7hllyb5_/T/junit3785605384928624065]"
>   }
> }
> {code}
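>
> For concreteness, a hypothetical sketch of what the converter does in this
> test (the real TestRecord and TestVendingRecord are plain beans; getValue and
> setValue are illustrative names):
>
> {code:java}
> // Hypothetical converter: a malformed record throws, hits the catch block in
> // the flatMap above, and is dropped, so only 1 of the 2 records is written.
> class Transformer {
>     TestVendingRecord convert(TestRecord in) {
>         if (in.getValue() == null) {
>             throw new IllegalArgumentException("bad record: " + in);
>         }
>         TestVendingRecord out = new TestVendingRecord();
>         out.setValue(in.getValue());
>         return out;
>     }
> }
> {code}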
> The number of input records is correct, but the number of output records taken
> from taskEnd.taskMetrics().outputMetrics().recordsWritten() is zero. The
> accumulables (taskEnd.taskInfo().accumulables()) don't have the correct value
> either: 'number of output rows' should be 1, since only the good record
> reaches the sink, but it shows 2.
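>
> The per-query progress JSON above can also be captured programmatically with a
> StreamingQueryListener. A minimal sketch (not part of my test; it assumes
> registration before start()) shows the same gap: numInputRows is reported, but
> there is no per-batch count of records actually written by the sink:
>
> {code:java}
> import org.apache.spark.sql.streaming.StreamingQueryListener;
>
> session.streams().addListener(new StreamingQueryListener() {
>     @Override
>     public void onQueryStarted(QueryStartedEvent event) { }
>
>     @Override
>     public void onQueryProgress(QueryProgressEvent event) {
>         // numInputRows matches the task-level recordsRead (2), but the sink
>         // section of the progress JSON carries only a description string.
>         System.out.println("numInputRows = " + event.progress().numInputRows());
>         System.out.println(event.progress().json());
>     }
>
>     @Override
>     public void onQueryTerminated(QueryTerminatedEvent event) { }
> });
> {code}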


