[jira] [Updated] (SPARK-38679) Expose the number partitions in a stage to TaskContext
[ https://issues.apache.org/jira/browse/SPARK-38679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Venki Korukanti updated SPARK-38679:
Description:
Add a new API to expose, in TaskContext, the total partition count of the stage the task belongs to, so that the task knows what fraction of the computation it is doing.
With this extra information, users can also generate unique 32-bit int IDs as below, rather than using `monotonically_increasing_id`, which generates 64-bit long IDs.
{code:java}
rdd.mapPartitions { rowsIter =>
  val partitionId = TaskContext.get().partitionId()
  val numPartitions = TaskContext.get().numPartitions()
  var i = 0
  rowsIter.map { row =>
    val rowId = partitionId + i * numPartitions
    i += 1
    (rowId, row)
  }
}{code}
was:
Add a new API to expose, in TaskContext, the total partition count of the stage the task belongs to. so that the task knows what fraction of the computation it is doing.
With this extra information, users can also generate unique 32-bit int IDs as below, rather than using `monotonically_increasing_id`, which generates 64-bit long IDs.
{code:java}
rdd.mapPartitions { rowsIter =>
  val partitionId = TaskContext.get().partitionId()
  val numPartitions = TaskContext.get().numPartitions()
  var i = 0
  rowsIter.map { row =>
    val rowId = partitionId + i * numPartitions
    i += 1
    (rowId, row)
  }
}{code}

> Expose the number partitions in a stage to TaskContext
> --
>
> Key: SPARK-38679
> URL: https://issues.apache.org/jira/browse/SPARK-38679
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 3.2.1
> Reporter: Venki Korukanti
> Priority: Major

--
This message was sent by Atlassian Jira (v8.20.1#820001)
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
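The ID scheme in the description can be checked locally without a cluster. The following Scala sketch is an illustration under stated assumptions, not Spark code: each inner Seq stands in for one partition (so numPartitions is simply the number of inner Seqs), and RowIdSketch and assignIds are hypothetical names. It swaps in exact arithmetic so that Int overflow fails loudly instead of producing wrapped ids.

```scala
object RowIdSketch {
  // Local simulation of the rdd.mapPartitions snippet (no Spark needed):
  // each inner Seq stands in for one partition, so numPartitions is the outer Seq's size.
  def assignIds[T](partitions: Seq[Seq[T]]): Seq[Seq[(Int, T)]] = {
    val numPartitions = partitions.size
    partitions.zipWithIndex.map { case (rows, partitionId) =>
      rows.zipWithIndex.map { case (row, i) =>
        // Row i of partition p gets id p + i * numPartitions.
        // addExact/multiplyExact throw ArithmeticException on Int overflow
        // instead of silently wrapping into negative or duplicate ids.
        val rowId = Math.addExact(partitionId, Math.multiplyExact(i, numPartitions))
        (rowId, row)
      }
    }
  }
}
```

With three partitions holding 3, 1 and 2 rows, the ids come out as 0, 3, 6 / 1 / 2, 5: unique, but 4 is never used, so the ids are not contiguous when partition sizes differ.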
[jira] [Updated] (SPARK-38679) Expose the number partitions in a stage to TaskContext
[ https://issues.apache.org/jira/browse/SPARK-38679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Venki Korukanti updated SPARK-38679:
Description:
Add a new API to expose, in TaskContext, the total partition count of the stage the task belongs to. so that the task knows what fraction of the computation it is doing.
With this extra information, users can also generate unique 32-bit int IDs as below, rather than using `monotonically_increasing_id`, which generates 64-bit long IDs.
{code:java}
rdd.mapPartitions { rowsIter =>
  val partitionId = TaskContext.get().partitionId()
  val numPartitions = TaskContext.get().numPartitions()
  var i = 0
  rowsIter.map { row =>
    val rowId = partitionId + i * numPartitions
    i += 1
    (rowId, row)
  }
}{code}
was:
Add a new API to expose, in TaskContext, the total partition count of the stage the task belongs to. so that the task knows what fraction of the computation it is doing.
With this extra information, users can also generate unique 32-bit int IDs as below, rather than using `monotonically_increasing_id`, which generates 64-bit long IDs.
```
{{ rdd.mapPartitions { rowsIter =>}}
{{ val partitionId = TaskContext.get().partitionId()}}
{{ val numPartitions = TaskContext.get().numPartitions()}}
{{ var i = 0}}
{{ rowsIter.map { row =>}}
{{ val rowId = partitionId + i * numPartitions}}
{{ i += 1}}
{{ (rowId, row)}}
}
}
```

> Expose the number partitions in a stage to TaskContext
> --
>
> Key: SPARK-38679
> URL: https://issues.apache.org/jira/browse/SPARK-38679
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 3.2.1
> Reporter: Venki Korukanti
> Priority: Major
[jira] [Updated] (SPARK-38679) Expose the number partitions in a stage to TaskContext
[ https://issues.apache.org/jira/browse/SPARK-38679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Venki Korukanti updated SPARK-38679:
Description:
Add a new API to expose, in TaskContext, the total partition count of the stage the task belongs to. so that the task knows what fraction of the computation it is doing.
With this extra information, users can also generate unique 32-bit int IDs as below, rather than using `monotonically_increasing_id`, which generates 64-bit long IDs.
```
{{ rdd.mapPartitions { rowsIter =>}}
{{ val partitionId = TaskContext.get().partitionId()}}
{{ val numPartitions = TaskContext.get().numPartitions()}}
{{ var i = 0}}
{{ rowsIter.map { row =>}}
{{ val rowId = partitionId + i * numPartitions}}
{{ i += 1}}
{{ (rowId, row)}}
}
}
```
was:
Add a new API to expose the total partition count in a task. so that the task knows what fraction of the computation it is doing.
With this extra information, users can also generate unique 32-bit int IDs as below, rather than using `monotonically_increasing_id`, which generates 64-bit long IDs.
{{ rdd.mapPartitions { rowsIter =>}}
{{ val partitionId = TaskContext.get().partitionId()}}
{{ val numPartitions = TaskContext.get().numPartitions()}}
{{ var i = 0}}
{{ rowsIter.map { row =>}}
{{ val rowId = partitionId + i * numPartitions}}
{{ i += 1}}
{{ (rowId, row)}}
{{ }}}
{{ }}}

> Expose the number partitions in a stage to TaskContext
> --
>
> Key: SPARK-38679
> URL: https://issues.apache.org/jira/browse/SPARK-38679
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 3.2.1
> Reporter: Venki Korukanti
> Priority: Major
[jira] [Created] (SPARK-38679) Expose the number partitions in a stage to TaskContext
Venki Korukanti created SPARK-38679:
---
Summary: Expose the number partitions in a stage to TaskContext
Key: SPARK-38679
URL: https://issues.apache.org/jira/browse/SPARK-38679
Project: Spark
Issue Type: Improvement
Components: Spark Core
Affects Versions: 3.2.1
Reporter: Venki Korukanti

Add a new API to expose the total partition count in a task. so that the task knows what fraction of the computation it is doing.
With this extra information, users can also generate unique 32-bit int IDs as below, rather than using `monotonically_increasing_id`, which generates 64-bit long IDs.
{{ rdd.mapPartitions { rowsIter =>}}
{{ val partitionId = TaskContext.get().partitionId()}}
{{ val numPartitions = TaskContext.get().numPartitions()}}
{{ var i = 0}}
{{ rowsIter.map { row =>}}
{{ val rowId = partitionId + i * numPartitions}}
{{ i += 1}}
{{ (rowId, row)}}
{{ }}}
{{ }}}
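Two consequences of the formula rowId = partitionId + i * numPartitions can be made concrete: every id decodes back to exactly one (partitionId, i) pair, which is why ids never collide, and a 32-bit id bounds how many rows a partition may hold. The Scala sketch below is an illustration, not part of the issue; decode and maxRowsPerPartition are hypothetical helpers, and the bound conservatively assumes every partition may be as large as the largest one.

```scala
object RowIdDecodeSketch {
  // Inverts rowId = partitionId + i * numPartitions for 0 <= partitionId < numPartitions
  // and i >= 0. Because every rowId maps back to exactly one (partitionId, i) pair,
  // no two rows can receive the same id.
  def decode(rowId: Int, numPartitions: Int): (Int, Int) = {
    require(numPartitions > 0 && rowId >= 0, "numPartitions must be positive and rowId non-negative")
    (rowId % numPartitions, rowId / numPartitions)
  }

  // Largest per-partition row count n for which every id fits in an Int:
  // the biggest id is (numPartitions - 1) + (n - 1) * numPartitions = n * numPartitions - 1,
  // so n must satisfy n <= (Int.MaxValue + 1) / numPartitions.
  def maxRowsPerPartition(numPartitions: Int): Long = {
    require(numPartitions > 0, "numPartitions must be positive")
    (Int.MaxValue.toLong + 1L) / numPartitions
  }
}
```

For example, with 200 partitions each partition can hold at most 10,737,418 rows before the largest id would exceed Int.MaxValue.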
[jira] [Created] (SPARK-36236) RocksDB state store: Add additional metrics for better observability into state store operations
Venki Korukanti created SPARK-36236:
---
Summary: RocksDB state store: Add additional metrics for better observability into state store operations
Key: SPARK-36236
URL: https://issues.apache.org/jira/browse/SPARK-36236
Project: Spark
Issue Type: Sub-task
Components: Structured Streaming
Affects Versions: 3.1.2
Reporter: Venki Korukanti

Proposing to add the following new metrics to {{customMetrics}} under {{stateOperators}} in the {{StreamingQueryProgress}} event. These metrics give better visibility into the RocksDB-based state store in streaming jobs.
* {{rocksdbGetCount}}: number of get calls to the DB (does not include gets from the WriteBatch, the in-memory batch used for staging writes)
* {{rocksdbPutCount}}: number of put calls to the DB (does not include puts to the WriteBatch, the in-memory batch used for staging writes)
* {{rocksdbTotalBytesReadByGet/rocksdbTotalBytesWrittenByPut}}: number of uncompressed bytes read/written by get/put operations
* {{rocksdbReadBlockCacheHitCount/rocksdbReadBlockCacheMissCount}}: indicate how useful the RocksDB block cache is in avoiding local disk reads
* {{rocksdbTotalBytesReadByCompaction/rocksdbTotalBytesWrittenByCompaction}}: number of bytes the compaction process read from and wrote to disk
* {{rocksdbTotalCompactionTime}}: time (in ns) taken by compactions (both background compactions and the optional compaction initiated during the commit)
* {{rocksdbWriterStallDuration}}: time (in ns) the writer has stalled due to a background compaction or the flushing of immutable memtables to disk
* {{rocksdbTotalBytesReadThroughIterator}}: some stateful operations (such as timeout processing in FlatMapGroupsWithState and watermarking) require reading the entire data in the DB through an iterator. This metric gives the total size of uncompressed data read using the iterator.
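As an illustration of how two of the proposed metrics might be combined when reading a progress event, here is a minimal Scala sketch. It assumes the custom metrics have already been copied into a plain Map[String, Long] keyed by the metric names listed above; RocksDbMetricsSketch and its helpers are hypothetical, and any input values are made up.

```scala
object RocksDbMetricsSketch {
  // Hypothetical helper: fraction of block reads served from the RocksDB block cache.
  // Returns None when no block reads were recorded, to avoid dividing by zero.
  def blockCacheHitRatio(metrics: Map[String, Long]): Option[Double] = {
    val hits = metrics.getOrElse("rocksdbReadBlockCacheHitCount", 0L)
    val misses = metrics.getOrElse("rocksdbReadBlockCacheMissCount", 0L)
    val total = hits + misses
    if (total == 0L) None else Some(hits.toDouble / total)
  }

  // Hypothetical helper: average uncompressed bytes returned per get call.
  def avgBytesPerGet(metrics: Map[String, Long]): Option[Double] = {
    val gets = metrics.getOrElse("rocksdbGetCount", 0L)
    val bytes = metrics.getOrElse("rocksdbTotalBytesReadByGet", 0L)
    if (gets == 0L) None else Some(bytes.toDouble / gets)
  }
}
```

A low hit ratio across microbatches would suggest that reads are falling through to local disk, which is exactly what the hit/miss pair is meant to expose.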
[jira] [Created] (SPARK-35896) [SS] Include more granular metrics for stateful operators in StreamingQueryProgress
Venki Korukanti created SPARK-35896:
---
Summary: [SS] Include more granular metrics for stateful operators in StreamingQueryProgress
Key: SPARK-35896
URL: https://issues.apache.org/jira/browse/SPARK-35896
Project: Spark
Issue Type: Improvement
Components: Structured Streaming
Affects Versions: 3.1.2
Reporter: Venki Korukanti

Currently the streaming progress is missing a few important stateful operator metrics in {{StateOperatorProgress}}. Each stateful operator consists of multiple steps. For example, {{flatMapGroupsWithState}} has two major steps: 1) processing the input and 2) timeout processing to remove expired entries from the state. The main motivation is to track the time taken by each individual step (such as timeout processing and watermark processing) and how much data each step processes, in order to pinpoint bottlenecks and to reason about why some microbatches are slower than others in the same job.

Below are the final metrics common to all stateful operators (the ones in _*bold-italic*_ are newly proposed). These metrics are in {{StateOperatorProgress}}, which is part of {{StreamingQueryProgress}}.
* _*operatorName*_ - state operator name. Can help identify operator-specific slowness and state store usage patterns. For example, "dedupe" (derived using {{StateStoreWriter.shortName}}).
* _numRowsTotal_ - number of rows in the state store across all tasks in the stage where the operator has executed.
* _numRowsUpdated_ - number of rows added to or updated in the store.
* _*allUpdatesTimeMs*_ - time taken to add new rows or update existing state store rows across all tasks in the stage where the operator has executed.
* _*numRowsRemoved*_ - number of rows deleted from the state store as part of the state cleanup mechanism across all tasks in the stage where the operator has executed. This number helps measure state store deletions and their impact on checkpoint commit and other latencies.
* _*allRemovalsTimeMs*_ - time taken to remove rows from the state store as part of state cleanup (including iterating through the entire state store to find which rows to delete) across all tasks in the stage where the operator has executed. If jobs spend significant time here, it may justify a state store layout that reads only the required rows instead of the entire state store, as is done currently.
* _*commitTimeMs*_ - time taken to commit the state store changes to external storage for checkpointing. This is cumulative across all tasks in the stage where this operator has executed.
* _*numShufflePartitions*_ - number of shuffle partitions this state operator is part of. Metrics such as times are currently aggregated across all tasks in the stage where the operator has executed; having the number of shuffle partitions (which corresponds to the number of tasks) helps find the average task contribution to each metric.
* _*numStateStores*_ - number of state stores in the operator across all tasks in the stage. Some stateful operators have more than one state store (e.g. stream-stream join). Tracking this number helps find correlations between the number of state store instances and microbatch latency.
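The numShufflePartitions bullet implies a simple per-task average. The sketch below shows that arithmetic in Scala under stated assumptions: OperatorMetricsSketch is a hypothetical, trimmed-down stand-in for a few of the fields above (not Spark's StateOperatorProgress class), OperatorMetricsMath is an invented helper, and any numbers passed in are illustrative only.

```scala
// Hypothetical, simplified stand-in for a few StateOperatorProgress fields named above;
// this is not Spark's class.
final case class OperatorMetricsSketch(
    operatorName: String,
    allUpdatesTimeMs: Long,
    allRemovalsTimeMs: Long,
    commitTimeMs: Long,
    numShufflePartitions: Long)

object OperatorMetricsMath {
  // Divides each stage-wide cumulative timing metric by numShufflePartitions,
  // which the proposal treats as the number of tasks, to get an average per-task value.
  def avgPerTaskMs(m: OperatorMetricsSketch): Map[String, Double] = {
    require(m.numShufflePartitions > 0, "numShufflePartitions must be positive")
    val tasks = m.numShufflePartitions.toDouble
    Map(
      "allUpdatesTimeMs" -> m.allUpdatesTimeMs / tasks,
      "allRemovalsTimeMs" -> m.allRemovalsTimeMs / tasks,
      "commitTimeMs" -> m.commitTimeMs / tasks)
  }
}
```

An average hides skew: a modest per-task value can still come from a few slow tasks, so it is best read alongside per-task details when those are available.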
[jira] [Created] (SPARK-35880) [SS] Track the number of duplicates dropped in streaming dedupe operator
Venki Korukanti created SPARK-35880:
---
Summary: [SS] Track the number of duplicates dropped in streaming dedupe operator
Key: SPARK-35880
URL: https://issues.apache.org/jira/browse/SPARK-35880
Project: Spark
Issue Type: Improvement
Components: Structured Streaming
Affects Versions: 3.1.2
Reporter: Venki Korukanti

Currently there is no way to find out how many duplicates in the input were dropped. Having this metric will help track down incorrect-results issues.
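To illustrate the kind of counter this issue asks for, here is a minimal Scala sketch of streaming deduplication that also counts dropped rows. It is a hypothetical stand-in, not Spark's dedupe operator: an in-memory HashSet plays the role of the state store, rows are reduced to their dedupe keys, and DedupeWithCounter is an invented name.

```scala
import scala.collection.mutable

// Hypothetical sketch, not Spark's streaming dedupe operator: keeps the keys seen so far
// (playing the role of the state store) and counts how many input rows were dropped.
final class DedupeWithCounter[K] {
  private val seen = mutable.HashSet.empty[K]
  private var droppedTotal = 0L

  // Processes one micro-batch; returns the surviving rows and the number dropped in this batch.
  def processBatch(batch: Seq[K]): (Seq[K], Long) = {
    // HashSet.add returns false when the key was already present, i.e. a duplicate.
    val kept = batch.filter(key => seen.add(key))
    val dropped = (batch.size - kept.size).toLong
    droppedTotal += dropped
    (kept, dropped)
  }

  // Cumulative number of duplicates dropped across all batches processed so far.
  def numDroppedDuplicates: Long = droppedTotal
}
```

Duplicates are counted both within a batch and against keys seen in earlier batches, which is what makes such a counter useful when checking for unexpected result counts.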
[jira] [Updated] (SPARK-35799) Fix the allUpdatesTimeMs metric measuring in FlatMapGroupsWithStateExec
[ https://issues.apache.org/jira/browse/SPARK-35799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Venki Korukanti updated SPARK-35799:
Description:
Metric {{allUpdatesTimeMs}} is meant to capture the start-to-end wall time of the operator {{FlatMapGroupsWithStateExec}}, but currently it only [captures|https://github.com/apache/spark/blob/79362c4efcb6bd4b575438330a14a6191cca5e4b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala#L121] the iterator creation time.
Fix it to measure the same way other stateful operators do. One example is [here|https://github.com/apache/spark/blob/79362c4efcb6bd4b575438330a14a6191cca5e4b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala#L406].
This measurement is not perfect: because the iterator is lazy, it also includes the time the consumer operator spends processing this operator's output. However, it should give a good signal when comparing the metric in one microbatch to the metric in another.
was:
Metric `allUpdatesTimeMs` is meant to capture the start-to-end wall time of the operator `FlatMapGroupsWithStateExec`, but currently it only [captures|https://github.com/apache/spark/blob/79362c4efcb6bd4b575438330a14a6191cca5e4b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala#L121] the iterator creation time.
Fix it to measure the same way other stateful operators do. One example is [here|https://github.com/apache/spark/blob/79362c4efcb6bd4b575438330a14a6191cca5e4b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala#L406].
This measurement is not perfect: because the iterator is lazy, it also includes the time the consumer operator spends processing this operator's output. However, it should give a good signal when comparing the metric in one microbatch to the metric in another.

> Fix the allUpdatesTimeMs metric measuring in FlatMapGroupsWithStateExec
> ---
>
> Key: SPARK-35799
> URL: https://issues.apache.org/jira/browse/SPARK-35799
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Affects Versions: 3.1.2
> Reporter: Venki Korukanti
> Priority: Minor
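The difference between timing iterator creation and timing the whole consumption of a lazy iterator can be sketched in Scala as follows. This is a hypothetical helper, not the code in the linked Spark files: TimedIteratorSketch.timed starts a clock when the iterator is created and reports the elapsed milliseconds once hasNext first returns false.

```scala
// Hypothetical sketch (not Spark's code): wraps a lazy iterator so the measured time covers
// the whole consumption of the iterator, not just its creation.
object TimedIteratorSketch {
  def timed[T](create: => Iterator[T])(onComplete: Long => Unit): Iterator[T] = {
    val startNs = System.nanoTime()
    val underlying = create
    new Iterator[T] {
      private var reported = false

      override def hasNext: Boolean = {
        val more = underlying.hasNext
        if (!more && !reported) {
          reported = true
          // Elapsed wall time from creation until the consumer drained the iterator.
          // This includes time the consumer spent between next() calls.
          onComplete((System.nanoTime() - startNs) / 1000000L)
        }
        more
      }

      override def next(): T = underlying.next()
    }
  }
}
```

As the description warns, the reported time includes whatever the consumer does between next() calls; the sketch also reports nothing if the consumer never drains the iterator, and the reported flag keeps repeated hasNext calls from reporting twice.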
[jira] [Created] (SPARK-35799) Fix the allUpdatesTimeMs metric measuring in FlatMapGroupsWithStateExec
Venki Korukanti created SPARK-35799:
---
Summary: Fix the allUpdatesTimeMs metric measuring in FlatMapGroupsWithStateExec
Key: SPARK-35799
URL: https://issues.apache.org/jira/browse/SPARK-35799
Project: Spark
Issue Type: Improvement
Components: Structured Streaming
Affects Versions: 3.1.2
Reporter: Venki Korukanti

Metric `allUpdatesTimeMs` is meant to capture the start-to-end wall time of the operator `FlatMapGroupsWithStateExec`, but currently it only [captures|https://github.com/apache/spark/blob/79362c4efcb6bd4b575438330a14a6191cca5e4b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala#L121] the iterator creation time.
Fix it to measure the same way other stateful operators do. One example is [here|https://github.com/apache/spark/blob/79362c4efcb6bd4b575438330a14a6191cca5e4b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala#L406].
This measurement is not perfect: because the iterator is lazy, it also includes the time the consumer operator spends processing this operator's output. However, it should give a good signal when comparing the metric in one microbatch to the metric in another.
[jira] [Created] (SPARK-35763) Minor refactor of StateStoreCustomMetric
Venki Korukanti created SPARK-35763:
---
Summary: Minor refactor of StateStoreCustomMetric
Key: SPARK-35763
URL: https://issues.apache.org/jira/browse/SPARK-35763
Project: Spark
Issue Type: Improvement
Components: Structured Streaming
Affects Versions: 3.1.2
Reporter: Venki Korukanti

Code in {{[SymmetricHashJoinStateManager|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala#L321]}} relies on the subclass implementations of [StateStoreCustomMetric|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L187]. If a new subclass of {{StateStoreCustomMetric}} is added, {{SymmetricHashJoinStateManager}} needs a code change as well, and that update may be missed if there is no existing test coverage.
To fix this, add a new method {{withNewDesc(desc: String)}} to the trait {{StateStoreCustomMetric}} and use it in {{SymmetricHashJoinStateManager}}.
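A minimal Scala sketch of the proposed refactor, under simplifying assumptions: CustomMetricSketch, SumMetricSketch and TimingMetricSketch are hypothetical stand-ins that keep only name and desc (Spark's StateStoreCustomMetric and its subclasses have more members), and JoinMetricsSketch.relabel only mimics the kind of relabeling a caller such as SymmetricHashJoinStateManager might do.

```scala
// Simplified, hypothetical stand-ins for StateStoreCustomMetric and two subclasses.
trait CustomMetricSketch {
  def name: String
  def desc: String
  // The proposed addition: each subclass knows how to copy itself with a new description,
  // so callers never need to pattern-match on concrete subclasses.
  def withNewDesc(desc: String): CustomMetricSketch
}

final case class SumMetricSketch(name: String, desc: String) extends CustomMetricSketch {
  override def withNewDesc(desc: String): SumMetricSketch = copy(desc = desc)
}

final case class TimingMetricSketch(name: String, desc: String) extends CustomMetricSketch {
  override def withNewDesc(desc: String): TimingMetricSketch = copy(desc = desc)
}

object JoinMetricsSketch {
  // Hypothetical caller: relabels every metric without a match over subclasses,
  // so adding a new subclass requires no change here.
  def relabel(metrics: Seq[CustomMetricSketch], prefix: String): Seq[CustomMetricSketch] =
    metrics.map(m => m.withNewDesc(s"$prefix: ${m.desc}"))
}
```

Because each subclass implements withNewDesc itself (here via the case-class copy method), the caller stays correct when a new subclass is introduced, which is the maintenance problem the issue describes.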